From 3a3a8ba8b773d36a0b3c8208e37928e3c6dbe696 Mon Sep 17 00:00:00 2001
From: iximeow
Date: Tue, 4 Jul 2023 01:52:55 -0700
Subject: pretty sure this gets host-preferential task assignment in place?

---
 src/ci_driver.rs | 68 +++++++++++++++++++++++++++++++++++---------------------
 1 file changed, 43 insertions(+), 25 deletions(-)

(limited to 'src/ci_driver.rs')

diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index d8e60e5..c40e6b2 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -57,9 +57,8 @@ fn reserve_artifacts_dir(run: u64) -> std::io::Result {
     }
 }
 
-async fn activate_run(dbctx: Arc, run: &PendingRun, clients: &mut mpsc::Receiver) -> Result<(), String> {
+async fn activate_run(dbctx: Arc, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> {
     eprintln!("activating task {:?}", run);
-    let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
 
     let connection = dbctx.conn.lock().unwrap();
 
@@ -84,28 +83,16 @@ async fn activate_run(dbctx: Arc, run: &PendingRun, clients: &mut mpsc::R
 
     eprintln!("running {}", repo_name);
 
-    let mut client_job = loop {
-        let mut candidate = clients.recv().await
-            .ok_or_else(|| "client channel disconnected".to_string())?;
+    let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await;
 
-        if !candidate.will_accept(&job) {
-            eprintln!("client {:?} would not accept job {:?}", candidate, job);
-            continue;
+    let mut client_job = match res {
+        Ok(Some(mut client_job)) => { client_job }
+        Ok(None) => {
+            return Err("client hung up instead of acking task".to_string());
         }
-
-        let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await;
-
-        match res {
-            Ok(Some(mut client_job)) => {
-                break client_job;
-            }
-            Ok(None) => {
-                eprintln!("client hung up instead of acking task");
-            }
-            Err(e) => {
-                // failed to submit job, move on for now
-                eprintln!("failed to submit task: {:?}", e);
-            }
+        Err(e) => {
+            // failed to submit job, move on for now
+            return Err(format!("failed to submit task: {:?}", e));
         }
     };
 
@@ -521,16 +508,47 @@ async fn main() {
 
     spawn(old_task_reaper(Arc::clone(&dbctx)));
 
-    loop {
-        let runs = dbctx.get_pending_runs().unwrap();
+    'accept: loop {
+        let mut candidate = match channel.recv().await
+            .ok_or_else(|| "client channel disconnected".to_string()) {
+
+            Ok(candidate) => { candidate },
+            Err(e) => { eprintln!("client error: {}", e); continue; }
+        };
+
+        // try to find a job for this candidate:
+        // * start with pending runs - these need *some* client to run them, but do not care which
+        // * if no new jobs, maybe an existing job still needs a rerun on this client?
+        // * otherwise, um, i dunno. do nothing?
+
+        let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap();
 
         if runs.len() > 0 {
             println!("{} new runs", runs.len());
 
             for run in runs.into_iter() {
-                activate_run(Arc::clone(&dbctx), &run, &mut channel).await;
+                let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
+
+                if candidate.will_accept(&job) {
+                    eprintln!("client {:?} would not accept job {:?}", candidate, job);
+                    activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
+                    continue 'accept;
+                }
+
             }
         }
+
+        let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query");
+
+        for job in alt_run_jobs.iter() {
+            if candidate.will_accept(&job) {
+                let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap();
+                eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id);
+                activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
+                continue 'accept;
+            }
+        }
+
         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
     }
 }
-- 
cgit v1.1