diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ci_driver.rs | 28 | ||||
| -rw-r--r-- | src/ci_runner.rs | 291 | ||||
| -rw-r--r-- | src/dbctx.rs | 23 | ||||
| -rw-r--r-- | src/lua/mod.rs | 157 | ||||
| -rw-r--r-- | src/main.rs | 1 | ||||
| -rw-r--r-- | src/sql.rs | 17 | 
6 files changed, 420 insertions, 97 deletions
| diff --git a/src/ci_driver.rs b/src/ci_driver.rs index d70580a..3be49f4 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -26,6 +26,7 @@ mod sql;  mod notifier;  use crate::dbctx::{DbCtx, PendingJob}; +use crate::sql::JobResult;  use crate::sql::JobState;  fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> { @@ -186,16 +187,22 @@ impl ClientJob {                          eprintln!("job update: state is {} and result is {}", state, result);                          match result {                              "pass" => { -                                (Ok("success".to_string()), JobState::Complete) +                                (Ok("success".to_string()), JobState::Finished)                              },                              other => { -                                (Err(other.to_string()), JobState::Error) +                                let desc = msg.as_object().unwrap().get("desc") +                                    .map(|x| x.as_str().unwrap().to_string()) +                                    .unwrap_or_else(|| other.to_string()); +                                (Err(desc), JobState::Error)                              }                          }                      } else if state == "interrupted" {                          let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();                          eprintln!("job update: state is {} and result is {}", state, result); -                        (Err(result.to_string()), JobState::Error) +                        let desc = msg.as_object().unwrap().get("desc") +                            .map(|x| x.as_str().unwrap().to_string()) +                            .unwrap_or_else(|| result.to_string()); +                        (Err(desc), JobState::Error)                      } else {                          eprintln!("job update: state is {}", state);                          (Err(format!("atypical completion status: {}", state)), JobState::Invalid) @@ -212,9 +219,19 @@ impl ClientJob {                          .expect("now is before epoch")                          .as_millis(); +                    let build_result = if result.is_ok() { +                        JobResult::Pass +                    } else { +                        JobResult::Fail +                    }; +                    let result_desc = match result { +                        Ok(msg) => msg, +                        Err(msg) => msg, +                    }; +                      self.dbctx.conn.lock().unwrap().execute( -                        "update jobs set complete_time=?1, state=?2 where id=?3", -                        (now as u64, state as u64, self.job.id) +                        "update jobs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", +                        (now as u64, state as u64, build_result as u8, result_desc, self.job.id)                      )                          .expect("can update");                  } @@ -227,7 +244,6 @@ impl ClientJob {                  },                  other => {                      eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg); -                    return;                  }              }          } diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 1c2ba92..de423eb 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -1,4 +1,6 @@  use std::time::Duration; +use rlua::prelude::LuaError; +use std::sync::{Arc, Mutex};  use reqwest::{StatusCode, Response};  use tokio::process::Command;  use std::process::Stdio; @@ -11,6 +13,8 @@ use std::task::{Context, Poll};  use std::pin::Pin;  use std::marker::Unpin; +mod lua; +  #[derive(Debug)]  enum WorkAcquireError {      Reqwest(reqwest::Error), @@ -34,12 +38,73 @@ struct RequestedJob {  }  impl RequestedJob { +    pub fn into_running(self, client: RunnerClient) -> RunningJob { +        RunningJob { +            job: self, +            client, +        } +    } +} + +struct JobEnv { +    lua: lua::BuildEnv, +    job: Arc<Mutex<RunningJob>>, +} + +impl JobEnv { +    fn new(job: &Arc<Mutex<RunningJob>>) -> Self { +        let lua = lua::BuildEnv::new(job); +        JobEnv { +            lua, +            job: Arc::clone(job) +        } +    } + +    async fn default_goodfile(self) -> Result<(), LuaError> { +        self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await +    } + +    async fn exec_goodfile(self) -> Result<(), LuaError> { +        let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap(); +        self.lua.run_build(script.as_bytes()).await +    } +} + +pub struct RunningJob { +    job: RequestedJob, +    client: RunnerClient, +} + +async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { +    let mut buf = vec![0; 1024 * 1024]; +    loop { +        let n_read = source.read(&mut buf).await +            .map_err(|e| format!("failed to read: {:?}", e))?; + +        if n_read == 0 { +            return Ok(()); +        } + +        dest.write_all(&buf[..n_read]).await +            .map_err(|e| format!("failed to write: {:?}", e))?; +    } +} + +impl RunningJob { +    async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { +        self.client.send(serde_json::json!({ +            "kind": "metric", +            "value": value.to_string(), +        })).await +            .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) +    } +      // TODO: panics if hyper finds the channel is closed. hum -    async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result<ArtifactStream, String> { +    async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> {          let (mut sender, body) = hyper::Body::channel(); -        let resp = client.http.post("https://ci.butactuallyin.space:9876/api/artifact") +        let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact")              .header("user-agent", "ci-butactuallyin-space-runner") -            .header("x-job-token", &self.build_token) +            .header("x-job-token", &self.job.build_token)              .header("x-artifact-name", name)              .header("x-artifact-desc", desc)              .body(body) @@ -57,30 +122,71 @@ impl RequestedJob {          }      } -    async fn execute_command(&self, client: &mut RunnerClient, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { -        eprintln!("[.] running {}", name); -        async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { -            let mut buf = vec![0; 1024 * 1024]; -            loop { -                let n_read = source.read(&mut buf).await -                    .map_err(|e| format!("failed to read: {:?}", e))?; - -                if n_read == 0 { -                    return Ok(()); -                } +    async fn clone_remote(&self) -> Result<(), String> { +        let mut git_clone = Command::new("git"); +        git_clone +            .arg("clone") +            .arg(&self.job.remote_url) +            .arg("tmpdir"); -                dest.write_all(&buf[..n_read]).await -                    .map_err(|e| format!("failed to write: {:?}", e))?; -            } +        let clone_res = self.execute_command(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await?; + +        if !clone_res.success() { +            return Err(format!("git clone failed: {:?}", clone_res));          } +        let mut git_checkout = Command::new("git"); +        git_checkout +            .current_dir("tmpdir") +            .arg("checkout") +            .arg(&self.job.commit); + +        let checkout_res = self.execute_command(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await?; + +        if !checkout_res.success() { +            return Err(format!("git checkout failed: {:?}", checkout_res)); +        } + +        Ok(()) +    } + +    async fn execute_goodfile(&self) -> Result<String, String> { +        Ok("string".to_string()) +    } + +    async fn default_goodfile(&self) -> Result<String, String> { +        let mut build = Command::new("cargo"); +        build +            .current_dir("tmpdir") +            .arg("build"); + +        let build_res = self.execute_command(build, "cargo build log", "cargo build").await?; + +        if !build_res.success() { +            return Err(format!("cargo build failed: {:?}", build_res)); +        } + +        let mut test = Command::new("cargo"); +        test +            .current_dir("tmpdir") +            .arg("test"); + +        let test_res = self.execute_command(test, "cargo test log", "cargo test").await?; + +        match test_res.code() { +            Some(0) => Ok("pass".to_string()), +            Some(n) => Ok(format!("error: {}", n)), +            None => Ok(format!("abnormal exit")), +        } +    } + +    async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { +        eprintln!("[.] running {}", name);          let stdout_artifact = self.create_artifact( -            client,              &format!("{} (stdout)", name),              &format!("{} (stdout)", desc)          ).await.expect("works");          let stderr_artifact = self.create_artifact( -            client,              &format!("{} (stderr)", name),              &format!("{} (stderr)", desc)          ).await.expect("works"); @@ -112,54 +218,93 @@ impl RequestedJob {          Ok(res)      } -    async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> { -        let mut git_clone = Command::new("git"); -        git_clone -            .arg("clone") -            .arg(&self.remote_url) -            .arg("tmpdir"); +    async fn run(mut self) { +        self.client.send(serde_json::json!({ +            "status": "started" +        })).await.unwrap(); -        let clone_res = self.execute_command(client, git_clone, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await?; +        std::fs::remove_dir_all("tmpdir").unwrap(); +        std::fs::create_dir("tmpdir").unwrap(); -        if !clone_res.success() { -            return Err(format!("git clone failed: {:?}", clone_res)); -        } +        self.clone_remote().await.expect("clone succeeds"); +         +        let ctx = Arc::new(Mutex::new(self)); + +        let lua_env = JobEnv::new(&ctx); + +        let metadata = std::fs::metadata("./tmpdir/goodfile"); +        let res: Result<String, (String, String)> = match metadata { +            Ok(_) => { +                match lua_env.exec_goodfile().await { +                    Ok(()) => { +                        Ok("pass".to_string()) +                    }, +                    Err(lua_err) => { +                        Err(("failed".to_string(), lua_err.to_string())) +                    } +                } +            }, +            Err(e) if e.kind() == std::io::ErrorKind::NotFound => { +                match lua_env.default_goodfile().await { +                    Ok(()) => { +                        Ok("pass".to_string()) +                    }, +                    Err(lua_err) => { +                        Err(("failed".to_string(), lua_err.to_string())) +                    } +                } +            }, +            Err(e) => { +                eprintln!("[-] error finding goodfile: {:?}", e); +                Err(("failed".to_string(), "inaccessible goodfile".to_string())) +            } +        }; -        let mut git_checkout = Command::new("git"); -        git_checkout -            .current_dir("tmpdir") -            .arg("checkout") -            .arg(&self.commit); +        match res { +            Ok(status) => { +                eprintln!("[+] job success!"); +                let status = serde_json::json!({ +                    "kind": "job_status", +                    "state": "finished", +                    "result": status +                }); +                eprintln!("reporting status: {}", status); -        let checkout_res = self.execute_command(client, git_checkout, "git checkout log", &format!("git checkout {}", &self.commit)).await?; +                let res = ctx.lock().unwrap().client.send(status).await; +                if let Err(e) = res { +                    eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); +                } +            } +            Err((status, lua_err)) => { +                eprintln!("[-] job error: {}", status); -        if !checkout_res.success() { -            return Err(format!("git checkout failed: {:?}", checkout_res)); +                let res = ctx.lock().unwrap().client.send(serde_json::json!({ +                    "kind": "job_status", +                    "state": "interrupted", +                    "result": status, +                    "desc": lua_err.to_string(), +                })).await; +                if let Err(e) = res { +                    eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", status, e); +                } +            }          } +    } -        let mut build = Command::new("cargo"); -        build +    async fn run_command(&self, command: &[String]) -> Result<(), String> { +        let mut cmd = Command::new(&command[0]); +        let human_name = command.join(" "); +        cmd              .current_dir("tmpdir") -            .arg("build"); +            .args(&command[1..]); -        let build_res = self.execute_command(client, build, "cargo build log", "cargo build").await?; +        let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?; -        if !build_res.success() { -            return Err(format!("cargo build failed: {:?}", build_res)); +        if !cmd_res.success() { +            return Err(format!("{} failed: {:?}", &human_name, cmd_res));          } -        let mut test = Command::new("cargo"); -        test -            .current_dir("tmpdir") -            .arg("test"); - -        let test_res = self.execute_command(client, test, "cargo test log", "cargo test").await?; - -        match test_res.code() { -            Some(0) => Ok("pass".to_string()), -            Some(n) => Ok(format!("error: {}", n)), -            None => Ok(format!("abnormal exit")), -        } +        Ok(())      }  } @@ -269,38 +414,6 @@ impl RunnerClient {          ).await              .map_err(|e| format!("send error: {:?}", e))      } - -    async fn run_job(&mut self, job: RequestedJob) { -        self.send(serde_json::json!({ -            "status": "started" -        })).await.unwrap(); - -        std::fs::remove_dir_all("tmpdir").unwrap(); -        std::fs::create_dir("tmpdir").unwrap(); - -        let res = job.execute_goodfile(self).await; - -        match res { -            Ok(status) => { -                eprintln!("[+] job success!"); - -                self.send(serde_json::json!({ -                    "kind": "job_status", -                    "state": "finished", -                    "result": status -                })).await.unwrap(); -            } -            Err(status) => { -                eprintln!("[-] job error: {}", status); - -                self.send(serde_json::json!({ -                    "kind": "job_status", -                    "state": "interrupted", -                    "result": status -                })).await.unwrap(); -            } -        } -    }  }  #[tokio::main] @@ -355,7 +468,9 @@ async fn main() {                  eprintln!("requested work: {:?}", job);                  eprintln!("doing {:?}", job); -                client.run_job(job).await; + +                let mut job = job.into_running(client); +                job.run().await;                  std::thread::sleep(Duration::from_millis(10000));              },              Err(e) => { diff --git a/src/dbctx.rs b/src/dbctx.rs index 1bb8bc8..cb74010 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -37,6 +37,14 @@ pub enum TokenValidity {      Valid,  } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ArtifactRecord { +    pub id: u64, +    pub job_id: u64, +    pub name: String, +    pub desc: String +} +  pub struct ArtifactDescriptor {      job_id: u64,      artifact_id: u64, @@ -257,6 +265,21 @@ impl DbCtx {          Ok(conn.last_insert_rowid() as u64)      } +    pub fn artifacts_for_job(&self, job: u64) -> Result<Vec<ArtifactRecord>, String> { +        let conn = self.conn.lock().unwrap(); + +        let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_JOB).unwrap(); +        let mut result = artifacts_query.query([job]).unwrap(); +        let mut artifacts = Vec::new(); + +        while let Some(row) = result.next().unwrap() { +            let (id, job_id, name, desc): (u64, u64, String, String) = row.try_into().unwrap(); +            artifacts.push(ArtifactRecord { id, job_id, name, desc }); +        } + +        Ok(artifacts) +    } +      pub fn get_pending_jobs(&self) -> Result<Vec<PendingJob>, String> {          let conn = self.conn.lock().unwrap(); diff --git a/src/lua/mod.rs b/src/lua/mod.rs new file mode 100644 index 0000000..034f766 --- /dev/null +++ b/src/lua/mod.rs @@ -0,0 +1,157 @@ +use crate::RunnerClient; +use crate::RunningJob; + +use rlua::prelude::*; + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::process::Command; +use std::process::ExitStatus; +use std::process::Stdio; +use std::sync::{Arc, Mutex}; + +pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua"); + +pub struct BuildEnv { +    lua: Lua, +    job: Arc<Mutex<RunningJob>>, +} + +impl BuildEnv { +    pub fn new(job: &Arc<Mutex<RunningJob>>) -> Self { +        let env = BuildEnv { +            lua: Lua::new(), +            job: Arc::clone(job), +        }; +        env.lua.context(|lua_ctx| { +            env.define_env(lua_ctx) +        }).expect("can define context"); +        env +    } + +    fn define_env(&self, lua_ctx: LuaContext) -> Result<(), String> { +        let hello = lua_ctx.create_function(|_, ()| { +            eprintln!("hello from lua!!!"); +            Ok(()) +        }) +            .map_err(|e| format!("problem defining build function: {:?}", e))?; +        let job_ref = Arc::clone(&self.job); +        let build = lua_ctx.create_function(move |_, (command, params): (LuaValue, LuaValue)| { +            let job_ref: Arc<Mutex<RunningJob>> = Arc::clone(&job_ref); +            let args = match command { +                LuaValue::Table(table) => { +                    let len = table.len().expect("command table has a length"); +                    let mut command_args = Vec::new(); +                    for i in 0..len { +                        let value = table.get(i + 1).expect("command arg is gettble"); +                        match value { +                            LuaValue::String(s) => { +                                command_args.push(s.to_str().unwrap().to_owned()); +                            }, +                            other => { +                                return Err(LuaError::RuntimeError(format!("argument {} was not a string, was {:?}", i, other))); +                            } +                        }; +                    } + +                    command_args +                }, +                other => { +                    return Err(LuaError::RuntimeError(format!("argument 1 was not a table: {:?}", other))); +                } +            }; +            eprintln!("args: {:?}", args); +            let rt = tokio::runtime::Builder::new_current_thread() +                .enable_all() +                .build() +                .unwrap(); +            rt.block_on(async move { +                job_ref.lock().unwrap().run_command(&args).await +                    .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) +            }) +        }) +            .map_err(|e| format!("problem defining build function: {:?}", e))?; + +        let job_ref = Arc::clone(&self.job); +        let metric = lua_ctx.create_function(move |_, (name, value): (String, String)| { +            let job_ref: Arc<Mutex<RunningJob>> = Arc::clone(&job_ref); +            let rt = tokio::runtime::Builder::new_current_thread() +                .enable_all() +                .build() +                .unwrap(); +            rt.block_on(async move { +                job_ref.lock().unwrap().send_metric(&name, value).await +                    .map_err(|e| LuaError::RuntimeError(format!("send_metric error: {:?}", e))) +            }) +        }) +            .map_err(|e| format!("problem defining metric function: {:?}", e))?; + +        let job_ref = Arc::clone(&self.job); +        let artifact = lua_ctx.create_function(move |_, (name, path): (String, String)| { +            let job_ref: Arc<Mutex<RunningJob>> = Arc::clone(&job_ref); +            let rt = tokio::runtime::Builder::new_current_thread() +                .enable_all() +                .build() +                .unwrap(); +            rt.block_on(async move { +                let artifact = job_ref.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path)).await +                    .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e))) +                    .unwrap(); +                crate::forward_data(tokio::fs::File::open(path).await.unwrap(), artifact).await +                    .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e))) +            }) +        }) +            .map_err(|e| format!("problem defining metric function: {:?}", e))?; + +        let error = lua_ctx.create_function(move |_, msg: String| { +            Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg))) +        }).unwrap(); + +        let path_has_cmd = lua_ctx.create_function(move |_, name: String| { +            Ok(std::process::Command::new("which") +                .arg(name) +                .status() +                .map_err(|e| LuaError::RuntimeError(format!("could not fork which? {:?}", e)))? +                .success()) +        }).unwrap(); + +        let size_of_file = lua_ctx.create_function(move |_, name: String| { +            Ok(std::fs::metadata(&name) +                .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", name)))? +                .len()) +        }).unwrap(); + +        let build_environment = lua_ctx.create_table_from( +            vec![ +                ("has", path_has_cmd), +                ("size", size_of_file), +            ] +        ).unwrap(); + +        let build_functions = lua_ctx.create_table_from( +            vec![ +                ("hello", hello), +                ("run", build), +                ("metric", metric), +                ("error", error), +                ("artifact", artifact), +            ] +        ).unwrap(); +        build_functions.set("environment", build_environment).unwrap(); +        let globals = lua_ctx.globals(); +        globals.set("Build", build_functions); +        Ok(()) +    } + +    pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> { +        let script = script.to_vec(); +        let res: Result<(), LuaError> = std::thread::spawn(move || { +            self.lua.context(|lua_ctx| { +                lua_ctx.load(&script) +                    .set_name("goodfile")? +                    .exec() +            }) +        }).join().unwrap(); +        eprintln!("lua res: {:?}", res); +        res +    } +} diff --git a/src/main.rs b/src/main.rs index d8af992..ce01ecc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@  #![allow(dead_code)]  #![allow(unused_variables)] +#![allow(unused_imports)]  use tokio::spawn;  use std::path::PathBuf; @@ -3,10 +3,16 @@  use std::convert::TryFrom;  #[derive(Debug, Clone)] +pub enum JobResult { +    Pass = 0, +    Fail = 1, +} + +#[derive(Debug, Clone, PartialEq)]  pub enum JobState {      Pending = 0,      Started = 1, -    Complete = 2, +    Finished = 2,      Error = 3,      Invalid = 4,  } @@ -18,7 +24,7 @@ impl TryFrom<u8> for JobState {          match value {              0 => Ok(JobState::Pending),              1 => Ok(JobState::Started), -            2 => Ok(JobState::Complete), +            2 => Ok(JobState::Finished),              3 => Ok(JobState::Error),              4 => Ok(JobState::Invalid),              other => Err(format!("invalid job state: {}", other)), @@ -40,7 +46,9 @@ pub const CREATE_JOBS_TABLE: &'static str = "\          started_time INTEGER,          complete_time INTEGER,          job_timeout INTEGER, -        source TEXT);"; +        source TEXT, +        build_result INTEGER, +        final_status TEXT);";  pub const CREATE_COMMITS_TABLE: &'static str = "\      CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; @@ -79,6 +87,9 @@ pub const CREATE_REPO_NAME_INDEX: &'static str = "\  pub const PENDING_JOBS: &'static str = "\      select id, artifacts_path, state, run_host, remote_id, commit_id, created_time, source from jobs where state=0;"; +pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\ +    select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit 2;"; +  pub const COMMIT_TO_ID: &'static str = "\      select id from commits where sha=?1;"; | 
