From 018c655304bfc185740b3a163b369ead7d5131e8 Mon Sep 17 00:00:00 2001 From: iximeow Date: Tue, 27 Dec 2022 23:53:41 +0000 Subject: finally actually support goodfiles some more refinements to how builds are run as well: build state discusses if a build us running, where the result is either a pass or fail this is useful for deciding if a build is in progress and how artifacts (if any) should be presented --- src/ci_runner.rs | 291 ++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 203 insertions(+), 88 deletions(-) (limited to 'src/ci_runner.rs') diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 1c2ba92..de423eb 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -1,4 +1,6 @@ use std::time::Duration; +use rlua::prelude::LuaError; +use std::sync::{Arc, Mutex}; use reqwest::{StatusCode, Response}; use tokio::process::Command; use std::process::Stdio; @@ -11,6 +13,8 @@ use std::task::{Context, Poll}; use std::pin::Pin; use std::marker::Unpin; +mod lua; + #[derive(Debug)] enum WorkAcquireError { Reqwest(reqwest::Error), @@ -34,12 +38,73 @@ struct RequestedJob { } impl RequestedJob { + pub fn into_running(self, client: RunnerClient) -> RunningJob { + RunningJob { + job: self, + client, + } + } +} + +struct JobEnv { + lua: lua::BuildEnv, + job: Arc>, +} + +impl JobEnv { + fn new(job: &Arc>) -> Self { + let lua = lua::BuildEnv::new(job); + JobEnv { + lua, + job: Arc::clone(job) + } + } + + async fn default_goodfile(self) -> Result<(), LuaError> { + self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await + } + + async fn exec_goodfile(self) -> Result<(), LuaError> { + let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap(); + self.lua.run_build(script.as_bytes()).await + } +} + +pub struct RunningJob { + job: RequestedJob, + client: RunnerClient, +} + +async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + + if n_read == 0 { + return Ok(()); + } + + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } +} + +impl RunningJob { + async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { + self.client.send(serde_json::json!({ + "kind": "metric", + "value": value.to_string(), + })).await + .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) + } + // TODO: panics if hyper finds the channel is closed. hum - async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result { + async fn create_artifact(&self, name: &str, desc: &str) -> Result { let (mut sender, body) = hyper::Body::channel(); - let resp = client.http.post("https://ci.butactuallyin.space:9876/api/artifact") + let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") .header("user-agent", "ci-butactuallyin-space-runner") - .header("x-job-token", &self.build_token) + .header("x-job-token", &self.job.build_token) .header("x-artifact-name", name) .header("x-artifact-desc", desc) .body(body) @@ -57,30 +122,71 @@ impl RequestedJob { } } - async fn execute_command(&self, client: &mut RunnerClient, mut command: Command, name: &str, desc: &str) -> Result { - eprintln!("[.] running {}", name); - async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { - let mut buf = vec![0; 1024 * 1024]; - loop { - let n_read = source.read(&mut buf).await - .map_err(|e| format!("failed to read: {:?}", e))?; - - if n_read == 0 { - return Ok(()); - } + async fn clone_remote(&self) -> Result<(), String> { + let mut git_clone = Command::new("git"); + git_clone + .arg("clone") + .arg(&self.job.remote_url) + .arg("tmpdir"); - dest.write_all(&buf[..n_read]).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } + let clone_res = self.execute_command(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await?; + + if !clone_res.success() { + return Err(format!("git clone failed: {:?}", clone_res)); } + let mut git_checkout = Command::new("git"); + git_checkout + .current_dir("tmpdir") + .arg("checkout") + .arg(&self.job.commit); + + let checkout_res = self.execute_command(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await?; + + if !checkout_res.success() { + return Err(format!("git checkout failed: {:?}", checkout_res)); + } + + Ok(()) + } + + async fn execute_goodfile(&self) -> Result { + Ok("string".to_string()) + } + + async fn default_goodfile(&self) -> Result { + let mut build = Command::new("cargo"); + build + .current_dir("tmpdir") + .arg("build"); + + let build_res = self.execute_command(build, "cargo build log", "cargo build").await?; + + if !build_res.success() { + return Err(format!("cargo build failed: {:?}", build_res)); + } + + let mut test = Command::new("cargo"); + test + .current_dir("tmpdir") + .arg("test"); + + let test_res = self.execute_command(test, "cargo test log", "cargo test").await?; + + match test_res.code() { + Some(0) => Ok("pass".to_string()), + Some(n) => Ok(format!("error: {}", n)), + None => Ok(format!("abnormal exit")), + } + } + + async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result { + eprintln!("[.] running {}", name); let stdout_artifact = self.create_artifact( - client, &format!("{} (stdout)", name), &format!("{} (stdout)", desc) ).await.expect("works"); let stderr_artifact = self.create_artifact( - client, &format!("{} (stderr)", name), &format!("{} (stderr)", desc) ).await.expect("works"); @@ -112,54 +218,93 @@ impl RequestedJob { Ok(res) } - async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result { - let mut git_clone = Command::new("git"); - git_clone - .arg("clone") - .arg(&self.remote_url) - .arg("tmpdir"); + async fn run(mut self) { + self.client.send(serde_json::json!({ + "status": "started" + })).await.unwrap(); - let clone_res = self.execute_command(client, git_clone, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await?; + std::fs::remove_dir_all("tmpdir").unwrap(); + std::fs::create_dir("tmpdir").unwrap(); - if !clone_res.success() { - return Err(format!("git clone failed: {:?}", clone_res)); - } + self.clone_remote().await.expect("clone succeeds"); + + let ctx = Arc::new(Mutex::new(self)); + + let lua_env = JobEnv::new(&ctx); + + let metadata = std::fs::metadata("./tmpdir/goodfile"); + let res: Result = match metadata { + Ok(_) => { + match lua_env.exec_goodfile().await { + Ok(()) => { + Ok("pass".to_string()) + }, + Err(lua_err) => { + Err(("failed".to_string(), lua_err.to_string())) + } + } + }, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + match lua_env.default_goodfile().await { + Ok(()) => { + Ok("pass".to_string()) + }, + Err(lua_err) => { + Err(("failed".to_string(), lua_err.to_string())) + } + } + }, + Err(e) => { + eprintln!("[-] error finding goodfile: {:?}", e); + Err(("failed".to_string(), "inaccessible goodfile".to_string())) + } + }; - let mut git_checkout = Command::new("git"); - git_checkout - .current_dir("tmpdir") - .arg("checkout") - .arg(&self.commit); + match res { + Ok(status) => { + eprintln!("[+] job success!"); + let status = serde_json::json!({ + "kind": "job_status", + "state": "finished", + "result": status + }); + eprintln!("reporting status: {}", status); - let checkout_res = self.execute_command(client, git_checkout, "git checkout log", &format!("git checkout {}", &self.commit)).await?; + let res = ctx.lock().unwrap().client.send(status).await; + if let Err(e) = res { + eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); + } + } + Err((status, lua_err)) => { + eprintln!("[-] job error: {}", status); - if !checkout_res.success() { - return Err(format!("git checkout failed: {:?}", checkout_res)); + let res = ctx.lock().unwrap().client.send(serde_json::json!({ + "kind": "job_status", + "state": "interrupted", + "result": status, + "desc": lua_err.to_string(), + })).await; + if let Err(e) = res { + eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", status, e); + } + } } + } - let mut build = Command::new("cargo"); - build + async fn run_command(&self, command: &[String]) -> Result<(), String> { + let mut cmd = Command::new(&command[0]); + let human_name = command.join(" "); + cmd .current_dir("tmpdir") - .arg("build"); + .args(&command[1..]); - let build_res = self.execute_command(client, build, "cargo build log", "cargo build").await?; + let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?; - if !build_res.success() { - return Err(format!("cargo build failed: {:?}", build_res)); + if !cmd_res.success() { + return Err(format!("{} failed: {:?}", &human_name, cmd_res)); } - let mut test = Command::new("cargo"); - test - .current_dir("tmpdir") - .arg("test"); - - let test_res = self.execute_command(client, test, "cargo test log", "cargo test").await?; - - match test_res.code() { - Some(0) => Ok("pass".to_string()), - Some(n) => Ok(format!("error: {}", n)), - None => Ok(format!("abnormal exit")), - } + Ok(()) } } @@ -269,38 +414,6 @@ impl RunnerClient { ).await .map_err(|e| format!("send error: {:?}", e)) } - - async fn run_job(&mut self, job: RequestedJob) { - self.send(serde_json::json!({ - "status": "started" - })).await.unwrap(); - - std::fs::remove_dir_all("tmpdir").unwrap(); - std::fs::create_dir("tmpdir").unwrap(); - - let res = job.execute_goodfile(self).await; - - match res { - Ok(status) => { - eprintln!("[+] job success!"); - - self.send(serde_json::json!({ - "kind": "job_status", - "state": "finished", - "result": status - })).await.unwrap(); - } - Err(status) => { - eprintln!("[-] job error: {}", status); - - self.send(serde_json::json!({ - "kind": "job_status", - "state": "interrupted", - "result": status - })).await.unwrap(); - } - } - } } #[tokio::main] @@ -355,7 +468,9 @@ async fn main() { eprintln!("requested work: {:?}", job); eprintln!("doing {:?}", job); - client.run_job(job).await; + + let mut job = job.into_running(client); + job.run().await; std::thread::sleep(Duration::from_millis(10000)); }, Err(e) => { -- cgit v1.1