From 9e6906c00c49186189d211dc96e132d85e7ff641 Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 13 Jul 2023 00:51:51 -0700 Subject: reorganize the whole thing into crates/ packages --- ci-runner/src/lua/mod.rs | 411 +++++++++++++++++++++++++++++ ci-runner/src/main.rs | 668 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1079 insertions(+) create mode 100644 ci-runner/src/lua/mod.rs create mode 100644 ci-runner/src/main.rs (limited to 'ci-runner/src') diff --git a/ci-runner/src/lua/mod.rs b/ci-runner/src/lua/mod.rs new file mode 100644 index 0000000..62ac68b --- /dev/null +++ b/ci-runner/src/lua/mod.rs @@ -0,0 +1,411 @@ +use crate::RunningJob; + +use rlua::prelude::*; + +use std::sync::{Arc, Mutex}; +use std::path::PathBuf; + +pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config/goodfiles/rust.lua"); + +pub struct BuildEnv { + lua: Lua, + job: Arc>, +} + +#[derive(Debug)] +pub struct RunParams { + step: Option, + name: Option, + cwd: Option, +} + +pub struct CommandOutput { + pub exit_status: std::process::ExitStatus, + pub stdout: Vec, + pub stderr: Vec, +} + +mod lua_exports { + use crate::RunningJob; + use crate::lua::{CommandOutput, RunParams}; + + use std::sync::{Arc, Mutex}; + use std::path::PathBuf; + + use rlua::prelude::*; + + pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec, RunParams), rlua::Error> { + let args = match command { + LuaValue::Table(table) => { + let len = table.len().expect("command table has a length"); + let mut command_args = Vec::new(); + for i in 0..len { + let value = table.get(i + 1).expect("command arg is gettble"); + match value { + LuaValue::String(s) => { + command_args.push(s.to_str().unwrap().to_owned()); + }, + other => { + return Err(LuaError::RuntimeError(format!("argument {} was not a string, was {:?}", i, other))); + } + }; + } + + command_args + }, + other => { + return Err(LuaError::RuntimeError(format!("argument 1 was not a table: {:?}", other))); + } + }; + + let params = match params { + LuaValue::Table(table) => { + let step = match table.get("step").expect("can get from table") { + LuaValue::String(v) => { + Some(v.to_str()?.to_owned()) + }, + LuaValue::Nil => { + None + }, + other => { + return Err(LuaError::RuntimeError(format!("params[\"step\"] must be a string"))); + } + }; + let name = match table.get("name").expect("can get from table") { + LuaValue::String(v) => { + Some(v.to_str()?.to_owned()) + }, + LuaValue::Nil => { + None + }, + other => { + return Err(LuaError::RuntimeError(format!("params[\"name\"] must be a string"))); + } + }; + let cwd = match table.get("cwd").expect("can get from table") { + LuaValue::String(v) => { + Some(v.to_str()?.to_owned()) + }, + LuaValue::Nil => { + None + }, + other => { + return Err(LuaError::RuntimeError(format!("params[\"cwd\"] must be a string"))); + } + }; + + RunParams { + step, + name, + cwd, + } + }, + LuaValue::Nil => { + RunParams { + step: None, + name: None, + cwd: None, + } + } + other => { + return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other))); + } + }; + + Ok((args, params)) + } + + pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc>) -> Result<(), rlua::Error> { + let (args, params) = collect_build_args(command, params)?; + eprintln!("args: {:?}", args); + eprintln!(" params: {:?}", params); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + 
            job_ctx.lock().unwrap().run_command(&args, params.cwd.as_ref().map(|x| x.as_str())).await
+                .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e)))
+        })
+    }
+
+    pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<rlua::Table<'lua>, rlua::Error> {
+        let (args, params) = collect_build_args(command, params)?;
+        eprintln!("args: {:?}", args);
+        eprintln!("  params: {:?}", params);
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        let command_output = rt.block_on(async move {
+            job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await
+                .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e)))
+        })?;
+
+        let stdout = ctx.create_string(command_output.stdout.as_slice())?;
+        let stderr = ctx.create_string(command_output.stderr.as_slice())?;
+
+        let result = ctx.create_table()?;
+        result.set("stdout", stdout)?;
+        result.set("stderr", stderr)?;
+        result.set("status", command_output.exit_status.code())?;
+        Ok(result)
+    }
+
+    pub fn check_dependencies(commands: Vec<String>) -> Result<(), rlua::Error> {
+        let mut missing_deps = Vec::new();
+        for command in commands.iter() {
+            if !has_cmd(command)? {
+                missing_deps.push(command.clone());
+            }
+        }
+
+        if missing_deps.len() > 0 {
+            return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", "))));
+        }
+
+        Ok(())
+    }
+
+    pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        rt.block_on(async move {
+            job_ctx.lock().unwrap().send_metric(&name, value).await
+                .map_err(|e| LuaError::RuntimeError(format!("send_metric error: {:?}", e)))
+        })
+    }
+
+    pub fn artifact(path: String, name: Option<String>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
+        let path: PathBuf = path.into();
+
+        let default_name: String = match (path.file_name(), path.parent()) {
+            (Some(name), _) => name
+                .to_str()
+                .ok_or(LuaError::RuntimeError("artifact name is not a unicode string".to_string()))?
+                .to_string(),
+            (_, Some(parent)) => format!("{}", parent.display()),
+            (None, None) => {
+                // one day using directories for artifacts might work but / is still not going
+                // to be accepted
+                return Err(LuaError::RuntimeError(format!("cannot infer a default path name for {}", path.display())));
+            }
+        };
+
+        let name: String = match name {
+            Some(name) => name,
+            None => default_name,
+        };
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+        rt.block_on(async move {
+            let mut artifact = job_ctx.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await
+                .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e)))
+                .unwrap();
+            let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap();
+            eprintln!("uploading...");
+            crate::io::forward_data(&mut file, &mut artifact).await
+                .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?;
+            std::mem::drop(artifact);
+            Ok(())
+        })
+    }
+
+    pub fn has_cmd(name: &str) -> Result<bool, rlua::Error> {
+        Ok(std::process::Command::new("which")
+            .arg(name)
+            .status()
+            .map_err(|e| LuaError::RuntimeError(format!("could not fork which? {:?}", e)))?
+            .success())
+    }
+
+    pub fn file_size(path: &str) -> Result<u64, rlua::Error> {
+        Ok(std::fs::metadata(&format!("tmpdir/{}", path))
+            .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", path)))?
+            .len())
+    }
+
+    pub mod step {
+        use crate::RunningJob;
+        use std::sync::{Arc, Mutex};
+
+        pub fn start(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> {
+            let mut job = job_ref.lock().unwrap();
+            job.current_step.clear();
+            job.current_step.push(name);
+            Ok(())
+        }
+
+        pub fn push(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> {
+            let mut job = job_ref.lock().unwrap();
+            job.current_step.push(name);
+            Ok(())
+        }
+
+        pub fn advance(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> {
+            let mut job = job_ref.lock().unwrap();
+            job.current_step.pop();
+            job.current_step.push(name);
+            Ok(())
+        }
+    }
+}
+
+struct DeclEnv<'lua, 'env> {
+    lua_ctx: &'env rlua::Context<'lua>,
+    job_ref: &'env Arc<Mutex<RunningJob>>,
+}
+impl<'lua, 'env> DeclEnv<'lua, 'env> {
+    fn create_function<A, R, F>(&self, name: &str, f: F) -> Result<rlua::Function<'lua>, String>
+        where
+            A: FromLuaMulti<'lua>,
+            R: ToLuaMulti<'lua>,
+            F: 'static + Send + Fn(rlua::Context<'lua>, Arc<Mutex<RunningJob>>, A) -> Result<R, rlua::Error> {
+
+        let job_ref = Arc::clone(self.job_ref);
+        self.lua_ctx.create_function(move |ctx, args| {
+            let job_ref = Arc::clone(&job_ref);
+            f(ctx, job_ref, args)
+        })
+            .map_err(|e| format!("problem defining {} function: {:?}", name, e))
+    }
+}
+
+impl BuildEnv {
+    pub fn new(job: &Arc<Mutex<RunningJob>>) -> Self {
+        let env = BuildEnv {
+            lua: Lua::new(),
+            job: Arc::clone(job),
+        };
+        env.lua.context(|lua_ctx| {
+            env.define_env(lua_ctx)
+        }).expect("can define context");
+        env
+    }
+
+    fn define_env(&self, lua_ctx: rlua::Context) -> Result<(), String> {
+        let decl_env = DeclEnv {
+            lua_ctx: &lua_ctx,
+            job_ref: &self.job,
+        };
+
+        let hello = decl_env.create_function("hello", |_, _, ()| {
+            eprintln!("hello from lua!!!");
+            Ok(())
+        })?;
+
+        let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec<String>| {
+            lua_exports::check_dependencies(commands)
+        })?;
+
+        let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| {
+            lua_exports::build_command_impl(command, params, job_ref)
+        })?;
+
+        let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, params): (LuaValue, LuaValue)| {
+            lua_exports::check_output_impl(ctx, command, params, job_ref)
+        })?;
+
+        let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| {
+            lua_exports::metric(name, value, job_ref)
+        })?;
+
+        let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(ci_lib_core::now_ms()))?;
+
+        let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| {
+            lua_exports::artifact(path, name, job_ref)
+        })?;
+
+        let error = decl_env.create_function("error", move |_, job_ref, msg: String| {
+            Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg)))
+        })?;
+
+        let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, job_ref, name: String| {
+            lua_exports::has_cmd(&name)
+        })?;
+
+        let size_of_file = decl_env.create_function("size_of_file", move |_, job_ref, name: String| {
+            lua_exports::file_size(&name)
+        })?;
+
+        let native_rust_triple = match std::env::consts::ARCH {
+            "x86_64" => "x86_64-unknown-linux-gnu",
+            "aarch64" => "aarch64-unknown-linux-gnu",
+            other => { panic!("dunno native rust triple for arch {}", other); }
+        };
+        let native_rust_triple =
lua_ctx.create_string(native_rust_triple).unwrap(); + let build_env_vars = lua_ctx.create_table_from( + vec![ + ("native_rust_triple", native_rust_triple) + ] + ).unwrap(); + + let build_environment = lua_ctx.create_table_from( + vec![ + ("has", path_has_cmd), + ("size", size_of_file), + ] + ).unwrap(); + build_environment.set("vars", build_env_vars).unwrap(); + + let build_functions = lua_ctx.create_table_from( + vec![ + ("hello", hello), + ("run", build), + ("dependencies", check_dependencies), + ("metric", metric), + ("error", error), + ("artifact", artifact), + ("now_ms", now_ms), + ("check_output", check_output), + ] + ).unwrap(); + build_functions.set("environment", build_environment).unwrap(); + let current_commit = self.job.lock().unwrap().job.commit.clone(); + build_functions.set("sha", lua_ctx.create_string(current_commit.as_bytes()).unwrap()).unwrap(); + let globals = lua_ctx.globals(); + globals.set("Build", build_functions).unwrap(); + + + let step_start = decl_env.create_function("step_start", move |_, job_ref, name: String| { + lua_exports::step::start(job_ref, name) + })?; + + let step_push = decl_env.create_function("step_push", move |_, job_ref, name: String| { + lua_exports::step::push(job_ref, name) + })?; + + let step_advance = decl_env.create_function("step_advance", move |_, job_ref, name: String| { + lua_exports::step::advance(job_ref, name) + })?; + + let step_functions = lua_ctx.create_table_from( + vec![ + ("start", step_start), + ("push", step_push), + ("advance", step_advance), + ] + ).unwrap(); + globals.set("Step", step_functions).unwrap(); + Ok(()) + } + + pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> { + let script = script.to_vec(); + let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| { + std::thread::spawn(move || { + self.lua.context(|lua_ctx| { + lua_ctx.load(&script) + .set_name("goodfile")? 
+ .exec() + }) + }).join().unwrap() + }).await.unwrap(); + eprintln!("lua res: {:?}", res); + res + } +} diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs new file mode 100644 index 0000000..41f5594 --- /dev/null +++ b/ci-runner/src/main.rs @@ -0,0 +1,668 @@ +use std::time::Duration; +use std::os::unix::process::ExitStatusExt; +use rlua::prelude::LuaError; +use std::sync::{Arc, Mutex}; +use reqwest::{StatusCode, Response}; +use tokio::process::Command; +use std::process::Stdio; +use std::process::ExitStatus; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use serde_json::json; +use serde::{Deserialize, de::DeserializeOwned, Serialize}; +use std::task::{Context, Poll}; +use std::pin::Pin; +use std::marker::Unpin; + +use ci_lib_native::io; +use ci_lib_native::io::{ArtifactStream, VecSink}; +use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; + +mod lua; + +use crate::lua::CommandOutput; + +#[derive(Debug)] +enum WorkAcquireError { + Reqwest(reqwest::Error), + EarlyEof, + Protocol(String), +} + +struct RunnerClient { + http: reqwest::Client, + host: String, + tx: hyper::body::Sender, + rx: Response, + current_job: Option, +} + +impl RunningJob { + fn from_job(job: RequestedJob, client: RunnerClient) -> Self { + Self { + job, + client, + current_step: StepTracker::new(), + } + } +} + +struct JobEnv { + lua: lua::BuildEnv, + job: Arc>, +} + +impl JobEnv { + fn new(job: &Arc>) -> Self { + let lua = lua::BuildEnv::new(job); + JobEnv { + lua, + job: Arc::clone(job) + } + } + + async fn default_goodfile(self) -> Result<(), LuaError> { + self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await + } + + async fn exec_goodfile(self) -> Result<(), LuaError> { + let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap(); + self.lua.run_build(script.as_bytes()).await + } +} + +pub struct RunningJob { + job: RequestedJob, + client: RunnerClient, + current_step: StepTracker, +} + +enum RepoError { + CloneFailedIdk { exit_code: ExitStatus }, + CheckoutFailedIdk { exit_code: ExitStatus }, + CheckoutFailedMissingRef, +} + +pub struct StepTracker { + scopes: Vec +} + +impl StepTracker { + pub fn new() -> Self { + StepTracker { + scopes: Vec::new() + } + } + + pub fn push(&mut self, name: String) { + self.scopes.push(name); + } + + pub fn pop(&mut self) { + self.scopes.pop(); + } + + pub fn clear(&mut self) { + self.scopes.clear(); + } + + pub fn full_step_path(&self) -> &[String] { + self.scopes.as_slice() + } +} + +impl RunningJob { + async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { + self.client.send_typed(&ClientProto::metric(name, value)) + .await + .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) + } + + // TODO: panics if hyper finds the channel is closed. 
hum + async fn create_artifact(&self, name: &str, desc: &str) -> Result { + let (mut sender, body) = hyper::Body::channel(); + let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("x-task-token", &self.job.build_token) + .header("x-artifact-name", name) + .header("x-artifact-desc", desc) + .body(body) + .send() + .await + .map_err(|e| format!("unable to send request: {:?}", e))?; + + if resp.status() == StatusCode::OK { + eprintln!("[+] artifact '{}' started", name); + Ok(ArtifactStream::new(sender)) + } else { + Err(format!("[-] unable to create artifact: {:?}", resp)) + } + } + + async fn clone_remote(&self) -> Result<(), RepoError> { + let mut git_clone = Command::new("git"); + git_clone + .arg("clone") + .arg(&self.job.remote_url) + .arg("tmpdir"); + + let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await + .map_err(|e| { + eprintln!("stringy error (exec failed?) for clone: {}", e); + RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) } + })?; + + if !clone_res.success() { + return Err(RepoError::CloneFailedIdk { exit_code: clone_res }); + } + + let mut git_checkout = Command::new("git"); + git_checkout + .current_dir("tmpdir") + .arg("checkout") + .arg(&self.job.commit); + + let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await + .map_err(|e| { + eprintln!("stringy error (exec failed?) for checkout: {}", e); + RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) } + })?; + + if !checkout_res.success() { + if checkout_res.code() == Some(128) { + return Err(RepoError::CheckoutFailedIdk { exit_code: checkout_res }); + } else { + return Err(RepoError::CheckoutFailedMissingRef); + } + } + + Ok(()) + } + + async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result { + let stdout_artifact = self.create_artifact( + &format!("{} (stdout)", name), + &format!("{} (stdout)", desc) + ).await.expect("works"); + let stderr_artifact = self.create_artifact( + &format!("{} (stderr)", name), + &format!("{} (stderr)", desc) + ).await.expect("works"); + + let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?; + + Ok(exit_status) + } + + async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result { + let stdout_collector = VecSink::new(); + let stderr_collector = VecSink::new(); + + let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?; + + Ok(CommandOutput { + exit_status, + stdout: stdout_collector.take_buf(), + stderr: stderr_collector.take_buf(), + }) + } + + async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result { + eprintln!("[.] running {}", name); + + let mut child = command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; + + let mut child_stdout = child.stdout.take().unwrap(); + let mut child_stderr = child.stderr.take().unwrap(); + + eprintln!("[.] 
'{}': forwarding stdout", name); + tokio::spawn(async move { io::forward_data(&mut child_stdout, &mut stdout_reporter).await }); + eprintln!("[.] '{}': forwarding stderr", name); + tokio::spawn(async move { io::forward_data(&mut child_stderr, &mut stderr_reporter).await }); + + let res = child.wait().await + .map_err(|e| format!("failed to wait? {:?}", e))?; + + if res.success() { + eprintln!("[+] '{}' success", name); + } else { + eprintln!("[-] '{}' fail: {:?}", name, res); + } + + Ok(res) + } + + async fn run(mut self) { + self.client.send_typed(&ClientProto::Started).await.unwrap(); + + std::fs::remove_dir_all("tmpdir").unwrap(); + std::fs::create_dir("tmpdir").unwrap(); + + let ctx = Arc::new(Mutex::new(self)); + + let checkout_res = ctx.lock().unwrap().clone_remote().await; + + if let Err(e) = checkout_res { + let status = "bad_ref"; + let status = ClientProto::task_status(TaskInfo::finished(status)); + eprintln!("checkout failed, reporting status: {:?}", status); + + let res = ctx.lock().unwrap().client.send_typed(&status).await; + if let Err(e) = res { + eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); + } + + return; + } + + let lua_env = JobEnv::new(&ctx); + + let metadata = std::fs::metadata("./tmpdir/goodfile"); + let res: Result = match metadata { + Ok(_) => { + match lua_env.exec_goodfile().await { + Ok(()) => { + Ok("pass".to_string()) + }, + Err(lua_err) => { + Err(("failed".to_string(), lua_err.to_string())) + } + } + }, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + match lua_env.default_goodfile().await { + Ok(()) => { + Ok("pass".to_string()) + }, + Err(lua_err) => { + Err(("failed".to_string(), lua_err.to_string())) + } + } + }, + Err(e) => { + eprintln!("[-] error finding goodfile: {:?}", e); + Err(("failed".to_string(), "inaccessible goodfile".to_string())) + } + }; + + match res { + Ok(status) => { + eprintln!("[+] job success!"); + let status = ClientProto::task_status(TaskInfo::finished(status)); + eprintln!("reporting status: {:?}", status); + + let res = ctx.lock().unwrap().client.send_typed(&status).await; + if let Err(e) = res { + eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); + } + } + Err((status, lua_err)) => { + eprintln!("[-] job error: {}", status); + + let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string())); + let res = ctx.lock().unwrap().client.send_typed(&status).await; + if let Err(e) = res { + eprintln!("[!] 
FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e); + } + } + } + } + + fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) { + let mut cmd = Command::new(&command[0]); + let cwd = match working_dir { + Some(dir) => { + format!("tmpdir/{}", dir) + }, + None => { + "tmpdir".to_string() + } + }; + eprintln!("prepared {:?} to run in {}", &command, &cwd); + let human_name = command.join(" "); + cmd + .current_dir(cwd) + .args(&command[1..]); + (cmd, human_name) + } + + async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result { + let (cmd, human_name) = Self::prep_command(command, working_dir); + + let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?; + + if !cmd_res.exit_status.success() { + return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status)); + } + Ok(cmd_res) + } + + async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { + self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) + .await.unwrap(); + + let (cmd, human_name) = Self::prep_command(command, working_dir); + + let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?; + + self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) + .await.unwrap(); + + + if !cmd_res.success() { + return Err(format!("{} failed: {:?}", &human_name, cmd_res)); + } + + Ok(()) + } +} + +impl RunnerClient { + async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result { + if res.status() != StatusCode::OK { + return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); + } + + let hello = res.chunk().await.expect("chunk"); + if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { + return Err(format!("bad hello: {:?}", hello)); + } + + Ok(Self { + http: reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"), + host: host.to_string(), + tx: sender, + rx: res, + current_job: None, + }) + } + + async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result, WorkAcquireError> { + loop { + let message = self.recv_typed::().await; + match message { + Ok(Some(ClientProto::NewTask(new_task))) => { + return Ok(Some(new_task)); + }, + Ok(Some(ClientProto::Ping)) => { + self.send_typed(&ClientProto::Pong).await + .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; + }, + Ok(Some(other)) => { + return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); + }, + Ok(None) => { + return Ok(None); + }, + Err(e) => { + return Err(WorkAcquireError::Protocol(e)); + } + } + } + } + + async fn recv(&mut self) -> Result, String> { + self.recv_typed().await + } + + async fn recv_typed(&mut self) -> Result, String> { + match self.rx.chunk().await { + Ok(Some(chunk)) => { + serde_json::from_slice(&chunk) + .map(Option::Some) + .map_err(|e| { + format!("not json: {:?}", e) + }) + }, + Ok(None) => Ok(None), + Err(e) => { + Err(format!("error in recv: {:?}", e)) + } + } + } + + async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { + self.send_typed(&value).await + } + + async fn send_typed(&mut self, t: &T) -> Result<(), String> { + self.tx.send_data( + serde_json::to_vec(t) + .map_err(|e| format!("json error: {:?}", e))? 
+ .into() + ).await + .map_err(|e| format!("send error: {:?}", e)) + } +} + +#[derive(Deserialize, Serialize)] +struct RunnerConfig { + server_address: String, + auth_secret: String, + allowed_pushers: Option>, +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let mut args = std::env::args(); + args.next().expect("first arg exists"); + let config_path = args.next().unwrap_or("./runner_config.json".to_string()); + let runner_config: RunnerConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for RunnerConfig"); + let client = reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"); + + let host_info = host_info::collect_host_info(); + eprintln!("host info: {:?}", host_info); + + loop { + let (mut sender, body) = hyper::Body::channel(); + + sender.send_data(serde_json::to_string(&ClientProto::new_task_please( + runner_config.allowed_pushers.clone(), + host_info.clone(), + )).unwrap().into()).await.expect("req"); + + let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("authorization", runner_config.auth_secret.trim()) + .body(body) + .send() + .await; + + match poll { + Ok(mut res) => { + let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { + Ok(client) => client, + Err(e) => { + eprintln!("failed to initialize client: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + let job = match client.wait_for_work(runner_config.allowed_pushers.as_ref().map(|x| x.as_ref())).await { + Ok(Some(request)) => request, + Ok(None) => { + eprintln!("no work to do (yet)"); + std::thread::sleep(Duration::from_millis(2000)); + continue; + } + Err(e) => { + eprintln!("failed to get work: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + eprintln!("requested work: {:?}", job); + + eprintln!("doing {:?}", job); + + let mut job = RunningJob::from_job(job, client); + job.run().await; + std::thread::sleep(Duration::from_millis(10000)); + }, + Err(e) => { + let message = format!("{}", e); + + if message.contains("tcp connect error") { + eprintln!("could not reach server. sleeping a bit and retrying."); + std::thread::sleep(Duration::from_millis(5000)); + continue; + } + + eprintln!("unhandled error: {}", message); + + std::thread::sleep(Duration::from_millis(1000)); + } + } + } +} + +mod host_info { + use ci_lib_core::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; + + // get host model name, microcode, and how many cores + fn collect_cpu_info() -> CpuInfo { + fn find_line(lines: &[String], prefix: &str) -> String { + lines.iter() + .find(|line| line.starts_with(prefix)) + .expect(&format!("{} line is present", prefix)) + .split(":") + .last() + .unwrap() + .trim() + .to_string() + } + + /// try finding core `cpu`'s max frequency in khz. we'll assume this is the actual speed a + /// build would run at.. fingers crossed. + fn try_finding_cpu_freq(cpu: u32) -> Result { + if let Ok(freq_str) = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq") { + Ok(freq_str.trim().parse().unwrap()) + } else { + // so cpufreq probably isn't around, maybe /proc/cpuinfo's mhz figure is present? 
+                let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect();
+                let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect();
+                match cpu_mhzes.get(cpu as usize) {
+                    Some(mhz) => {
+                        let mut line_parts = cpu_mhzes[cpu as usize].split(":");
+                        let _ = line_parts.next();
+                        let mhz = line_parts.next().unwrap().trim();
+                        let mhz: f64 = mhz.parse().unwrap();
+                        Ok((mhz * 1000.0) as u64)
+                    },
+                    None => {
+                        panic!("could not get cpu freq either from cpufreq or /proc/cpuinfo?");
+                    }
+                }
+            }
+        }
+
+        // we'll have to deploy one of a few techniques, because x86/x86_64 is internally
+        // consistent, but aarch64 is different. who knows what other CPUs think.
+        match std::env::consts::ARCH {
+            "x86" | "x86_64" => {
+                let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect();
+                let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect();
+                let cores = model_names.len() as u32;
+                let model_name = find_line(&cpu_lines, "model name");
+                let vendor_id = find_line(&cpu_lines, "vendor_id");
+                let family = find_line(&cpu_lines, "cpu family");
+                let model = find_line(&cpu_lines, "model\t");
+                let microcode = find_line(&cpu_lines, "microcode");
+                let max_freq = try_finding_cpu_freq(0).unwrap();
+
+                CpuInfo { model_name, microcode, cores, vendor_id, family, model, max_freq }
+            }
+            "aarch64" => {
+                let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect();
+                let processors: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("processor")).collect();
+                let cores = processors.len() as u32;
+
+                // alternate possible path: /sys/firmware/devicetree/base/compatible
+                let model_name = std::fs::read_to_string("/proc/device-tree/compatible").unwrap();
+                let model_name = model_name.replace("\x00", ";");
+                let vendor_id = find_line(&cpu_lines, "CPU implementer");
+                let vendor_name = match vendor_id.as_str() {
+                    "0x41" => "Arm Limited".to_string(),
+                    "0x42" => "Broadcom Corporation".to_string(),
+                    "0x43" => "Cavium Inc".to_string(),
+                    "0x44" => "Digital Equipment Corporation".to_string(),
+                    "0x46" => "Fujitsu Ltd".to_string(),
+                    "0x49" => "Infineon Technologies AG".to_string(),
+                    "0x4d" => "Motorola".to_string(),
+                    "0x4e" => "NVIDIA Corporation".to_string(),
+                    "0x50" => "Applied Micro Circuits Corporation".to_string(),
+                    "0x51" => "Qualcomm Inc".to_string(),
+                    "0x56" => "Marvell International Ltd".to_string(),
+                    "0x69" => "Intel Corporation".to_string(),
+                    "0xc0" => "Ampere Computing".to_string(),
+                    other => format!("unknown aarch64 vendor {}", other),
+                };
+                let family = find_line(&cpu_lines, "CPU architecture");
+                let model = find_line(&cpu_lines, "CPU part");
+                let microcode = String::new();
+                let max_freq = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq").unwrap().trim().parse().unwrap();
+
+                CpuInfo { model_name, microcode, cores, vendor_id: vendor_name, family, model, max_freq }
+            }
+            other => {
+                panic!("dunno how to find cpu info for {}, panik", other);
+            }
+        }
+    }
+
+    fn collect_mem_info() -> MemoryInfo {
+        let mem_lines: Vec<String> = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect();
+        let total = mem_lines[0].split(":").last().unwrap().trim().to_string();
+        let available = mem_lines[2].split(":").last().unwrap().trim().to_string();
+
+        MemoryInfo { total, available }
+    }
+
+    fn hostname() -> String {
+        let mut bytes = [0u8; 4096];
+        let res = unsafe {
+            libc::gethostname(bytes.as_mut_ptr() as *mut std::ffi::c_char, bytes.len())
+        };
+        if res != 0 {
+            panic!("gethostname failed {:?}", res);
+        }
+        let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated");
+        std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string()
+    }
+
+    pub fn collect_env_info() -> EnvInfo {
+        EnvInfo {
+            arch: std::env::consts::ARCH.to_string(),
+            family: std::env::consts::FAMILY.to_string(),
+            os: std::env::consts::OS.to_string(),
+        }
+    }
+
+    pub fn collect_host_info() -> HostInfo {
+        let cpu_info = collect_cpu_info();
+        let memory_info = collect_mem_info();
+        let hostname = hostname();
+        let env_info = collect_env_info();
+
+        HostInfo {
+            hostname,
+            cpu_info,
+            memory_info,
+            env_info,
+        }
+    }
+}
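
For orientation, the Lua surface defined in define_env above gives a goodfile a Build table (run, dependencies, check_output, metric, artifact, error, now_ms, sha, plus Build.environment with has, size, and vars.native_rust_triple) and a Step table (start, push, advance). A minimal goodfile exercising that surface might look like the sketch below; the commands, paths, and metric names are illustrative assumptions, not the contents of the bundled config/goodfiles/rust.lua.

-- hypothetical goodfile sketch: every command, path, and metric name is illustrative
Build.dependencies({"git", "cargo"})

Step.start("build")
Build.run({"cargo", "build", "--release"}, { step = "cargo build" })

Step.advance("test")
Build.run({"cargo", "test"}, { name = "unit tests" })

-- Build.check_output returns a table with stdout, stderr, and status
local rustc = Build.check_output({"rustc", "--version"})
Build.metric("rustc_version", rustc.stdout)
Build.metric("commit", Build.sha)
Build.metric("finished_at_ms", tostring(Build.now_ms()))

-- paths are resolved relative to the job's tmpdir checkout
if Build.environment.has("strip") then
  Build.run({"strip", "target/release/example-bin"})
end
Build.metric("binary_size_bytes", tostring(Build.environment.size("target/release/example-bin")))
Build.artifact("target/release/example-bin", "example-bin-" .. Build.environment.vars.native_rust_triple)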