diff options
| author | iximeow <me@iximeow.net> | 2023-07-02 13:52:35 -0700 | 
|---|---|---|
| committer | iximeow <me@iximeow.net> | 2023-07-02 13:52:35 -0700 | 
| commit | 1dccbc5b319c9675ef69b34576275d8777375cc8 (patch) | |
| tree | 7db7ef102199933417a457a82843834a7b259924 | |
| parent | 69169e6324c82a2a2ce2b0aa86a5a226763054f3 (diff) | |
add a protocol, host info reporting
| -rw-r--r-- | src/ci_driver.rs | 86 | ||||
| -rw-r--r-- | src/ci_runner.rs | 155 | ||||
| -rw-r--r-- | src/protocol.rs | 106 | 
3 files changed, 245 insertions, 102 deletions
| diff --git a/src/ci_driver.rs b/src/ci_driver.rs index fca1e6e..710272d 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -28,10 +28,12 @@ mod dbctx;  mod sql;  mod notifier;  mod io; +mod protocol;  use crate::dbctx::{DbCtx, PendingRun, Job, Run};  use crate::sql::JobResult;  use crate::sql::RunState; +use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};  lazy_static! {      static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); @@ -151,7 +153,7 @@ impl ClientJob {      pub async fn run(&mut self) {          loop {              eprintln!("waiting on response.."); -            let msg = match self.client.recv().await.expect("recv works") { +            let msg = match self.client.recv_typed::<ClientProto>().await.expect("recv works") {                  Some(msg) => msg,                  None => {                      eprintln!("client hung up. task's done, i hope?"); @@ -159,38 +161,30 @@ impl ClientJob {                  }              };              eprintln!("got {:?}", msg); -            let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); -            match msg_kind { -                "new_task_please" => { +            match msg { +                ClientProto::NewTaskPlease { allowed_pushers, host_info } => {                      eprintln!("misdirected task request (after handshake?)");                      return; -                }, -                "task_status" => { -                    let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); -                    let (result, state): (Result<String, String>, RunState) = if state == "finished" { -                        let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); -                        eprintln!("task update: state is {} and result is {}", state, result); -                        match result { -                            "pass" => { -                                (Ok("success".to_string()), RunState::Finished) -                            }, -                            other => { -                                let desc = msg.as_object().unwrap().get("desc") -                                    .map(|x| x.as_str().unwrap().to_string()) -                                    .unwrap_or_else(|| other.to_string()); -                                (Err(desc), RunState::Error) +                } +                ClientProto::TaskStatus(task_info) => { +                    let (result, state): (Result<String, String>, RunState) = match task_info { +                        TaskInfo::Finished { status } => { +                            eprintln!("task update: state is finished and result is {}", status); +                            match status.as_str() { +                                "pass" => { +                                    (Ok("success".to_string()), RunState::Finished) +                                }, +                                other => { +                                    eprintln!("unhandled task completion status: {}", other); +                                    (Err(other.to_string()), RunState::Error) +                                }                              } +                        }, +                        TaskInfo::Interrupted { status, description } => { +                            eprintln!("task update: state is interrupted and result is {}", status); +                            let desc = description.unwrap_or_else(|| status.clone()); +                            (Err(desc), RunState::Error)                          } -                    } else if state == "interrupted" { -                        let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); -                        eprintln!("task update: state is {} and result is {}", state, result); -                        let desc = msg.as_object().unwrap().get("desc") -                            .map(|x| x.as_str().unwrap().to_string()) -                            .unwrap_or_else(|| result.to_string()); -                        (Err(desc), RunState::Error) -                    } else { -                        eprintln!("task update: state is {}", state); -                        (Err(format!("atypical completion status: {}", state)), RunState::Invalid)                      };                      let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); @@ -223,26 +217,23 @@ impl ClientJob {                      )                          .expect("can update");                  } -                "artifact_create" => { -                    eprintln!("creating artifact {:?}", msg); +                ClientProto::ArtifactCreate => { +                    eprintln!("creating artifact");                      self.client.send(serde_json::json!({                          "status": "ok",                          "object_id": "10",                      })).await.unwrap();                  }, -                "metric" => { -                    let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap(); -                    let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap(); - -                    self.dbctx.insert_metric(self.task.id, name, value) +                ClientProto::Metric { name, value } => { +                    self.dbctx.insert_metric(self.task.id, &name, &value)                          .expect("TODO handle metric insert error?");                  } -                "command" => { +                ClientProto::Command(_command_info) => {                      // record information about commands, start/stop, etc. probably also allow                      // artifacts to be attached to commands and default to attaching stdout/stderr?                  }                  other => { -                    eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg); +                    eprintln!("unhandled message {:?}", other);                  }              }          } @@ -264,7 +255,11 @@ impl RunnerClient {      }      async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { -        self.tx.send(Ok(serde_json::to_string(&msg).unwrap())) +        self.send_typed(&msg).await +    } + +    async fn send_typed<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), String> { +        self.tx.send(Ok(serde_json::to_string(msg).unwrap()))              .await              .map_err(|e| e.to_string())      } @@ -286,6 +281,11 @@ impl RunnerClient {          }      } +    async fn recv_typed<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, String> { +        let json = self.recv().await?; +        Ok(json.map(|v| serde_json::from_value(v).unwrap())) +    } +      // is this client willing to run the job based on what it has told us so far?      fn will_accept(&self, job: &Job) -> bool {          match (job.source.as_ref(), self.accepted_sources.as_ref()) { @@ -298,10 +298,10 @@ impl RunnerClient {      }      async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> { -        self.send(serde_json::json!({ -            "commit": sha, -            "remote_url": remote_git_url, -            "build_token": &self.build_token, +        self.send_typed(&ClientProto::new_task(RequestedJob { +            commit: sha.to_string(), +            remote_url: remote_git_url.to_string(), +            build_token: self.build_token.to_string(),          })).await?;          match self.recv().await {              Ok(Some(resp)) => { diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 3ecacdb..11d8566 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -13,10 +13,12 @@ use std::task::{Context, Poll};  use std::pin::Pin;  use std::marker::Unpin; +mod protocol;  mod lua;  mod io;  use crate::io::ArtifactStream; +use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};  #[derive(Debug)]  enum WorkAcquireError { @@ -33,13 +35,6 @@ struct RunnerClient {      current_job: Option<RequestedJob>,  } -#[derive(Debug, Serialize, Deserialize)] -struct RequestedJob { -    commit: String, -    remote_url: String, -    build_token: String, -} -  impl RequestedJob {      pub fn into_running(self, client: RunnerClient) -> RunningJob {          RunningJob { @@ -116,11 +111,8 @@ impl StepTracker {  impl RunningJob {      async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { -        self.client.send(serde_json::json!({ -            "kind": "metric", -            "name": name, -            "value": value, -        })).await +        self.client.send_typed(&ClientProto::metric(name, value)) +            .await              .map_err(|e| format!("failed to send metric {}: {:?})", name, e))      } @@ -224,9 +216,7 @@ impl RunningJob {      }      async fn run(mut self) { -        self.client.send(serde_json::json!({ -            "status": "started" -        })).await.unwrap(); +        self.client.send_typed(&ClientProto::Started).await.unwrap();          std::fs::remove_dir_all("tmpdir").unwrap();          std::fs::create_dir("tmpdir").unwrap(); @@ -237,14 +227,10 @@ impl RunningJob {          if let Err(e) = checkout_res {              let status = "bad_ref"; -            let status = serde_json::json!({ -                "kind": "task_status", -                "state": "finished", -                "result": status, -            }); -            eprintln!("checkout failed, reporting status: {}", status); - -            let res = ctx.lock().unwrap().client.send(status).await; +            let status = ClientProto::task_status(TaskInfo::finished(status)); +            eprintln!("checkout failed, reporting status: {:?}", status); + +            let res = ctx.lock().unwrap().client.send_typed(&status).await;              if let Err(e) = res {                  eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);              } @@ -285,14 +271,10 @@ impl RunningJob {          match res {              Ok(status) => {                  eprintln!("[+] job success!"); -                let status = serde_json::json!({ -                    "kind": "task_status", -                    "state": "finished", -                    "result": status -                }); -                eprintln!("reporting status: {}", status); - -                let res = ctx.lock().unwrap().client.send(status).await; +                let status = ClientProto::task_status(TaskInfo::finished(status)); +                eprintln!("reporting status: {:?}", status); + +                let res = ctx.lock().unwrap().client.send_typed(&status).await;                  if let Err(e) = res {                      eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);                  } @@ -300,27 +282,18 @@ impl RunningJob {              Err((status, lua_err)) => {                  eprintln!("[-] job error: {}", status); -                let res = ctx.lock().unwrap().client.send(serde_json::json!({ -                    "kind": "task_status", -                    "state": "interrupted", -                    "result": status, -                    "desc": lua_err.to_string(), -                })).await; +                let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string())); +                let res = ctx.lock().unwrap().client.send_typed(&status).await;                  if let Err(e) = res { -                    eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", status, e); +                    eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e);                  }              }          }      }      async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { -        self.client.send(serde_json::json!({ -            "kind": "command", -            "state": "started", -            "command": command, -            "cwd": working_dir, -            "id": 1, -        })).await.unwrap(); +        self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) +            .await.unwrap();          let mut cmd = Command::new(&command[0]);          let cwd = match working_dir { @@ -339,12 +312,8 @@ impl RunningJob {          let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?; -        self.client.send(serde_json::json!({ -            "kind": "command", -            "state": "finished", -            "exit_code": cmd_res.code(), -            "id": 1, -        })).await.unwrap(); +        self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) +            .await.unwrap();          if !cmd_res.success() { @@ -383,11 +352,15 @@ impl RunnerClient {          match self.rx.chunk().await {              Ok(Some(chunk)) => {                  eprintln!("got chunk: {:?}", &chunk); -                serde_json::from_slice(&chunk) -                    .map(Option::Some) +                let proto_message: ClientProto = serde_json::from_slice(&chunk)                      .map_err(|e| {                          WorkAcquireError::Protocol(format!("not json: {:?}", e)) -                    }) +                    })?; +                if let ClientProto::NewTask(new_task) = proto_message { +                    Ok(Some(new_task)) +                } else { +                    Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message))) +                }              }              Ok(None) => {                  Ok(None) @@ -419,8 +392,12 @@ impl RunnerClient {      }      async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { +        self.send_typed(&value).await +    } + +    async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> {          self.tx.send_data( -            serde_json::to_vec(&value) +            serde_json::to_vec(t)                  .map_err(|e| format!("json error: {:?}", e))?                  .into()          ).await @@ -448,13 +425,16 @@ async fn main() {          .build()          .expect("can build client"); +    let host_info = host_info::collect_host_info(); +    eprintln!("host info: {:?}", host_info); +      loop {          let (mut sender, body) = hyper::Body::channel(); -        sender.send_data(serde_json::to_string(&json!({ -            "kind": "new_job_please", -            "accepted_pushers": &runner_config.allowed_pushers, -        })).unwrap().into()).await.expect("req"); +        sender.send_data(serde_json::to_string(&ClientProto::new_task_please( +            runner_config.allowed_pushers.clone(), +            host_info.clone(), +        )).unwrap().into()).await.expect("req");          let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job")              .header("user-agent", "ci-butactuallyin-space-runner") @@ -510,3 +490,60 @@ async fn main() {          }      }  } + +mod host_info { +    use crate::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; + +    // get host model name, microcode, and how many cores +    fn collect_cpu_info() -> CpuInfo { +        let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); +        let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect(); +        let model = model_names.first().expect("can get model name").to_string().split(":").last().unwrap().trim().to_string(); +        let cores = model_names.len() as u32; +        let microcode = cpu_lines.iter().find(|line| line.starts_with("microcode")).expect("microcode line is present").split(":").last().unwrap().trim().to_string(); + +        CpuInfo { model, microcode, cores } +    } + +    fn collect_mem_info() -> MemoryInfo { +        let mem_lines: Vec<String> = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect(); +        let total = mem_lines[0].split(":").last().unwrap().trim().to_string(); +        let available = mem_lines[2].split(":").last().unwrap().trim().to_string(); + +        MemoryInfo { total, available } +    } + +    fn hostname() -> String { +        let mut bytes = [0u8; 4096]; +        let res = unsafe { +            libc::gethostname(bytes.as_mut_ptr() as *mut i8, bytes.len()) +        }; +        if res != 0 { +            panic!("gethostname failed {:?}", res); +        } +        let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated"); +        std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string() +    } + +    pub fn collect_env_info() -> EnvInfo { +        EnvInfo { +            arch: std::env::consts::ARCH.to_string(), +            family: std::env::consts::FAMILY.to_string(), +            os: std::env::consts::OS.to_string(), +        } +    } + +    pub fn collect_host_info() -> HostInfo { +        let cpu_info = collect_cpu_info(); +        let memory_info = collect_mem_info(); +//        let hostname = hostname(); +        let env_info = collect_env_info(); + +        HostInfo { +            cpu_info, +            memory_info, +            env_info, +        } +    } +} + diff --git a/src/protocol.rs b/src/protocol.rs new file mode 100644 index 0000000..829dde1 --- /dev/null +++ b/src/protocol.rs @@ -0,0 +1,106 @@ +use serde::{Serialize, Deserialize}; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "kind")] +#[serde(rename_all = "snake_case")] +pub enum ClientProto { +    Started, +    ArtifactCreate, +    NewTask(RequestedJob), +    NewTaskPlease { allowed_pushers: Option<Vec<String>>, host_info: HostInfo }, +    Metric { name: String, value: String }, +    Command(CommandInfo), +    TaskStatus(TaskInfo), +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "kind")] +#[serde(rename_all = "snake_case")] +pub enum CommandInfo { +    Started { command: Vec<String>, cwd: Option<String>, id: u32 }, +    Finished { exit_code: Option<i32>, id: u32 }, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(tag = "kind")] +#[serde(rename_all = "snake_case")] +pub enum TaskInfo { +    Finished { status: String }, +    Interrupted { status: String, description: Option<String> }, +} + +impl ClientProto { +    pub fn metric(name: impl Into<String>, value: impl Into<String>) -> Self { +        ClientProto::Metric { name: name.into(), value: value.into() } +    } + +    pub fn command(state: CommandInfo) -> Self { +        ClientProto::Command(state) +    } + +    pub fn new_task_please(allowed_pushers: Option<Vec<String>>, host_info: HostInfo) -> Self { +        ClientProto::NewTaskPlease { allowed_pushers, host_info } +    } + +    pub fn task_status(state: TaskInfo) -> Self { +        ClientProto::TaskStatus(state) +    } + +    pub fn new_task(task: RequestedJob) -> Self { +        ClientProto::NewTask(task) +    } +} + +impl CommandInfo { +    pub fn started(command: impl Into<Vec<String>>, cwd: Option<&str>, id: u32) -> Self { +        CommandInfo::Started { command: command.into(), cwd: cwd.map(ToOwned::to_owned), id } +    } + +    pub fn finished(exit_code: Option<i32>, id: u32) -> Self { +        CommandInfo::Finished { exit_code, id } +    } +} + +impl TaskInfo { +    pub fn finished(status: impl Into<String>) -> Self { +        TaskInfo::Finished { status: status.into() } +    } + +    pub fn interrupted(status: impl Into<String>, description: impl Into<Option<String>>) -> Self { +        TaskInfo::Interrupted { status: status.into(), description: description.into() } +    } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct HostInfo { +    pub cpu_info: CpuInfo, +    pub memory_info: MemoryInfo, +    pub env_info: EnvInfo, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct CpuInfo { +    pub model: String, +    pub microcode: String, +    pub cores: u32, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct MemoryInfo { +    pub total: String, +    pub available: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct EnvInfo { +    pub arch: String, +    pub family: String, +    pub os: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestedJob { +    pub commit: String, +    pub remote_url: String, +    pub build_token: String, +} | 
