summaryrefslogtreecommitdiff
path: root/src/ci_runner.rs
diff options
context:
space:
mode:
authoriximeow <git@iximeow.net>2022-12-27 23:53:41 +0000
committeriximeow <git@iximeow.net>2022-12-27 23:53:41 +0000
commit018c655304bfc185740b3a163b369ead7d5131e8 (patch)
treef04024ad72a38e75460b8453aa46e08b08a32a97 /src/ci_runner.rs
parent3411ef67fb829c419f853293664d97c6a6d4b96b (diff)
finally actually support goodfiles
some more refinements to how builds are run as well: build state discusses if a build us running, where the result is either a pass or fail this is useful for deciding if a build is in progress and how artifacts (if any) should be presented
Diffstat (limited to 'src/ci_runner.rs')
-rw-r--r--src/ci_runner.rs291
1 files changed, 203 insertions, 88 deletions
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index 1c2ba92..de423eb 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -1,4 +1,6 @@
use std::time::Duration;
+use rlua::prelude::LuaError;
+use std::sync::{Arc, Mutex};
use reqwest::{StatusCode, Response};
use tokio::process::Command;
use std::process::Stdio;
@@ -11,6 +13,8 @@ use std::task::{Context, Poll};
use std::pin::Pin;
use std::marker::Unpin;
+mod lua;
+
#[derive(Debug)]
enum WorkAcquireError {
Reqwest(reqwest::Error),
@@ -34,12 +38,73 @@ struct RequestedJob {
}
impl RequestedJob {
+ pub fn into_running(self, client: RunnerClient) -> RunningJob {
+ RunningJob {
+ job: self,
+ client,
+ }
+ }
+}
+
+struct JobEnv {
+ lua: lua::BuildEnv,
+ job: Arc<Mutex<RunningJob>>,
+}
+
+impl JobEnv {
+ fn new(job: &Arc<Mutex<RunningJob>>) -> Self {
+ let lua = lua::BuildEnv::new(job);
+ JobEnv {
+ lua,
+ job: Arc::clone(job)
+ }
+ }
+
+ async fn default_goodfile(self) -> Result<(), LuaError> {
+ self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await
+ }
+
+ async fn exec_goodfile(self) -> Result<(), LuaError> {
+ let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap();
+ self.lua.run_build(script.as_bytes()).await
+ }
+}
+
+pub struct RunningJob {
+ job: RequestedJob,
+ client: RunnerClient,
+}
+
+async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 1024];
+ loop {
+ let n_read = source.read(&mut buf).await
+ .map_err(|e| format!("failed to read: {:?}", e))?;
+
+ if n_read == 0 {
+ return Ok(());
+ }
+
+ dest.write_all(&buf[..n_read]).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ }
+}
+
+impl RunningJob {
+ async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> {
+ self.client.send(serde_json::json!({
+ "kind": "metric",
+ "value": value.to_string(),
+ })).await
+ .map_err(|e| format!("failed to send metric {}: {:?})", name, e))
+ }
+
// TODO: panics if hyper finds the channel is closed. hum
- async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result<ArtifactStream, String> {
+ async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> {
let (mut sender, body) = hyper::Body::channel();
- let resp = client.http.post("https://ci.butactuallyin.space:9876/api/artifact")
+ let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact")
.header("user-agent", "ci-butactuallyin-space-runner")
- .header("x-job-token", &self.build_token)
+ .header("x-job-token", &self.job.build_token)
.header("x-artifact-name", name)
.header("x-artifact-desc", desc)
.body(body)
@@ -57,30 +122,71 @@ impl RequestedJob {
}
}
- async fn execute_command(&self, client: &mut RunnerClient, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
- eprintln!("[.] running {}", name);
- async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> {
- let mut buf = vec![0; 1024 * 1024];
- loop {
- let n_read = source.read(&mut buf).await
- .map_err(|e| format!("failed to read: {:?}", e))?;
-
- if n_read == 0 {
- return Ok(());
- }
+ async fn clone_remote(&self) -> Result<(), String> {
+ let mut git_clone = Command::new("git");
+ git_clone
+ .arg("clone")
+ .arg(&self.job.remote_url)
+ .arg("tmpdir");
- dest.write_all(&buf[..n_read]).await
- .map_err(|e| format!("failed to write: {:?}", e))?;
- }
+ let clone_res = self.execute_command(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await?;
+
+ if !clone_res.success() {
+ return Err(format!("git clone failed: {:?}", clone_res));
}
+ let mut git_checkout = Command::new("git");
+ git_checkout
+ .current_dir("tmpdir")
+ .arg("checkout")
+ .arg(&self.job.commit);
+
+ let checkout_res = self.execute_command(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await?;
+
+ if !checkout_res.success() {
+ return Err(format!("git checkout failed: {:?}", checkout_res));
+ }
+
+ Ok(())
+ }
+
+ async fn execute_goodfile(&self) -> Result<String, String> {
+ Ok("string".to_string())
+ }
+
+ async fn default_goodfile(&self) -> Result<String, String> {
+ let mut build = Command::new("cargo");
+ build
+ .current_dir("tmpdir")
+ .arg("build");
+
+ let build_res = self.execute_command(build, "cargo build log", "cargo build").await?;
+
+ if !build_res.success() {
+ return Err(format!("cargo build failed: {:?}", build_res));
+ }
+
+ let mut test = Command::new("cargo");
+ test
+ .current_dir("tmpdir")
+ .arg("test");
+
+ let test_res = self.execute_command(test, "cargo test log", "cargo test").await?;
+
+ match test_res.code() {
+ Some(0) => Ok("pass".to_string()),
+ Some(n) => Ok(format!("error: {}", n)),
+ None => Ok(format!("abnormal exit")),
+ }
+ }
+
+ async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
+ eprintln!("[.] running {}", name);
let stdout_artifact = self.create_artifact(
- client,
&format!("{} (stdout)", name),
&format!("{} (stdout)", desc)
).await.expect("works");
let stderr_artifact = self.create_artifact(
- client,
&format!("{} (stderr)", name),
&format!("{} (stderr)", desc)
).await.expect("works");
@@ -112,54 +218,93 @@ impl RequestedJob {
Ok(res)
}
- async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> {
- let mut git_clone = Command::new("git");
- git_clone
- .arg("clone")
- .arg(&self.remote_url)
- .arg("tmpdir");
+ async fn run(mut self) {
+ self.client.send(serde_json::json!({
+ "status": "started"
+ })).await.unwrap();
- let clone_res = self.execute_command(client, git_clone, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await?;
+ std::fs::remove_dir_all("tmpdir").unwrap();
+ std::fs::create_dir("tmpdir").unwrap();
- if !clone_res.success() {
- return Err(format!("git clone failed: {:?}", clone_res));
- }
+ self.clone_remote().await.expect("clone succeeds");
+
+ let ctx = Arc::new(Mutex::new(self));
+
+ let lua_env = JobEnv::new(&ctx);
+
+ let metadata = std::fs::metadata("./tmpdir/goodfile");
+ let res: Result<String, (String, String)> = match metadata {
+ Ok(_) => {
+ match lua_env.exec_goodfile().await {
+ Ok(()) => {
+ Ok("pass".to_string())
+ },
+ Err(lua_err) => {
+ Err(("failed".to_string(), lua_err.to_string()))
+ }
+ }
+ },
+ Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+ match lua_env.default_goodfile().await {
+ Ok(()) => {
+ Ok("pass".to_string())
+ },
+ Err(lua_err) => {
+ Err(("failed".to_string(), lua_err.to_string()))
+ }
+ }
+ },
+ Err(e) => {
+ eprintln!("[-] error finding goodfile: {:?}", e);
+ Err(("failed".to_string(), "inaccessible goodfile".to_string()))
+ }
+ };
- let mut git_checkout = Command::new("git");
- git_checkout
- .current_dir("tmpdir")
- .arg("checkout")
- .arg(&self.commit);
+ match res {
+ Ok(status) => {
+ eprintln!("[+] job success!");
+ let status = serde_json::json!({
+ "kind": "job_status",
+ "state": "finished",
+ "result": status
+ });
+ eprintln!("reporting status: {}", status);
- let checkout_res = self.execute_command(client, git_checkout, "git checkout log", &format!("git checkout {}", &self.commit)).await?;
+ let res = ctx.lock().unwrap().client.send(status).await;
+ if let Err(e) = res {
+ eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
+ }
+ }
+ Err((status, lua_err)) => {
+ eprintln!("[-] job error: {}", status);
- if !checkout_res.success() {
- return Err(format!("git checkout failed: {:?}", checkout_res));
+ let res = ctx.lock().unwrap().client.send(serde_json::json!({
+ "kind": "job_status",
+ "state": "interrupted",
+ "result": status,
+ "desc": lua_err.to_string(),
+ })).await;
+ if let Err(e) = res {
+ eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", status, e);
+ }
+ }
}
+ }
- let mut build = Command::new("cargo");
- build
+ async fn run_command(&self, command: &[String]) -> Result<(), String> {
+ let mut cmd = Command::new(&command[0]);
+ let human_name = command.join(" ");
+ cmd
.current_dir("tmpdir")
- .arg("build");
+ .args(&command[1..]);
- let build_res = self.execute_command(client, build, "cargo build log", "cargo build").await?;
+ let cmd_res = self.execute_command(cmd, &format!("{} log", human_name), &human_name).await?;
- if !build_res.success() {
- return Err(format!("cargo build failed: {:?}", build_res));
+ if !cmd_res.success() {
+ return Err(format!("{} failed: {:?}", &human_name, cmd_res));
}
- let mut test = Command::new("cargo");
- test
- .current_dir("tmpdir")
- .arg("test");
-
- let test_res = self.execute_command(client, test, "cargo test log", "cargo test").await?;
-
- match test_res.code() {
- Some(0) => Ok("pass".to_string()),
- Some(n) => Ok(format!("error: {}", n)),
- None => Ok(format!("abnormal exit")),
- }
+ Ok(())
}
}
@@ -269,38 +414,6 @@ impl RunnerClient {
).await
.map_err(|e| format!("send error: {:?}", e))
}
-
- async fn run_job(&mut self, job: RequestedJob) {
- self.send(serde_json::json!({
- "status": "started"
- })).await.unwrap();
-
- std::fs::remove_dir_all("tmpdir").unwrap();
- std::fs::create_dir("tmpdir").unwrap();
-
- let res = job.execute_goodfile(self).await;
-
- match res {
- Ok(status) => {
- eprintln!("[+] job success!");
-
- self.send(serde_json::json!({
- "kind": "job_status",
- "state": "finished",
- "result": status
- })).await.unwrap();
- }
- Err(status) => {
- eprintln!("[-] job error: {}", status);
-
- self.send(serde_json::json!({
- "kind": "job_status",
- "state": "interrupted",
- "result": status
- })).await.unwrap();
- }
- }
- }
}
#[tokio::main]
@@ -355,7 +468,9 @@ async fn main() {
eprintln!("requested work: {:?}", job);
eprintln!("doing {:?}", job);
- client.run_job(job).await;
+
+ let mut job = job.into_running(client);
+ job.run().await;
std::thread::sleep(Duration::from_millis(10000));
},
Err(e) => {