summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ci_driver.rs50
1 files changed, 28 insertions, 22 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index e3cc4c9..c6bef14 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -93,10 +93,19 @@ async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::R
continue;
}
- if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await {
- break client_job;
- } else {
- // failed to submit job, move on for now
+ let res = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await;
+
+ match res {
+ Ok(Some(mut client_job)) => {
+ break client_job;
+ }
+ Ok(None) => {
+ eprintln!("client hung up instead of acking task");
+ }
+ Err(e) => {
+ // failed to submit job, move on for now
+ eprintln!("failed to submit task: {:?}", e);
+ }
}
};
@@ -303,25 +312,22 @@ impl RunnerClient {
remote_url: remote_git_url.to_string(),
build_token: self.build_token.to_string(),
})).await?;
- match self.recv().await {
+ match self.recv_typed::<ClientProto>().await {
+ Ok(Some(ClientProto::Started)) => {
+ let task_witness = Arc::new(());
+ ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness));
+ Ok(Some(ClientJob {
+ task: job.clone(),
+ dbctx: Arc::clone(dbctx),
+ sha: sha.to_string(),
+ remote_git_url: remote_git_url.to_string(),
+ client: self,
+ task_witness,
+ }))
+ }
Ok(Some(resp)) => {
- if resp == serde_json::json!({
- "status": "started"
- }) {
- let task_witness = Arc::new(());
- ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness));
- eprintln!("resp: {:?}", resp);
- Ok(Some(ClientJob {
- task: job.clone(),
- dbctx: Arc::clone(dbctx),
- sha: sha.to_string(),
- remote_git_url: remote_git_url.to_string(),
- client: self,
- task_witness,
- }))
- } else {
- Err("client rejected job".to_string())
- }
+ eprintln!("invalid response: {:?}", resp);
+ Err("client rejected job".to_string())
}
Ok(None) => {
Ok(None)