summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ci_driver.rs86
-rw-r--r--src/ci_runner.rs155
-rw-r--r--src/protocol.rs106
3 files changed, 245 insertions, 102 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index fca1e6e..710272d 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -28,10 +28,12 @@ mod dbctx;
mod sql;
mod notifier;
mod io;
+mod protocol;
use crate::dbctx::{DbCtx, PendingRun, Job, Run};
use crate::sql::JobResult;
use crate::sql::RunState;
+use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
lazy_static! {
static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
@@ -151,7 +153,7 @@ impl ClientJob {
pub async fn run(&mut self) {
loop {
eprintln!("waiting on response..");
- let msg = match self.client.recv().await.expect("recv works") {
+ let msg = match self.client.recv_typed::<ClientProto>().await.expect("recv works") {
Some(msg) => msg,
None => {
eprintln!("client hung up. task's done, i hope?");
@@ -159,38 +161,30 @@ impl ClientJob {
}
};
eprintln!("got {:?}", msg);
- let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();
- match msg_kind {
- "new_task_please" => {
+ match msg {
+ ClientProto::NewTaskPlease { allowed_pushers, host_info } => {
eprintln!("misdirected task request (after handshake?)");
return;
- },
- "task_status" => {
- let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap();
- let (result, state): (Result<String, String>, RunState) = if state == "finished" {
- let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
- eprintln!("task update: state is {} and result is {}", state, result);
- match result {
- "pass" => {
- (Ok("success".to_string()), RunState::Finished)
- },
- other => {
- let desc = msg.as_object().unwrap().get("desc")
- .map(|x| x.as_str().unwrap().to_string())
- .unwrap_or_else(|| other.to_string());
- (Err(desc), RunState::Error)
+ }
+ ClientProto::TaskStatus(task_info) => {
+ let (result, state): (Result<String, String>, RunState) = match task_info {
+ TaskInfo::Finished { status } => {
+ eprintln!("task update: state is finished and result is {}", status);
+ match status.as_str() {
+ "pass" => {
+ (Ok("success".to_string()), RunState::Finished)
+ },
+ other => {
+ eprintln!("unhandled task completion status: {}", other);
+ (Err(other.to_string()), RunState::Error)
+ }
}
+ },
+ TaskInfo::Interrupted { status, description } => {
+ eprintln!("task update: state is interrupted and result is {}", status);
+ let desc = description.unwrap_or_else(|| status.clone());
+ (Err(desc), RunState::Error)
}
- } else if state == "interrupted" {
- let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
- eprintln!("task update: state is {} and result is {}", state, result);
- let desc = msg.as_object().unwrap().get("desc")
- .map(|x| x.as_str().unwrap().to_string())
- .unwrap_or_else(|| result.to_string());
- (Err(desc), RunState::Error)
- } else {
- eprintln!("task update: state is {}", state);
- (Err(format!("atypical completion status: {}", state)), RunState::Invalid)
};
let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists");
@@ -223,26 +217,23 @@ impl ClientJob {
)
.expect("can update");
}
- "artifact_create" => {
- eprintln!("creating artifact {:?}", msg);
+ ClientProto::ArtifactCreate => {
+ eprintln!("creating artifact");
self.client.send(serde_json::json!({
"status": "ok",
"object_id": "10",
})).await.unwrap();
},
- "metric" => {
- let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap();
- let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap();
-
- self.dbctx.insert_metric(self.task.id, name, value)
+ ClientProto::Metric { name, value } => {
+ self.dbctx.insert_metric(self.task.id, &name, &value)
.expect("TODO handle metric insert error?");
}
- "command" => {
+ ClientProto::Command(_command_info) => {
// record information about commands, start/stop, etc. probably also allow
// artifacts to be attached to commands and default to attaching stdout/stderr?
}
other => {
- eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg);
+ eprintln!("unhandled message {:?}", other);
}
}
}
@@ -264,7 +255,11 @@ impl RunnerClient {
}
async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> {
- self.tx.send(Ok(serde_json::to_string(&msg).unwrap()))
+ self.send_typed(&msg).await
+ }
+
+ async fn send_typed<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), String> {
+ self.tx.send(Ok(serde_json::to_string(msg).unwrap()))
.await
.map_err(|e| e.to_string())
}
@@ -286,6 +281,11 @@ impl RunnerClient {
}
}
+ async fn recv_typed<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, String> {
+ let json = self.recv().await?;
+ Ok(json.map(|v| serde_json::from_value(v).unwrap()))
+ }
+
// is this client willing to run the job based on what it has told us so far?
fn will_accept(&self, job: &Job) -> bool {
match (job.source.as_ref(), self.accepted_sources.as_ref()) {
@@ -298,10 +298,10 @@ impl RunnerClient {
}
async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
- self.send(serde_json::json!({
- "commit": sha,
- "remote_url": remote_git_url,
- "build_token": &self.build_token,
+ self.send_typed(&ClientProto::new_task(RequestedJob {
+ commit: sha.to_string(),
+ remote_url: remote_git_url.to_string(),
+ build_token: self.build_token.to_string(),
})).await?;
match self.recv().await {
Ok(Some(resp)) => {
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index 3ecacdb..11d8566 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -13,10 +13,12 @@ use std::task::{Context, Poll};
use std::pin::Pin;
use std::marker::Unpin;
+mod protocol;
mod lua;
mod io;
use crate::io::ArtifactStream;
+use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
#[derive(Debug)]
enum WorkAcquireError {
@@ -33,13 +35,6 @@ struct RunnerClient {
current_job: Option<RequestedJob>,
}
-#[derive(Debug, Serialize, Deserialize)]
-struct RequestedJob {
- commit: String,
- remote_url: String,
- build_token: String,
-}
-
impl RequestedJob {
pub fn into_running(self, client: RunnerClient) -> RunningJob {
RunningJob {
@@ -116,11 +111,8 @@ impl StepTracker {
impl RunningJob {
async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> {
- self.client.send(serde_json::json!({
- "kind": "metric",
- "name": name,
- "value": value,
- })).await
+ self.client.send_typed(&ClientProto::metric(name, value))
+ .await
.map_err(|e| format!("failed to send metric {}: {:?})", name, e))
}
@@ -224,9 +216,7 @@ impl RunningJob {
}
async fn run(mut self) {
- self.client.send(serde_json::json!({
- "status": "started"
- })).await.unwrap();
+ self.client.send_typed(&ClientProto::Started).await.unwrap();
std::fs::remove_dir_all("tmpdir").unwrap();
std::fs::create_dir("tmpdir").unwrap();
@@ -237,14 +227,10 @@ impl RunningJob {
if let Err(e) = checkout_res {
let status = "bad_ref";
- let status = serde_json::json!({
- "kind": "task_status",
- "state": "finished",
- "result": status,
- });
- eprintln!("checkout failed, reporting status: {}", status);
-
- let res = ctx.lock().unwrap().client.send(status).await;
+ let status = ClientProto::task_status(TaskInfo::finished(status));
+ eprintln!("checkout failed, reporting status: {:?}", status);
+
+ let res = ctx.lock().unwrap().client.send_typed(&status).await;
if let Err(e) = res {
eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
}
@@ -285,14 +271,10 @@ impl RunningJob {
match res {
Ok(status) => {
eprintln!("[+] job success!");
- let status = serde_json::json!({
- "kind": "task_status",
- "state": "finished",
- "result": status
- });
- eprintln!("reporting status: {}", status);
-
- let res = ctx.lock().unwrap().client.send(status).await;
+ let status = ClientProto::task_status(TaskInfo::finished(status));
+ eprintln!("reporting status: {:?}", status);
+
+ let res = ctx.lock().unwrap().client.send_typed(&status).await;
if let Err(e) = res {
eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
}
@@ -300,27 +282,18 @@ impl RunningJob {
Err((status, lua_err)) => {
eprintln!("[-] job error: {}", status);
- let res = ctx.lock().unwrap().client.send(serde_json::json!({
- "kind": "task_status",
- "state": "interrupted",
- "result": status,
- "desc": lua_err.to_string(),
- })).await;
+ let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string()));
+ let res = ctx.lock().unwrap().client.send_typed(&status).await;
if let Err(e) = res {
- eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", status, e);
+ eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e);
}
}
}
}
async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> {
- self.client.send(serde_json::json!({
- "kind": "command",
- "state": "started",
- "command": command,
- "cwd": working_dir,
- "id": 1,
- })).await.unwrap();
+ self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1)))
+ .await.unwrap();
let mut cmd = Command::new(&command[0]);
let cwd = match working_dir {
@@ -339,12 +312,8 @@ impl RunningJob {
let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?;
- self.client.send(serde_json::json!({
- "kind": "command",
- "state": "finished",
- "exit_code": cmd_res.code(),
- "id": 1,
- })).await.unwrap();
+ self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1)))
+ .await.unwrap();
if !cmd_res.success() {
@@ -383,11 +352,15 @@ impl RunnerClient {
match self.rx.chunk().await {
Ok(Some(chunk)) => {
eprintln!("got chunk: {:?}", &chunk);
- serde_json::from_slice(&chunk)
- .map(Option::Some)
+ let proto_message: ClientProto = serde_json::from_slice(&chunk)
.map_err(|e| {
WorkAcquireError::Protocol(format!("not json: {:?}", e))
- })
+ })?;
+ if let ClientProto::NewTask(new_task) = proto_message {
+ Ok(Some(new_task))
+ } else {
+ Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", proto_message)))
+ }
}
Ok(None) => {
Ok(None)
@@ -419,8 +392,12 @@ impl RunnerClient {
}
async fn send(&mut self, value: serde_json::Value) -> Result<(), String> {
+ self.send_typed(&value).await
+ }
+
+ async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> {
self.tx.send_data(
- serde_json::to_vec(&value)
+ serde_json::to_vec(t)
.map_err(|e| format!("json error: {:?}", e))?
.into()
).await
@@ -448,13 +425,16 @@ async fn main() {
.build()
.expect("can build client");
+ let host_info = host_info::collect_host_info();
+ eprintln!("host info: {:?}", host_info);
+
loop {
let (mut sender, body) = hyper::Body::channel();
- sender.send_data(serde_json::to_string(&json!({
- "kind": "new_job_please",
- "accepted_pushers": &runner_config.allowed_pushers,
- })).unwrap().into()).await.expect("req");
+ sender.send_data(serde_json::to_string(&ClientProto::new_task_please(
+ runner_config.allowed_pushers.clone(),
+ host_info.clone(),
+ )).unwrap().into()).await.expect("req");
let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job")
.header("user-agent", "ci-butactuallyin-space-runner")
@@ -510,3 +490,60 @@ async fn main() {
}
}
}
+
+mod host_info {
+ use crate::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo};
+
+ // get host model name, microcode, and how many cores
+ fn collect_cpu_info() -> CpuInfo {
+ let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect();
+ let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect();
+ let model = model_names.first().expect("can get model name").to_string().split(":").last().unwrap().trim().to_string();
+ let cores = model_names.len() as u32;
+ let microcode = cpu_lines.iter().find(|line| line.starts_with("microcode")).expect("microcode line is present").split(":").last().unwrap().trim().to_string();
+
+ CpuInfo { model, microcode, cores }
+ }
+
+ fn collect_mem_info() -> MemoryInfo {
+ let mem_lines: Vec<String> = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect();
+ let total = mem_lines[0].split(":").last().unwrap().trim().to_string();
+ let available = mem_lines[2].split(":").last().unwrap().trim().to_string();
+
+ MemoryInfo { total, available }
+ }
+
+ fn hostname() -> String {
+ let mut bytes = [0u8; 4096];
+ let res = unsafe {
+ libc::gethostname(bytes.as_mut_ptr() as *mut i8, bytes.len())
+ };
+ if res != 0 {
+ panic!("gethostname failed {:?}", res);
+ }
+ let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated");
+ std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string()
+ }
+
+ pub fn collect_env_info() -> EnvInfo {
+ EnvInfo {
+ arch: std::env::consts::ARCH.to_string(),
+ family: std::env::consts::FAMILY.to_string(),
+ os: std::env::consts::OS.to_string(),
+ }
+ }
+
+ pub fn collect_host_info() -> HostInfo {
+ let cpu_info = collect_cpu_info();
+ let memory_info = collect_mem_info();
+// let hostname = hostname();
+ let env_info = collect_env_info();
+
+ HostInfo {
+ cpu_info,
+ memory_info,
+ env_info,
+ }
+ }
+}
+
diff --git a/src/protocol.rs b/src/protocol.rs
new file mode 100644
index 0000000..829dde1
--- /dev/null
+++ b/src/protocol.rs
@@ -0,0 +1,106 @@
+use serde::{Serialize, Deserialize};
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "kind")]
+#[serde(rename_all = "snake_case")]
+pub enum ClientProto {
+ Started,
+ ArtifactCreate,
+ NewTask(RequestedJob),
+ NewTaskPlease { allowed_pushers: Option<Vec<String>>, host_info: HostInfo },
+ Metric { name: String, value: String },
+ Command(CommandInfo),
+ TaskStatus(TaskInfo),
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "kind")]
+#[serde(rename_all = "snake_case")]
+pub enum CommandInfo {
+ Started { command: Vec<String>, cwd: Option<String>, id: u32 },
+ Finished { exit_code: Option<i32>, id: u32 },
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "kind")]
+#[serde(rename_all = "snake_case")]
+pub enum TaskInfo {
+ Finished { status: String },
+ Interrupted { status: String, description: Option<String> },
+}
+
+impl ClientProto {
+ pub fn metric(name: impl Into<String>, value: impl Into<String>) -> Self {
+ ClientProto::Metric { name: name.into(), value: value.into() }
+ }
+
+ pub fn command(state: CommandInfo) -> Self {
+ ClientProto::Command(state)
+ }
+
+ pub fn new_task_please(allowed_pushers: Option<Vec<String>>, host_info: HostInfo) -> Self {
+ ClientProto::NewTaskPlease { allowed_pushers, host_info }
+ }
+
+ pub fn task_status(state: TaskInfo) -> Self {
+ ClientProto::TaskStatus(state)
+ }
+
+ pub fn new_task(task: RequestedJob) -> Self {
+ ClientProto::NewTask(task)
+ }
+}
+
+impl CommandInfo {
+ pub fn started(command: impl Into<Vec<String>>, cwd: Option<&str>, id: u32) -> Self {
+ CommandInfo::Started { command: command.into(), cwd: cwd.map(ToOwned::to_owned), id }
+ }
+
+ pub fn finished(exit_code: Option<i32>, id: u32) -> Self {
+ CommandInfo::Finished { exit_code, id }
+ }
+}
+
+impl TaskInfo {
+ pub fn finished(status: impl Into<String>) -> Self {
+ TaskInfo::Finished { status: status.into() }
+ }
+
+ pub fn interrupted(status: impl Into<String>, description: impl Into<Option<String>>) -> Self {
+ TaskInfo::Interrupted { status: status.into(), description: description.into() }
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct HostInfo {
+ pub cpu_info: CpuInfo,
+ pub memory_info: MemoryInfo,
+ pub env_info: EnvInfo,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct CpuInfo {
+ pub model: String,
+ pub microcode: String,
+ pub cores: u32,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct MemoryInfo {
+ pub total: String,
+ pub available: String,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct EnvInfo {
+ pub arch: String,
+ pub family: String,
+ pub os: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RequestedJob {
+ pub commit: String,
+ pub remote_url: String,
+ pub build_token: String,
+}