From 75697526b512e5597d9bbfcda1bb74e3fc8bd1bb Mon Sep 17 00:00:00 2001 From: iximeow Date: Tue, 4 Jul 2023 10:14:44 -0700 Subject: let clients keep connections alive a little longer --- src/ci_runner.rs | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) (limited to 'src/ci_runner.rs') diff --git a/src/ci_runner.rs b/src/ci_runner.rs index a4d15ef..27da9da 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -386,25 +386,27 @@ impl RunnerClient { } async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result, WorkAcquireError> { - match self.rx.chunk().await { - Ok(Some(chunk)) => { - eprintln!("got chunk: {:?}", &chunk); - let proto_message: ClientProto = serde_json::from_slice(&chunk) - .map_err(|e| { - WorkAcquireError::Protocol(format!("not json: {:?}", e)) - })?; - if let ClientProto::NewTask(new_task) = proto_message { - Ok(Some(new_task)) - } else { - Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message))) + loop { + let message = self.recv_typed::().await; + eprintln!("got message: {:?}", &message); + match message { + Ok(Some(ClientProto::NewTask(new_task))) => { + return Ok(Some(new_task)); + }, + Ok(Some(ClientProto::Ping)) => { + self.send_typed(&ClientProto::Pong).await + .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; + }, + Ok(Some(other)) => { + return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); + }, + Ok(None) => { + return Ok(None); + }, + Err(e) => { + return Err(WorkAcquireError::Protocol(e)); } } - Ok(None) => { - Ok(None) - }, - Err(e) => { - Err(WorkAcquireError::Reqwest(e)) - } } } -- cgit v1.1