From 3a3a8ba8b773d36a0b3c8208e37928e3c6dbe696 Mon Sep 17 00:00:00 2001 From: iximeow Date: Tue, 4 Jul 2023 01:52:55 -0700 Subject: pretty sure this gets host-preferential task assignment in place? --- src/ci_ctl.rs | 13 +++++++---- src/ci_driver.rs | 68 +++++++++++++++++++++++++++++++++++--------------------- src/dbctx.rs | 59 +++++++++++++++++++++++++++++++++++------------- src/lua/mod.rs | 20 +++++++++++++++++ src/main.rs | 17 ++++++++++---- src/sql.rs | 21 ++++++++++------- 6 files changed, 142 insertions(+), 56 deletions(-) (limited to 'src') diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs index 74e6e55..6d8b004 100644 --- a/src/ci_ctl.rs +++ b/src/ci_ctl.rs @@ -85,22 +85,27 @@ fn main() { let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); let mut jobs = query.query([]).unwrap(); while let Some(row) = jobs.next().unwrap() { - let (job_id, run_id, state, created_time, commit_id): (u64, u64, u64, u64, u64) = row.try_into().unwrap(); + let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option) = row.try_into().unwrap(); - eprintln!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id); + eprint!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id); + if let Some(run_preferences) = run_preferences { + eprintln!(" | run preference: {}", run_preferences); + } else { + eprintln!(""); + } } eprintln!("jobs"); }, JobAction::Rerun { which } => { let db = DbCtx::new(&config_path, &db_path); - let task_id = db.new_run(which as u64).expect("db can be queried"); + let task_id = db.new_run(which as u64, None).expect("db can be queried").id; eprintln!("[+] rerunning job {} as task {}", which, task_id); } JobAction::RerunCommit { commit } => { let db = DbCtx::new(&config_path, &db_path); let job_id = db.job_for_commit(&commit).unwrap(); if let Some(job_id) = job_id { - let task_id = db.new_run(job_id).expect("db can be queried"); + let task_id = db.new_run(job_id, None).expect("db can be queried").id; eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id); } else { eprintln!("[-] no job for commit {}", commit); diff --git a/src/ci_driver.rs b/src/ci_driver.rs index d8e60e5..c40e6b2 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -57,9 +57,8 @@ fn reserve_artifacts_dir(run: u64) -> std::io::Result { } } -async fn activate_run(dbctx: Arc, run: &PendingRun, clients: &mut mpsc::Receiver) -> Result<(), String> { +async fn activate_run(dbctx: Arc, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> { eprintln!("activating task {:?}", run); - let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); let connection = dbctx.conn.lock().unwrap(); @@ -84,28 +83,16 @@ async fn activate_run(dbctx: Arc, run: &PendingRun, clients: &mut mpsc::R eprintln!("running {}", repo_name); - let mut client_job = loop { - let mut candidate = clients.recv().await - .ok_or_else(|| "client channel disconnected".to_string())?; + let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await; - if !candidate.will_accept(&job) { - eprintln!("client {:?} would not accept job {:?}", candidate, job); - continue; + let mut client_job = match res { + Ok(Some(mut client_job)) => { client_job } + Ok(None) => { + return Err("client hung up instead of acking task".to_string()); } - - let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await; - - match res { - Ok(Some(mut client_job)) => { - break client_job; - } - Ok(None) => { - eprintln!("client hung up instead of acking task"); - } - Err(e) => { - // failed to submit job, move on for now - eprintln!("failed to submit task: {:?}", e); - } + Err(e) => { + // failed to submit job, move on for now + return Err(format!("failed to submit task: {:?}", e)); } }; @@ -521,16 +508,47 @@ async fn main() { spawn(old_task_reaper(Arc::clone(&dbctx))); - loop { - let runs = dbctx.get_pending_runs().unwrap(); + 'accept: loop { + let mut candidate = match channel.recv().await + .ok_or_else(|| "client channel disconnected".to_string()) { + + Ok(candidate) => { candidate }, + Err(e) => { eprintln!("client error: {}", e); continue; } + }; + + // try to find a job for this candidate: + // * start with pending runs - these need *some* client to run them, but do not care which + // * if no new jobs, maybe an existing job still needs a rerun on this client? + // * otherwise, um, i dunno. do nothing? + + let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap(); if runs.len() > 0 { println!("{} new runs", runs.len()); for run in runs.into_iter() { - activate_run(Arc::clone(&dbctx), &run, &mut channel).await; + let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); + + if candidate.will_accept(&job) { + eprintln!("client {:?} would not accept job {:?}", candidate, job); + activate_run(Arc::clone(&dbctx), candidate, &job, &run).await; + continue 'accept; + } + } } + + let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query"); + + for job in alt_run_jobs.iter() { + if candidate.will_accept(&job) { + let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap(); + eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); + activate_run(Arc::clone(&dbctx), candidate, &job, &run).await; + continue 'accept; + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } diff --git a/src/dbctx.rs b/src/dbctx.rs index 314f09f..797c762 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -23,6 +23,7 @@ pub struct DbCtx { pub struct Repo { pub id: u64, pub name: String, + pub default_run_preference: Option, } #[derive(Debug)] @@ -45,6 +46,7 @@ pub struct Job { pub commit_id: u64, pub created_time: u64, pub source: Option, + pub run_preferences: Option, } // a run tracks the intent or obligation to have some runner somewhere run a goodfile and report @@ -277,10 +279,10 @@ impl DbCtx { self.conn.lock() .unwrap() .query_row(crate::sql::JOB_BY_ID, [id], |row| { - let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); Ok(Job { - id, source, created_time, remote_id, commit_id + id, source, created_time, remote_id, commit_id, run_preferences }) }) .optional() @@ -341,7 +343,7 @@ impl DbCtx { Ok(conn.last_insert_rowid() as u64) } - pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>) -> Result { + pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option) -> Result { // TODO: potential race: if two remotes learn about a commit at the same time and we decide // to create two jobs at the same time, this might return an incorrect id if the insert // didn't actually insert a new row. @@ -355,8 +357,8 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4);", - (remote_id, commit_id, created_time, pusher) + "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);", + (remote_id, commit_id, created_time, pusher, repo_default_run_pref) ).unwrap(); assert_eq!(1, rows_modified); @@ -366,7 +368,7 @@ impl DbCtx { Ok(job_id) } - pub fn new_run(&self, job_id: u64) -> Result { + pub fn new_run(&self, job_id: u64, host_preference: Option) -> Result { let created_time = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("now is before epoch") @@ -375,15 +377,19 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);", - (job_id, crate::sql::RunState::Pending as u64, created_time) + "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);", + (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference) ).unwrap(); assert_eq!(1, rows_modified); let run_id = conn.last_insert_rowid() as u64; - Ok(run_id) + Ok(PendingRun { + id: run_id, + job_id, + create_time: created_time, + }) } pub fn reap_task(&self, task_id: u64) -> Result<(), String> { @@ -430,11 +436,12 @@ impl DbCtx { pub fn repo_by_id(&self, id: u64) -> Result, String> { self.conn.lock() .unwrap() - .query_row("select * from repos where id=?1", [id], |row| { - let (id, repo_name) = row.try_into().unwrap(); + .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| { + let (id, repo_name, default_run_preference) = row.try_into().unwrap(); Ok(Repo { id, name: repo_name, + default_run_preference, }) }) .optional() @@ -449,10 +456,11 @@ impl DbCtx { let mut result = Vec::new(); while let Some(row) = repos.next().unwrap() { - let (id, repo_name) = row.try_into().unwrap(); + let (id, repo_name, default_run_preference) = row.try_into().unwrap(); result.push(Repo { id, name: repo_name, + default_run_preference, }); } @@ -469,13 +477,14 @@ impl DbCtx { conn .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { - let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); Ok(Job { id, remote_id, commit_id, created_time, source, + run_preferences, }) }) .optional() @@ -491,13 +500,14 @@ impl DbCtx { let mut jobs = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); jobs.push(Job { id, remote_id, commit_id, created_time, source, + run_preferences, }); } @@ -518,7 +528,7 @@ impl DbCtx { Ok(started) } - pub fn get_pending_runs(&self) -> Result, String> { + pub fn get_pending_runs(&self, host_id: Option) -> Result, String> { let conn = self.conn.lock().unwrap(); let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); @@ -538,6 +548,25 @@ impl DbCtx { Ok(pending) } + pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result, String> { + let conn = self.conn.lock().unwrap(); + + let mut jobs_needing_task_runs = conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap(); + let mut job_rows = jobs_needing_task_runs.query([host_id]).unwrap(); + let mut jobs = Vec::new(); + + while let Some(row) = job_rows.next().unwrap() { + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); + + jobs.push(Job { + id, source, created_time, remote_id, commit_id, run_preferences, + }); + } + + Ok(jobs) + } + + pub fn remotes_by_repo(&self, repo_id: u64) -> Result, String> { let mut remotes: Vec = Vec::new(); diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 1b86582..89ff4c2 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -152,6 +152,21 @@ mod lua_exports { Ok(result) } + pub fn check_dependencies(commands: Vec) -> Result<(), rlua::Error> { + let mut missing_deps = Vec::new(); + for command in commands.iter() { + if !has_cmd(command)? { + missing_deps.push(command.clone()); + } + } + + if missing_deps.len() > 0 { + return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", ")))); + } + + Ok(()) + } + pub fn metric(name: String, value: String, job_ctx: Arc>) -> Result<(), rlua::Error> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -283,6 +298,10 @@ impl BuildEnv { Ok(()) })?; + let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec| { + lua_exports::check_dependencies(commands) + })?; + let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| { lua_exports::build_command_impl(command, params, job_ref) })?; @@ -337,6 +356,7 @@ impl BuildEnv { vec![ ("hello", hello), ("run", build), + ("dependencies", check_dependencies), ("metric", metric), ("error", error), ("artifact", artifact), diff --git a/src/main.rs b/src/main.rs index 0ca0765..0e94b96 100644 --- a/src/main.rs +++ b/src/main.rs @@ -256,14 +256,21 @@ async fn process_push_event(ctx: Arc, owner: String, repo: String, event: } }; + let repo_default_run_pref: Option = ctx.conn.lock().unwrap() + .query_row("select default_run_preference from repos where id=?1;", [repo_id], |row| { + Ok((row.get(0)).unwrap()) + }) + .optional() + .expect("can query"); + let pusher_email = pusher .get("email") .expect("has email") .as_str() .expect("is str"); - let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email)).unwrap(); - let _ = ctx.new_run(job_id).unwrap(); + let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap(); + let _ = ctx.new_run(job_id, None).unwrap(); let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers"); @@ -714,8 +721,8 @@ async fn handle_repo_summary(Path(path): Path, State(ctx): State elem, @@ -725,6 +732,8 @@ async fn handle_repo_summary(Path(path): Path, State(ctx): State