From 3a3a8ba8b773d36a0b3c8208e37928e3c6dbe696 Mon Sep 17 00:00:00 2001 From: iximeow Date: Tue, 4 Jul 2023 01:52:55 -0700 Subject: pretty sure this gets host-preferential task assignment in place? --- src/dbctx.rs | 59 ++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 15 deletions(-) (limited to 'src/dbctx.rs') diff --git a/src/dbctx.rs b/src/dbctx.rs index 314f09f..797c762 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -23,6 +23,7 @@ pub struct DbCtx { pub struct Repo { pub id: u64, pub name: String, + pub default_run_preference: Option, } #[derive(Debug)] @@ -45,6 +46,7 @@ pub struct Job { pub commit_id: u64, pub created_time: u64, pub source: Option, + pub run_preferences: Option, } // a run tracks the intent or obligation to have some runner somewhere run a goodfile and report @@ -277,10 +279,10 @@ impl DbCtx { self.conn.lock() .unwrap() .query_row(crate::sql::JOB_BY_ID, [id], |row| { - let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); Ok(Job { - id, source, created_time, remote_id, commit_id + id, source, created_time, remote_id, commit_id, run_preferences }) }) .optional() @@ -341,7 +343,7 @@ impl DbCtx { Ok(conn.last_insert_rowid() as u64) } - pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>) -> Result { + pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option) -> Result { // TODO: potential race: if two remotes learn about a commit at the same time and we decide // to create two jobs at the same time, this might return an incorrect id if the insert // didn't actually insert a new row. @@ -355,8 +357,8 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4);", - (remote_id, commit_id, created_time, pusher) + "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);", + (remote_id, commit_id, created_time, pusher, repo_default_run_pref) ).unwrap(); assert_eq!(1, rows_modified); @@ -366,7 +368,7 @@ impl DbCtx { Ok(job_id) } - pub fn new_run(&self, job_id: u64) -> Result { + pub fn new_run(&self, job_id: u64, host_preference: Option) -> Result { let created_time = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("now is before epoch") @@ -375,15 +377,19 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);", - (job_id, crate::sql::RunState::Pending as u64, created_time) + "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);", + (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference) ).unwrap(); assert_eq!(1, rows_modified); let run_id = conn.last_insert_rowid() as u64; - Ok(run_id) + Ok(PendingRun { + id: run_id, + job_id, + create_time: created_time, + }) } pub fn reap_task(&self, task_id: u64) -> Result<(), String> { @@ -430,11 +436,12 @@ impl DbCtx { pub fn repo_by_id(&self, id: u64) -> Result, String> { self.conn.lock() .unwrap() - .query_row("select * from repos where id=?1", [id], |row| { - let (id, repo_name) = row.try_into().unwrap(); + .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| { + let (id, repo_name, default_run_preference) = row.try_into().unwrap(); Ok(Repo { id, name: repo_name, + default_run_preference, }) }) .optional() @@ -449,10 +456,11 @@ impl DbCtx { let mut result = Vec::new(); while let Some(row) = repos.next().unwrap() { - let (id, repo_name) = row.try_into().unwrap(); + let (id, repo_name, default_run_preference) = row.try_into().unwrap(); result.push(Repo { id, name: repo_name, + default_run_preference, }); } @@ -469,13 +477,14 @@ impl DbCtx { conn .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { - let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); Ok(Job { id, remote_id, commit_id, created_time, source, + run_preferences, }) }) .optional() @@ -491,13 +500,14 @@ impl DbCtx { let mut jobs = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); jobs.push(Job { id, remote_id, commit_id, created_time, source, + run_preferences, }); } @@ -518,7 +528,7 @@ impl DbCtx { Ok(started) } - pub fn get_pending_runs(&self) -> Result, String> { + pub fn get_pending_runs(&self, host_id: Option) -> Result, String> { let conn = self.conn.lock().unwrap(); let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); @@ -538,6 +548,25 @@ impl DbCtx { Ok(pending) } + pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result, String> { + let conn = self.conn.lock().unwrap(); + + let mut jobs_needing_task_runs = conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap(); + let mut job_rows = jobs_needing_task_runs.query([host_id]).unwrap(); + let mut jobs = Vec::new(); + + while let Some(row) = job_rows.next().unwrap() { + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); + + jobs.push(Job { + id, source, created_time, remote_id, commit_id, run_preferences, + }); + } + + Ok(jobs) + } + + pub fn remotes_by_repo(&self, repo_id: u64) -> Result, String> { let mut remotes: Vec = Vec::new(); -- cgit v1.1