From 75697526b512e5597d9bbfcda1bb74e3fc8bd1bb Mon Sep 17 00:00:00 2001 From: iximeow Date: Tue, 4 Jul 2023 10:14:44 -0700 Subject: let clients keep connections alive a little longer --- src/ci_driver.rs | 48 +++++++++++++++++++++++++++++++++++++++--------- src/ci_runner.rs | 36 +++++++++++++++++++----------------- src/protocol.rs | 2 ++ 3 files changed, 60 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index c40e6b2..3d6e351 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -245,6 +245,22 @@ impl RunnerClient { Ok(client) } + async fn test_connection(&mut self) -> Result<(), String> { + self.send_typed(&ClientProto::Ping).await?; + let resp = self.recv_typed::().await?; + match resp { + Some(ClientProto::Pong) => { + Ok(()) + } + Some(other) => { + Err(format!("unexpected connection test response: {:?}", other)) + } + None => { + Err("client hung up".to_string()) + } + } + } + async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { self.send_typed(&msg).await } @@ -508,7 +524,7 @@ async fn main() { spawn(old_task_reaper(Arc::clone(&dbctx))); - 'accept: loop { + loop { let mut candidate = match channel.recv().await .ok_or_else(|| "client channel disconnected".to_string()) { @@ -516,6 +532,15 @@ async fn main() { Err(e) => { eprintln!("client error: {}", e); continue; } }; + let dbctx = Arc::clone(&dbctx); + spawn(async move { + find_client_task(dbctx, candidate); + }); + } +} + +async fn find_client_task(dbctx: Arc, mut candidate: RunnerClient) -> Result<(), String> { + let (run, job) = 'find_work: loop { // try to find a job for this candidate: // * start with pending runs - these need *some* client to run them, but do not care which // * if no new jobs, maybe an existing job still needs a rerun on this client? @@ -530,9 +555,7 @@ async fn main() { let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); if candidate.will_accept(&job) { - eprintln!("client {:?} would not accept job {:?}", candidate, job); - activate_run(Arc::clone(&dbctx), candidate, &job, &run).await; - continue 'accept; + break 'find_work (run, job); } } @@ -540,17 +563,24 @@ async fn main() { let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query"); - for job in alt_run_jobs.iter() { + for job in alt_run_jobs.into_iter() { if candidate.will_accept(&job) { let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap(); - eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); - activate_run(Arc::clone(&dbctx), candidate, &job, &run).await; - continue 'accept; + break 'find_work (run, job); } } tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } + + if candidate.test_connection().await.is_err() { + return Err("lost client connection".to_string()); + } + }; + + eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); + activate_run(Arc::clone(&dbctx), candidate, &job, &run).await?; + + Ok(()) } async fn old_task_reaper(dbctx: Arc) { diff --git a/src/ci_runner.rs b/src/ci_runner.rs index a4d15ef..27da9da 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -386,25 +386,27 @@ impl RunnerClient { } async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result, WorkAcquireError> { - match self.rx.chunk().await { - Ok(Some(chunk)) => { - eprintln!("got chunk: {:?}", &chunk); - let proto_message: ClientProto = serde_json::from_slice(&chunk) - .map_err(|e| { - WorkAcquireError::Protocol(format!("not json: {:?}", e)) - })?; - if let ClientProto::NewTask(new_task) = proto_message { - Ok(Some(new_task)) - } else { - Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message))) + loop { + let message = self.recv_typed::().await; + eprintln!("got message: {:?}", &message); + match message { + Ok(Some(ClientProto::NewTask(new_task))) => { + return Ok(Some(new_task)); + }, + Ok(Some(ClientProto::Ping)) => { + self.send_typed(&ClientProto::Pong).await + .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; + }, + Ok(Some(other)) => { + return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); + }, + Ok(None) => { + return Ok(None); + }, + Err(e) => { + return Err(WorkAcquireError::Protocol(e)); } } - Ok(None) => { - Ok(None) - }, - Err(e) => { - Err(WorkAcquireError::Reqwest(e)) - } } } diff --git a/src/protocol.rs b/src/protocol.rs index 9f79a7c..d7193e4 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -11,6 +11,8 @@ pub enum ClientProto { Metric { name: String, value: String }, Command(CommandInfo), TaskStatus(TaskInfo), + Ping, + Pong, } #[derive(Serialize, Deserialize, Debug)] -- cgit v1.1