author     iximeow <me@iximeow.net>   2023-07-04 01:52:55 -0700
committer  iximeow <me@iximeow.net>   2023-07-04 01:52:55 -0700
commit     3a3a8ba8b773d36a0b3c8208e37928e3c6dbe696 (patch)
tree       e840c014db88f75921d739433db3d42cbeb2a67d /src
parent     00b72715aa496363dd3c783bd5b2ff965869c8c3 (diff)
pretty sure this gets host-preferential task assignment in place?
Diffstat (limited to 'src')
-rw-r--r--   src/ci_ctl.rs      13
-rw-r--r--   src/ci_driver.rs   68
-rw-r--r--   src/dbctx.rs       59
-rw-r--r--   src/lua/mod.rs     20
-rw-r--r--   src/main.rs        17
-rw-r--r--   src/sql.rs         21
6 files changed, 142 insertions, 56 deletions
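The change below teaches the driver to pick work per connecting host rather than handing any pending run to any client. A rough sketch of that assignment flow, with hypothetical stand-in types (Client, Job, Run, Db) in place of the real RunnerClient, Job, PendingRun, and DbCtx, and with the will_accept() check and the async plumbing omitted:

struct Client { host_id: u32 }
struct Job { id: u64 }
struct Run { id: u64, job_id: u64 }
struct Db;

impl Db {
    // pending runs whose host_preference matches this host or is NULL
    fn pending_runs(&self, _host_id: Option<u32>) -> Vec<Run> { Vec::new() }
    // jobs with run_preferences="all" that have not yet run on this host
    fn jobs_needing_run_on(&self, _host_id: u32) -> Vec<Job> { Vec::new() }
    // insert a new pending run, optionally pinned to a host
    fn new_run(&self, job_id: u64, _host: Option<u32>) -> Run { Run { id: 0, job_id } }
}

// When a runner connects, find it some work:
// 1. any pending run that names this host, or names no host at all
// 2. otherwise, a "run on all hosts" job with no run on this host yet;
//    mint a fresh run pinned to this host for it
// 3. otherwise, nothing to do for this client right now
fn assign(db: &Db, client: &Client) -> Option<Run> {
    if let Some(run) = db.pending_runs(Some(client.host_id)).into_iter().next() {
        return Some(run);
    }
    if let Some(job) = db.jobs_needing_run_on(client.host_id).into_iter().next() {
        return Some(db.new_run(job.id, Some(client.host_id)));
    }
    None
}

fn main() {
    let db = Db;
    let client = Client { host_id: 1 };
    match assign(&db, &client) {
        Some(run) => println!("assigned run {} (job {})", run.id, run.job_id),
        None => println!("no work for host {}", client.host_id),
    }
}

The real loop in ci_driver.rs below does the same thing per connected client, falling through to a short sleep when neither query turns up work.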
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs
index 74e6e55..6d8b004 100644
--- a/src/ci_ctl.rs
+++ b/src/ci_ctl.rs
@@ -85,22 +85,27 @@ fn main() {
let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap();
let mut jobs = query.query([]).unwrap();
while let Some(row) = jobs.next().unwrap() {
- let (job_id, run_id, state, created_time, commit_id): (u64, u64, u64, u64, u64) = row.try_into().unwrap();
+ let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option<String>) = row.try_into().unwrap();
- eprintln!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id);
+ eprint!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id);
+ if let Some(run_preferences) = run_preferences {
+ eprintln!(" | run preference: {}", run_preferences);
+ } else {
+ eprintln!("");
+ }
}
eprintln!("jobs");
},
JobAction::Rerun { which } => {
let db = DbCtx::new(&config_path, &db_path);
- let task_id = db.new_run(which as u64).expect("db can be queried");
+ let task_id = db.new_run(which as u64, None).expect("db can be queried").id;
eprintln!("[+] rerunning job {} as task {}", which, task_id);
}
JobAction::RerunCommit { commit } => {
let db = DbCtx::new(&config_path, &db_path);
let job_id = db.job_for_commit(&commit).unwrap();
if let Some(job_id) = job_id {
- let task_id = db.new_run(job_id).expect("db can be queried");
+ let task_id = db.new_run(job_id, None).expect("db can be queried").id;
eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id);
} else {
eprintln!("[-] no job for commit {}", commit);
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index d8e60e5..c40e6b2 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -57,9 +57,8 @@ fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> {
}
}
-async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> {
+async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> {
eprintln!("activating task {:?}", run);
- let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
let connection = dbctx.conn.lock().unwrap();
@@ -84,28 +83,16 @@ async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::R
eprintln!("running {}", repo_name);
- let mut client_job = loop {
- let mut candidate = clients.recv().await
- .ok_or_else(|| "client channel disconnected".to_string())?;
+ let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await;
- if !candidate.will_accept(&job) {
- eprintln!("client {:?} would not accept job {:?}", candidate, job);
- continue;
+ let mut client_job = match res {
+ Ok(Some(mut client_job)) => { client_job }
+ Ok(None) => {
+ return Err("client hung up instead of acking task".to_string());
}
-
- let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await;
-
- match res {
- Ok(Some(mut client_job)) => {
- break client_job;
- }
- Ok(None) => {
- eprintln!("client hung up instead of acking task");
- }
- Err(e) => {
- // failed to submit job, move on for now
- eprintln!("failed to submit task: {:?}", e);
- }
+ Err(e) => {
+ // failed to submit job, move on for now
+ return Err(format!("failed to submit task: {:?}", e));
}
};
@@ -521,16 +508,47 @@ async fn main() {
spawn(old_task_reaper(Arc::clone(&dbctx)));
- loop {
- let runs = dbctx.get_pending_runs().unwrap();
+ 'accept: loop {
+ let mut candidate = match channel.recv().await
+ .ok_or_else(|| "client channel disconnected".to_string()) {
+
+ Ok(candidate) => { candidate },
+ Err(e) => { eprintln!("client error: {}", e); continue; }
+ };
+
+ // try to find a job for this candidate:
+ // * start with pending runs - these need *some* client to run them, but do not care which
+ // * if no new jobs, maybe an existing job still needs a rerun on this client?
+ // * otherwise, um, i dunno. do nothing?
+
+ let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap();
if runs.len() > 0 {
println!("{} new runs", runs.len());
for run in runs.into_iter() {
- activate_run(Arc::clone(&dbctx), &run, &mut channel).await;
+ let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
+
+ if candidate.will_accept(&job) {
+ eprintln!("client {:?} would not accept job {:?}", candidate, job);
+ activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
+ continue 'accept;
+ }
+
}
}
+
+ let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query");
+
+ for job in alt_run_jobs.iter() {
+ if candidate.will_accept(&job) {
+ let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap();
+ eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id);
+ activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
+ continue 'accept;
+ }
+ }
+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
diff --git a/src/dbctx.rs b/src/dbctx.rs
index 314f09f..797c762 100644
--- a/src/dbctx.rs
+++ b/src/dbctx.rs
@@ -23,6 +23,7 @@ pub struct DbCtx {
pub struct Repo {
pub id: u64,
pub name: String,
+ pub default_run_preference: Option<String>,
}
#[derive(Debug)]
@@ -45,6 +46,7 @@ pub struct Job {
pub commit_id: u64,
pub created_time: u64,
pub source: Option<String>,
+ pub run_preferences: Option<String>,
}
// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report
@@ -277,10 +279,10 @@ impl DbCtx {
self.conn.lock()
.unwrap()
.query_row(crate::sql::JOB_BY_ID, [id], |row| {
- let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();
+ let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
Ok(Job {
- id, source, created_time, remote_id, commit_id
+ id, source, created_time, remote_id, commit_id, run_preferences
})
})
.optional()
@@ -341,7 +343,7 @@ impl DbCtx {
Ok(conn.last_insert_rowid() as u64)
}
- pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>) -> Result<u64, String> {
+ pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option<String>) -> Result<u64, String> {
// TODO: potential race: if two remotes learn about a commit at the same time and we decide
// to create two jobs at the same time, this might return an incorrect id if the insert
// didn't actually insert a new row.
@@ -355,8 +357,8 @@ impl DbCtx {
let conn = self.conn.lock().unwrap();
let rows_modified = conn.execute(
- "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4);",
- (remote_id, commit_id, created_time, pusher)
+ "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);",
+ (remote_id, commit_id, created_time, pusher, repo_default_run_pref)
).unwrap();
assert_eq!(1, rows_modified);
@@ -366,7 +368,7 @@ impl DbCtx {
Ok(job_id)
}
- pub fn new_run(&self, job_id: u64) -> Result<u64, String> {
+ pub fn new_run(&self, job_id: u64, host_preference: Option<u32>) -> Result<PendingRun, String> {
let created_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("now is before epoch")
@@ -375,15 +377,19 @@ impl DbCtx {
let conn = self.conn.lock().unwrap();
let rows_modified = conn.execute(
- "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);",
- (job_id, crate::sql::RunState::Pending as u64, created_time)
+ "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);",
+ (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference)
).unwrap();
assert_eq!(1, rows_modified);
let run_id = conn.last_insert_rowid() as u64;
- Ok(run_id)
+ Ok(PendingRun {
+ id: run_id,
+ job_id,
+ create_time: created_time,
+ })
}
pub fn reap_task(&self, task_id: u64) -> Result<(), String> {
@@ -430,11 +436,12 @@ impl DbCtx {
pub fn repo_by_id(&self, id: u64) -> Result<Option<Repo>, String> {
self.conn.lock()
.unwrap()
- .query_row("select * from repos where id=?1", [id], |row| {
- let (id, repo_name) = row.try_into().unwrap();
+ .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| {
+ let (id, repo_name, default_run_preference) = row.try_into().unwrap();
Ok(Repo {
id,
name: repo_name,
+ default_run_preference,
})
})
.optional()
@@ -449,10 +456,11 @@ impl DbCtx {
let mut result = Vec::new();
while let Some(row) = repos.next().unwrap() {
- let (id, repo_name) = row.try_into().unwrap();
+ let (id, repo_name, default_run_preference) = row.try_into().unwrap();
result.push(Repo {
id,
name: repo_name,
+ default_run_preference,
});
}
@@ -469,13 +477,14 @@ impl DbCtx {
conn
.query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| {
- let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();
+ let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
Ok(Job {
id,
remote_id,
commit_id,
created_time,
source,
+ run_preferences,
})
})
.optional()
@@ -491,13 +500,14 @@ impl DbCtx {
let mut jobs = Vec::new();
while let Some(row) = result.next().unwrap() {
- let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();
+ let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
jobs.push(Job {
id,
remote_id,
commit_id,
created_time,
source,
+ run_preferences,
});
}
@@ -518,7 +528,7 @@ impl DbCtx {
Ok(started)
}
- pub fn get_pending_runs(&self) -> Result<Vec<PendingRun>, String> {
+ pub fn get_pending_runs(&self, host_id: Option<u32>) -> Result<Vec<PendingRun>, String> {
let conn = self.conn.lock().unwrap();
let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap();
@@ -538,6 +548,25 @@ impl DbCtx {
Ok(pending)
}
+ pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result<Vec<Job>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut jobs_needing_task_runs = conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap();
+ let mut job_rows = jobs_needing_task_runs.query([host_id]).unwrap();
+ let mut jobs = Vec::new();
+
+ while let Some(row) = job_rows.next().unwrap() {
+ let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
+
+ jobs.push(Job {
+ id, source, created_time, remote_id, commit_id, run_preferences,
+ });
+ }
+
+ Ok(jobs)
+ }
+
+
pub fn remotes_by_repo(&self, repo_id: u64) -> Result<Vec<Remote>, String> {
let mut remotes: Vec<Remote> = Vec::new();
diff --git a/src/lua/mod.rs b/src/lua/mod.rs
index 1b86582..89ff4c2 100644
--- a/src/lua/mod.rs
+++ b/src/lua/mod.rs
@@ -152,6 +152,21 @@ mod lua_exports {
Ok(result)
}
+ pub fn check_dependencies(commands: Vec<String>) -> Result<(), rlua::Error> {
+ let mut missing_deps = Vec::new();
+ for command in commands.iter() {
+ if !has_cmd(command)? {
+ missing_deps.push(command.clone());
+ }
+ }
+
+ if missing_deps.len() > 0 {
+ return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", "))));
+ }
+
+ Ok(())
+ }
+
pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -283,6 +298,10 @@ impl BuildEnv {
Ok(())
})?;
+ let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec<String>| {
+ lua_exports::check_dependencies(commands)
+ })?;
+
let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| {
lua_exports::build_command_impl(command, params, job_ref)
})?;
@@ -337,6 +356,7 @@ impl BuildEnv {
vec![
("hello", hello),
("run", build),
+ ("dependencies", check_dependencies),
("metric", metric),
("error", error),
("artifact", artifact),
diff --git a/src/main.rs b/src/main.rs
index 0ca0765..0e94b96 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -256,14 +256,21 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event:
}
};
+ let repo_default_run_pref: Option<String> = ctx.conn.lock().unwrap()
+ .query_row("select default_run_preference from repos where id=?1;", [repo_id], |row| {
+ Ok((row.get(0)).unwrap())
+ })
+ .optional()
+ .expect("can query");
+
let pusher_email = pusher
.get("email")
.expect("has email")
.as_str()
.expect("is str");
- let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email)).unwrap();
- let _ = ctx.new_run(job_id).unwrap();
+ let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap();
+ let _ = ctx.new_run(job_id, None).unwrap();
let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers");
@@ -714,8 +721,8 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv
let mut last_builds = Vec::new();
- let (repo_id, repo_name): (u64, String) = match ctx.dbctx.conn.lock().unwrap()
- .query_row("select id, repo_name from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap())))
+ let (repo_id, repo_name, default_run_preference): (u64, String, Option<String>) = match ctx.dbctx.conn.lock().unwrap()
+ .query_row("select id, repo_name, default_run_preference from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap(), row.get(2).unwrap())))
.optional()
.unwrap() {
Some(elem) => elem,
@@ -725,6 +732,8 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv
}
};
+ // TODO: display default_run_preference somehow on the web summary?
+
for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") {
let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo");
last_builds.extend(last_ten_jobs.drain(..));
diff --git a/src/sql.rs b/src/sql.rs
index 9f0d7aa..137cdb6 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -61,7 +61,8 @@ pub const CREATE_JOBS_TABLE: &'static str = "\
source TEXT,
created_time INTEGER,
remote_id INTEGER,
- commit_id INTEGER);";
+ commit_id INTEGER,
+ run_preferences TEXT);";
pub const CREATE_METRICS_TABLE: &'static str = "\
CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -76,7 +77,8 @@ pub const CREATE_COMMITS_TABLE: &'static str = "\
pub const CREATE_REPOS_TABLE: &'static str = "\
CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT,
- repo_name TEXT);";
+ repo_name TEXT,
+ default_run_preference TEXT);";
// remote_api is `github` or NULL for now. hopefully a future cgit-style notifier one day.
// remote_path is some unique identifier for the relevant remote.
@@ -137,7 +139,10 @@ pub const CREATE_REPO_NAME_INDEX: &'static str = "\
CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);";
pub const PENDING_RUNS: &'static str = "\
- select id, job_id, created_time from runs where state=0;";
+ select id, job_id, created_time, host_preference from runs where state=0 and (host_preference=?1 or host_preference is null) order by created_time desc;";
+
+pub const JOBS_NEEDING_HOST_RUN: &'static str = "\
+ select jobs.id, jobs.source, jobs.created_time, jobs.remote_id, jobs.commit_id, jobs.run_preferences from jobs left join runs on jobs.id=runs.job_id where jobs.run_preferences=\"all\" and (host_id!=?1 or host_id is null);";
pub const ACTIVE_RUNS: &'static str = "\
select id,
@@ -157,13 +162,13 @@ pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\
select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;";
pub const JOB_BY_COMMIT_ID: &'static str = "\
- select id, source, created_time, remote_id, commit_id from jobs where commit_id=?1;";
+ select id, source, created_time, remote_id, commit_id, run_preferences from jobs where commit_id=?1;";
pub const ARTIFACT_BY_ID: &'static str = "\
select * from artifacts where id=?1 and run_id=?2;";
pub const JOB_BY_ID: &'static str = "\
- select id, source, created_time, remote_id, commit_id from jobs where id=?1";
+ select id, source, created_time, remote_id, commit_id, run_preferences from jobs where id=?1";
pub const METRICS_FOR_RUN: &'static str = "\
select * from metrics where run_id=?1 order by id asc;";
@@ -175,10 +180,10 @@ pub const REMOTES_FOR_REPO: &'static str = "\
select * from remotes where repo_id=?1;";
pub const ALL_REPOS: &'static str = "\
- select * from repos;";
+ select id, repo_name, default_run_preference from repos;";
pub const LAST_JOBS_FROM_REMOTE: &'static str = "\
- select id, source, created_time, remote_id, commit_id from jobs where remote_id=?1 order by created_time desc limit ?2;";
+ select id, source, created_time, remote_id, commit_id, run_preferences from jobs where remote_id=?1 order by created_time desc limit ?2;";
pub const LAST_RUN_FOR_JOB: &'static str = "\
select id,
@@ -195,6 +200,6 @@ pub const LAST_RUN_FOR_JOB: &'static str = "\
final_status from runs where job_id=?1 order by started_time desc limit 1;";
pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\
- select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id
+ select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id, jobs.run_preferences
from jobs join runs on jobs.id=runs.job_id
order by runs.created_time asc;";
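
Since run selection now hinges on the host_preference column, the reworked PENDING_RUNS filter is easy to sanity-check against a throwaway in-memory database. A minimal sketch using rusqlite directly, with a stripped-down runs table that carries only the columns the query reads (illustrative only, not the project's real schema):

use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    // simplified runs table: just the columns PENDING_RUNS touches
    conn.execute(
        "CREATE TABLE runs (id INTEGER PRIMARY KEY AUTOINCREMENT,
                            job_id INTEGER,
                            state INTEGER,
                            created_time INTEGER,
                            host_preference INTEGER);",
        (),
    )?;
    // one unpinned pending run (host_preference NULL), one pinned to host 2
    conn.execute(
        "insert into runs (job_id, state, created_time, host_preference) values (1, 0, 100, NULL);",
        (),
    )?;
    conn.execute(
        "insert into runs (job_id, state, created_time, host_preference) values (2, 0, 200, 2);",
        (),
    )?;

    let pending_runs = "select id, job_id, created_time, host_preference from runs \
                        where state=0 and (host_preference=?1 or host_preference is null) \
                        order by created_time desc;";

    // asking on behalf of host 1: only the unpinned run should come back
    let mut stmt = conn.prepare(pending_runs)?;
    let mut rows = stmt.query([1u32])?;
    while let Some(row) = rows.next()? {
        let (id, job_id): (u64, u64) = (row.get(0)?, row.get(1)?);
        println!("host 1 may take run {} (job {})", id, job_id);
    }
    Ok(())
}

With those two rows, querying as host 1 returns only the unpinned run, while querying as host 2 would return both: pinned runs stay invisible to every host except the one they were created for.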