summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoriximeow <git@iximeow.net>2022-12-26 00:21:32 +0000
committeriximeow <git@iximeow.net>2022-12-26 00:21:32 +0000
commit1fe3acc1422b09db27d179db05331a763b1db8a6 (patch)
tree0587e39f80b7fedce16cf7327a3aedf7951984ed /src
parentb3dbd762d9bc21cf48357fa50901e125b42becc0 (diff)
let build runners indicate restricted interest
Diffstat (limited to 'src')
-rw-r--r--src/ci_driver.rs77
-rw-r--r--src/ci_runner.rs15
-rw-r--r--src/dbctx.rs12
-rw-r--r--src/main.rs24
-rw-r--r--src/sql.rs5
5 files changed, 98 insertions, 35 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index bf21047..ef224b8 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -103,6 +103,11 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R
let mut candidate = clients.recv().await
.ok_or_else(|| "client channel disconnected".to_string())?;
+ if !candidate.will_accept(job) {
+ eprintln!("client {:?} would not accept job {:?}", candidate, job);
+ continue;
+ }
+
if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await {
break client_job;
} else {
@@ -130,6 +135,7 @@ struct RunnerClient {
rx: BodyStream,
name: String,
build_token: String,
+ accepted_sources: Option<Vec<String>>,
}
fn random_name() -> String {
@@ -168,6 +174,10 @@ impl ClientJob {
eprintln!("got {:?}", msg);
let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();
match msg_kind {
+ "new_job_please" => {
+ eprintln!("misdirected job request (after handshake?)");
+ return;
+ },
"job_status" => {
let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap();
let (result, state): (Result<String, String>, JobState) = if state == "finished" {
@@ -224,7 +234,7 @@ impl ClientJob {
}
impl RunnerClient {
- async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream) -> Result<Self, String> {
+ async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream, accepted_sources: Option<Vec<String>>) -> Result<Self, String> {
let name = random_name();
let token = token_for_job();
let client = RunnerClient {
@@ -232,6 +242,7 @@ impl RunnerClient {
rx: resp,
name,
build_token: token,
+ accepted_sources,
};
Ok(client)
}
@@ -259,6 +270,17 @@ impl RunnerClient {
}
}
+ // is this client willing to run the job based on what it has told us so far?
+ fn will_accept(&self, job: &PendingJob) -> bool {
+ match (job.source.as_ref(), self.accepted_sources.as_ref()) {
+ (_, None) => true,
+ (None, Some(_)) => false,
+ (Some(source), Some(accepted_sources)) => {
+ accepted_sources.contains(source)
+ }
+ }
+ }
+
async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
self.send(serde_json::json!({
"commit": sha,
@@ -358,31 +380,50 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
(StatusCode::OK, "").into_response()
}
-async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse {
+#[derive(Serialize, Deserialize)]
+struct WorkRequest {
+ kind: String,
+ accepted_pushers: Option<Vec<String>>
+}
+
+async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, mut job_resp: BodyStream) -> impl IntoResponse {
let (tx_sender, tx_receiver) = mpsc::channel(8);
let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver));
tx_sender.send(Ok("hello".to_string())).await.expect("works");
- let client = RunnerClient::new(tx_sender, job_resp).await;
- match client {
- Ok(client) => {
- eprintln!("registering client");
- match ctx.1.try_send(client) {
- Ok(()) => {
- eprintln!("response established...");
- return (StatusCode::OK, resp_body);
- }
- Err(TrySendError::Full(client)) => {
- return (StatusCode::IM_A_TEAPOT, resp_body);
- }
- Err(TrySendError::Closed(client)) => {
- panic!("client holder is gone?");
- }
- }
+
+ let request = job_resp.next().await.expect("request chunk").expect("chunk exists");
+ let request = std::str::from_utf8(&request).unwrap();
+ let request: WorkRequest = match serde_json::from_str(&request) {
+ Ok(v) => v,
+ Err(e) => {
+ eprintln!("couldn't parse work request: {:?}", e);
+ return (StatusCode::MISDIRECTED_REQUEST, resp_body);
}
+ };
+ if &request.kind != "new_job_please" {
+ eprintln!("bad request kind: {:?}", &request.kind);
+ return (StatusCode::MISDIRECTED_REQUEST, resp_body);
+ }
+
+ let client = match RunnerClient::new(tx_sender, job_resp, request.accepted_pushers).await {
+ Ok(v) => v,
Err(e) => {
eprintln!("unable to register client");
return (StatusCode::MISDIRECTED_REQUEST, resp_body);
}
+ };
+
+ match ctx.1.try_send(client) {
+ Ok(()) => {
+ eprintln!("client requested work...");
+ return (StatusCode::OK, resp_body);
+ }
+ Err(TrySendError::Full(client)) => {
+ return (StatusCode::IM_A_TEAPOT, resp_body);
+ }
+ Err(TrySendError::Closed(client)) => {
+ panic!("client holder is gone?");
+ }
}
}
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index 9224614..b02c4ff 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -5,6 +5,7 @@ use std::process::Stdio;
use std::process::ExitStatus;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use serde_derive::{Deserialize, Serialize};
+use serde_json::json;
use serde::{Deserialize, de::DeserializeOwned, Serialize};
use std::task::{Context, Poll};
use std::pin::Pin;
@@ -199,7 +200,7 @@ impl tokio::io::AsyncWrite for ArtifactStream {
}
impl RunnerClient {
- async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
+ async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
if res.status() != StatusCode::OK {
return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res));
}
@@ -222,7 +223,7 @@ impl RunnerClient {
})
}
- async fn wait_for_work(&mut self) -> Result<Option<RequestedJob>, WorkAcquireError> {
+ async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> {
match self.rx.chunk().await {
Ok(Some(chunk)) => {
eprintln!("got chunk: {:?}", &chunk);
@@ -312,10 +313,16 @@ async fn main() {
.build()
.expect("can build client");
- let allowed_pushers = None;
+ let allowed_pushers: Option<Vec<String>> = None;
loop {
let (mut sender, body) = hyper::Body::channel();
+
+ sender.send_data(serde_json::to_string(&json!({
+ "kind": "new_job_please",
+ "accepted_pushers": &["git@iximeow.net", "me@iximeow.net"],
+ })).unwrap().into()).await.expect("req");
+
let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job")
.header("user-agent", "ci-butactuallyin-space-runner")
.header("authorization", &secret)
@@ -333,7 +340,7 @@ async fn main() {
continue;
}
};
- let job = match client.wait_for_work().await {
+ let job = match client.wait_for_work(allowed_pushers.as_ref().map(|x| x.as_ref())).await {
Ok(Some(request)) => request,
Ok(None) => {
eprintln!("no work to do (yet)");
diff --git a/src/dbctx.rs b/src/dbctx.rs
index d025b8d..c4eb767 100644
--- a/src/dbctx.rs
+++ b/src/dbctx.rs
@@ -25,6 +25,7 @@ pub struct PendingJob {
pub remote_id: u64,
pub commit_id: u64,
pub created_time: u64,
+ pub source: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -219,7 +220,7 @@ impl DbCtx {
Ok(conn.last_insert_rowid() as u64)
}
- pub fn new_job(&self, remote_id: u64, sha: &str) -> Result<u64, String> {
+ pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>) -> Result<u64, String> {
// TODO: potential race: if two remotes learn about a commit at the same time and we decide
// to create two jobs at the same time, this might return an incorrect id if the insert
// didn't actually insert a new row.
@@ -233,8 +234,8 @@ impl DbCtx {
let conn = self.conn.lock().unwrap();
let rows_modified = conn.execute(
- "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);",
- (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time)
+ "insert into jobs (state, remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);",
+ (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time, pusher)
).unwrap();
assert_eq!(1, rows_modified);
@@ -250,12 +251,13 @@ impl DbCtx {
let mut pending = Vec::new();
while let Some(row) = jobs.next().unwrap() {
- let (id, artifacts, state, run_host, remote_id, commit_id, created_time) = row.try_into().unwrap();
+ let (id, artifacts, state, run_host, remote_id, commit_id, created_time, source) = row.try_into().unwrap();
let state: u8 = state;
pending.push(PendingJob {
id, artifacts,
state: state.try_into().unwrap(),
- run_host, remote_id, commit_id, created_time
+ run_host, remote_id, commit_id, created_time,
+ source,
});
}
diff --git a/src/main.rs b/src/main.rs
index 56aca01..56ad88b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -43,7 +43,7 @@ enum GithubHookError {
#[derive(Debug)]
enum GithubEvent {
- Push { tip: String, repo_name: String, head_commit: serde_json::Map<String, serde_json::Value> },
+ Push { tip: String, repo_name: String, head_commit: serde_json::Map<String, serde_json::Value>, pusher: serde_json::Map<String, serde_json::Value> },
Other {}
}
@@ -73,17 +73,23 @@ fn parse_push_event(body: serde_json::Value) -> Result<GithubEvent, GithubHookEr
.ok_or(GithubHookError::BadType { path: "head_commit", expected: "obj" })?
.to_owned();
- Ok(GithubEvent::Push { tip, repo_name, head_commit })
+ let pusher = body.get("pusher")
+ .ok_or(GithubHookError::MissingElement { path: "pusher" })?
+ .as_object()
+ .ok_or(GithubHookError::BadType { path: "pusher", expected: "obj" })?
+ .to_owned();
+
+ Ok(GithubEvent::Push { tip, repo_name, head_commit, pusher })
}
async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse {
- let (sha, repo, head_commit) = if let GithubEvent::Push { tip, repo_name, head_commit } = event {
- (tip, repo_name, head_commit)
+ let (sha, repo, head_commit, pusher) = if let GithubEvent::Push { tip, repo_name, head_commit, pusher } = event {
+ (tip, repo_name, head_commit, pusher)
} else {
panic!("process push event on non-push event");
};
- println!("handling push event to {}/{}: sha {} in repo {}, {:?}", owner, repo, sha, repo, head_commit);
+ println!("handling push event to {}/{}: sha {} in repo {}, {:?}\n pusher: {:?}", owner, repo, sha, repo, head_commit, pusher);
// push event is in terms of a ref, but we don't know if it's a new commit (yet).
// in terms of CI jobs, we care mainly about new commits.
@@ -115,7 +121,13 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event:
}
};
- let job_id = ctx.new_job(remote_id, &sha).unwrap();
+ let pusher_email = pusher
+ .get("email")
+ .expect("has email")
+ .as_str()
+ .expect("is str");
+
+ let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email)).unwrap();
let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers");
diff --git a/src/sql.rs b/src/sql.rs
index 81c58de..3476643 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -39,7 +39,8 @@ pub const CREATE_JOBS_TABLE: &'static str = "\
created_time INTEGER,
started_time INTEGER,
complete_time INTEGER,
- job_timeout INTEGER);";
+ job_timeout INTEGER,
+ source TEXT);";
pub const CREATE_COMMITS_TABLE: &'static str = "\
CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);";
@@ -76,7 +77,7 @@ pub const CREATE_REPO_NAME_INDEX: &'static str = "\
CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);";
pub const PENDING_JOBS: &'static str = "\
- select * from jobs where state=0;";
+ select id, artifacts_path, state, run_host, remote_id, commit_id, created_time, source from jobs where state=0;";
pub const COMMIT_TO_ID: &'static str = "\
select id from commits where sha=?1;";