From 1fe3acc1422b09db27d179db05331a763b1db8a6 Mon Sep 17 00:00:00 2001 From: iximeow Date: Mon, 26 Dec 2022 00:21:32 +0000 Subject: let build runners indicate restricted interest --- src/ci_driver.rs | 77 +++++++++++++++++++++++++++++++++++++++++++------------- src/ci_runner.rs | 15 ++++++++--- src/dbctx.rs | 12 +++++---- src/main.rs | 24 +++++++++++++----- src/sql.rs | 5 ++-- 5 files changed, 98 insertions(+), 35 deletions(-) (limited to 'src') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index bf21047..ef224b8 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -103,6 +103,11 @@ async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::R let mut candidate = clients.recv().await .ok_or_else(|| "client channel disconnected".to_string())?; + if !candidate.will_accept(job) { + eprintln!("client {:?} would not accept job {:?}", candidate, job); + continue; + } + if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { break client_job; } else { @@ -130,6 +135,7 @@ struct RunnerClient { rx: BodyStream, name: String, build_token: String, + accepted_sources: Option>, } fn random_name() -> String { @@ -168,6 +174,10 @@ impl ClientJob { eprintln!("got {:?}", msg); let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); match msg_kind { + "new_job_please" => { + eprintln!("misdirected job request (after handshake?)"); + return; + }, "job_status" => { let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); let (result, state): (Result, JobState) = if state == "finished" { @@ -224,7 +234,7 @@ impl ClientJob { } impl RunnerClient { - async fn new(sender: mpsc::Sender>, resp: BodyStream) -> Result { + async fn new(sender: mpsc::Sender>, resp: BodyStream, accepted_sources: Option>) -> Result { let name = random_name(); let token = token_for_job(); let client = RunnerClient { @@ -232,6 +242,7 @@ impl RunnerClient { rx: resp, name, build_token: token, + accepted_sources, }; Ok(client) } @@ -259,6 +270,17 @@ impl RunnerClient { } } + // is this client willing to run the job based on what it has told us so far? + fn will_accept(&self, job: &PendingJob) -> bool { + match (job.source.as_ref(), self.accepted_sources.as_ref()) { + (_, None) => true, + (None, Some(_)) => false, + (Some(source), Some(accepted_sources)) => { + accepted_sources.contains(source) + } + } + } + async fn submit(mut self, dbctx: &Arc, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result, String> { self.send(serde_json::json!({ "commit": sha, @@ -358,31 +380,50 @@ async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender, mpsc::Sender)>, job_resp: BodyStream) -> impl IntoResponse { +#[derive(Serialize, Deserialize)] +struct WorkRequest { + kind: String, + accepted_pushers: Option> +} + +async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender)>, mut job_resp: BodyStream) -> impl IntoResponse { let (tx_sender, tx_receiver) = mpsc::channel(8); let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); tx_sender.send(Ok("hello".to_string())).await.expect("works"); - let client = RunnerClient::new(tx_sender, job_resp).await; - match client { - Ok(client) => { - eprintln!("registering client"); - match ctx.1.try_send(client) { - Ok(()) => { - eprintln!("response established..."); - return (StatusCode::OK, resp_body); - } - Err(TrySendError::Full(client)) => { - return (StatusCode::IM_A_TEAPOT, resp_body); - } - Err(TrySendError::Closed(client)) => { - panic!("client holder is gone?"); - } - } + + let request = job_resp.next().await.expect("request chunk").expect("chunk exists"); + let request = std::str::from_utf8(&request).unwrap(); + let request: WorkRequest = match serde_json::from_str(&request) { + Ok(v) => v, + Err(e) => { + eprintln!("couldn't parse work request: {:?}", e); + return (StatusCode::MISDIRECTED_REQUEST, resp_body); } + }; + if &request.kind != "new_job_please" { + eprintln!("bad request kind: {:?}", &request.kind); + return (StatusCode::MISDIRECTED_REQUEST, resp_body); + } + + let client = match RunnerClient::new(tx_sender, job_resp, request.accepted_pushers).await { + Ok(v) => v, Err(e) => { eprintln!("unable to register client"); return (StatusCode::MISDIRECTED_REQUEST, resp_body); } + }; + + match ctx.1.try_send(client) { + Ok(()) => { + eprintln!("client requested work..."); + return (StatusCode::OK, resp_body); + } + Err(TrySendError::Full(client)) => { + return (StatusCode::IM_A_TEAPOT, resp_body); + } + Err(TrySendError::Closed(client)) => { + panic!("client holder is gone?"); + } } } diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 9224614..b02c4ff 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -5,6 +5,7 @@ use std::process::Stdio; use std::process::ExitStatus; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use serde_derive::{Deserialize, Serialize}; +use serde_json::json; use serde::{Deserialize, de::DeserializeOwned, Serialize}; use std::task::{Context, Poll}; use std::pin::Pin; @@ -199,7 +200,7 @@ impl tokio::io::AsyncWrite for ArtifactStream { } impl RunnerClient { - async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result { + async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result { if res.status() != StatusCode::OK { return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); } @@ -222,7 +223,7 @@ impl RunnerClient { }) } - async fn wait_for_work(&mut self) -> Result, WorkAcquireError> { + async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result, WorkAcquireError> { match self.rx.chunk().await { Ok(Some(chunk)) => { eprintln!("got chunk: {:?}", &chunk); @@ -312,10 +313,16 @@ async fn main() { .build() .expect("can build client"); - let allowed_pushers = None; + let allowed_pushers: Option> = None; loop { let (mut sender, body) = hyper::Body::channel(); + + sender.send_data(serde_json::to_string(&json!({ + "kind": "new_job_please", + "accepted_pushers": &["git@iximeow.net", "me@iximeow.net"], + })).unwrap().into()).await.expect("req"); + let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") .header("user-agent", "ci-butactuallyin-space-runner") .header("authorization", &secret) @@ -333,7 +340,7 @@ async fn main() { continue; } }; - let job = match client.wait_for_work().await { + let job = match client.wait_for_work(allowed_pushers.as_ref().map(|x| x.as_ref())).await { Ok(Some(request)) => request, Ok(None) => { eprintln!("no work to do (yet)"); diff --git a/src/dbctx.rs b/src/dbctx.rs index d025b8d..c4eb767 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -25,6 +25,7 @@ pub struct PendingJob { pub remote_id: u64, pub commit_id: u64, pub created_time: u64, + pub source: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -219,7 +220,7 @@ impl DbCtx { Ok(conn.last_insert_rowid() as u64) } - pub fn new_job(&self, remote_id: u64, sha: &str) -> Result { + pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>) -> Result { // TODO: potential race: if two remotes learn about a commit at the same time and we decide // to create two jobs at the same time, this might return an incorrect id if the insert // didn't actually insert a new row. @@ -233,8 +234,8 @@ impl DbCtx { let conn = self.conn.lock().unwrap(); let rows_modified = conn.execute( - "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);", - (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time) + "insert into jobs (state, remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", + (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time, pusher) ).unwrap(); assert_eq!(1, rows_modified); @@ -250,12 +251,13 @@ impl DbCtx { let mut pending = Vec::new(); while let Some(row) = jobs.next().unwrap() { - let (id, artifacts, state, run_host, remote_id, commit_id, created_time) = row.try_into().unwrap(); + let (id, artifacts, state, run_host, remote_id, commit_id, created_time, source) = row.try_into().unwrap(); let state: u8 = state; pending.push(PendingJob { id, artifacts, state: state.try_into().unwrap(), - run_host, remote_id, commit_id, created_time + run_host, remote_id, commit_id, created_time, + source, }); } diff --git a/src/main.rs b/src/main.rs index 56aca01..56ad88b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,7 +43,7 @@ enum GithubHookError { #[derive(Debug)] enum GithubEvent { - Push { tip: String, repo_name: String, head_commit: serde_json::Map }, + Push { tip: String, repo_name: String, head_commit: serde_json::Map, pusher: serde_json::Map }, Other {} } @@ -73,17 +73,23 @@ fn parse_push_event(body: serde_json::Value) -> Result, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse { - let (sha, repo, head_commit) = if let GithubEvent::Push { tip, repo_name, head_commit } = event { - (tip, repo_name, head_commit) + let (sha, repo, head_commit, pusher) = if let GithubEvent::Push { tip, repo_name, head_commit, pusher } = event { + (tip, repo_name, head_commit, pusher) } else { panic!("process push event on non-push event"); }; - println!("handling push event to {}/{}: sha {} in repo {}, {:?}", owner, repo, sha, repo, head_commit); + println!("handling push event to {}/{}: sha {} in repo {}, {:?}\n pusher: {:?}", owner, repo, sha, repo, head_commit, pusher); // push event is in terms of a ref, but we don't know if it's a new commit (yet). // in terms of CI jobs, we care mainly about new commits. @@ -115,7 +121,13 @@ async fn process_push_event(ctx: Arc, owner: String, repo: String, event: } }; - let job_id = ctx.new_job(remote_id, &sha).unwrap(); + let pusher_email = pusher + .get("email") + .expect("has email") + .as_str() + .expect("is str"); + + let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email)).unwrap(); let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers"); diff --git a/src/sql.rs b/src/sql.rs index 81c58de..3476643 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -39,7 +39,8 @@ pub const CREATE_JOBS_TABLE: &'static str = "\ created_time INTEGER, started_time INTEGER, complete_time INTEGER, - job_timeout INTEGER);"; + job_timeout INTEGER, + source TEXT);"; pub const CREATE_COMMITS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; @@ -76,7 +77,7 @@ pub const CREATE_REPO_NAME_INDEX: &'static str = "\ CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; pub const PENDING_JOBS: &'static str = "\ - select * from jobs where state=0;"; + select id, artifacts_path, state, run_host, remote_id, commit_id, created_time, source from jobs where state=0;"; pub const COMMIT_TO_ID: &'static str = "\ select id from commits where sha=?1;"; -- cgit v1.1