diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ci_driver.rs | 6 | ||||
| -rw-r--r-- | src/dbctx.rs | 43 | ||||
| -rw-r--r-- | src/io.rs | 10 | ||||
| -rw-r--r-- | src/lua/mod.rs | 11 | ||||
| -rw-r--r-- | src/main.rs | 129 | ||||
| -rw-r--r-- | src/sql.rs | 7 | 
6 files changed, 161 insertions, 45 deletions
| diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 29a699c..c4a791d 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -414,7 +414,11 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien      };      eprintln!("spawning task..."); -    spawn(async move { artifact.store_all(artifact_content).await.unwrap() }); +    let dbctx_ref = Arc::clone(&ctx.0); +    spawn(async move { +        artifact.store_all(artifact_content).await.unwrap(); +        dbctx_ref.finalize_artifact(artifact.artifact_id).await.unwrap(); +    });      eprintln!("done?");      (StatusCode::OK, "").into_response() diff --git a/src/dbctx.rs b/src/dbctx.rs index f545158..926f717 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -86,7 +86,9 @@ pub struct ArtifactRecord {      pub id: u64,      pub job_id: u64,      pub name: String, -    pub desc: String +    pub desc: String, +    pub created_time: u64, +    pub completed_time: Option<u64>,  }  impl DbCtx { @@ -148,13 +150,30 @@ impl DbCtx {          Ok(conn.last_insert_rowid() as u64)      } +    pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> { +        let conn = self.conn.lock().unwrap(); +        conn +            .execute( +                "update artifacts set completed_time=?1 where id=?2", +                (crate::io::now_ms(), artifact_id) +            ) +            .map(|_| ()) +            .map_err(|e| { +                format!("{:?}", e) +            }) +    } +      pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {          let artifact_id = { +            let created_time = SystemTime::now() +                .duration_since(UNIX_EPOCH) +                .expect("now is before epoch") +                .as_millis() as u64;              let conn = self.conn.lock().unwrap();              conn                  .execute( -                    "insert into artifacts (job_id, name, desc) values (?1, ?2, ?3)", -                    (job_id, name, desc) +                    "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", +                    (job_id, name, desc, created_time)                  )                  .map_err(|e| {                      format!("{:?}", e) @@ -166,6 +185,20 @@ impl DbCtx {          ArtifactDescriptor::new(job_id, artifact_id).await      } +    pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> { +        let conn = self.conn.lock().unwrap(); +        conn +            .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| { +                let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); + +                Ok(ArtifactRecord { +                    id, job_id, name, desc, created_time, completed_time +                }) +            }) +            .optional() +            .map_err(|e| e.to_string()) +    } +      pub fn commit_sha(&self, commit_id: u64) -> Result<String, String> {          self.conn.lock()              .unwrap() @@ -321,8 +354,8 @@ impl DbCtx {          let mut artifacts = Vec::new();          while let Some(row) = result.next().unwrap() { -            let (id, job_id, name, desc): (u64, u64, String, String) = row.try_into().unwrap(); -            artifacts.push(ArtifactRecord { id, job_id, name, desc }); +            let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap(); +            artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time });          }          Ok(artifacts) @@ -5,6 +5,14 @@ use std::io::Write;  use tokio::fs::OpenOptions;  use std::task::{Poll, Context};  use std::pin::Pin; +use std::time::{UNIX_EPOCH, SystemTime}; + +pub fn now_ms() -> u64 { +    SystemTime::now() +        .duration_since(UNIX_EPOCH) +        .expect("now is later than epoch") +        .as_millis() as u64 +}  pub struct ArtifactStream {      sender: hyper::body::Sender, @@ -51,7 +59,7 @@ impl tokio::io::AsyncWrite for ArtifactStream {  pub struct ArtifactDescriptor {      job_id: u64, -    artifact_id: u64, +    pub artifact_id: u64,      file: File,  } diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 662b34c..53473cf 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -4,7 +4,6 @@ use rlua::prelude::*;  use std::sync::{Arc, Mutex};  use std::path::PathBuf; -use std::time::{UNIX_EPOCH, SystemTime};  pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua"); @@ -18,7 +17,6 @@ mod lua_exports {      use std::sync::{Arc, Mutex};      use std::path::PathBuf; -    use std::time::{UNIX_EPOCH, SystemTime};      use rlua::prelude::*; @@ -166,13 +164,6 @@ mod lua_exports {          })      } -    pub fn now_ms() -> u64 { -        SystemTime::now() -            .duration_since(UNIX_EPOCH) -            .expect("now is later than epoch") -            .as_millis() as u64 -    } -      pub fn has_cmd(name: &str) -> Result<bool, rlua::Error> {          Ok(std::process::Command::new("which")              .arg(name) @@ -239,7 +230,7 @@ impl BuildEnv {              lua_exports::metric(name, value, job_ref)          })?; -        let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(lua_exports::now_ms()))?; +        let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?;          let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| {              lua_exports::artifact(path, name, job_ref) diff --git a/src/main.rs b/src/main.rs index ab6b62c..c1d9664 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,9 @@ use axum::extract::rejection::JsonRejection;  use axum::body::Bytes;  use axum::http::{StatusCode, Uri};  use http::header::HeaderMap; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use axum::body::StreamBody;  use std::sync::Arc;  use std::time::{SystemTime, UNIX_EPOCH}; @@ -43,12 +46,19 @@ struct WebserverConfig {      psks: Vec<GithubPsk>,      cert_path: PathBuf,      key_path: PathBuf, +    jobs_path: PathBuf,      config_path: PathBuf,      db_path: PathBuf,      debug_addr: Option<String>,      server_addr: Option<String>,  } +#[derive(Clone)] +struct WebserverState { +    jobs_path: PathBuf, +    dbctx: Arc<DbCtx>, +} +  #[derive(Clone, Serialize, Deserialize)]  struct GithubPsk {      key: String, @@ -253,9 +263,9 @@ async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event      }  } -async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { +async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse {      eprintln!("root index"); -    let repos = match ctx.get_repos() { +    let repos = match ctx.dbctx.get_repos() {          Ok(repos) => repos,          Err(e) => {              eprintln!("failed to get repos: {:?}", e); @@ -293,8 +303,8 @@ async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {      for repo in repos {          let mut most_recent_job: Option<Job> = None; -        for remote in ctx.remotes_by_repo(repo.id).expect("remotes by repo works") { -            let last_job = ctx.last_job_from_remote(remote.id).expect("job by remote works"); +        for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") { +            let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works");              if let Some(last_job) = last_job {                  if most_recent_job.as_ref().map(|job| job.created_time < last_job.created_time).unwrap_or(true) {                      most_recent_job = Some(last_job); @@ -306,13 +316,13 @@ async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {          let row_html: String = match most_recent_job {              Some(job) => { -                let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit"); -                let commit_html = match commit_url(&job, &job_commit, &ctx) { +                let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); +                let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {                      Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),                      None => job_commit.clone()                  }; -                let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx), job.id); +                let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);                  let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822();                  let duration = if let Some(start_time) = job.start_time { @@ -391,13 +401,13 @@ async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {      (StatusCode::OK, Html(response))  } -async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { +async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse {      eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2);      let remote_path = format!("{}/{}", path.0, path.1);      let sha = path.2;      let (commit_id, sha): (u64, String) = if sha.len() >= 7 { -        match ctx.conn.lock().unwrap() +        match ctx.dbctx.conn.lock().unwrap()              .query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))              .optional()              .expect("can query") { @@ -410,17 +420,17 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(          return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string()));      }; -    let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() +    let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap()          .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))          .expect("can query"); -    let (job_id, state, build_result, result_desc): (u64, u8, Option<u8>, Option<String>) = ctx.conn.lock().unwrap() +    let (job_id, state, build_result, result_desc): (u64, u8, Option<u8>, Option<String>) = ctx.dbctx.conn.lock().unwrap()          .query_row("select id, state, build_result, final_status from jobs where commit_id=?1;", [commit_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1), row.get_unwrap(2), row.get_unwrap(3))))          .expect("can query");      let state: sql::JobState = unsafe { std::mem::transmute(state) }; -    let repo_name: String = ctx.conn.lock().unwrap() +    let repo_name: String = ctx.dbctx.conn.lock().unwrap()          .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0))          .expect("can query"); @@ -455,7 +465,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(      let output = if state == JobState::Finished && build_result == Some(1) || state == JobState::Error {          // collect stderr/stdout from the last artifacts, then the last 10kb of each, insert it in          // the page... -        let artifacts = ctx.artifacts_for_job(job_id).unwrap(); +        let artifacts = ctx.dbctx.artifacts_for_job(job_id).unwrap();          if artifacts.len() > 0 {              let mut streams = String::new();              for artifact in artifacts.iter() { @@ -473,7 +483,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(          None      }; -    let metrics = ctx.metrics_for_job(job_id).unwrap(); +    let metrics = ctx.dbctx.metrics_for_job(job_id).unwrap();      let metrics_section = if metrics.len() > 0 {          let mut section = String::new();          section.push_str("<div>"); @@ -516,12 +526,72 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(      (StatusCode::OK, Html(html))  } -async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { +async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse { +    eprintln!("get artifact, job={}, artifact={}", path.0, path.1); +    let job_id: u64 = path.0.parse().unwrap(); +    let artifact_id: u64 = path.1.parse().unwrap(); + +    let artifact_descriptor = match ctx.dbctx.lookup_artifact(job_id, artifact_id).unwrap() { +        Some(artifact) => artifact, +        None => { +            return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); +        } +    }; + +    let mut live_artifact = false; + +    if let Some(completed_time) = artifact_descriptor.completed_time { +        if completed_time < artifact_descriptor.created_time { +            live_artifact = true; +        } +    } else { +        live_artifact = true; +    } + +    if live_artifact { +        let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536); +        let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver); +        let mut artifact_path = ctx.jobs_path.clone(); +        artifact_path.push(artifact_descriptor.job_id.to_string()); +        artifact_path.push(artifact_descriptor.id.to_string()); +        spawn(async move { +            let mut artifact = artifact_descriptor; + +            let mut artifact_file = tokio::fs::File::open(&artifact_path) +                .await +                .expect("artifact file exists?"); +            while artifact.completed_time.is_none() { +                match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await { +                    Ok(()) => { +                        // reached the current EOF, wait and then commit an unspeakable sin +                        tokio::time::sleep(std::time::Duration::from_millis(250)).await; +                        // this would be much implemented as yielding on a condvar woken when an +                        // inotify event on the file indicates a write has occurred. but i am +                        // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. +                        artifact = ctx.dbctx.lookup_artifact(artifact.job_id, artifact.id) +                            .expect("can query db") +                            .expect("artifact still exists"); +                    } +                    Err(e) => { +                        eprintln!("artifact file streaming failed: {}", e); +                    } +                } +            } + +            eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id); +        }); +        (StatusCode::OK, resp_body).into_response() +    } else { +        (StatusCode::OK, Html("all done")).into_response() +    } +} + +async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<WebserverState>) -> impl IntoResponse {      eprintln!("get repo summary: {:?}", path);      let mut last_builds = Vec::new(); -    let (repo_id, repo_name): (u64, String) = match ctx.conn.lock().unwrap() +    let (repo_id, repo_name): (u64, String) = match ctx.dbctx.conn.lock().unwrap()          .query_row("select id, repo_name from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap())))          .optional()          .unwrap() { @@ -532,8 +602,8 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbC          }      }; -    for remote in ctx.remotes_by_repo(repo_id).expect("can get repo from a path") { -        let mut last_ten_jobs = ctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo"); +    for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") { +        let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo");          last_builds.extend(last_ten_jobs.drain(..));      }      last_builds.sort_by_key(|job| -(job.created_time as i64)); @@ -559,13 +629,13 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbC      response.push_str("</tr>\n");      for job in last_builds.iter().take(10) { -        let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit"); -        let commit_html = match commit_url(&job, &job_commit, &ctx) { +        let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); +        let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {              Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),              None => job_commit.clone()          }; -        let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx), job.id); +        let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);          let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822();          let duration = if let Some(start_time) = job.start_time { @@ -625,7 +695,7 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbC      (StatusCode::OK, Html(response))  } -async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<Arc<DbCtx>>, body: Bytes) -> impl IntoResponse { +async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<WebserverState>, body: Bytes) -> impl IntoResponse {      let json: Result<serde_json::Value, _> = serde_json::from_slice(&body);      eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers); @@ -674,11 +744,11 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa          }      }; -    handle_github_event(ctx, path.0, path.1, kind, payload).await +    handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await  } -async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router { +async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router {      /*      // GET /hello/warp => 200 OK with body "Hello, warp!" @@ -743,9 +813,13 @@ async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router {          .route("/:owner/:repo/:sha", get(handle_commit_status))          .route("/:owner", get(handle_repo_summary))          .route("/:owner/:repo", post(handle_repo_event)) +        .route("/artifact/:job/:artifact_id", get(handle_get_artifact))          .route("/", get(handle_ci_index))          .fallback(fallback_get) -        .with_state(Arc::new(DbCtx::new(cfg_path, db_path))) +        .with_state(WebserverState { +            jobs_path, +            dbctx: Arc::new(DbCtx::new(cfg_path, db_path)) +        })  }  #[tokio::main] @@ -766,15 +840,16 @@ async fn main() {          web_config.key_path.clone(),      ).await.unwrap(); +    let jobs_path = web_config.jobs_path.clone();      let config_path = web_config.config_path.clone();      let db_path = web_config.db_path.clone();      if let Some(addr) = web_config.debug_addr.as_ref() {          spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone()) -            .serve(make_app_server(&config_path, &db_path).await.into_make_service())); +            .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service()));      }      if let Some(addr) = web_config.server_addr.as_ref() {          spawn(axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config) -            .serve(make_app_server(&config_path, &db_path).await.into_make_service())); +            .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service()));      }      loop {          tokio::time::sleep(std::time::Duration::from_millis(1000)).await; @@ -84,7 +84,9 @@ pub const CREATE_ARTIFACTS_TABLE: &'static str = "\      CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT,          job_id INTEGER,          name TEXT, -        desc TEXT);"; +        desc TEXT, +        created_time INTEGER, +        completed_time INTEGER);";  pub const CREATE_REMOTES_INDEX: &'static str = "\      CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; @@ -98,6 +100,9 @@ pub const PENDING_JOBS: &'static str = "\  pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\      select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit 2;"; +pub const ARTIFACT_BY_ID: &'static str = "\ +    select * from artifacts where id=?1 and job_id=?2;"; +  pub const METRICS_FOR_JOB: &'static str = "\      select * from metrics where job_id=?1 order by id asc;"; | 
