diff options
| author | iximeow <me@iximeow.net> | 2023-07-13 00:51:51 -0700 | 
|---|---|---|
| committer | iximeow <me@iximeow.net> | 2023-07-13 00:51:51 -0700 | 
| commit | 9e6906c00c49186189d211dc96e132d85e7ff641 (patch) | |
| tree | 05c20145ebc306313e3a12dc73c34b5dea40bbdc /ci-runner | |
| parent | 543150f1666690351d4698421cc6ceb115c1e251 (diff) | |
reorganize the whole thing into crates/ packages
Diffstat (limited to 'ci-runner')
| -rw-r--r-- | ci-runner/Cargo.toml | 25 | ||||
| -rw-r--r-- | ci-runner/src/lua/mod.rs | 411 | ||||
| -rw-r--r-- | ci-runner/src/main.rs | 668 | 
3 files changed, 1104 insertions, 0 deletions
diff --git a/ci-runner/Cargo.toml b/ci-runner/Cargo.toml new file mode 100644 index 0000000..038ed14 --- /dev/null +++ b/ci-runner/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "ci-runner" +version = "0.0.1" +authors = [ "iximeow <me@iximeow.net>" ] +license = "0BSD" +edition = "2021" + +[[bin]] +name = "ci_runner" +path = "src/main.rs" + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +ci-lib-native = { path = "../ci-lib-native" } + +libc = "*" +serde = "*" +serde_derive = "*" +serde_json = "*" +tokio = { version = "*", features = ["full"] } +reqwest = "*" +rlua = "*" +hyper = "*" +tracing = "*" +tracing-subscriber = "*" diff --git a/ci-runner/src/lua/mod.rs b/ci-runner/src/lua/mod.rs new file mode 100644 index 0000000..62ac68b --- /dev/null +++ b/ci-runner/src/lua/mod.rs @@ -0,0 +1,411 @@ +use crate::RunningJob; + +use rlua::prelude::*; + +use std::sync::{Arc, Mutex}; +use std::path::PathBuf; + +pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config/goodfiles/rust.lua"); + +pub struct BuildEnv { +    lua: Lua, +    job: Arc<Mutex<RunningJob>>, +} + +#[derive(Debug)] +pub struct RunParams { +    step: Option<String>, +    name: Option<String>, +    cwd: Option<String>, +} + +pub struct CommandOutput { +    pub exit_status: std::process::ExitStatus, +    pub stdout: Vec<u8>, +    pub stderr: Vec<u8>, +} + +mod lua_exports { +    use crate::RunningJob; +    use crate::lua::{CommandOutput, RunParams}; + +    use std::sync::{Arc, Mutex}; +    use std::path::PathBuf; + +    use rlua::prelude::*; + +    pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec<String>, RunParams), rlua::Error> { +        let args = match command { +            LuaValue::Table(table) => { +                let len = table.len().expect("command table has a length"); +                let mut command_args = Vec::new(); +                for i in 0..len { +                    let value = table.get(i + 1).expect("command arg is gettble"); +                    match value { +                        LuaValue::String(s) => { +                            command_args.push(s.to_str().unwrap().to_owned()); +                        }, +                        other => { +                            return Err(LuaError::RuntimeError(format!("argument {} was not a string, was {:?}", i, other))); +                        } +                    }; +                } + +                command_args +            }, +            other => { +                return Err(LuaError::RuntimeError(format!("argument 1 was not a table: {:?}", other))); +            } +        }; + +        let params = match params { +            LuaValue::Table(table) => { +                let step = match table.get("step").expect("can get from table") { +                    LuaValue::String(v) => { +                        Some(v.to_str()?.to_owned()) +                    }, +                    LuaValue::Nil => { +                        None +                    }, +                    other => { +                        return Err(LuaError::RuntimeError(format!("params[\"step\"] must be a string"))); +                    } +                }; +                let name = match table.get("name").expect("can get from table") { +                    LuaValue::String(v) => { +                        Some(v.to_str()?.to_owned()) +                    }, +                    LuaValue::Nil => { +                        None +                    }, +                    other => { +                        return Err(LuaError::RuntimeError(format!("params[\"name\"] must be a string"))); +                    } +                }; +                let cwd = match table.get("cwd").expect("can get from table") { +                    LuaValue::String(v) => { +                        Some(v.to_str()?.to_owned()) +                    }, +                    LuaValue::Nil => { +                        None +                    }, +                    other => { +                        return Err(LuaError::RuntimeError(format!("params[\"cwd\"] must be a string"))); +                    } +                }; + +                RunParams { +                    step, +                    name, +                    cwd, +                } +            }, +            LuaValue::Nil => { +                RunParams { +                    step: None, +                    name: None, +                    cwd: None, +                } +            } +            other => { +                return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other))); +            } +        }; + +        Ok((args, params)) +    } + +    pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { +        let (args, params) = collect_build_args(command, params)?; +        eprintln!("args: {:?}", args); +        eprintln!("  params: {:?}", params); +        let rt = tokio::runtime::Builder::new_current_thread() +            .enable_all() +            .build() +            .unwrap(); +        rt.block_on(async move { +            job_ctx.lock().unwrap().run_command(&args, params.cwd.as_ref().map(|x| x.as_str())).await +                .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) +        }) +    } + +    pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<rlua::Table<'lua>, rlua::Error> { +        let (args, params) = collect_build_args(command, params)?; +        eprintln!("args: {:?}", args); +        eprintln!("  params: {:?}", params); +        let rt = tokio::runtime::Builder::new_current_thread() +            .enable_all() +            .build() +            .unwrap(); +        let command_output = rt.block_on(async move { +            job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await +                .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) +        })?; + +        let stdout = ctx.create_string(command_output.stdout.as_slice())?; +        let stderr = ctx.create_string(command_output.stderr.as_slice())?; + +        let result = ctx.create_table()?; +        result.set("stdout", stdout)?; +        result.set("stderr", stderr)?; +        result.set("status", command_output.exit_status.code())?; +        Ok(result) +    } + +    pub fn check_dependencies(commands: Vec<String>) -> Result<(), rlua::Error> { +        let mut missing_deps = Vec::new(); +        for command in commands.iter() { +            if !has_cmd(command)? { +                missing_deps.push(command.clone()); +            } +        } + +        if missing_deps.len() > 0 { +            return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", ")))); +        } + +        Ok(()) +    } + +    pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { +        let rt = tokio::runtime::Builder::new_current_thread() +            .enable_all() +            .build() +            .unwrap(); +        rt.block_on(async move { +            job_ctx.lock().unwrap().send_metric(&name, value).await +                .map_err(|e| LuaError::RuntimeError(format!("send_metric error: {:?}", e))) +        }) +    } + +    pub fn artifact(path: String, name: Option<String>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { +        let path: PathBuf = path.into(); + +        let default_name: String = match (path.file_name(), path.parent()) { +            (Some(name), _) => name +                .to_str() +                .ok_or(LuaError::RuntimeError("artifact name is not a unicode string".to_string()))? +                .to_string(), +            (_, Some(parent)) => format!("{}", parent.display()), +            (None, None) => { +                // one day using directories for artifacts might work but / is still not going +                // to be accepted +                return Err(LuaError::RuntimeError(format!("cannot infer a default path name for {}", path.display()))); +            } +        }; + +        let name: String = match name { +            Some(name) => name, +            None => default_name, +        }; +        let rt = tokio::runtime::Builder::new_current_thread() +            .enable_all() +            .build() +            .unwrap(); +        rt.block_on(async move { +            let mut artifact = job_ctx.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await +                .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e))) +                .unwrap(); +            let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(); +            eprintln!("uploading..."); +            crate::io::forward_data(&mut file, &mut artifact).await +                .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?; +            std::mem::drop(artifact); +            Ok(()) +        }) +    } + +    pub fn has_cmd(name: &str) -> Result<bool, rlua::Error> { +        Ok(std::process::Command::new("which") +            .arg(name) +            .status() +            .map_err(|e| LuaError::RuntimeError(format!("could not fork which? {:?}", e)))? +            .success()) +    } + +    pub fn file_size(path: &str) -> Result<u64, rlua::Error> { +        Ok(std::fs::metadata(&format!("tmpdir/{}", path)) +            .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", path)))? +            .len()) +    } + +    pub mod step { +        use crate::RunningJob; +        use std::sync::{Arc, Mutex}; + +        pub fn start(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { +            let mut job = job_ref.lock().unwrap(); +            job.current_step.clear(); +            job.current_step.push(name); +            Ok(()) +        } + +        pub fn push(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { +            let mut job = job_ref.lock().unwrap(); +            job.current_step.push(name); +            Ok(()) +        } + +        pub fn advance(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { +            let mut job = job_ref.lock().unwrap(); +            job.current_step.pop(); +            job.current_step.push(name); +            Ok(()) +        } +    } +} + +struct DeclEnv<'lua, 'env> { +    lua_ctx: &'env rlua::Context<'lua>, +    job_ref: &'env Arc<Mutex<RunningJob>>, +} +impl<'lua, 'env> DeclEnv<'lua, 'env> { +    fn create_function<A, R, F>(&self, name: &str, f: F) ->  Result<rlua::Function<'lua>, String> +        where +        A: FromLuaMulti<'lua>, +        R: ToLuaMulti<'lua>, +        F: 'static + Send + Fn(rlua::Context<'lua>, Arc<Mutex<RunningJob>>, A) -> Result<R, rlua::Error> { + +        let job_ref = Arc::clone(self.job_ref); +        self.lua_ctx.create_function(move |ctx, args| { +            let job_ref = Arc::clone(&job_ref); +            f(ctx, job_ref, args) +        }) +            .map_err(|e| format!("problem defining {} function: {:?}", name, e)) +    } +} + +impl BuildEnv { +    pub fn new(job: &Arc<Mutex<RunningJob>>) -> Self { +        let env = BuildEnv { +            lua: Lua::new(), +            job: Arc::clone(job), +        }; +        env.lua.context(|lua_ctx| { +            env.define_env(lua_ctx) +        }).expect("can define context"); +        env +    } + +    fn define_env(&self, lua_ctx: rlua::Context) -> Result<(), String> { +        let decl_env = DeclEnv { +            lua_ctx: &lua_ctx, +            job_ref: &self.job, +        }; + +        let hello = decl_env.create_function("hello", |_, _, ()| { +            eprintln!("hello from lua!!!"); +            Ok(()) +        })?; + +        let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec<String>| { +            lua_exports::check_dependencies(commands) +        })?; + +        let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| { +            lua_exports::build_command_impl(command, params, job_ref) +        })?; + +        let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, params): (LuaValue, LuaValue)| { +            lua_exports::check_output_impl(ctx, command, params, job_ref) +        })?; + +        let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| { +            lua_exports::metric(name, value, job_ref) +        })?; + +        let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(ci_lib_core::now_ms()))?; + +        let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| { +            lua_exports::artifact(path, name, job_ref) +        })?; + +        let error = decl_env.create_function("error", move |_, job_ref, msg: String| { +            Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg))) +        })?; + +        let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, job_ref, name: String| { +            lua_exports::has_cmd(&name) +        })?; + +        let size_of_file = decl_env.create_function("size_of_file", move |_, job_ref, name: String| { +            lua_exports::file_size(&name) +        })?; + +        let native_rust_triple = match std::env::consts::ARCH { +            "x86_64" => "x86_64-unknown-linux-gnu", +            "aarch64" => "aarch64-unknown-linux-gnu", +            other => { panic!("dunno native rust triple for arch {}", other); } +        }; +        let native_rust_triple = lua_ctx.create_string(native_rust_triple).unwrap(); +        let build_env_vars = lua_ctx.create_table_from( +            vec![ +                ("native_rust_triple", native_rust_triple) +            ] +        ).unwrap(); + +        let build_environment = lua_ctx.create_table_from( +            vec![ +                ("has", path_has_cmd), +                ("size", size_of_file), +            ] +        ).unwrap(); +        build_environment.set("vars", build_env_vars).unwrap(); + +        let build_functions = lua_ctx.create_table_from( +            vec![ +                ("hello", hello), +                ("run", build), +                ("dependencies", check_dependencies), +                ("metric", metric), +                ("error", error), +                ("artifact", artifact), +                ("now_ms", now_ms), +                ("check_output", check_output), +            ] +        ).unwrap(); +        build_functions.set("environment", build_environment).unwrap(); +        let current_commit = self.job.lock().unwrap().job.commit.clone(); +        build_functions.set("sha", lua_ctx.create_string(current_commit.as_bytes()).unwrap()).unwrap(); +        let globals = lua_ctx.globals(); +        globals.set("Build", build_functions).unwrap(); + + +        let step_start = decl_env.create_function("step_start", move |_, job_ref, name: String| { +            lua_exports::step::start(job_ref, name) +        })?; + +        let step_push = decl_env.create_function("step_push", move |_, job_ref, name: String| { +            lua_exports::step::push(job_ref, name) +        })?; + +        let step_advance = decl_env.create_function("step_advance", move |_, job_ref, name: String| { +            lua_exports::step::advance(job_ref, name) +        })?; + +        let step_functions = lua_ctx.create_table_from( +            vec![ +                ("start", step_start), +                ("push", step_push), +                ("advance", step_advance), +            ] +        ).unwrap(); +        globals.set("Step", step_functions).unwrap(); +        Ok(()) +    } + +    pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> { +        let script = script.to_vec(); +        let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| { +            std::thread::spawn(move || { +                self.lua.context(|lua_ctx| { +                    lua_ctx.load(&script) +                        .set_name("goodfile")? +                        .exec() +                }) +            }).join().unwrap() +        }).await.unwrap(); +        eprintln!("lua res: {:?}", res); +        res +    } +} diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs new file mode 100644 index 0000000..41f5594 --- /dev/null +++ b/ci-runner/src/main.rs @@ -0,0 +1,668 @@ +use std::time::Duration; +use std::os::unix::process::ExitStatusExt; +use rlua::prelude::LuaError; +use std::sync::{Arc, Mutex}; +use reqwest::{StatusCode, Response}; +use tokio::process::Command; +use std::process::Stdio; +use std::process::ExitStatus; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use serde_json::json; +use serde::{Deserialize, de::DeserializeOwned, Serialize}; +use std::task::{Context, Poll}; +use std::pin::Pin; +use std::marker::Unpin; + +use ci_lib_native::io; +use ci_lib_native::io::{ArtifactStream, VecSink}; +use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; + +mod lua; + +use crate::lua::CommandOutput; + +#[derive(Debug)] +enum WorkAcquireError { +    Reqwest(reqwest::Error), +    EarlyEof, +    Protocol(String), +} + +struct RunnerClient { +    http: reqwest::Client, +    host: String, +    tx: hyper::body::Sender, +    rx: Response, +    current_job: Option<RequestedJob>, +} + +impl RunningJob { +    fn from_job(job: RequestedJob, client: RunnerClient) -> Self { +        Self { +            job, +            client, +            current_step: StepTracker::new(), +        } +    } +} + +struct JobEnv { +    lua: lua::BuildEnv, +    job: Arc<Mutex<RunningJob>>, +} + +impl JobEnv { +    fn new(job: &Arc<Mutex<RunningJob>>) -> Self { +        let lua = lua::BuildEnv::new(job); +        JobEnv { +            lua, +            job: Arc::clone(job) +        } +    } + +    async fn default_goodfile(self) -> Result<(), LuaError> { +        self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await +    } + +    async fn exec_goodfile(self) -> Result<(), LuaError> { +        let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap(); +        self.lua.run_build(script.as_bytes()).await +    } +} + +pub struct RunningJob { +    job: RequestedJob, +    client: RunnerClient, +    current_step: StepTracker, +} + +enum RepoError { +    CloneFailedIdk { exit_code: ExitStatus }, +    CheckoutFailedIdk { exit_code: ExitStatus }, +    CheckoutFailedMissingRef, +} + +pub struct StepTracker { +    scopes: Vec<String> +} + +impl StepTracker { +    pub fn new() -> Self { +        StepTracker { +            scopes: Vec::new() +        } +    } + +    pub fn push(&mut self, name: String) { +        self.scopes.push(name); +    } + +    pub fn pop(&mut self) { +        self.scopes.pop(); +    } + +    pub fn clear(&mut self) { +        self.scopes.clear(); +    } + +    pub fn full_step_path(&self) -> &[String] { +        self.scopes.as_slice() +    } +} + +impl RunningJob { +    async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { +        self.client.send_typed(&ClientProto::metric(name, value)) +            .await +            .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) +    } + +    // TODO: panics if hyper finds the channel is closed. hum +    async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> { +        let (mut sender, body) = hyper::Body::channel(); +        let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") +            .header("user-agent", "ci-butactuallyin-space-runner") +            .header("x-task-token", &self.job.build_token) +            .header("x-artifact-name", name) +            .header("x-artifact-desc", desc) +            .body(body) +            .send() +            .await +            .map_err(|e| format!("unable to send request: {:?}", e))?; + +        if resp.status() == StatusCode::OK { +            eprintln!("[+] artifact '{}' started", name); +            Ok(ArtifactStream::new(sender)) +        } else { +            Err(format!("[-] unable to create artifact: {:?}", resp)) +        } +    } + +    async fn clone_remote(&self) -> Result<(), RepoError> { +        let mut git_clone = Command::new("git"); +        git_clone +            .arg("clone") +            .arg(&self.job.remote_url) +            .arg("tmpdir"); + +        let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await +            .map_err(|e| { +                eprintln!("stringy error (exec failed?) for clone: {}", e); +                RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) } +            })?; + +        if !clone_res.success() { +            return Err(RepoError::CloneFailedIdk { exit_code: clone_res }); +        } + +        let mut git_checkout = Command::new("git"); +        git_checkout +            .current_dir("tmpdir") +            .arg("checkout") +            .arg(&self.job.commit); + +        let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await +            .map_err(|e| { +                eprintln!("stringy error (exec failed?) for checkout: {}", e); +                RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) } +            })?; + +        if !checkout_res.success() { +            if checkout_res.code() == Some(128) { +                return Err(RepoError::CheckoutFailedIdk { exit_code: checkout_res }); +            } else { +                return Err(RepoError::CheckoutFailedMissingRef); +            } +        } + +        Ok(()) +    } + +    async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { +        let stdout_artifact = self.create_artifact( +            &format!("{} (stdout)", name), +            &format!("{} (stdout)", desc) +        ).await.expect("works"); +        let stderr_artifact = self.create_artifact( +            &format!("{} (stderr)", name), +            &format!("{} (stderr)", desc) +        ).await.expect("works"); + +        let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?; + +        Ok(exit_status) +    } + +    async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> { +        let stdout_collector = VecSink::new(); +        let stderr_collector = VecSink::new(); + +        let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?; + +        Ok(CommandOutput { +            exit_status, +            stdout: stdout_collector.take_buf(), +            stderr: stderr_collector.take_buf(), +        }) +    } + +    async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> { +        eprintln!("[.] running {}", name); + +        let mut child = command +            .stdin(Stdio::null()) +            .stdout(Stdio::piped()) +            .stderr(Stdio::piped()) +            .spawn() +            .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; + +        let mut child_stdout = child.stdout.take().unwrap(); +        let mut child_stderr = child.stderr.take().unwrap(); + +        eprintln!("[.] '{}': forwarding stdout", name); +        tokio::spawn(async move { io::forward_data(&mut child_stdout, &mut stdout_reporter).await }); +        eprintln!("[.] '{}': forwarding stderr", name); +        tokio::spawn(async move { io::forward_data(&mut child_stderr, &mut stderr_reporter).await }); + +        let res = child.wait().await +            .map_err(|e| format!("failed to wait? {:?}", e))?; + +        if res.success() { +            eprintln!("[+] '{}' success", name); +        } else { +            eprintln!("[-] '{}' fail: {:?}", name, res); +        } + +        Ok(res) +    } + +    async fn run(mut self) { +        self.client.send_typed(&ClientProto::Started).await.unwrap(); + +        std::fs::remove_dir_all("tmpdir").unwrap(); +        std::fs::create_dir("tmpdir").unwrap(); + +        let ctx = Arc::new(Mutex::new(self)); + +        let checkout_res = ctx.lock().unwrap().clone_remote().await; + +        if let Err(e) = checkout_res { +            let status = "bad_ref"; +            let status = ClientProto::task_status(TaskInfo::finished(status)); +            eprintln!("checkout failed, reporting status: {:?}", status); + +            let res = ctx.lock().unwrap().client.send_typed(&status).await; +            if let Err(e) = res { +                eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); +            } + +            return; +        } + +        let lua_env = JobEnv::new(&ctx); + +        let metadata = std::fs::metadata("./tmpdir/goodfile"); +        let res: Result<String, (String, String)> = match metadata { +            Ok(_) => { +                match lua_env.exec_goodfile().await { +                    Ok(()) => { +                        Ok("pass".to_string()) +                    }, +                    Err(lua_err) => { +                        Err(("failed".to_string(), lua_err.to_string())) +                    } +                } +            }, +            Err(e) if e.kind() == std::io::ErrorKind::NotFound => { +                match lua_env.default_goodfile().await { +                    Ok(()) => { +                        Ok("pass".to_string()) +                    }, +                    Err(lua_err) => { +                        Err(("failed".to_string(), lua_err.to_string())) +                    } +                } +            }, +            Err(e) => { +                eprintln!("[-] error finding goodfile: {:?}", e); +                Err(("failed".to_string(), "inaccessible goodfile".to_string())) +            } +        }; + +        match res { +            Ok(status) => { +                eprintln!("[+] job success!"); +                let status = ClientProto::task_status(TaskInfo::finished(status)); +                eprintln!("reporting status: {:?}", status); + +                let res = ctx.lock().unwrap().client.send_typed(&status).await; +                if let Err(e) = res { +                    eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); +                } +            } +            Err((status, lua_err)) => { +                eprintln!("[-] job error: {}", status); + +                let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string())); +                let res = ctx.lock().unwrap().client.send_typed(&status).await; +                if let Err(e) = res { +                    eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e); +                } +            } +        } +    } + +    fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) { +        let mut cmd = Command::new(&command[0]); +        let cwd = match working_dir { +            Some(dir) => { +                format!("tmpdir/{}", dir) +            }, +            None => { +                "tmpdir".to_string() +            } +        }; +        eprintln!("prepared {:?} to run in {}", &command, &cwd); +        let human_name = command.join(" "); +        cmd +            .current_dir(cwd) +            .args(&command[1..]); +        (cmd, human_name) +    } + +    async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result<CommandOutput, String> { +        let (cmd, human_name) = Self::prep_command(command, working_dir); + +        let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?; + +        if !cmd_res.exit_status.success() { +            return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status)); +        } +        Ok(cmd_res) +    } + +    async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { +        self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) +            .await.unwrap(); + +        let (cmd, human_name) = Self::prep_command(command, working_dir); + +        let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?; + +        self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) +            .await.unwrap(); + + +        if !cmd_res.success() { +            return Err(format!("{} failed: {:?}", &human_name, cmd_res)); +        } + +        Ok(()) +    } +} + +impl RunnerClient { +    async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { +        if res.status() != StatusCode::OK { +            return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); +        } + +        let hello = res.chunk().await.expect("chunk"); +        if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { +            return Err(format!("bad hello: {:?}", hello)); +        } + +        Ok(Self { +            http: reqwest::ClientBuilder::new() +                .connect_timeout(Duration::from_millis(1000)) +                .timeout(Duration::from_millis(600000)) +                .build() +                .expect("can build client"), +            host: host.to_string(), +            tx: sender, +            rx: res, +            current_job: None, +        }) +    } + +    async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { +        loop { +            let message = self.recv_typed::<ClientProto>().await; +            match message { +                Ok(Some(ClientProto::NewTask(new_task))) => { +                    return Ok(Some(new_task)); +                }, +                Ok(Some(ClientProto::Ping)) => { +                    self.send_typed(&ClientProto::Pong).await +                        .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; +                }, +                Ok(Some(other)) => { +                    return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); +                }, +                Ok(None) => { +                    return Ok(None); +                }, +                Err(e) => { +                    return Err(WorkAcquireError::Protocol(e)); +                } +            } +        } +    } + +    async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { +        self.recv_typed().await +    } + +    async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> { +        match self.rx.chunk().await { +            Ok(Some(chunk)) => { +                serde_json::from_slice(&chunk) +                    .map(Option::Some) +                    .map_err(|e| { +                        format!("not json: {:?}", e) +                    }) +            }, +            Ok(None) => Ok(None), +            Err(e) => { +                Err(format!("error in recv: {:?}", e)) +            } +        } +    } + +    async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { +        self.send_typed(&value).await +    } + +    async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> { +        self.tx.send_data( +            serde_json::to_vec(t) +                .map_err(|e| format!("json error: {:?}", e))? +                .into() +        ).await +            .map_err(|e| format!("send error: {:?}", e)) +    } +} + +#[derive(Deserialize, Serialize)] +struct RunnerConfig { +    server_address: String, +    auth_secret: String, +    allowed_pushers: Option<Vec<String>>, +} + +#[tokio::main] +async fn main() { +    tracing_subscriber::fmt::init(); +    let mut args = std::env::args(); +    args.next().expect("first arg exists"); +    let config_path = args.next().unwrap_or("./runner_config.json".to_string()); +    let runner_config: RunnerConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for RunnerConfig"); +    let client = reqwest::ClientBuilder::new() +        .connect_timeout(Duration::from_millis(1000)) +        .timeout(Duration::from_millis(600000)) +        .build() +        .expect("can build client"); + +    let host_info = host_info::collect_host_info(); +    eprintln!("host info: {:?}", host_info); + +    loop { +        let (mut sender, body) = hyper::Body::channel(); + +        sender.send_data(serde_json::to_string(&ClientProto::new_task_please( +            runner_config.allowed_pushers.clone(), +            host_info.clone(), +        )).unwrap().into()).await.expect("req"); + +        let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") +            .header("user-agent", "ci-butactuallyin-space-runner") +            .header("authorization", runner_config.auth_secret.trim()) +            .body(body) +            .send() +            .await; + +        match poll { +            Ok(mut res) => { +                let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { +                    Ok(client) => client, +                    Err(e) => { +                        eprintln!("failed to initialize client: {:?}", e); +                        std::thread::sleep(Duration::from_millis(10000)); +                        continue; +                    } +                }; +                let job = match client.wait_for_work(runner_config.allowed_pushers.as_ref().map(|x| x.as_ref())).await { +                    Ok(Some(request)) => request, +                    Ok(None) => { +                        eprintln!("no work to do (yet)"); +                        std::thread::sleep(Duration::from_millis(2000)); +                        continue; +                    } +                    Err(e) => { +                        eprintln!("failed to get work: {:?}", e); +                        std::thread::sleep(Duration::from_millis(10000)); +                        continue; +                    } +                }; +                eprintln!("requested work: {:?}", job); + +                eprintln!("doing {:?}", job); + +                let mut job = RunningJob::from_job(job, client); +                job.run().await; +                std::thread::sleep(Duration::from_millis(10000)); +            }, +            Err(e) => { +                let message = format!("{}", e); + +                if message.contains("tcp connect error") { +                    eprintln!("could not reach server. sleeping a bit and retrying."); +                    std::thread::sleep(Duration::from_millis(5000)); +                    continue; +                } + +                eprintln!("unhandled error: {}", message); + +                std::thread::sleep(Duration::from_millis(1000)); +            } +        } +    } +} + +mod host_info { +    use ci_lib_core::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; + +    // get host model name, microcode, and how many cores +    fn collect_cpu_info() -> CpuInfo { +        fn find_line(lines: &[String], prefix: &str) -> String { +            lines.iter() +                .find(|line| line.starts_with(prefix)) +                .expect(&format!("{} line is present", prefix)) +                .split(":") +                .last() +                .unwrap() +                .trim() +                .to_string() +        } + +        /// try finding core `cpu`'s max frequency in khz. we'll assume this is the actual speed a +        /// build would run at.. fingers crossed. +        fn try_finding_cpu_freq(cpu: u32) -> Result<u64, String> { +            if let Ok(freq_str) = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq") { +                Ok(freq_str.trim().parse().unwrap()) +            } else { +                // so cpufreq probably isn't around, maybe /proc/cpuinfo's mhz figure is present? +                let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); +                let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect(); +                match cpu_mhzes.get(cpu as usize) { +                    Some(mhz) => { +                        let mut line_parts = cpu_mhzes[cpu as usize].split(":"); +                        let _ = line_parts.next(); +                        let mhz = line_parts.next().unwrap().trim(); +                        let mhz: f64 = mhz.parse().unwrap(); +                        Ok((mhz * 1000.0) as u64) +                    }, +                    None => { +                        panic!("could not get cpu freq either from cpufreq or /proc/cpuinfo?"); +                    } +                } +            } +        } + +        // we'll have to deploy one of a few techniques, because x86/x86_64 is internally +        // consistent, but aarch64 is different. who knows what other CPUs think. +        match std::env::consts::ARCH { +            "x86" | "x86_64" => { +                let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); +                let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect(); +                let cores = model_names.len() as u32; +                let model_name = find_line(&cpu_lines, "model name"); +                let vendor_id = find_line(&cpu_lines, "vendor_id"); +                let family = find_line(&cpu_lines, "cpu family"); +                let model = find_line(&cpu_lines, "model\t"); +                let microcode = find_line(&cpu_lines, "microcode"); +                let max_freq = try_finding_cpu_freq(0).unwrap(); + +                CpuInfo { model_name, microcode, cores, vendor_id, family, model, max_freq } +            } +            "aarch64" => { +                let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); +                let processors: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("processor")).collect(); +                let cores = processors.len() as u32; + +                // alternate possible path: /sys/firmware/devicetree/base/compatible +                let model_name = std::fs::read_to_string("/proc/device-tree/compatible").unwrap(); +                let model_name = model_name.replace("\x00", ";"); +                let vendor_id = find_line(&cpu_lines, "CPU implementer"); +                let vendor_name = match vendor_id.as_str() { +                    "0x41" => "Arm Limited".to_string(), +                    "0x42" => "Broadcom Corporation".to_string(), +                    "0x43" => "Cavium Inc".to_string(), +                    "0x44" => "Digital Equipment Corporation".to_string(), +                    "0x46" => "Fujitsu Ltd".to_string(), +                    "0x49" => "Infineon Technologies AG".to_string(), +                    "0x4d" => "Motorola".to_string(), +                    "0x4e" => "NVIDIA Corporation".to_string(), +                    "0x50" => "Applied Micro Circuits Corporation".to_string(), +                    "0x51" => "Qualcomm Inc".to_string(), +                    "0x56" => "Marvell International Ltd".to_string(), +                    "0x69" => "Intel Corporation".to_string(), +                    "0xc0" => "Ampere Computing".to_string(), +                    other => format!("unknown aarch64 vendor {}", other), +                }; +                let family = find_line(&cpu_lines, "CPU architecture"); +                let model = find_line(&cpu_lines, "CPU part"); +                let microcode = String::new(); +                let max_freq = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq").unwrap().trim().parse().unwrap(); + +                CpuInfo { model_name, microcode, cores, vendor_id: vendor_name, family, model, max_freq } +            } +            other => { +                panic!("dunno how to find cpu info for {}, panik", other); +            } +        } +    } + +    fn collect_mem_info() -> MemoryInfo { +        let mem_lines: Vec<String> = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect(); +        let total = mem_lines[0].split(":").last().unwrap().trim().to_string(); +        let available = mem_lines[2].split(":").last().unwrap().trim().to_string(); + +        MemoryInfo { total, available } +    } + +    fn hostname() -> String { +        let mut bytes = [0u8; 4096]; +        let res = unsafe { +            libc::gethostname(bytes.as_mut_ptr() as *mut std::ffi::c_char, bytes.len()) +        }; +        if res != 0 { +            panic!("gethostname failed {:?}", res); +        } +        let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated"); +        std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string() +    } + +    pub fn collect_env_info() -> EnvInfo { +        EnvInfo { +            arch: std::env::consts::ARCH.to_string(), +            family: std::env::consts::FAMILY.to_string(), +            os: std::env::consts::OS.to_string(), +        } +    } + +    pub fn collect_host_info() -> HostInfo { +        let cpu_info = collect_cpu_info(); +        let memory_info = collect_mem_info(); +        let hostname = hostname(); +        let env_info = collect_env_info(); + +        HostInfo { +            hostname, +            cpu_info, +            memory_info, +            env_info, +        } +    } +} +  | 
