diff options
-rw-r--r-- | src/ci_driver.rs | 28 | ||||
-rw-r--r-- | src/ci_runner.rs | 291 | ||||
-rw-r--r-- | src/dbctx.rs | 23 | ||||
-rw-r--r-- | src/lua/mod.rs | 157 | ||||
-rw-r--r-- | src/main.rs | 1 | ||||
-rw-r--r-- | src/sql.rs | 17 |
6 files changed, 420 insertions, 97 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs index d70580a..3be49f4 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -26,6 +26,7 @@ mod sql; mod notifier; use crate::dbctx::{DbCtx, PendingJob}; +use crate::sql::JobResult; use crate::sql::JobState; fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> { @@ -186,16 +187,22 @@ impl ClientJob { eprintln!("job update: state is {} and result is {}", state, result); match result { "pass" => { - (Ok("success".to_string()), JobState::Complete) + (Ok("success".to_string()), JobState::Finished) }, other => { - (Err(other.to_string()), JobState::Error) + let desc = msg.as_object().unwrap().get("desc") + .map(|x| x.as_str().unwrap().to_string()) + .unwrap_or_else(|| other.to_string()); + (Err(desc), JobState::Error) } } } else if state == "interrupted" { let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); eprintln!("job update: state is {} and result is {}", state, result); - (Err(result.to_string()), JobState::Error) + let desc = msg.as_object().unwrap().get("desc") + .map(|x| x.as_str().unwrap().to_string()) + .unwrap_or_else(|| result.to_string()); + (Err(desc), JobState::Error) } else { eprintln!("job update: state is {}", state); (Err(format!("atypical completion status: {}", state)), JobState::Invalid) @@ -212,9 +219,19 @@ impl ClientJob { .expect("now is before epoch") .as_millis(); + let build_result = if result.is_ok() { + JobResult::Pass + } else { + JobResult::Fail + }; + let result_desc = match result { + Ok(msg) => msg, + Err(msg) => msg, + }; + self.dbctx.conn.lock().unwrap().execute( - "update jobs set complete_time=?1, state=?2 where id=?3", - (now as u64, state as u64, self.job.id) + "update jobs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", + (now as u64, state as u64, build_result as u8, result_desc, self.job.id) ) .expect("can update"); } @@ -227,7 +244,6 @@ impl ClientJob { }, other => { eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg); - return; } } } diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 1c2ba92..de423eb 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -1,4 +1,6 @@ use std::time::Duration; +use rlua::prelude::LuaError; +use std::sync::{Arc, Mutex}; use reqwest::{StatusCode, Response}; use tokio::process::Command; use std::process::Stdio; @@ -11,6 +13,8 @@ use std::task::{Context, Poll}; use std::pin::Pin; use std::marker::Unpin; +mod lua; + #[derive(Debug)] enum WorkAcquireError { Reqwest(reqwest::Error), @@ -34,12 +38,73 @@ struct RequestedJob { } impl RequestedJob { + pub fn into_running(self, client: RunnerClient) -> RunningJob { + RunningJob { + job: self, + client, + } + } +} + +struct JobEnv { + lua: lua::BuildEnv, + job: Arc<Mutex<RunningJob>>, +} + +impl JobEnv { + fn new(job: &Arc<Mutex<RunningJob>>) -> Self { + let lua = lua::BuildEnv::new(job); + JobEnv { + lua, + job: Arc::clone(job) + } + } + + async fn default_goodfile(self) -> Result<(), LuaError> { + self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await + } + + async fn exec_goodfile(self) -> Result<(), LuaError> { + let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap(); + self.lua.run_build(script.as_bytes()).await + } +} + +pub struct RunningJob { + job: RequestedJob, + client: RunnerClient, +} + +async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + + if n_read == 0 { + return Ok(()); + } + + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } +} + +impl RunningJob { + async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { + self.client.send(serde_json::json!({ + "kind": "metric", + "value": value.to_string(), + })).await + .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) + } + // TODO: panics if hyper finds the channel is closed. hum - async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result<ArtifactStream, String> { + async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> { let (mut sender, body) = hyper::Body::channel(); - let resp = client.http.post("https://ci.butactuallyin.space:9876/api/artifact") + let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") .header("user-agent", "ci-butactuallyin-space-runner") - .header("x-job-token", &self.build_token) + .header("x-job-token", &self.job.build_token) .header("x-artifact-name", name) .header("x-artifact-desc", desc) .body(body) @@ -57,30 +122,71 @@ impl RequestedJob { } } - async fn execute_command(&self, client: &mut RunnerClient, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { - eprintln!("[.] running {}", name); - async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { - let mut buf = vec![0; 1024 * 1024]; - loop { - let n_read = source.read(&mut buf).await - .map_err(|e| format!("failed to read: {:?}", e))?; - - if n_read == 0 { - return Ok(()); - } + async fn clone_remote(&self) -> Result<(), String> { + let mut git_clone = Command::new("git"); + git_clone + .arg("clone") + .arg(&self.job.remote_url) + .arg("tmpdir"); - dest.write_all(&buf[..n_read]).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } + let clone_res = self.execute_command(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await?; + + if !clone_res.success() { + return Err(format!("git clone failed: {:?}", clone_res)); } + let mut git_checkout = Command::new("git"); + git_checkout + .current_dir("tmpdir") + .arg("checkout") + .arg(&self.job.commit); + + let checkout_res = self.execute_command(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await?; + + if !checkout_res.success() { + return Err(format!("git checkout failed: {:?}", checkout_res)); + } + + Ok(()) + } + + async fn execute_goodfile(&self) -> Result<String, String> { + Ok("string".to_string()) + } + + async fn default_goodfile(&self) -> Result<String, String> { + let mut build = Command::new("cargo"); + build + .current_dir("tmpdir") + .arg("build"); + + let build_res = self.execute_command(build, "cargo build log", "cargo build").await?; + + if !build_res.success() { + return Err(format!("cargo build failed: {:?}", build_res)); + } + + let mut test = Command::new("cargo"); + test + .current_dir("tmpdir") + .arg("test"); + + let test_res = self.execute_command(test, "cargo test log", "cargo test").await?; + + match test_res.code() { + Some(0) => Ok("pass".to_string()), + Some(n) => Ok(format!("error: {}", n)), + None => Ok(format!("abnormal exit")), + } + } + + async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { + eprintln!("[.] running {}", name); let stdout_artifact = self.create_artifact( - client, &format!("{} (stdout)", name), &format!("{} (stdout)", desc) ).await.expect("works"); let stderr_artifact = self.create_artifact( - client, &format!("{} (stderr)", name), &format!("{} (stderr)", desc) ).await.expect("works"); @@ -112,54 +218,93 @@ impl RequestedJob { Ok(res) } - async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> { - let mut git_clone = Command::new("git"); - git_clone - .arg("clone") - .arg(&self.remote_url) - .arg("tmpdir"); + async fn run(mut self) { + self.client.send(serde_json::json!({ + "status": "started" + })).await.unwrap(); - let clone_res = self.execute_command(client, git_clone, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await?; + std::fs::remove_dir_all("tmpdir").unwrap(); + std::fs::create_dir("tmpdir").unwrap(); - if !clone_res.success() { - return Err(format!("git clone failed: {:?}", clone_res)); - } + self.clone_remote().await.expect("clone succeeds"); + + let ctx = Arc::new(Mutex::new(self)); + + let lua_env = JobEnv::new(&ctx); + + let metadata = std::fs::metadata("./tmpdir/goodfile"); + let res: Result<String, (String, String)> = match metadata { + Ok(_) => { + match lua_env.exec_goodfile().await { + Ok(()) => { + Ok("pass".to_string()) + }, + Err(lua_err) => { + Err(("failed".to_string(), lua_err.to_string())) + } + } + }, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + match lua_env.default_goodfile().await { + Ok(()) => { + Ok("pass".to_string()) + }, + Err(lua_err) => { + Err(("failed".to_string(), lua_err.to_string())) + } + } + }, + Err(e) => { + eprintln!("[-] error finding goodfile: {:?}", e); + Err(("failed".to_string(), "inaccessible goodfile".to_string())) + } + }; - let mut git_checkout = Command::new("git"); - git_checkout - .current_dir("tmpdir") - .arg("checkout") - .arg(&self.commit); + match res { + Ok(status) => { + eprintln!("[+] job success!"); + let status = serde_json::json!({ + "kind": "job_status", + "state": "finished", + "result": status + }); + eprintln!("reporting status: {}", status); - let checkout_res = self.execute_command(client, git_checkout, "git checkout log", &format!("git checkout {}", &self.commit)).await?; + let res = ctx.lock().unwrap().client.send(status).await; + if let Err(e) = res { + eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); + } + } + Err((status, lua_err)) => { + eprintln!("[-] job error: {}", status); - if !checkout_res.success() { - return Err(format!("git checkout failed: {:?}", checkout_res)); + let res = ctx.lock().unwrap().client.send(serde_json::json!({ + "kind": "job_status", + "state": "interrupted", + "result": status, + "desc": lua_err.to_string(), + })).await; + if let Err(e) = res { + eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", status, e); + } + } } + } - let mut build = Command::new("cargo"); - build + async fn run_command(&self, command: &[String]) -> Result<(), String> { + let mut cmd = Command::new(&command[0]); + let human_name = command.join(" "); + cmd .current_dir("tmpdir") - .arg("build"); + .args(&command[1..]); - let build_res = self.execute_command(client, build, "cargo build log", "cargo build").await?; + let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?; - if !build_res.success() { - return Err(format!("cargo build failed: {:?}", build_res)); + if !cmd_res.success() { + return Err(format!("{} failed: {:?}", &human_name, cmd_res)); } - let mut test = Command::new("cargo"); - test - .current_dir("tmpdir") - .arg("test"); - - let test_res = self.execute_command(client, test, "cargo test log", "cargo test").await?; - - match test_res.code() { - Some(0) => Ok("pass".to_string()), - Some(n) => Ok(format!("error: {}", n)), - None => Ok(format!("abnormal exit")), - } + Ok(()) } } @@ -269,38 +414,6 @@ impl RunnerClient { ).await .map_err(|e| format!("send error: {:?}", e)) } - - async fn run_job(&mut self, job: RequestedJob) { - self.send(serde_json::json!({ - "status": "started" - })).await.unwrap(); - - std::fs::remove_dir_all("tmpdir").unwrap(); - std::fs::create_dir("tmpdir").unwrap(); - - let res = job.execute_goodfile(self).await; - - match res { - Ok(status) => { - eprintln!("[+] job success!"); - - self.send(serde_json::json!({ - "kind": "job_status", - "state": "finished", - "result": status - })).await.unwrap(); - } - Err(status) => { - eprintln!("[-] job error: {}", status); - - self.send(serde_json::json!({ - "kind": "job_status", - "state": "interrupted", - "result": status - })).await.unwrap(); - } - } - } } #[tokio::main] @@ -355,7 +468,9 @@ async fn main() { eprintln!("requested work: {:?}", job); eprintln!("doing {:?}", job); - client.run_job(job).await; + + let mut job = job.into_running(client); + job.run().await; std::thread::sleep(Duration::from_millis(10000)); }, Err(e) => { diff --git a/src/dbctx.rs b/src/dbctx.rs index 1bb8bc8..cb74010 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -37,6 +37,14 @@ pub enum TokenValidity { Valid, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ArtifactRecord { + pub id: u64, + pub job_id: u64, + pub name: String, + pub desc: String +} + pub struct ArtifactDescriptor { job_id: u64, artifact_id: u64, @@ -257,6 +265,21 @@ impl DbCtx { Ok(conn.last_insert_rowid() as u64) } + pub fn artifacts_for_job(&self, job: u64) -> Result<Vec<ArtifactRecord>, String> { + let conn = self.conn.lock().unwrap(); + + let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_JOB).unwrap(); + let mut result = artifacts_query.query([job]).unwrap(); + let mut artifacts = Vec::new(); + + while let Some(row) = result.next().unwrap() { + let (id, job_id, name, desc): (u64, u64, String, String) = row.try_into().unwrap(); + artifacts.push(ArtifactRecord { id, job_id, name, desc }); + } + + Ok(artifacts) + } + pub fn get_pending_jobs(&self) -> Result<Vec<PendingJob>, String> { let conn = self.conn.lock().unwrap(); diff --git a/src/lua/mod.rs b/src/lua/mod.rs new file mode 100644 index 0000000..034f766 --- /dev/null +++ b/src/lua/mod.rs @@ -0,0 +1,157 @@ +use crate::RunnerClient; +use crate::RunningJob; + +use rlua::prelude::*; + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::process::Command; +use std::process::ExitStatus; +use std::process::Stdio; +use std::sync::{Arc, Mutex}; + +pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua"); + +pub struct BuildEnv { + lua: Lua, + job: Arc<Mutex<RunningJob>>, +} + +impl BuildEnv { + pub fn new(job: &Arc<Mutex<RunningJob>>) -> Self { + let env = BuildEnv { + lua: Lua::new(), + job: Arc::clone(job), + }; + env.lua.context(|lua_ctx| { + env.define_env(lua_ctx) + }).expect("can define context"); + env + } + + fn define_env(&self, lua_ctx: LuaContext) -> Result<(), String> { + let hello = lua_ctx.create_function(|_, ()| { + eprintln!("hello from lua!!!"); + Ok(()) + }) + .map_err(|e| format!("problem defining build function: {:?}", e))?; + let job_ref = Arc::clone(&self.job); + let build = lua_ctx.create_function(move |_, (command, params): (LuaValue, LuaValue)| { + let job_ref: Arc<Mutex<RunningJob>> = Arc::clone(&job_ref); + let args = match command { + LuaValue::Table(table) => { + let len = table.len().expect("command table has a length"); + let mut command_args = Vec::new(); + for i in 0..len { + let value = table.get(i + 1).expect("command arg is gettble"); + match value { + LuaValue::String(s) => { + command_args.push(s.to_str().unwrap().to_owned()); + }, + other => { + return Err(LuaError::RuntimeError(format!("argument {} was not a string, was {:?}", i, other))); + } + }; + } + + command_args + }, + other => { + return Err(LuaError::RuntimeError(format!("argument 1 was not a table: {:?}", other))); + } + }; + eprintln!("args: {:?}", args); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + job_ref.lock().unwrap().run_command(&args).await + .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) + }) + }) + .map_err(|e| format!("problem defining build function: {:?}", e))?; + + let job_ref = Arc::clone(&self.job); + let metric = lua_ctx.create_function(move |_, (name, value): (String, String)| { + let job_ref: Arc<Mutex<RunningJob>> = Arc::clone(&job_ref); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + job_ref.lock().unwrap().send_metric(&name, value).await + .map_err(|e| LuaError::RuntimeError(format!("send_metric error: {:?}", e))) + }) + }) + .map_err(|e| format!("problem defining metric function: {:?}", e))?; + + let job_ref = Arc::clone(&self.job); + let artifact = lua_ctx.create_function(move |_, (name, path): (String, String)| { + let job_ref: Arc<Mutex<RunningJob>> = Arc::clone(&job_ref); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + let artifact = job_ref.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path)).await + .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e))) + .unwrap(); + crate::forward_data(tokio::fs::File::open(path).await.unwrap(), artifact).await + .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e))) + }) + }) + .map_err(|e| format!("problem defining metric function: {:?}", e))?; + + let error = lua_ctx.create_function(move |_, msg: String| { + Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg))) + }).unwrap(); + + let path_has_cmd = lua_ctx.create_function(move |_, name: String| { + Ok(std::process::Command::new("which") + .arg(name) + .status() + .map_err(|e| LuaError::RuntimeError(format!("could not fork which? {:?}", e)))? + .success()) + }).unwrap(); + + let size_of_file = lua_ctx.create_function(move |_, name: String| { + Ok(std::fs::metadata(&name) + .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", name)))? + .len()) + }).unwrap(); + + let build_environment = lua_ctx.create_table_from( + vec![ + ("has", path_has_cmd), + ("size", size_of_file), + ] + ).unwrap(); + + let build_functions = lua_ctx.create_table_from( + vec![ + ("hello", hello), + ("run", build), + ("metric", metric), + ("error", error), + ("artifact", artifact), + ] + ).unwrap(); + build_functions.set("environment", build_environment).unwrap(); + let globals = lua_ctx.globals(); + globals.set("Build", build_functions); + Ok(()) + } + + pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> { + let script = script.to_vec(); + let res: Result<(), LuaError> = std::thread::spawn(move || { + self.lua.context(|lua_ctx| { + lua_ctx.load(&script) + .set_name("goodfile")? + .exec() + }) + }).join().unwrap(); + eprintln!("lua res: {:?}", res); + res + } +} diff --git a/src/main.rs b/src/main.rs index d8af992..ce01ecc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] #![allow(unused_variables)] +#![allow(unused_imports)] use tokio::spawn; use std::path::PathBuf; @@ -3,10 +3,16 @@ use std::convert::TryFrom; #[derive(Debug, Clone)] +pub enum JobResult { + Pass = 0, + Fail = 1, +} + +#[derive(Debug, Clone, PartialEq)] pub enum JobState { Pending = 0, Started = 1, - Complete = 2, + Finished = 2, Error = 3, Invalid = 4, } @@ -18,7 +24,7 @@ impl TryFrom<u8> for JobState { match value { 0 => Ok(JobState::Pending), 1 => Ok(JobState::Started), - 2 => Ok(JobState::Complete), + 2 => Ok(JobState::Finished), 3 => Ok(JobState::Error), 4 => Ok(JobState::Invalid), other => Err(format!("invalid job state: {}", other)), @@ -40,7 +46,9 @@ pub const CREATE_JOBS_TABLE: &'static str = "\ started_time INTEGER, complete_time INTEGER, job_timeout INTEGER, - source TEXT);"; + source TEXT, + build_result INTEGER, + final_status TEXT);"; pub const CREATE_COMMITS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; @@ -79,6 +87,9 @@ pub const CREATE_REPO_NAME_INDEX: &'static str = "\ pub const PENDING_JOBS: &'static str = "\ select id, artifacts_path, state, run_host, remote_id, commit_id, created_time, source from jobs where state=0;"; +pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\ + select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit 2;"; + pub const COMMIT_TO_ID: &'static str = "\ select id from commits where sha=?1;"; |