diff options
author | iximeow <git@iximeow.net> | 2022-12-26 00:21:32 +0000 |
---|---|---|
committer | iximeow <git@iximeow.net> | 2022-12-26 00:21:32 +0000 |
commit | 1fe3acc1422b09db27d179db05331a763b1db8a6 (patch) | |
tree | 0587e39f80b7fedce16cf7327a3aedf7951984ed /src/ci_driver.rs | |
parent | b3dbd762d9bc21cf48357fa50901e125b42becc0 (diff) |
let build runners indicate restricted interest
Diffstat (limited to 'src/ci_driver.rs')
-rw-r--r-- | src/ci_driver.rs | 77 |
1 files changed, 59 insertions, 18 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs index bf21047..ef224b8 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -103,6 +103,11 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R let mut candidate = clients.recv().await .ok_or_else(|| "client channel disconnected".to_string())?; + if !candidate.will_accept(job) { + eprintln!("client {:?} would not accept job {:?}", candidate, job); + continue; + } + if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { break client_job; } else { @@ -130,6 +135,7 @@ struct RunnerClient { rx: BodyStream, name: String, build_token: String, + accepted_sources: Option<Vec<String>>, } fn random_name() -> String { @@ -168,6 +174,10 @@ impl ClientJob { eprintln!("got {:?}", msg); let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); match msg_kind { + "new_job_please" => { + eprintln!("misdirected job request (after handshake?)"); + return; + }, "job_status" => { let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); let (result, state): (Result<String, String>, JobState) = if state == "finished" { @@ -224,7 +234,7 @@ impl ClientJob { } impl RunnerClient { - async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream) -> Result<Self, String> { + async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream, accepted_sources: Option<Vec<String>>) -> Result<Self, String> { let name = random_name(); let token = token_for_job(); let client = RunnerClient { @@ -232,6 +242,7 @@ impl RunnerClient { rx: resp, name, build_token: token, + accepted_sources, }; Ok(client) } @@ -259,6 +270,17 @@ impl RunnerClient { } } + // is this client willing to run the job based on what it has told us so far? + fn will_accept(&self, job: &PendingJob) -> bool { + match (job.source.as_ref(), self.accepted_sources.as_ref()) { + (_, None) => true, + (None, Some(_)) => false, + (Some(source), Some(accepted_sources)) => { + accepted_sources.contains(source) + } + } + } + async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> { self.send(serde_json::json!({ "commit": sha, @@ -358,31 +380,50 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien (StatusCode::OK, "").into_response() } -async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse { +#[derive(Serialize, Deserialize)] +struct WorkRequest { + kind: String, + accepted_pushers: Option<Vec<String>> +} + +async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, mut job_resp: BodyStream) -> impl IntoResponse { let (tx_sender, tx_receiver) = mpsc::channel(8); let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); tx_sender.send(Ok("hello".to_string())).await.expect("works"); - let client = RunnerClient::new(tx_sender, job_resp).await; - match client { - Ok(client) => { - eprintln!("registering client"); - match ctx.1.try_send(client) { - Ok(()) => { - eprintln!("response established..."); - return (StatusCode::OK, resp_body); - } - Err(TrySendError::Full(client)) => { - return (StatusCode::IM_A_TEAPOT, resp_body); - } - Err(TrySendError::Closed(client)) => { - panic!("client holder is gone?"); - } - } + + let request = job_resp.next().await.expect("request chunk").expect("chunk exists"); + let request = std::str::from_utf8(&request).unwrap(); + let request: WorkRequest = match serde_json::from_str(&request) { + Ok(v) => v, + Err(e) => { + eprintln!("couldn't parse work request: {:?}", e); + return (StatusCode::MISDIRECTED_REQUEST, resp_body); } + }; + if &request.kind != "new_job_please" { + eprintln!("bad request kind: {:?}", &request.kind); + return (StatusCode::MISDIRECTED_REQUEST, resp_body); + } + + let client = match RunnerClient::new(tx_sender, job_resp, request.accepted_pushers).await { + Ok(v) => v, Err(e) => { eprintln!("unable to register client"); return (StatusCode::MISDIRECTED_REQUEST, resp_body); } + }; + + match ctx.1.try_send(client) { + Ok(()) => { + eprintln!("client requested work..."); + return (StatusCode::OK, resp_body); + } + Err(TrySendError::Full(client)) => { + return (StatusCode::IM_A_TEAPOT, resp_body); + } + Err(TrySendError::Closed(client)) => { + panic!("client holder is gone?"); + } } } |