diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ci_ctl.rs | 184 | ||||
-rw-r--r-- | src/ci_driver.rs | 310 | ||||
-rw-r--r-- | src/ci_runner.rs | 269 | ||||
-rw-r--r-- | src/dbctx.rs | 204 | ||||
-rw-r--r-- | src/main.rs | 341 | ||||
-rw-r--r-- | src/notifier.rs | 126 | ||||
-rw-r--r-- | src/sql.rs | 23 |
7 files changed, 1198 insertions, 259 deletions
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs new file mode 100644 index 0000000..3f48907 --- /dev/null +++ b/src/ci_ctl.rs @@ -0,0 +1,184 @@ +use clap::{Parser, Subcommand}; + +mod sql; +mod dbctx; +mod notifier; + +use sql::JobState; +use dbctx::DbCtx; +use notifier::NotifierConfig; + +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct Args { + /// path to a database to manage (defaults to "./state.db") + db_path: Option<String>, + + /// path to where configs should be found (defaults to "./config") + config_path: Option<String>, + + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + /// add _something_ to the db + Add { + #[command(subcommand)] + what: AddItem, + }, + + /// make sure that the state looks reasonable. + /// + /// currently, ensure that all notifiers have a config, that config references an existing + /// file, and that the referenced file is valid. + Validate, + + /// do something with jobs + Job { + #[command(subcommand)] + what: JobAction, + }, +} + +#[derive(Subcommand)] +enum JobAction { + List, + Rerun { + which: u32 + } +} + +#[derive(Subcommand)] +enum AddItem { + Repo { + name: String, + remote: Option<String>, + remote_kind: Option<String>, + config: Option<String>, + }, + Remote { + repo_name: String, + remote: String, + remote_kind: String, + config: String, + }, +} + +fn main() { + let args = Args::parse(); + + let db_path = args.db_path.unwrap_or_else(|| "./state.db".to_owned()); + let config_path = args.config_path.unwrap_or_else(|| "./config".to_owned()); + + match args.command { + Command::Job { what } => { + match what { + JobAction::List => { + let db = DbCtx::new(&config_path, &db_path); + let mut conn = db.conn.lock().unwrap(); + let mut query = conn.prepare("select id, artifacts_path, state, commit_id, created_time from jobs;").unwrap(); + let mut jobs = query.query([]).unwrap(); + while let Some(row) = jobs.next().unwrap() { + let (id, artifacts, state, commit_id, created_time): (u64, Option<String>, u64, u64, u64) = row.try_into().unwrap(); + + eprintln!("[+] {:04} | {: >8?} | {}", id, state, created_time); + } + eprintln!("jobs"); + }, + JobAction::Rerun { which } => { + let db = DbCtx::new(&config_path, &db_path); + db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [which]) + .expect("works"); + eprintln!("[+] job {} set to pending", which); + } + } + }, + Command::Add { what } => { + match what { + AddItem::Repo { name, remote, remote_kind, config } => { + let remote_config = match (remote, remote_kind, config) { + (Some(remote), Some(remote_kind), Some(config_path)) => { + // do something + if remote_kind != "github" { + eprintln!("unknown remote kind: {}", remote); + return; + } + Some((remote, remote_kind, config_path)) + }, + (None, None, None) => { + None + }, + _ => { + eprintln!("when specifying a remote, `remote`, `remote_kind`, and `config_path` must either all be provided together or not at all"); + return; + } + }; + + let db = DbCtx::new(&config_path, &db_path); + let repo_id = match db.new_repo(&name) { + Ok(repo_id) => repo_id, + Err(e) => { + if e.contains("UNIQUE constraint failed") { + eprintln!("[!] repo '{}' already exists", name); + return; + } else { + eprintln!("[!] failed to create repo entry: {}", e); + return; + } + } + }; + println!("[+] new repo created: '{}' id {}", &name, repo_id); + if let Some((remote, remote_kind, config_path)) = remote_config { + let full_config_file_path = format!("{}/{}", &db.config_path, config_path); + let config = match remote_kind.as_ref() { + "github" => { + assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok()); + } + "email" => { + assert!(NotifierConfig::email_from_file(&full_config_file_path).is_ok()); + } + other => { + panic!("[-] notifiers for '{}' remotes are not supported", other); + } + }; + db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config_path.as_str()).unwrap(); + println!("[+] new remote created: repo '{}', {} remote at {}", &name, remote_kind, remote); + } + }, + AddItem::Remote { repo_name, remote, remote_kind, config } => { + let db = DbCtx::new(&config_path, &db_path); + let repo_id = match db.repo_id_by_name(&repo_name) { + Ok(Some(id)) => id, + Ok(None) => { + eprintln!("[-] repo '{}' does not exist", repo_name); + return; + }, + Err(e) => { + eprintln!("[!] couldn't look up repo '{}': {:?}", repo_name, e); + return; + } + }; + let config_file = format!("{}/{}", config_path, config); + match remote_kind.as_ref() { + "github" => { + NotifierConfig::github_from_file(&config_file).unwrap(); + } + "email" => { + NotifierConfig::email_from_file(&config_file).unwrap(); + } + other => { + panic!("notifiers for '{}' remotes are not supported", other); + } + }; + db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config.as_str()).unwrap(); + println!("[+] new remote created: repo '{}', {} remote at {}", &repo_name, remote_kind, remote); + }, + } + }, + Command::Validate => { + println!("ok"); + } + } +} diff --git a/src/ci_driver.rs b/src/ci_driver.rs index fd6f813..96e0d44 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -1,10 +1,29 @@ -use rusqlite::Connection; use std::process::Command; +use futures_util::StreamExt; +use std::fmt; use std::path::{Path, PathBuf}; +use tokio::spawn; +use tokio_stream::wrappers::ReceiverStream; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use axum_server::tls_rustls::RustlsConfig; +use axum::body::StreamBody; +use axum::http::{StatusCode}; +use axum::Router; +use axum::routing::*; +use axum::extract::State; +use axum::extract::BodyStream; +use axum::response::IntoResponse; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use serde_json::json; +mod dbctx; mod sql; +mod notifier; -use std::time::{SystemTime, UNIX_EPOCH}; +use crate::dbctx::{DbCtx, PendingJob}; +use crate::sql::JobState; fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> { let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); @@ -13,33 +32,43 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> { Ok(path) } -fn activate_job(connection: &mut Connection, job: u64, artifacts: Option<String>, state: u8, run_host: Option<String>, commit_id: u64, repo_url: String, repo_name: String) { +async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> { + let connection = dbctx.conn.lock().unwrap(); + let (repo_id, remote_git_url): (u64, String) = connection + .query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) + .expect("query succeeds"); + let repo_name: String = connection + .query_row("select repo_name from repos where id=?1", [repo_id], |row| row.get(0)) + .expect("query succeeds"); + let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("now is before epoch") .as_millis(); let commit_sha: String = connection - .query_row("select sha from commits where id=?1", [commit_id], |row| row.get(0)) + .query_row("select sha from commits where id=?1", [job.commit_id], |row| row.get(0)) .expect("can run query"); - let artifacts: PathBuf = match artifacts { + let artifacts = PathBuf::from("/tmp"); + /* + let artifacts: PathBuf = match &job.artifacts { Some(artifacts) => PathBuf::from(artifacts), - None => reserve_artifacts_dir(job).expect("can reserve a directory for artifacts") + None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts") }; - if run_host == None { + if job.run_host.as_ref() == None { eprintln!("need to find a host to run the job"); } - eprintln!("cloning {}", repo_url); + eprintln!("cloning {}", remote_git_url); let mut repo_dir = artifacts.clone(); repo_dir.push("repo"); eprintln!(" ... into {}", repo_dir.display()); Command::new("git") .arg("clone") - .arg(repo_url) + .arg(&remote_git_url) .arg(&format!("{}", repo_dir.display())) .status() .expect("can clone the repo"); @@ -48,44 +77,265 @@ fn activate_job(connection: &mut Connection, job: u64, artifacts: Option<String> Command::new("git") .current_dir(&repo_dir) .arg("checkout") - .arg(commit_sha) + .arg(&commit_sha) .status() .expect("can checkout hash"); + */ eprintln!("running {}", repo_name); /* * find the CI script, figure out how to run it */ + let mut client_job = loop { + let mut candidate = clients.recv().await + .ok_or_else(|| "client channel disconnected".to_string())?; + + if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { + break client_job; + } else { + // failed to submit job, move on for now + } + }; + + let run_host = client_job.client.name.clone(); + connection.execute( "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4", - (now as u64, "test host".to_string(), format!("{}", artifacts.display()), job) + (now as u64, run_host, format!("{}", artifacts.display()), job.id) ) .expect("can update"); + + spawn(async move { + client_job.run().await + }); + + Ok(()) } -fn main() { - let mut connection = Connection::open("/root/ixi_ci_server/state.db").unwrap(); - connection.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); - connection.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); - connection.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); - connection.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); +struct RunnerClient { + tx: mpsc::Sender<Result<String, String>>, + rx: BodyStream, + name: String, + build_token: String, +} - loop { - let mut pending_query = connection.prepare(sql::PENDING_JOBS).unwrap(); - let mut jobs = pending_query.query([]).unwrap(); - let mut to_start = Vec::new(); - while let Some(row) = jobs.next().unwrap() { - let (id, artifacts, state, run_host, commit_id, repo_url, repo_name): (u64, Option<String>, u8, Option<String>, u64, String, String)= TryInto::try_into(row).unwrap(); - to_start.push((id, artifacts, state, run_host, commit_id, repo_url, repo_name)); +fn random_name() -> String { + "random name".to_string() +} + +fn token_for_job() -> String { + "very secret token do not share".to_string() +} + +struct ClientJob { + dbctx: Arc<DbCtx>, + remote_git_url: String, + sha: String, + job: PendingJob, + client: RunnerClient +} + +impl ClientJob { + pub async fn run(&mut self) { + loop { + let msg = self.client.recv().await.expect("recv works").expect("client sent an object"); + let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); + match msg_kind { + "job_status" => { + let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); + let (result, state): (Result<String, String>, JobState) = if state == "finished" { + let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); + eprintln!("job update: state is {} and result is {}", state, result); + match result { + "pass" => { + (Ok("success".to_string()), JobState::Complete) + }, + other => { + (Err(other.to_string()), JobState::Error) + } + } + } else if state == "interrupted" { + let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); + eprintln!("job update: state is {} and result is {}", state, result); + (Err(result.to_string()), JobState::Error) + } else { + eprintln!("job update: state is {}", state); + (Err(format!("atypical completion status: {}", state)), JobState::Invalid) + }; + + let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists"); + + for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { + notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await.expect("can notify"); + } + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis(); + + self.dbctx.conn.lock().unwrap().execute( + "update jobs set complete_time=?1, state=?2 where id=?3", + (now as u64, state as u64, self.job.id) + ) + .expect("can update"); + } + "artifact_create" => { + eprintln!("creating artifact {:?}", msg); + self.client.send(serde_json::json!({ + "status": "ok", + "object_id": "10", + })).await.unwrap(); + }, + other => { + eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg); + return; + } + } + } + } +} + +impl RunnerClient { + async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream) -> Result<Self, String> { + let name = random_name(); + let token = token_for_job(); + let client = RunnerClient { + tx: sender, + rx: resp, + name, + build_token: token, + }; + Ok(client) + } + + async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { + self.tx.send(Ok(serde_json::to_string(&msg).unwrap())) + .await + .map_err(|e| e.to_string()) + } + + async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { + match self.rx.next().await { + Some(Ok(bytes)) => { + serde_json::from_slice(&bytes) + .map(Option::Some) + .map_err(|e| e.to_string()) + }, + Some(Err(e)) => { + eprintln!("e: {:?}", e); + Err(format!("no client job: {:?}", e)) + }, + None => { + eprintln!("no more body chunks? client hung up?"); + Ok(None) + } + } + } + + async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> { + self.send(serde_json::json!({ + "commit": sha, + "remote_url": remote_git_url, + "build_token": &self.build_token, + })).await?; + match self.recv().await { + Ok(Some(resp)) => { + if resp == serde_json::json!({ + "status": "started" + }) { + eprintln!("resp: {:?}", resp); + Ok(Some(ClientJob { + job: job.clone(), + dbctx: Arc::clone(dbctx), + sha: sha.to_string(), + remote_git_url: remote_git_url.to_string(), + client: self + })) + } else { + Err("client rejected job".to_string()) + } + } + Ok(None) => { + eprintln!("no more body chunks? client hung up?"); + Ok(None) + } + Err(e) => { + eprintln!("e: {:?}", e); + Err(e) + } + } + } +} + +impl fmt::Debug for RunnerClient { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("RunnerClient { .. }") + } +} + +async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse { + let (tx_sender, tx_receiver) = mpsc::channel(8); + let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); + tx_sender.send(Ok("hello".to_string())).await.expect("works"); + let client = RunnerClient::new(tx_sender, job_resp).await; + match client { + Ok(client) => { + eprintln!("registering client"); + match ctx.1.try_send(client) { + Ok(()) => { + eprintln!("response established..."); + return (StatusCode::OK, resp_body); + } + Err(TrySendError::Full(client)) => { + return (StatusCode::IM_A_TEAPOT, resp_body); + } + Err(TrySendError::Closed(client)) => { + panic!("client holder is gone?"); + } + } } - std::mem::drop(jobs); - std::mem::drop(pending_query); - if to_start.len() > 0 { - println!("{} new jobs", to_start.len()); + Err(e) => { + eprintln!("unable to register client"); + return (StatusCode::MISDIRECTED_REQUEST, resp_body); + } + } +} + +async fn make_api_server(dbctx: Arc<DbCtx>) -> (Router, mpsc::Receiver<RunnerClient>) { + let (pending_client_sender, pending_client_receiver) = mpsc::channel(8); + + let router = Router::new() + .route("/api/next_job", post(handle_next_job)) + .with_state((dbctx, pending_client_sender)); + (router, pending_client_receiver) +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let config = RustlsConfig::from_pem_file( + PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/fullchain.pem"), + PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"), + ).await.unwrap(); + + let dbctx = Arc::new(DbCtx::new("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db")); + + let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await; + spawn(axum_server::bind_rustls("0.0.0.0:9876".parse().unwrap(), config) + .serve(api_server.into_make_service())); + + dbctx.create_tables().unwrap(); + + loop { + let jobs = dbctx.get_pending_jobs().unwrap(); + + if jobs.len() > 0 { + println!("{} new jobs", jobs.len()); - for job in to_start.into_iter() { - activate_job(&mut connection, job.0, job.1, job.2, job.3, job.4, job.5, job.6); + for job in jobs.into_iter() { + activate_job(Arc::clone(&dbctx), &job, &mut channel).await; } } std::thread::sleep(std::time::Duration::from_millis(100)); diff --git a/src/ci_runner.rs b/src/ci_runner.rs new file mode 100644 index 0000000..6554f05 --- /dev/null +++ b/src/ci_runner.rs @@ -0,0 +1,269 @@ +use std::time::Duration; +use reqwest::{StatusCode, Response}; +use std::process::Command; +use serde_derive::{Deserialize, Serialize}; +use serde::{Deserialize, de::DeserializeOwned, Serialize}; + +#[derive(Debug)] +enum WorkAcquireError { + Reqwest(reqwest::Error), + EarlyEof, + Protocol(String), +} + +struct RunnerClient { + host: String, + tx: hyper::body::Sender, + rx: Response, + current_job: Option<RequestedJob>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct RequestedJob { + commit: String, + remote_url: String, + build_token: String, +} + +impl RequestedJob { + // TODO: panics if hyper finds the channel is closed. hum + async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result<ArtifactStream, String> { + let create_artifact_message = serde_json::json!({ + "kind": "artifact_create", + "name": name, + "description": desc, + "job_token": &self.build_token, + }); + client.send(create_artifact_message).await + .map_err(|e| format!("create artifact send error: {:?}", e))?; + let resp = client.recv().await + .map_err(|e| format!("create artifact recv error: {:?}", e))?; + eprintln!("resp: {:?}", resp); + let object_id = resp.unwrap() + .as_object().expect("is an object") + .get("object_id").unwrap().as_str().expect("is str") + .to_owned(); + // POST to this object id... + Ok(ArtifactStream { + object_id, + }) + } + + async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> { + let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works"); + + let clone_res = Command::new("git") + .arg("clone") + .arg(&self.remote_url) + .arg("tmpdir") + .status() + .map_err(|e| format!("failed to run git clone? {:?}", e))?; + + if !clone_res.success() { + return Err(format!("git clone failed: {:?}", clone_res)); + } + + let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works"); + + let checkout_res = Command::new("git") + .current_dir("tmpdir") + .arg("checkout") + .arg(&self.commit) + .status() + .map_err(|e| format!("failed to run git checkout? {:?}", e))?; + + if !checkout_res.success() { + return Err(format!("git checkout failed: {:?}", checkout_res)); + } + + let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works"); + + let build_res = Command::new("cargo") + .current_dir("tmpdir") + .arg("build") + .status() + .map_err(|e| format!("failed to run cargo build? {:?}", e))?; + + if !build_res.success() { + return Err(format!("cargo build failed: {:?}", build_res)); + } + + let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works"); + + let test_result = Command::new("cargo") + .current_dir("tmpdir") + .arg("test") + .status() + .map_err(|e| format!("failed to run cargo test? {:?}", e))?; + + match test_result.code() { + Some(0) => Ok("pass".to_string()), + Some(n) => Ok(format!("error: {}", n)), + None => Ok(format!("abnormal exit")), + } + } +} + +struct ArtifactStream { + object_id: String, +} + +impl RunnerClient { + async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { + if res.status() != StatusCode::OK { + return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); + } + + let hello = res.chunk().await.expect("chunk"); + if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { + return Err(format!("bad hello: {:?}", hello)); + } + + Ok(Self { + host: host.to_string(), + tx: sender, + rx: res, + current_job: None, + }) + } + + async fn wait_for_work(&mut self) -> Result<Option<RequestedJob>, WorkAcquireError> { + match self.rx.chunk().await { + Ok(Some(chunk)) => { + eprintln!("got chunk: {:?}", &chunk); + serde_json::from_slice(&chunk) + .map(Option::Some) + .map_err(|e| { + WorkAcquireError::Protocol(format!("not json: {:?}", e)) + }) + } + Ok(None) => { + Ok(None) + }, + Err(e) => { + Err(WorkAcquireError::Reqwest(e)) + } + } + } + + async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { + self.recv_typed().await + } + + async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> { + match self.rx.chunk().await { + Ok(Some(chunk)) => { + serde_json::from_slice(&chunk) + .map(Option::Some) + .map_err(|e| { + format!("not json: {:?}", e) + }) + }, + Ok(None) => Ok(None), + Err(e) => { + Err(format!("error in recv: {:?}", e)) + } + } + } + + async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { + self.tx.send_data( + serde_json::to_vec(&value) + .map_err(|e| format!("json error: {:?}", e))? + .into() + ).await + .map_err(|e| format!("send error: {:?}", e)) + } + + async fn run_job(&mut self, job: RequestedJob) { + self.send(serde_json::json!({ + "status": "started" + })).await.unwrap(); + + std::fs::remove_dir_all("tmpdir").unwrap(); + std::fs::create_dir("tmpdir").unwrap(); + + let res = job.execute_goodfile(self).await; + + match res { + Ok(status) => { + self.send(serde_json::json!({ + "kind": "job_status", + "state": "finished", + "result": status + })).await.unwrap(); + } + Err(status) => { + self.send(serde_json::json!({ + "kind": "job_status", + "state": "interrupted", + "result": status + })).await.unwrap(); + } + } + } +} + +#[tokio::main] +async fn main() { + let secret = std::fs::read_to_string("./auth_secret").unwrap(); + let client = reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"); + + loop { + let (mut sender, body) = hyper::Body::channel(); + let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("authorization", &secret) + .body(body) + .send() + .await; + + match poll { + Ok(mut res) => { + let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { + Ok(client) => client, + Err(e) => { + eprintln!("failed to initialize client: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + let job = match client.wait_for_work().await { + Ok(Some(request)) => request, + Ok(None) => { + eprintln!("no work to do (yet)"); + std::thread::sleep(Duration::from_millis(2000)); + continue; + } + Err(e) => { + eprintln!("failed to get work: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + eprintln!("requested work: {:?}", job); + + eprintln!("doing {:?}", job); + client.run_job(job).await; + std::thread::sleep(Duration::from_millis(10000)); + }, + Err(e) => { + let message = format!("{}", e); + + if message.contains("tcp connect error") { + eprintln!("could not reach server. sleeping a bit and retrying."); + std::thread::sleep(Duration::from_millis(5000)); + continue; + } + + eprintln!("unhandled error: {}", message); + + std::thread::sleep(Duration::from_millis(1000)); + } + } + } +} diff --git a/src/dbctx.rs b/src/dbctx.rs new file mode 100644 index 0000000..3af2d56 --- /dev/null +++ b/src/dbctx.rs @@ -0,0 +1,204 @@ +use std::sync::Mutex; +use rusqlite::{Connection, OptionalExtension}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::notifier::{RemoteNotifier, NotifierConfig}; +use crate::sql; + +pub struct DbCtx { + pub config_path: String, + // don't love this but.. for now... + pub conn: Mutex<Connection>, +} + +#[derive(Debug, Clone)] +pub struct PendingJob { + pub id: u64, + pub artifacts: Option<String>, + pub state: sql::JobState, + pub run_host: Option<String>, + pub remote_id: u64, + pub commit_id: u64, + pub created_time: u64, +} + +impl DbCtx { + pub fn new(config_path: &str, db_path: &str) -> Self { + DbCtx { + config_path: config_path.to_owned(), + conn: Mutex::new(Connection::open(db_path).unwrap()) + } + } + + pub fn create_tables(&self) -> Result<(), ()> { + let conn = self.conn.lock().unwrap(); + conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); + conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); + conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); + conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap(); + conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); + conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap(); + + Ok(()) + } + + pub fn new_commit(&self, sha: &str) -> Result<u64, String> { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into commits (sha) values (?1)", + [sha.clone()] + ) + .expect("can insert"); + + Ok(conn.last_insert_rowid() as u64) + } + + pub fn new_repo(&self, name: &str) -> Result<u64, String> { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into repos (repo_name) values (?1)", + [name.clone()] + ) + .map_err(|e| { + format!("{:?}", e) + })?; + + Ok(conn.last_insert_rowid() as u64) + } + + pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> { + self.conn.lock() + .unwrap() + .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0)) + .optional() + .map_err(|e| e.to_string()) + } + + pub fn repo_id_by_name(&self, repo_name: &str) -> Result<Option<u64>, String> { + self.conn.lock() + .unwrap() + .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0)) + .optional() + .map_err(|e| e.to_string()) + } + + pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result<u64, String> { + let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind { + "github" => { + (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote)) + }, + other => { + panic!("unsupported remote kind: {}", other); + } + }; + + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);", + (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path) + ) + .expect("can insert"); + + Ok(conn.last_insert_rowid() as u64) + } + + pub fn new_job(&self, remote_id: u64, sha: &str) -> Result<u64, String> { + // TODO: potential race: if two remotes learn about a commit at the same time and we decide + // to create two jobs at the same time, this might return an incorrect id if the insert + // didn't actually insert a new row. + let commit_id = self.new_commit(sha).expect("can create commit record"); + + let created_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis() as u64; + + let conn = self.conn.lock().unwrap(); + + let rows_modified = conn.execute( + "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);", + (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time) + ).unwrap(); + + assert_eq!(1, rows_modified); + + Ok(conn.last_insert_rowid() as u64) + } + + pub fn get_pending_jobs(&self) -> Result<Vec<PendingJob>, String> { + let conn = self.conn.lock().unwrap(); + + let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap(); + let mut jobs = pending_query.query([]).unwrap(); + let mut pending = Vec::new(); + + while let Some(row) = jobs.next().unwrap() { + let (id, artifacts, state, run_host, remote_id, commit_id, created_time) = row.try_into().unwrap(); + let state: u8 = state; + pending.push(PendingJob { + id, artifacts, + state: state.try_into().unwrap(), + run_host, remote_id, commit_id, created_time + }); + } + + Ok(pending) + } + + pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> { + #[derive(Debug)] + #[allow(dead_code)] + struct Remote { + id: u64, + repo_id: u64, + remote_path: String, + remote_api: String, + notifier_config_path: String, + } + + let mut remotes: Vec<Remote> = Vec::new(); + + let conn = self.conn.lock().unwrap(); + let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap(); + let mut remote_results = remotes_query.query([repo_id]).unwrap(); + + while let Some(row) = remote_results.next().unwrap() { + let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); + let _: String = remote_url; + let _: String = remote_git_url; + remotes.push(Remote { id, repo_id, remote_path, remote_api, notifier_config_path }); + } + + let mut notifiers: Vec<RemoteNotifier> = Vec::new(); + + for remote in remotes.into_iter() { + match remote.remote_api.as_str() { + "github" => { + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::github_from_file(&format!("{}/{}", self.config_path, remote.notifier_config_path)) + .expect("can load notifier config") + }; + notifiers.push(notifier); + }, + "email" => { + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::email_from_file(&format!("{}/{}", self.config_path, remote.notifier_config_path)) + .expect("can load notifier config") + }; + notifiers.push(notifier); + } + other => { + eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) + } + } + } + + Ok(notifiers) + } +} + diff --git a/src/main.rs b/src/main.rs index 1b6d9e8..56aca01 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,11 +3,10 @@ use tokio::spawn; use std::path::PathBuf; -use serde_derive::{Deserialize, Serialize}; use axum_server::tls_rustls::RustlsConfig; use axum::routing::*; use axum::Router; -use axum::response::{IntoResponse, Response}; +use axum::response::{IntoResponse, Response, Html}; use std::net::SocketAddr; use axum::extract::{Path, State}; use http_body::combinators::UnsyncBoxBody; @@ -18,15 +17,22 @@ use axum::http::{StatusCode, Uri}; use http::header::HeaderMap; use std::sync::Arc; -use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; use hmac::{Hmac, Mac}; use sha2::Sha256; mod sql; +mod notifier; +mod dbctx; -use rusqlite::{Connection, OptionalExtension}; +use sql::JobState; + +use dbctx::DbCtx; + +use rusqlite::OptionalExtension; + +const PSKS: &'static [&'static [u8]] = &[]; #[derive(Copy, Clone, Debug)] enum GithubHookError { @@ -96,14 +102,22 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event: return (StatusCode::OK, String::new()); } - let remote_url = format!("https://github.com/{}.git", repo); - let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() + let remote_url = format!("https://www.github.com/{}.git", repo); + eprintln!("looking for remote url: {}", remote_url); + let (remote_id, repo_id): (u64, u64) = match ctx.conn.lock().unwrap() .query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) - .unwrap(); + .optional() + .unwrap() { + Some(elems) => elems, + None => { + eprintln!("no remote registered for url {} (repo {})", remote_url, repo); + return (StatusCode::NOT_FOUND, String::new()); + } + }; let job_id = ctx.new_job(remote_id, &sha).unwrap(); - let notifiers = ctx.notifiers_by_name(&repo).expect("can get notifiers"); + let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers"); for notifier in notifiers { notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify"); @@ -134,11 +148,78 @@ async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse { eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2); + let remote_path = format!("{}/{}", path.0, path.1); + let sha = path.2; + + let commit_id: Option<u64> = ctx.conn.lock().unwrap() + .query_row("select id from commits where sha=?1;", [&sha], |row| row.get(0)) + .optional() + .expect("can query"); + + let commit_id: u64 = match commit_id { + Some(commit_id) => { + commit_id + }, + None => { + return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string())); + } + }; + + let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() + .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) + .expect("can query"); + + let (job_id, state): (u64, u8) = ctx.conn.lock().unwrap() + .query_row("select id, state from jobs where commit_id=?1;", [commit_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) + .expect("can query"); + + let state: sql::JobState = unsafe { std::mem::transmute(state) }; + + let repo_name: String = ctx.conn.lock().unwrap() + .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) + .expect("can query"); + + let deployed = false; + let time = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("now is before epoch"); - format!("requested: {:?}", path) + let resp = format!("\ + <html>\n\ + <head>\n\ + <title>ci.butactuallyin.space - {}</title>\n\ + </head>\n\ + <body>\n\ + <pre>\n\ + repo: {}\n\ + commit: <a href='https://www.github.com/{}/commit/{}'>{}</a>\n \ + status: {}\n \ + deployed: {}\n\ + </pre>\n\ + </body>\n\ + </html>\n", + repo_name, + repo_name, + &remote_path, &sha, &sha, + match state { + JobState::Pending | JobState::Started => { + "<span style='color:#660;'>pending</span>" + }, + JobState::Complete => { + "<span style='color:green;'>pass</span>" + }, + JobState::Error => { + "<span style='color:red;'>pass</span>" + } + JobState::Invalid => { + "<span style='color:red;'>(server error)</span>" + } + }, + deployed, + ); + + (StatusCode::OK, Html(resp)) } async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<Arc<DbCtx>>, body: Bytes) -> impl IntoResponse { @@ -161,17 +242,24 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa } }; - let mut mac = Hmac::<Sha256>::new_from_slice(GITHUB_PSK) - .expect("hmac can be constructed"); - mac.update(&body); - let result = mac.finalize().into_bytes().to_vec(); - - // hack: skip sha256= - let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex"); - if decoded != result { - eprintln!("bad hmac:\n\ - got: {:?}\n\ - expected: {:?}", decoded, result); + let mut hmac_ok = false; + + for psk in PSKS.iter() { + let mut mac = Hmac::<Sha256>::new_from_slice(psk) + .expect("hmac can be constructed"); + mac.update(&body); + let result = mac.finalize().into_bytes().to_vec(); + + // hack: skip sha256= + let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex"); + if decoded == result { + hmac_ok = true; + break; + } + } + + if !hmac_ok { + eprintln!("bad hmac by all psks"); return (StatusCode::BAD_REQUEST, "").into_response(); } @@ -186,213 +274,8 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa handle_github_event(ctx, path.0, path.1, kind, payload).await } -struct DbCtx { - conn: Mutex<Connection>, -} - -struct RemoteNotifier { - remote_path: String, - notifier: NotifierConfig, -} - -#[derive(Serialize, Deserialize)] -#[serde(untagged)] -enum NotifierConfig { - GitHub { - token: String, - }, - Email { - username: String, - password: String, - mailserver: String, - from: String, - to: String, - } -} - -impl NotifierConfig { - fn github_from_file(path: &str) -> Result<Self, String> { - let bytes = std::fs::read(path) - .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; - let config = serde_json::from_slice(&bytes) - .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; - - if matches!(config, NotifierConfig::GitHub { .. }) { - Ok(config) - } else { - Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path)) - } - } - - fn email_from_file(path: &str) -> Result<Self, String> { - let bytes = std::fs::read(path) - .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; - let config = serde_json::from_slice(&bytes) - .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; - - if matches!(config, NotifierConfig::Email { .. }) { - Ok(config) - } else { - Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path)) - } - } -} - -impl RemoteNotifier { - async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { - match &self.notifier { - NotifierConfig::GitHub { token } => { - let status_info = serde_json::json!({ - "state": "pending", - "target_url": format!( - "https://{}/{}/{}", - "ci.butactuallyin.space", - &self.remote_path, - sha, - ), - "description": "build is queued", - "context": "actuallyinspace runner", - }); - - // TODO: should pool (probably in ctx?) to have an upper bound in concurrent - // connections. - let client = reqwest::Client::new(); - let res = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) - .body(serde_json::to_string(&status_info).expect("can stringify json")) - .header("authorization", format!("Bearer {}", token)) - .header("accept", "application/vnd.github+json") - .send() - .await; - - match res { - Ok(res) => { - if res.status() == StatusCode::OK { - Ok(()) - } else { - Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) - } - } - Err(e) => { - Err(format!("failure sending request: {:?}", e)) - } - } - } - NotifierConfig::Email { username, password, mailserver, from, to } => { - panic!("should send an email saying that a job is now pending for `sha`") - } - } - } -} - -impl DbCtx { - fn new(db_path: &'static str) -> Self { - DbCtx { - conn: Mutex::new(Connection::open(db_path).unwrap()) - } - } - - fn new_commit(&self, sha: &str) -> Result<u64, String> { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into commits (sha) values (?1)", - [sha.clone()] - ) - .expect("can insert"); - - Ok(conn.last_insert_rowid() as u64) - } - - fn new_job(&self, remote_id: u64, sha: &str) -> Result<u64, String> { - // TODO: potential race: if two remotes learn about a commit at the same time and we decide - // to create two jobs at the same time, this might return an incorrect id if the insert - // didn't actually insert a new row. - let commit_id = self.new_commit(sha).expect("can create commit record"); - - let created_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis() as u64; - - let conn = self.conn.lock().unwrap(); - - let rows_modified = conn.execute( - "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);", - (sql::JobState::Pending as u64, remote_id, commit_id, created_time) - ).unwrap(); - - assert_eq!(1, rows_modified); - - Ok(conn.last_insert_rowid() as u64) - } - - fn notifiers_by_name(&self, repo: &str) -> Result<Vec<RemoteNotifier>, String> { - let maybe_repo_id: Option<u64> = self.conn.lock() - .unwrap() - .query_row("select * from repos where repo_name=?1", [repo], |row| row.get(0)) - .optional() - .expect("query succeeds"); - match maybe_repo_id { - Some(repo_id) => { - // get remotes - - #[derive(Debug)] - #[allow(dead_code)] - struct Remote { - id: u64, - repo_id: u64, - remote_path: String, - remote_api: String, - notifier_config_path: String, - } - - let mut remotes: Vec<Remote> = Vec::new(); - - let conn = self.conn.lock().unwrap(); - let mut remotes_query = conn.prepare(sql::REMOTES_FOR_REPO).unwrap(); - let mut remote_results = remotes_query.query([repo_id]).unwrap(); - - while let Some(row) = remote_results.next().unwrap() { - let (id, repo_id, remote_path, remote_api, notifier_config_path) = row.try_into().unwrap(); - remotes.push(Remote { id, repo_id, remote_path, remote_api, notifier_config_path }); - } - - let mut notifiers: Vec<RemoteNotifier> = Vec::new(); - - for remote in remotes.into_iter() { - match remote.remote_api.as_str() { - "github" => { - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::github_from_file(&remote.notifier_config_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - }, - "email" => { - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::email_from_file(&remote.notifier_config_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - } - other => { - eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) - } - } - } - - Ok(notifiers) - } - None => { - return Err(format!("repo '{}' is not known", repo)); - } - } - } -} -async fn make_app_server(db_path: &'static str) -> Router { +async fn make_app_server(cfg_path: &'static str, db_path: &'static str) -> Router { /* // GET /hello/warp => 200 OK with body "Hello, warp!" @@ -457,7 +340,7 @@ async fn make_app_server(db_path: &'static str) -> Router { .route("/:owner/:repo/:sha", get(handle_commit_status)) .route("/:owner/:repo", post(handle_repo_event)) .fallback(fallback_get) - .with_state(Arc::new(DbCtx::new(db_path))) + .with_state(Arc::new(DbCtx::new(cfg_path, db_path))) } #[tokio::main] @@ -468,9 +351,9 @@ async fn main() { PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"), ).await.unwrap(); spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone()) - .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service())); + .serve(make_app_server("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db").await.into_make_service())); axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config) - .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service()) + .serve(make_app_server("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db").await.into_make_service()) .await .unwrap(); } diff --git a/src/notifier.rs b/src/notifier.rs new file mode 100644 index 0000000..4ddc91b --- /dev/null +++ b/src/notifier.rs @@ -0,0 +1,126 @@ +use serde_derive::{Deserialize, Serialize}; +use std::sync::Arc; +use axum::http::StatusCode; + +use crate::DbCtx; + +pub struct RemoteNotifier { + pub remote_path: String, + pub notifier: NotifierConfig, +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +pub enum NotifierConfig { + GitHub { + token: String, + }, + Email { + username: String, + password: String, + mailserver: String, + from: String, + to: String, + } +} + +impl NotifierConfig { + pub fn github_from_file(path: &str) -> Result<Self, String> { + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; + + if matches!(config, NotifierConfig::GitHub { .. }) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path)) + } + } + + pub fn email_from_file(path: &str) -> Result<Self, String> { + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; + + if matches!(config, NotifierConfig::Email { .. }) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path)) + } + } +} + +impl RemoteNotifier { + pub async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + } + + pub async fn tell_complete_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, desc: Result<String, String>) -> Result<(), String> { + match desc { + Ok(status) => { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + }, + Err(status) => { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + } + } + } + + pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { + match &self.notifier { + NotifierConfig::GitHub { token } => { + let status_info = serde_json::json!({ + "state": state, + "description": desc, + "target_url": target_url, + "context": "actuallyinspace runner", + }); + + // TODO: should pool (probably in ctx?) to have an upper bound in concurrent + // connections. + let client = reqwest::Client::new(); + let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) + .body(serde_json::to_string(&status_info).expect("can stringify json")) + .header("content-type", "application/json") + .header("user-agent", "iximeow") + .header("authorization", format!("Bearer {}", token)) + .header("accept", "application/vnd.github+json"); + eprintln!("sending {:?}", req); + eprintln!(" body: {}", serde_json::to_string(&status_info).expect("can stringify json")); + let res = req + .send() + .await; + + match res { + Ok(res) => { + if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{ + Ok(()) + } else { + Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) + } + } + Err(e) => { + Err(format!("failure sending request: {:?}", e)) + } + } + } + NotifierConfig::Email { username, password, mailserver, from, to } => { + panic!("should send an email saying that a job is now pending for `sha`") + } + } + } +} @@ -1,10 +1,29 @@ #![allow(dead_code)] +use std::convert::TryFrom; + +#[derive(Debug, Clone)] pub enum JobState { Pending = 0, Started = 1, Complete = 2, Error = 3, + Invalid = 4, +} + +impl TryFrom<u8> for JobState { + type Error = String; + + fn try_from(value: u8) -> Result<Self, String> { + match value { + 0 => Ok(JobState::Pending), + 1 => Ok(JobState::Started), + 2 => Ok(JobState::Complete), + 3 => Ok(JobState::Error), + 4 => Ok(JobState::Invalid), + other => Err(format!("invalid job state: {}", other)), + } + } } // remote_id is the remote from which we were notified. this is necessary so we know which remote @@ -14,6 +33,7 @@ pub const CREATE_JOBS_TABLE: &'static str = "\ artifacts_path TEXT, state INTEGER NOT NULL, run_host TEXT, + build_token TEXT, remote_id INTEGER, commit_id INTEGER, created_time INTEGER, @@ -45,6 +65,9 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\ pub const CREATE_REMOTES_INDEX: &'static str = "\ CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; +pub const CREATE_REPO_NAME_INDEX: &'static str = "\ + CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; + pub const PENDING_JOBS: &'static str = "\ select * from jobs where state=0;"; |