summary refs log tree commit diff
path: root/src/ci_driver.rs
diff options
context:
space:
mode:
author iximeow <me@iximeow.net> 2023-07-04 01:52:55 -0700
committer iximeow <me@iximeow.net> 2023-07-04 01:52:55 -0700
commit 3a3a8ba8b773d36a0b3c8208e37928e3c6dbe696 (patch)
tree e840c014db88f75921d739433db3d42cbeb2a67d /src/ci_driver.rs
parent 00b72715aa496363dd3c783bd5b2ff965869c8c3 (diff)
pretty sure this gets host-preferential task assignment in place?
Diffstat (limited to 'src/ci_driver.rs')
-rw-r--r-- src/ci_driver.rs 68
1 files changed, 43 insertions, 25 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index d8e60e5..c40e6b2 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -57,9 +57,8 @@ fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> {
}
}
-async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> {
+async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> {
eprintln!("activating task {:?}", run);
- let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
let connection = dbctx.conn.lock().unwrap();
@@ -84,28 +83,16 @@ async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::R
eprintln!("running {}", repo_name);
- let mut client_job = loop {
- let mut candidate = clients.recv().await
- .ok_or_else(|| "client channel disconnected".to_string())?;
+ let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await;
- if !candidate.will_accept(&job) {
- eprintln!("client {:?} would not accept job {:?}", candidate, job);
- continue;
+ let mut client_job = match res {
+ Ok(Some(mut client_job)) => { client_job }
+ Ok(None) => {
+ return Err("client hung up instead of acking task".to_string());
}
-
- let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await;
-
- match res {
- Ok(Some(mut client_job)) => {
- break client_job;
- }
- Ok(None) => {
- eprintln!("client hung up instead of acking task");
- }
- Err(e) => {
- // failed to submit job, move on for now
- eprintln!("failed to submit task: {:?}", e);
- }
+ Err(e) => {
+ // failed to submit job, move on for now
+ return Err(format!("failed to submit task: {:?}", e));
}
};
@@ -521,16 +508,47 @@ async fn main() {
spawn(old_task_reaper(Arc::clone(&dbctx)));
- loop {
- let runs = dbctx.get_pending_runs().unwrap();
+ 'accept: loop {
+ let mut candidate = match channel.recv().await
+ .ok_or_else(|| "client channel disconnected".to_string()) {
+
+ Ok(candidate) => { candidate },
+ Err(e) => { eprintln!("client error: {}", e); continue; }
+ };
+
+ // try to find a job for this candidate:
+ // * start with pending runs - these need *some* client to run them, but do not care which
+ // * if no new jobs, maybe an existing job still needs a rerun on this client?
+ // * otherwise, um, i dunno. do nothing?
+
+ let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap();
if runs.len() > 0 {
println!("{} new runs", runs.len());
for run in runs.into_iter() {
- activate_run(Arc::clone(&dbctx), &run, &mut channel).await;
+ let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
+
+ if candidate.will_accept(&job) {
+ eprintln!("client {:?} would not accept job {:?}", candidate, job);
+ activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
+ continue 'accept;
+ }
+
}
}
+
+ let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query");
+
+ for job in alt_run_jobs.iter() {
+ if candidate.will_accept(&job) {
+ let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap();
+ eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id);
+ activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
+ continue 'accept;
+ }
+ }
+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}