summaryrefslogtreecommitdiff
path: root/src/ci_driver.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ci_driver.rs')
-rw-r--r--src/ci_driver.rs48
1 files changed, 39 insertions, 9 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index c40e6b2..3d6e351 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -245,6 +245,22 @@ impl RunnerClient {
Ok(client)
}
+ async fn test_connection(&mut self) -> Result<(), String> {
+ self.send_typed(&ClientProto::Ping).await?;
+ let resp = self.recv_typed::<ClientProto>().await?;
+ match resp {
+ Some(ClientProto::Pong) => {
+ Ok(())
+ }
+ Some(other) => {
+ Err(format!("unexpected connection test response: {:?}", other))
+ }
+ None => {
+ Err("client hung up".to_string())
+ }
+ }
+ }
+
async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> {
self.send_typed(&msg).await
}
@@ -508,7 +524,7 @@ async fn main() {
spawn(old_task_reaper(Arc::clone(&dbctx)));
- 'accept: loop {
+ loop {
let mut candidate = match channel.recv().await
.ok_or_else(|| "client channel disconnected".to_string()) {
@@ -516,6 +532,15 @@ async fn main() {
Err(e) => { eprintln!("client error: {}", e); continue; }
};
+ let dbctx = Arc::clone(&dbctx);
+ spawn(async move {
+ find_client_task(dbctx, candidate);
+ });
+ }
+}
+
+async fn find_client_task(dbctx: Arc<DbCtx>, mut candidate: RunnerClient) -> Result<(), String> {
+ let (run, job) = 'find_work: loop {
// try to find a job for this candidate:
// * start with pending runs - these need *some* client to run them, but do not care which
// * if no new jobs, maybe an existing job still needs a rerun on this client?
@@ -530,9 +555,7 @@ async fn main() {
let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
if candidate.will_accept(&job) {
- eprintln!("client {:?} would not accept job {:?}", candidate, job);
- activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
- continue 'accept;
+ break 'find_work (run, job);
}
}
@@ -540,17 +563,24 @@ async fn main() {
let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query");
- for job in alt_run_jobs.iter() {
+ for job in alt_run_jobs.into_iter() {
if candidate.will_accept(&job) {
let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap();
- eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id);
- activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
- continue 'accept;
+ break 'find_work (run, job);
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
- }
+
+ if candidate.test_connection().await.is_err() {
+ return Err("lost client connection".to_string());
+ }
+ };
+
+ eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id);
+ activate_run(Arc::clone(&dbctx), candidate, &job, &run).await?;
+
+ Ok(())
}
async fn old_task_reaper(dbctx: Arc<DbCtx>) {