From 55ed9f337ae0cf8e1336448d6b4273b3ee31aca2 Mon Sep 17 00:00:00 2001 From: iximeow Date: Tue, 27 Jun 2023 01:47:29 -0700 Subject: [api] artifacts/ now supports streaming in-progress artifacts back out as they are provided --- src/ci_driver.rs | 6 ++- src/dbctx.rs | 43 ++++++++++++++++--- src/io.rs | 10 ++++- src/lua/mod.rs | 11 +---- src/main.rs | 129 +++++++++++++++++++++++++++++++++++++++++++------------ src/sql.rs | 7 ++- 6 files changed, 161 insertions(+), 45 deletions(-) (limited to 'src') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 29a699c..c4a791d 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -414,7 +414,11 @@ async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender, } impl DbCtx { @@ -148,13 +150,30 @@ impl DbCtx { Ok(conn.last_insert_rowid() as u64) } + pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "update artifacts set completed_time=?1 where id=?2", + (crate::io::now_ms(), artifact_id) + ) + .map(|_| ()) + .map_err(|e| { + format!("{:?}", e) + }) + } + pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result { let artifact_id = { + let created_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis() as u64; let conn = self.conn.lock().unwrap(); conn .execute( - "insert into artifacts (job_id, name, desc) values (?1, ?2, ?3)", - (job_id, name, desc) + "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", + (job_id, name, desc, created_time) ) .map_err(|e| { format!("{:?}", e) @@ -166,6 +185,20 @@ impl DbCtx { ArtifactDescriptor::new(job_id, artifact_id).await } + pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result, String> { + let conn = self.conn.lock().unwrap(); + conn + .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| { + let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); + + Ok(ArtifactRecord { + id, job_id, name, desc, created_time, completed_time + }) + }) + .optional() + .map_err(|e| e.to_string()) + } + pub fn commit_sha(&self, commit_id: u64) -> Result { self.conn.lock() .unwrap() @@ -321,8 +354,8 @@ impl DbCtx { let mut artifacts = Vec::new(); while let Some(row) = result.next().unwrap() { - let (id, job_id, name, desc): (u64, u64, String, String) = row.try_into().unwrap(); - artifacts.push(ArtifactRecord { id, job_id, name, desc }); + let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option) = row.try_into().unwrap(); + artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time }); } Ok(artifacts) diff --git a/src/io.rs b/src/io.rs index 219edbf..6e71282 100644 --- a/src/io.rs +++ b/src/io.rs @@ -5,6 +5,14 @@ use std::io::Write; use tokio::fs::OpenOptions; use std::task::{Poll, Context}; use std::pin::Pin; +use std::time::{UNIX_EPOCH, SystemTime}; + +pub fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is later than epoch") + .as_millis() as u64 +} pub struct ArtifactStream { sender: hyper::body::Sender, @@ -51,7 +59,7 @@ impl tokio::io::AsyncWrite for ArtifactStream { pub struct ArtifactDescriptor { job_id: u64, - artifact_id: u64, + pub artifact_id: u64, file: File, } diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 662b34c..53473cf 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -4,7 +4,6 @@ use rlua::prelude::*; use std::sync::{Arc, Mutex}; use std::path::PathBuf; -use std::time::{UNIX_EPOCH, SystemTime}; pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua"); @@ -18,7 +17,6 @@ mod lua_exports { use std::sync::{Arc, Mutex}; use std::path::PathBuf; - use std::time::{UNIX_EPOCH, SystemTime}; use rlua::prelude::*; @@ -166,13 +164,6 @@ mod lua_exports { }) } - pub fn now_ms() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is later than epoch") - .as_millis() as u64 - } - pub fn has_cmd(name: &str) -> Result { Ok(std::process::Command::new("which") .arg(name) @@ -239,7 +230,7 @@ impl BuildEnv { lua_exports::metric(name, value, job_ref) })?; - let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(lua_exports::now_ms()))?; + let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?; let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option)| { lua_exports::artifact(path, name, job_ref) diff --git a/src/main.rs b/src/main.rs index ab6b62c..c1d9664 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,9 @@ use axum::extract::rejection::JsonRejection; use axum::body::Bytes; use axum::http::{StatusCode, Uri}; use http::header::HeaderMap; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use axum::body::StreamBody; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -43,12 +46,19 @@ struct WebserverConfig { psks: Vec, cert_path: PathBuf, key_path: PathBuf, + jobs_path: PathBuf, config_path: PathBuf, db_path: PathBuf, debug_addr: Option, server_addr: Option, } +#[derive(Clone)] +struct WebserverState { + jobs_path: PathBuf, + dbctx: Arc, +} + #[derive(Clone, Serialize, Deserialize)] struct GithubPsk { key: String, @@ -253,9 +263,9 @@ async fn handle_github_event(ctx: Arc, owner: String, repo: String, event } } -async fn handle_ci_index(State(ctx): State>) -> impl IntoResponse { +async fn handle_ci_index(State(ctx): State) -> impl IntoResponse { eprintln!("root index"); - let repos = match ctx.get_repos() { + let repos = match ctx.dbctx.get_repos() { Ok(repos) => repos, Err(e) => { eprintln!("failed to get repos: {:?}", e); @@ -293,8 +303,8 @@ async fn handle_ci_index(State(ctx): State>) -> impl IntoResponse { for repo in repos { let mut most_recent_job: Option = None; - for remote in ctx.remotes_by_repo(repo.id).expect("remotes by repo works") { - let last_job = ctx.last_job_from_remote(remote.id).expect("job by remote works"); + for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") { + let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works"); if let Some(last_job) = last_job { if most_recent_job.as_ref().map(|job| job.created_time < last_job.created_time).unwrap_or(true) { most_recent_job = Some(last_job); @@ -306,13 +316,13 @@ async fn handle_ci_index(State(ctx): State>) -> impl IntoResponse { let row_html: String = match most_recent_job { Some(job) => { - let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit"); - let commit_html = match commit_url(&job, &job_commit, &ctx) { + let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); + let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { Some(url) => format!("{}", url, &job_commit), None => job_commit.clone() }; - let job_html = format!("{}", job_url(&job, &job_commit, &ctx), job.id); + let job_html = format!("{}", job_url(&job, &job_commit, &ctx.dbctx), job.id); let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); let duration = if let Some(start_time) = job.start_time { @@ -391,13 +401,13 @@ async fn handle_ci_index(State(ctx): State>) -> impl IntoResponse { (StatusCode::OK, Html(response)) } -async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State>) -> impl IntoResponse { +async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State) -> impl IntoResponse { eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2); let remote_path = format!("{}/{}", path.0, path.1); let sha = path.2; let (commit_id, sha): (u64, String) = if sha.len() >= 7 { - match ctx.conn.lock().unwrap() + match ctx.dbctx.conn.lock().unwrap() .query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .optional() .expect("can query") { @@ -410,17 +420,17 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( return (StatusCode::NOT_FOUND, Html("no such commit".to_string())); }; - let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() + let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap() .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .expect("can query"); - let (job_id, state, build_result, result_desc): (u64, u8, Option, Option) = ctx.conn.lock().unwrap() + let (job_id, state, build_result, result_desc): (u64, u8, Option, Option) = ctx.dbctx.conn.lock().unwrap() .query_row("select id, state, build_result, final_status from jobs where commit_id=?1;", [commit_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1), row.get_unwrap(2), row.get_unwrap(3)))) .expect("can query"); let state: sql::JobState = unsafe { std::mem::transmute(state) }; - let repo_name: String = ctx.conn.lock().unwrap() + let repo_name: String = ctx.dbctx.conn.lock().unwrap() .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) .expect("can query"); @@ -455,7 +465,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let output = if state == JobState::Finished && build_result == Some(1) || state == JobState::Error { // collect stderr/stdout from the last artifacts, then the last 10kb of each, insert it in // the page... - let artifacts = ctx.artifacts_for_job(job_id).unwrap(); + let artifacts = ctx.dbctx.artifacts_for_job(job_id).unwrap(); if artifacts.len() > 0 { let mut streams = String::new(); for artifact in artifacts.iter() { @@ -473,7 +483,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( None }; - let metrics = ctx.metrics_for_job(job_id).unwrap(); + let metrics = ctx.dbctx.metrics_for_job(job_id).unwrap(); let metrics_section = if metrics.len() > 0 { let mut section = String::new(); section.push_str("
"); @@ -516,12 +526,72 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( (StatusCode::OK, Html(html)) } -async fn handle_repo_summary(Path(path): Path, State(ctx): State>) -> impl IntoResponse { +async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State) -> impl IntoResponse { + eprintln!("get artifact, job={}, artifact={}", path.0, path.1); + let job_id: u64 = path.0.parse().unwrap(); + let artifact_id: u64 = path.1.parse().unwrap(); + + let artifact_descriptor = match ctx.dbctx.lookup_artifact(job_id, artifact_id).unwrap() { + Some(artifact) => artifact, + None => { + return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); + } + }; + + let mut live_artifact = false; + + if let Some(completed_time) = artifact_descriptor.completed_time { + if completed_time < artifact_descriptor.created_time { + live_artifact = true; + } + } else { + live_artifact = true; + } + + if live_artifact { + let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536); + let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver); + let mut artifact_path = ctx.jobs_path.clone(); + artifact_path.push(artifact_descriptor.job_id.to_string()); + artifact_path.push(artifact_descriptor.id.to_string()); + spawn(async move { + let mut artifact = artifact_descriptor; + + let mut artifact_file = tokio::fs::File::open(&artifact_path) + .await + .expect("artifact file exists?"); + while artifact.completed_time.is_none() { + match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await { + Ok(()) => { + // reached the current EOF, wait and then commit an unspeakable sin + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + // this would be much implemented as yielding on a condvar woken when an + // inotify event on the file indicates a write has occurred. but i am + // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. + artifact = ctx.dbctx.lookup_artifact(artifact.job_id, artifact.id) + .expect("can query db") + .expect("artifact still exists"); + } + Err(e) => { + eprintln!("artifact file streaming failed: {}", e); + } + } + } + + eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id); + }); + (StatusCode::OK, resp_body).into_response() + } else { + (StatusCode::OK, Html("all done")).into_response() + } +} + +async fn handle_repo_summary(Path(path): Path, State(ctx): State) -> impl IntoResponse { eprintln!("get repo summary: {:?}", path); let mut last_builds = Vec::new(); - let (repo_id, repo_name): (u64, String) = match ctx.conn.lock().unwrap() + let (repo_id, repo_name): (u64, String) = match ctx.dbctx.conn.lock().unwrap() .query_row("select id, repo_name from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) .optional() .unwrap() { @@ -532,8 +602,8 @@ async fn handle_repo_summary(Path(path): Path, State(ctx): State, State(ctx): State\n"); for job in last_builds.iter().take(10) { - let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit"); - let commit_html = match commit_url(&job, &job_commit, &ctx) { + let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); + let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { Some(url) => format!("{}", url, &job_commit), None => job_commit.clone() }; - let job_html = format!("{}", job_url(&job, &job_commit, &ctx), job.id); + let job_html = format!("{}", job_url(&job, &job_commit, &ctx.dbctx), job.id); let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); let duration = if let Some(start_time) = job.start_time { @@ -625,7 +695,7 @@ async fn handle_repo_summary(Path(path): Path, State(ctx): State, headers: HeaderMap, State(ctx): State>, body: Bytes) -> impl IntoResponse { +async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State, body: Bytes) -> impl IntoResponse { let json: Result = serde_json::from_slice(&body); eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers); @@ -674,11 +744,11 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa } }; - handle_github_event(ctx, path.0, path.1, kind, payload).await + handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await } -async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router { +async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router { /* // GET /hello/warp => 200 OK with body "Hello, warp!" @@ -743,9 +813,13 @@ async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router { .route("/:owner/:repo/:sha", get(handle_commit_status)) .route("/:owner", get(handle_repo_summary)) .route("/:owner/:repo", post(handle_repo_event)) + .route("/artifact/:job/:artifact_id", get(handle_get_artifact)) .route("/", get(handle_ci_index)) .fallback(fallback_get) - .with_state(Arc::new(DbCtx::new(cfg_path, db_path))) + .with_state(WebserverState { + jobs_path, + dbctx: Arc::new(DbCtx::new(cfg_path, db_path)) + }) } #[tokio::main] @@ -766,15 +840,16 @@ async fn main() { web_config.key_path.clone(), ).await.unwrap(); + let jobs_path = web_config.jobs_path.clone(); let config_path = web_config.config_path.clone(); let db_path = web_config.db_path.clone(); if let Some(addr) = web_config.debug_addr.as_ref() { spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone()) - .serve(make_app_server(&config_path, &db_path).await.into_make_service())); + .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service())); } if let Some(addr) = web_config.server_addr.as_ref() { spawn(axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config) - .serve(make_app_server(&config_path, &db_path).await.into_make_service())); + .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service())); } loop { tokio::time::sleep(std::time::Duration::from_millis(1000)).await; diff --git a/src/sql.rs b/src/sql.rs index 6e0141a..f5ae731 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -84,7 +84,9 @@ pub const CREATE_ARTIFACTS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT, job_id INTEGER, name TEXT, - desc TEXT);"; + desc TEXT, + created_time INTEGER, + completed_time INTEGER);"; pub const CREATE_REMOTES_INDEX: &'static str = "\ CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; @@ -98,6 +100,9 @@ pub const PENDING_JOBS: &'static str = "\ pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\ select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit 2;"; +pub const ARTIFACT_BY_ID: &'static str = "\ + select * from artifacts where id=?1 and job_id=?2;"; + pub const METRICS_FOR_JOB: &'static str = "\ select * from metrics where job_id=?1 order by id asc;"; -- cgit v1.1