From 00b72715aa496363dd3c783bd5b2ff965869c8c3 Mon Sep 17 00:00:00 2001 From: iximeow Date: Mon, 3 Jul 2023 14:18:45 -0700 Subject: expose function for lua to run commands and collect output onward, `rustc --version`!! --- src/ci_runner.rs | 67 +++++++++++++++++++++++++++++++++++++++++++------------- src/io.rs | 41 ++++++++++++++++++++++++++++++++++ src/lua/mod.rs | 57 ++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 142 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/ci_runner.rs b/src/ci_runner.rs index c6fc838..a4d15ef 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -17,8 +17,9 @@ mod protocol; mod lua; mod io; -use crate::io::ArtifactStream; +use crate::io::{ArtifactStream, VecSink}; use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; +use crate::lua::CommandOutput; #[derive(Debug)] enum WorkAcquireError { @@ -144,7 +145,7 @@ impl RunningJob { .arg(&self.job.remote_url) .arg("tmpdir"); - let clone_res = self.execute_command(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await + let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await .map_err(|e| { eprintln!("stringy error (exec failed?) for clone: {}", e); RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) } @@ -160,7 +161,7 @@ impl RunningJob { .arg("checkout") .arg(&self.job.commit); - let checkout_res = self.execute_command(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await + let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await .map_err(|e| { eprintln!("stringy error (exec failed?) for checkout: {}", e); RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) } @@ -177,17 +178,37 @@ impl RunningJob { Ok(()) } - async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result { - eprintln!("[.] running {}", name); - let mut stdout_artifact = self.create_artifact( + async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result { + let stdout_artifact = self.create_artifact( &format!("{} (stdout)", name), &format!("{} (stdout)", desc) ).await.expect("works"); - let mut stderr_artifact = self.create_artifact( + let stderr_artifact = self.create_artifact( &format!("{} (stderr)", name), &format!("{} (stderr)", desc) ).await.expect("works"); + let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?; + + Ok(exit_status) + } + + async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result { + let stdout_collector = VecSink::new(); + let stderr_collector = VecSink::new(); + + let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?; + + Ok(CommandOutput { + exit_status, + stdout: stdout_collector.take_buf(), + stderr: stderr_collector.take_buf(), + }) + } + + async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result { + eprintln!("[.] running {}", name); + let mut child = command .stdin(Stdio::null()) .stdout(Stdio::piped()) @@ -199,9 +220,9 @@ impl RunningJob { let mut child_stderr = child.stderr.take().unwrap(); eprintln!("[.] '{}': forwarding stdout", name); - tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_artifact).await }); + tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_reporter).await }); eprintln!("[.] '{}': forwarding stderr", name); - tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_artifact).await }); + tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_reporter).await }); let res = child.wait().await .map_err(|e| format!("failed to wait? {:?}", e))?; @@ -291,10 +312,7 @@ impl RunningJob { } } - async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { - self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) - .await.unwrap(); - + fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) { let mut cmd = Command::new(&command[0]); let cwd = match working_dir { Some(dir) => { @@ -304,13 +322,32 @@ impl RunningJob { "tmpdir".to_string() } }; - eprintln!("running {:?} in {}", &command, &cwd); + eprintln!("prepared {:?} to run in {}", &command, &cwd); let human_name = command.join(" "); cmd .current_dir(cwd) .args(&command[1..]); + (cmd, human_name) + } + + async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result { + let (cmd, human_name) = Self::prep_command(command, working_dir); + + let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?; + + if !cmd_res.exit_status.success() { + return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status)); + } + Ok(cmd_res) + } + + async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { + self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) + .await.unwrap(); + + let (cmd, human_name) = Self::prep_command(command, working_dir); - let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?; + let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?; self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) .await.unwrap(); diff --git a/src/io.rs b/src/io.rs index 50f9bad..f9f407f 100644 --- a/src/io.rs +++ b/src/io.rs @@ -6,6 +6,7 @@ use tokio::fs::OpenOptions; use std::task::{Poll, Context}; use std::pin::Pin; use std::time::{UNIX_EPOCH, SystemTime}; +use std::sync::{Arc, Mutex}; pub fn now_ms() -> u64 { SystemTime::now() @@ -14,6 +15,46 @@ pub fn now_ms() -> u64 { .as_millis() as u64 } +#[derive(Clone)] +pub struct VecSink { + body: Arc>>, +} + +impl VecSink { + pub fn new() -> Self { + Self { body: Arc::new(Mutex::new(Vec::new())) } + } + + pub fn take_buf(&self) -> Vec { + std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new()) + } +} + +impl tokio::io::AsyncWrite for VecSink { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + self.body.lock().unwrap().extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + pub struct ArtifactStream { sender: hyper::body::Sender, } diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 46e8047..1b86582 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -12,15 +12,29 @@ pub struct BuildEnv { job: Arc>, } +#[derive(Debug)] +pub struct RunParams { + step: Option, + name: Option, + cwd: Option, +} + +pub struct CommandOutput { + pub exit_status: std::process::ExitStatus, + pub stdout: Vec, + pub stderr: Vec, +} + mod lua_exports { use crate::RunningJob; + use crate::lua::{CommandOutput, RunParams}; use std::sync::{Arc, Mutex}; use std::path::PathBuf; use rlua::prelude::*; - pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc>) -> Result<(), rlua::Error> { + pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec, RunParams), rlua::Error> { let args = match command { LuaValue::Table(table) => { let len = table.len().expect("command table has a length"); @@ -44,13 +58,6 @@ mod lua_exports { } }; - #[derive(Debug)] - struct RunParams { - step: Option, - name: Option, - cwd: Option, - } - let params = match params { LuaValue::Table(table) => { let step = match table.get("step").expect("can get from table") { @@ -104,6 +111,12 @@ mod lua_exports { return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other))); } }; + + Ok((args, params)) + } + + pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc>) -> Result<(), rlua::Error> { + let (args, params) = collect_build_args(command, params)?; eprintln!("args: {:?}", args); eprintln!(" params: {:?}", params); let rt = tokio::runtime::Builder::new_current_thread() @@ -116,6 +129,29 @@ mod lua_exports { }) } + pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc>) -> Result, rlua::Error> { + let (args, params) = collect_build_args(command, params)?; + eprintln!("args: {:?}", args); + eprintln!(" params: {:?}", params); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let command_output = rt.block_on(async move { + job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await + .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) + })?; + + let stdout = ctx.create_string(command_output.stdout.as_slice())?; + let stderr = ctx.create_string(command_output.stderr.as_slice())?; + + let result = ctx.create_table()?; + result.set("stdout", stdout)?; + result.set("stderr", stderr)?; + result.set("status", command_output.exit_status.code())?; + Ok(result) + } + pub fn metric(name: String, value: String, job_ctx: Arc>) -> Result<(), rlua::Error> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -251,6 +287,10 @@ impl BuildEnv { lua_exports::build_command_impl(command, params, job_ref) })?; + let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, params): (LuaValue, LuaValue)| { + lua_exports::check_output_impl(ctx, command, params, job_ref) + })?; + let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| { lua_exports::metric(name, value, job_ref) })?; @@ -301,6 +341,7 @@ impl BuildEnv { ("error", error), ("artifact", artifact), ("now_ms", now_ms), + ("check_output", check_output), ] ).unwrap(); build_functions.set("environment", build_environment).unwrap(); -- cgit v1.1