diff options
| -rw-r--r-- | src/ci_runner.rs | 67 | ||||
| -rw-r--r-- | src/io.rs | 41 | ||||
| -rw-r--r-- | src/lua/mod.rs | 57 | 
3 files changed, 142 insertions, 23 deletions
| diff --git a/src/ci_runner.rs b/src/ci_runner.rs index c6fc838..a4d15ef 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -17,8 +17,9 @@ mod protocol;  mod lua;  mod io; -use crate::io::ArtifactStream; +use crate::io::{ArtifactStream, VecSink};  use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; +use crate::lua::CommandOutput;  #[derive(Debug)]  enum WorkAcquireError { @@ -144,7 +145,7 @@ impl RunningJob {              .arg(&self.job.remote_url)              .arg("tmpdir"); -        let clone_res = self.execute_command(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await +        let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await              .map_err(|e| {                  eprintln!("stringy error (exec failed?) for clone: {}", e);                  RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) } @@ -160,7 +161,7 @@ impl RunningJob {              .arg("checkout")              .arg(&self.job.commit); -        let checkout_res = self.execute_command(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await +        let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await              .map_err(|e| {                  eprintln!("stringy error (exec failed?) for checkout: {}", e);                  RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) } @@ -177,17 +178,37 @@ impl RunningJob {          Ok(())      } -    async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { -        eprintln!("[.] running {}", name); -        let mut stdout_artifact = self.create_artifact( +    async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { +        let stdout_artifact = self.create_artifact(              &format!("{} (stdout)", name),              &format!("{} (stdout)", desc)          ).await.expect("works"); -        let mut stderr_artifact = self.create_artifact( +        let stderr_artifact = self.create_artifact(              &format!("{} (stderr)", name),              &format!("{} (stderr)", desc)          ).await.expect("works"); +        let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?; + +        Ok(exit_status) +    } + +    async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> { +        let stdout_collector = VecSink::new(); +        let stderr_collector = VecSink::new(); + +        let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?; + +        Ok(CommandOutput { +            exit_status, +            stdout: stdout_collector.take_buf(), +            stderr: stderr_collector.take_buf(), +        }) +    } + +    async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> { +        eprintln!("[.] running {}", name); +          let mut child = command              .stdin(Stdio::null())              .stdout(Stdio::piped()) @@ -199,9 +220,9 @@ impl RunningJob {          let mut child_stderr = child.stderr.take().unwrap();          eprintln!("[.] '{}': forwarding stdout", name); -        tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_artifact).await }); +        tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_reporter).await });          eprintln!("[.] '{}': forwarding stderr", name); -        tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_artifact).await }); +        tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_reporter).await });          let res = child.wait().await              .map_err(|e| format!("failed to wait? {:?}", e))?; @@ -291,10 +312,7 @@ impl RunningJob {          }      } -    async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { -        self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) -            .await.unwrap(); - +    fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) {          let mut cmd = Command::new(&command[0]);          let cwd = match working_dir {              Some(dir) => { @@ -304,13 +322,32 @@ impl RunningJob {                  "tmpdir".to_string()              }          }; -        eprintln!("running {:?} in {}", &command, &cwd); +        eprintln!("prepared {:?} to run in {}", &command, &cwd);          let human_name = command.join(" ");          cmd              .current_dir(cwd)              .args(&command[1..]); +        (cmd, human_name) +    } + +    async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result<CommandOutput, String> { +        let (cmd, human_name) = Self::prep_command(command, working_dir); + +        let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?; + +        if !cmd_res.exit_status.success() { +            return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status)); +        } +        Ok(cmd_res) +    } + +    async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { +        self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) +            .await.unwrap(); + +        let (cmd, human_name) = Self::prep_command(command, working_dir); -        let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?; +        let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?;          self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1)))              .await.unwrap(); @@ -6,6 +6,7 @@ use tokio::fs::OpenOptions;  use std::task::{Poll, Context};  use std::pin::Pin;  use std::time::{UNIX_EPOCH, SystemTime}; +use std::sync::{Arc, Mutex};  pub fn now_ms() -> u64 {      SystemTime::now() @@ -14,6 +15,46 @@ pub fn now_ms() -> u64 {          .as_millis() as u64  } +#[derive(Clone)] +pub struct VecSink { +    body: Arc<Mutex<Vec<u8>>>, +} + +impl VecSink { +    pub fn new() -> Self { +        Self { body: Arc::new(Mutex::new(Vec::new())) } +    } + +    pub fn take_buf(&self) -> Vec<u8> { +        std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new()) +    } +} + +impl tokio::io::AsyncWrite for VecSink { +    fn poll_write( +        self: Pin<&mut Self>, +        cx: &mut Context, +        buf: &[u8] +    ) -> Poll<Result<usize, std::io::Error>> { +        self.body.lock().unwrap().extend_from_slice(buf); +        Poll::Ready(Ok(buf.len())) +    } + +    fn poll_flush( +        self: Pin<&mut Self>, +        _cx: &mut Context +    ) -> Poll<Result<(), std::io::Error>> { +        Poll::Ready(Ok(())) +    } + +    fn poll_shutdown( +        self: Pin<&mut Self>, +        _cx: &mut Context +    ) -> Poll<Result<(), std::io::Error>> { +        Poll::Ready(Ok(())) +    } +} +  pub struct ArtifactStream {      sender: hyper::body::Sender,  } diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 46e8047..1b86582 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -12,15 +12,29 @@ pub struct BuildEnv {      job: Arc<Mutex<RunningJob>>,  } +#[derive(Debug)] +pub struct RunParams { +    step: Option<String>, +    name: Option<String>, +    cwd: Option<String>, +} + +pub struct CommandOutput { +    pub exit_status: std::process::ExitStatus, +    pub stdout: Vec<u8>, +    pub stderr: Vec<u8>, +} +  mod lua_exports {      use crate::RunningJob; +    use crate::lua::{CommandOutput, RunParams};      use std::sync::{Arc, Mutex};      use std::path::PathBuf;      use rlua::prelude::*; -    pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { +    pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec<String>, RunParams), rlua::Error> {          let args = match command {              LuaValue::Table(table) => {                  let len = table.len().expect("command table has a length"); @@ -44,13 +58,6 @@ mod lua_exports {              }          }; -        #[derive(Debug)] -        struct RunParams { -            step: Option<String>, -            name: Option<String>, -            cwd: Option<String>, -        } -          let params = match params {              LuaValue::Table(table) => {                  let step = match table.get("step").expect("can get from table") { @@ -104,6 +111,12 @@ mod lua_exports {                  return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other)));              }          }; + +        Ok((args, params)) +    } + +    pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { +        let (args, params) = collect_build_args(command, params)?;          eprintln!("args: {:?}", args);          eprintln!("  params: {:?}", params);          let rt = tokio::runtime::Builder::new_current_thread() @@ -116,6 +129,29 @@ mod lua_exports {          })      } +    pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<rlua::Table<'lua>, rlua::Error> { +        let (args, params) = collect_build_args(command, params)?; +        eprintln!("args: {:?}", args); +        eprintln!("  params: {:?}", params); +        let rt = tokio::runtime::Builder::new_current_thread() +            .enable_all() +            .build() +            .unwrap(); +        let command_output = rt.block_on(async move { +            job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await +                .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) +        })?; + +        let stdout = ctx.create_string(command_output.stdout.as_slice())?; +        let stderr = ctx.create_string(command_output.stderr.as_slice())?; + +        let result = ctx.create_table()?; +        result.set("stdout", stdout)?; +        result.set("stderr", stderr)?; +        result.set("status", command_output.exit_status.code())?; +        Ok(result) +    } +      pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {          let rt = tokio::runtime::Builder::new_current_thread()              .enable_all() @@ -251,6 +287,10 @@ impl BuildEnv {              lua_exports::build_command_impl(command, params, job_ref)          })?; +        let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, params): (LuaValue, LuaValue)| { +            lua_exports::check_output_impl(ctx, command, params, job_ref) +        })?; +          let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| {              lua_exports::metric(name, value, job_ref)          })?; @@ -301,6 +341,7 @@ impl BuildEnv {                  ("error", error),                  ("artifact", artifact),                  ("now_ms", now_ms), +                ("check_output", check_output),              ]          ).unwrap();          build_functions.set("environment", build_environment).unwrap(); | 
