From cb418168d9375d4ea6c9d21ee6b97e02a575fb4b Mon Sep 17 00:00:00 2001 From: iximeow Date: Sun, 25 Dec 2022 08:17:28 +0000 Subject: support uploading artifacts during builds --- src/ci_driver.rs | 77 ++++++++++++++++++++-- src/ci_runner.rs | 193 ++++++++++++++++++++++++++++++++++++++++++------------- src/dbctx.rs | 114 ++++++++++++++++++++++++++++++++ src/notifier.rs | 5 ++ src/sql.rs | 9 ++- 5 files changed, 346 insertions(+), 52 deletions(-) (limited to 'src') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 96e0d44..9ec0dd8 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -9,6 +9,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use axum_server::tls_rustls::RustlsConfig; use axum::body::StreamBody; use axum::http::{StatusCode}; +use hyper::HeaderMap; use axum::Router; use axum::routing::*; use axum::extract::State; @@ -101,8 +102,8 @@ async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::R let run_host = client_job.client.name.clone(); connection.execute( - "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4", - (now as u64, run_host, format!("{}", artifacts.display()), job.id) + "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5", + (now as u64, run_host, format!("{}", artifacts.display()), &client_job.client.build_token, job.id) ) .expect("can update"); @@ -139,7 +140,15 @@ struct ClientJob { impl ClientJob { pub async fn run(&mut self) { loop { - let msg = self.client.recv().await.expect("recv works").expect("client sent an object"); + eprintln!("waiting on response.."); + let msg = match self.client.recv().await.expect("recv works") { + Some(msg) => msg, + None => { + eprintln!("client hung up. job's done, i hope?"); + return; + } + }; + eprintln!("got {:?}", msg); let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); match msg_kind { "job_status" => { @@ -228,7 +237,6 @@ impl RunnerClient { Err(format!("no client job: {:?}", e)) }, None => { - eprintln!("no more body chunks? client hung up?"); Ok(None) } } @@ -258,7 +266,6 @@ impl RunnerClient { } } Ok(None) => { - eprintln!("no more body chunks? client hung up?"); Ok(None) } Err(e) => { @@ -275,6 +282,65 @@ impl fmt::Debug for RunnerClient { } } +#[axum_macros::debug_handler] +async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { + let job_token = match headers.get("x-job-token") { + Some(job_token) => job_token.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() { + Some(result) => result, + None => { + eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + if token_validity != dbctx::TokenValidity::Valid { + eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + + let artifact_path = if let Some(artifact_path) = artifact_path { + artifact_path + } else { + eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + }; + + let artifact_name = match headers.get("x-artifact-name") { + Some(artifact_name) => artifact_name.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let artifact_desc = match headers.get("x-artifact-desc") { + Some(artifact_desc) => artifact_desc.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let mut artifact = match ctx.0.reserve_artifact(job, artifact_name, artifact_desc).await { + Ok(artifact) => artifact, + Err(err) => { + eprintln!("failure to reserve artifact: {:?}", err); + return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response(); + } + }; + + spawn(async move { artifact.store_all(artifact_content).await }); + + (StatusCode::OK, "").into_response() +} + async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender)>, job_resp: BodyStream) -> impl IntoResponse { let (tx_sender, tx_receiver) = mpsc::channel(8); let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); @@ -308,6 +374,7 @@ async fn make_api_server(dbctx: Arc) -> (Router, mpsc::Receiver Result { - let create_artifact_message = serde_json::json!({ - "kind": "artifact_create", - "name": name, - "description": desc, - "job_token": &self.build_token, - }); - client.send(create_artifact_message).await - .map_err(|e| format!("create artifact send error: {:?}", e))?; - let resp = client.recv().await - .map_err(|e| format!("create artifact recv error: {:?}", e))?; - eprintln!("resp: {:?}", resp); - let object_id = resp.unwrap() - .as_object().expect("is an object") - .get("object_id").unwrap().as_str().expect("is str") - .to_owned(); - // POST to this object id... - Ok(ArtifactStream { - object_id, - }) + eprintln!("[?] creating artifact..."); + let (mut sender, body) = hyper::Body::channel(); + let resp = client.http.post("https://ci.butactuallyin.space:9876/api/artifact") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("x-job-token", &self.build_token) + .header("x-artifact-name", name) + .header("x-artifact-desc", desc) + .body(body) + .send() + .await + .map_err(|e| format!("unable to send request: {:?}", e))?; + + if resp.status() == StatusCode::OK { + eprintln!("[+] artifact '{}' started", name); + Ok(ArtifactStream { + sender, + }) + } else { + Err(format!("[-] unable to create artifact: {:?}", resp)) + } } - async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result { - let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works"); + async fn execute_command(&self, client: &mut RunnerClient, mut command: Command, name: &str, desc: &str) -> Result { + eprintln!("[.] running {}", name); + async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; - let clone_res = Command::new("git") + if n_read == 0 { + return Ok(()); + } + + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } + } + + let stdout_artifact = self.create_artifact( + client, + &format!("{} (stdout)", name), + &format!("{} (stdout)", desc) + ).await.expect("works"); + let stderr_artifact = self.create_artifact( + client, + &format!("{} (stderr)", name), + &format!("{} (stderr)", desc) + ).await.expect("works"); + + let mut child = command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; + + let child_stdout = child.stdout.take().unwrap(); + let child_stderr = child.stderr.take().unwrap(); + + eprintln!("[.] '{}': forwarding stdout", name); + tokio::spawn(forward_data(child_stdout, stdout_artifact)); + eprintln!("[.] '{}': forwarding stderr", name); + tokio::spawn(forward_data(child_stderr, stderr_artifact)); + + let res = child.wait().await + .map_err(|e| format!("failed to wait? {:?}", e))?; + + if res.success() { + eprintln!("[+] '{}' success", name); + } else { + eprintln!("[-] '{}' fail: {:?}", name, res); + } + + Ok(res) + } + + async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result { + let mut git_clone = Command::new("git"); + git_clone .arg("clone") .arg(&self.remote_url) - .arg("tmpdir") - .status() - .map_err(|e| format!("failed to run git clone? {:?}", e))?; + .arg("tmpdir"); + + let clone_res = self.execute_command(client, git_clone, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await?; if !clone_res.success() { return Err(format!("git clone failed: {:?}", clone_res)); } - let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works"); - - let checkout_res = Command::new("git") + let mut git_checkout = Command::new("git"); + git_checkout .current_dir("tmpdir") .arg("checkout") - .arg(&self.commit) - .status() - .map_err(|e| format!("failed to run git checkout? {:?}", e))?; + .arg(&self.commit); + + let checkout_res = self.execute_command(client, git_checkout, "git checkout log", &format!("git checkout {}", &self.commit)).await?; if !checkout_res.success() { return Err(format!("git checkout failed: {:?}", checkout_res)); } - let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works"); - - let build_res = Command::new("cargo") + let mut build = Command::new("cargo"); + build .current_dir("tmpdir") - .arg("build") - .status() - .map_err(|e| format!("failed to run cargo build? {:?}", e))?; + .arg("build"); + + let build_res = self.execute_command(client, build, "cargo build log", "cargo build").await?; if !build_res.success() { return Err(format!("cargo build failed: {:?}", build_res)); } - let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works"); - - let test_result = Command::new("cargo") + let mut test = Command::new("cargo"); + test .current_dir("tmpdir") - .arg("test") - .status() - .map_err(|e| format!("failed to run cargo test? {:?}", e))?; + .arg("test"); + + let test_res = self.execute_command(client, test, "cargo test log", "cargo test").await?; - match test_result.code() { + match test_res.code() { Some(0) => Ok("pass".to_string()), Some(n) => Ok(format!("error: {}", n)), None => Ok(format!("abnormal exit")), @@ -105,7 +164,38 @@ impl RequestedJob { } struct ArtifactStream { - object_id: String, + sender: hyper::body::Sender, +} + +impl tokio::io::AsyncWrite for ArtifactStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + match self.get_mut().sender.try_send_data(buf.to_vec().into()) { + Ok(()) => { + Poll::Ready(Ok(buf.len())) + }, + _ => { + Poll::Pending + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } } impl RunnerClient { @@ -120,6 +210,11 @@ impl RunnerClient { } Ok(Self { + http: reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"), host: host.to_string(), tx: sender, rx: res, @@ -187,6 +282,8 @@ impl RunnerClient { match res { Ok(status) => { + eprintln!("[+] job success!"); + self.send(serde_json::json!({ "kind": "job_status", "state": "finished", @@ -194,6 +291,8 @@ impl RunnerClient { })).await.unwrap(); } Err(status) => { + eprintln!("[-] job error: {}", status); + self.send(serde_json::json!({ "kind": "job_status", "state": "interrupted", @@ -213,6 +312,8 @@ async fn main() { .build() .expect("can build client"); + let allowed_pushers = None; + loop { let (mut sender, body) = hyper::Body::channel(); let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") diff --git a/src/dbctx.rs b/src/dbctx.rs index 3af2d56..d025b8d 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -1,10 +1,15 @@ use std::sync::Mutex; +use futures_util::StreamExt; use rusqlite::{Connection, OptionalExtension}; use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::notifier::{RemoteNotifier, NotifierConfig}; use crate::sql; +const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30; + pub struct DbCtx { pub config_path: String, // don't love this but.. for now... @@ -22,6 +27,61 @@ pub struct PendingJob { pub created_time: u64, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TokenValidity { + Expired, + Invalid, + Valid, +} + +pub struct ArtifactDescriptor { + job_id: u64, + artifact_id: u64, + file: File, +} + +impl ArtifactDescriptor { + async fn new(job_id: u64, artifact_id: u64) -> Result { + // TODO: jobs should be a configurable path + let path = format!("jobs/{}/{}", job_id, artifact_id); + let file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path) + .await + .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + + Ok(ArtifactDescriptor { + job_id, + artifact_id, + file, + }) + } + + pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { + loop { + let chunk = data.next().await; + + let chunk = match chunk { + Some(Ok(chunk)) => chunk, + Some(Err(e)) => { + return Err(format!("error reading: {:?}", e)); + } + None => { + eprintln!("body done?"); + return Ok(()); + } + }; + + let chunk = chunk.as_ref(); + + self.file.write_all(chunk).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } + } +} + impl DbCtx { pub fn new(config_path: &str, db_path: &str) -> Self { DbCtx { @@ -32,6 +92,7 @@ impl DbCtx { pub fn create_tables(&self) -> Result<(), ()> { let conn = self.conn.lock().unwrap(); + conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap(); conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); @@ -68,6 +129,56 @@ impl DbCtx { Ok(conn.last_insert_rowid() as u64) } + pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result { + let artifact_id = { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into artifacts (job_id, name, desc) values (?1, ?2, ?3)", + (job_id, name, desc) + ) + .map_err(|e| { + format!("{:?}", e) + })?; + + conn.last_insert_rowid() as u64 + }; + + ArtifactDescriptor::new(job_id, artifact_id).await + } + + pub fn job_for_token(&self, token: &str) -> Result, TokenValidity)>, String> { + self.conn.lock() + .unwrap() + .query_row( + "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1", + [token], + |row| { + let timeout: Option = row.get(3).unwrap(); + let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis(); + + let time: Option = row.get(2).unwrap(); + let validity = if let Some(time) = time { + if now > time as u128 + timeout as u128 { + TokenValidity::Expired + } else { + TokenValidity::Valid + } + } else { + TokenValidity::Invalid + }; + Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity)) + } + ) + .optional() + .map_err(|e| e.to_string()) + } + pub fn repo_id_by_remote(&self, remote_id: u64) -> Result, String> { self.conn.lock() .unwrap() @@ -89,6 +200,9 @@ impl DbCtx { "github" => { (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote)) }, + "github-email" => { + (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote)) + }, other => { panic!("unsupported remote kind: {}", other); } diff --git a/src/notifier.rs b/src/notifier.rs index 4ddc91b..3d9964a 100644 --- a/src/notifier.rs +++ b/src/notifier.rs @@ -1,6 +1,11 @@ use serde_derive::{Deserialize, Serialize}; use std::sync::Arc; use axum::http::StatusCode; +use lettre::transport::smtp::authentication::{Credentials, Mechanism}; +use lettre::{Message, Transport}; +use lettre::transport::smtp::extension::ClientId; +use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder}; +use std::time::Duration; use crate::DbCtx; diff --git a/src/sql.rs b/src/sql.rs index 80eb18a..81c58de 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -38,7 +38,8 @@ pub const CREATE_JOBS_TABLE: &'static str = "\ commit_id INTEGER, created_time INTEGER, started_time INTEGER, - complete_time INTEGER);"; + complete_time INTEGER, + job_timeout INTEGER);"; pub const CREATE_COMMITS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; @@ -62,6 +63,12 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\ remote_git_url TEXT, notifier_config_path TEXT);"; +pub const CREATE_ARTIFACTS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER, + name TEXT, + desc TEXT);"; + pub const CREATE_REMOTES_INDEX: &'static str = "\ CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; -- cgit v1.1