From 55ed9f337ae0cf8e1336448d6b4273b3ee31aca2 Mon Sep 17 00:00:00 2001 From: iximeow Date: Tue, 27 Jun 2023 01:47:29 -0700 Subject: [api] artifacts/ now supports streaming in-progress artifacts back out as they are provided --- src/main.rs | 129 +++++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 102 insertions(+), 27 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index ab6b62c..c1d9664 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,9 @@ use axum::extract::rejection::JsonRejection; use axum::body::Bytes; use axum::http::{StatusCode, Uri}; use http::header::HeaderMap; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use axum::body::StreamBody; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -43,12 +46,19 @@ struct WebserverConfig { psks: Vec, cert_path: PathBuf, key_path: PathBuf, + jobs_path: PathBuf, config_path: PathBuf, db_path: PathBuf, debug_addr: Option, server_addr: Option, } +#[derive(Clone)] +struct WebserverState { + jobs_path: PathBuf, + dbctx: Arc, +} + #[derive(Clone, Serialize, Deserialize)] struct GithubPsk { key: String, @@ -253,9 +263,9 @@ async fn handle_github_event(ctx: Arc, owner: String, repo: String, event } } -async fn handle_ci_index(State(ctx): State>) -> impl IntoResponse { +async fn handle_ci_index(State(ctx): State) -> impl IntoResponse { eprintln!("root index"); - let repos = match ctx.get_repos() { + let repos = match ctx.dbctx.get_repos() { Ok(repos) => repos, Err(e) => { eprintln!("failed to get repos: {:?}", e); @@ -293,8 +303,8 @@ async fn handle_ci_index(State(ctx): State>) -> impl IntoResponse { for repo in repos { let mut most_recent_job: Option = None; - for remote in ctx.remotes_by_repo(repo.id).expect("remotes by repo works") { - let last_job = ctx.last_job_from_remote(remote.id).expect("job by remote works"); + for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") { + let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works"); if let Some(last_job) = last_job { if most_recent_job.as_ref().map(|job| job.created_time < last_job.created_time).unwrap_or(true) { most_recent_job = Some(last_job); @@ -306,13 +316,13 @@ async fn handle_ci_index(State(ctx): State>) -> impl IntoResponse { let row_html: String = match most_recent_job { Some(job) => { - let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit"); - let commit_html = match commit_url(&job, &job_commit, &ctx) { + let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); + let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { Some(url) => format!("{}", url, &job_commit), None => job_commit.clone() }; - let job_html = format!("{}", job_url(&job, &job_commit, &ctx), job.id); + let job_html = format!("{}", job_url(&job, &job_commit, &ctx.dbctx), job.id); let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); let duration = if let Some(start_time) = job.start_time { @@ -391,13 +401,13 @@ async fn handle_ci_index(State(ctx): State>) -> impl IntoResponse { (StatusCode::OK, Html(response)) } -async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State>) -> impl IntoResponse { +async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State) -> impl IntoResponse { eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2); let remote_path = format!("{}/{}", path.0, path.1); let sha = path.2; let (commit_id, sha): (u64, String) = if sha.len() >= 7 { - match ctx.conn.lock().unwrap() + match ctx.dbctx.conn.lock().unwrap() .query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .optional() .expect("can query") { @@ -410,17 +420,17 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( return (StatusCode::NOT_FOUND, Html("no such commit".to_string())); }; - let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() + let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap() .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .expect("can query"); - let (job_id, state, build_result, result_desc): (u64, u8, Option, Option) = ctx.conn.lock().unwrap() + let (job_id, state, build_result, result_desc): (u64, u8, Option, Option) = ctx.dbctx.conn.lock().unwrap() .query_row("select id, state, build_result, final_status from jobs where commit_id=?1;", [commit_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1), row.get_unwrap(2), row.get_unwrap(3)))) .expect("can query"); let state: sql::JobState = unsafe { std::mem::transmute(state) }; - let repo_name: String = ctx.conn.lock().unwrap() + let repo_name: String = ctx.dbctx.conn.lock().unwrap() .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) .expect("can query"); @@ -455,7 +465,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let output = if state == JobState::Finished && build_result == Some(1) || state == JobState::Error { // collect stderr/stdout from the last artifacts, then the last 10kb of each, insert it in // the page... - let artifacts = ctx.artifacts_for_job(job_id).unwrap(); + let artifacts = ctx.dbctx.artifacts_for_job(job_id).unwrap(); if artifacts.len() > 0 { let mut streams = String::new(); for artifact in artifacts.iter() { @@ -473,7 +483,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( None }; - let metrics = ctx.metrics_for_job(job_id).unwrap(); + let metrics = ctx.dbctx.metrics_for_job(job_id).unwrap(); let metrics_section = if metrics.len() > 0 { let mut section = String::new(); section.push_str("
"); @@ -516,12 +526,72 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( (StatusCode::OK, Html(html)) } -async fn handle_repo_summary(Path(path): Path, State(ctx): State>) -> impl IntoResponse { +async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State) -> impl IntoResponse { + eprintln!("get artifact, job={}, artifact={}", path.0, path.1); + let job_id: u64 = path.0.parse().unwrap(); + let artifact_id: u64 = path.1.parse().unwrap(); + + let artifact_descriptor = match ctx.dbctx.lookup_artifact(job_id, artifact_id).unwrap() { + Some(artifact) => artifact, + None => { + return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); + } + }; + + let mut live_artifact = false; + + if let Some(completed_time) = artifact_descriptor.completed_time { + if completed_time < artifact_descriptor.created_time { + live_artifact = true; + } + } else { + live_artifact = true; + } + + if live_artifact { + let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536); + let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver); + let mut artifact_path = ctx.jobs_path.clone(); + artifact_path.push(artifact_descriptor.job_id.to_string()); + artifact_path.push(artifact_descriptor.id.to_string()); + spawn(async move { + let mut artifact = artifact_descriptor; + + let mut artifact_file = tokio::fs::File::open(&artifact_path) + .await + .expect("artifact file exists?"); + while artifact.completed_time.is_none() { + match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await { + Ok(()) => { + // reached the current EOF, wait and then commit an unspeakable sin + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + // this would be much implemented as yielding on a condvar woken when an + // inotify event on the file indicates a write has occurred. but i am + // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. + artifact = ctx.dbctx.lookup_artifact(artifact.job_id, artifact.id) + .expect("can query db") + .expect("artifact still exists"); + } + Err(e) => { + eprintln!("artifact file streaming failed: {}", e); + } + } + } + + eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id); + }); + (StatusCode::OK, resp_body).into_response() + } else { + (StatusCode::OK, Html("all done")).into_response() + } +} + +async fn handle_repo_summary(Path(path): Path, State(ctx): State) -> impl IntoResponse { eprintln!("get repo summary: {:?}", path); let mut last_builds = Vec::new(); - let (repo_id, repo_name): (u64, String) = match ctx.conn.lock().unwrap() + let (repo_id, repo_name): (u64, String) = match ctx.dbctx.conn.lock().unwrap() .query_row("select id, repo_name from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) .optional() .unwrap() { @@ -532,8 +602,8 @@ async fn handle_repo_summary(Path(path): Path, State(ctx): State, State(ctx): State\n"); for job in last_builds.iter().take(10) { - let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit"); - let commit_html = match commit_url(&job, &job_commit, &ctx) { + let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); + let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { Some(url) => format!("{}", url, &job_commit), None => job_commit.clone() }; - let job_html = format!("{}", job_url(&job, &job_commit, &ctx), job.id); + let job_html = format!("{}", job_url(&job, &job_commit, &ctx.dbctx), job.id); let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); let duration = if let Some(start_time) = job.start_time { @@ -625,7 +695,7 @@ async fn handle_repo_summary(Path(path): Path, State(ctx): State, headers: HeaderMap, State(ctx): State>, body: Bytes) -> impl IntoResponse { +async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State, body: Bytes) -> impl IntoResponse { let json: Result = serde_json::from_slice(&body); eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers); @@ -674,11 +744,11 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa } }; - handle_github_event(ctx, path.0, path.1, kind, payload).await + handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await } -async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router { +async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router { /* // GET /hello/warp => 200 OK with body "Hello, warp!" @@ -743,9 +813,13 @@ async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router { .route("/:owner/:repo/:sha", get(handle_commit_status)) .route("/:owner", get(handle_repo_summary)) .route("/:owner/:repo", post(handle_repo_event)) + .route("/artifact/:job/:artifact_id", get(handle_get_artifact)) .route("/", get(handle_ci_index)) .fallback(fallback_get) - .with_state(Arc::new(DbCtx::new(cfg_path, db_path))) + .with_state(WebserverState { + jobs_path, + dbctx: Arc::new(DbCtx::new(cfg_path, db_path)) + }) } #[tokio::main] @@ -766,15 +840,16 @@ async fn main() { web_config.key_path.clone(), ).await.unwrap(); + let jobs_path = web_config.jobs_path.clone(); let config_path = web_config.config_path.clone(); let db_path = web_config.db_path.clone(); if let Some(addr) = web_config.debug_addr.as_ref() { spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone()) - .serve(make_app_server(&config_path, &db_path).await.into_make_service())); + .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service())); } if let Some(addr) = web_config.server_addr.as_ref() { spawn(axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config) - .serve(make_app_server(&config_path, &db_path).await.into_make_service())); + .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service())); } loop { tokio::time::sleep(std::time::Duration::from_millis(1000)).await; -- cgit v1.1