summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ci_runner.rs67
-rw-r--r--src/io.rs41
-rw-r--r--src/lua/mod.rs57
3 files changed, 142 insertions, 23 deletions
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index c6fc838..a4d15ef 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -17,8 +17,9 @@ mod protocol;
mod lua;
mod io;
-use crate::io::ArtifactStream;
+use crate::io::{ArtifactStream, VecSink};
use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
+use crate::lua::CommandOutput;
#[derive(Debug)]
enum WorkAcquireError {
@@ -144,7 +145,7 @@ impl RunningJob {
.arg(&self.job.remote_url)
.arg("tmpdir");
- let clone_res = self.execute_command(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await
+ let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await
.map_err(|e| {
eprintln!("stringy error (exec failed?) for clone: {}", e);
RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) }
@@ -160,7 +161,7 @@ impl RunningJob {
.arg("checkout")
.arg(&self.job.commit);
- let checkout_res = self.execute_command(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await
+ let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await
.map_err(|e| {
eprintln!("stringy error (exec failed?) for checkout: {}", e);
RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) }
@@ -177,17 +178,37 @@ impl RunningJob {
Ok(())
}
- async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
- eprintln!("[.] running {}", name);
- let mut stdout_artifact = self.create_artifact(
+ async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
+ let stdout_artifact = self.create_artifact(
&format!("{} (stdout)", name),
&format!("{} (stdout)", desc)
).await.expect("works");
- let mut stderr_artifact = self.create_artifact(
+ let stderr_artifact = self.create_artifact(
&format!("{} (stderr)", name),
&format!("{} (stderr)", desc)
).await.expect("works");
+ let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?;
+
+ Ok(exit_status)
+ }
+
+ async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> {
+ let stdout_collector = VecSink::new();
+ let stderr_collector = VecSink::new();
+
+ let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?;
+
+ Ok(CommandOutput {
+ exit_status,
+ stdout: stdout_collector.take_buf(),
+ stderr: stderr_collector.take_buf(),
+ })
+ }
+
+ async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> {
+ eprintln!("[.] running {}", name);
+
let mut child = command
.stdin(Stdio::null())
.stdout(Stdio::piped())
@@ -199,9 +220,9 @@ impl RunningJob {
let mut child_stderr = child.stderr.take().unwrap();
eprintln!("[.] '{}': forwarding stdout", name);
- tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_artifact).await });
+ tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_reporter).await });
eprintln!("[.] '{}': forwarding stderr", name);
- tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_artifact).await });
+ tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_reporter).await });
let res = child.wait().await
.map_err(|e| format!("failed to wait? {:?}", e))?;
@@ -291,10 +312,7 @@ impl RunningJob {
}
}
- async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> {
- self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1)))
- .await.unwrap();
-
+ fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) {
let mut cmd = Command::new(&command[0]);
let cwd = match working_dir {
Some(dir) => {
@@ -304,13 +322,32 @@ impl RunningJob {
"tmpdir".to_string()
}
};
- eprintln!("running {:?} in {}", &command, &cwd);
+ eprintln!("prepared {:?} to run in {}", &command, &cwd);
let human_name = command.join(" ");
cmd
.current_dir(cwd)
.args(&command[1..]);
+ (cmd, human_name)
+ }
+
+ async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result<CommandOutput, String> {
+ let (cmd, human_name) = Self::prep_command(command, working_dir);
+
+ let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?;
+
+ if !cmd_res.exit_status.success() {
+ return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status));
+ }
+ Ok(cmd_res)
+ }
+
+ async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> {
+ self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1)))
+ .await.unwrap();
+
+ let (cmd, human_name) = Self::prep_command(command, working_dir);
- let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?;
+ let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?;
self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1)))
.await.unwrap();
diff --git a/src/io.rs b/src/io.rs
index 50f9bad..f9f407f 100644
--- a/src/io.rs
+++ b/src/io.rs
@@ -6,6 +6,7 @@ use tokio::fs::OpenOptions;
use std::task::{Poll, Context};
use std::pin::Pin;
use std::time::{UNIX_EPOCH, SystemTime};
+use std::sync::{Arc, Mutex};
pub fn now_ms() -> u64 {
SystemTime::now()
@@ -14,6 +15,46 @@ pub fn now_ms() -> u64 {
.as_millis() as u64
}
+#[derive(Clone)]
+pub struct VecSink {
+ body: Arc<Mutex<Vec<u8>>>,
+}
+
+impl VecSink {
+ pub fn new() -> Self {
+ Self { body: Arc::new(Mutex::new(Vec::new())) }
+ }
+
+ pub fn take_buf(&self) -> Vec<u8> {
+ std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new())
+ }
+}
+
+impl tokio::io::AsyncWrite for VecSink {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ self.body.lock().unwrap().extend_from_slice(buf);
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
pub struct ArtifactStream {
sender: hyper::body::Sender,
}
diff --git a/src/lua/mod.rs b/src/lua/mod.rs
index 46e8047..1b86582 100644
--- a/src/lua/mod.rs
+++ b/src/lua/mod.rs
@@ -12,15 +12,29 @@ pub struct BuildEnv {
job: Arc<Mutex<RunningJob>>,
}
+#[derive(Debug)]
+pub struct RunParams {
+ step: Option<String>,
+ name: Option<String>,
+ cwd: Option<String>,
+}
+
+pub struct CommandOutput {
+ pub exit_status: std::process::ExitStatus,
+ pub stdout: Vec<u8>,
+ pub stderr: Vec<u8>,
+}
+
mod lua_exports {
use crate::RunningJob;
+ use crate::lua::{CommandOutput, RunParams};
use std::sync::{Arc, Mutex};
use std::path::PathBuf;
use rlua::prelude::*;
- pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
+ pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec<String>, RunParams), rlua::Error> {
let args = match command {
LuaValue::Table(table) => {
let len = table.len().expect("command table has a length");
@@ -44,13 +58,6 @@ mod lua_exports {
}
};
- #[derive(Debug)]
- struct RunParams {
- step: Option<String>,
- name: Option<String>,
- cwd: Option<String>,
- }
-
let params = match params {
LuaValue::Table(table) => {
let step = match table.get("step").expect("can get from table") {
@@ -104,6 +111,12 @@ mod lua_exports {
return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other)));
}
};
+
+ Ok((args, params))
+ }
+
+ pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
+ let (args, params) = collect_build_args(command, params)?;
eprintln!("args: {:?}", args);
eprintln!(" params: {:?}", params);
let rt = tokio::runtime::Builder::new_current_thread()
@@ -116,6 +129,29 @@ mod lua_exports {
})
}
+ pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<rlua::Table<'lua>, rlua::Error> {
+ let (args, params) = collect_build_args(command, params)?;
+ eprintln!("args: {:?}", args);
+ eprintln!(" params: {:?}", params);
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ let command_output = rt.block_on(async move {
+ job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await
+ .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e)))
+ })?;
+
+ let stdout = ctx.create_string(command_output.stdout.as_slice())?;
+ let stderr = ctx.create_string(command_output.stderr.as_slice())?;
+
+ let result = ctx.create_table()?;
+ result.set("stdout", stdout)?;
+ result.set("stderr", stderr)?;
+ result.set("status", command_output.exit_status.code())?;
+ Ok(result)
+ }
+
pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -251,6 +287,10 @@ impl BuildEnv {
lua_exports::build_command_impl(command, params, job_ref)
})?;
+ let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, params): (LuaValue, LuaValue)| {
+ lua_exports::check_output_impl(ctx, command, params, job_ref)
+ })?;
+
let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| {
lua_exports::metric(name, value, job_ref)
})?;
@@ -301,6 +341,7 @@ impl BuildEnv {
("error", error),
("artifact", artifact),
("now_ms", now_ms),
+ ("check_output", check_output),
]
).unwrap();
build_functions.set("environment", build_environment).unwrap();