From 1dccbc5b319c9675ef69b34576275d8777375cc8 Mon Sep 17 00:00:00 2001 From: iximeow Date: Sun, 2 Jul 2023 13:52:35 -0700 Subject: add a protocol, host info reporting --- src/ci_driver.rs | 86 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 43 insertions(+), 43 deletions(-) (limited to 'src/ci_driver.rs') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index fca1e6e..710272d 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -28,10 +28,12 @@ mod dbctx; mod sql; mod notifier; mod io; +mod protocol; use crate::dbctx::{DbCtx, PendingRun, Job, Run}; use crate::sql::JobResult; use crate::sql::RunState; +use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; lazy_static! { static ref AUTH_SECRET: RwLock> = RwLock::new(None); @@ -151,7 +153,7 @@ impl ClientJob { pub async fn run(&mut self) { loop { eprintln!("waiting on response.."); - let msg = match self.client.recv().await.expect("recv works") { + let msg = match self.client.recv_typed::().await.expect("recv works") { Some(msg) => msg, None => { eprintln!("client hung up. task's done, i hope?"); @@ -159,38 +161,30 @@ impl ClientJob { } }; eprintln!("got {:?}", msg); - let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); - match msg_kind { - "new_task_please" => { + match msg { + ClientProto::NewTaskPlease { allowed_pushers, host_info } => { eprintln!("misdirected task request (after handshake?)"); return; - }, - "task_status" => { - let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); - let (result, state): (Result, RunState) = if state == "finished" { - let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("task update: state is {} and result is {}", state, result); - match result { - "pass" => { - (Ok("success".to_string()), RunState::Finished) - }, - other => { - let desc = msg.as_object().unwrap().get("desc") - .map(|x| x.as_str().unwrap().to_string()) - .unwrap_or_else(|| other.to_string()); - (Err(desc), RunState::Error) + } + ClientProto::TaskStatus(task_info) => { + let (result, state): (Result, RunState) = match task_info { + TaskInfo::Finished { status } => { + eprintln!("task update: state is finished and result is {}", status); + match status.as_str() { + "pass" => { + (Ok("success".to_string()), RunState::Finished) + }, + other => { + eprintln!("unhandled task completion status: {}", other); + (Err(other.to_string()), RunState::Error) + } } + }, + TaskInfo::Interrupted { status, description } => { + eprintln!("task update: state is interrupted and result is {}", status); + let desc = description.unwrap_or_else(|| status.clone()); + (Err(desc), RunState::Error) } - } else if state == "interrupted" { - let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("task update: state is {} and result is {}", state, result); - let desc = msg.as_object().unwrap().get("desc") - .map(|x| x.as_str().unwrap().to_string()) - .unwrap_or_else(|| result.to_string()); - (Err(desc), RunState::Error) - } else { - eprintln!("task update: state is {}", state); - (Err(format!("atypical completion status: {}", state)), RunState::Invalid) }; let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); @@ -223,26 +217,23 @@ impl ClientJob { ) .expect("can update"); } - "artifact_create" => { - eprintln!("creating artifact {:?}", msg); + ClientProto::ArtifactCreate => { + eprintln!("creating artifact"); self.client.send(serde_json::json!({ "status": "ok", "object_id": "10", })).await.unwrap(); }, - "metric" => { - let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap(); - let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap(); - - self.dbctx.insert_metric(self.task.id, name, value) + ClientProto::Metric { name, value } => { + self.dbctx.insert_metric(self.task.id, &name, &value) .expect("TODO handle metric insert error?"); } - "command" => { + ClientProto::Command(_command_info) => { // record information about commands, start/stop, etc. probably also allow // artifacts to be attached to commands and default to attaching stdout/stderr? } other => { - eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg); + eprintln!("unhandled message {:?}", other); } } } @@ -264,7 +255,11 @@ impl RunnerClient { } async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { - self.tx.send(Ok(serde_json::to_string(&msg).unwrap())) + self.send_typed(&msg).await + } + + async fn send_typed(&mut self, msg: &T) -> Result<(), String> { + self.tx.send(Ok(serde_json::to_string(msg).unwrap())) .await .map_err(|e| e.to_string()) } @@ -286,6 +281,11 @@ impl RunnerClient { } } + async fn recv_typed(&mut self) -> Result, String> { + let json = self.recv().await?; + Ok(json.map(|v| serde_json::from_value(v).unwrap())) + } + // is this client willing to run the job based on what it has told us so far? fn will_accept(&self, job: &Job) -> bool { match (job.source.as_ref(), self.accepted_sources.as_ref()) { @@ -298,10 +298,10 @@ impl RunnerClient { } async fn submit(mut self, dbctx: &Arc, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result, String> { - self.send(serde_json::json!({ - "commit": sha, - "remote_url": remote_git_url, - "build_token": &self.build_token, + self.send_typed(&ClientProto::new_task(RequestedJob { + commit: sha.to_string(), + remote_url: remote_git_url.to_string(), + build_token: self.build_token.to_string(), })).await?; match self.recv().await { Ok(Some(resp)) => { -- cgit v1.1