From 4a213e872395f9b0562c113bb7303815a1d26a57 Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 22 Dec 2022 18:29:26 +0000 Subject: draw almost all of the owl --- .gitignore | 2 + Cargo.lock | 242 ++++++++++++++++++++++++++++++++++++++- Cargo.toml | 13 +++ src/ci_ctl.rs | 184 ++++++++++++++++++++++++++++++ src/ci_driver.rs | 310 +++++++++++++++++++++++++++++++++++++++++++++----- src/ci_runner.rs | 269 +++++++++++++++++++++++++++++++++++++++++++ src/dbctx.rs | 204 +++++++++++++++++++++++++++++++++ src/main.rs | 341 ++++++++++++++++++------------------------------------- src/notifier.rs | 126 ++++++++++++++++++++ src/sql.rs | 23 ++++ 10 files changed, 1454 insertions(+), 260 deletions(-) create mode 100644 src/ci_ctl.rs create mode 100644 src/ci_runner.rs create mode 100644 src/dbctx.rs create mode 100644 src/notifier.rs diff --git a/.gitignore b/.gitignore index 6ca7663..d94cb0d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ /target /jobs +/config state.db *.swp +auth_secret diff --git a/Cargo.lock b/Cargo.lock index 51368b5..d15e009 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -128,6 +128,15 @@ dependencies = [ ] [[package]] +name = "bstr" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223" +dependencies = [ + "memchr", +] + +[[package]] name = "bumpalo" version = "3.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -152,6 +161,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] +name = "clap" +version = "4.0.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d63b9e9c07271b9957ad22c173bae2a4d9a81127680962039296abcd2f8251d" +dependencies = [ + "bitflags", + "clap_derive", + "clap_lex", + "is-terminal", + "once_cell", + "strsim", + "termcolor", +] + +[[package]] +name = "clap_derive" +version = "4.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8" +dependencies = [ + "os_str_bytes", +] + +[[package]] name = "core-foundation" version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -207,6 +253,27 @@ dependencies = [ ] [[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + +[[package]] name = "fallible-iterator" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -273,6 +340,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" [[package]] +name = "futures-macro" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "futures-sink" version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -291,9 +369,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -369,6 +449,12 @@ dependencies = [ ] [[package]] +name = "heck" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" + +[[package]] name = "hermit-abi" version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -378,6 +464,15 @@ dependencies = [ ] [[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + +[[package]] name = "hex" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -512,12 +607,34 @@ dependencies = [ ] [[package]] +name = "io-lifetimes" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46112a93252b123d31a119a8d1a1ac19deac4fac6e0e8b0df58f0d4e5870e63c" +dependencies = [ + "libc", + "windows-sys 0.42.0", +] + +[[package]] name = "ipnet" version = "2.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f88c5561171189e69df9d98bcf18fd5f9558300f7ea7b801eb8a0fd748bd8745" [[package]] +name = "is-terminal" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927609f78c2913a6f6ac3c27a4fe87f43e2a35367c0c4b0f8265e8f49a104330" +dependencies = [ + "hermit-abi 0.2.6", + "io-lifetimes", + "rustix", + "windows-sys 0.42.0", +] + +[[package]] name = "itoa" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -529,20 +646,25 @@ version = "0.0.1" dependencies = [ "axum", "axum-server", + "clap", + "futures-util", "handlebars", "hex", "hmac", "http", "http-body", + "hyper", "libc", "rand", "reqwest", + "rlua", "rusqlite", "serde", "serde_derive", "serde_json", "sha2", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", ] @@ -579,6 +701,12 @@ dependencies = [ ] [[package]] +name = "linux-raw-sys" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f" + +[[package]] name = "lock_api" version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -656,12 +784,21 @@ dependencies = [ ] [[package]] +name = "num-traits" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "578ede34cf02f8924ab9447f50c28075b4d3e5b269972345e7e0372b38c6cdcd" +dependencies = [ + "autocfg", +] + +[[package]] name = "num_cpus" version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", ] @@ -717,6 +854,12 @@ dependencies = [ ] [[package]] +name = "os_str_bytes" +version = "6.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" + +[[package]] name = "overload" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -840,6 +983,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + +[[package]] name = "proc-macro2" version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -963,6 +1130,30 @@ dependencies = [ ] [[package]] +name = "rlua" +version = "0.19.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95b38117a836316ef62c02f6751e6d28e2eb53a1c35f0329427a9fb9c1c7b6a0" +dependencies = [ + "bitflags", + "bstr", + "libc", + "num-traits", + "rlua-lua54-sys", +] + +[[package]] +name = "rlua-lua54-sys" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23ae48797c3e76fb2c205fda8f30e28416a15b9fc1d649cc7cea9ff1fb9cf028" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + +[[package]] name = "rusqlite" version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -977,6 +1168,20 @@ dependencies = [ ] [[package]] +name = "rustix" +version = "0.36.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb93e85278e08bb5788653183213d3a60fc242b10cb9be96586f5a73dcb67c23" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys 0.42.0", +] + +[[package]] name = "rustls" version = "0.20.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1191,6 +1396,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] +name = "strsim" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" + +[[package]] name = "subtle" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1228,6 +1439,15 @@ dependencies = [ ] [[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + +[[package]] name = "thiserror" version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1324,6 +1544,17 @@ dependencies = [ ] [[package]] +name = "tokio-stream" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] name = "tokio-util" version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1636,6 +1867,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/Cargo.toml b/Cargo.toml index 832d81f..1ac1665 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,9 @@ serde_derive = "*" serde_json = "*" rand = "*" tokio = { version = "*", features = ["full"] } +tokio-stream = "*" +hyper = "*" +futures-util = "*" tracing = "*" tracing-subscriber = "*" http-body = "*" @@ -25,6 +28,8 @@ hmac = "*" hex = "*" sha2 = "*" reqwest = { version = "*", features = ["rustls-tls-native-roots"] } +clap = { version = "*", features = ["derive"] } +rlua = "*" [[bin]] name = "ci_webserver" @@ -33,3 +38,11 @@ path = "src/main.rs" [[bin]] name = "ci_driver" path = "src/ci_driver.rs" + +[[bin]] +name = "ci_ctl" +path = "src/ci_ctl.rs" + +[[bin]] +name = "ci_runner" +path = "src/ci_runner.rs" diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs new file mode 100644 index 0000000..3f48907 --- /dev/null +++ b/src/ci_ctl.rs @@ -0,0 +1,184 @@ +use clap::{Parser, Subcommand}; + +mod sql; +mod dbctx; +mod notifier; + +use sql::JobState; +use dbctx::DbCtx; +use notifier::NotifierConfig; + +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct Args { + /// path to a database to manage (defaults to "./state.db") + db_path: Option, + + /// path to where configs should be found (defaults to "./config") + config_path: Option, + + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + /// add _something_ to the db + Add { + #[command(subcommand)] + what: AddItem, + }, + + /// make sure that the state looks reasonable. + /// + /// currently, ensure that all notifiers have a config, that config references an existing + /// file, and that the referenced file is valid. + Validate, + + /// do something with jobs + Job { + #[command(subcommand)] + what: JobAction, + }, +} + +#[derive(Subcommand)] +enum JobAction { + List, + Rerun { + which: u32 + } +} + +#[derive(Subcommand)] +enum AddItem { + Repo { + name: String, + remote: Option, + remote_kind: Option, + config: Option, + }, + Remote { + repo_name: String, + remote: String, + remote_kind: String, + config: String, + }, +} + +fn main() { + let args = Args::parse(); + + let db_path = args.db_path.unwrap_or_else(|| "./state.db".to_owned()); + let config_path = args.config_path.unwrap_or_else(|| "./config".to_owned()); + + match args.command { + Command::Job { what } => { + match what { + JobAction::List => { + let db = DbCtx::new(&config_path, &db_path); + let mut conn = db.conn.lock().unwrap(); + let mut query = conn.prepare("select id, artifacts_path, state, commit_id, created_time from jobs;").unwrap(); + let mut jobs = query.query([]).unwrap(); + while let Some(row) = jobs.next().unwrap() { + let (id, artifacts, state, commit_id, created_time): (u64, Option, u64, u64, u64) = row.try_into().unwrap(); + + eprintln!("[+] {:04} | {: >8?} | {}", id, state, created_time); + } + eprintln!("jobs"); + }, + JobAction::Rerun { which } => { + let db = DbCtx::new(&config_path, &db_path); + db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [which]) + .expect("works"); + eprintln!("[+] job {} set to pending", which); + } + } + }, + Command::Add { what } => { + match what { + AddItem::Repo { name, remote, remote_kind, config } => { + let remote_config = match (remote, remote_kind, config) { + (Some(remote), Some(remote_kind), Some(config_path)) => { + // do something + if remote_kind != "github" { + eprintln!("unknown remote kind: {}", remote); + return; + } + Some((remote, remote_kind, config_path)) + }, + (None, None, None) => { + None + }, + _ => { + eprintln!("when specifying a remote, `remote`, `remote_kind`, and `config_path` must either all be provided together or not at all"); + return; + } + }; + + let db = DbCtx::new(&config_path, &db_path); + let repo_id = match db.new_repo(&name) { + Ok(repo_id) => repo_id, + Err(e) => { + if e.contains("UNIQUE constraint failed") { + eprintln!("[!] repo '{}' already exists", name); + return; + } else { + eprintln!("[!] failed to create repo entry: {}", e); + return; + } + } + }; + println!("[+] new repo created: '{}' id {}", &name, repo_id); + if let Some((remote, remote_kind, config_path)) = remote_config { + let full_config_file_path = format!("{}/{}", &db.config_path, config_path); + let config = match remote_kind.as_ref() { + "github" => { + assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok()); + } + "email" => { + assert!(NotifierConfig::email_from_file(&full_config_file_path).is_ok()); + } + other => { + panic!("[-] notifiers for '{}' remotes are not supported", other); + } + }; + db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config_path.as_str()).unwrap(); + println!("[+] new remote created: repo '{}', {} remote at {}", &name, remote_kind, remote); + } + }, + AddItem::Remote { repo_name, remote, remote_kind, config } => { + let db = DbCtx::new(&config_path, &db_path); + let repo_id = match db.repo_id_by_name(&repo_name) { + Ok(Some(id)) => id, + Ok(None) => { + eprintln!("[-] repo '{}' does not exist", repo_name); + return; + }, + Err(e) => { + eprintln!("[!] couldn't look up repo '{}': {:?}", repo_name, e); + return; + } + }; + let config_file = format!("{}/{}", config_path, config); + match remote_kind.as_ref() { + "github" => { + NotifierConfig::github_from_file(&config_file).unwrap(); + } + "email" => { + NotifierConfig::email_from_file(&config_file).unwrap(); + } + other => { + panic!("notifiers for '{}' remotes are not supported", other); + } + }; + db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config.as_str()).unwrap(); + println!("[+] new remote created: repo '{}', {} remote at {}", &repo_name, remote_kind, remote); + }, + } + }, + Command::Validate => { + println!("ok"); + } + } +} diff --git a/src/ci_driver.rs b/src/ci_driver.rs index fd6f813..96e0d44 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -1,10 +1,29 @@ -use rusqlite::Connection; use std::process::Command; +use futures_util::StreamExt; +use std::fmt; use std::path::{Path, PathBuf}; +use tokio::spawn; +use tokio_stream::wrappers::ReceiverStream; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use axum_server::tls_rustls::RustlsConfig; +use axum::body::StreamBody; +use axum::http::{StatusCode}; +use axum::Router; +use axum::routing::*; +use axum::extract::State; +use axum::extract::BodyStream; +use axum::response::IntoResponse; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use serde_json::json; +mod dbctx; mod sql; +mod notifier; -use std::time::{SystemTime, UNIX_EPOCH}; +use crate::dbctx::{DbCtx, PendingJob}; +use crate::sql::JobState; fn reserve_artifacts_dir(job: u64) -> std::io::Result { let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); @@ -13,33 +32,43 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result { Ok(path) } -fn activate_job(connection: &mut Connection, job: u64, artifacts: Option, state: u8, run_host: Option, commit_id: u64, repo_url: String, repo_name: String) { +async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::Receiver) -> Result<(), String> { + let connection = dbctx.conn.lock().unwrap(); + let (repo_id, remote_git_url): (u64, String) = connection + .query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) + .expect("query succeeds"); + let repo_name: String = connection + .query_row("select repo_name from repos where id=?1", [repo_id], |row| row.get(0)) + .expect("query succeeds"); + let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("now is before epoch") .as_millis(); let commit_sha: String = connection - .query_row("select sha from commits where id=?1", [commit_id], |row| row.get(0)) + .query_row("select sha from commits where id=?1", [job.commit_id], |row| row.get(0)) .expect("can run query"); - let artifacts: PathBuf = match artifacts { + let artifacts = PathBuf::from("/tmp"); + /* + let artifacts: PathBuf = match &job.artifacts { Some(artifacts) => PathBuf::from(artifacts), - None => reserve_artifacts_dir(job).expect("can reserve a directory for artifacts") + None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts") }; - if run_host == None { + if job.run_host.as_ref() == None { eprintln!("need to find a host to run the job"); } - eprintln!("cloning {}", repo_url); + eprintln!("cloning {}", remote_git_url); let mut repo_dir = artifacts.clone(); repo_dir.push("repo"); eprintln!(" ... into {}", repo_dir.display()); Command::new("git") .arg("clone") - .arg(repo_url) + .arg(&remote_git_url) .arg(&format!("{}", repo_dir.display())) .status() .expect("can clone the repo"); @@ -48,44 +77,265 @@ fn activate_job(connection: &mut Connection, job: u64, artifacts: Option Command::new("git") .current_dir(&repo_dir) .arg("checkout") - .arg(commit_sha) + .arg(&commit_sha) .status() .expect("can checkout hash"); + */ eprintln!("running {}", repo_name); /* * find the CI script, figure out how to run it */ + let mut client_job = loop { + let mut candidate = clients.recv().await + .ok_or_else(|| "client channel disconnected".to_string())?; + + if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { + break client_job; + } else { + // failed to submit job, move on for now + } + }; + + let run_host = client_job.client.name.clone(); + connection.execute( "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4", - (now as u64, "test host".to_string(), format!("{}", artifacts.display()), job) + (now as u64, run_host, format!("{}", artifacts.display()), job.id) ) .expect("can update"); + + spawn(async move { + client_job.run().await + }); + + Ok(()) } -fn main() { - let mut connection = Connection::open("/root/ixi_ci_server/state.db").unwrap(); - connection.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); - connection.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); - connection.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); - connection.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); +struct RunnerClient { + tx: mpsc::Sender>, + rx: BodyStream, + name: String, + build_token: String, +} - loop { - let mut pending_query = connection.prepare(sql::PENDING_JOBS).unwrap(); - let mut jobs = pending_query.query([]).unwrap(); - let mut to_start = Vec::new(); - while let Some(row) = jobs.next().unwrap() { - let (id, artifacts, state, run_host, commit_id, repo_url, repo_name): (u64, Option, u8, Option, u64, String, String)= TryInto::try_into(row).unwrap(); - to_start.push((id, artifacts, state, run_host, commit_id, repo_url, repo_name)); +fn random_name() -> String { + "random name".to_string() +} + +fn token_for_job() -> String { + "very secret token do not share".to_string() +} + +struct ClientJob { + dbctx: Arc, + remote_git_url: String, + sha: String, + job: PendingJob, + client: RunnerClient +} + +impl ClientJob { + pub async fn run(&mut self) { + loop { + let msg = self.client.recv().await.expect("recv works").expect("client sent an object"); + let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); + match msg_kind { + "job_status" => { + let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); + let (result, state): (Result, JobState) = if state == "finished" { + let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); + eprintln!("job update: state is {} and result is {}", state, result); + match result { + "pass" => { + (Ok("success".to_string()), JobState::Complete) + }, + other => { + (Err(other.to_string()), JobState::Error) + } + } + } else if state == "interrupted" { + let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); + eprintln!("job update: state is {} and result is {}", state, result); + (Err(result.to_string()), JobState::Error) + } else { + eprintln!("job update: state is {}", state); + (Err(format!("atypical completion status: {}", state)), JobState::Invalid) + }; + + let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists"); + + for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { + notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await.expect("can notify"); + } + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis(); + + self.dbctx.conn.lock().unwrap().execute( + "update jobs set complete_time=?1, state=?2 where id=?3", + (now as u64, state as u64, self.job.id) + ) + .expect("can update"); + } + "artifact_create" => { + eprintln!("creating artifact {:?}", msg); + self.client.send(serde_json::json!({ + "status": "ok", + "object_id": "10", + })).await.unwrap(); + }, + other => { + eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg); + return; + } + } + } + } +} + +impl RunnerClient { + async fn new(sender: mpsc::Sender>, resp: BodyStream) -> Result { + let name = random_name(); + let token = token_for_job(); + let client = RunnerClient { + tx: sender, + rx: resp, + name, + build_token: token, + }; + Ok(client) + } + + async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { + self.tx.send(Ok(serde_json::to_string(&msg).unwrap())) + .await + .map_err(|e| e.to_string()) + } + + async fn recv(&mut self) -> Result, String> { + match self.rx.next().await { + Some(Ok(bytes)) => { + serde_json::from_slice(&bytes) + .map(Option::Some) + .map_err(|e| e.to_string()) + }, + Some(Err(e)) => { + eprintln!("e: {:?}", e); + Err(format!("no client job: {:?}", e)) + }, + None => { + eprintln!("no more body chunks? client hung up?"); + Ok(None) + } + } + } + + async fn submit(mut self, dbctx: &Arc, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result, String> { + self.send(serde_json::json!({ + "commit": sha, + "remote_url": remote_git_url, + "build_token": &self.build_token, + })).await?; + match self.recv().await { + Ok(Some(resp)) => { + if resp == serde_json::json!({ + "status": "started" + }) { + eprintln!("resp: {:?}", resp); + Ok(Some(ClientJob { + job: job.clone(), + dbctx: Arc::clone(dbctx), + sha: sha.to_string(), + remote_git_url: remote_git_url.to_string(), + client: self + })) + } else { + Err("client rejected job".to_string()) + } + } + Ok(None) => { + eprintln!("no more body chunks? client hung up?"); + Ok(None) + } + Err(e) => { + eprintln!("e: {:?}", e); + Err(e) + } + } + } +} + +impl fmt::Debug for RunnerClient { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("RunnerClient { .. }") + } +} + +async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender)>, job_resp: BodyStream) -> impl IntoResponse { + let (tx_sender, tx_receiver) = mpsc::channel(8); + let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); + tx_sender.send(Ok("hello".to_string())).await.expect("works"); + let client = RunnerClient::new(tx_sender, job_resp).await; + match client { + Ok(client) => { + eprintln!("registering client"); + match ctx.1.try_send(client) { + Ok(()) => { + eprintln!("response established..."); + return (StatusCode::OK, resp_body); + } + Err(TrySendError::Full(client)) => { + return (StatusCode::IM_A_TEAPOT, resp_body); + } + Err(TrySendError::Closed(client)) => { + panic!("client holder is gone?"); + } + } } - std::mem::drop(jobs); - std::mem::drop(pending_query); - if to_start.len() > 0 { - println!("{} new jobs", to_start.len()); + Err(e) => { + eprintln!("unable to register client"); + return (StatusCode::MISDIRECTED_REQUEST, resp_body); + } + } +} + +async fn make_api_server(dbctx: Arc) -> (Router, mpsc::Receiver) { + let (pending_client_sender, pending_client_receiver) = mpsc::channel(8); + + let router = Router::new() + .route("/api/next_job", post(handle_next_job)) + .with_state((dbctx, pending_client_sender)); + (router, pending_client_receiver) +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let config = RustlsConfig::from_pem_file( + PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/fullchain.pem"), + PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"), + ).await.unwrap(); + + let dbctx = Arc::new(DbCtx::new("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db")); + + let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await; + spawn(axum_server::bind_rustls("0.0.0.0:9876".parse().unwrap(), config) + .serve(api_server.into_make_service())); + + dbctx.create_tables().unwrap(); + + loop { + let jobs = dbctx.get_pending_jobs().unwrap(); + + if jobs.len() > 0 { + println!("{} new jobs", jobs.len()); - for job in to_start.into_iter() { - activate_job(&mut connection, job.0, job.1, job.2, job.3, job.4, job.5, job.6); + for job in jobs.into_iter() { + activate_job(Arc::clone(&dbctx), &job, &mut channel).await; } } std::thread::sleep(std::time::Duration::from_millis(100)); diff --git a/src/ci_runner.rs b/src/ci_runner.rs new file mode 100644 index 0000000..6554f05 --- /dev/null +++ b/src/ci_runner.rs @@ -0,0 +1,269 @@ +use std::time::Duration; +use reqwest::{StatusCode, Response}; +use std::process::Command; +use serde_derive::{Deserialize, Serialize}; +use serde::{Deserialize, de::DeserializeOwned, Serialize}; + +#[derive(Debug)] +enum WorkAcquireError { + Reqwest(reqwest::Error), + EarlyEof, + Protocol(String), +} + +struct RunnerClient { + host: String, + tx: hyper::body::Sender, + rx: Response, + current_job: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct RequestedJob { + commit: String, + remote_url: String, + build_token: String, +} + +impl RequestedJob { + // TODO: panics if hyper finds the channel is closed. hum + async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result { + let create_artifact_message = serde_json::json!({ + "kind": "artifact_create", + "name": name, + "description": desc, + "job_token": &self.build_token, + }); + client.send(create_artifact_message).await + .map_err(|e| format!("create artifact send error: {:?}", e))?; + let resp = client.recv().await + .map_err(|e| format!("create artifact recv error: {:?}", e))?; + eprintln!("resp: {:?}", resp); + let object_id = resp.unwrap() + .as_object().expect("is an object") + .get("object_id").unwrap().as_str().expect("is str") + .to_owned(); + // POST to this object id... + Ok(ArtifactStream { + object_id, + }) + } + + async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result { + let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works"); + + let clone_res = Command::new("git") + .arg("clone") + .arg(&self.remote_url) + .arg("tmpdir") + .status() + .map_err(|e| format!("failed to run git clone? {:?}", e))?; + + if !clone_res.success() { + return Err(format!("git clone failed: {:?}", clone_res)); + } + + let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works"); + + let checkout_res = Command::new("git") + .current_dir("tmpdir") + .arg("checkout") + .arg(&self.commit) + .status() + .map_err(|e| format!("failed to run git checkout? {:?}", e))?; + + if !checkout_res.success() { + return Err(format!("git checkout failed: {:?}", checkout_res)); + } + + let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works"); + + let build_res = Command::new("cargo") + .current_dir("tmpdir") + .arg("build") + .status() + .map_err(|e| format!("failed to run cargo build? {:?}", e))?; + + if !build_res.success() { + return Err(format!("cargo build failed: {:?}", build_res)); + } + + let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works"); + + let test_result = Command::new("cargo") + .current_dir("tmpdir") + .arg("test") + .status() + .map_err(|e| format!("failed to run cargo test? {:?}", e))?; + + match test_result.code() { + Some(0) => Ok("pass".to_string()), + Some(n) => Ok(format!("error: {}", n)), + None => Ok(format!("abnormal exit")), + } + } +} + +struct ArtifactStream { + object_id: String, +} + +impl RunnerClient { + async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result { + if res.status() != StatusCode::OK { + return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); + } + + let hello = res.chunk().await.expect("chunk"); + if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { + return Err(format!("bad hello: {:?}", hello)); + } + + Ok(Self { + host: host.to_string(), + tx: sender, + rx: res, + current_job: None, + }) + } + + async fn wait_for_work(&mut self) -> Result, WorkAcquireError> { + match self.rx.chunk().await { + Ok(Some(chunk)) => { + eprintln!("got chunk: {:?}", &chunk); + serde_json::from_slice(&chunk) + .map(Option::Some) + .map_err(|e| { + WorkAcquireError::Protocol(format!("not json: {:?}", e)) + }) + } + Ok(None) => { + Ok(None) + }, + Err(e) => { + Err(WorkAcquireError::Reqwest(e)) + } + } + } + + async fn recv(&mut self) -> Result, String> { + self.recv_typed().await + } + + async fn recv_typed(&mut self) -> Result, String> { + match self.rx.chunk().await { + Ok(Some(chunk)) => { + serde_json::from_slice(&chunk) + .map(Option::Some) + .map_err(|e| { + format!("not json: {:?}", e) + }) + }, + Ok(None) => Ok(None), + Err(e) => { + Err(format!("error in recv: {:?}", e)) + } + } + } + + async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { + self.tx.send_data( + serde_json::to_vec(&value) + .map_err(|e| format!("json error: {:?}", e))? + .into() + ).await + .map_err(|e| format!("send error: {:?}", e)) + } + + async fn run_job(&mut self, job: RequestedJob) { + self.send(serde_json::json!({ + "status": "started" + })).await.unwrap(); + + std::fs::remove_dir_all("tmpdir").unwrap(); + std::fs::create_dir("tmpdir").unwrap(); + + let res = job.execute_goodfile(self).await; + + match res { + Ok(status) => { + self.send(serde_json::json!({ + "kind": "job_status", + "state": "finished", + "result": status + })).await.unwrap(); + } + Err(status) => { + self.send(serde_json::json!({ + "kind": "job_status", + "state": "interrupted", + "result": status + })).await.unwrap(); + } + } + } +} + +#[tokio::main] +async fn main() { + let secret = std::fs::read_to_string("./auth_secret").unwrap(); + let client = reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"); + + loop { + let (mut sender, body) = hyper::Body::channel(); + let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("authorization", &secret) + .body(body) + .send() + .await; + + match poll { + Ok(mut res) => { + let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { + Ok(client) => client, + Err(e) => { + eprintln!("failed to initialize client: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + let job = match client.wait_for_work().await { + Ok(Some(request)) => request, + Ok(None) => { + eprintln!("no work to do (yet)"); + std::thread::sleep(Duration::from_millis(2000)); + continue; + } + Err(e) => { + eprintln!("failed to get work: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + eprintln!("requested work: {:?}", job); + + eprintln!("doing {:?}", job); + client.run_job(job).await; + std::thread::sleep(Duration::from_millis(10000)); + }, + Err(e) => { + let message = format!("{}", e); + + if message.contains("tcp connect error") { + eprintln!("could not reach server. sleeping a bit and retrying."); + std::thread::sleep(Duration::from_millis(5000)); + continue; + } + + eprintln!("unhandled error: {}", message); + + std::thread::sleep(Duration::from_millis(1000)); + } + } + } +} diff --git a/src/dbctx.rs b/src/dbctx.rs new file mode 100644 index 0000000..3af2d56 --- /dev/null +++ b/src/dbctx.rs @@ -0,0 +1,204 @@ +use std::sync::Mutex; +use rusqlite::{Connection, OptionalExtension}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::notifier::{RemoteNotifier, NotifierConfig}; +use crate::sql; + +pub struct DbCtx { + pub config_path: String, + // don't love this but.. for now... + pub conn: Mutex, +} + +#[derive(Debug, Clone)] +pub struct PendingJob { + pub id: u64, + pub artifacts: Option, + pub state: sql::JobState, + pub run_host: Option, + pub remote_id: u64, + pub commit_id: u64, + pub created_time: u64, +} + +impl DbCtx { + pub fn new(config_path: &str, db_path: &str) -> Self { + DbCtx { + config_path: config_path.to_owned(), + conn: Mutex::new(Connection::open(db_path).unwrap()) + } + } + + pub fn create_tables(&self) -> Result<(), ()> { + let conn = self.conn.lock().unwrap(); + conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); + conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); + conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); + conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap(); + conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); + conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap(); + + Ok(()) + } + + pub fn new_commit(&self, sha: &str) -> Result { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into commits (sha) values (?1)", + [sha.clone()] + ) + .expect("can insert"); + + Ok(conn.last_insert_rowid() as u64) + } + + pub fn new_repo(&self, name: &str) -> Result { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into repos (repo_name) values (?1)", + [name.clone()] + ) + .map_err(|e| { + format!("{:?}", e) + })?; + + Ok(conn.last_insert_rowid() as u64) + } + + pub fn repo_id_by_remote(&self, remote_id: u64) -> Result, String> { + self.conn.lock() + .unwrap() + .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0)) + .optional() + .map_err(|e| e.to_string()) + } + + pub fn repo_id_by_name(&self, repo_name: &str) -> Result, String> { + self.conn.lock() + .unwrap() + .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0)) + .optional() + .map_err(|e| e.to_string()) + } + + pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result { + let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind { + "github" => { + (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote)) + }, + other => { + panic!("unsupported remote kind: {}", other); + } + }; + + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);", + (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path) + ) + .expect("can insert"); + + Ok(conn.last_insert_rowid() as u64) + } + + pub fn new_job(&self, remote_id: u64, sha: &str) -> Result { + // TODO: potential race: if two remotes learn about a commit at the same time and we decide + // to create two jobs at the same time, this might return an incorrect id if the insert + // didn't actually insert a new row. + let commit_id = self.new_commit(sha).expect("can create commit record"); + + let created_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis() as u64; + + let conn = self.conn.lock().unwrap(); + + let rows_modified = conn.execute( + "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);", + (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time) + ).unwrap(); + + assert_eq!(1, rows_modified); + + Ok(conn.last_insert_rowid() as u64) + } + + pub fn get_pending_jobs(&self) -> Result, String> { + let conn = self.conn.lock().unwrap(); + + let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap(); + let mut jobs = pending_query.query([]).unwrap(); + let mut pending = Vec::new(); + + while let Some(row) = jobs.next().unwrap() { + let (id, artifacts, state, run_host, remote_id, commit_id, created_time) = row.try_into().unwrap(); + let state: u8 = state; + pending.push(PendingJob { + id, artifacts, + state: state.try_into().unwrap(), + run_host, remote_id, commit_id, created_time + }); + } + + Ok(pending) + } + + pub fn notifiers_by_repo(&self, repo_id: u64) -> Result, String> { + #[derive(Debug)] + #[allow(dead_code)] + struct Remote { + id: u64, + repo_id: u64, + remote_path: String, + remote_api: String, + notifier_config_path: String, + } + + let mut remotes: Vec = Vec::new(); + + let conn = self.conn.lock().unwrap(); + let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap(); + let mut remote_results = remotes_query.query([repo_id]).unwrap(); + + while let Some(row) = remote_results.next().unwrap() { + let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); + let _: String = remote_url; + let _: String = remote_git_url; + remotes.push(Remote { id, repo_id, remote_path, remote_api, notifier_config_path }); + } + + let mut notifiers: Vec = Vec::new(); + + for remote in remotes.into_iter() { + match remote.remote_api.as_str() { + "github" => { + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::github_from_file(&format!("{}/{}", self.config_path, remote.notifier_config_path)) + .expect("can load notifier config") + }; + notifiers.push(notifier); + }, + "email" => { + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::email_from_file(&format!("{}/{}", self.config_path, remote.notifier_config_path)) + .expect("can load notifier config") + }; + notifiers.push(notifier); + } + other => { + eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) + } + } + } + + Ok(notifiers) + } +} + diff --git a/src/main.rs b/src/main.rs index 1b6d9e8..56aca01 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,11 +3,10 @@ use tokio::spawn; use std::path::PathBuf; -use serde_derive::{Deserialize, Serialize}; use axum_server::tls_rustls::RustlsConfig; use axum::routing::*; use axum::Router; -use axum::response::{IntoResponse, Response}; +use axum::response::{IntoResponse, Response, Html}; use std::net::SocketAddr; use axum::extract::{Path, State}; use http_body::combinators::UnsyncBoxBody; @@ -18,15 +17,22 @@ use axum::http::{StatusCode, Uri}; use http::header::HeaderMap; use std::sync::Arc; -use std::sync::Mutex; use std::time::{SystemTime, UNIX_EPOCH}; use hmac::{Hmac, Mac}; use sha2::Sha256; mod sql; +mod notifier; +mod dbctx; -use rusqlite::{Connection, OptionalExtension}; +use sql::JobState; + +use dbctx::DbCtx; + +use rusqlite::OptionalExtension; + +const PSKS: &'static [&'static [u8]] = &[]; #[derive(Copy, Clone, Debug)] enum GithubHookError { @@ -96,14 +102,22 @@ async fn process_push_event(ctx: Arc, owner: String, repo: String, event: return (StatusCode::OK, String::new()); } - let remote_url = format!("https://github.com/{}.git", repo); - let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() + let remote_url = format!("https://www.github.com/{}.git", repo); + eprintln!("looking for remote url: {}", remote_url); + let (remote_id, repo_id): (u64, u64) = match ctx.conn.lock().unwrap() .query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) - .unwrap(); + .optional() + .unwrap() { + Some(elems) => elems, + None => { + eprintln!("no remote registered for url {} (repo {})", remote_url, repo); + return (StatusCode::NOT_FOUND, String::new()); + } + }; let job_id = ctx.new_job(remote_id, &sha).unwrap(); - let notifiers = ctx.notifiers_by_name(&repo).expect("can get notifiers"); + let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers"); for notifier in notifiers { notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify"); @@ -134,11 +148,78 @@ async fn handle_github_event(ctx: Arc, owner: String, repo: String, event async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State>) -> impl IntoResponse { eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2); + let remote_path = format!("{}/{}", path.0, path.1); + let sha = path.2; + + let commit_id: Option = ctx.conn.lock().unwrap() + .query_row("select id from commits where sha=?1;", [&sha], |row| row.get(0)) + .optional() + .expect("can query"); + + let commit_id: u64 = match commit_id { + Some(commit_id) => { + commit_id + }, + None => { + return (StatusCode::NOT_FOUND, Html("no such commit".to_string())); + } + }; + + let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() + .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) + .expect("can query"); + + let (job_id, state): (u64, u8) = ctx.conn.lock().unwrap() + .query_row("select id, state from jobs where commit_id=?1;", [commit_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) + .expect("can query"); + + let state: sql::JobState = unsafe { std::mem::transmute(state) }; + + let repo_name: String = ctx.conn.lock().unwrap() + .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) + .expect("can query"); + + let deployed = false; + let time = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("now is before epoch"); - format!("requested: {:?}", path) + let resp = format!("\ + \n\ + \n\ + ci.butactuallyin.space - {}\n\ + \n\ + \n\ +
\n\
+            repo: {}\n\
+            commit: {}\n  \
+            status: {}\n  \
+            deployed: {}\n\
+            
\n\ + \n\ + \n", + repo_name, + repo_name, + &remote_path, &sha, &sha, + match state { + JobState::Pending | JobState::Started => { + "pending" + }, + JobState::Complete => { + "pass" + }, + JobState::Error => { + "pass" + } + JobState::Invalid => { + "(server error)" + } + }, + deployed, + ); + + (StatusCode::OK, Html(resp)) } async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State>, body: Bytes) -> impl IntoResponse { @@ -161,17 +242,24 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa } }; - let mut mac = Hmac::::new_from_slice(GITHUB_PSK) - .expect("hmac can be constructed"); - mac.update(&body); - let result = mac.finalize().into_bytes().to_vec(); - - // hack: skip sha256= - let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex"); - if decoded != result { - eprintln!("bad hmac:\n\ - got: {:?}\n\ - expected: {:?}", decoded, result); + let mut hmac_ok = false; + + for psk in PSKS.iter() { + let mut mac = Hmac::::new_from_slice(psk) + .expect("hmac can be constructed"); + mac.update(&body); + let result = mac.finalize().into_bytes().to_vec(); + + // hack: skip sha256= + let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex"); + if decoded == result { + hmac_ok = true; + break; + } + } + + if !hmac_ok { + eprintln!("bad hmac by all psks"); return (StatusCode::BAD_REQUEST, "").into_response(); } @@ -186,213 +274,8 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa handle_github_event(ctx, path.0, path.1, kind, payload).await } -struct DbCtx { - conn: Mutex, -} - -struct RemoteNotifier { - remote_path: String, - notifier: NotifierConfig, -} - -#[derive(Serialize, Deserialize)] -#[serde(untagged)] -enum NotifierConfig { - GitHub { - token: String, - }, - Email { - username: String, - password: String, - mailserver: String, - from: String, - to: String, - } -} - -impl NotifierConfig { - fn github_from_file(path: &str) -> Result { - let bytes = std::fs::read(path) - .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; - let config = serde_json::from_slice(&bytes) - .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; - - if matches!(config, NotifierConfig::GitHub { .. }) { - Ok(config) - } else { - Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path)) - } - } - - fn email_from_file(path: &str) -> Result { - let bytes = std::fs::read(path) - .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; - let config = serde_json::from_slice(&bytes) - .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; - - if matches!(config, NotifierConfig::Email { .. }) { - Ok(config) - } else { - Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path)) - } - } -} - -impl RemoteNotifier { - async fn tell_pending_job(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { - match &self.notifier { - NotifierConfig::GitHub { token } => { - let status_info = serde_json::json!({ - "state": "pending", - "target_url": format!( - "https://{}/{}/{}", - "ci.butactuallyin.space", - &self.remote_path, - sha, - ), - "description": "build is queued", - "context": "actuallyinspace runner", - }); - - // TODO: should pool (probably in ctx?) to have an upper bound in concurrent - // connections. - let client = reqwest::Client::new(); - let res = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) - .body(serde_json::to_string(&status_info).expect("can stringify json")) - .header("authorization", format!("Bearer {}", token)) - .header("accept", "application/vnd.github+json") - .send() - .await; - - match res { - Ok(res) => { - if res.status() == StatusCode::OK { - Ok(()) - } else { - Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) - } - } - Err(e) => { - Err(format!("failure sending request: {:?}", e)) - } - } - } - NotifierConfig::Email { username, password, mailserver, from, to } => { - panic!("should send an email saying that a job is now pending for `sha`") - } - } - } -} - -impl DbCtx { - fn new(db_path: &'static str) -> Self { - DbCtx { - conn: Mutex::new(Connection::open(db_path).unwrap()) - } - } - - fn new_commit(&self, sha: &str) -> Result { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into commits (sha) values (?1)", - [sha.clone()] - ) - .expect("can insert"); - - Ok(conn.last_insert_rowid() as u64) - } - - fn new_job(&self, remote_id: u64, sha: &str) -> Result { - // TODO: potential race: if two remotes learn about a commit at the same time and we decide - // to create two jobs at the same time, this might return an incorrect id if the insert - // didn't actually insert a new row. - let commit_id = self.new_commit(sha).expect("can create commit record"); - - let created_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis() as u64; - - let conn = self.conn.lock().unwrap(); - - let rows_modified = conn.execute( - "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);", - (sql::JobState::Pending as u64, remote_id, commit_id, created_time) - ).unwrap(); - - assert_eq!(1, rows_modified); - - Ok(conn.last_insert_rowid() as u64) - } - - fn notifiers_by_name(&self, repo: &str) -> Result, String> { - let maybe_repo_id: Option = self.conn.lock() - .unwrap() - .query_row("select * from repos where repo_name=?1", [repo], |row| row.get(0)) - .optional() - .expect("query succeeds"); - match maybe_repo_id { - Some(repo_id) => { - // get remotes - - #[derive(Debug)] - #[allow(dead_code)] - struct Remote { - id: u64, - repo_id: u64, - remote_path: String, - remote_api: String, - notifier_config_path: String, - } - - let mut remotes: Vec = Vec::new(); - - let conn = self.conn.lock().unwrap(); - let mut remotes_query = conn.prepare(sql::REMOTES_FOR_REPO).unwrap(); - let mut remote_results = remotes_query.query([repo_id]).unwrap(); - - while let Some(row) = remote_results.next().unwrap() { - let (id, repo_id, remote_path, remote_api, notifier_config_path) = row.try_into().unwrap(); - remotes.push(Remote { id, repo_id, remote_path, remote_api, notifier_config_path }); - } - - let mut notifiers: Vec = Vec::new(); - - for remote in remotes.into_iter() { - match remote.remote_api.as_str() { - "github" => { - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::github_from_file(&remote.notifier_config_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - }, - "email" => { - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::email_from_file(&remote.notifier_config_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - } - other => { - eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) - } - } - } - - Ok(notifiers) - } - None => { - return Err(format!("repo '{}' is not known", repo)); - } - } - } -} -async fn make_app_server(db_path: &'static str) -> Router { +async fn make_app_server(cfg_path: &'static str, db_path: &'static str) -> Router { /* // GET /hello/warp => 200 OK with body "Hello, warp!" @@ -457,7 +340,7 @@ async fn make_app_server(db_path: &'static str) -> Router { .route("/:owner/:repo/:sha", get(handle_commit_status)) .route("/:owner/:repo", post(handle_repo_event)) .fallback(fallback_get) - .with_state(Arc::new(DbCtx::new(db_path))) + .with_state(Arc::new(DbCtx::new(cfg_path, db_path))) } #[tokio::main] @@ -468,9 +351,9 @@ async fn main() { PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"), ).await.unwrap(); spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone()) - .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service())); + .serve(make_app_server("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db").await.into_make_service())); axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config) - .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service()) + .serve(make_app_server("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db").await.into_make_service()) .await .unwrap(); } diff --git a/src/notifier.rs b/src/notifier.rs new file mode 100644 index 0000000..4ddc91b --- /dev/null +++ b/src/notifier.rs @@ -0,0 +1,126 @@ +use serde_derive::{Deserialize, Serialize}; +use std::sync::Arc; +use axum::http::StatusCode; + +use crate::DbCtx; + +pub struct RemoteNotifier { + pub remote_path: String, + pub notifier: NotifierConfig, +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +pub enum NotifierConfig { + GitHub { + token: String, + }, + Email { + username: String, + password: String, + mailserver: String, + from: String, + to: String, + } +} + +impl NotifierConfig { + pub fn github_from_file(path: &str) -> Result { + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; + + if matches!(config, NotifierConfig::GitHub { .. }) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path)) + } + } + + pub fn email_from_file(path: &str) -> Result { + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; + + if matches!(config, NotifierConfig::Email { .. }) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path)) + } + } +} + +impl RemoteNotifier { + pub async fn tell_pending_job(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + } + + pub async fn tell_complete_job(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64, desc: Result) -> Result<(), String> { + match desc { + Ok(status) => { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + }, + Err(status) => { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + } + } + } + + pub async fn tell_job_status(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { + match &self.notifier { + NotifierConfig::GitHub { token } => { + let status_info = serde_json::json!({ + "state": state, + "description": desc, + "target_url": target_url, + "context": "actuallyinspace runner", + }); + + // TODO: should pool (probably in ctx?) to have an upper bound in concurrent + // connections. + let client = reqwest::Client::new(); + let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) + .body(serde_json::to_string(&status_info).expect("can stringify json")) + .header("content-type", "application/json") + .header("user-agent", "iximeow") + .header("authorization", format!("Bearer {}", token)) + .header("accept", "application/vnd.github+json"); + eprintln!("sending {:?}", req); + eprintln!(" body: {}", serde_json::to_string(&status_info).expect("can stringify json")); + let res = req + .send() + .await; + + match res { + Ok(res) => { + if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{ + Ok(()) + } else { + Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) + } + } + Err(e) => { + Err(format!("failure sending request: {:?}", e)) + } + } + } + NotifierConfig::Email { username, password, mailserver, from, to } => { + panic!("should send an email saying that a job is now pending for `sha`") + } + } + } +} diff --git a/src/sql.rs b/src/sql.rs index ee334c1..80eb18a 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -1,10 +1,29 @@ #![allow(dead_code)] +use std::convert::TryFrom; + +#[derive(Debug, Clone)] pub enum JobState { Pending = 0, Started = 1, Complete = 2, Error = 3, + Invalid = 4, +} + +impl TryFrom for JobState { + type Error = String; + + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(JobState::Pending), + 1 => Ok(JobState::Started), + 2 => Ok(JobState::Complete), + 3 => Ok(JobState::Error), + 4 => Ok(JobState::Invalid), + other => Err(format!("invalid job state: {}", other)), + } + } } // remote_id is the remote from which we were notified. this is necessary so we know which remote @@ -14,6 +33,7 @@ pub const CREATE_JOBS_TABLE: &'static str = "\ artifacts_path TEXT, state INTEGER NOT NULL, run_host TEXT, + build_token TEXT, remote_id INTEGER, commit_id INTEGER, created_time INTEGER, @@ -45,6 +65,9 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\ pub const CREATE_REMOTES_INDEX: &'static str = "\ CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; +pub const CREATE_REPO_NAME_INDEX: &'static str = "\ + CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; + pub const PENDING_JOBS: &'static str = "\ select * from jobs where state=0;"; -- cgit v1.1