summaryrefslogtreecommitdiff
path: root/src/ci_driver.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ci_driver.rs')
-rw-r--r--src/ci_driver.rs77
1 files changed, 59 insertions, 18 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index bf21047..ef224b8 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -103,6 +103,11 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R
let mut candidate = clients.recv().await
.ok_or_else(|| "client channel disconnected".to_string())?;
+ if !candidate.will_accept(job) {
+ eprintln!("client {:?} would not accept job {:?}", candidate, job);
+ continue;
+ }
+
if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await {
break client_job;
} else {
@@ -130,6 +135,7 @@ struct RunnerClient {
rx: BodyStream,
name: String,
build_token: String,
+ accepted_sources: Option<Vec<String>>,
}
fn random_name() -> String {
@@ -168,6 +174,10 @@ impl ClientJob {
eprintln!("got {:?}", msg);
let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();
match msg_kind {
+ "new_job_please" => {
+ eprintln!("misdirected job request (after handshake?)");
+ return;
+ },
"job_status" => {
let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap();
let (result, state): (Result<String, String>, JobState) = if state == "finished" {
@@ -224,7 +234,7 @@ impl ClientJob {
}
impl RunnerClient {
- async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream) -> Result<Self, String> {
+ async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream, accepted_sources: Option<Vec<String>>) -> Result<Self, String> {
let name = random_name();
let token = token_for_job();
let client = RunnerClient {
@@ -232,6 +242,7 @@ impl RunnerClient {
rx: resp,
name,
build_token: token,
+ accepted_sources,
};
Ok(client)
}
@@ -259,6 +270,17 @@ impl RunnerClient {
}
}
+ // is this client willing to run the job based on what it has told us so far?
+ fn will_accept(&self, job: &PendingJob) -> bool {
+ match (job.source.as_ref(), self.accepted_sources.as_ref()) {
+ (_, None) => true,
+ (None, Some(_)) => false,
+ (Some(source), Some(accepted_sources)) => {
+ accepted_sources.contains(source)
+ }
+ }
+ }
+
async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
self.send(serde_json::json!({
"commit": sha,
@@ -358,31 +380,50 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
(StatusCode::OK, "").into_response()
}
-async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse {
+#[derive(Serialize, Deserialize)]
+struct WorkRequest {
+ kind: String,
+ accepted_pushers: Option<Vec<String>>
+}
+
+async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, mut job_resp: BodyStream) -> impl IntoResponse {
let (tx_sender, tx_receiver) = mpsc::channel(8);
let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver));
tx_sender.send(Ok("hello".to_string())).await.expect("works");
- let client = RunnerClient::new(tx_sender, job_resp).await;
- match client {
- Ok(client) => {
- eprintln!("registering client");
- match ctx.1.try_send(client) {
- Ok(()) => {
- eprintln!("response established...");
- return (StatusCode::OK, resp_body);
- }
- Err(TrySendError::Full(client)) => {
- return (StatusCode::IM_A_TEAPOT, resp_body);
- }
- Err(TrySendError::Closed(client)) => {
- panic!("client holder is gone?");
- }
- }
+
+ let request = job_resp.next().await.expect("request chunk").expect("chunk exists");
+ let request = std::str::from_utf8(&request).unwrap();
+ let request: WorkRequest = match serde_json::from_str(&request) {
+ Ok(v) => v,
+ Err(e) => {
+ eprintln!("couldn't parse work request: {:?}", e);
+ return (StatusCode::MISDIRECTED_REQUEST, resp_body);
}
+ };
+ if &request.kind != "new_job_please" {
+ eprintln!("bad request kind: {:?}", &request.kind);
+ return (StatusCode::MISDIRECTED_REQUEST, resp_body);
+ }
+
+ let client = match RunnerClient::new(tx_sender, job_resp, request.accepted_pushers).await {
+ Ok(v) => v,
Err(e) => {
eprintln!("unable to register client");
return (StatusCode::MISDIRECTED_REQUEST, resp_body);
}
+ };
+
+ match ctx.1.try_send(client) {
+ Ok(()) => {
+ eprintln!("client requested work...");
+ return (StatusCode::OK, resp_body);
+ }
+ Err(TrySendError::Full(client)) => {
+ return (StatusCode::IM_A_TEAPOT, resp_body);
+ }
+ Err(TrySendError::Closed(client)) => {
+ panic!("client holder is gone?");
+ }
}
}