From 1dccbc5b319c9675ef69b34576275d8777375cc8 Mon Sep 17 00:00:00 2001 From: iximeow Date: Sun, 2 Jul 2023 13:52:35 -0700 Subject: add a protocol, host info reporting --- src/ci_driver.rs | 86 +++++++++++++++--------------- src/ci_runner.rs | 155 ++++++++++++++++++++++++++++++++++--------------------- src/protocol.rs | 106 +++++++++++++++++++++++++++++++++++++ 3 files changed, 245 insertions(+), 102 deletions(-) create mode 100644 src/protocol.rs (limited to 'src') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index fca1e6e..710272d 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -28,10 +28,12 @@ mod dbctx; mod sql; mod notifier; mod io; +mod protocol; use crate::dbctx::{DbCtx, PendingRun, Job, Run}; use crate::sql::JobResult; use crate::sql::RunState; +use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; lazy_static! { static ref AUTH_SECRET: RwLock> = RwLock::new(None); @@ -151,7 +153,7 @@ impl ClientJob { pub async fn run(&mut self) { loop { eprintln!("waiting on response.."); - let msg = match self.client.recv().await.expect("recv works") { + let msg = match self.client.recv_typed::().await.expect("recv works") { Some(msg) => msg, None => { eprintln!("client hung up. task's done, i hope?"); @@ -159,38 +161,30 @@ impl ClientJob { } }; eprintln!("got {:?}", msg); - let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); - match msg_kind { - "new_task_please" => { + match msg { + ClientProto::NewTaskPlease { allowed_pushers, host_info } => { eprintln!("misdirected task request (after handshake?)"); return; - }, - "task_status" => { - let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); - let (result, state): (Result, RunState) = if state == "finished" { - let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("task update: state is {} and result is {}", state, result); - match result { - "pass" => { - (Ok("success".to_string()), RunState::Finished) - }, - other => { - let desc = msg.as_object().unwrap().get("desc") - .map(|x| x.as_str().unwrap().to_string()) - .unwrap_or_else(|| other.to_string()); - (Err(desc), RunState::Error) + } + ClientProto::TaskStatus(task_info) => { + let (result, state): (Result, RunState) = match task_info { + TaskInfo::Finished { status } => { + eprintln!("task update: state is finished and result is {}", status); + match status.as_str() { + "pass" => { + (Ok("success".to_string()), RunState::Finished) + }, + other => { + eprintln!("unhandled task completion status: {}", other); + (Err(other.to_string()), RunState::Error) + } } + }, + TaskInfo::Interrupted { status, description } => { + eprintln!("task update: state is interrupted and result is {}", status); + let desc = description.unwrap_or_else(|| status.clone()); + (Err(desc), RunState::Error) } - } else if state == "interrupted" { - let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("task update: state is {} and result is {}", state, result); - let desc = msg.as_object().unwrap().get("desc") - .map(|x| x.as_str().unwrap().to_string()) - .unwrap_or_else(|| result.to_string()); - (Err(desc), RunState::Error) - } else { - eprintln!("task update: state is {}", state); - (Err(format!("atypical completion status: {}", state)), RunState::Invalid) }; let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); @@ -223,26 +217,23 @@ impl ClientJob { ) .expect("can update"); } - "artifact_create" => { - eprintln!("creating artifact {:?}", msg); + ClientProto::ArtifactCreate => { + eprintln!("creating artifact"); self.client.send(serde_json::json!({ "status": "ok", "object_id": "10", })).await.unwrap(); }, - "metric" => { - let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap(); - let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap(); - - self.dbctx.insert_metric(self.task.id, name, value) + ClientProto::Metric { name, value } => { + self.dbctx.insert_metric(self.task.id, &name, &value) .expect("TODO handle metric insert error?"); } - "command" => { + ClientProto::Command(_command_info) => { // record information about commands, start/stop, etc. probably also allow // artifacts to be attached to commands and default to attaching stdout/stderr? } other => { - eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg); + eprintln!("unhandled message {:?}", other); } } } @@ -264,7 +255,11 @@ impl RunnerClient { } async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { - self.tx.send(Ok(serde_json::to_string(&msg).unwrap())) + self.send_typed(&msg).await + } + + async fn send_typed(&mut self, msg: &T) -> Result<(), String> { + self.tx.send(Ok(serde_json::to_string(msg).unwrap())) .await .map_err(|e| e.to_string()) } @@ -286,6 +281,11 @@ impl RunnerClient { } } + async fn recv_typed(&mut self) -> Result, String> { + let json = self.recv().await?; + Ok(json.map(|v| serde_json::from_value(v).unwrap())) + } + // is this client willing to run the job based on what it has told us so far? fn will_accept(&self, job: &Job) -> bool { match (job.source.as_ref(), self.accepted_sources.as_ref()) { @@ -298,10 +298,10 @@ impl RunnerClient { } async fn submit(mut self, dbctx: &Arc, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result, String> { - self.send(serde_json::json!({ - "commit": sha, - "remote_url": remote_git_url, - "build_token": &self.build_token, + self.send_typed(&ClientProto::new_task(RequestedJob { + commit: sha.to_string(), + remote_url: remote_git_url.to_string(), + build_token: self.build_token.to_string(), })).await?; match self.recv().await { Ok(Some(resp)) => { diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 3ecacdb..11d8566 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -13,10 +13,12 @@ use std::task::{Context, Poll}; use std::pin::Pin; use std::marker::Unpin; +mod protocol; mod lua; mod io; use crate::io::ArtifactStream; +use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; #[derive(Debug)] enum WorkAcquireError { @@ -33,13 +35,6 @@ struct RunnerClient { current_job: Option, } -#[derive(Debug, Serialize, Deserialize)] -struct RequestedJob { - commit: String, - remote_url: String, - build_token: String, -} - impl RequestedJob { pub fn into_running(self, client: RunnerClient) -> RunningJob { RunningJob { @@ -116,11 +111,8 @@ impl StepTracker { impl RunningJob { async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { - self.client.send(serde_json::json!({ - "kind": "metric", - "name": name, - "value": value, - })).await + self.client.send_typed(&ClientProto::metric(name, value)) + .await .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) } @@ -224,9 +216,7 @@ impl RunningJob { } async fn run(mut self) { - self.client.send(serde_json::json!({ - "status": "started" - })).await.unwrap(); + self.client.send_typed(&ClientProto::Started).await.unwrap(); std::fs::remove_dir_all("tmpdir").unwrap(); std::fs::create_dir("tmpdir").unwrap(); @@ -237,14 +227,10 @@ impl RunningJob { if let Err(e) = checkout_res { let status = "bad_ref"; - let status = serde_json::json!({ - "kind": "task_status", - "state": "finished", - "result": status, - }); - eprintln!("checkout failed, reporting status: {}", status); - - let res = ctx.lock().unwrap().client.send(status).await; + let status = ClientProto::task_status(TaskInfo::finished(status)); + eprintln!("checkout failed, reporting status: {:?}", status); + + let res = ctx.lock().unwrap().client.send_typed(&status).await; if let Err(e) = res { eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); } @@ -285,14 +271,10 @@ impl RunningJob { match res { Ok(status) => { eprintln!("[+] job success!"); - let status = serde_json::json!({ - "kind": "task_status", - "state": "finished", - "result": status - }); - eprintln!("reporting status: {}", status); - - let res = ctx.lock().unwrap().client.send(status).await; + let status = ClientProto::task_status(TaskInfo::finished(status)); + eprintln!("reporting status: {:?}", status); + + let res = ctx.lock().unwrap().client.send_typed(&status).await; if let Err(e) = res { eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); } @@ -300,27 +282,18 @@ impl RunningJob { Err((status, lua_err)) => { eprintln!("[-] job error: {}", status); - let res = ctx.lock().unwrap().client.send(serde_json::json!({ - "kind": "task_status", - "state": "interrupted", - "result": status, - "desc": lua_err.to_string(), - })).await; + let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string())); + let res = ctx.lock().unwrap().client.send_typed(&status).await; if let Err(e) = res { - eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", status, e); + eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e); } } } } async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { - self.client.send(serde_json::json!({ - "kind": "command", - "state": "started", - "command": command, - "cwd": working_dir, - "id": 1, - })).await.unwrap(); + self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) + .await.unwrap(); let mut cmd = Command::new(&command[0]); let cwd = match working_dir { @@ -339,12 +312,8 @@ impl RunningJob { let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?; - self.client.send(serde_json::json!({ - "kind": "command", - "state": "finished", - "exit_code": cmd_res.code(), - "id": 1, - })).await.unwrap(); + self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) + .await.unwrap(); if !cmd_res.success() { @@ -383,11 +352,15 @@ impl RunnerClient { match self.rx.chunk().await { Ok(Some(chunk)) => { eprintln!("got chunk: {:?}", &chunk); - serde_json::from_slice(&chunk) - .map(Option::Some) + let proto_message: ClientProto = serde_json::from_slice(&chunk) .map_err(|e| { WorkAcquireError::Protocol(format!("not json: {:?}", e)) - }) + })?; + if let ClientProto::NewTask(new_task) = proto_message { + Ok(Some(new_task)) + } else { + Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message))) + } } Ok(None) => { Ok(None) @@ -419,8 +392,12 @@ impl RunnerClient { } async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { + self.send_typed(&value).await + } + + async fn send_typed(&mut self, t: &T) -> Result<(), String> { self.tx.send_data( - serde_json::to_vec(&value) + serde_json::to_vec(t) .map_err(|e| format!("json error: {:?}", e))? .into() ).await @@ -448,13 +425,16 @@ async fn main() { .build() .expect("can build client"); + let host_info = host_info::collect_host_info(); + eprintln!("host info: {:?}", host_info); + loop { let (mut sender, body) = hyper::Body::channel(); - sender.send_data(serde_json::to_string(&json!({ - "kind": "new_job_please", - "accepted_pushers": &runner_config.allowed_pushers, - })).unwrap().into()).await.expect("req"); + sender.send_data(serde_json::to_string(&ClientProto::new_task_please( + runner_config.allowed_pushers.clone(), + host_info.clone(), + )).unwrap().into()).await.expect("req"); let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") .header("user-agent", "ci-butactuallyin-space-runner") @@ -510,3 +490,60 @@ async fn main() { } } } + +mod host_info { + use crate::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; + + // get host model name, microcode, and how many cores + fn collect_cpu_info() -> CpuInfo { + let cpu_lines: Vec = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); + let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect(); + let model = model_names.first().expect("can get model name").to_string().split(":").last().unwrap().trim().to_string(); + let cores = model_names.len() as u32; + let microcode = cpu_lines.iter().find(|line| line.starts_with("microcode")).expect("microcode line is present").split(":").last().unwrap().trim().to_string(); + + CpuInfo { model, microcode, cores } + } + + fn collect_mem_info() -> MemoryInfo { + let mem_lines: Vec = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect(); + let total = mem_lines[0].split(":").last().unwrap().trim().to_string(); + let available = mem_lines[2].split(":").last().unwrap().trim().to_string(); + + MemoryInfo { total, available } + } + + fn hostname() -> String { + let mut bytes = [0u8; 4096]; + let res = unsafe { + libc::gethostname(bytes.as_mut_ptr() as *mut i8, bytes.len()) + }; + if res != 0 { + panic!("gethostname failed {:?}", res); + } + let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated"); + std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string() + } + + pub fn collect_env_info() -> EnvInfo { + EnvInfo { + arch: std::env::consts::ARCH.to_string(), + family: std::env::consts::FAMILY.to_string(), + os: std::env::consts::OS.to_string(), + } + } + + pub fn collect_host_info() -> HostInfo { + let cpu_info = collect_cpu_info(); + let memory_info = collect_mem_info(); +// let hostname = hostname(); + let env_info = collect_env_info(); + + HostInfo { + cpu_info, + memory_info, + env_info, + } + } +} + diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..829dde1 --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,106 @@ +use serde::{Serialize, Deserialize}; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "kind")] +#[serde(rename_all = "snake_case")] +pub enum ClientProto { + Started, + ArtifactCreate, + NewTask(RequestedJob), + NewTaskPlease { allowed_pushers: Option>, host_info: HostInfo }, + Metric { name: String, value: String }, + Command(CommandInfo), + TaskStatus(TaskInfo), +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "kind")] +#[serde(rename_all = "snake_case")] +pub enum CommandInfo { + Started { command: Vec, cwd: Option, id: u32 }, + Finished { exit_code: Option, id: u32 }, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "kind")] +#[serde(rename_all = "snake_case")] +pub enum TaskInfo { + Finished { status: String }, + Interrupted { status: String, description: Option }, +} + +impl ClientProto { + pub fn metric(name: impl Into, value: impl Into) -> Self { + ClientProto::Metric { name: name.into(), value: value.into() } + } + + pub fn command(state: CommandInfo) -> Self { + ClientProto::Command(state) + } + + pub fn new_task_please(allowed_pushers: Option>, host_info: HostInfo) -> Self { + ClientProto::NewTaskPlease { allowed_pushers, host_info } + } + + pub fn task_status(state: TaskInfo) -> Self { + ClientProto::TaskStatus(state) + } + + pub fn new_task(task: RequestedJob) -> Self { + ClientProto::NewTask(task) + } +} + +impl CommandInfo { + pub fn started(command: impl Into>, cwd: Option<&str>, id: u32) -> Self { + CommandInfo::Started { command: command.into(), cwd: cwd.map(ToOwned::to_owned), id } + } + + pub fn finished(exit_code: Option, id: u32) -> Self { + CommandInfo::Finished { exit_code, id } + } +} + +impl TaskInfo { + pub fn finished(status: impl Into) -> Self { + TaskInfo::Finished { status: status.into() } + } + + pub fn interrupted(status: impl Into, description: impl Into>) -> Self { + TaskInfo::Interrupted { status: status.into(), description: description.into() } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct HostInfo { + pub cpu_info: CpuInfo, + pub memory_info: MemoryInfo, + pub env_info: EnvInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CpuInfo { + pub model: String, + pub microcode: String, + pub cores: u32, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MemoryInfo { + pub total: String, + pub available: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EnvInfo { + pub arch: String, + pub family: String, + pub os: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestedJob { + pub commit: String, + pub remote_url: String, + pub build_token: String, +} -- cgit v1.1