From cb418168d9375d4ea6c9d21ee6b97e02a575fb4b Mon Sep 17 00:00:00 2001 From: iximeow Date: Sun, 25 Dec 2022 08:17:28 +0000 Subject: support uploading artifacts during builds --- src/ci_driver.rs | 77 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 72 insertions(+), 5 deletions(-) (limited to 'src/ci_driver.rs') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 96e0d44..9ec0dd8 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -9,6 +9,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use axum_server::tls_rustls::RustlsConfig; use axum::body::StreamBody; use axum::http::{StatusCode}; +use hyper::HeaderMap; use axum::Router; use axum::routing::*; use axum::extract::State; @@ -101,8 +102,8 @@ async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::R let run_host = client_job.client.name.clone(); connection.execute( - "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4", - (now as u64, run_host, format!("{}", artifacts.display()), job.id) + "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5", + (now as u64, run_host, format!("{}", artifacts.display()), &client_job.client.build_token, job.id) ) .expect("can update"); @@ -139,7 +140,15 @@ struct ClientJob { impl ClientJob { pub async fn run(&mut self) { loop { - let msg = self.client.recv().await.expect("recv works").expect("client sent an object"); + eprintln!("waiting on response.."); + let msg = match self.client.recv().await.expect("recv works") { + Some(msg) => msg, + None => { + eprintln!("client hung up. job's done, i hope?"); + return; + } + }; + eprintln!("got {:?}", msg); let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); match msg_kind { "job_status" => { @@ -228,7 +237,6 @@ impl RunnerClient { Err(format!("no client job: {:?}", e)) }, None => { - eprintln!("no more body chunks? client hung up?"); Ok(None) } } @@ -258,7 +266,6 @@ impl RunnerClient { } } Ok(None) => { - eprintln!("no more body chunks? client hung up?"); Ok(None) } Err(e) => { @@ -275,6 +282,65 @@ impl fmt::Debug for RunnerClient { } } +#[axum_macros::debug_handler] +async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { + let job_token = match headers.get("x-job-token") { + Some(job_token) => job_token.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() { + Some(result) => result, + None => { + eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + if token_validity != dbctx::TokenValidity::Valid { + eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + + let artifact_path = if let Some(artifact_path) = artifact_path { + artifact_path + } else { + eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + }; + + let artifact_name = match headers.get("x-artifact-name") { + Some(artifact_name) => artifact_name.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let artifact_desc = match headers.get("x-artifact-desc") { + Some(artifact_desc) => artifact_desc.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let mut artifact = match ctx.0.reserve_artifact(job, artifact_name, artifact_desc).await { + Ok(artifact) => artifact, + Err(err) => { + eprintln!("failure to reserve artifact: {:?}", err); + return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response(); + } + }; + + spawn(async move { artifact.store_all(artifact_content).await }); + + (StatusCode::OK, "").into_response() +} + async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender)>, job_resp: BodyStream) -> impl IntoResponse { let (tx_sender, tx_receiver) = mpsc::channel(8); let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); @@ -308,6 +374,7 @@ async fn make_api_server(dbctx: Arc) -> (Router, mpsc::Receiver