summaryrefslogtreecommitdiff
path: root/src/ci_runner.rs
diff options
context:
space:
mode:
authoriximeow <git@iximeow.net>2022-12-25 08:17:28 +0000
committeriximeow <git@iximeow.net>2022-12-25 08:17:28 +0000
commitcb418168d9375d4ea6c9d21ee6b97e02a575fb4b (patch)
tree8d96ef3354840b25401817611159ad68ef554f7c /src/ci_runner.rs
parent4a213e872395f9b0562c113bb7303815a1d26a57 (diff)
support uploading artifacts during builds
Diffstat (limited to 'src/ci_runner.rs')
-rw-r--r--src/ci_runner.rs193
1 files changed, 147 insertions, 46 deletions
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index 6554f05..9224614 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -1,8 +1,14 @@
use std::time::Duration;
use reqwest::{StatusCode, Response};
-use std::process::Command;
+use tokio::process::Command;
+use std::process::Stdio;
+use std::process::ExitStatus;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, de::DeserializeOwned, Serialize};
+use std::task::{Context, Poll};
+use std::pin::Pin;
+use std::marker::Unpin;
#[derive(Debug)]
enum WorkAcquireError {
@@ -12,6 +18,7 @@ enum WorkAcquireError {
}
struct RunnerClient {
+ http: reqwest::Client,
host: String,
tx: hyper::body::Sender,
rx: Response,
@@ -28,75 +35,127 @@ struct RequestedJob {
impl RequestedJob {
// TODO: panics if hyper finds the channel is closed. hum
async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result<ArtifactStream, String> {
- let create_artifact_message = serde_json::json!({
- "kind": "artifact_create",
- "name": name,
- "description": desc,
- "job_token": &self.build_token,
- });
- client.send(create_artifact_message).await
- .map_err(|e| format!("create artifact send error: {:?}", e))?;
- let resp = client.recv().await
- .map_err(|e| format!("create artifact recv error: {:?}", e))?;
- eprintln!("resp: {:?}", resp);
- let object_id = resp.unwrap()
- .as_object().expect("is an object")
- .get("object_id").unwrap().as_str().expect("is str")
- .to_owned();
- // POST to this object id...
- Ok(ArtifactStream {
- object_id,
- })
+ eprintln!("[?] creating artifact...");
+ let (mut sender, body) = hyper::Body::channel();
+ let resp = client.http.post("https://ci.butactuallyin.space:9876/api/artifact")
+ .header("user-agent", "ci-butactuallyin-space-runner")
+ .header("x-job-token", &self.build_token)
+ .header("x-artifact-name", name)
+ .header("x-artifact-desc", desc)
+ .body(body)
+ .send()
+ .await
+ .map_err(|e| format!("unable to send request: {:?}", e))?;
+
+ if resp.status() == StatusCode::OK {
+ eprintln!("[+] artifact '{}' started", name);
+ Ok(ArtifactStream {
+ sender,
+ })
+ } else {
+ Err(format!("[-] unable to create artifact: {:?}", resp))
+ }
}
- async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> {
- let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works");
+ async fn execute_command(&self, client: &mut RunnerClient, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
+ eprintln!("[.] running {}", name);
+ async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 1024];
+ loop {
+ let n_read = source.read(&mut buf).await
+ .map_err(|e| format!("failed to read: {:?}", e))?;
- let clone_res = Command::new("git")
+ if n_read == 0 {
+ return Ok(());
+ }
+
+ dest.write_all(&buf[..n_read]).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ }
+ }
+
+ let stdout_artifact = self.create_artifact(
+ client,
+ &format!("{} (stdout)", name),
+ &format!("{} (stdout)", desc)
+ ).await.expect("works");
+ let stderr_artifact = self.create_artifact(
+ client,
+ &format!("{} (stderr)", name),
+ &format!("{} (stderr)", desc)
+ ).await.expect("works");
+
+ let mut child = command
+ .stdin(Stdio::null())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn()
+ .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?;
+
+ let child_stdout = child.stdout.take().unwrap();
+ let child_stderr = child.stderr.take().unwrap();
+
+ eprintln!("[.] '{}': forwarding stdout", name);
+ tokio::spawn(forward_data(child_stdout, stdout_artifact));
+ eprintln!("[.] '{}': forwarding stderr", name);
+ tokio::spawn(forward_data(child_stderr, stderr_artifact));
+
+ let res = child.wait().await
+ .map_err(|e| format!("failed to wait? {:?}", e))?;
+
+ if res.success() {
+ eprintln!("[+] '{}' success", name);
+ } else {
+ eprintln!("[-] '{}' fail: {:?}", name, res);
+ }
+
+ Ok(res)
+ }
+
+ async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> {
+ let mut git_clone = Command::new("git");
+ git_clone
.arg("clone")
.arg(&self.remote_url)
- .arg("tmpdir")
- .status()
- .map_err(|e| format!("failed to run git clone? {:?}", e))?;
+ .arg("tmpdir");
+
+ let clone_res = self.execute_command(client, git_clone, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await?;
if !clone_res.success() {
return Err(format!("git clone failed: {:?}", clone_res));
}
- let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works");
-
- let checkout_res = Command::new("git")
+ let mut git_checkout = Command::new("git");
+ git_checkout
.current_dir("tmpdir")
.arg("checkout")
- .arg(&self.commit)
- .status()
- .map_err(|e| format!("failed to run git checkout? {:?}", e))?;
+ .arg(&self.commit);
+
+ let checkout_res = self.execute_command(client, git_checkout, "git checkout log", &format!("git checkout {}", &self.commit)).await?;
if !checkout_res.success() {
return Err(format!("git checkout failed: {:?}", checkout_res));
}
- let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works");
-
- let build_res = Command::new("cargo")
+ let mut build = Command::new("cargo");
+ build
.current_dir("tmpdir")
- .arg("build")
- .status()
- .map_err(|e| format!("failed to run cargo build? {:?}", e))?;
+ .arg("build");
+
+ let build_res = self.execute_command(client, build, "cargo build log", "cargo build").await?;
if !build_res.success() {
return Err(format!("cargo build failed: {:?}", build_res));
}
- let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works");
-
- let test_result = Command::new("cargo")
+ let mut test = Command::new("cargo");
+ test
.current_dir("tmpdir")
- .arg("test")
- .status()
- .map_err(|e| format!("failed to run cargo test? {:?}", e))?;
+ .arg("test");
+
+ let test_res = self.execute_command(client, test, "cargo test log", "cargo test").await?;
- match test_result.code() {
+ match test_res.code() {
Some(0) => Ok("pass".to_string()),
Some(n) => Ok(format!("error: {}", n)),
None => Ok(format!("abnormal exit")),
@@ -105,7 +164,38 @@ impl RequestedJob {
}
struct ArtifactStream {
- object_id: String,
+ sender: hyper::body::Sender,
+}
+
+impl tokio::io::AsyncWrite for ArtifactStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ match self.get_mut().sender.try_send_data(buf.to_vec().into()) {
+ Ok(()) => {
+ Poll::Ready(Ok(buf.len()))
+ },
+ _ => {
+ Poll::Pending
+ }
+ }
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
}
impl RunnerClient {
@@ -120,6 +210,11 @@ impl RunnerClient {
}
Ok(Self {
+ http: reqwest::ClientBuilder::new()
+ .connect_timeout(Duration::from_millis(1000))
+ .timeout(Duration::from_millis(600000))
+ .build()
+ .expect("can build client"),
host: host.to_string(),
tx: sender,
rx: res,
@@ -187,6 +282,8 @@ impl RunnerClient {
match res {
Ok(status) => {
+ eprintln!("[+] job success!");
+
self.send(serde_json::json!({
"kind": "job_status",
"state": "finished",
@@ -194,6 +291,8 @@ impl RunnerClient {
})).await.unwrap();
}
Err(status) => {
+ eprintln!("[-] job error: {}", status);
+
self.send(serde_json::json!({
"kind": "job_status",
"state": "interrupted",
@@ -213,6 +312,8 @@ async fn main() {
.build()
.expect("can build client");
+ let allowed_pushers = None;
+
loop {
let (mut sender, body) = hyper::Body::channel();
let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job")