author    iximeow <me@iximeow.net>  2023-07-01 14:08:00 -0700
committer iximeow <me@iximeow.net>  2023-07-01 14:08:00 -0700
commit    ec5a274436bc8dda0b55d2c4da1411ff3c52434d (patch)
tree      34bc1890b2a0cacd4d9e941e7b2221afe39ecfe9 /src
parent    4657e736b6067c4dd0e25ad14253fdb8febffd89 (diff)
add a notion of runs distinct from jobs, let's see how well this goes over
Diffstat (limited to 'src')
-rw-r--r--  src/ci_ctl.rs     17
-rw-r--r--  src/ci_driver.rs 124
-rw-r--r--  src/ci_runner.rs   2
-rw-r--r--  src/dbctx.rs     216
-rw-r--r--  src/main.rs      140
-rw-r--r--  src/sql.rs       127
6 files changed, 340 insertions(+), 286 deletions(-)
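
The shape of the change, in brief: a job now only records that a commit was seen on a remote (roughly 1:1 with commits), and each execution of a job becomes a row in a new runs table. A minimal sketch of that relationship using rusqlite directly, with a pared-down version of the schema from the sql.rs changes below; the values and the standalone main are illustrative, not code from this commit:

    use rusqlite::Connection;

    fn main() -> rusqlite::Result<()> {
        let conn = Connection::open_in_memory()?;
        // trimmed-down versions of CREATE_JOBS_TABLE / CREATE_RUN_TABLE from sql.rs
        conn.execute_batch(
            "CREATE TABLE jobs (id INTEGER PRIMARY KEY AUTOINCREMENT,
                                source TEXT,
                                created_time INTEGER,
                                remote_id INTEGER,
                                commit_id INTEGER);
             CREATE TABLE runs (id INTEGER PRIMARY KEY AUTOINCREMENT,
                                job_id INTEGER,
                                state INTEGER NOT NULL,
                                created_time INTEGER);",
        )?;

        // one job per commit we hear about (remote 1, commit 42, source "webhook" are placeholders)...
        conn.execute(
            "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4);",
            (1i64, 42i64, 1_688_000_000_000i64, "webhook"),
        )?;
        let job_id = conn.last_insert_rowid();

        // ...and any number of runs of that job, each created pending (state 0)
        for _ in 0..2 {
            conn.execute(
                "insert into runs (job_id, state, created_time) values (?1, 0, ?2);",
                (job_id, 1_688_000_000_000i64),
            )?;
        }

        let pending: i64 = conn.query_row(
            "select count(*) from runs where job_id=?1 and state=0;",
            [job_id],
            |row| row.get(0),
        )?;
        println!("job {} has {} pending runs", job_id, pending);
        Ok(())
    }
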
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs
index 36eada5..15feca9 100644
--- a/src/ci_ctl.rs
+++ b/src/ci_ctl.rs
@@ -5,7 +5,6 @@ mod dbctx;
mod notifier;
mod io;
-use sql::JobState;
use dbctx::DbCtx;
use notifier::NotifierConfig;
@@ -82,28 +81,26 @@ fn main() {
JobAction::List => {
let db = DbCtx::new(&config_path, &db_path);
let mut conn = db.conn.lock().unwrap();
- let mut query = conn.prepare("select id, artifacts_path, state, commit_id, created_time from jobs;").unwrap();
+ let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap();
let mut jobs = query.query([]).unwrap();
while let Some(row) = jobs.next().unwrap() {
- let (id, artifacts, state, commit_id, created_time): (u64, Option<String>, u64, u64, u64) = row.try_into().unwrap();
+ let (job_id, run_id, state, created_time, commit_id): (u64, u64, u64, u64, u64) = row.try_into().unwrap();
- eprintln!("[+] {:04} | {: >8?} | {}", id, state, created_time);
+ eprintln!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id);
}
eprintln!("jobs");
},
JobAction::Rerun { which } => {
let db = DbCtx::new(&config_path, &db_path);
- db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [which])
- .expect("works");
- eprintln!("[+] job {} set to pending", which);
+ let task_id = db.new_run(which as u64).expect("db can be queried");
+ eprintln!("[+] rerunning job {} as task {}", which, task_id);
}
JobAction::RerunCommit { commit } => {
let db = DbCtx::new(&config_path, &db_path);
let job_id = db.job_for_commit(&commit).unwrap();
if let Some(job_id) = job_id {
- db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [job_id])
- .expect("works");
- eprintln!("[+] job {} (commit {}) set to pending", job_id, commit);
+ let task_id = db.new_run(job_id).expect("db can be queried");
+ eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id);
} else {
eprintln!("[-] no job for commit {}", commit);
}
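
Rerunning now leaves the job row untouched and just queues a fresh run. A hedged sketch of the SQL that db.new_run boils down to (see the dbctx.rs hunk further down), with 0 standing in for RunState::Pending:

    // illustrative only: mirrors DbCtx::new_run from this commit, minus the DbCtx wrapper
    fn rerun(conn: &rusqlite::Connection, job_id: i64, now_ms: i64) -> rusqlite::Result<i64> {
        // previously: `update jobs set state=0 where id=?1`
        conn.execute(
            "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);",
            (job_id, 0i64, now_ms),
        )?;
        Ok(conn.last_insert_rowid())
    }
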
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index 0f629e7..25928ae 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -28,17 +28,17 @@ mod sql;
mod notifier;
mod io;
-use crate::dbctx::{DbCtx, PendingJob};
+use crate::dbctx::{DbCtx, PendingRun, Job};
use crate::sql::JobResult;
-use crate::sql::JobState;
+use crate::sql::RunState;
lazy_static! {
static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
}
-fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> {
- let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into();
- path.push(job.to_string());
+fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> {
+ let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into();
+ path.push(run.to_string());
match std::fs::create_dir(&path) {
Ok(()) => {
Ok(path)
@@ -53,9 +53,12 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> {
}
}
-async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> {
- eprintln!("activating job {:?}", job);
+async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> {
+ eprintln!("activating task {:?}", run);
+ let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
+
let connection = dbctx.conn.lock().unwrap();
+
let (repo_id, remote_git_url): (u64, String) = connection
.query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.expect("query succeeds");
@@ -73,52 +76,20 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R
.expect("can run query");
std::mem::drop(connection);
- let artifacts: PathBuf = match &job.artifacts {
- Some(artifacts) => PathBuf::from(artifacts),
- None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts")
- };
- /*
-
- if job.run_host.as_ref() == None {
- eprintln!("need to find a host to run the job");
- }
-
- eprintln!("cloning {}", remote_git_url);
- let mut repo_dir = artifacts.clone();
- repo_dir.push("repo");
- eprintln!(" ... into {}", repo_dir.display());
-
- Command::new("git")
- .arg("clone")
- .arg(&remote_git_url)
- .arg(&format!("{}", repo_dir.display()))
- .status()
- .expect("can clone the repo");
-
- eprintln!("checking out {}", commit_sha);
- Command::new("git")
- .current_dir(&repo_dir)
- .arg("checkout")
- .arg(&commit_sha)
- .status()
- .expect("can checkout hash");
- */
+ let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts");
eprintln!("running {}", repo_name);
- /*
- * find the CI script, figure out how to run it
- */
let mut client_job = loop {
let mut candidate = clients.recv().await
.ok_or_else(|| "client channel disconnected".to_string())?;
- if !candidate.will_accept(job) {
+ if !candidate.will_accept(&job) {
eprintln!("client {:?} would not accept job {:?}", candidate, job);
continue;
}
- if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await {
+ if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await {
break client_job;
} else {
// failed to submit job, move on for now
@@ -168,8 +139,8 @@ struct ClientJob {
dbctx: Arc<DbCtx>,
remote_git_url: String,
sha: String,
- job: PendingJob,
- client: RunnerClient
+ task: PendingRun,
+ client: RunnerClient
}
impl ClientJob {
@@ -179,49 +150,50 @@ impl ClientJob {
let msg = match self.client.recv().await.expect("recv works") {
Some(msg) => msg,
None => {
- eprintln!("client hung up. job's done, i hope?");
+ eprintln!("client hung up. task's done, i hope?");
return;
}
};
eprintln!("got {:?}", msg);
let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();
match msg_kind {
- "new_job_please" => {
- eprintln!("misdirected job request (after handshake?)");
+ "new_task_please" => {
+ eprintln!("misdirected task request (after handshake?)");
return;
},
- "job_status" => {
+ "task_status" => {
let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap();
- let (result, state): (Result<String, String>, JobState) = if state == "finished" {
+ let (result, state): (Result<String, String>, RunState) = if state == "finished" {
let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
- eprintln!("job update: state is {} and result is {}", state, result);
+ eprintln!("task update: state is {} and result is {}", state, result);
match result {
"pass" => {
- (Ok("success".to_string()), JobState::Finished)
+ (Ok("success".to_string()), RunState::Finished)
},
other => {
let desc = msg.as_object().unwrap().get("desc")
.map(|x| x.as_str().unwrap().to_string())
.unwrap_or_else(|| other.to_string());
- (Err(desc), JobState::Error)
+ (Err(desc), RunState::Error)
}
}
} else if state == "interrupted" {
let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
- eprintln!("job update: state is {} and result is {}", state, result);
+ eprintln!("task update: state is {} and result is {}", state, result);
let desc = msg.as_object().unwrap().get("desc")
.map(|x| x.as_str().unwrap().to_string())
.unwrap_or_else(|| result.to_string());
- (Err(desc), JobState::Error)
+ (Err(desc), RunState::Error)
} else {
- eprintln!("job update: state is {}", state);
- (Err(format!("atypical completion status: {}", state)), JobState::Invalid)
+ eprintln!("task update: state is {}", state);
+ (Err(format!("atypical completion status: {}", state)), RunState::Invalid)
};
- let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists");
+ let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists");
+ let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists");
for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") {
- if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await {
+ if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await {
eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e);
}
}
@@ -242,8 +214,8 @@ impl ClientJob {
};
self.dbctx.conn.lock().unwrap().execute(
- "update jobs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5",
- (now as u64, state as u64, build_result as u8, result_desc, self.job.id)
+ "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5",
+ (now as u64, state as u64, build_result as u8, result_desc, self.task.id)
)
.expect("can update");
}
@@ -258,7 +230,7 @@ impl ClientJob {
let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap();
let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap();
- self.dbctx.insert_metric(self.job.id, name, value)
+ self.dbctx.insert_metric(self.task.id, name, value)
.expect("TODO handle metric insert error?");
}
"command" => {
@@ -311,7 +283,7 @@ impl RunnerClient {
}
// is this client willing to run the job based on what it has told us so far?
- fn will_accept(&self, job: &PendingJob) -> bool {
+ fn will_accept(&self, job: &Job) -> bool {
match (job.source.as_ref(), self.accepted_sources.as_ref()) {
(_, None) => true,
(None, Some(_)) => false,
@@ -321,7 +293,7 @@ impl RunnerClient {
}
}
- async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
+ async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
self.send(serde_json::json!({
"commit": sha,
"remote_url": remote_git_url,
@@ -334,7 +306,7 @@ impl RunnerClient {
}) {
eprintln!("resp: {:?}", resp);
Ok(Some(ClientJob {
- job: job.clone(),
+ task: job.clone(),
dbctx: Arc::clone(dbctx),
sha: sha.to_string(),
remote_git_url: remote_git_url.to_string(),
@@ -364,18 +336,18 @@ impl fmt::Debug for RunnerClient {
#[axum_macros::debug_handler]
async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse {
eprintln!("artifact request");
- let job_token = match headers.get("x-job-token") {
- Some(job_token) => job_token.to_str().expect("valid string"),
+ let run_token = match headers.get("x-task-token") {
+ Some(run_token) => run_token.to_str().expect("valid string"),
None => {
- eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers);
+ eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers);
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
- let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() {
+ let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() {
Some(result) => result,
None => {
- eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers);
+ eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers);
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
@@ -408,7 +380,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
}
};
- let mut artifact = match ctx.0.reserve_artifact(job, artifact_name, artifact_desc).await {
+ let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await {
Ok(artifact) => artifact,
Err(err) => {
eprintln!("failure to reserve artifact: {:?}", err);
@@ -442,7 +414,7 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
}
}
None => {
- eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers);
+ eprintln!("bad artifact post: headers: {:?}\nno authorization", headers);
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
@@ -533,13 +505,13 @@ async fn main() {
dbctx.create_tables().unwrap();
loop {
- let jobs = dbctx.get_pending_jobs().unwrap();
+ let runs = dbctx.get_pending_runs().unwrap();
- if jobs.len() > 0 {
- println!("{} new jobs", jobs.len());
+ if runs.len() > 0 {
+ println!("{} new runs", runs.len());
- for job in jobs.into_iter() {
- activate_job(Arc::clone(&dbctx), &job, &mut channel).await;
+ for run in runs.into_iter() {
+ activate_run(Arc::clone(&dbctx), &run, &mut channel).await;
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
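
will_accept above only changes its parameter type from PendingJob to Job; the source filtering itself is unchanged. A standalone sketch of that match over plain types — the (Some, Some) arm is outside the visible hunk, so its body here is an assumption:

    // hypothetical free-standing version of RunnerClient::will_accept
    fn will_accept(job_source: Option<&str>, accepted_sources: Option<&[String]>) -> bool {
        match (job_source, accepted_sources) {
            // the runner declared no source filter: it takes anything
            (_, None) => true,
            // the runner filters by source, but this job has none recorded
            (None, Some(_)) => false,
            // assumed: accept only if the job's source is in the runner's list
            (Some(source), Some(sources)) => sources.iter().any(|s| s.as_str() == source),
        }
    }
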
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index 4052ba5..330490c 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -129,7 +129,7 @@ impl RunningJob {
let (mut sender, body) = hyper::Body::channel();
let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact")
.header("user-agent", "ci-butactuallyin-space-runner")
- .header("x-job-token", &self.job.build_token)
+ .header("x-task-token", &self.job.build_token)
.header("x-artifact-name", name)
.header("x-artifact-desc", desc)
.body(body)
diff --git a/src/dbctx.rs b/src/dbctx.rs
index dcca362..331e55f 100644
--- a/src/dbctx.rs
+++ b/src/dbctx.rs
@@ -36,34 +36,51 @@ pub struct Remote {
pub notifier_config_path: String,
}
+// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1
+// relationship with commits, and potentially many *runs* of that job.
#[derive(Debug, Clone)]
pub struct Job {
pub id: u64,
- pub artifacts_path: Option<String>,
- pub state: sql::JobState,
- pub run_host: Option<String>,
pub remote_id: u64,
pub commit_id: u64,
pub created_time: u64,
+ pub source: Option<String>,
+}
+
+// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report
+// results. a job may have many runs from many different hosts rebuilding history, or reruns of the
+// same job on the same hardware to collect more datapoints on the operation.
+#[derive(Debug, Clone)]
+pub struct Run {
+ pub id: u64,
+ pub job_id: u64,
+ pub artifacts_path: Option<String>,
+ pub state: sql::RunState,
+ pub run_host: Option<String>,
+ pub create_time: u64,
pub start_time: Option<u64>,
pub complete_time: Option<u64>,
pub build_token: Option<String>,
- pub job_timeout: Option<u64>,
- pub source: Option<String>,
+ pub run_timeout: Option<u64>,
pub build_result: Option<u8>,
pub final_text: Option<String>,
}
+impl Run {
+ fn into_pending_run(self) -> PendingRun {
+ PendingRun {
+ id: self.id,
+ job_id: self.job_id,
+ create_time: self.create_time,
+ }
+ }
+}
+
#[derive(Debug, Clone)]
-pub struct PendingJob {
+pub struct PendingRun {
pub id: u64,
- pub artifacts: Option<String>,
- pub state: sql::JobState,
- pub run_host: Option<String>,
- pub remote_id: u64,
- pub commit_id: u64,
- pub created_time: u64,
- pub source: Option<String>,
+ pub job_id: u64,
+ pub create_time: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -76,7 +93,7 @@ pub enum TokenValidity {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricRecord {
pub id: u64,
- pub job_id: u64,
+ pub run_id: u64,
pub name: String,
pub value: String
}
@@ -84,7 +101,7 @@ pub struct MetricRecord {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ArtifactRecord {
pub id: u64,
- pub job_id: u64,
+ pub run_id: u64,
pub name: String,
pub desc: String,
pub created_time: u64,
@@ -113,12 +130,12 @@ impl DbCtx {
Ok(())
}
- pub fn insert_metric(&self, job_id: u64, name: &str, value: &str) -> Result<(), String> {
+ pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> {
let conn = self.conn.lock().unwrap();
conn
.execute(
- "insert into metrics (job_id, name, value) values (?1, ?2, ?3) on conflict (job_id, name) do update set value=excluded.value",
- (job_id, name, value)
+ "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value",
+ (run_id, name, value)
)
.expect("can upsert");
Ok(())
@@ -163,7 +180,7 @@ impl DbCtx {
})
}
- pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
+ pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
let artifact_id = {
let created_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -172,8 +189,8 @@ impl DbCtx {
let conn = self.conn.lock().unwrap();
conn
.execute(
- "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)",
- (job_id, name, desc, created_time)
+ "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)",
+ (run_id, name, desc, created_time)
)
.map_err(|e| {
format!("{:?}", e)
@@ -182,17 +199,17 @@ impl DbCtx {
conn.last_insert_rowid() as u64
};
- ArtifactDescriptor::new(job_id, artifact_id).await
+ ArtifactDescriptor::new(run_id, artifact_id).await
}
- pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {
+ pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {
let conn = self.conn.lock().unwrap();
conn
- .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| {
- let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap();
+ .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| {
+ let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap();
Ok(ArtifactRecord {
- id, job_id, name, desc, created_time, completed_time
+ id, run_id, name, desc, created_time, completed_time
})
})
.optional()
@@ -222,11 +239,11 @@ impl DbCtx {
.map_err(|e| e.to_string())
}
- pub fn job_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {
+ pub fn run_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {
self.conn.lock()
.unwrap()
.query_row(
- "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1",
+ "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1",
[token],
|row| {
let timeout: Option<u64> = row.get(3).unwrap();
@@ -254,6 +271,20 @@ impl DbCtx {
.map_err(|e| e.to_string())
}
+ pub fn job_by_id(&self, id: u64) -> Result<Option<Job>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row(crate::sql::JOB_BY_ID, [id], |row| {
+ let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();
+
+ Ok(Job {
+ id, source, created_time, remote_id, commit_id
+ })
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
pub fn remote_by_id(&self, id: u64) -> Result<Option<Remote>, String> {
self.conn.lock()
.unwrap()
@@ -322,40 +353,62 @@ impl DbCtx {
let conn = self.conn.lock().unwrap();
let rows_modified = conn.execute(
- "insert into jobs (state, remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);",
- (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time, pusher)
+ "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);",
+ (remote_id, commit_id, created_time, pusher)
).unwrap();
assert_eq!(1, rows_modified);
- Ok(conn.last_insert_rowid() as u64)
+ let job_id = conn.last_insert_rowid() as u64;
+
+ Ok(job_id)
}
- pub fn metrics_for_job(&self, job: u64) -> Result<Vec<MetricRecord>, String> {
+ pub fn new_run(&self, job_id: u64) -> Result<u64, String> {
+ let created_time = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is before epoch")
+ .as_millis() as u64;
+
let conn = self.conn.lock().unwrap();
- let mut metrics_query = conn.prepare(sql::METRICS_FOR_JOB).unwrap();
- let mut result = metrics_query.query([job]).unwrap();
+ let rows_modified = conn.execute(
+ "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);",
+ (job_id, crate::sql::RunState::Pending as u64, created_time)
+ ).unwrap();
+
+ assert_eq!(1, rows_modified);
+
+ let run_id = conn.last_insert_rowid() as u64;
+
+ Ok(run_id)
+ }
+
+ pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap();
+ let mut result = metrics_query.query([run]).unwrap();
let mut metrics = Vec::new();
while let Some(row) = result.next().unwrap() {
- let (id, job_id, name, value): (u64, u64, String, String) = row.try_into().unwrap();
- metrics.push(MetricRecord { id, job_id, name, value });
+ let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap();
+ metrics.push(MetricRecord { id, run_id, name, value });
}
Ok(metrics)
}
- pub fn artifacts_for_job(&self, job: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> {
+ pub fn artifacts_for_run(&self, run: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> {
let conn = self.conn.lock().unwrap();
- let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_JOB).unwrap();
- let mut result = artifacts_query.query([job, limit.unwrap_or(65535)]).unwrap();
+ let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap();
+ let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap();
let mut artifacts = Vec::new();
while let Some(row) = result.next().unwrap() {
- let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap();
- artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time });
+ let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap();
+ artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time });
}
Ok(artifacts)
@@ -403,23 +456,13 @@ impl DbCtx {
conn
.query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| {
- let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap();
- let state: u8 = state;
+ let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();
Ok(Job {
id,
- artifacts_path,
- state: state.try_into().unwrap(),
- run_host,
remote_id,
commit_id,
created_time,
- start_time,
- complete_time,
- build_token,
- job_timeout,
source,
- build_result,
- final_text,
})
})
.optional()
@@ -435,77 +478,43 @@ impl DbCtx {
let mut jobs = Vec::new();
while let Some(row) = result.next().unwrap() {
- let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text)= row.try_into().unwrap();
- let state: u8 = state;
+ let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();
jobs.push(Job {
id,
- artifacts_path,
- state: state.try_into().unwrap(),
- run_host,
remote_id,
commit_id,
created_time,
- start_time,
- complete_time,
- build_token,
- job_timeout,
source,
- build_result,
- final_text,
});
}
Ok(jobs)
}
- pub fn get_active_jobs(&self) -> Result<Vec<Job>, String> {
+ pub fn get_active_runs(&self) -> Result<Vec<Run>, String> {
let conn = self.conn.lock().unwrap();
- let mut started_query = conn.prepare(sql::ACTIVE_JOBS).unwrap();
- let mut jobs = started_query.query([]).unwrap();
+ let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap();
+ let mut runs = started_query.query([]).unwrap();
let mut started = Vec::new();
- while let Some(row) = jobs.next().unwrap() {
- let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap();
- let state: u8 = state;
-
- started.push(Job {
- id,
- artifacts_path,
- state: state.try_into().unwrap(),
- run_host,
- remote_id,
- commit_id,
- created_time,
- start_time,
- complete_time,
- build_token,
- job_timeout,
- source,
- build_result,
- final_text,
- });
+ while let Some(row) = runs.next().unwrap() {
+ started.push(crate::sql::row2run(row));
}
Ok(started)
}
- pub fn get_pending_jobs(&self) -> Result<Vec<PendingJob>, String> {
+ pub fn get_pending_runs(&self) -> Result<Vec<PendingRun>, String> {
let conn = self.conn.lock().unwrap();
- let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap();
- let mut jobs = pending_query.query([]).unwrap();
+ let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap();
+ let mut runs = pending_query.query([]).unwrap();
let mut pending = Vec::new();
- while let Some(row) = jobs.next().unwrap() {
- let (id, artifacts, state, run_host, remote_id, commit_id, created_time, source) = row.try_into().unwrap();
- let state: u8 = state;
- pending.push(PendingJob {
- id, artifacts,
- state: state.try_into().unwrap(),
- run_host, remote_id, commit_id, created_time,
- source,
- });
+ while let Some(row) = runs.next().unwrap() {
+ let run = crate::sql::row2run(row).into_pending_run();
+ pending.push(run);
}
Ok(pending)
@@ -526,6 +535,17 @@ impl DbCtx {
Ok(remotes)
}
+ pub fn last_run_for_job(&self, job_id: u64) -> Result<Option<Run>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ conn
+ .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| {
+ Ok(crate::sql::row2run(row))
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> {
let remotes = self.remotes_by_repo(repo_id)?;
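
get_pending_runs above goes through row2run (added in sql.rs below) and then into_pending_run; a minimal equivalent using plain rusqlite accessors and only the fields PendingRun keeps — a sketch, not the crate's actual code path:

    #[derive(Debug, Clone)]
    struct PendingRun {
        id: u64,
        job_id: u64,
        create_time: u64,
    }

    fn pending_runs(conn: &rusqlite::Connection) -> rusqlite::Result<Vec<PendingRun>> {
        let mut stmt = conn.prepare(
            "select id, job_id, created_time from runs where state=0;",
        )?;
        let runs = stmt
            .query_map([], |row| {
                Ok(PendingRun {
                    id: row.get(0)?,
                    job_id: row.get(1)?,
                    create_time: row.get(2)?,
                })
            })?
            .collect::<rusqlite::Result<Vec<_>>>()?;
        Ok(runs)
    }
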
diff --git a/src/main.rs b/src/main.rs
index 5f719a0..f008a56 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -35,9 +35,9 @@ mod sql;
mod notifier;
mod dbctx;
-use sql::JobState;
+use sql::RunState;
-use dbctx::{DbCtx, Job, ArtifactRecord};
+use dbctx::{DbCtx, Job, Run, ArtifactRecord};
use rusqlite::OptionalExtension;
@@ -148,14 +148,14 @@ fn job_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> String {
format!("{}/{}", &remote.remote_path, commit_sha)
}
-/// render how long a job took, or is taking, in a human-friendly way
-fn display_job_time(job: &Job) -> String {
- if let Some(start_time) = job.start_time {
- if let Some(complete_time) = job.complete_time {
+/// render how long a run took, or is taking, in a human-friendly way
+fn display_run_time(run: &Run) -> String {
+ if let Some(start_time) = run.start_time {
+ if let Some(complete_time) = run.complete_time {
if complete_time < start_time {
- if job.state == JobState::Started {
- // this job has been restarted. the completed time is stale.
- // further, this is a currently active job.
+ if run.state == RunState::Started {
+ // this run has been restarted. the completed time is stale.
+ // further, this is a currently active run.
let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64;
let mut duration = duration_as_human_string(now_ms - start_time);
duration.push_str(" (ongoing)");
@@ -332,21 +332,23 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse
let mut row_num = 0;
for repo in repos {
- let mut most_recent_job: Option<Job> = None;
+ let mut most_recent_run: Option<(Job, Run)> = None;
for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") {
let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works");
if let Some(last_job) = last_job {
- if most_recent_job.as_ref().map(|job| job.created_time < last_job.created_time).unwrap_or(true) {
- most_recent_job = Some(last_job);
+ if let Some(last_run) = ctx.dbctx.last_run_for_job(last_job.id).expect("can query") {
+ if most_recent_run.as_ref().map(|run| run.1.create_time < last_run.create_time).unwrap_or(true) {
+ most_recent_run = Some((last_job, last_run));
+ }
}
}
}
let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name);
- let row_html: String = match most_recent_job {
- Some(job) => {
+ let row_html: String = match most_recent_run {
+ Some((job, run)) => {
let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
@@ -355,17 +357,17 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse
let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
- let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822();
- let duration = display_job_time(&job);
+ let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
+ let duration = display_run_time(&run);
- let status = format!("{:?}", job.state).to_lowercase();
+ let status = format!("{:?}", run.state).to_lowercase();
- let result = match job.build_result {
+ let result = match run.build_result {
Some(0) => "<span style='color:green;'>pass</span>",
Some(_) => "<span style='color:red;'>fail</span>",
- None => match job.state {
- JobState::Pending => { "unstarted" },
- JobState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },
+ None => match run.state {
+ RunState::Pending => { "unstarted" },
+ RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },
_ => { "<span style='color:red;'>unreported</span>" }
}
};
@@ -401,10 +403,10 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse
}
response.push_str("</table>");
- response.push_str("<h4>active jobs</h4>\n");
+ response.push_str("<h4>active tasks</h4>\n");
- let jobs = ctx.dbctx.get_active_jobs().expect("can query");
- if jobs.len() == 0 {
+ let runs = ctx.dbctx.get_active_runs().expect("can query");
+ if runs.len() == 0 {
response.push_str("<p>(none)</p>\n");
} else {
response.push_str("<table class='build-table'>");
@@ -417,9 +419,10 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse
let mut row_num = 0;
- for job in jobs.iter() {
+ for run in runs.iter() {
let row_index = row_num % 2;
+ let job = ctx.dbctx.job_by_id(run.job_id).expect("query succeeds").expect("job id is valid");
let remote = ctx.dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("remote id is valid");
let repo = ctx.dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("repo id is valid");
@@ -433,17 +436,17 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse
let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
- let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822();
- let duration = display_job_time(&job);
+ let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
+ let duration = display_run_time(&run);
- let status = format!("{:?}", job.state).to_lowercase();
+ let status = format!("{:?}", run.state).to_lowercase();
- let result = match job.build_result {
+ let result = match run.build_result {
Some(0) => "<span style='color:green;'>pass</span>",
Some(_) => "<span style='color:red;'>fail</span>",
- None => match job.state {
- JobState::Pending => { "unstarted" },
- JobState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },
+ None => match run.state {
+ RunState::Pending => { "unstarted" },
+ RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },
_ => { "<span style='color:red;'>unreported</span>" }
}
};
@@ -498,9 +501,11 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
let job = ctx.dbctx.job_by_commit_id(commit_id).expect("can query").expect("job exists");
- let complete_time = job.complete_time.unwrap_or_else(crate::io::now_ms);
+ let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists");
+
+ let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms);
- let debug_info = job.state == JobState::Finished && job.build_result == Some(1) || job.state == JobState::Error;
+ let debug_info = run.state == RunState::Finished && run.build_result == Some(1) || run.state == RunState::Error;
let repo_name: String = ctx.dbctx.conn.lock().unwrap()
.query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0))
@@ -511,42 +516,42 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
let head = format!("<head><title>ci.butactuallyin.space - {}</title></head>", repo_name);
let repo_html = format!("<a href=\"/{}\">{}</a>", &repo_name, &repo_name);
let remote_commit_elem = format!("<a href=\"https://www.github.com/{}/commit/{}\">{}</a>", &remote_path, &sha, &sha);
- let status_elem = match job.state {
- JobState::Pending | JobState::Started => {
+ let status_elem = match run.state {
+ RunState::Pending | RunState::Started => {
"<span style='color:#660;'>pending</span>"
},
- JobState::Finished => {
- if let Some(build_result) = job.build_result {
+ RunState::Finished => {
+ if let Some(build_result) = run.build_result {
if build_result == 0 {
"<span style='color:green;'>pass</span>"
} else {
"<span style='color:red;'>failed</span>"
}
} else {
- eprintln!("job {} for commit {} is missing a build result but is reportedly finished (old data)?", job.id, commit_id);
+ eprintln!("run {} for commit {} is missing a build result but is reportedly finished (old data)?", run.id, commit_id);
"<span style='color:red;'>unreported</span>"
}
},
- JobState::Error => {
+ RunState::Error => {
"<span style='color:red;'>error</span>"
}
- JobState::Invalid => {
+ RunState::Invalid => {
"<span style='color:red;'>(server error)</span>"
}
};
let mut artifacts_fragment = String::new();
- let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_job(job.id, None).unwrap()
- .into_iter() // HACK: filter out artifacts for previous runs of a job. artifacts should be attached to a run, runs should be distinct from jobs. but i'm sleepy.
- .filter(|artifact| artifact.created_time >= job.start_time.unwrap_or_else(crate::io::now_ms))
+ let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap()
+        .into_iter() // HACK: filter out artifacts for previous runs of a job. artifacts should be attached to a run, runs should be distinct from jobs. but i'm sleepy.
+ .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms))
.collect();
artifacts.sort_by_key(|artifact| artifact.created_time);
- fn diff_times(job_completed: u64, artifact_completed: Option<u64>) -> u64 {
+ fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 {
let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms);
- let job_completed = std::cmp::max(job_completed, artifact_completed);
- job_completed - artifact_completed
+ let run_completed = std::cmp::max(run_completed, artifact_completed);
+ run_completed - artifact_completed
}
let recent_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) <= 60_000).cloned().collect();
@@ -556,7 +561,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822();
artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name));
let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time);
- let size_str = (std::fs::metadata(&format!("./jobs/{}/{}", artifact.job_id, artifact.id)).expect("metadata exists").len() / 1024).to_string();
+ let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string();
artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str));
}
@@ -565,18 +570,18 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name));
if debug_info {
artifacts_fragment.push_str("<pre>");
- artifacts_fragment.push_str(&std::fs::read_to_string(format!("./jobs/{}/{}", artifact.job_id, artifact.id)).unwrap());
+ artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap());
artifacts_fragment.push_str("</pre>\n");
} else {
let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time);
- let size_str = std::fs::metadata(&format!("./jobs/{}/{}", artifact.job_id, artifact.id)).map(|md| {
+ let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| {
(md.len() / 1024).to_string()
}).unwrap_or_else(|e| format!("[{}]", e));
artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str));
}
}
- let metrics = ctx.dbctx.metrics_for_job(job.id).unwrap();
+ let metrics = ctx.dbctx.metrics_for_run(run.id).unwrap();
let metrics_section = if metrics.len() > 0 {
let mut section = String::new();
section.push_str("<div>");
@@ -599,9 +604,9 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
html.push_str(" <body>\n");
html.push_str(" <pre>\n");
html.push_str(&format!("repo: {}\n", repo_html));
- html.push_str(&format!("commit: {}, job: {}\n", remote_commit_elem, job.id));
- html.push_str(&format!("status: {} in {}\n", status_elem, display_job_time(&job)));
- if let Some(desc) = job.final_text.as_ref() {
+ html.push_str(&format!("commit: {}, run: {}\n", remote_commit_elem, run.id));
+ html.push_str(&format!("status: {} in {}\n", status_elem, display_run_time(&run)));
+ if let Some(desc) = run.final_text.as_ref() {
html.push_str(&format!(" description: {}\n ", desc));
}
html.push_str(&format!("deployed: {}\n", deployed));
@@ -620,11 +625,11 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
}
async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse {
- eprintln!("get artifact, job={}, artifact={}", path.0, path.1);
- let job_id: u64 = path.0.parse().unwrap();
+ eprintln!("get artifact, run={}, artifact={}", path.0, path.1);
+ let run: u64 = path.0.parse().unwrap();
let artifact_id: u64 = path.1.parse().unwrap();
- let artifact_descriptor = match ctx.dbctx.lookup_artifact(job_id, artifact_id).unwrap() {
+ let artifact_descriptor = match ctx.dbctx.lookup_artifact(run, artifact_id).unwrap() {
Some(artifact) => artifact,
None => {
return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response();
@@ -645,7 +650,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta
let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536);
let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver);
let mut artifact_path = ctx.jobs_path.clone();
- artifact_path.push(artifact_descriptor.job_id.to_string());
+ artifact_path.push(artifact_descriptor.run_id.to_string());
artifact_path.push(artifact_descriptor.id.to_string());
spawn(async move {
let mut artifact = artifact_descriptor;
@@ -661,7 +666,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta
// this would be much implemented as yielding on a condvar woken when an
// inotify event on the file indicates a write has occurred. but i am
// dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao.
- artifact = ctx.dbctx.lookup_artifact(artifact.job_id, artifact.id)
+ artifact = ctx.dbctx.lookup_artifact(artifact.run_id, artifact.id)
.expect("can query db")
.expect("artifact still exists");
}
@@ -724,6 +729,7 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv
let mut row_num = 0;
for job in last_builds.iter().take(10) {
+        let run = ctx.dbctx.last_run_for_job(job.id).expect("query succeeds").expect("TODO: run exists if job exists (small race if querying while creating a job ...)");
let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
@@ -732,17 +738,17 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv
let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
- let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822();
- let duration = display_job_time(&job);
+ let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
+ let duration = display_run_time(&run);
- let status = format!("{:?}", job.state).to_lowercase();
+ let status = format!("{:?}", run.state).to_lowercase();
- let result = match job.build_result {
+ let result = match run.build_result {
Some(0) => "<span style='color:green;'>pass</span>",
Some(_) => "<span style='color:red;'>fail</span>",
- None => match job.state {
- JobState::Pending => { "unstarted" },
- JobState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },
+ None => match run.state {
+ RunState::Pending => { "unstarted" },
+ RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },
_ => { "<span style='color:red;'>unreported</span>" }
}
};
@@ -885,7 +891,7 @@ async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathB
.route("/:owner/:repo/:sha", get(handle_commit_status))
.route("/:owner", get(handle_repo_summary))
.route("/:owner/:repo", post(handle_repo_event))
- .route("/artifact/:job/:artifact_id", get(handle_get_artifact))
+ .route("/artifact/:b/:artifact_id", get(handle_get_artifact))
.route("/", get(handle_ci_index))
.fallback(fallback_get)
.with_state(WebserverState {
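
display_run_time's interesting case is the one visible above: a rerun can leave complete_time earlier than start_time, so a run that is currently Started ignores the stale completion and reports an ongoing duration. A standalone sketch of that branching; everything past the visible branch, and the raw millisecond formatting in place of duration_as_human_string, is assumed:

    fn human(ms: u64) -> String {
        // stand-in for the crate's duration_as_human_string helper
        format!("{}ms", ms)
    }

    fn display_run_time(start: Option<u64>, complete: Option<u64>, still_started: bool, now_ms: u64) -> String {
        match (start, complete) {
            // a rerun left a completion time from an earlier run; report the live duration instead
            (Some(start), Some(complete)) if complete < start && still_started => {
                format!("{} (ongoing)", human(now_ms - start))
            }
            (Some(start), Some(complete)) => human(complete - start),
            (Some(start), None) => format!("{} (ongoing)", human(now_ms - start)),
            (None, _) => "not yet started".to_string(),
        }
    }
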
diff --git a/src/sql.rs b/src/sql.rs
index 91edafb..cc33ad4 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -2,6 +2,9 @@
use std::convert::TryFrom;
+use crate::dbctx::Run;
+use crate::dbctx::Job;
+
#[derive(Debug, Clone)]
pub enum JobResult {
Pass = 0,
@@ -9,7 +12,7 @@ pub enum JobResult {
}
#[derive(Debug, Clone, PartialEq)]
-pub enum JobState {
+pub enum RunState {
Pending = 0,
Started = 1,
Finished = 2,
@@ -17,38 +20,48 @@ pub enum JobState {
Invalid = 4,
}
-impl TryFrom<u8> for JobState {
+impl TryFrom<u8> for RunState {
type Error = String;
fn try_from(value: u8) -> Result<Self, String> {
match value {
- 0 => Ok(JobState::Pending),
- 1 => Ok(JobState::Started),
- 2 => Ok(JobState::Finished),
- 3 => Ok(JobState::Error),
- 4 => Ok(JobState::Invalid),
+ 0 => Ok(RunState::Pending),
+ 1 => Ok(RunState::Started),
+ 2 => Ok(RunState::Finished),
+ 3 => Ok(RunState::Error),
+ 4 => Ok(RunState::Invalid),
other => Err(format!("invalid job state: {}", other)),
}
}
}
+pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
+ let (id, job_id, artifacts_path, state, run_host, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
+ let state: u8 = state;
+ Run {
+ id,
+ job_id,
+ artifacts_path,
+ state: state.try_into().unwrap(),
+ run_host,
+ create_time,
+ start_time,
+ complete_time,
+ build_token,
+ run_timeout,
+ build_result,
+ final_text,
+ }
+}
+
// remote_id is the remote from which we were notified. this is necessary so we know which remote
// to pull from to actually run the job.
pub const CREATE_JOBS_TABLE: &'static str = "\
CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT,
- artifacts_path TEXT,
- state INTEGER NOT NULL,
- run_host TEXT,
- build_token TEXT,
- remote_id INTEGER,
- commit_id INTEGER,
- created_time INTEGER,
- started_time INTEGER,
- complete_time INTEGER,
- job_timeout INTEGER,
source TEXT,
- build_result INTEGER,
- final_status TEXT);";
+ created_time INTEGER,
+ remote_id INTEGER,
+ commit_id INTEGER);";
pub const CREATE_METRICS_TABLE: &'static str = "\
CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -82,35 +95,63 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\
pub const CREATE_ARTIFACTS_TABLE: &'static str = "\
CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT,
- job_id INTEGER,
+ run_id INTEGER,
name TEXT,
desc TEXT,
created_time INTEGER,
completed_time INTEGER);";
+pub const CREATE_RUN_TABLE: &'static str = "\
+ CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT,
+ job_id INTEGER,
+ artifacts_path TEXT,
+ state INTEGER NOT NULL,
+ run_host TEXT,
+ build_token TEXT,
+ created_time INTEGER,
+ started_time INTEGER,
+ complete_time INTEGER,
+ run_timeout INTEGER,
+ build_result INTEGER,
+ final_status TEXT);";
+
pub const CREATE_REMOTES_INDEX: &'static str = "\
CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";
pub const CREATE_REPO_NAME_INDEX: &'static str = "\
CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);";
-pub const PENDING_JOBS: &'static str = "\
- select id, artifacts_path, state, run_host, remote_id, commit_id, created_time, source from jobs where state=0;";
-
-pub const ACTIVE_JOBS: &'static str = "\
- select * from jobs where state=1 or state=0;";
-
-pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\
- select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;";
+pub const PENDING_RUNS: &'static str = "\
+    select id, job_id, artifacts_path, state, run_host, build_token, created_time, started_time, complete_time, run_timeout, build_result, final_status from runs where state=0;";
+
+pub const ACTIVE_RUNS: &'static str = "\
+ select id,
+ job_id,
+ artifacts_path,
+ state,
+ run_host,
+ build_token,
+ created_time,
+ started_time,
+ complete_time,
+ run_timeout,
+ build_result,
+ final_status from runs where state=1 or state=0;";
+
+pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\
+ select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;";
pub const JOB_BY_COMMIT_ID: &'static str = "\
- select * from jobs where commit_id=?1;";
+ select id, source, created_time, remote_id, commit_id from jobs where commit_id=?1;";
pub const ARTIFACT_BY_ID: &'static str = "\
- select * from artifacts where id=?1 and job_id=?2;";
+ select * from artifacts where id=?1 and run_id=?2;";
-pub const METRICS_FOR_JOB: &'static str = "\
- select * from metrics where job_id=?1 order by id asc;";
+pub const JOB_BY_ID: &'static str = "\
+ select id, source, created_time, remote_id, commit_id from jobs where id=?1";
+
+pub const METRICS_FOR_RUN: &'static str = "\
+ select * from metrics where run_id=?1 order by id asc;";
pub const COMMIT_TO_ID: &'static str = "\
select id from commits where sha=?1;";
@@ -122,5 +163,23 @@ pub const ALL_REPOS: &'static str = "\
select * from repos;";
pub const LAST_JOBS_FROM_REMOTE: &'static str = "\
- select * from jobs where remote_id=?1 order by created_time desc limit ?2;";
-
+ select id, source, created_time, remote_id, commit_id from jobs where remote_id=?1 order by created_time desc limit ?2;";
+
+pub const LAST_RUN_FOR_JOB: &'static str = "\
+ select id,
+ job_id,
+ artifacts_path,
+ state,
+ run_host,
+ build_token,
+ created_time,
+ started_time,
+ complete_time,
+ run_timeout,
+ build_result,
+    final_status from runs where job_id=?1 order by id desc limit 1;";
+
+pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\
+ select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id
+ from jobs join runs on jobs.id=runs.job_id
+    order by runs.created_time asc;";
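
SELECT_ALL_RUNS_WITH_JOB_INFO backs the `ci_ctl job list` output in the first hunk; a sketch of consuming it row by row, columns in the order the statement selects them (the formatting mirrors the eprintln above but is otherwise illustrative):

    fn list_runs(conn: &rusqlite::Connection) -> rusqlite::Result<()> {
        let mut stmt = conn.prepare(
            "select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id
             from jobs join runs on jobs.id=runs.job_id
             order by runs.created_time asc;",
        )?;
        let mut rows = stmt.query([])?;
        while let Some(row) = rows.next()? {
            let (job_id, run_id, state, created_time, commit_id): (u64, u64, u64, u64, u64) =
                (row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?);
            println!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id);
        }
        Ok(())
    }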