summaryrefslogtreecommitdiff
path: root/src/ci_driver.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ci_driver.rs')
-rw-r--r--src/ci_driver.rs77
1 files changed, 72 insertions, 5 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index 96e0d44..9ec0dd8 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -9,6 +9,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use axum_server::tls_rustls::RustlsConfig;
use axum::body::StreamBody;
use axum::http::{StatusCode};
+use hyper::HeaderMap;
use axum::Router;
use axum::routing::*;
use axum::extract::State;
@@ -101,8 +102,8 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R
let run_host = client_job.client.name.clone();
connection.execute(
- "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4",
- (now as u64, run_host, format!("{}", artifacts.display()), job.id)
+ "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5",
+ (now as u64, run_host, format!("{}", artifacts.display()), &client_job.client.build_token, job.id)
)
.expect("can update");
@@ -139,7 +140,15 @@ struct ClientJob {
impl ClientJob {
pub async fn run(&mut self) {
loop {
- let msg = self.client.recv().await.expect("recv works").expect("client sent an object");
+ eprintln!("waiting on response..");
+ let msg = match self.client.recv().await.expect("recv works") {
+ Some(msg) => msg,
+ None => {
+ eprintln!("client hung up. job's done, i hope?");
+ return;
+ }
+ };
+ eprintln!("got {:?}", msg);
let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();
match msg_kind {
"job_status" => {
@@ -228,7 +237,6 @@ impl RunnerClient {
Err(format!("no client job: {:?}", e))
},
None => {
- eprintln!("no more body chunks? client hung up?");
Ok(None)
}
}
@@ -258,7 +266,6 @@ impl RunnerClient {
}
}
Ok(None) => {
- eprintln!("no more body chunks? client hung up?");
Ok(None)
}
Err(e) => {
@@ -275,6 +282,65 @@ impl fmt::Debug for RunnerClient {
}
}
+#[axum_macros::debug_handler]
+async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse {
+ let job_token = match headers.get("x-job-token") {
+ Some(job_token) => job_token.to_str().expect("valid string"),
+ None => {
+ eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+ };
+
+ let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() {
+ Some(result) => result,
+ None => {
+ eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+ };
+
+ if token_validity != dbctx::TokenValidity::Valid {
+ eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+
+ let artifact_path = if let Some(artifact_path) = artifact_path {
+ artifact_path
+ } else {
+ eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ };
+
+ let artifact_name = match headers.get("x-artifact-name") {
+ Some(artifact_name) => artifact_name.to_str().expect("valid string"),
+ None => {
+ eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+ };
+
+ let artifact_desc = match headers.get("x-artifact-desc") {
+ Some(artifact_desc) => artifact_desc.to_str().expect("valid string"),
+ None => {
+ eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+ };
+
+ let mut artifact = match ctx.0.reserve_artifact(job, artifact_name, artifact_desc).await {
+ Ok(artifact) => artifact,
+ Err(err) => {
+ eprintln!("failure to reserve artifact: {:?}", err);
+ return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response();
+ }
+ };
+
+ spawn(async move { artifact.store_all(artifact_content).await });
+
+ (StatusCode::OK, "").into_response()
+}
+
async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse {
let (tx_sender, tx_receiver) = mpsc::channel(8);
let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver));
@@ -308,6 +374,7 @@ async fn make_api_server(dbctx: Arc<DbCtx>) -> (Router, mpsc::Receiver<RunnerCli
let router = Router::new()
.route("/api/next_job", post(handle_next_job))
+ .route("/api/artifact", post(handle_artifact))
.with_state((dbctx, pending_client_sender));
(router, pending_client_receiver)
}