diff options
author | iximeow <me@iximeow.net> | 2023-07-01 14:08:00 -0700 |
---|---|---|
committer | iximeow <me@iximeow.net> | 2023-07-01 14:08:00 -0700 |
commit | ec5a274436bc8dda0b55d2c4da1411ff3c52434d (patch) | |
tree | 34bc1890b2a0cacd4d9e941e7b2221afe39ecfe9 /src | |
parent | 4657e736b6067c4dd0e25ad14253fdb8febffd89 (diff) |
add a notion of runs distinct from jobs, lets see how well this goes over
Diffstat (limited to 'src')
-rw-r--r-- | src/ci_ctl.rs | 17 | ||||
-rw-r--r-- | src/ci_driver.rs | 124 | ||||
-rw-r--r-- | src/ci_runner.rs | 2 | ||||
-rw-r--r-- | src/dbctx.rs | 216 | ||||
-rw-r--r-- | src/main.rs | 140 | ||||
-rw-r--r-- | src/sql.rs | 127 |
6 files changed, 340 insertions, 286 deletions
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs index 36eada5..15feca9 100644 --- a/src/ci_ctl.rs +++ b/src/ci_ctl.rs @@ -5,7 +5,6 @@ mod dbctx; mod notifier; mod io; -use sql::JobState; use dbctx::DbCtx; use notifier::NotifierConfig; @@ -82,28 +81,26 @@ fn main() { JobAction::List => { let db = DbCtx::new(&config_path, &db_path); let mut conn = db.conn.lock().unwrap(); - let mut query = conn.prepare("select id, artifacts_path, state, commit_id, created_time from jobs;").unwrap(); + let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); let mut jobs = query.query([]).unwrap(); while let Some(row) = jobs.next().unwrap() { - let (id, artifacts, state, commit_id, created_time): (u64, Option<String>, u64, u64, u64) = row.try_into().unwrap(); + let (job_id, run_id, state, created_time, commit_id): (u64, u64, u64, u64, u64) = row.try_into().unwrap(); - eprintln!("[+] {:04} | {: >8?} | {}", id, state, created_time); + eprintln!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id); } eprintln!("jobs"); }, JobAction::Rerun { which } => { let db = DbCtx::new(&config_path, &db_path); - db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [which]) - .expect("works"); - eprintln!("[+] job {} set to pending", which); + let task_id = db.new_run(which as u64).expect("db can be queried"); + eprintln!("[+] rerunning job {} as task {}", which, task_id); } JobAction::RerunCommit { commit } => { let db = DbCtx::new(&config_path, &db_path); let job_id = db.job_for_commit(&commit).unwrap(); if let Some(job_id) = job_id { - db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [job_id]) - .expect("works"); - eprintln!("[+] job {} (commit {}) set to pending", job_id, commit); + let task_id = db.new_run(job_id).expect("db can be queried"); + eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id); } else { eprintln!("[-] no job for commit {}", commit); } diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 0f629e7..25928ae 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -28,17 +28,17 @@ mod sql; mod notifier; mod io; -use crate::dbctx::{DbCtx, PendingJob}; +use crate::dbctx::{DbCtx, PendingRun, Job}; use crate::sql::JobResult; -use crate::sql::JobState; +use crate::sql::RunState; lazy_static! { static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); } -fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> { - let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); - path.push(job.to_string()); +fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> { + let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into(); + path.push(run.to_string()); match std::fs::create_dir(&path) { Ok(()) => { Ok(path) @@ -53,9 +53,12 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> { } } -async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> { - eprintln!("activating job {:?}", job); +async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> { + eprintln!("activating task {:?}", run); + let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); + let connection = dbctx.conn.lock().unwrap(); + let (repo_id, remote_git_url): (u64, String) = connection .query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .expect("query succeeds"); @@ -73,52 +76,20 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R .expect("can run query"); std::mem::drop(connection); - let artifacts: PathBuf = match &job.artifacts { - Some(artifacts) => PathBuf::from(artifacts), - None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts") - }; - /* - - if job.run_host.as_ref() == None { - eprintln!("need to find a host to run the job"); - } - - eprintln!("cloning {}", remote_git_url); - let mut repo_dir = artifacts.clone(); - repo_dir.push("repo"); - eprintln!(" ... into {}", repo_dir.display()); - - Command::new("git") - .arg("clone") - .arg(&remote_git_url) - .arg(&format!("{}", repo_dir.display())) - .status() - .expect("can clone the repo"); - - eprintln!("checking out {}", commit_sha); - Command::new("git") - .current_dir(&repo_dir) - .arg("checkout") - .arg(&commit_sha) - .status() - .expect("can checkout hash"); - */ + let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts"); eprintln!("running {}", repo_name); - /* - * find the CI script, figure out how to run it - */ let mut client_job = loop { let mut candidate = clients.recv().await .ok_or_else(|| "client channel disconnected".to_string())?; - if !candidate.will_accept(job) { + if !candidate.will_accept(&job) { eprintln!("client {:?} would not accept job {:?}", candidate, job); continue; } - if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { + if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await { break client_job; } else { // failed to submit job, move on for now @@ -168,8 +139,8 @@ struct ClientJob { dbctx: Arc<DbCtx>, remote_git_url: String, sha: String, - job: PendingJob, - client: RunnerClient + task: PendingRun, + client: RunnerClient } impl ClientJob { @@ -179,49 +150,50 @@ impl ClientJob { let msg = match self.client.recv().await.expect("recv works") { Some(msg) => msg, None => { - eprintln!("client hung up. job's done, i hope?"); + eprintln!("client hung up. task's done, i hope?"); return; } }; eprintln!("got {:?}", msg); let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); match msg_kind { - "new_job_please" => { - eprintln!("misdirected job request (after handshake?)"); + "new_task_please" => { + eprintln!("misdirected task request (after handshake?)"); return; }, - "job_status" => { + "task_status" => { let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); - let (result, state): (Result<String, String>, JobState) = if state == "finished" { + let (result, state): (Result<String, String>, RunState) = if state == "finished" { let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("job update: state is {} and result is {}", state, result); + eprintln!("task update: state is {} and result is {}", state, result); match result { "pass" => { - (Ok("success".to_string()), JobState::Finished) + (Ok("success".to_string()), RunState::Finished) }, other => { let desc = msg.as_object().unwrap().get("desc") .map(|x| x.as_str().unwrap().to_string()) .unwrap_or_else(|| other.to_string()); - (Err(desc), JobState::Error) + (Err(desc), RunState::Error) } } } else if state == "interrupted" { let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("job update: state is {} and result is {}", state, result); + eprintln!("task update: state is {} and result is {}", state, result); let desc = msg.as_object().unwrap().get("desc") .map(|x| x.as_str().unwrap().to_string()) .unwrap_or_else(|| result.to_string()); - (Err(desc), JobState::Error) + (Err(desc), RunState::Error) } else { - eprintln!("job update: state is {}", state); - (Err(format!("atypical completion status: {}", state)), JobState::Invalid) + eprintln!("task update: state is {}", state); + (Err(format!("atypical completion status: {}", state)), RunState::Invalid) }; - let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists"); + let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); + let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists"); for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { - if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await { + if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await { eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e); } } @@ -242,8 +214,8 @@ impl ClientJob { }; self.dbctx.conn.lock().unwrap().execute( - "update jobs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", - (now as u64, state as u64, build_result as u8, result_desc, self.job.id) + "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", + (now as u64, state as u64, build_result as u8, result_desc, self.task.id) ) .expect("can update"); } @@ -258,7 +230,7 @@ impl ClientJob { let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap(); let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap(); - self.dbctx.insert_metric(self.job.id, name, value) + self.dbctx.insert_metric(self.task.id, name, value) .expect("TODO handle metric insert error?"); } "command" => { @@ -311,7 +283,7 @@ impl RunnerClient { } // is this client willing to run the job based on what it has told us so far? - fn will_accept(&self, job: &PendingJob) -> bool { + fn will_accept(&self, job: &Job) -> bool { match (job.source.as_ref(), self.accepted_sources.as_ref()) { (_, None) => true, (None, Some(_)) => false, @@ -321,7 +293,7 @@ impl RunnerClient { } } - async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> { + async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> { self.send(serde_json::json!({ "commit": sha, "remote_url": remote_git_url, @@ -334,7 +306,7 @@ impl RunnerClient { }) { eprintln!("resp: {:?}", resp); Ok(Some(ClientJob { - job: job.clone(), + task: job.clone(), dbctx: Arc::clone(dbctx), sha: sha.to_string(), remote_git_url: remote_git_url.to_string(), @@ -364,18 +336,18 @@ impl fmt::Debug for RunnerClient { #[axum_macros::debug_handler] async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { eprintln!("artifact request"); - let job_token = match headers.get("x-job-token") { - Some(job_token) => job_token.to_str().expect("valid string"), + let run_token = match headers.get("x-task-token") { + Some(run_token) => run_token.to_str().expect("valid string"), None => { - eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers); return (StatusCode::BAD_REQUEST, "").into_response(); } }; - let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() { + let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() { Some(result) => result, None => { - eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers); + eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers); return (StatusCode::BAD_REQUEST, "").into_response(); } }; @@ -408,7 +380,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien } }; - let mut artifact = match ctx.0.reserve_artifact(job, artifact_name, artifact_desc).await { + let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await { Ok(artifact) => artifact, Err(err) => { eprintln!("failure to reserve artifact: {:?}", err); @@ -442,7 +414,7 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien } } None => { - eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + eprintln!("bad artifact post: headers: {:?}\nno authorization", headers); return (StatusCode::BAD_REQUEST, "").into_response(); } }; @@ -533,13 +505,13 @@ async fn main() { dbctx.create_tables().unwrap(); loop { - let jobs = dbctx.get_pending_jobs().unwrap(); + let runs = dbctx.get_pending_runs().unwrap(); - if jobs.len() > 0 { - println!("{} new jobs", jobs.len()); + if runs.len() > 0 { + println!("{} new runs", runs.len()); - for job in jobs.into_iter() { - activate_job(Arc::clone(&dbctx), &job, &mut channel).await; + for run in runs.into_iter() { + activate_run(Arc::clone(&dbctx), &run, &mut channel).await; } } tokio::time::sleep(std::time::Duration::from_millis(100)).await; diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 4052ba5..330490c 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -129,7 +129,7 @@ impl RunningJob { let (mut sender, body) = hyper::Body::channel(); let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") .header("user-agent", "ci-butactuallyin-space-runner") - .header("x-job-token", &self.job.build_token) + .header("x-task-token", &self.job.build_token) .header("x-artifact-name", name) .header("x-artifact-desc", desc) .body(body) diff --git a/src/dbctx.rs b/src/dbctx.rs index dcca362..331e55f 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -36,34 +36,51 @@ pub struct Remote { pub notifier_config_path: String, } +// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1 +// relationship with commits, and potentially many *runs* of that job. #[derive(Debug, Clone)] pub struct Job { pub id: u64, - pub artifacts_path: Option<String>, - pub state: sql::JobState, - pub run_host: Option<String>, pub remote_id: u64, pub commit_id: u64, pub created_time: u64, + pub source: Option<String>, +} + +// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report +// results. a job may have many runs from many different hosts rebuliding history, or reruns of the +// same job on the same hardware to collect more datapoints on the operation. +#[derive(Debug, Clone)] +pub struct Run { + pub id: u64, + pub job_id: u64, + pub artifacts_path: Option<String>, + pub state: sql::RunState, + pub run_host: Option<String>, + pub create_time: u64, pub start_time: Option<u64>, pub complete_time: Option<u64>, pub build_token: Option<String>, - pub job_timeout: Option<u64>, - pub source: Option<String>, + pub run_timeout: Option<u64>, pub build_result: Option<u8>, pub final_text: Option<String>, } +impl Run { + fn into_pending_run(self) -> PendingRun { + PendingRun { + id: self.id, + job_id: self.job_id, + create_time: self.create_time, + } + } +} + #[derive(Debug, Clone)] -pub struct PendingJob { +pub struct PendingRun { pub id: u64, - pub artifacts: Option<String>, - pub state: sql::JobState, - pub run_host: Option<String>, - pub remote_id: u64, - pub commit_id: u64, - pub created_time: u64, - pub source: Option<String>, + pub job_id: u64, + pub create_time: u64, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -76,7 +93,7 @@ pub enum TokenValidity { #[derive(Debug, Clone, PartialEq, Eq)] pub struct MetricRecord { pub id: u64, - pub job_id: u64, + pub run_id: u64, pub name: String, pub value: String } @@ -84,7 +101,7 @@ pub struct MetricRecord { #[derive(Debug, Clone, PartialEq, Eq)] pub struct ArtifactRecord { pub id: u64, - pub job_id: u64, + pub run_id: u64, pub name: String, pub desc: String, pub created_time: u64, @@ -113,12 +130,12 @@ impl DbCtx { Ok(()) } - pub fn insert_metric(&self, job_id: u64, name: &str, value: &str) -> Result<(), String> { + pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> { let conn = self.conn.lock().unwrap(); conn .execute( - "insert into metrics (job_id, name, value) values (?1, ?2, ?3) on conflict (job_id, name) do update set value=excluded.value", - (job_id, name, value) + "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value", + (run_id, name, value) ) .expect("can upsert"); Ok(()) @@ -163,7 +180,7 @@ impl DbCtx { }) } - pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { + pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { let artifact_id = { let created_time = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -172,8 +189,8 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); conn .execute( - "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", - (job_id, name, desc, created_time) + "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", + (run_id, name, desc, created_time) ) .map_err(|e| { format!("{:?}", e) @@ -182,17 +199,17 @@ impl DbCtx { conn.last_insert_rowid() as u64 }; - ArtifactDescriptor::new(job_id, artifact_id).await + ArtifactDescriptor::new(run_id, artifact_id).await } - pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> { + pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> { let conn = self.conn.lock().unwrap(); conn - .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| { - let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); + .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| { + let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); Ok(ArtifactRecord { - id, job_id, name, desc, created_time, completed_time + id, run_id, name, desc, created_time, completed_time }) }) .optional() @@ -222,11 +239,11 @@ impl DbCtx { .map_err(|e| e.to_string()) } - pub fn job_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> { + pub fn run_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> { self.conn.lock() .unwrap() .query_row( - "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1", + "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1", [token], |row| { let timeout: Option<u64> = row.get(3).unwrap(); @@ -254,6 +271,20 @@ impl DbCtx { .map_err(|e| e.to_string()) } + pub fn job_by_id(&self, id: u64) -> Result<Option<Job>, String> { + self.conn.lock() + .unwrap() + .query_row(crate::sql::JOB_BY_ID, [id], |row| { + let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + + Ok(Job { + id, source, created_time, remote_id, commit_id + }) + }) + .optional() + .map_err(|e| e.to_string()) + } + pub fn remote_by_id(&self, id: u64) -> Result<Option<Remote>, String> { self.conn.lock() .unwrap() @@ -322,40 +353,62 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into jobs (state, remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", - (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time, pusher) + "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", + (remote_id, commit_id, created_time, pusher) ).unwrap(); assert_eq!(1, rows_modified); - Ok(conn.last_insert_rowid() as u64) + let job_id = conn.last_insert_rowid() as u64; + + Ok(job_id) } - pub fn metrics_for_job(&self, job: u64) -> Result<Vec<MetricRecord>, String> { + pub fn new_run(&self, job_id: u64) -> Result<u64, String> { + let created_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis() as u64; + let conn = self.conn.lock().unwrap(); - let mut metrics_query = conn.prepare(sql::METRICS_FOR_JOB).unwrap(); - let mut result = metrics_query.query([job]).unwrap(); + let rows_modified = conn.execute( + "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);", + (job_id, crate::sql::RunState::Pending as u64, created_time) + ).unwrap(); + + assert_eq!(1, rows_modified); + + let run_id = conn.last_insert_rowid() as u64; + + Ok(run_id) + } + + pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> { + let conn = self.conn.lock().unwrap(); + + let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap(); + let mut result = metrics_query.query([run]).unwrap(); let mut metrics = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, job_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); - metrics.push(MetricRecord { id, job_id, name, value }); + let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); + metrics.push(MetricRecord { id, run_id, name, value }); } Ok(metrics) } - pub fn artifacts_for_job(&self, job: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> { + pub fn artifacts_for_run(&self, run: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> { let conn = self.conn.lock().unwrap(); - let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_JOB).unwrap(); - let mut result = artifacts_query.query([job, limit.unwrap_or(65535)]).unwrap(); + let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap(); + let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap(); let mut artifacts = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap(); - artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time }); + let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap(); + artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time }); } Ok(artifacts) @@ -403,23 +456,13 @@ impl DbCtx { conn .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { - let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap(); - let state: u8 = state; + let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); Ok(Job { id, - artifacts_path, - state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time, - start_time, - complete_time, - build_token, - job_timeout, source, - build_result, - final_text, }) }) .optional() @@ -435,77 +478,43 @@ impl DbCtx { let mut jobs = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text)= row.try_into().unwrap(); - let state: u8 = state; + let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); jobs.push(Job { id, - artifacts_path, - state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time, - start_time, - complete_time, - build_token, - job_timeout, source, - build_result, - final_text, }); } Ok(jobs) } - pub fn get_active_jobs(&self) -> Result<Vec<Job>, String> { + pub fn get_active_runs(&self) -> Result<Vec<Run>, String> { let conn = self.conn.lock().unwrap(); - let mut started_query = conn.prepare(sql::ACTIVE_JOBS).unwrap(); - let mut jobs = started_query.query([]).unwrap(); + let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap(); + let mut runs = started_query.query([]).unwrap(); let mut started = Vec::new(); - while let Some(row) = jobs.next().unwrap() { - let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap(); - let state: u8 = state; - - started.push(Job { - id, - artifacts_path, - state: state.try_into().unwrap(), - run_host, - remote_id, - commit_id, - created_time, - start_time, - complete_time, - build_token, - job_timeout, - source, - build_result, - final_text, - }); + while let Some(row) = runs.next().unwrap() { + started.push(crate::sql::row2run(row)); } Ok(started) } - pub fn get_pending_jobs(&self) -> Result<Vec<PendingJob>, String> { + pub fn get_pending_runs(&self) -> Result<Vec<PendingRun>, String> { let conn = self.conn.lock().unwrap(); - let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap(); - let mut jobs = pending_query.query([]).unwrap(); + let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); + let mut runs = pending_query.query([]).unwrap(); let mut pending = Vec::new(); - while let Some(row) = jobs.next().unwrap() { - let (id, artifacts, state, run_host, remote_id, commit_id, created_time, source) = row.try_into().unwrap(); - let state: u8 = state; - pending.push(PendingJob { - id, artifacts, - state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time, - source, - }); + while let Some(row) = runs.next().unwrap() { + let run = crate::sql::row2run(row).into_pending_run(); + pending.push(run); } Ok(pending) @@ -526,6 +535,17 @@ impl DbCtx { Ok(remotes) } + pub fn last_run_for_job(&self, job_id: u64) -> Result<Option<Run>, String> { + let conn = self.conn.lock().unwrap(); + + conn + .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| { + Ok(crate::sql::row2run(row)) + }) + .optional() + .map_err(|e| e.to_string()) + } + pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> { let remotes = self.remotes_by_repo(repo_id)?; diff --git a/src/main.rs b/src/main.rs index 5f719a0..f008a56 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,9 +35,9 @@ mod sql; mod notifier; mod dbctx; -use sql::JobState; +use sql::RunState; -use dbctx::{DbCtx, Job, ArtifactRecord}; +use dbctx::{DbCtx, Job, Run, ArtifactRecord}; use rusqlite::OptionalExtension; @@ -148,14 +148,14 @@ fn job_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> String { format!("{}/{}", &remote.remote_path, commit_sha) } -/// render how long a job took, or is taking, in a human-friendly way -fn display_job_time(job: &Job) -> String { - if let Some(start_time) = job.start_time { - if let Some(complete_time) = job.complete_time { +/// render how long a run took, or is taking, in a human-friendly way +fn display_run_time(run: &Run) -> String { + if let Some(start_time) = run.start_time { + if let Some(complete_time) = run.complete_time { if complete_time < start_time { - if job.state == JobState::Started { - // this job has been restarted. the completed time is stale. - // further, this is a currently active job. + if run.state == RunState::Started { + // this run has been restarted. the completed time is stale. + // further, this is a currently active run. let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; let mut duration = duration_as_human_string(now_ms - start_time); duration.push_str(" (ongoing)"); @@ -332,21 +332,23 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse let mut row_num = 0; for repo in repos { - let mut most_recent_job: Option<Job> = None; + let mut most_recent_run: Option<(Job, Run)> = None; for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") { let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works"); if let Some(last_job) = last_job { - if most_recent_job.as_ref().map(|job| job.created_time < last_job.created_time).unwrap_or(true) { - most_recent_job = Some(last_job); + if let Some(last_run) = ctx.dbctx.last_run_for_job(last_job.id).expect("can query") { + if most_recent_run.as_ref().map(|run| run.1.create_time < last_run.create_time).unwrap_or(true) { + most_recent_run = Some((last_job, last_run)); + } } } } let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name); - let row_html: String = match most_recent_job { - Some(job) => { + let row_html: String = match most_recent_run { + Some((job, run)) => { let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), @@ -355,17 +357,17 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); - let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); - let duration = display_job_time(&job); + let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); + let duration = display_run_time(&run); - let status = format!("{:?}", job.state).to_lowercase(); + let status = format!("{:?}", run.state).to_lowercase(); - let result = match job.build_result { + let result = match run.build_result { Some(0) => "<span style='color:green;'>pass</span>", Some(_) => "<span style='color:red;'>fail</span>", - None => match job.state { - JobState::Pending => { "unstarted" }, - JobState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, + None => match run.state { + RunState::Pending => { "unstarted" }, + RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, _ => { "<span style='color:red;'>unreported</span>" } } }; @@ -401,10 +403,10 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse } response.push_str("</table>"); - response.push_str("<h4>active jobs</h4>\n"); + response.push_str("<h4>active tasks</h4>\n"); - let jobs = ctx.dbctx.get_active_jobs().expect("can query"); - if jobs.len() == 0 { + let runs = ctx.dbctx.get_active_runs().expect("can query"); + if runs.len() == 0 { response.push_str("<p>(none)</p>\n"); } else { response.push_str("<table class='build-table'>"); @@ -417,9 +419,10 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse let mut row_num = 0; - for job in jobs.iter() { + for run in runs.iter() { let row_index = row_num % 2; + let job = ctx.dbctx.job_by_id(run.job_id).expect("query succeeds").expect("job id is valid"); let remote = ctx.dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("remote id is valid"); let repo = ctx.dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("repo id is valid"); @@ -433,17 +436,17 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); - let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); - let duration = display_job_time(&job); + let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); + let duration = display_run_time(&run); - let status = format!("{:?}", job.state).to_lowercase(); + let status = format!("{:?}", run.state).to_lowercase(); - let result = match job.build_result { + let result = match run.build_result { Some(0) => "<span style='color:green;'>pass</span>", Some(_) => "<span style='color:red;'>fail</span>", - None => match job.state { - JobState::Pending => { "unstarted" }, - JobState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, + None => match run.state { + RunState::Pending => { "unstarted" }, + RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, _ => { "<span style='color:red;'>unreported</span>" } } }; @@ -498,9 +501,11 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let job = ctx.dbctx.job_by_commit_id(commit_id).expect("can query").expect("job exists"); - let complete_time = job.complete_time.unwrap_or_else(crate::io::now_ms); + let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists"); + + let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms); - let debug_info = job.state == JobState::Finished && job.build_result == Some(1) || job.state == JobState::Error; + let debug_info = run.state == RunState::Finished && run.build_result == Some(1) || run.state == RunState::Error; let repo_name: String = ctx.dbctx.conn.lock().unwrap() .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) @@ -511,42 +516,42 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let head = format!("<head><title>ci.butactuallyin.space - {}</title></head>", repo_name); let repo_html = format!("<a href=\"/{}\">{}</a>", &repo_name, &repo_name); let remote_commit_elem = format!("<a href=\"https://www.github.com/{}/commit/{}\">{}</a>", &remote_path, &sha, &sha); - let status_elem = match job.state { - JobState::Pending | JobState::Started => { + let status_elem = match run.state { + RunState::Pending | RunState::Started => { "<span style='color:#660;'>pending</span>" }, - JobState::Finished => { - if let Some(build_result) = job.build_result { + RunState::Finished => { + if let Some(build_result) = run.build_result { if build_result == 0 { "<span style='color:green;'>pass</span>" } else { "<span style='color:red;'>failed</span>" } } else { - eprintln!("job {} for commit {} is missing a build result but is reportedly finished (old data)?", job.id, commit_id); + eprintln!("run {} for commit {} is missing a build result but is reportedly finished (old data)?", run.id, commit_id); "<span style='color:red;'>unreported</span>" } }, - JobState::Error => { + RunState::Error => { "<span style='color:red;'>error</span>" } - JobState::Invalid => { + RunState::Invalid => { "<span style='color:red;'>(server error)</span>" } }; let mut artifacts_fragment = String::new(); - let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_job(job.id, None).unwrap() - .into_iter() // HACK: filter out artifacts for previous runs of a job. artifacts should be attached to a run, runs should be distinct from jobs. but i'm sleepy. - .filter(|artifact| artifact.created_time >= job.start_time.unwrap_or_else(crate::io::now_ms)) + let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap() + .into_iter() // HACK: filter out artifacts for previous runs of a run. artifacts should be attached to a run, runs should be distinct from run. but i'm sleepy. + .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms)) .collect(); artifacts.sort_by_key(|artifact| artifact.created_time); - fn diff_times(job_completed: u64, artifact_completed: Option<u64>) -> u64 { + fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 { let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms); - let job_completed = std::cmp::max(job_completed, artifact_completed); - job_completed - artifact_completed + let run_completed = std::cmp::max(run_completed, artifact_completed); + run_completed - artifact_completed } let recent_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) <= 60_000).cloned().collect(); @@ -556,7 +561,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822(); artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name)); let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); - let size_str = (std::fs::metadata(&format!("./jobs/{}/{}", artifact.job_id, artifact.id)).expect("metadata exists").len() / 1024).to_string(); + let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string(); artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str)); } @@ -565,18 +570,18 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name)); if debug_info { artifacts_fragment.push_str("<pre>"); - artifacts_fragment.push_str(&std::fs::read_to_string(format!("./jobs/{}/{}", artifact.job_id, artifact.id)).unwrap()); + artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap()); artifacts_fragment.push_str("</pre>\n"); } else { let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); - let size_str = std::fs::metadata(&format!("./jobs/{}/{}", artifact.job_id, artifact.id)).map(|md| { + let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| { (md.len() / 1024).to_string() }).unwrap_or_else(|e| format!("[{}]", e)); artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str)); } } - let metrics = ctx.dbctx.metrics_for_job(job.id).unwrap(); + let metrics = ctx.dbctx.metrics_for_run(run.id).unwrap(); let metrics_section = if metrics.len() > 0 { let mut section = String::new(); section.push_str("<div>"); @@ -599,9 +604,9 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( html.push_str(" <body>\n"); html.push_str(" <pre>\n"); html.push_str(&format!("repo: {}\n", repo_html)); - html.push_str(&format!("commit: {}, job: {}\n", remote_commit_elem, job.id)); - html.push_str(&format!("status: {} in {}\n", status_elem, display_job_time(&job))); - if let Some(desc) = job.final_text.as_ref() { + html.push_str(&format!("commit: {}, run: {}\n", remote_commit_elem, run.id)); + html.push_str(&format!("status: {} in {}\n", status_elem, display_run_time(&run))); + if let Some(desc) = run.final_text.as_ref() { html.push_str(&format!(" description: {}\n ", desc)); } html.push_str(&format!("deployed: {}\n", deployed)); @@ -620,11 +625,11 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( } async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse { - eprintln!("get artifact, job={}, artifact={}", path.0, path.1); - let job_id: u64 = path.0.parse().unwrap(); + eprintln!("get artifact, run={}, artifact={}", path.0, path.1); + let run: u64 = path.0.parse().unwrap(); let artifact_id: u64 = path.1.parse().unwrap(); - let artifact_descriptor = match ctx.dbctx.lookup_artifact(job_id, artifact_id).unwrap() { + let artifact_descriptor = match ctx.dbctx.lookup_artifact(run, artifact_id).unwrap() { Some(artifact) => artifact, None => { return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); @@ -645,7 +650,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536); let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver); let mut artifact_path = ctx.jobs_path.clone(); - artifact_path.push(artifact_descriptor.job_id.to_string()); + artifact_path.push(artifact_descriptor.run_id.to_string()); artifact_path.push(artifact_descriptor.id.to_string()); spawn(async move { let mut artifact = artifact_descriptor; @@ -661,7 +666,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta // this would be much implemented as yielding on a condvar woken when an // inotify event on the file indicates a write has occurred. but i am // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. - artifact = ctx.dbctx.lookup_artifact(artifact.job_id, artifact.id) + artifact = ctx.dbctx.lookup_artifact(artifact.run_id, artifact.id) .expect("can query db") .expect("artifact still exists"); } @@ -724,6 +729,7 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv let mut row_num = 0; for job in last_builds.iter().take(10) { + let run = ctx.dbctx.last_run_for_job(job.id).expect("query succeeds").expect("TODO: run exists if job exists (small race if querying while creating job ...."); let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), @@ -732,17 +738,17 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); - let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); - let duration = display_job_time(&job); + let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); + let duration = display_run_time(&run); - let status = format!("{:?}", job.state).to_lowercase(); + let status = format!("{:?}", run.state).to_lowercase(); - let result = match job.build_result { + let result = match run.build_result { Some(0) => "<span style='color:green;'>pass</span>", Some(_) => "<span style='color:red;'>fail</span>", - None => match job.state { - JobState::Pending => { "unstarted" }, - JobState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, + None => match run.state { + RunState::Pending => { "unstarted" }, + RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, _ => { "<span style='color:red;'>unreported</span>" } } }; @@ -885,7 +891,7 @@ async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathB .route("/:owner/:repo/:sha", get(handle_commit_status)) .route("/:owner", get(handle_repo_summary)) .route("/:owner/:repo", post(handle_repo_event)) - .route("/artifact/:job/:artifact_id", get(handle_get_artifact)) + .route("/artifact/:b/:artifact_id", get(handle_get_artifact)) .route("/", get(handle_ci_index)) .fallback(fallback_get) .with_state(WebserverState { @@ -2,6 +2,9 @@ use std::convert::TryFrom; +use crate::dbctx::Run; +use crate::dbctx::Job; + #[derive(Debug, Clone)] pub enum JobResult { Pass = 0, @@ -9,7 +12,7 @@ pub enum JobResult { } #[derive(Debug, Clone, PartialEq)] -pub enum JobState { +pub enum RunState { Pending = 0, Started = 1, Finished = 2, @@ -17,38 +20,48 @@ pub enum JobState { Invalid = 4, } -impl TryFrom<u8> for JobState { +impl TryFrom<u8> for RunState { type Error = String; fn try_from(value: u8) -> Result<Self, String> { match value { - 0 => Ok(JobState::Pending), - 1 => Ok(JobState::Started), - 2 => Ok(JobState::Finished), - 3 => Ok(JobState::Error), - 4 => Ok(JobState::Invalid), + 0 => Ok(RunState::Pending), + 1 => Ok(RunState::Started), + 2 => Ok(RunState::Finished), + 3 => Ok(RunState::Error), + 4 => Ok(RunState::Invalid), other => Err(format!("invalid job state: {}", other)), } } } +pub(crate) fn row2run(row: &rusqlite::Row) -> Run { + let (id, job_id, artifacts_path, state, run_host, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap(); + let state: u8 = state; + Run { + id, + job_id, + artifacts_path, + state: state.try_into().unwrap(), + run_host, + create_time, + start_time, + complete_time, + build_token, + run_timeout, + build_result, + final_text, + } +} + // remote_id is the remote from which we were notified. this is necessary so we know which remote // to pull from to actually run the job. pub const CREATE_JOBS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT, - artifacts_path TEXT, - state INTEGER NOT NULL, - run_host TEXT, - build_token TEXT, - remote_id INTEGER, - commit_id INTEGER, - created_time INTEGER, - started_time INTEGER, - complete_time INTEGER, - job_timeout INTEGER, source TEXT, - build_result INTEGER, - final_status TEXT);"; + created_time INTEGER, + remote_id INTEGER, + commit_id INTEGER);"; pub const CREATE_METRICS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -82,35 +95,63 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\ pub const CREATE_ARTIFACTS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT, - job_id INTEGER, + run_id INTEGER, name TEXT, desc TEXT, created_time INTEGER, completed_time INTEGER);"; +pub const CREATE_RUN_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER, + artifacts_path TEXT, + state INTEGER NOT NULL, + run_host TEXT, + build_token TEXT, + created_time INTEGER, + started_time INTEGER, + complete_time INTEGER, + run_timeout INTEGER, + build_result INTEGER, + final_status TEXT);"; + pub const CREATE_REMOTES_INDEX: &'static str = "\ CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; pub const CREATE_REPO_NAME_INDEX: &'static str = "\ CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; -pub const PENDING_JOBS: &'static str = "\ - select id, artifacts_path, state, run_host, remote_id, commit_id, created_time, source from jobs where state=0;"; - -pub const ACTIVE_JOBS: &'static str = "\ - select * from jobs where state=1 or state=0;"; - -pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\ - select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;"; +pub const PENDING_RUNS: &'static str = "\ + select id, job_id, artifacts_path, state, run_host, created_time from runs where state=0;"; + +pub const ACTIVE_RUNS: &'static str = "\ + select id, + job_id, + artifacts_path, + state, + run_host, + build_token, + created_time, + started_time, + complete_time, + run_timeout, + build_result, + final_status from runs where state=1 or state=0;"; + +pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\ + select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;"; pub const JOB_BY_COMMIT_ID: &'static str = "\ - select * from jobs where commit_id=?1;"; + select id, source, created_time, remote_id, commit_id from jobs where commit_id=?1;"; pub const ARTIFACT_BY_ID: &'static str = "\ - select * from artifacts where id=?1 and job_id=?2;"; + select * from artifacts where id=?1 and run_id=?2;"; -pub const METRICS_FOR_JOB: &'static str = "\ - select * from metrics where job_id=?1 order by id asc;"; +pub const JOB_BY_ID: &'static str = "\ + select id, source, created_time, remote_id, commit_id from jobs where id=?1"; + +pub const METRICS_FOR_RUN: &'static str = "\ + select * from metrics where run_id=?1 order by id asc;"; pub const COMMIT_TO_ID: &'static str = "\ select id from commits where sha=?1;"; @@ -122,5 +163,23 @@ pub const ALL_REPOS: &'static str = "\ select * from repos;"; pub const LAST_JOBS_FROM_REMOTE: &'static str = "\ - select * from jobs where remote_id=?1 order by created_time desc limit ?2;"; - + select id, source, created_time, remote_id, commit_id from jobs where remote_id=?1 order by created_time desc limit ?2;"; + +pub const LAST_RUN_FOR_JOB: &'static str = "\ + select id, + job_id, + artifacts_path, + state, + run_host, + build_token, + created_time, + started_time, + complete_time, + run_timeout, + build_result, + final_status from runs where job_id=?1;"; + +pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\ + select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id + from jobs join runs on jobs.id=runs.job_id + oder by runs.created_time asc;"; |