diff options
Diffstat (limited to 'src/ci_runner.rs')
-rw-r--r-- | src/ci_runner.rs | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/src/ci_runner.rs b/src/ci_runner.rs index a4d15ef..27da9da 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -386,25 +386,27 @@ impl RunnerClient { } async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { - match self.rx.chunk().await { - Ok(Some(chunk)) => { - eprintln!("got chunk: {:?}", &chunk); - let proto_message: ClientProto = serde_json::from_slice(&chunk) - .map_err(|e| { - WorkAcquireError::Protocol(format!("not json: {:?}", e)) - })?; - if let ClientProto::NewTask(new_task) = proto_message { - Ok(Some(new_task)) - } else { - Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message))) + loop { + let message = self.recv_typed::<ClientProto>().await; + eprintln!("got message: {:?}", &message); + match message { + Ok(Some(ClientProto::NewTask(new_task))) => { + return Ok(Some(new_task)); + }, + Ok(Some(ClientProto::Ping)) => { + self.send_typed(&ClientProto::Pong).await + .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; + }, + Ok(Some(other)) => { + return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); + }, + Ok(None) => { + return Ok(None); + }, + Err(e) => { + return Err(WorkAcquireError::Protocol(e)); } } - Ok(None) => { - Ok(None) - }, - Err(e) => { - Err(WorkAcquireError::Reqwest(e)) - } } } |