diff options
author | iximeow <me@iximeow.net> | 2023-07-04 01:52:55 -0700 |
---|---|---|
committer | iximeow <me@iximeow.net> | 2023-07-04 01:52:55 -0700 |
commit | 3a3a8ba8b773d36a0b3c8208e37928e3c6dbe696 (patch) | |
tree | e840c014db88f75921d739433db3d42cbeb2a67d | |
parent | 00b72715aa496363dd3c783bd5b2ff965869c8c3 (diff) |
pretty sure this gets host-preferential task assignment in place?
-rw-r--r-- | src/ci_ctl.rs | 13 | ||||
-rw-r--r-- | src/ci_driver.rs | 68 | ||||
-rw-r--r-- | src/dbctx.rs | 59 | ||||
-rw-r--r-- | src/lua/mod.rs | 20 | ||||
-rw-r--r-- | src/main.rs | 17 | ||||
-rw-r--r-- | src/sql.rs | 21 |
6 files changed, 142 insertions, 56 deletions
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs index 74e6e55..6d8b004 100644 --- a/src/ci_ctl.rs +++ b/src/ci_ctl.rs @@ -85,22 +85,27 @@ fn main() { let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); let mut jobs = query.query([]).unwrap(); while let Some(row) = jobs.next().unwrap() { - let (job_id, run_id, state, created_time, commit_id): (u64, u64, u64, u64, u64) = row.try_into().unwrap(); + let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option<String>) = row.try_into().unwrap(); - eprintln!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id); + eprint!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id); + if let Some(run_preferences) = run_preferences { + eprintln!(" | run preference: {}", run_preferences); + } else { + eprintln!(""); + } } eprintln!("jobs"); }, JobAction::Rerun { which } => { let db = DbCtx::new(&config_path, &db_path); - let task_id = db.new_run(which as u64).expect("db can be queried"); + let task_id = db.new_run(which as u64, None).expect("db can be queried").id; eprintln!("[+] rerunning job {} as task {}", which, task_id); } JobAction::RerunCommit { commit } => { let db = DbCtx::new(&config_path, &db_path); let job_id = db.job_for_commit(&commit).unwrap(); if let Some(job_id) = job_id { - let task_id = db.new_run(job_id).expect("db can be queried"); + let task_id = db.new_run(job_id, None).expect("db can be queried").id; eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id); } else { eprintln!("[-] no job for commit {}", commit); diff --git a/src/ci_driver.rs b/src/ci_driver.rs index d8e60e5..c40e6b2 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -57,9 +57,8 @@ fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> { } } -async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> { 
+async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> { eprintln!("activating task {:?}", run); - let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); let connection = dbctx.conn.lock().unwrap(); @@ -84,28 +83,16 @@ async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::R eprintln!("running {}", repo_name); - let mut client_job = loop { - let mut candidate = clients.recv().await - .ok_or_else(|| "client channel disconnected".to_string())?; + let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await; - if !candidate.will_accept(&job) { - eprintln!("client {:?} would not accept job {:?}", candidate, job); - continue; + let mut client_job = match res { + Ok(Some(mut client_job)) => { client_job } + Ok(None) => { + return Err("client hung up instead of acking task".to_string()); } - - let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await; - - match res { - Ok(Some(mut client_job)) => { - break client_job; - } - Ok(None) => { - eprintln!("client hung up instead of acking task"); - } - Err(e) => { - // failed to submit job, move on for now - eprintln!("failed to submit task: {:?}", e); - } + Err(e) => { + // failed to submit job, move on for now + return Err(format!("failed to submit task: {:?}", e)); } }; @@ -521,16 +508,47 @@ async fn main() { spawn(old_task_reaper(Arc::clone(&dbctx))); - loop { - let runs = dbctx.get_pending_runs().unwrap(); + 'accept: loop { + let mut candidate = match channel.recv().await + .ok_or_else(|| "client channel disconnected".to_string()) { + + Ok(candidate) => { candidate }, + Err(e) => { eprintln!("client error: {}", e); continue; } + }; + + // try to find a job for this candidate: + // * start with pending runs - these need *some* client to run them, but do not care which + // * if no new jobs, maybe an existing job still needs a rerun on this client? + // * otherwise, um, i dunno. 
do nothing? + + let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap(); if runs.len() > 0 { println!("{} new runs", runs.len()); for run in runs.into_iter() { - activate_run(Arc::clone(&dbctx), &run, &mut channel).await; + let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); + + if candidate.will_accept(&job) { + eprintln!("client {:?} would not accept job {:?}", candidate, job); + activate_run(Arc::clone(&dbctx), candidate, &job, &run).await; + continue 'accept; + } + } } + + let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query"); + + for job in alt_run_jobs.iter() { + if candidate.will_accept(&job) { + let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap(); + eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); + activate_run(Arc::clone(&dbctx), candidate, &job, &run).await; + continue 'accept; + } + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } diff --git a/src/dbctx.rs b/src/dbctx.rs index 314f09f..797c762 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -23,6 +23,7 @@ pub struct DbCtx { pub struct Repo { pub id: u64, pub name: String, + pub default_run_preference: Option<String>, } #[derive(Debug)] @@ -45,6 +46,7 @@ pub struct Job { pub commit_id: u64, pub created_time: u64, pub source: Option<String>, + pub run_preferences: Option<String>, } // a run tracks the intent or obligation to have some runner somewhere run a goodfile and report @@ -277,10 +279,10 @@ impl DbCtx { self.conn.lock() .unwrap() .query_row(crate::sql::JOB_BY_ID, [id], |row| { - let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); Ok(Job { - id, source, created_time, remote_id, commit_id + id, source, created_time, remote_id, commit_id, run_preferences }) }) .optional() @@ -341,7 +343,7 @@ impl DbCtx { 
Ok(conn.last_insert_rowid() as u64) } - pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>) -> Result<u64, String> { + pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option<String>) -> Result<u64, String> { // TODO: potential race: if two remotes learn about a commit at the same time and we decide // to create two jobs at the same time, this might return an incorrect id if the insert // didn't actually insert a new row. @@ -355,8 +357,8 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4);", - (remote_id, commit_id, created_time, pusher) + "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);", + (remote_id, commit_id, created_time, pusher, repo_default_run_pref) ).unwrap(); assert_eq!(1, rows_modified); @@ -366,7 +368,7 @@ impl DbCtx { Ok(job_id) } - pub fn new_run(&self, job_id: u64) -> Result<u64, String> { + pub fn new_run(&self, job_id: u64, host_preference: Option<u32>) -> Result<PendingRun, String> { let created_time = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("now is before epoch") @@ -375,15 +377,19 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);", - (job_id, crate::sql::RunState::Pending as u64, created_time) + "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);", + (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference) ).unwrap(); assert_eq!(1, rows_modified); let run_id = conn.last_insert_rowid() as u64; - Ok(run_id) + Ok(PendingRun { + id: run_id, + job_id, + create_time: created_time, + }) } pub fn reap_task(&self, task_id: u64) -> Result<(), String> { @@ -430,11 +436,12 @@ impl DbCtx { pub fn repo_by_id(&self, id: u64) -> 
Result<Option<Repo>, String> { self.conn.lock() .unwrap() - .query_row("select * from repos where id=?1", [id], |row| { - let (id, repo_name) = row.try_into().unwrap(); + .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| { + let (id, repo_name, default_run_preference) = row.try_into().unwrap(); Ok(Repo { id, name: repo_name, + default_run_preference, }) }) .optional() @@ -449,10 +456,11 @@ impl DbCtx { let mut result = Vec::new(); while let Some(row) = repos.next().unwrap() { - let (id, repo_name) = row.try_into().unwrap(); + let (id, repo_name, default_run_preference) = row.try_into().unwrap(); result.push(Repo { id, name: repo_name, + default_run_preference, }); } @@ -469,13 +477,14 @@ impl DbCtx { conn .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { - let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); Ok(Job { id, remote_id, commit_id, created_time, source, + run_preferences, }) }) .optional() @@ -491,13 +500,14 @@ impl DbCtx { let mut jobs = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); jobs.push(Job { id, remote_id, commit_id, created_time, source, + run_preferences, }); } @@ -518,7 +528,7 @@ impl DbCtx { Ok(started) } - pub fn get_pending_runs(&self) -> Result<Vec<PendingRun>, String> { + pub fn get_pending_runs(&self, host_id: Option<u32>) -> Result<Vec<PendingRun>, String> { let conn = self.conn.lock().unwrap(); let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); @@ -538,6 +548,25 @@ impl DbCtx { Ok(pending) } + pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result<Vec<Job>, String> { + let conn = self.conn.lock().unwrap(); + + let mut jobs_needing_task_runs = 
conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap(); + let mut job_rows = jobs_needing_task_runs.query([host_id]).unwrap(); + let mut jobs = Vec::new(); + + while let Some(row) = job_rows.next().unwrap() { + let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); + + jobs.push(Job { + id, source, created_time, remote_id, commit_id, run_preferences, + }); + } + + Ok(jobs) + } + + pub fn remotes_by_repo(&self, repo_id: u64) -> Result<Vec<Remote>, String> { let mut remotes: Vec<Remote> = Vec::new(); diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 1b86582..89ff4c2 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -152,6 +152,21 @@ mod lua_exports { Ok(result) } + pub fn check_dependencies(commands: Vec<String>) -> Result<(), rlua::Error> { + let mut missing_deps = Vec::new(); + for command in commands.iter() { + if !has_cmd(command)? { + missing_deps.push(command.clone()); + } + } + + if missing_deps.len() > 0 { + return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", ")))); + } + + Ok(()) + } + pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -283,6 +298,10 @@ impl BuildEnv { Ok(()) })?; + let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec<String>| { + lua_exports::check_dependencies(commands) + })?; + let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| { lua_exports::build_command_impl(command, params, job_ref) })?; @@ -337,6 +356,7 @@ impl BuildEnv { vec![ ("hello", hello), ("run", build), + ("dependencies", check_dependencies), ("metric", metric), ("error", error), ("artifact", artifact), diff --git a/src/main.rs b/src/main.rs index 0ca0765..0e94b96 100644 --- a/src/main.rs +++ b/src/main.rs @@ -256,14 +256,21 @@ async fn process_push_event(ctx: Arc<DbCtx>, 
owner: String, repo: String, event: } }; + let repo_default_run_pref: Option<String> = ctx.conn.lock().unwrap() + .query_row("select default_run_preference from repos where id=?1;", [repo_id], |row| { + Ok((row.get(0)).unwrap()) + }) + .optional() + .expect("can query"); + let pusher_email = pusher .get("email") .expect("has email") .as_str() .expect("is str"); - let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email)).unwrap(); - let _ = ctx.new_run(job_id).unwrap(); + let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap(); + let _ = ctx.new_run(job_id, None).unwrap(); let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers"); @@ -714,8 +721,8 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv let mut last_builds = Vec::new(); - let (repo_id, repo_name): (u64, String) = match ctx.dbctx.conn.lock().unwrap() - .query_row("select id, repo_name from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) + let (repo_id, repo_name, default_run_preference): (u64, String) = match ctx.dbctx.conn.lock().unwrap() + .query_row("select id, repo_name, default_run_preference from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap(), row.get(2).unwrap()))) .optional() .unwrap() { Some(elem) => elem, @@ -725,6 +732,8 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv } }; + // TODO: display default_run_preference somehow on the web summary? 
+ for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") { let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo"); last_builds.extend(last_ten_jobs.drain(..)); diff --git a/src/sql.rs b/src/sql.rs --- a/src/sql.rs +++ b/src/sql.rs @@ -61,7 +61,8 @@ pub const CREATE_JOBS_TABLE: &'static str = "\ source TEXT, created_time INTEGER, remote_id INTEGER, - commit_id INTEGER);"; + commit_id INTEGER, + run_preferences TEXT);"; pub const CREATE_METRICS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -76,7 +77,8 @@ pub const CREATE_COMMITS_TABLE: &'static str = "\ pub const CREATE_REPOS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT, - repo_name TEXT);"; + repo_name TEXT, + default_run_preference TEXT);"; // remote_api is `github` or NULL for now. hopefully a future cgit-style notifier one day. // remote_path is some unique identifier for the relevant remote. @@ -137,7 +139,10 @@ pub const CREATE_REPO_NAME_INDEX: &'static str = "\ CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; pub const PENDING_RUNS: &'static str = "\ - select id, job_id, created_time from runs where state=0;"; + select id, job_id, created_time, host_preference from runs where state=0 and (host_preference=?1 or host_preference is null) order by created_time desc;"; + +pub const JOBS_NEEDING_HOST_RUN: &'static str = "\ + select jobs.id, jobs.source, jobs.created_time, jobs.remote_id, jobs.commit_id, jobs.run_preferences from jobs left join runs on jobs.id=runs.job_id where jobs.run_preferences=\"all\" and (host_id!=?1 or host_id is null);"; pub const ACTIVE_RUNS: &'static str = "\ select id, @@ -157,13 +162,13 @@ pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\ select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;"; pub const JOB_BY_COMMIT_ID: &'static str = "\ - select id, source, created_time, 
remote_id, commit_id from jobs where commit_id=?1;"; + select id, source, created_time, remote_id, commit_id, run_preferences from jobs where commit_id=?1;"; pub const ARTIFACT_BY_ID: &'static str = "\ select * from artifacts where id=?1 and run_id=?2;"; pub const JOB_BY_ID: &'static str = "\ - select id, source, created_time, remote_id, commit_id from jobs where id=?1"; + select id, source, created_time, remote_id, commit_id, run_preferences from jobs where id=?1"; pub const METRICS_FOR_RUN: &'static str = "\ select * from metrics where run_id=?1 order by id asc;"; @@ -175,10 +180,10 @@ pub const REMOTES_FOR_REPO: &'static str = "\ select * from remotes where repo_id=?1;"; pub const ALL_REPOS: &'static str = "\ - select * from repos;"; + select id, repo_name, default_run_preference from repos;"; pub const LAST_JOBS_FROM_REMOTE: &'static str = "\ - select id, source, created_time, remote_id, commit_id from jobs where remote_id=?1 order by created_time desc limit ?2;"; + select id, source, created_time, remote_id, commit_id, run_preferences from jobs where remote_id=?1 order by created_time desc limit ?2;"; pub const LAST_RUN_FOR_JOB: &'static str = "\ select id, @@ -195,6 +200,6 @@ pub const LAST_RUN_FOR_JOB: &'static str = "\ final_status from runs where job_id=?1 order by started_time desc limit 1;"; pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\ - select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id + select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id, jobs.run_preferences from jobs join runs on jobs.id=runs.job_id oder by runs.created_time asc;"; |