diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ci_driver.rs | 48 | ||||
| -rw-r--r-- | src/ci_runner.rs | 36 | ||||
| -rw-r--r-- | src/protocol.rs | 2 | 
3 files changed, 60 insertions, 26 deletions
| diff --git a/src/ci_driver.rs b/src/ci_driver.rs index c40e6b2..3d6e351 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -245,6 +245,22 @@ impl RunnerClient {          Ok(client)      } +    async fn test_connection(&mut self) -> Result<(), String> { +        self.send_typed(&ClientProto::Ping).await?; +        let resp = self.recv_typed::<ClientProto>().await?; +        match resp { +            Some(ClientProto::Pong) => { +                Ok(()) +            } +            Some(other) => { +                Err(format!("unexpected connection test response: {:?}", other)) +            } +            None => { +                Err("client hung up".to_string()) +            } +        } +    } +      async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> {          self.send_typed(&msg).await      } @@ -508,7 +524,7 @@ async fn main() {      spawn(old_task_reaper(Arc::clone(&dbctx))); -    'accept: loop { +    loop {          let mut candidate = match channel.recv().await              .ok_or_else(|| "client channel disconnected".to_string()) { @@ -516,6 +532,15 @@ async fn main() {              Err(e) => { eprintln!("client error: {}", e); continue; }          }; +        let dbctx = Arc::clone(&dbctx); +        spawn(async move { +            find_client_task(dbctx, candidate); +        }); +    } +} + +async fn find_client_task(dbctx: Arc<DbCtx>, mut candidate: RunnerClient) -> Result<(), String> { +    let (run, job) = 'find_work: loop {          // try to find a job for this candidate:          // * start with pending runs - these need *some* client to run them, but do not care which          // * if no new jobs, maybe an existing job still needs a rerun on this client? @@ -530,9 +555,7 @@ async fn main() {                  let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");                  if candidate.will_accept(&job) { -                    eprintln!("client {:?} would not accept job {:?}", candidate, job); -                    activate_run(Arc::clone(&dbctx), candidate, &job, &run).await; -                    continue 'accept; +                    break 'find_work (run, job);                  }              } @@ -540,17 +563,24 @@ async fn main() {          let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query"); -        for job in alt_run_jobs.iter() { +        for job in alt_run_jobs.into_iter() {              if candidate.will_accept(&job) {                  let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap(); -                eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); -                activate_run(Arc::clone(&dbctx), candidate, &job, &run).await; -                continue 'accept; +                break 'find_work (run, job);              }          }          tokio::time::sleep(std::time::Duration::from_millis(100)).await; -    } + +        if candidate.test_connection().await.is_err() { +            return Err("lost client connection".to_string()); +        } +    }; + +    eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); +    activate_run(Arc::clone(&dbctx), candidate, &job, &run).await?; + +    Ok(())  }  async fn old_task_reaper(dbctx: Arc<DbCtx>) { diff --git a/src/ci_runner.rs b/src/ci_runner.rs index a4d15ef..27da9da 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -386,25 +386,27 @@ impl RunnerClient {      }      async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { -        match self.rx.chunk().await { -            Ok(Some(chunk)) => { -                eprintln!("got chunk: {:?}", &chunk); -                let proto_message: ClientProto = serde_json::from_slice(&chunk) -                    .map_err(|e| { -                        WorkAcquireError::Protocol(format!("not json: {:?}", e)) -                    })?; -                if let ClientProto::NewTask(new_task) = proto_message { -                    Ok(Some(new_task)) -                } else { -                    Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message))) +        loop { +            let message = self.recv_typed::<ClientProto>().await; +            eprintln!("got message: {:?}", &message); +            match message { +                Ok(Some(ClientProto::NewTask(new_task))) => { +                    return Ok(Some(new_task)); +                }, +                Ok(Some(ClientProto::Ping)) => { +                    self.send_typed(&ClientProto::Pong).await +                        .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; +                }, +                Ok(Some(other)) => { +                    return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); +                }, +                Ok(None) => { +                    return Ok(None); +                }, +                Err(e) => { +                    return Err(WorkAcquireError::Protocol(e));                  }              } -            Ok(None) => { -                Ok(None) -            }, -            Err(e) => { -                Err(WorkAcquireError::Reqwest(e)) -            }          }      } diff --git a/src/protocol.rs b/src/protocol.rs index 9f79a7c..d7193e4 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -11,6 +11,8 @@ pub enum ClientProto {      Metric { name: String, value: String },      Command(CommandInfo),      TaskStatus(TaskInfo), +    Ping, +    Pong,  }  #[derive(Serialize, Deserialize, Debug)] | 
