summaryrefslogtreecommitdiff
path: root/src/ci_runner.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ci_runner.rs')
-rw-r--r--src/ci_runner.rs36
1 files changed, 19 insertions, 17 deletions
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index a4d15ef..27da9da 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -386,25 +386,27 @@ impl RunnerClient {
}
async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> {
- match self.rx.chunk().await {
- Ok(Some(chunk)) => {
- eprintln!("got chunk: {:?}", &chunk);
- let proto_message: ClientProto = serde_json::from_slice(&chunk)
- .map_err(|e| {
- WorkAcquireError::Protocol(format!("not json: {:?}", e))
- })?;
- if let ClientProto::NewTask(new_task) = proto_message {
- Ok(Some(new_task))
- } else {
- Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message)))
+ loop {
+ let message = self.recv_typed::<ClientProto>().await;
+ eprintln!("got message: {:?}", &message);
+ match message {
+ Ok(Some(ClientProto::NewTask(new_task))) => {
+ return Ok(Some(new_task));
+ },
+ Ok(Some(ClientProto::Ping)) => {
+ self.send_typed(&ClientProto::Pong).await
+ .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?;
+ },
+ Ok(Some(other)) => {
+ return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other)));
+ },
+ Ok(None) => {
+ return Ok(None);
+ },
+ Err(e) => {
+ return Err(WorkAcquireError::Protocol(e));
}
}
- Ok(None) => {
- Ok(None)
- },
- Err(e) => {
- Err(WorkAcquireError::Reqwest(e))
- }
}
}