summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoriximeow <me@iximeow.net>2023-07-04 10:14:44 -0700
committeriximeow <me@iximeow.net>2023-07-04 10:14:44 -0700
commit75697526b512e5597d9bbfcda1bb74e3fc8bd1bb (patch)
treef12792c671d31a17c41ae35b327e4383c8f7d40d
parenta87fba7a82185fd3590cbaa211f78f96c6693472 (diff)
let clients keep connections alive a little longer
-rw-r--r--src/ci_driver.rs48
-rw-r--r--src/ci_runner.rs36
-rw-r--r--src/protocol.rs2
3 files changed, 60 insertions, 26 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index c40e6b2..3d6e351 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -245,6 +245,22 @@ impl RunnerClient {
Ok(client)
}
+ async fn test_connection(&mut self) -> Result<(), String> {
+ self.send_typed(&ClientProto::Ping).await?;
+ let resp = self.recv_typed::<ClientProto>().await?;
+ match resp {
+ Some(ClientProto::Pong) => {
+ Ok(())
+ }
+ Some(other) => {
+ Err(format!("unexpected connection test response: {:?}", other))
+ }
+ None => {
+ Err("client hung up".to_string())
+ }
+ }
+ }
+
async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> {
self.send_typed(&msg).await
}
@@ -508,7 +524,7 @@ async fn main() {
spawn(old_task_reaper(Arc::clone(&dbctx)));
- 'accept: loop {
+ loop {
let mut candidate = match channel.recv().await
.ok_or_else(|| "client channel disconnected".to_string()) {
@@ -516,6 +532,15 @@ async fn main() {
Err(e) => { eprintln!("client error: {}", e); continue; }
};
+ let dbctx = Arc::clone(&dbctx);
+ spawn(async move {
+ find_client_task(dbctx, candidate);
+ });
+ }
+}
+
+async fn find_client_task(dbctx: Arc<DbCtx>, mut candidate: RunnerClient) -> Result<(), String> {
+ let (run, job) = 'find_work: loop {
// try to find a job for this candidate:
// * start with pending runs - these need *some* client to run them, but do not care which
// * if no new jobs, maybe an existing job still needs a rerun on this client?
@@ -530,9 +555,7 @@ async fn main() {
let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
if candidate.will_accept(&job) {
- eprintln!("client {:?} would not accept job {:?}", candidate, job);
- activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
- continue 'accept;
+ break 'find_work (run, job);
}
}
@@ -540,17 +563,24 @@ async fn main() {
let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query");
- for job in alt_run_jobs.iter() {
+ for job in alt_run_jobs.into_iter() {
if candidate.will_accept(&job) {
let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap();
- eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id);
- activate_run(Arc::clone(&dbctx), candidate, &job, &run).await;
- continue 'accept;
+ break 'find_work (run, job);
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
- }
+
+ if candidate.test_connection().await.is_err() {
+ return Err("lost client connection".to_string());
+ }
+ };
+
+ eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id);
+ activate_run(Arc::clone(&dbctx), candidate, &job, &run).await?;
+
+ Ok(())
}
async fn old_task_reaper(dbctx: Arc<DbCtx>) {
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index a4d15ef..27da9da 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -386,25 +386,27 @@ impl RunnerClient {
}
async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> {
- match self.rx.chunk().await {
- Ok(Some(chunk)) => {
- eprintln!("got chunk: {:?}", &chunk);
- let proto_message: ClientProto = serde_json::from_slice(&chunk)
- .map_err(|e| {
- WorkAcquireError::Protocol(format!("not json: {:?}", e))
- })?;
- if let ClientProto::NewTask(new_task) = proto_message {
- Ok(Some(new_task))
- } else {
- Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message)))
+ loop {
+ let message = self.recv_typed::<ClientProto>().await;
+ eprintln!("got message: {:?}", &message);
+ match message {
+ Ok(Some(ClientProto::NewTask(new_task))) => {
+ return Ok(Some(new_task));
+ },
+ Ok(Some(ClientProto::Ping)) => {
+ self.send_typed(&ClientProto::Pong).await
+ .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?;
+ },
+ Ok(Some(other)) => {
+ return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other)));
+ },
+ Ok(None) => {
+ return Ok(None);
+ },
+ Err(e) => {
+ return Err(WorkAcquireError::Protocol(e));
}
}
- Ok(None) => {
- Ok(None)
- },
- Err(e) => {
- Err(WorkAcquireError::Reqwest(e))
- }
}
}
diff --git a/src/protocol.rs b/src/protocol.rs
index 9f79a7c..d7193e4 100644
--- a/src/protocol.rs
+++ b/src/protocol.rs
@@ -11,6 +11,8 @@ pub enum ClientProto {
Metric { name: String, value: String },
Command(CommandInfo),
TaskStatus(TaskInfo),
+ Ping,
+ Pong,
}
#[derive(Serialize, Deserialize, Debug)]