From 1fe3acc1422b09db27d179db05331a763b1db8a6 Mon Sep 17 00:00:00 2001 From: iximeow Date: Mon, 26 Dec 2022 00:21:32 +0000 Subject: let build runners indicate restricted interest --- src/ci_driver.rs | 77 +++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 18 deletions(-) (limited to 'src/ci_driver.rs') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index bf21047..ef224b8 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -103,6 +103,11 @@ async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::R let mut candidate = clients.recv().await .ok_or_else(|| "client channel disconnected".to_string())?; + if !candidate.will_accept(job) { + eprintln!("client {:?} would not accept job {:?}", candidate, job); + continue; + } + if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { break client_job; } else { @@ -130,6 +135,7 @@ struct RunnerClient { rx: BodyStream, name: String, build_token: String, + accepted_sources: Option>, } fn random_name() -> String { @@ -168,6 +174,10 @@ impl ClientJob { eprintln!("got {:?}", msg); let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); match msg_kind { + "new_job_please" => { + eprintln!("misdirected job request (after handshake?)"); + return; + }, "job_status" => { let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); let (result, state): (Result, JobState) = if state == "finished" { @@ -224,7 +234,7 @@ impl ClientJob { } impl RunnerClient { - async fn new(sender: mpsc::Sender>, resp: BodyStream) -> Result { + async fn new(sender: mpsc::Sender>, resp: BodyStream, accepted_sources: Option>) -> Result { let name = random_name(); let token = token_for_job(); let client = RunnerClient { @@ -232,6 +242,7 @@ impl RunnerClient { rx: resp, name, build_token: token, + accepted_sources, }; Ok(client) } @@ -259,6 +270,17 @@ impl RunnerClient { } } + // is this client willing to run the job based on what it has told us so far? + fn will_accept(&self, job: &PendingJob) -> bool { + match (job.source.as_ref(), self.accepted_sources.as_ref()) { + (_, None) => true, + (None, Some(_)) => false, + (Some(source), Some(accepted_sources)) => { + accepted_sources.contains(source) + } + } + } + async fn submit(mut self, dbctx: &Arc, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result, String> { self.send(serde_json::json!({ "commit": sha, @@ -358,31 +380,50 @@ async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender, mpsc::Sender)>, job_resp: BodyStream) -> impl IntoResponse { +#[derive(Serialize, Deserialize)] +struct WorkRequest { + kind: String, + accepted_pushers: Option> +} + +async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender)>, mut job_resp: BodyStream) -> impl IntoResponse { let (tx_sender, tx_receiver) = mpsc::channel(8); let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); tx_sender.send(Ok("hello".to_string())).await.expect("works"); - let client = RunnerClient::new(tx_sender, job_resp).await; - match client { - Ok(client) => { - eprintln!("registering client"); - match ctx.1.try_send(client) { - Ok(()) => { - eprintln!("response established..."); - return (StatusCode::OK, resp_body); - } - Err(TrySendError::Full(client)) => { - return (StatusCode::IM_A_TEAPOT, resp_body); - } - Err(TrySendError::Closed(client)) => { - panic!("client holder is gone?"); - } - } + + let request = job_resp.next().await.expect("request chunk").expect("chunk exists"); + let request = std::str::from_utf8(&request).unwrap(); + let request: WorkRequest = match serde_json::from_str(&request) { + Ok(v) => v, + Err(e) => { + eprintln!("couldn't parse work request: {:?}", e); + return (StatusCode::MISDIRECTED_REQUEST, resp_body); } + }; + if &request.kind != "new_job_please" { + eprintln!("bad request kind: {:?}", &request.kind); + return (StatusCode::MISDIRECTED_REQUEST, resp_body); + } + + let client = match RunnerClient::new(tx_sender, job_resp, request.accepted_pushers).await { + Ok(v) => v, Err(e) => { eprintln!("unable to register client"); return (StatusCode::MISDIRECTED_REQUEST, resp_body); } + }; + + match ctx.1.try_send(client) { + Ok(()) => { + eprintln!("client requested work..."); + return (StatusCode::OK, resp_body); + } + Err(TrySendError::Full(client)) => { + return (StatusCode::IM_A_TEAPOT, resp_body); + } + Err(TrySendError::Closed(client)) => { + panic!("client holder is gone?"); + } } } -- cgit v1.1