From cb418168d9375d4ea6c9d21ee6b97e02a575fb4b Mon Sep 17 00:00:00 2001 From: iximeow Date: Sun, 25 Dec 2022 08:17:28 +0000 Subject: support uploading artifacts during builds --- src/ci_runner.rs | 193 ++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 147 insertions(+), 46 deletions(-) (limited to 'src/ci_runner.rs') diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 6554f05..9224614 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -1,8 +1,14 @@ use std::time::Duration; use reqwest::{StatusCode, Response}; -use std::process::Command; +use tokio::process::Command; +use std::process::Stdio; +use std::process::ExitStatus; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use serde_derive::{Deserialize, Serialize}; use serde::{Deserialize, de::DeserializeOwned, Serialize}; +use std::task::{Context, Poll}; +use std::pin::Pin; +use std::marker::Unpin; #[derive(Debug)] enum WorkAcquireError { @@ -12,6 +18,7 @@ enum WorkAcquireError { } struct RunnerClient { + http: reqwest::Client, host: String, tx: hyper::body::Sender, rx: Response, @@ -28,75 +35,127 @@ struct RequestedJob { impl RequestedJob { // TODO: panics if hyper finds the channel is closed. hum async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result { - let create_artifact_message = serde_json::json!({ - "kind": "artifact_create", - "name": name, - "description": desc, - "job_token": &self.build_token, - }); - client.send(create_artifact_message).await - .map_err(|e| format!("create artifact send error: {:?}", e))?; - let resp = client.recv().await - .map_err(|e| format!("create artifact recv error: {:?}", e))?; - eprintln!("resp: {:?}", resp); - let object_id = resp.unwrap() - .as_object().expect("is an object") - .get("object_id").unwrap().as_str().expect("is str") - .to_owned(); - // POST to this object id... - Ok(ArtifactStream { - object_id, - }) + eprintln!("[?] creating artifact..."); + let (mut sender, body) = hyper::Body::channel(); + let resp = client.http.post("https://ci.butactuallyin.space:9876/api/artifact") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("x-job-token", &self.build_token) + .header("x-artifact-name", name) + .header("x-artifact-desc", desc) + .body(body) + .send() + .await + .map_err(|e| format!("unable to send request: {:?}", e))?; + + if resp.status() == StatusCode::OK { + eprintln!("[+] artifact '{}' started", name); + Ok(ArtifactStream { + sender, + }) + } else { + Err(format!("[-] unable to create artifact: {:?}", resp)) + } } - async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result { - let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works"); + async fn execute_command(&self, client: &mut RunnerClient, mut command: Command, name: &str, desc: &str) -> Result { + eprintln!("[.] running {}", name); + async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; - let clone_res = Command::new("git") + if n_read == 0 { + return Ok(()); + } + + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } + } + + let stdout_artifact = self.create_artifact( + client, + &format!("{} (stdout)", name), + &format!("{} (stdout)", desc) + ).await.expect("works"); + let stderr_artifact = self.create_artifact( + client, + &format!("{} (stderr)", name), + &format!("{} (stderr)", desc) + ).await.expect("works"); + + let mut child = command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; + + let child_stdout = child.stdout.take().unwrap(); + let child_stderr = child.stderr.take().unwrap(); + + eprintln!("[.] '{}': forwarding stdout", name); + tokio::spawn(forward_data(child_stdout, stdout_artifact)); + eprintln!("[.] '{}': forwarding stderr", name); + tokio::spawn(forward_data(child_stderr, stderr_artifact)); + + let res = child.wait().await + .map_err(|e| format!("failed to wait? {:?}", e))?; + + if res.success() { + eprintln!("[+] '{}' success", name); + } else { + eprintln!("[-] '{}' fail: {:?}", name, res); + } + + Ok(res) + } + + async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result { + let mut git_clone = Command::new("git"); + git_clone .arg("clone") .arg(&self.remote_url) - .arg("tmpdir") - .status() - .map_err(|e| format!("failed to run git clone? {:?}", e))?; + .arg("tmpdir"); + + let clone_res = self.execute_command(client, git_clone, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await?; if !clone_res.success() { return Err(format!("git clone failed: {:?}", clone_res)); } - let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works"); - - let checkout_res = Command::new("git") + let mut git_checkout = Command::new("git"); + git_checkout .current_dir("tmpdir") .arg("checkout") - .arg(&self.commit) - .status() - .map_err(|e| format!("failed to run git checkout? {:?}", e))?; + .arg(&self.commit); + + let checkout_res = self.execute_command(client, git_checkout, "git checkout log", &format!("git checkout {}", &self.commit)).await?; if !checkout_res.success() { return Err(format!("git checkout failed: {:?}", checkout_res)); } - let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works"); - - let build_res = Command::new("cargo") + let mut build = Command::new("cargo"); + build .current_dir("tmpdir") - .arg("build") - .status() - .map_err(|e| format!("failed to run cargo build? {:?}", e))?; + .arg("build"); + + let build_res = self.execute_command(client, build, "cargo build log", "cargo build").await?; if !build_res.success() { return Err(format!("cargo build failed: {:?}", build_res)); } - let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works"); - - let test_result = Command::new("cargo") + let mut test = Command::new("cargo"); + test .current_dir("tmpdir") - .arg("test") - .status() - .map_err(|e| format!("failed to run cargo test? {:?}", e))?; + .arg("test"); + + let test_res = self.execute_command(client, test, "cargo test log", "cargo test").await?; - match test_result.code() { + match test_res.code() { Some(0) => Ok("pass".to_string()), Some(n) => Ok(format!("error: {}", n)), None => Ok(format!("abnormal exit")), @@ -105,7 +164,38 @@ impl RequestedJob { } struct ArtifactStream { - object_id: String, + sender: hyper::body::Sender, +} + +impl tokio::io::AsyncWrite for ArtifactStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + match self.get_mut().sender.try_send_data(buf.to_vec().into()) { + Ok(()) => { + Poll::Ready(Ok(buf.len())) + }, + _ => { + Poll::Pending + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } } impl RunnerClient { @@ -120,6 +210,11 @@ impl RunnerClient { } Ok(Self { + http: reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"), host: host.to_string(), tx: sender, rx: res, @@ -187,6 +282,8 @@ impl RunnerClient { match res { Ok(status) => { + eprintln!("[+] job success!"); + self.send(serde_json::json!({ "kind": "job_status", "state": "finished", @@ -194,6 +291,8 @@ impl RunnerClient { })).await.unwrap(); } Err(status) => { + eprintln!("[-] job error: {}", status); + self.send(serde_json::json!({ "kind": "job_status", "state": "interrupted", @@ -213,6 +312,8 @@ async fn main() { .build() .expect("can build client"); + let allowed_pushers = None; + loop { let (mut sender, body) = hyper::Body::channel(); let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") -- cgit v1.1