diff options
author | iximeow <me@iximeow.net> | 2023-06-27 01:47:29 -0700 |
---|---|---|
committer | iximeow <me@iximeow.net> | 2023-06-27 01:47:29 -0700 |
commit | 55ed9f337ae0cf8e1336448d6b4273b3ee31aca2 (patch) | |
tree | e53faad5b4b4c9c03985078bcfac6a9ffcca4e21 | |
parent | 6fa7c3f44d97b20e0614bb0c314b76e302982e6e (diff) |
[api] artifacts/ now supports streaming in-progress artifacts back out as they are provided
-rw-r--r-- | Cargo.lock | 21 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/ci_driver.rs | 6 | ||||
-rw-r--r-- | src/dbctx.rs | 43 | ||||
-rw-r--r-- | src/io.rs | 10 | ||||
-rw-r--r-- | src/lua/mod.rs | 11 | ||||
-rw-r--r-- | src/main.rs | 129 | ||||
-rw-r--r-- | src/sql.rs | 7 |
8 files changed, 183 insertions, 45 deletions
@@ -164,6 +164,26 @@ dependencies = [ ] [[package]] +name = "axum-extra" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51227033e4d3acad15c879092ac8a228532707b5db5ff2628f638334f63e1b7a" +dependencies = [ + "axum", + "bytes", + "futures-util", + "http", + "mime", + "pin-project-lite", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] name = "axum-macros" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1193,6 +1213,7 @@ name = "ixi-ci-server" version = "0.0.1" dependencies = [ "axum", + "axum-extra", "axum-macros", "axum-server", "base64", @@ -8,6 +8,7 @@ edition = "2021" [dependencies] lazy_static = "*" axum = { version = "*" } +axum-extra = { version = "*", features = ["async-read-body"] } axum-server = { version = "*", features = ["tls-rustls"] } handlebars = "*" libc = "*" diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 29a699c..c4a791d 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -414,7 +414,11 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien }; eprintln!("spawning task..."); - spawn(async move { artifact.store_all(artifact_content).await.unwrap() }); + let dbctx_ref = Arc::clone(&ctx.0); + spawn(async move { + artifact.store_all(artifact_content).await.unwrap(); + dbctx_ref.finalize_artifact(artifact.artifact_id).await.unwrap(); + }); eprintln!("done?"); (StatusCode::OK, "").into_response() diff --git a/src/dbctx.rs b/src/dbctx.rs index f545158..926f717 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -86,7 +86,9 @@ pub struct ArtifactRecord { pub id: u64, pub job_id: u64, pub name: String, - pub desc: String + pub desc: String, + pub created_time: u64, + pub completed_time: Option<u64>, } impl DbCtx { @@ -148,13 +150,30 @@ impl DbCtx { Ok(conn.last_insert_rowid() as u64) } + pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "update artifacts set completed_time=?1 where id=?2", + (crate::io::now_ms(), artifact_id) + ) + .map(|_| ()) + .map_err(|e| { + format!("{:?}", e) + }) + } + pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { let artifact_id = { + let created_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis() as u64; let conn = self.conn.lock().unwrap(); conn .execute( - "insert into artifacts (job_id, name, desc) values (?1, ?2, ?3)", - (job_id, name, desc) + "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", + (job_id, name, desc, created_time) ) .map_err(|e| { format!("{:?}", e) @@ -166,6 +185,20 @@ impl DbCtx { ArtifactDescriptor::new(job_id, artifact_id).await } + pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> { + let conn = self.conn.lock().unwrap(); + conn + .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| { + let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); + + Ok(ArtifactRecord { + id, job_id, name, desc, created_time, completed_time + }) + }) + .optional() + .map_err(|e| e.to_string()) + } + pub fn commit_sha(&self, commit_id: u64) -> Result<String, String> { self.conn.lock() .unwrap() @@ -321,8 +354,8 @@ impl DbCtx { let mut artifacts = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, job_id, name, desc): (u64, u64, String, String) = row.try_into().unwrap(); - artifacts.push(ArtifactRecord { id, job_id, name, desc }); + let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap(); + artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time }); } Ok(artifacts) @@ -5,6 +5,14 @@ use std::io::Write; use tokio::fs::OpenOptions; use std::task::{Poll, Context}; use std::pin::Pin; +use std::time::{UNIX_EPOCH, SystemTime}; + +pub fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is later than epoch") + .as_millis() as u64 +} pub struct ArtifactStream { sender: hyper::body::Sender, @@ -51,7 +59,7 @@ impl tokio::io::AsyncWrite for ArtifactStream { pub struct ArtifactDescriptor { job_id: u64, - artifact_id: u64, + pub artifact_id: u64, file: File, } diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 662b34c..53473cf 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -4,7 +4,6 @@ use rlua::prelude::*; use std::sync::{Arc, Mutex}; use std::path::PathBuf; -use std::time::{UNIX_EPOCH, SystemTime}; pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua"); @@ -18,7 +17,6 @@ mod lua_exports { use std::sync::{Arc, Mutex}; use std::path::PathBuf; - use std::time::{UNIX_EPOCH, SystemTime}; use rlua::prelude::*; @@ -166,13 +164,6 @@ mod lua_exports { }) } - pub fn now_ms() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is later than epoch") - .as_millis() as u64 - } - pub fn has_cmd(name: &str) -> Result<bool, rlua::Error> { Ok(std::process::Command::new("which") .arg(name) @@ -239,7 +230,7 @@ impl BuildEnv { lua_exports::metric(name, value, job_ref) })?; - let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(lua_exports::now_ms()))?; + let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?; let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| { lua_exports::artifact(path, name, job_ref) diff --git a/src/main.rs b/src/main.rs index ab6b62c..c1d9664 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,9 @@ use axum::extract::rejection::JsonRejection; use axum::body::Bytes; use axum::http::{StatusCode, Uri}; use http::header::HeaderMap; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use axum::body::StreamBody; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -43,12 +46,19 @@ struct WebserverConfig { psks: Vec<GithubPsk>, cert_path: PathBuf, key_path: PathBuf, + jobs_path: PathBuf, config_path: PathBuf, db_path: PathBuf, debug_addr: Option<String>, server_addr: Option<String>, } +#[derive(Clone)] +struct WebserverState { + jobs_path: PathBuf, + dbctx: Arc<DbCtx>, +} + #[derive(Clone, Serialize, Deserialize)] struct GithubPsk { key: String, @@ -253,9 +263,9 @@ async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event } } -async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { +async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse { eprintln!("root index"); - let repos = match ctx.get_repos() { + let repos = match ctx.dbctx.get_repos() { Ok(repos) => repos, Err(e) => { eprintln!("failed to get repos: {:?}", e); @@ -293,8 +303,8 @@ async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { for repo in repos { let mut most_recent_job: Option<Job> = None; - for remote in ctx.remotes_by_repo(repo.id).expect("remotes by repo works") { - let last_job = ctx.last_job_from_remote(remote.id).expect("job by remote works"); + for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") { + let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works"); if let Some(last_job) = last_job { if most_recent_job.as_ref().map(|job| job.created_time < last_job.created_time).unwrap_or(true) { most_recent_job = Some(last_job); @@ -306,13 +316,13 @@ async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { let row_html: String = match most_recent_job { Some(job) => { - let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit"); - let commit_html = match commit_url(&job, &job_commit, &ctx) { + let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); + let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), None => job_commit.clone() }; - let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx), job.id); + let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); let duration = if let Some(start_time) = job.start_time { @@ -391,13 +401,13 @@ async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { (StatusCode::OK, Html(response)) } -async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { +async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse { eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2); let remote_path = format!("{}/{}", path.0, path.1); let sha = path.2; let (commit_id, sha): (u64, String) = if sha.len() >= 7 { - match ctx.conn.lock().unwrap() + match ctx.dbctx.conn.lock().unwrap() .query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .optional() .expect("can query") { @@ -410,17 +420,17 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string())); }; - let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() + let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap() .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .expect("can query"); - let (job_id, state, build_result, result_desc): (u64, u8, Option<u8>, Option<String>) = ctx.conn.lock().unwrap() + let (job_id, state, build_result, result_desc): (u64, u8, Option<u8>, Option<String>) = ctx.dbctx.conn.lock().unwrap() .query_row("select id, state, build_result, final_status from jobs where commit_id=?1;", [commit_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1), row.get_unwrap(2), row.get_unwrap(3)))) .expect("can query"); let state: sql::JobState = unsafe { std::mem::transmute(state) }; - let repo_name: String = ctx.conn.lock().unwrap() + let repo_name: String = ctx.dbctx.conn.lock().unwrap() .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) .expect("can query"); @@ -455,7 +465,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let output = if state == JobState::Finished && build_result == Some(1) || state == JobState::Error { // collect stderr/stdout from the last artifacts, then the last 10kb of each, insert it in // the page... - let artifacts = ctx.artifacts_for_job(job_id).unwrap(); + let artifacts = ctx.dbctx.artifacts_for_job(job_id).unwrap(); if artifacts.len() > 0 { let mut streams = String::new(); for artifact in artifacts.iter() { @@ -473,7 +483,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( None }; - let metrics = ctx.metrics_for_job(job_id).unwrap(); + let metrics = ctx.dbctx.metrics_for_job(job_id).unwrap(); let metrics_section = if metrics.len() > 0 { let mut section = String::new(); section.push_str("<div>"); @@ -516,12 +526,72 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( (StatusCode::OK, Html(html)) } -async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { +async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse { + eprintln!("get artifact, job={}, artifact={}", path.0, path.1); + let job_id: u64 = path.0.parse().unwrap(); + let artifact_id: u64 = path.1.parse().unwrap(); + + let artifact_descriptor = match ctx.dbctx.lookup_artifact(job_id, artifact_id).unwrap() { + Some(artifact) => artifact, + None => { + return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); + } + }; + + let mut live_artifact = false; + + if let Some(completed_time) = artifact_descriptor.completed_time { + if completed_time < artifact_descriptor.created_time { + live_artifact = true; + } + } else { + live_artifact = true; + } + + if live_artifact { + let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536); + let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver); + let mut artifact_path = ctx.jobs_path.clone(); + artifact_path.push(artifact_descriptor.job_id.to_string()); + artifact_path.push(artifact_descriptor.id.to_string()); + spawn(async move { + let mut artifact = artifact_descriptor; + + let mut artifact_file = tokio::fs::File::open(&artifact_path) + .await + .expect("artifact file exists?"); + while artifact.completed_time.is_none() { + match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await { + Ok(()) => { + // reached the current EOF, wait and then commit an unspeakable sin + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + // this would be much implemented as yielding on a condvar woken when an + // inotify event on the file indicates a write has occurred. but i am + // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. + artifact = ctx.dbctx.lookup_artifact(artifact.job_id, artifact.id) + .expect("can query db") + .expect("artifact still exists"); + } + Err(e) => { + eprintln!("artifact file streaming failed: {}", e); + } + } + } + + eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id); + }); + (StatusCode::OK, resp_body).into_response() + } else { + (StatusCode::OK, Html("all done")).into_response() + } +} + +async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<WebserverState>) -> impl IntoResponse { eprintln!("get repo summary: {:?}", path); let mut last_builds = Vec::new(); - let (repo_id, repo_name): (u64, String) = match ctx.conn.lock().unwrap() + let (repo_id, repo_name): (u64, String) = match ctx.dbctx.conn.lock().unwrap() .query_row("select id, repo_name from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) .optional() .unwrap() { @@ -532,8 +602,8 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbC } }; - for remote in ctx.remotes_by_repo(repo_id).expect("can get repo from a path") { - let mut last_ten_jobs = ctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo"); + for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") { + let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo"); last_builds.extend(last_ten_jobs.drain(..)); } last_builds.sort_by_key(|job| -(job.created_time as i64)); @@ -559,13 +629,13 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbC response.push_str("</tr>\n"); for job in last_builds.iter().take(10) { - let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit"); - let commit_html = match commit_url(&job, &job_commit, &ctx) { + let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); + let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), None => job_commit.clone() }; - let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx), job.id); + let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); let duration = if let Some(start_time) = job.start_time { @@ -625,7 +695,7 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbC (StatusCode::OK, Html(response)) } -async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<Arc<DbCtx>>, body: Bytes) -> impl IntoResponse { +async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<WebserverState>, body: Bytes) -> impl IntoResponse { let json: Result<serde_json::Value, _> = serde_json::from_slice(&body); eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers); @@ -674,11 +744,11 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa } }; - handle_github_event(ctx, path.0, path.1, kind, payload).await + handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await } -async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router { +async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router { /* // GET /hello/warp => 200 OK with body "Hello, warp!" @@ -743,9 +813,13 @@ async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router { .route("/:owner/:repo/:sha", get(handle_commit_status)) .route("/:owner", get(handle_repo_summary)) .route("/:owner/:repo", post(handle_repo_event)) + .route("/artifact/:job/:artifact_id", get(handle_get_artifact)) .route("/", get(handle_ci_index)) .fallback(fallback_get) - .with_state(Arc::new(DbCtx::new(cfg_path, db_path))) + .with_state(WebserverState { + jobs_path, + dbctx: Arc::new(DbCtx::new(cfg_path, db_path)) + }) } #[tokio::main] @@ -766,15 +840,16 @@ async fn main() { web_config.key_path.clone(), ).await.unwrap(); + let jobs_path = web_config.jobs_path.clone(); let config_path = web_config.config_path.clone(); let db_path = web_config.db_path.clone(); if let Some(addr) = web_config.debug_addr.as_ref() { spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone()) - .serve(make_app_server(&config_path, &db_path).await.into_make_service())); + .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service())); } if let Some(addr) = web_config.server_addr.as_ref() { spawn(axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config) - .serve(make_app_server(&config_path, &db_path).await.into_make_service())); + .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service())); } loop { tokio::time::sleep(std::time::Duration::from_millis(1000)).await; @@ -84,7 +84,9 @@ pub const CREATE_ARTIFACTS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT, job_id INTEGER, name TEXT, - desc TEXT);"; + desc TEXT, + created_time INTEGER, + completed_time INTEGER);"; pub const CREATE_REMOTES_INDEX: &'static str = "\ CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; @@ -98,6 +100,9 @@ pub const PENDING_JOBS: &'static str = "\ pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\ select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit 2;"; +pub const ARTIFACT_BY_ID: &'static str = "\ + select * from artifacts where id=?1 and job_id=?2;"; + pub const METRICS_FOR_JOB: &'static str = "\ select * from metrics where job_id=?1 order by id asc;"; |