From ec5a274436bc8dda0b55d2c4da1411ff3c52434d Mon Sep 17 00:00:00 2001 From: iximeow Date: Sat, 1 Jul 2023 14:08:00 -0700 Subject: add a notion of runs distinct from jobs, lets see how well this goes over --- src/ci_ctl.rs | 17 ++--- src/ci_driver.rs | 124 +++++++++++++------------------- src/ci_runner.rs | 2 +- src/dbctx.rs | 216 ++++++++++++++++++++++++++++++------------------------- src/main.rs | 140 +++++++++++++++++++----------------- src/sql.rs | 127 +++++++++++++++++++++++--------- 6 files changed, 340 insertions(+), 286 deletions(-) diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs index 36eada5..15feca9 100644 --- a/src/ci_ctl.rs +++ b/src/ci_ctl.rs @@ -5,7 +5,6 @@ mod dbctx; mod notifier; mod io; -use sql::JobState; use dbctx::DbCtx; use notifier::NotifierConfig; @@ -82,28 +81,26 @@ fn main() { JobAction::List => { let db = DbCtx::new(&config_path, &db_path); let mut conn = db.conn.lock().unwrap(); - let mut query = conn.prepare("select id, artifacts_path, state, commit_id, created_time from jobs;").unwrap(); + let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); let mut jobs = query.query([]).unwrap(); while let Some(row) = jobs.next().unwrap() { - let (id, artifacts, state, commit_id, created_time): (u64, Option, u64, u64, u64) = row.try_into().unwrap(); + let (job_id, run_id, state, created_time, commit_id): (u64, u64, u64, u64, u64) = row.try_into().unwrap(); - eprintln!("[+] {:04} | {: >8?} | {}", id, state, created_time); + eprintln!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id); } eprintln!("jobs"); }, JobAction::Rerun { which } => { let db = DbCtx::new(&config_path, &db_path); - db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [which]) - .expect("works"); - eprintln!("[+] job {} set to pending", which); + let task_id = db.new_run(which as u64).expect("db can be queried"); + eprintln!("[+] rerunning job {} as task {}", which, task_id); } JobAction::RerunCommit { commit } => { let db = DbCtx::new(&config_path, &db_path); let job_id = db.job_for_commit(&commit).unwrap(); if let Some(job_id) = job_id { - db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [job_id]) - .expect("works"); - eprintln!("[+] job {} (commit {}) set to pending", job_id, commit); + let task_id = db.new_run(job_id).expect("db can be queried"); + eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id); } else { eprintln!("[-] no job for commit {}", commit); } diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 0f629e7..25928ae 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -28,17 +28,17 @@ mod sql; mod notifier; mod io; -use crate::dbctx::{DbCtx, PendingJob}; +use crate::dbctx::{DbCtx, PendingRun, Job}; use crate::sql::JobResult; -use crate::sql::JobState; +use crate::sql::RunState; lazy_static! { static ref AUTH_SECRET: RwLock> = RwLock::new(None); } -fn reserve_artifacts_dir(job: u64) -> std::io::Result { - let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); - path.push(job.to_string()); +fn reserve_artifacts_dir(run: u64) -> std::io::Result { + let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into(); + path.push(run.to_string()); match std::fs::create_dir(&path) { Ok(()) => { Ok(path) @@ -53,9 +53,12 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result { } } -async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::Receiver) -> Result<(), String> { - eprintln!("activating job {:?}", job); +async fn activate_run(dbctx: Arc, run: &PendingRun, clients: &mut mpsc::Receiver) -> Result<(), String> { + eprintln!("activating task {:?}", run); + let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); + let connection = dbctx.conn.lock().unwrap(); + let (repo_id, remote_git_url): (u64, String) = connection .query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .expect("query succeeds"); @@ -73,52 +76,20 @@ async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::R .expect("can run query"); std::mem::drop(connection); - let artifacts: PathBuf = match &job.artifacts { - Some(artifacts) => PathBuf::from(artifacts), - None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts") - }; - /* - - if job.run_host.as_ref() == None { - eprintln!("need to find a host to run the job"); - } - - eprintln!("cloning {}", remote_git_url); - let mut repo_dir = artifacts.clone(); - repo_dir.push("repo"); - eprintln!(" ... into {}", repo_dir.display()); - - Command::new("git") - .arg("clone") - .arg(&remote_git_url) - .arg(&format!("{}", repo_dir.display())) - .status() - .expect("can clone the repo"); - - eprintln!("checking out {}", commit_sha); - Command::new("git") - .current_dir(&repo_dir) - .arg("checkout") - .arg(&commit_sha) - .status() - .expect("can checkout hash"); - */ + let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts"); eprintln!("running {}", repo_name); - /* - * find the CI script, figure out how to run it - */ let mut client_job = loop { let mut candidate = clients.recv().await .ok_or_else(|| "client channel disconnected".to_string())?; - if !candidate.will_accept(job) { + if !candidate.will_accept(&job) { eprintln!("client {:?} would not accept job {:?}", candidate, job); continue; } - if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { + if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await { break client_job; } else { // failed to submit job, move on for now @@ -168,8 +139,8 @@ struct ClientJob { dbctx: Arc, remote_git_url: String, sha: String, - job: PendingJob, - client: RunnerClient + task: PendingRun, + client: RunnerClient } impl ClientJob { @@ -179,49 +150,50 @@ impl ClientJob { let msg = match self.client.recv().await.expect("recv works") { Some(msg) => msg, None => { - eprintln!("client hung up. job's done, i hope?"); + eprintln!("client hung up. task's done, i hope?"); return; } }; eprintln!("got {:?}", msg); let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); match msg_kind { - "new_job_please" => { - eprintln!("misdirected job request (after handshake?)"); + "new_task_please" => { + eprintln!("misdirected task request (after handshake?)"); return; }, - "job_status" => { + "task_status" => { let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); - let (result, state): (Result, JobState) = if state == "finished" { + let (result, state): (Result, RunState) = if state == "finished" { let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("job update: state is {} and result is {}", state, result); + eprintln!("task update: state is {} and result is {}", state, result); match result { "pass" => { - (Ok("success".to_string()), JobState::Finished) + (Ok("success".to_string()), RunState::Finished) }, other => { let desc = msg.as_object().unwrap().get("desc") .map(|x| x.as_str().unwrap().to_string()) .unwrap_or_else(|| other.to_string()); - (Err(desc), JobState::Error) + (Err(desc), RunState::Error) } } } else if state == "interrupted" { let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("job update: state is {} and result is {}", state, result); + eprintln!("task update: state is {} and result is {}", state, result); let desc = msg.as_object().unwrap().get("desc") .map(|x| x.as_str().unwrap().to_string()) .unwrap_or_else(|| result.to_string()); - (Err(desc), JobState::Error) + (Err(desc), RunState::Error) } else { - eprintln!("job update: state is {}", state); - (Err(format!("atypical completion status: {}", state)), JobState::Invalid) + eprintln!("task update: state is {}", state); + (Err(format!("atypical completion status: {}", state)), RunState::Invalid) }; - let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists"); + let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); + let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists"); for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { - if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await { + if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await { eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e); } } @@ -242,8 +214,8 @@ impl ClientJob { }; self.dbctx.conn.lock().unwrap().execute( - "update jobs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", - (now as u64, state as u64, build_result as u8, result_desc, self.job.id) + "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", + (now as u64, state as u64, build_result as u8, result_desc, self.task.id) ) .expect("can update"); } @@ -258,7 +230,7 @@ impl ClientJob { let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap(); let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap(); - self.dbctx.insert_metric(self.job.id, name, value) + self.dbctx.insert_metric(self.task.id, name, value) .expect("TODO handle metric insert error?"); } "command" => { @@ -311,7 +283,7 @@ impl RunnerClient { } // is this client willing to run the job based on what it has told us so far? - fn will_accept(&self, job: &PendingJob) -> bool { + fn will_accept(&self, job: &Job) -> bool { match (job.source.as_ref(), self.accepted_sources.as_ref()) { (_, None) => true, (None, Some(_)) => false, @@ -321,7 +293,7 @@ impl RunnerClient { } } - async fn submit(mut self, dbctx: &Arc, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result, String> { + async fn submit(mut self, dbctx: &Arc, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result, String> { self.send(serde_json::json!({ "commit": sha, "remote_url": remote_git_url, @@ -334,7 +306,7 @@ impl RunnerClient { }) { eprintln!("resp: {:?}", resp); Ok(Some(ClientJob { - job: job.clone(), + task: job.clone(), dbctx: Arc::clone(dbctx), sha: sha.to_string(), remote_git_url: remote_git_url.to_string(), @@ -364,18 +336,18 @@ impl fmt::Debug for RunnerClient { #[axum_macros::debug_handler] async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { eprintln!("artifact request"); - let job_token = match headers.get("x-job-token") { - Some(job_token) => job_token.to_str().expect("valid string"), + let run_token = match headers.get("x-task-token") { + Some(run_token) => run_token.to_str().expect("valid string"), None => { - eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers); return (StatusCode::BAD_REQUEST, "").into_response(); } }; - let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() { + let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() { Some(result) => result, None => { - eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers); + eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers); return (StatusCode::BAD_REQUEST, "").into_response(); } }; @@ -408,7 +380,7 @@ async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender artifact, Err(err) => { eprintln!("failure to reserve artifact: {:?}", err); @@ -442,7 +414,7 @@ async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender { - eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + eprintln!("bad artifact post: headers: {:?}\nno authorization", headers); return (StatusCode::BAD_REQUEST, "").into_response(); } }; @@ -533,13 +505,13 @@ async fn main() { dbctx.create_tables().unwrap(); loop { - let jobs = dbctx.get_pending_jobs().unwrap(); + let runs = dbctx.get_pending_runs().unwrap(); - if jobs.len() > 0 { - println!("{} new jobs", jobs.len()); + if runs.len() > 0 { + println!("{} new runs", runs.len()); - for job in jobs.into_iter() { - activate_job(Arc::clone(&dbctx), &job, &mut channel).await; + for run in runs.into_iter() { + activate_run(Arc::clone(&dbctx), &run, &mut channel).await; } } tokio::time::sleep(std::time::Duration::from_millis(100)).await; diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 4052ba5..330490c 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -129,7 +129,7 @@ impl RunningJob { let (mut sender, body) = hyper::Body::channel(); let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") .header("user-agent", "ci-butactuallyin-space-runner") - .header("x-job-token", &self.job.build_token) + .header("x-task-token", &self.job.build_token) .header("x-artifact-name", name) .header("x-artifact-desc", desc) .body(body) diff --git a/src/dbctx.rs b/src/dbctx.rs index dcca362..331e55f 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -36,34 +36,51 @@ pub struct Remote { pub notifier_config_path: String, } +// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1 +// relationship with commits, and potentially many *runs* of that job. #[derive(Debug, Clone)] pub struct Job { pub id: u64, - pub artifacts_path: Option, - pub state: sql::JobState, - pub run_host: Option, pub remote_id: u64, pub commit_id: u64, pub created_time: u64, + pub source: Option, +} + +// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report +// results. a job may have many runs from many different hosts rebuliding history, or reruns of the +// same job on the same hardware to collect more datapoints on the operation. +#[derive(Debug, Clone)] +pub struct Run { + pub id: u64, + pub job_id: u64, + pub artifacts_path: Option, + pub state: sql::RunState, + pub run_host: Option, + pub create_time: u64, pub start_time: Option, pub complete_time: Option, pub build_token: Option, - pub job_timeout: Option, - pub source: Option, + pub run_timeout: Option, pub build_result: Option, pub final_text: Option, } +impl Run { + fn into_pending_run(self) -> PendingRun { + PendingRun { + id: self.id, + job_id: self.job_id, + create_time: self.create_time, + } + } +} + #[derive(Debug, Clone)] -pub struct PendingJob { +pub struct PendingRun { pub id: u64, - pub artifacts: Option, - pub state: sql::JobState, - pub run_host: Option, - pub remote_id: u64, - pub commit_id: u64, - pub created_time: u64, - pub source: Option, + pub job_id: u64, + pub create_time: u64, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -76,7 +93,7 @@ pub enum TokenValidity { #[derive(Debug, Clone, PartialEq, Eq)] pub struct MetricRecord { pub id: u64, - pub job_id: u64, + pub run_id: u64, pub name: String, pub value: String } @@ -84,7 +101,7 @@ pub struct MetricRecord { #[derive(Debug, Clone, PartialEq, Eq)] pub struct ArtifactRecord { pub id: u64, - pub job_id: u64, + pub run_id: u64, pub name: String, pub desc: String, pub created_time: u64, @@ -113,12 +130,12 @@ impl DbCtx { Ok(()) } - pub fn insert_metric(&self, job_id: u64, name: &str, value: &str) -> Result<(), String> { + pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> { let conn = self.conn.lock().unwrap(); conn .execute( - "insert into metrics (job_id, name, value) values (?1, ?2, ?3) on conflict (job_id, name) do update set value=excluded.value", - (job_id, name, value) + "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value", + (run_id, name, value) ) .expect("can upsert"); Ok(()) @@ -163,7 +180,7 @@ impl DbCtx { }) } - pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result { + pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result { let artifact_id = { let created_time = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -172,8 +189,8 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); conn .execute( - "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", - (job_id, name, desc, created_time) + "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", + (run_id, name, desc, created_time) ) .map_err(|e| { format!("{:?}", e) @@ -182,17 +199,17 @@ impl DbCtx { conn.last_insert_rowid() as u64 }; - ArtifactDescriptor::new(job_id, artifact_id).await + ArtifactDescriptor::new(run_id, artifact_id).await } - pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result, String> { + pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result, String> { let conn = self.conn.lock().unwrap(); conn - .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| { - let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); + .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| { + let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); Ok(ArtifactRecord { - id, job_id, name, desc, created_time, completed_time + id, run_id, name, desc, created_time, completed_time }) }) .optional() @@ -222,11 +239,11 @@ impl DbCtx { .map_err(|e| e.to_string()) } - pub fn job_for_token(&self, token: &str) -> Result, TokenValidity)>, String> { + pub fn run_for_token(&self, token: &str) -> Result, TokenValidity)>, String> { self.conn.lock() .unwrap() .query_row( - "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1", + "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1", [token], |row| { let timeout: Option = row.get(3).unwrap(); @@ -254,6 +271,20 @@ impl DbCtx { .map_err(|e| e.to_string()) } + pub fn job_by_id(&self, id: u64) -> Result, String> { + self.conn.lock() + .unwrap() + .query_row(crate::sql::JOB_BY_ID, [id], |row| { + let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + + Ok(Job { + id, source, created_time, remote_id, commit_id + }) + }) + .optional() + .map_err(|e| e.to_string()) + } + pub fn remote_by_id(&self, id: u64) -> Result, String> { self.conn.lock() .unwrap() @@ -322,40 +353,62 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into jobs (state, remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", - (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time, pusher) + "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", + (remote_id, commit_id, created_time, pusher) ).unwrap(); assert_eq!(1, rows_modified); - Ok(conn.last_insert_rowid() as u64) + let job_id = conn.last_insert_rowid() as u64; + + Ok(job_id) } - pub fn metrics_for_job(&self, job: u64) -> Result, String> { + pub fn new_run(&self, job_id: u64) -> Result { + let created_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis() as u64; + let conn = self.conn.lock().unwrap(); - let mut metrics_query = conn.prepare(sql::METRICS_FOR_JOB).unwrap(); - let mut result = metrics_query.query([job]).unwrap(); + let rows_modified = conn.execute( + "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);", + (job_id, crate::sql::RunState::Pending as u64, created_time) + ).unwrap(); + + assert_eq!(1, rows_modified); + + let run_id = conn.last_insert_rowid() as u64; + + Ok(run_id) + } + + pub fn metrics_for_run(&self, run: u64) -> Result, String> { + let conn = self.conn.lock().unwrap(); + + let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap(); + let mut result = metrics_query.query([run]).unwrap(); let mut metrics = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, job_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); - metrics.push(MetricRecord { id, job_id, name, value }); + let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); + metrics.push(MetricRecord { id, run_id, name, value }); } Ok(metrics) } - pub fn artifacts_for_job(&self, job: u64, limit: Option) -> Result, String> { + pub fn artifacts_for_run(&self, run: u64, limit: Option) -> Result, String> { let conn = self.conn.lock().unwrap(); - let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_JOB).unwrap(); - let mut result = artifacts_query.query([job, limit.unwrap_or(65535)]).unwrap(); + let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap(); + let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap(); let mut artifacts = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option) = row.try_into().unwrap(); - artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time }); + let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option) = row.try_into().unwrap(); + artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time }); } Ok(artifacts) @@ -403,23 +456,13 @@ impl DbCtx { conn .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { - let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap(); - let state: u8 = state; + let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); Ok(Job { id, - artifacts_path, - state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time, - start_time, - complete_time, - build_token, - job_timeout, source, - build_result, - final_text, }) }) .optional() @@ -435,77 +478,43 @@ impl DbCtx { let mut jobs = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text)= row.try_into().unwrap(); - let state: u8 = state; + let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); jobs.push(Job { id, - artifacts_path, - state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time, - start_time, - complete_time, - build_token, - job_timeout, source, - build_result, - final_text, }); } Ok(jobs) } - pub fn get_active_jobs(&self) -> Result, String> { + pub fn get_active_runs(&self) -> Result, String> { let conn = self.conn.lock().unwrap(); - let mut started_query = conn.prepare(sql::ACTIVE_JOBS).unwrap(); - let mut jobs = started_query.query([]).unwrap(); + let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap(); + let mut runs = started_query.query([]).unwrap(); let mut started = Vec::new(); - while let Some(row) = jobs.next().unwrap() { - let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap(); - let state: u8 = state; - - started.push(Job { - id, - artifacts_path, - state: state.try_into().unwrap(), - run_host, - remote_id, - commit_id, - created_time, - start_time, - complete_time, - build_token, - job_timeout, - source, - build_result, - final_text, - }); + while let Some(row) = runs.next().unwrap() { + started.push(crate::sql::row2run(row)); } Ok(started) } - pub fn get_pending_jobs(&self) -> Result, String> { + pub fn get_pending_runs(&self) -> Result, String> { let conn = self.conn.lock().unwrap(); - let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap(); - let mut jobs = pending_query.query([]).unwrap(); + let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); + let mut runs = pending_query.query([]).unwrap(); let mut pending = Vec::new(); - while let Some(row) = jobs.next().unwrap() { - let (id, artifacts, state, run_host, remote_id, commit_id, created_time, source) = row.try_into().unwrap(); - let state: u8 = state; - pending.push(PendingJob { - id, artifacts, - state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time, - source, - }); + while let Some(row) = runs.next().unwrap() { + let run = crate::sql::row2run(row).into_pending_run(); + pending.push(run); } Ok(pending) @@ -526,6 +535,17 @@ impl DbCtx { Ok(remotes) } + pub fn last_run_for_job(&self, job_id: u64) -> Result, String> { + let conn = self.conn.lock().unwrap(); + + conn + .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| { + Ok(crate::sql::row2run(row)) + }) + .optional() + .map_err(|e| e.to_string()) + } + pub fn notifiers_by_repo(&self, repo_id: u64) -> Result, String> { let remotes = self.remotes_by_repo(repo_id)?; diff --git a/src/main.rs b/src/main.rs index 5f719a0..f008a56 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,9 +35,9 @@ mod sql; mod notifier; mod dbctx; -use sql::JobState; +use sql::RunState; -use dbctx::{DbCtx, Job, ArtifactRecord}; +use dbctx::{DbCtx, Job, Run, ArtifactRecord}; use rusqlite::OptionalExtension; @@ -148,14 +148,14 @@ fn job_url(job: &Job, commit_sha: &str, ctx: &Arc) -> String { format!("{}/{}", &remote.remote_path, commit_sha) } -/// render how long a job took, or is taking, in a human-friendly way -fn display_job_time(job: &Job) -> String { - if let Some(start_time) = job.start_time { - if let Some(complete_time) = job.complete_time { +/// render how long a run took, or is taking, in a human-friendly way +fn display_run_time(run: &Run) -> String { + if let Some(start_time) = run.start_time { + if let Some(complete_time) = run.complete_time { if complete_time < start_time { - if job.state == JobState::Started { - // this job has been restarted. the completed time is stale. - // further, this is a currently active job. + if run.state == RunState::Started { + // this run has been restarted. the completed time is stale. + // further, this is a currently active run. let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; let mut duration = duration_as_human_string(now_ms - start_time); duration.push_str(" (ongoing)"); @@ -332,21 +332,23 @@ async fn handle_ci_index(State(ctx): State) -> impl IntoResponse let mut row_num = 0; for repo in repos { - let mut most_recent_job: Option = None; + let mut most_recent_run: Option<(Job, Run)> = None; for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") { let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works"); if let Some(last_job) = last_job { - if most_recent_job.as_ref().map(|job| job.created_time < last_job.created_time).unwrap_or(true) { - most_recent_job = Some(last_job); + if let Some(last_run) = ctx.dbctx.last_run_for_job(last_job.id).expect("can query") { + if most_recent_run.as_ref().map(|run| run.1.create_time < last_run.create_time).unwrap_or(true) { + most_recent_run = Some((last_job, last_run)); + } } } } let repo_html = format!("{}", &repo.name, &repo.name); - let row_html: String = match most_recent_job { - Some(job) => { + let row_html: String = match most_recent_run { + Some((job, run)) => { let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { Some(url) => format!("{}", url, &job_commit), @@ -355,17 +357,17 @@ async fn handle_ci_index(State(ctx): State) -> impl IntoResponse let job_html = format!("{}", job_url(&job, &job_commit, &ctx.dbctx), job.id); - let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); - let duration = display_job_time(&job); + let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); + let duration = display_run_time(&run); - let status = format!("{:?}", job.state).to_lowercase(); + let status = format!("{:?}", run.state).to_lowercase(); - let result = match job.build_result { + let result = match run.build_result { Some(0) => "pass", Some(_) => "fail", - None => match job.state { - JobState::Pending => { "unstarted" }, - JobState::Started => { "in progress" }, + None => match run.state { + RunState::Pending => { "unstarted" }, + RunState::Started => { "in progress" }, _ => { "unreported" } } }; @@ -401,10 +403,10 @@ async fn handle_ci_index(State(ctx): State) -> impl IntoResponse } response.push_str(""); - response.push_str("

active jobs

\n"); + response.push_str("

active tasks

\n"); - let jobs = ctx.dbctx.get_active_jobs().expect("can query"); - if jobs.len() == 0 { + let runs = ctx.dbctx.get_active_runs().expect("can query"); + if runs.len() == 0 { response.push_str("

(none)

\n"); } else { response.push_str(""); @@ -417,9 +419,10 @@ async fn handle_ci_index(State(ctx): State) -> impl IntoResponse let mut row_num = 0; - for job in jobs.iter() { + for run in runs.iter() { let row_index = row_num % 2; + let job = ctx.dbctx.job_by_id(run.job_id).expect("query succeeds").expect("job id is valid"); let remote = ctx.dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("remote id is valid"); let repo = ctx.dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("repo id is valid"); @@ -433,17 +436,17 @@ async fn handle_ci_index(State(ctx): State) -> impl IntoResponse let job_html = format!("{}", job_url(&job, &job_commit, &ctx.dbctx), job.id); - let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); - let duration = display_job_time(&job); + let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); + let duration = display_run_time(&run); - let status = format!("{:?}", job.state).to_lowercase(); + let status = format!("{:?}", run.state).to_lowercase(); - let result = match job.build_result { + let result = match run.build_result { Some(0) => "pass", Some(_) => "fail", - None => match job.state { - JobState::Pending => { "unstarted" }, - JobState::Started => { "in progress" }, + None => match run.state { + RunState::Pending => { "unstarted" }, + RunState::Started => { "in progress" }, _ => { "unreported" } } }; @@ -498,9 +501,11 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let job = ctx.dbctx.job_by_commit_id(commit_id).expect("can query").expect("job exists"); - let complete_time = job.complete_time.unwrap_or_else(crate::io::now_ms); + let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists"); + + let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms); - let debug_info = job.state == JobState::Finished && job.build_result == Some(1) || job.state == JobState::Error; + let debug_info = run.state == RunState::Finished && run.build_result == Some(1) || run.state == RunState::Error; let repo_name: String = ctx.dbctx.conn.lock().unwrap() .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) @@ -511,42 +516,42 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let head = format!("ci.butactuallyin.space - {}", repo_name); let repo_html = format!("{}", &repo_name, &repo_name); let remote_commit_elem = format!("{}", &remote_path, &sha, &sha); - let status_elem = match job.state { - JobState::Pending | JobState::Started => { + let status_elem = match run.state { + RunState::Pending | RunState::Started => { "pending" }, - JobState::Finished => { - if let Some(build_result) = job.build_result { + RunState::Finished => { + if let Some(build_result) = run.build_result { if build_result == 0 { "pass" } else { "failed" } } else { - eprintln!("job {} for commit {} is missing a build result but is reportedly finished (old data)?", job.id, commit_id); + eprintln!("run {} for commit {} is missing a build result but is reportedly finished (old data)?", run.id, commit_id); "unreported" } }, - JobState::Error => { + RunState::Error => { "error" } - JobState::Invalid => { + RunState::Invalid => { "(server error)" } }; let mut artifacts_fragment = String::new(); - let mut artifacts: Vec = ctx.dbctx.artifacts_for_job(job.id, None).unwrap() - .into_iter() // HACK: filter out artifacts for previous runs of a job. artifacts should be attached to a run, runs should be distinct from jobs. but i'm sleepy. - .filter(|artifact| artifact.created_time >= job.start_time.unwrap_or_else(crate::io::now_ms)) + let mut artifacts: Vec = ctx.dbctx.artifacts_for_run(run.id, None).unwrap() + .into_iter() // HACK: filter out artifacts for previous runs of a run. artifacts should be attached to a run, runs should be distinct from run. but i'm sleepy. + .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms)) .collect(); artifacts.sort_by_key(|artifact| artifact.created_time); - fn diff_times(job_completed: u64, artifact_completed: Option) -> u64 { + fn diff_times(run_completed: u64, artifact_completed: Option) -> u64 { let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms); - let job_completed = std::cmp::max(job_completed, artifact_completed); - job_completed - artifact_completed + let run_completed = std::cmp::max(run_completed, artifact_completed); + run_completed - artifact_completed } let recent_artifacts: Vec = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) <= 60_000).cloned().collect(); @@ -556,7 +561,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822(); artifacts_fragment.push_str(&format!("
{}
step:
{}
\n", created_time_str, &artifact.name)); let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); - let size_str = (std::fs::metadata(&format!("./jobs/{}/{}", artifact.job_id, artifact.id)).expect("metadata exists").len() / 1024).to_string(); + let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string(); artifacts_fragment.push_str(&format!("
  {}kb in {} 
\n", size_str, duration_str)); } @@ -565,18 +570,18 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( artifacts_fragment.push_str(&format!("
{}
step:
{}
\n", created_time_str, &artifact.name)); if debug_info { artifacts_fragment.push_str("
");
-            artifacts_fragment.push_str(&std::fs::read_to_string(format!("./jobs/{}/{}", artifact.job_id, artifact.id)).unwrap());
+            artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap());
             artifacts_fragment.push_str("
\n"); } else { let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); - let size_str = std::fs::metadata(&format!("./jobs/{}/{}", artifact.job_id, artifact.id)).map(|md| { + let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| { (md.len() / 1024).to_string() }).unwrap_or_else(|e| format!("[{}]", e)); artifacts_fragment.push_str(&format!("
  {}kb in {} 
\n", size_str, duration_str)); } } - let metrics = ctx.dbctx.metrics_for_job(job.id).unwrap(); + let metrics = ctx.dbctx.metrics_for_run(run.id).unwrap(); let metrics_section = if metrics.len() > 0 { let mut section = String::new(); section.push_str("
"); @@ -599,9 +604,9 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( html.push_str(" \n"); html.push_str("
\n");
     html.push_str(&format!("repo: {}\n", repo_html));
-    html.push_str(&format!("commit: {}, job: {}\n", remote_commit_elem, job.id));
-    html.push_str(&format!("status: {} in {}\n", status_elem, display_job_time(&job)));
-    if let Some(desc) = job.final_text.as_ref() {
+    html.push_str(&format!("commit: {}, run: {}\n", remote_commit_elem, run.id));
+    html.push_str(&format!("status: {} in {}\n", status_elem, display_run_time(&run)));
+    if let Some(desc) = run.final_text.as_ref() {
         html.push_str(&format!("  description: {}\n  ", desc));
     }
     html.push_str(&format!("deployed: {}\n", deployed));
@@ -620,11 +625,11 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
 }
 
 async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State) -> impl IntoResponse {
-    eprintln!("get artifact, job={}, artifact={}", path.0, path.1);
-    let job_id: u64 = path.0.parse().unwrap();
+    eprintln!("get artifact, run={}, artifact={}", path.0, path.1);
+    let run: u64 = path.0.parse().unwrap();
     let artifact_id: u64 = path.1.parse().unwrap();
 
-    let artifact_descriptor = match ctx.dbctx.lookup_artifact(job_id, artifact_id).unwrap() {
+    let artifact_descriptor = match ctx.dbctx.lookup_artifact(run, artifact_id).unwrap() {
         Some(artifact) => artifact,
         None => {
             return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response();
@@ -645,7 +650,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta
         let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536);
         let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver);
         let mut artifact_path = ctx.jobs_path.clone();
-        artifact_path.push(artifact_descriptor.job_id.to_string());
+        artifact_path.push(artifact_descriptor.run_id.to_string());
         artifact_path.push(artifact_descriptor.id.to_string());
         spawn(async move {
             let mut artifact = artifact_descriptor;
@@ -661,7 +666,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta
                         // this would be much implemented as yielding on a condvar woken when an
                         // inotify event on the file indicates a write has occurred. but i am
                         // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao.
-                        artifact = ctx.dbctx.lookup_artifact(artifact.job_id, artifact.id)
+                        artifact = ctx.dbctx.lookup_artifact(artifact.run_id, artifact.id)
                             .expect("can query db")
                             .expect("artifact still exists");
                     }
@@ -724,6 +729,7 @@ async fn handle_repo_summary(Path(path): Path, State(ctx): State format!("{}", url, &job_commit),
@@ -732,17 +738,17 @@ async fn handle_repo_summary(Path(path): Path, State(ctx): State{}", job_url(&job, &job_commit, &ctx.dbctx), job.id);
 
-        let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822();
-        let duration = display_job_time(&job);
+        let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
+        let duration = display_run_time(&run);
 
-        let status = format!("{:?}", job.state).to_lowercase();
+        let status = format!("{:?}", run.state).to_lowercase();
 
-        let result = match job.build_result {
+        let result = match run.build_result {
             Some(0) => "pass",
             Some(_) => "fail",
-            None => match job.state {
-                JobState::Pending => { "unstarted" },
-                JobState::Started => { "in progress" },
+            None => match run.state {
+                RunState::Pending => { "unstarted" },
+                RunState::Started => { "in progress" },
                 _ => { "unreported" }
             }
         };
@@ -885,7 +891,7 @@ async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathB
         .route("/:owner/:repo/:sha", get(handle_commit_status))
         .route("/:owner", get(handle_repo_summary))
         .route("/:owner/:repo", post(handle_repo_event))
-        .route("/artifact/:job/:artifact_id", get(handle_get_artifact))
+        .route("/artifact/:b/:artifact_id", get(handle_get_artifact))
         .route("/", get(handle_ci_index))
         .fallback(fallback_get)
         .with_state(WebserverState {
diff --git a/src/sql.rs b/src/sql.rs
index 91edafb..cc33ad4 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -2,6 +2,9 @@
 
 use std::convert::TryFrom;
 
+use crate::dbctx::Run;
+use crate::dbctx::Job;
+
 #[derive(Debug, Clone)]
 pub enum JobResult {
     Pass = 0,
@@ -9,7 +12,7 @@ pub enum JobResult {
 }
 
 #[derive(Debug, Clone, PartialEq)]
-pub enum JobState {
+pub enum RunState {
     Pending = 0,
     Started = 1,
     Finished = 2,
@@ -17,38 +20,48 @@ pub enum JobState {
     Invalid = 4,
 }
 
-impl TryFrom for JobState {
+impl TryFrom for RunState {
     type Error = String;
 
     fn try_from(value: u8) -> Result {
         match value {
-            0 => Ok(JobState::Pending),
-            1 => Ok(JobState::Started),
-            2 => Ok(JobState::Finished),
-            3 => Ok(JobState::Error),
-            4 => Ok(JobState::Invalid),
+            0 => Ok(RunState::Pending),
+            1 => Ok(RunState::Started),
+            2 => Ok(RunState::Finished),
+            3 => Ok(RunState::Error),
+            4 => Ok(RunState::Invalid),
             other => Err(format!("invalid job state: {}", other)),
         }
     }
 }
 
+pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
+    let (id, job_id, artifacts_path, state, run_host, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
+    let state: u8 = state;
+    Run {
+        id,
+        job_id,
+        artifacts_path,
+        state: state.try_into().unwrap(),
+        run_host,
+        create_time,
+        start_time,
+        complete_time,
+        build_token,
+        run_timeout,
+        build_result,
+        final_text,
+    }
+}
+
 // remote_id is the remote from which we were notified. this is necessary so we know which remote
 // to pull from to actually run the job.
 pub const CREATE_JOBS_TABLE: &'static str = "\
     CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT,
-        artifacts_path TEXT,
-        state INTEGER NOT NULL,
-        run_host TEXT,
-        build_token TEXT,
-        remote_id INTEGER,
-        commit_id INTEGER,
-        created_time INTEGER,
-        started_time INTEGER,
-        complete_time INTEGER,
-        job_timeout INTEGER,
         source TEXT,
-        build_result INTEGER,
-        final_status TEXT);";
+        created_time INTEGER,
+        remote_id INTEGER,
+        commit_id INTEGER);";
 
 pub const CREATE_METRICS_TABLE: &'static str = "\
     CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -82,35 +95,63 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\
 
 pub const CREATE_ARTIFACTS_TABLE: &'static str = "\
     CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT,
-        job_id INTEGER,
+        run_id INTEGER,
         name TEXT,
         desc TEXT,
         created_time INTEGER,
         completed_time INTEGER);";
 
+pub const CREATE_RUN_TABLE: &'static str = "\
+    CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT,
+        job_id INTEGER,
+        artifacts_path TEXT,
+        state INTEGER NOT NULL,
+        run_host TEXT,
+        build_token TEXT,
+        created_time INTEGER,
+        started_time INTEGER,
+        complete_time INTEGER,
+        run_timeout INTEGER,
+        build_result INTEGER,
+        final_status TEXT);";
+
 pub const CREATE_REMOTES_INDEX: &'static str = "\
     CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";
 
 pub const CREATE_REPO_NAME_INDEX: &'static str = "\
     CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);";
 
-pub const PENDING_JOBS: &'static str = "\
-    select id, artifacts_path, state, run_host, remote_id, commit_id, created_time, source from jobs where state=0;";
-
-pub const ACTIVE_JOBS: &'static str = "\
-    select * from jobs where state=1 or state=0;";
-
-pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\
-    select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;";
+pub const PENDING_RUNS: &'static str = "\
+    select id, job_id, artifacts_path, state, run_host, created_time from runs where state=0;";
+
+pub const ACTIVE_RUNS: &'static str = "\
+    select id,
+        job_id,
+        artifacts_path,
+        state,
+        run_host,
+        build_token,
+        created_time,
+        started_time,
+        complete_time,
+        run_timeout,
+        build_result,
+        final_status from runs where state=1 or state=0;";
+
+pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\
+    select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;";
 
 pub const JOB_BY_COMMIT_ID: &'static str = "\
-    select * from jobs where commit_id=?1;";
+    select id, source, created_time, remote_id, commit_id from jobs where commit_id=?1;";
 
 pub const ARTIFACT_BY_ID: &'static str = "\
-    select * from artifacts where id=?1 and job_id=?2;";
+    select * from artifacts where id=?1 and run_id=?2;";
 
-pub const METRICS_FOR_JOB: &'static str = "\
-    select * from metrics where job_id=?1 order by id asc;";
+pub const JOB_BY_ID: &'static str = "\
+    select id, source, created_time, remote_id, commit_id from jobs where id=?1";
+
+pub const METRICS_FOR_RUN: &'static str = "\
+    select * from metrics where run_id=?1 order by id asc;";
 
 pub const COMMIT_TO_ID: &'static str = "\
     select id from commits where sha=?1;";
@@ -122,5 +163,23 @@ pub const ALL_REPOS: &'static str = "\
     select * from repos;";
 
 pub const LAST_JOBS_FROM_REMOTE: &'static str = "\
-    select * from jobs where remote_id=?1 order by created_time desc limit ?2;";
-
+    select id, source, created_time, remote_id, commit_id from jobs where remote_id=?1 order by created_time desc limit ?2;";
+
+pub const LAST_RUN_FOR_JOB: &'static str = "\
+    select id,
+        job_id,
+        artifacts_path,
+        state,
+        run_host,
+        build_token,
+        created_time,
+        started_time,
+        complete_time,
+        run_timeout,
+        build_result,
+        final_status from runs where job_id=?1;";
+
+pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\
+    select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id
+    from jobs join runs on jobs.id=runs.job_id
+    oder by runs.created_time asc;";
-- 
cgit v1.1