diff options
author | iximeow <me@iximeow.net> | 2023-07-04 10:14:44 -0700 |
---|---|---|
committer | iximeow <me@iximeow.net> | 2023-07-04 10:14:44 -0700 |
commit | 75697526b512e5597d9bbfcda1bb74e3fc8bd1bb (patch) | |
tree | f12792c671d31a17c41ae35b327e4383c8f7d40d /src/ci_runner.rs | |
parent | a87fba7a82185fd3590cbaa211f78f96c6693472 (diff) |
let clients keep connections alive a little longer
Diffstat (limited to 'src/ci_runner.rs')
-rw-r--r-- | src/ci_runner.rs | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/src/ci_runner.rs b/src/ci_runner.rs index a4d15ef..27da9da 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -386,25 +386,27 @@ impl RunnerClient { } async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { - match self.rx.chunk().await { - Ok(Some(chunk)) => { - eprintln!("got chunk: {:?}", &chunk); - let proto_message: ClientProto = serde_json::from_slice(&chunk) - .map_err(|e| { - WorkAcquireError::Protocol(format!("not json: {:?}", e)) - })?; - if let ClientProto::NewTask(new_task) = proto_message { - Ok(Some(new_task)) - } else { - Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message))) + loop { + let message = self.recv_typed::<ClientProto>().await; + eprintln!("got message: {:?}", &message); + match message { + Ok(Some(ClientProto::NewTask(new_task))) => { + return Ok(Some(new_task)); + }, + Ok(Some(ClientProto::Ping)) => { + self.send_typed(&ClientProto::Pong).await + .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; + }, + Ok(Some(other)) => { + return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); + }, + Ok(None) => { + return Ok(None); + }, + Err(e) => { + return Err(WorkAcquireError::Protocol(e)); } } - Ok(None) => { - Ok(None) - }, - Err(e) => { - Err(WorkAcquireError::Reqwest(e)) - } } } |