From ec5a274436bc8dda0b55d2c4da1411ff3c52434d Mon Sep 17 00:00:00 2001 From: iximeow Date: Sat, 1 Jul 2023 14:08:00 -0700 Subject: add a notion of runs distinct from jobs, lets see how well this goes over --- src/dbctx.rs | 216 ++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 118 insertions(+), 98 deletions(-) (limited to 'src/dbctx.rs') diff --git a/src/dbctx.rs b/src/dbctx.rs index dcca362..331e55f 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -36,34 +36,51 @@ pub struct Remote { pub notifier_config_path: String, } +// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1 +// relationship with commits, and potentially many *runs* of that job. #[derive(Debug, Clone)] pub struct Job { pub id: u64, - pub artifacts_path: Option, - pub state: sql::JobState, - pub run_host: Option, pub remote_id: u64, pub commit_id: u64, pub created_time: u64, + pub source: Option, +} + +// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report +// results. a job may have many runs from many different hosts rebuliding history, or reruns of the +// same job on the same hardware to collect more datapoints on the operation. +#[derive(Debug, Clone)] +pub struct Run { + pub id: u64, + pub job_id: u64, + pub artifacts_path: Option, + pub state: sql::RunState, + pub run_host: Option, + pub create_time: u64, pub start_time: Option, pub complete_time: Option, pub build_token: Option, - pub job_timeout: Option, - pub source: Option, + pub run_timeout: Option, pub build_result: Option, pub final_text: Option, } +impl Run { + fn into_pending_run(self) -> PendingRun { + PendingRun { + id: self.id, + job_id: self.job_id, + create_time: self.create_time, + } + } +} + #[derive(Debug, Clone)] -pub struct PendingJob { +pub struct PendingRun { pub id: u64, - pub artifacts: Option, - pub state: sql::JobState, - pub run_host: Option, - pub remote_id: u64, - pub commit_id: u64, - pub created_time: u64, - pub source: Option, + pub job_id: u64, + pub create_time: u64, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -76,7 +93,7 @@ pub enum TokenValidity { #[derive(Debug, Clone, PartialEq, Eq)] pub struct MetricRecord { pub id: u64, - pub job_id: u64, + pub run_id: u64, pub name: String, pub value: String } @@ -84,7 +101,7 @@ pub struct MetricRecord { #[derive(Debug, Clone, PartialEq, Eq)] pub struct ArtifactRecord { pub id: u64, - pub job_id: u64, + pub run_id: u64, pub name: String, pub desc: String, pub created_time: u64, @@ -113,12 +130,12 @@ impl DbCtx { Ok(()) } - pub fn insert_metric(&self, job_id: u64, name: &str, value: &str) -> Result<(), String> { + pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> { let conn = self.conn.lock().unwrap(); conn .execute( - "insert into metrics (job_id, name, value) values (?1, ?2, ?3) on conflict (job_id, name) do update set value=excluded.value", - (job_id, name, value) + "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value", + (run_id, name, value) ) .expect("can upsert"); Ok(()) @@ -163,7 +180,7 @@ impl DbCtx { }) } - pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result { + pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result { let artifact_id = { let created_time = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -172,8 +189,8 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); conn .execute( - "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", - (job_id, name, desc, created_time) + "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", + (run_id, name, desc, created_time) ) .map_err(|e| { format!("{:?}", e) @@ -182,17 +199,17 @@ impl DbCtx { conn.last_insert_rowid() as u64 }; - ArtifactDescriptor::new(job_id, artifact_id).await + ArtifactDescriptor::new(run_id, artifact_id).await } - pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result, String> { + pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result, String> { let conn = self.conn.lock().unwrap(); conn - .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| { - let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); + .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| { + let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); Ok(ArtifactRecord { - id, job_id, name, desc, created_time, completed_time + id, run_id, name, desc, created_time, completed_time }) }) .optional() @@ -222,11 +239,11 @@ impl DbCtx { .map_err(|e| e.to_string()) } - pub fn job_for_token(&self, token: &str) -> Result, TokenValidity)>, String> { + pub fn run_for_token(&self, token: &str) -> Result, TokenValidity)>, String> { self.conn.lock() .unwrap() .query_row( - "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1", + "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1", [token], |row| { let timeout: Option = row.get(3).unwrap(); @@ -254,6 +271,20 @@ impl DbCtx { .map_err(|e| e.to_string()) } + pub fn job_by_id(&self, id: u64) -> Result, String> { + self.conn.lock() + .unwrap() + .query_row(crate::sql::JOB_BY_ID, [id], |row| { + let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + + Ok(Job { + id, source, created_time, remote_id, commit_id + }) + }) + .optional() + .map_err(|e| e.to_string()) + } + pub fn remote_by_id(&self, id: u64) -> Result, String> { self.conn.lock() .unwrap() @@ -322,40 +353,62 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into jobs (state, remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", - (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time, pusher) + "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", + (remote_id, commit_id, created_time, pusher) ).unwrap(); assert_eq!(1, rows_modified); - Ok(conn.last_insert_rowid() as u64) + let job_id = conn.last_insert_rowid() as u64; + + Ok(job_id) } - pub fn metrics_for_job(&self, job: u64) -> Result, String> { + pub fn new_run(&self, job_id: u64) -> Result { + let created_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis() as u64; + let conn = self.conn.lock().unwrap(); - let mut metrics_query = conn.prepare(sql::METRICS_FOR_JOB).unwrap(); - let mut result = metrics_query.query([job]).unwrap(); + let rows_modified = conn.execute( + "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);", + (job_id, crate::sql::RunState::Pending as u64, created_time) + ).unwrap(); + + assert_eq!(1, rows_modified); + + let run_id = conn.last_insert_rowid() as u64; + + Ok(run_id) + } + + pub fn metrics_for_run(&self, run: u64) -> Result, String> { + let conn = self.conn.lock().unwrap(); + + let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap(); + let mut result = metrics_query.query([run]).unwrap(); let mut metrics = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, job_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); - metrics.push(MetricRecord { id, job_id, name, value }); + let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); + metrics.push(MetricRecord { id, run_id, name, value }); } Ok(metrics) } - pub fn artifacts_for_job(&self, job: u64, limit: Option) -> Result, String> { + pub fn artifacts_for_run(&self, run: u64, limit: Option) -> Result, String> { let conn = self.conn.lock().unwrap(); - let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_JOB).unwrap(); - let mut result = artifacts_query.query([job, limit.unwrap_or(65535)]).unwrap(); + let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap(); + let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap(); let mut artifacts = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option) = row.try_into().unwrap(); - artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time }); + let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option) = row.try_into().unwrap(); + artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time }); } Ok(artifacts) @@ -403,23 +456,13 @@ impl DbCtx { conn .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { - let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap(); - let state: u8 = state; + let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); Ok(Job { id, - artifacts_path, - state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time, - start_time, - complete_time, - build_token, - job_timeout, source, - build_result, - final_text, }) }) .optional() @@ -435,77 +478,43 @@ impl DbCtx { let mut jobs = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text)= row.try_into().unwrap(); - let state: u8 = state; + let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); jobs.push(Job { id, - artifacts_path, - state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time, - start_time, - complete_time, - build_token, - job_timeout, source, - build_result, - final_text, }); } Ok(jobs) } - pub fn get_active_jobs(&self) -> Result, String> { + pub fn get_active_runs(&self) -> Result, String> { let conn = self.conn.lock().unwrap(); - let mut started_query = conn.prepare(sql::ACTIVE_JOBS).unwrap(); - let mut jobs = started_query.query([]).unwrap(); + let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap(); + let mut runs = started_query.query([]).unwrap(); let mut started = Vec::new(); - while let Some(row) = jobs.next().unwrap() { - let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap(); - let state: u8 = state; - - started.push(Job { - id, - artifacts_path, - state: state.try_into().unwrap(), - run_host, - remote_id, - commit_id, - created_time, - start_time, - complete_time, - build_token, - job_timeout, - source, - build_result, - final_text, - }); + while let Some(row) = runs.next().unwrap() { + started.push(crate::sql::row2run(row)); } Ok(started) } - pub fn get_pending_jobs(&self) -> Result, String> { + pub fn get_pending_runs(&self) -> Result, String> { let conn = self.conn.lock().unwrap(); - let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap(); - let mut jobs = pending_query.query([]).unwrap(); + let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); + let mut runs = pending_query.query([]).unwrap(); let mut pending = Vec::new(); - while let Some(row) = jobs.next().unwrap() { - let (id, artifacts, state, run_host, remote_id, commit_id, created_time, source) = row.try_into().unwrap(); - let state: u8 = state; - pending.push(PendingJob { - id, artifacts, - state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time, - source, - }); + while let Some(row) = runs.next().unwrap() { + let run = crate::sql::row2run(row).into_pending_run(); + pending.push(run); } Ok(pending) @@ -526,6 +535,17 @@ impl DbCtx { Ok(remotes) } + pub fn last_run_for_job(&self, job_id: u64) -> Result, String> { + let conn = self.conn.lock().unwrap(); + + conn + .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| { + Ok(crate::sql::row2run(row)) + }) + .optional() + .map_err(|e| e.to_string()) + } + pub fn notifiers_by_repo(&self, repo_id: u64) -> Result, String> { let remotes = self.remotes_by_repo(repo_id)?; -- cgit v1.1