diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ci_driver.rs | 77 | ||||
| -rw-r--r-- | src/ci_runner.rs | 15 | ||||
| -rw-r--r-- | src/dbctx.rs | 12 | ||||
| -rw-r--r-- | src/main.rs | 24 | ||||
| -rw-r--r-- | src/sql.rs | 5 | 
5 files changed, 98 insertions, 35 deletions
| diff --git a/src/ci_driver.rs b/src/ci_driver.rs index bf21047..ef224b8 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -103,6 +103,11 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R          let mut candidate = clients.recv().await              .ok_or_else(|| "client channel disconnected".to_string())?; +        if !candidate.will_accept(job) { +            eprintln!("client {:?} would not accept job {:?}", candidate, job); +            continue; +        } +          if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await {              break client_job;          } else { @@ -130,6 +135,7 @@ struct RunnerClient {      rx: BodyStream,      name: String,      build_token: String, +    accepted_sources: Option<Vec<String>>,  }  fn random_name() -> String { @@ -168,6 +174,10 @@ impl ClientJob {              eprintln!("got {:?}", msg);              let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();              match msg_kind { +                "new_job_please" => { +                    eprintln!("misdirected job request (after handshake?)"); +                    return; +                },                  "job_status" => {                      let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap();                      let (result, state): (Result<String, String>, JobState) = if state == "finished" { @@ -224,7 +234,7 @@ impl ClientJob {  }  impl RunnerClient { -    async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream) -> Result<Self, String> { +    async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream, accepted_sources: Option<Vec<String>>) -> Result<Self, String> {          let name = random_name();          let token = token_for_job();          let client = RunnerClient { @@ -232,6 +242,7 @@ impl RunnerClient {              rx: resp,              name,              build_token: token, +            accepted_sources,          };          Ok(client)      } @@ -259,6 +270,17 @@ impl RunnerClient {          }      } +    // is this client willing to run the job based on what it has told us so far? +    fn will_accept(&self, job: &PendingJob) -> bool { +        match (job.source.as_ref(), self.accepted_sources.as_ref()) { +            (_, None) => true, +            (None, Some(_)) => false, +            (Some(source), Some(accepted_sources)) => { +                accepted_sources.contains(source) +            } +        } +    } +      async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {          self.send(serde_json::json!({              "commit": sha, @@ -358,31 +380,50 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien      (StatusCode::OK, "").into_response()  } -async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse { +#[derive(Serialize, Deserialize)] +struct WorkRequest { +    kind: String, +    accepted_pushers: Option<Vec<String>> +} + +async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, mut job_resp: BodyStream) -> impl IntoResponse {      let (tx_sender, tx_receiver) = mpsc::channel(8);      let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver));      tx_sender.send(Ok("hello".to_string())).await.expect("works"); -    let client = RunnerClient::new(tx_sender, job_resp).await; -    match client { -        Ok(client) => { -            eprintln!("registering client"); -            match ctx.1.try_send(client) { -                Ok(()) => { -                    eprintln!("response established..."); -                    return (StatusCode::OK, resp_body); -                } -                Err(TrySendError::Full(client)) => { -                    return (StatusCode::IM_A_TEAPOT, resp_body); -                } -                Err(TrySendError::Closed(client)) => { -                    panic!("client holder is gone?"); -                } -            } + +    let request = job_resp.next().await.expect("request chunk").expect("chunk exists"); +    let request = std::str::from_utf8(&request).unwrap(); +    let request: WorkRequest = match serde_json::from_str(&request) { +        Ok(v) => v, +        Err(e) => { +            eprintln!("couldn't parse work request: {:?}", e); +            return (StatusCode::MISDIRECTED_REQUEST, resp_body);          } +    }; +    if &request.kind != "new_job_please" { +        eprintln!("bad request kind: {:?}", &request.kind); +        return (StatusCode::MISDIRECTED_REQUEST, resp_body); +    } + +    let client = match RunnerClient::new(tx_sender, job_resp, request.accepted_pushers).await { +        Ok(v) => v,          Err(e) => {              eprintln!("unable to register client");              return (StatusCode::MISDIRECTED_REQUEST, resp_body);          } +    }; + +    match ctx.1.try_send(client) { +        Ok(()) => { +            eprintln!("client requested work..."); +            return (StatusCode::OK, resp_body); +        } +        Err(TrySendError::Full(client)) => { +            return (StatusCode::IM_A_TEAPOT, resp_body); +        } +        Err(TrySendError::Closed(client)) => { +            panic!("client holder is gone?"); +        }      }  } diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 9224614..b02c4ff 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -5,6 +5,7 @@ use std::process::Stdio;  use std::process::ExitStatus;  use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};  use serde_derive::{Deserialize, Serialize}; +use serde_json::json;  use serde::{Deserialize, de::DeserializeOwned, Serialize};  use std::task::{Context, Poll};  use std::pin::Pin; @@ -199,7 +200,7 @@ impl tokio::io::AsyncWrite for ArtifactStream {  }  impl RunnerClient { -    async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { +    async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {          if res.status() != StatusCode::OK {              return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res));          } @@ -222,7 +223,7 @@ impl RunnerClient {          })      } -    async fn wait_for_work(&mut self) -> Result<Option<RequestedJob>, WorkAcquireError> { +    async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> {          match self.rx.chunk().await {              Ok(Some(chunk)) => {                  eprintln!("got chunk: {:?}", &chunk); @@ -312,10 +313,16 @@ async fn main() {          .build()          .expect("can build client"); -    let allowed_pushers = None; +    let allowed_pushers: Option<Vec<String>> = None;      loop {          let (mut sender, body) = hyper::Body::channel(); + +        sender.send_data(serde_json::to_string(&json!({ +            "kind": "new_job_please", +            "accepted_pushers": &["git@iximeow.net", "me@iximeow.net"], +        })).unwrap().into()).await.expect("req"); +          let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job")              .header("user-agent", "ci-butactuallyin-space-runner")              .header("authorization", &secret) @@ -333,7 +340,7 @@ async fn main() {                          continue;                      }                  }; -                let job = match client.wait_for_work().await { +                let job = match client.wait_for_work(allowed_pushers.as_ref().map(|x| x.as_ref())).await {                      Ok(Some(request)) => request,                      Ok(None) => {                          eprintln!("no work to do (yet)"); diff --git a/src/dbctx.rs b/src/dbctx.rs index d025b8d..c4eb767 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -25,6 +25,7 @@ pub struct PendingJob {      pub remote_id: u64,      pub commit_id: u64,      pub created_time: u64, +    pub source: Option<String>,  }  #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -219,7 +220,7 @@ impl DbCtx {          Ok(conn.last_insert_rowid() as u64)      } -    pub fn new_job(&self, remote_id: u64, sha: &str) -> Result<u64, String> { +    pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>) -> Result<u64, String> {          // TODO: potential race: if two remotes learn about a commit at the same time and we decide          // to create two jobs at the same time, this might return an incorrect id if the insert          // didn't actually insert a new row. @@ -233,8 +234,8 @@ impl DbCtx {          let conn = self.conn.lock().unwrap();          let rows_modified = conn.execute( -            "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);", -            (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time) +            "insert into jobs (state, remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", +            (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time, pusher)          ).unwrap();          assert_eq!(1, rows_modified); @@ -250,12 +251,13 @@ impl DbCtx {          let mut pending = Vec::new();          while let Some(row) = jobs.next().unwrap() { -            let (id, artifacts, state, run_host, remote_id, commit_id, created_time) = row.try_into().unwrap(); +            let (id, artifacts, state, run_host, remote_id, commit_id, created_time, source) = row.try_into().unwrap();              let state: u8 = state;              pending.push(PendingJob {                  id, artifacts,                  state: state.try_into().unwrap(), -                run_host, remote_id, commit_id, created_time +                run_host, remote_id, commit_id, created_time, +                source,              });          } diff --git a/src/main.rs b/src/main.rs index 56aca01..56ad88b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,7 +43,7 @@ enum GithubHookError {  #[derive(Debug)]  enum GithubEvent { -    Push { tip: String, repo_name: String, head_commit: serde_json::Map<String, serde_json::Value> }, +    Push { tip: String, repo_name: String, head_commit: serde_json::Map<String, serde_json::Value>, pusher: serde_json::Map<String, serde_json::Value> },      Other {}  } @@ -73,17 +73,23 @@ fn parse_push_event(body: serde_json::Value) -> Result<GithubEvent, GithubHookEr          .ok_or(GithubHookError::BadType { path: "head_commit", expected: "obj" })?          .to_owned(); -    Ok(GithubEvent::Push { tip, repo_name, head_commit }) +    let pusher = body.get("pusher") +        .ok_or(GithubHookError::MissingElement { path: "pusher" })? +        .as_object() +        .ok_or(GithubHookError::BadType { path: "pusher", expected: "obj" })? +        .to_owned(); + +    Ok(GithubEvent::Push { tip, repo_name, head_commit, pusher })  }  async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse { -    let (sha, repo, head_commit) = if let GithubEvent::Push { tip, repo_name, head_commit } = event { -        (tip, repo_name, head_commit) +    let (sha, repo, head_commit, pusher) = if let GithubEvent::Push { tip, repo_name, head_commit, pusher } = event { +        (tip, repo_name, head_commit, pusher)      } else {          panic!("process push event on non-push event");      }; -    println!("handling push event to {}/{}: sha {} in repo {}, {:?}", owner, repo, sha, repo, head_commit); +    println!("handling push event to {}/{}: sha {} in repo {}, {:?}\n  pusher: {:?}", owner, repo, sha, repo, head_commit, pusher);      // push event is in terms of a ref, but we don't know if it's a new commit (yet).      // in terms of CI jobs, we care mainly about new commits. @@ -115,7 +121,13 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event:          }      }; -    let job_id = ctx.new_job(remote_id, &sha).unwrap(); +    let pusher_email = pusher +        .get("email") +        .expect("has email") +        .as_str() +        .expect("is str"); + +    let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email)).unwrap();      let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers"); @@ -39,7 +39,8 @@ pub const CREATE_JOBS_TABLE: &'static str = "\          created_time INTEGER,          started_time INTEGER,          complete_time INTEGER, -        job_timeout INTEGER);"; +        job_timeout INTEGER, +        source TEXT);";  pub const CREATE_COMMITS_TABLE: &'static str = "\      CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; @@ -76,7 +77,7 @@ pub const CREATE_REPO_NAME_INDEX: &'static str = "\      CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);";  pub const PENDING_JOBS: &'static str = "\ -    select * from jobs where state=0;"; +    select id, artifacts_path, state, run_host, remote_id, commit_id, created_time, source from jobs where state=0;";  pub const COMMIT_TO_ID: &'static str = "\      select id from commits where sha=?1;"; | 
