From 9e6906c00c49186189d211dc96e132d85e7ff641 Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 13 Jul 2023 00:51:51 -0700 Subject: reorganize the whole thing into crates/ packages --- Cargo.lock | 98 ++++ Cargo.toml | 31 +- ci-ctl/Cargo.toml | 15 + ci-ctl/src/main.rs | 229 +++++++++ ci-driver/Cargo.toml | 30 ++ ci-driver/src/main.rs | 626 +++++++++++++++++++++++ ci-lib-core/Cargo.toml | 13 + ci-lib-core/src/dbctx.rs | 647 ++++++++++++++++++++++++ ci-lib-core/src/lib.rs | 13 + ci-lib-core/src/protocol.rs | 114 +++++ ci-lib-core/src/sql.rs | 319 ++++++++++++ ci-lib-native/Cargo.toml | 20 + ci-lib-native/src/dbctx_ext.rs | 62 +++ ci-lib-native/src/io.rs | 174 +++++++ ci-lib-native/src/lib.rs | 3 + ci-lib-native/src/notifier.rs | 181 +++++++ ci-runner/Cargo.toml | 25 + ci-runner/src/lua/mod.rs | 411 +++++++++++++++ ci-runner/src/main.rs | 668 ++++++++++++++++++++++++ ci-web-server/Cargo.toml | 31 ++ ci-web-server/src/main.rs | 1089 +++++++++++++++++++++++++++++++++++++++ src/ci_ctl.rs | 235 --------- src/ci_driver.rs | 630 ----------------------- src/ci_runner.rs | 668 ------------------------ src/dbctx.rs | 772 ---------------------------- src/io.rs | 181 ------- src/lua/mod.rs | 411 --------------- src/main.rs | 1092 ---------------------------------------- src/notifier.rs | 181 ------- src/protocol.rs | 114 ----- src/sql.rs | 229 --------- 31 files changed, 4783 insertions(+), 4529 deletions(-) create mode 100644 ci-ctl/Cargo.toml create mode 100644 ci-ctl/src/main.rs create mode 100644 ci-driver/Cargo.toml create mode 100644 ci-driver/src/main.rs create mode 100644 ci-lib-core/Cargo.toml create mode 100644 ci-lib-core/src/dbctx.rs create mode 100644 ci-lib-core/src/lib.rs create mode 100644 ci-lib-core/src/protocol.rs create mode 100644 ci-lib-core/src/sql.rs create mode 100644 ci-lib-native/Cargo.toml create mode 100644 ci-lib-native/src/dbctx_ext.rs create mode 100644 ci-lib-native/src/io.rs create mode 100644 ci-lib-native/src/lib.rs create mode 100644 ci-lib-native/src/notifier.rs create mode 100644 ci-runner/Cargo.toml create mode 100644 ci-runner/src/lua/mod.rs create mode 100644 ci-runner/src/main.rs create mode 100644 ci-web-server/Cargo.toml create mode 100644 ci-web-server/src/main.rs delete mode 100644 src/ci_ctl.rs delete mode 100644 src/ci_driver.rs delete mode 100644 src/ci_runner.rs delete mode 100644 src/dbctx.rs delete mode 100644 src/io.rs delete mode 100644 src/lua/mod.rs delete mode 100644 src/main.rs delete mode 100644 src/notifier.rs delete mode 100644 src/protocol.rs delete mode 100644 src/sql.rs diff --git a/Cargo.lock b/Cargo.lock index fdb63dc..729b789 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -312,6 +312,103 @@ dependencies = [ ] [[package]] +name = "ci-ctl" +version = "0.0.1" +dependencies = [ + "ci-lib-core", + "ci-lib-native", + "clap 4.0.29", +] + +[[package]] +name = "ci-driver" +version = "0.0.1" +dependencies = [ + "axum", + "axum-extra", + "axum-macros", + "axum-server", + "base64", + "ci-lib-core", + "ci-lib-native", + "futures-util", + "hyper", + "lazy_static", + "lettre", + "reqwest", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tracing-subscriber 0.3.16", +] + +[[package]] +name = "ci-lib-core" +version = "0.0.1" +dependencies = [ + "rusqlite", + "serde", +] + +[[package]] +name = "ci-lib-native" +version = "0.0.1" +dependencies = [ + "axum", + "ci-lib-core", + "futures-util", + "hyper", + "lettre", + "reqwest", + "serde", + "serde_json", + "tokio", +] + +[[package]] +name = "ci-runner" +version = "0.0.1" +dependencies = [ + 
"ci-lib-core", + "ci-lib-native", + "hyper", + "libc", + "reqwest", + "rlua", + "serde", + "serde_derive", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber 0.3.16", +] + +[[package]] +name = "ci-web-server" +version = "0.0.1" +dependencies = [ + "axum", + "axum-extra", + "axum-server", + "chrono", + "ci-lib-core", + "ci-lib-native", + "hex", + "hmac", + "http", + "http-body", + "lazy_static", + "rusqlite", + "serde", + "serde_json", + "sha2", + "tokio", + "tokio-stream", + "tracing-subscriber 0.3.16", +] + +[[package]] name = "clap" version = "3.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1294,6 +1391,7 @@ version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29f835d03d717946d28b1d1ed632eb6f0e24a299388ee623d0c23118d3e8a7fa" dependencies = [ + "cc", "pkg-config", "vcpkg", ] diff --git a/Cargo.toml b/Cargo.toml index 7d1a602..b81e4b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,21 @@ authors = [ "iximeow " ] license = "0BSD" edition = "2021" +[lib] + +[workspace] +members = [ + "ci-lib-core", + "ci-lib-native", + "ci-runner", + "ci-driver", + "ci-web-server", + "ci-ctl", +] +exclude = [ + "ci-wasm-frontend" +] + [dependencies] lazy_static = "*" axum = { version = "*" } @@ -37,19 +52,3 @@ reqwest = { version = "*", features = ["rustls-tls-native-roots"] } clap = { version = "*", features = ["derive"] } rlua = "*" chrono = "*" - -[[bin]] -name = "ci_webserver" -path = "src/main.rs" - -[[bin]] -name = "ci_driver" -path = "src/ci_driver.rs" - -[[bin]] -name = "ci_ctl" -path = "src/ci_ctl.rs" - -[[bin]] -name = "ci_runner" -path = "src/ci_runner.rs" diff --git a/ci-ctl/Cargo.toml b/ci-ctl/Cargo.toml new file mode 100644 index 0000000..e2662ad --- /dev/null +++ b/ci-ctl/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "ci-ctl" +version = "0.0.1" +authors = [ "iximeow " ] +license = "0BSD" +edition = "2021" + +[[bin]] +name = "ci_ctl" +path = "src/main.rs" + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +ci-lib-native = { path = "../ci-lib-native" } +clap = { version = "4", features = ["derive"] } diff --git a/ci-ctl/src/main.rs b/ci-ctl/src/main.rs new file mode 100644 index 0000000..bd2f733 --- /dev/null +++ b/ci-ctl/src/main.rs @@ -0,0 +1,229 @@ +use clap::{Parser, Subcommand}; + +use ci_lib_core::dbctx::DbCtx; +use ci_lib_native::notifier::NotifierConfig; + +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct Args { + /// path to a database to manage (defaults to "./state.db") + db_path: Option, + + /// path to where configs should be found (defaults to "./config") + config_path: Option, + + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + /// add _something_ to the db + Add { + #[command(subcommand)] + what: AddItem, + }, + + /// make sure that the state looks reasonable. + /// + /// currently, ensure that all notifiers have a config, that config references an existing + /// file, and that the referenced file is valid. 
+    Validate,
+
+    /// do something with jobs
+    Job {
+        #[command(subcommand)]
+        what: JobAction,
+    },
+}
+
+#[derive(Subcommand)]
+enum JobAction {
+    List,
+    Rerun {
+        which: u32
+    },
+    RerunCommit {
+        commit: String
+    },
+    Create {
+        repo: String,
+        commit: String,
+        pusher_email: String,
+    }
+}
+
+#[derive(Subcommand)]
+enum AddItem {
+    Repo {
+        name: String,
+        remote: Option<String>,
+        remote_kind: Option<String>,
+        config: Option<String>,
+    },
+    Remote {
+        repo_name: String,
+        remote: String,
+        remote_kind: String,
+        config: String,
+    },
+}
+
+fn main() {
+    let args = Args::parse();
+
+    let db_path = args.db_path.unwrap_or_else(|| "./state.db".to_owned());
+    let config_path = args.config_path.unwrap_or_else(|| "./config".to_owned());
+
+    match args.command {
+        Command::Job { what } => {
+            match what {
+                JobAction::List => {
+                    let db = DbCtx::new(&config_path, &db_path);
+                    let mut conn = db.conn.lock().unwrap();
+                    let mut query = conn.prepare(ci_lib_core::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap();
+                    let mut jobs = query.query([]).unwrap();
+                    while let Some(row) = jobs.next().unwrap() {
+                        let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option<String>) = row.try_into().unwrap();
+
+                        eprint!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id);
+                        if let Some(run_preferences) = run_preferences {
+                            eprintln!(" | run preference: {}", run_preferences);
+                        } else {
+                            eprintln!("");
+                        }
+                    }
+                    eprintln!("jobs");
+                },
+                JobAction::Rerun { which } => {
+                    let db = DbCtx::new(&config_path, &db_path);
+                    let task_id = db.new_run(which as u64, None).expect("db can be queried").id;
+                    eprintln!("[+] rerunning job {} as task {}", which, task_id);
+                }
+                JobAction::RerunCommit { commit } => {
+                    let db = DbCtx::new(&config_path, &db_path);
+                    let job_id = db.job_for_commit(&commit).unwrap();
+                    if let Some(job_id) = job_id {
+                        let task_id = db.new_run(job_id, None).expect("db can be queried").id;
+                        eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id);
+                    } else {
+                        eprintln!("[-] no job for commit {}", commit);
+                    }
+                }
+                JobAction::Create { repo, commit, pusher_email } => {
+                    let db = DbCtx::new(&config_path, &db_path);
+                    let parts = repo.split(":").collect::<Vec<&str>>();
+                    let (remote_kind, repo_path) = (parts[0], parts[1]);
+                    let remote = match db.remote_by_path_and_api(&remote_kind, &repo_path).expect("can query") {
+                        Some(remote) => remote,
+                        None => {
+                            eprintln!("[-] no remote registered as {}:{}", remote_kind, repo_path);
+                            return;
+                        }
+                    };
+
+                    let repo_default_run_pref: Option<String> = db.conn.lock().unwrap()
+                        .query_row("select default_run_preference from repos where id=?1;", [remote.repo_id], |row| {
+                            Ok((row.get(0)).unwrap())
+                        })
+                        .expect("can query");
+
+                    let job_id = db.new_job(remote.id, &commit, Some(&pusher_email), repo_default_run_pref).expect("can create");
+                    let _ = db.new_run(job_id, None).unwrap();
+                }
+            }
+        },
+        Command::Add { what } => {
+            match what {
+                AddItem::Repo { name, remote, remote_kind, config } => {
+                    let remote_config = match (remote, remote_kind, config) {
+                        (Some(remote), Some(remote_kind), Some(config_path)) => {
+                            // do something
+                            if remote_kind != "github" {
+                                eprintln!("unknown remote kind: {}", remote_kind);
+                                return;
+                            }
+                            Some((remote, remote_kind, config_path))
+                        },
+                        (None, None, None) => {
+                            None
+                        },
+                        _ => {
+                            eprintln!("when specifying a remote, `remote`, `remote_kind`, and `config_path` must either all be provided together or not at all");
+                            return;
+                        }
+                    };
+
+                    let db = DbCtx::new(&config_path,
&db_path); + let repo_id = match db.new_repo(&name) { + Ok(repo_id) => repo_id, + Err(e) => { + if e.contains("UNIQUE constraint failed") { + eprintln!("[!] repo '{}' already exists", name); + return; + } else { + eprintln!("[!] failed to create repo entry: {}", e); + return; + } + } + }; + println!("[+] new repo created: '{}' id {}", &name, repo_id); + if let Some((remote, remote_kind, config_path)) = remote_config { + let full_config_file_path = format!("{}/{}", &db.config_path.display(), config_path); + let config = match remote_kind.as_ref() { + "github" => { + assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok()); + } + "github-email" => { + assert!(NotifierConfig::email_from_file(&full_config_file_path).is_ok()); + } + other => { + panic!("[-] notifiers for '{}' remotes are not supported", other); + } + }; + db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config_path.as_str()).unwrap(); + println!("[+] new remote created: repo '{}', {} remote at {}", &name, remote_kind, remote); + match remote_kind.as_str() { + "github" => { + println!("[!] now go make sure your github repo has a webhook set for `https://ci.butactuallyin.space/{}` to receive at least the `push` event.", remote.as_str()); + println!(" the secret sent with calls to this webhook should be the same preshared secret as the CI server is configured to know."); + } + _ => { } + } + } + }, + AddItem::Remote { repo_name, remote, remote_kind, config } => { + let db = DbCtx::new(&config_path, &db_path); + let repo_id = match db.repo_id_by_name(&repo_name) { + Ok(Some(id)) => id, + Ok(None) => { + eprintln!("[-] repo '{}' does not exist", repo_name); + return; + }, + Err(e) => { + eprintln!("[!] couldn't look up repo '{}': {:?}", repo_name, e); + return; + } + }; + let config_file = format!("{}/{}", config_path, config); + match remote_kind.as_ref() { + "github" => { + NotifierConfig::github_from_file(&config_file).unwrap(); + } + "github-email" => { + NotifierConfig::email_from_file(&config_file).unwrap(); + } + other => { + panic!("notifiers for '{}' remotes are not supported", other); + } + }; + db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config.as_str()).unwrap(); + println!("[+] new remote created: repo '{}', {} remote at {}", &repo_name, remote_kind, remote); + }, + } + }, + Command::Validate => { + println!("ok"); + } + } +} diff --git a/ci-driver/Cargo.toml b/ci-driver/Cargo.toml new file mode 100644 index 0000000..697f929 --- /dev/null +++ b/ci-driver/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "ci-driver" +version = "0.0.1" +authors = [ "iximeow " ] +license = "0BSD" +edition = "2021" + +[[bin]] +name = "ci_driver" +path = "src/main.rs" + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +ci-lib-native = { path = "../ci-lib-native" } + +axum = { version = "*" } +axum-extra = { version = "*", features = ["async-read-body"] } +axum-server = { version = "*", features = ["tls-rustls"] } +axum-macros = "*" +serde_json = "*" +serde = { version = "*", features = ["derive"] } +base64 = "*" +tokio = { version = "*", features = ["full"] } +tokio-stream = "*" +tracing-subscriber = "*" +hyper = "*" +futures-util = "*" +lazy_static = "*" +lettre = "*" +reqwest = "*" diff --git a/ci-driver/src/main.rs b/ci-driver/src/main.rs new file mode 100644 index 0000000..f8ff34c --- /dev/null +++ b/ci-driver/src/main.rs @@ -0,0 +1,626 @@ +use std::process::Command; +use std::collections::HashMap; +use std::sync::{Mutex, RwLock}; +use lazy_static::lazy_static; +use std::io::Read; 
+use futures_util::StreamExt;
+use std::fmt;
+use std::path::{Path, PathBuf};
+use tokio::spawn;
+use tokio_stream::wrappers::ReceiverStream;
+use std::sync::{Arc, Weak};
+use std::time::{SystemTime, UNIX_EPOCH};
+use axum_server::tls_rustls::RustlsConfig;
+use axum::body::StreamBody;
+use axum::http::{StatusCode};
+use hyper::HeaderMap;
+use axum::Router;
+use axum::routing::*;
+use axum::extract::State;
+use axum::extract::BodyStream;
+use axum::response::IntoResponse;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
+use serde_json::json;
+use serde::{Deserialize, Serialize};
+
+use ci_lib_core::dbctx::DbCtx;
+use ci_lib_core::sql;
+use ci_lib_core::sql::{PendingRun, Job, Run};
+use ci_lib_core::sql::JobResult;
+use ci_lib_core::sql::RunState;
+use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
+
+lazy_static! {
+    static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
+    static ref ACTIVE_TASKS: Mutex<HashMap<u64, Weak<()>>> = Mutex::new(HashMap::new());
+}
+
+fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> {
+    let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into();
+    path.push(run.to_string());
+    match std::fs::create_dir(&path) {
+        Ok(()) => {
+            Ok(path)
+        },
+        Err(e) => {
+            if e.kind() == std::io::ErrorKind::AlreadyExists {
+                Ok(path)
+            } else {
+                Err(e)
+            }
+        }
+    }
+}
+
+async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> {
+    eprintln!("activating task {:?}", run);
+
+    let now = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("now is before epoch")
+        .as_millis();
+
+    let remote = dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("job has remote");
+    let repo = dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("remote has repo");
+
+    let commit_sha = dbctx.commit_sha(job.commit_id).expect("query succeeds");
+
+    let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts");
+
+    eprintln!("running {}", &repo.name);
+
+    let res = candidate.submit(&dbctx, &run, &remote.remote_git_url, &commit_sha).await;
+
+    let mut client_job = match res {
+        Ok(Some(mut client_job)) => { client_job }
+        Ok(None) => {
+            return Err("client hung up instead of acking task".to_string());
+        }
+        Err(e) => {
+            // failed to submit job, move on for now
+            return Err(format!("failed to submit task: {:?}", e));
+        }
+    };
+
+    let host_id = client_job.client.host_id;
+
+    let connection = dbctx.conn.lock().unwrap();
+    connection.execute(
+        "update runs set started_time=?1, host_id=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5",
+        (now as u64, host_id, format!("{}", artifacts.display()), &client_job.client.build_token, run.id)
+    )
+        .expect("can update");
+    std::mem::drop(connection);
+
+    spawn(async move {
+        client_job.run().await
+    });
+
+    Ok(())
+}
+
+struct RunnerClient {
+    tx: mpsc::Sender<Result<String, String>>,
+    rx: BodyStream,
+    host_id: u32,
+    build_token: String,
+    accepted_sources: Option<Vec<String>>,
+}
+
+fn token_for_job() -> String {
+    let mut data = [0u8; 32];
+    std::fs::File::open("/dev/urandom")
+        .unwrap()
+        .read_exact(&mut data)
+        .unwrap();
+
+    base64::encode(data)
+}
+
+struct ClientJob {
+    dbctx: Arc<DbCtx>,
+    remote_git_url: String,
+    sha: String,
+    task: PendingRun,
+    client: RunnerClient,
+    // exists only as confirmation this `ClientJob` is somewhere, still alive and being processed.
+    task_witness: Arc<()>,
+}
+
+impl ClientJob {
+    pub async fn run(&mut self) {
+        loop {
+            eprintln!("waiting on response..");
+            let msg = match self.client.recv_typed::<ClientProto>().await.expect("recv works") {
+                Some(msg) => msg,
+                None => {
+                    eprintln!("client hung up. task's done, i hope?");
+                    return;
+                }
+            };
+            eprintln!("got {:?}", msg);
+            match msg {
+                ClientProto::NewTaskPlease { allowed_pushers, host_info } => {
+                    eprintln!("misdirected task request (after handshake?)");
+                    return;
+                }
+                ClientProto::TaskStatus(task_info) => {
+                    let (result, state): (Result<String, String>, RunState) = match task_info {
+                        TaskInfo::Finished { status } => {
+                            eprintln!("task update: state is finished and result is {}", status);
+                            match status.as_str() {
+                                "pass" => {
+                                    (Ok("success".to_string()), RunState::Finished)
+                                },
+                                other => {
+                                    eprintln!("unhandled task completion status: {}", other);
+                                    (Err(other.to_string()), RunState::Error)
+                                }
+                            }
+                        },
+                        TaskInfo::Interrupted { status, description } => {
+                            eprintln!("task update: state is interrupted and result is {}", status);
+                            let desc = description.unwrap_or_else(|| status.clone());
+                            (Err(desc), RunState::Error)
+                        }
+                    };
+
+                    let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists");
+                    let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists");
+
+                    for notifier in ci_lib_native::dbctx_ext::notifiers_by_repo(&self.dbctx, repo_id).expect("can get notifiers") {
+                        if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await {
+                            eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e);
+                        }
+                    }
+
+                    let now = SystemTime::now()
+                        .duration_since(UNIX_EPOCH)
+                        .expect("now is before epoch")
+                        .as_millis();
+
+                    let build_result = if result.is_ok() {
+                        JobResult::Pass
+                    } else {
+                        JobResult::Fail
+                    };
+                    let result_desc = match result {
+                        Ok(msg) => msg,
+                        Err(msg) => msg,
+                    };
+
+                    self.dbctx.conn.lock().unwrap().execute(
+                        "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5",
+                        (now as u64, state as u64, build_result as u8, result_desc, self.task.id)
+                    )
+                        .expect("can update");
+                }
+                ClientProto::ArtifactCreate => {
+                    eprintln!("creating artifact");
+                    self.client.send(serde_json::json!({
+                        "status": "ok",
+                        "object_id": "10",
+                    })).await.unwrap();
+                },
+                ClientProto::Metric { name, value } => {
+                    self.dbctx.insert_metric(self.task.id, &name, &value)
+                        .expect("TODO handle metric insert error?");
+                }
+                ClientProto::Command(_command_info) => {
+                    // record information about commands, start/stop, etc. probably also allow
+                    // artifacts to be attached to commands and default to attaching stdout/stderr?
+                }
+                other => {
+                    eprintln!("unhandled message {:?}", other);
+                }
+            }
+        }
+    }
+}
+
+impl RunnerClient {
+    async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream, accepted_sources: Option<Vec<String>>, host_id: u32) -> Result<Self, String> {
+        let token = token_for_job();
+        let client = RunnerClient {
+            tx: sender,
+            rx: resp,
+            host_id,
+            build_token: token,
+            accepted_sources,
+        };
+        Ok(client)
+    }
+
+    async fn test_connection(&mut self) -> Result<(), String> {
+        self.send_typed(&ClientProto::Ping).await?;
+        let resp = self.recv_typed::<ClientProto>().await?;
+        match resp {
+            Some(ClientProto::Pong) => {
+                Ok(())
+            }
+            Some(other) => {
+                Err(format!("unexpected connection test response: {:?}", other))
+            }
+            None => {
+                Err("client hung up".to_string())
+            }
+        }
+    }
+
+    async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> {
+        self.send_typed(&msg).await
+    }
+
+    async fn send_typed<T: Serialize>(&mut self, msg: &T) -> Result<(), String> {
+        self.tx.send(Ok(serde_json::to_string(msg).unwrap()))
+            .await
+            .map_err(|e| e.to_string())
+    }
+
+    async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> {
+        match self.rx.next().await {
+            Some(Ok(bytes)) => {
+                serde_json::from_slice(&bytes)
+                    .map(Option::Some)
+                    .map_err(|e| e.to_string())
+            },
+            Some(Err(e)) => {
+                eprintln!("e: {:?}", e);
+                Err(format!("no client job: {:?}", e))
+            },
+            None => {
+                Ok(None)
+            }
+        }
+    }
+
+    async fn recv_typed<T: for<'de> Deserialize<'de>>(&mut self) -> Result<Option<T>, String> {
+        let json = self.recv().await?;
+        Ok(json.map(|v| serde_json::from_value(v).unwrap()))
+    }
+
+    // is this client willing to run the job based on what it has told us so far?
+    fn will_accept(&self, job: &Job) -> bool {
+        match (job.source.as_ref(), self.accepted_sources.as_ref()) {
+            (_, None) => true,
+            (None, Some(_)) => false,
+            (Some(source), Some(accepted_sources)) => {
+                accepted_sources.contains(source)
+            }
+        }
+    }
+
+    async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
+        self.send_typed(&ClientProto::new_task(RequestedJob {
+            commit: sha.to_string(),
+            remote_url: remote_git_url.to_string(),
+            build_token: self.build_token.to_string(),
+        })).await?;
+        match self.recv_typed::<ClientProto>().await {
+            Ok(Some(ClientProto::Started)) => {
+                let task_witness = Arc::new(());
+                ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness));
+                Ok(Some(ClientJob {
+                    task: job.clone(),
+                    dbctx: Arc::clone(dbctx),
+                    sha: sha.to_string(),
+                    remote_git_url: remote_git_url.to_string(),
+                    client: self,
+                    task_witness,
+                }))
+            }
+            Ok(Some(resp)) => {
+                eprintln!("invalid response: {:?}", resp);
+                Err("client rejected job".to_string())
+            }
+            Ok(None) => {
+                Ok(None)
+            }
+            Err(e) => {
+                eprintln!("e: {:?}", e);
+                Err(e)
+            }
+        }
+    }
+}
+
+impl fmt::Debug for RunnerClient {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        f.write_str("RunnerClient { .. }")
}") + } +} + +#[axum_macros::debug_handler] +async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { + eprintln!("artifact request"); + let run_token = match headers.get("x-task-token") { + Some(run_token) => run_token.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() { + Some(result) => result, + None => { + eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + if token_validity != sql::TokenValidity::Valid { + eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + + let artifact_path = if let Some(artifact_path) = artifact_path { + artifact_path + } else { + eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + }; + + let artifact_name = match headers.get("x-artifact-name") { + Some(artifact_name) => artifact_name.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let artifact_desc = match headers.get("x-artifact-desc") { + Some(artifact_desc) => artifact_desc.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let mut artifact = match ci_lib_native::dbctx_ext::reserve_artifact(&ctx.0, run, artifact_name, artifact_desc).await { + Ok(artifact) => artifact, + Err(err) => { + eprintln!("failure to reserve artifact: {:?}", err); + return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response(); + } + }; + + eprintln!("spawning task..."); + let dbctx_ref = Arc::clone(&ctx.0); + spawn(async move { + artifact.store_all(artifact_content).await.unwrap(); + dbctx_ref.finalize_artifact(artifact.artifact_id).await.unwrap(); + }); + eprintln!("done?"); + + (StatusCode::OK, "").into_response() +} + +#[derive(Serialize, Deserialize)] +struct WorkRequest { + kind: String, + accepted_pushers: Option> +} + +async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse { + let auth_token = match headers.get("authorization") { + Some(token) => { + if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) { + eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + } + None => { + eprintln!("bad artifact post: headers: {:?}\nno authorization", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let (tx_sender, tx_receiver) = mpsc::channel(8); + let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); + tx_sender.send(Ok("hello".to_string())).await.expect("works"); + + let request = job_resp.next().await.expect("request chunk").expect("chunk exists"); + let request = std::str::from_utf8(&request).unwrap(); + let request: ClientProto = match serde_json::from_str(&request) { + Ok(v) => v, + Err(e) => { + eprintln!("couldn't parse work request: {:?}", e); + 
return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); + } + }; + let (accepted_pushers, host_info) = match request { + ClientProto::NewTaskPlease { allowed_pushers, host_info } => (allowed_pushers, host_info), + other => { + eprintln!("bad request kind: {:?}", &other); + return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); + } + }; + + eprintln!("client identifies itself as {:?}", host_info); + + let host_info_id = ctx.0.id_for_host(&host_info).expect("can get a host info id"); + + let client = match RunnerClient::new(tx_sender, job_resp, accepted_pushers, host_info_id).await { + Ok(v) => v, + Err(e) => { + eprintln!("unable to register client"); + return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); + } + }; + + match ctx.1.try_send(client) { + Ok(()) => { + eprintln!("client requested work..."); + return (StatusCode::OK, resp_body).into_response(); + } + Err(TrySendError::Full(client)) => { + return (StatusCode::IM_A_TEAPOT, resp_body).into_response(); + } + Err(TrySendError::Closed(client)) => { + panic!("client holder is gone?"); + } + } +} + +async fn make_api_server(dbctx: Arc) -> (Router, mpsc::Receiver) { + let (pending_client_sender, pending_client_receiver) = mpsc::channel(8); + + let router = Router::new() + .route("/api/next_job", post(handle_next_job)) + .route("/api/artifact", post(handle_artifact)) + .with_state((dbctx, pending_client_sender)); + (router, pending_client_receiver) +} + +#[derive(Deserialize, Serialize)] +struct DriverConfig { + cert_path: PathBuf, + key_path: PathBuf, + config_path: PathBuf, + db_path: PathBuf, + server_addr: String, + auth_secret: String, +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + + let mut args = std::env::args(); + args.next().expect("first arg exists"); + let config_path = args.next().unwrap_or("./driver_config.json".to_string()); + let driver_config: DriverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for DriverConfig"); + let mut auth_secret = AUTH_SECRET.write().unwrap(); + *auth_secret = Some(driver_config.auth_secret.clone()); + std::mem::drop(auth_secret); + + let config = RustlsConfig::from_pem_file( + driver_config.cert_path.clone(), + driver_config.key_path.clone(), + ).await.unwrap(); + + let dbctx = Arc::new(DbCtx::new(&driver_config.config_path, &driver_config.db_path)); + + dbctx.create_tables().unwrap(); + + let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await; + spawn(axum_server::bind_rustls(driver_config.server_addr.parse().unwrap(), config) + .serve(api_server.into_make_service())); + + spawn(old_task_reaper(Arc::clone(&dbctx))); + + loop { + let mut candidate = match channel.recv().await + .ok_or_else(|| "client channel disconnected".to_string()) { + + Ok(candidate) => { candidate }, + Err(e) => { eprintln!("client error: {}", e); continue; } + }; + + let dbctx = Arc::clone(&dbctx); + spawn(async move { + let host_id = candidate.host_id; + let res = find_client_task(dbctx, candidate).await; + eprintln!("task client for {}: {:?}", host_id, res); + }); + } +} + +async fn find_client_task(dbctx: Arc, mut candidate: RunnerClient) -> Result<(), String> { + let find_client_task_start = std::time::Instant::now(); + + let (run, job) = 'find_work: loop { + // try to find a job for this candidate: + // * start with pending runs - these need *some* client to run them, but do not care which + // * if no new jobs, maybe an existing job still needs a 
rerun on this client? + // * otherwise, um, i dunno. do nothing? + + let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap(); + + if runs.len() > 0 { + println!("{} new runs", runs.len()); + + for run in runs.into_iter() { + let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); + + if candidate.will_accept(&job) { + break 'find_work (run, job); + } + + } + } + + let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query"); + + for job in alt_run_jobs.into_iter() { + if candidate.will_accept(&job) { + let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap(); + break 'find_work (run, job); + } + } + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + if candidate.test_connection().await.is_err() { + return Err("lost client connection".to_string()); + } + + if find_client_task_start.elapsed().as_secs() > 300 { + return Err("5min new task deadline elapsed".to_string()); + } + }; + + eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); + activate_run(Arc::clone(&dbctx), candidate, &job, &run).await?; + + Ok(()) +} + +async fn old_task_reaper(dbctx: Arc) { + let mut potentially_stale_tasks = dbctx.get_active_runs().unwrap(); + + let active_tasks = ACTIVE_TASKS.lock().unwrap(); + + for (id, witness) in active_tasks.iter() { + if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) { + potentially_stale_tasks.swap_remove(idx); + } + } + + std::mem::drop(active_tasks); + + // ok, so we have tasks that are not active, now if the task is started we know someone should + // be running it and they are not. retain only those tasks, as they are ones we may want to + // mark dead. + // + // further, filter out any tasks created in the last 60 seconds. this is a VERY generous grace + // period for clients that have accepted a job but for some reason we have not recorded them as + // active (perhaps they are slow to ack somehow). + + let stale_threshold = ci_lib_core::now_ms() - 60_000; + + let stale_tasks: Vec = potentially_stale_tasks.into_iter().filter(|task| { + match (task.state, task.start_time) { + // `run` is atomically set to `Started` and adorned with a `start_time`. disagreement + // between the two means this run is corrupt and should be reaped. + (RunState::Started, None) => { + true + }, + (RunState::Started, Some(start_time)) => { + start_time < stale_threshold + } + // and if it's not `started`, it's either pending (not assigned yet, so not stale), or + // one of the complete statuses. 
+            _ => {
+                false
+            }
+        }
+    }).collect();
+
+    for task in stale_tasks.iter() {
+        eprintln!("looks like task {} is stale, reaping", task.id);
+        dbctx.reap_task(task.id).expect("works");
+    }
+}
diff --git a/ci-lib-core/Cargo.toml b/ci-lib-core/Cargo.toml
new file mode 100644
index 0000000..5ec649a
--- /dev/null
+++ b/ci-lib-core/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "ci-lib-core"
+version = "0.0.1"
+authors = [ "iximeow " ]
+license = "0BSD"
+edition = "2021"
+description = "shared code across the ci project that is applicable for all targets"
+
+[lib]
+
+[dependencies]
+serde = { version = "*", features = ["derive"] }
+rusqlite = { version = "*", features = ["bundled"] }
diff --git a/ci-lib-core/src/dbctx.rs b/ci-lib-core/src/dbctx.rs
new file mode 100644
index 0000000..7493030
--- /dev/null
+++ b/ci-lib-core/src/dbctx.rs
@@ -0,0 +1,647 @@
+use std::sync::Mutex;
+// use futures_util::StreamExt;
+use rusqlite::{Connection, OptionalExtension};
+use std::time::{SystemTime, UNIX_EPOCH};
+// use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use std::path::Path;
+use std::path::PathBuf;
+use std::ops::Deref;
+
+use crate::sql;
+
+use crate::sql::ArtifactRecord;
+use crate::sql::Run;
+use crate::sql::TokenValidity;
+use crate::sql::MetricRecord;
+use crate::sql::PendingRun;
+use crate::sql::Job;
+use crate::sql::Remote;
+use crate::sql::Repo;
+
+const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30;
+
+pub struct DbCtx {
+    pub config_path: PathBuf,
+    // don't love this but.. for now...
+    pub conn: Mutex<Connection>,
+}
+
+impl DbCtx {
+    pub fn new<P: AsRef<Path>>(config_path: P, db_path: P) -> Self {
+        DbCtx {
+            config_path: config_path.as_ref().to_owned(),
+            conn: Mutex::new(Connection::open(db_path).unwrap())
+        }
+    }
+
+    fn conn<'a>(&'a self) -> impl Deref<Target = Connection> + 'a {
+        self.conn.lock().unwrap()
+    }
+
+    pub fn create_tables(&self) -> Result<(), ()> {
+        let conn = self.conn.lock().unwrap();
+        conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap();
+        conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap();
+        conn.execute(sql::CREATE_METRICS_TABLE, ()).unwrap();
+        conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap();
+        conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap();
+        conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap();
+        conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap();
+        conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap();
+        conn.execute(sql::CREATE_RUNS_TABLE, ()).unwrap();
+        conn.execute(sql::CREATE_HOSTS_TABLE, ()).unwrap();
+
+        Ok(())
+    }
+
+    pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> {
+        let conn = self.conn.lock().unwrap();
+        conn
+            .execute(
+                "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value",
+                (run_id, name, value)
+            )
+            .expect("can upsert");
+        Ok(())
+    }
+
+    pub fn new_commit(&self, sha: &str) -> Result<u64, String> {
+        let conn = self.conn.lock().unwrap();
+        conn
+            .execute(
+                "insert into commits (sha) values (?1)",
+                [sha.clone()]
+            )
+            .expect("can insert");
+
+        Ok(conn.last_insert_rowid() as u64)
+    }
+
+    pub fn new_repo(&self, name: &str) -> Result<u64, String> {
+        let conn = self.conn.lock().unwrap();
+        conn
+            .execute(
+                "insert into repos (repo_name) values (?1)",
+                [name.clone()]
+            )
+            .map_err(|e| {
+                format!("{:?}", e)
+            })?;
+
+        Ok(conn.last_insert_rowid() as u64)
+    }
+
+    pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> {
+        let conn = self.conn.lock().unwrap();
+        conn
+            .execute(
+                "update artifacts set completed_time=?1 where id=?2",
+                (crate::now_ms(), artifact_id)
+            )
+            .map(|_| ())
+            .map_err(|e| {
+                format!("{:?}", e)
+            })
+    }
+
+    pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {
+        let conn = self.conn.lock().unwrap();
+        conn
+            .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| {
+                let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap();
+
+                Ok(ArtifactRecord {
+                    id, run_id, name, desc, created_time, completed_time
+                })
+            })
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn commit_sha(&self, commit_id: u64) -> Result<String, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row(
+                "select sha from commits where id=?1",
+                [commit_id],
+                |row| { row.get(0) }
+            )
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn job_for_commit(&self, sha: &str) -> Result<Option<u64>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row(
+                "select id from commits where sha=?1",
+                [sha],
+                |row| { row.get(0) }
+            )
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn run_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row(
+                "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1",
+                [token],
+                |row| {
+                    let timeout: Option<u64> = row.get(3).unwrap();
+                    let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS);
+
+                    let now = crate::now_ms();
+
+                    let time: Option<u64> = row.get(2).unwrap();
+                    let validity = if let Some(time) = time {
+                        if now > time + timeout {
+                            TokenValidity::Expired
+                        } else {
+                            TokenValidity::Valid
+                        }
+                    } else {
+                        TokenValidity::Invalid
+                    };
+                    Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity))
+                }
+            )
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn job_by_id(&self, id: u64) -> Result<Option<Job>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row(crate::sql::JOB_BY_ID, [id], |row| {
+                let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
+
+                Ok(Job {
+                    id, source, created_time, remote_id, commit_id, run_preferences
+                })
+            })
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn remote_by_path_and_api(&self, api: &str, path: &str) -> Result<Option<Remote>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where remote_api=?1 and remote_path=?2", [api, path], |row| {
+                let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
+
+                Ok(Remote {
+                    id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path
+                })
+            })
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn remote_by_id(&self, id: u64) -> Result<Option<Remote>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where id=?1", [id], |row| {
+                let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
+
+                Ok(Remote {
+                    id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path
+                })
+            })
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0))
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn repo_id_by_name(&self, repo_name: &str) -> Result<Option<u64>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0))
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result<u64, String> {
+        let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind {
+            "github" => {
+                (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote))
+            },
+            "github-email" => {
+                (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote))
+            },
+            other => {
+                panic!("unsupported remote kind: {}", other);
+            }
+        };
+
+        let conn = self.conn.lock().unwrap();
+        conn
+            .execute(
+                "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);",
+                (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path)
+            )
+            .expect("can insert");
+
+        Ok(conn.last_insert_rowid() as u64)
+    }
+
+    pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option<String>) -> Result<u64, String> {
+        // TODO: potential race: if two remotes learn about a commit at the same time and we decide
+        // to create two jobs at the same time, this might return an incorrect id if the insert
+        // didn't actually insert a new row.
+        let commit_id = self.new_commit(sha).expect("can create commit record");
+
+        let created_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("now is before epoch")
+            .as_millis() as u64;
+
+        let conn = self.conn.lock().unwrap();
+
+        let rows_modified = conn.execute(
+            "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);",
+            (remote_id, commit_id, created_time, pusher, repo_default_run_pref)
+        ).unwrap();
+
+        assert_eq!(1, rows_modified);
+
+        let job_id = conn.last_insert_rowid() as u64;
+
+        Ok(job_id)
+    }
+
+    pub fn new_run(&self, job_id: u64, host_preference: Option<u32>) -> Result<PendingRun, String> {
+        let created_time = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("now is before epoch")
+            .as_millis() as u64;
+
+        let conn = self.conn.lock().unwrap();
+
+        let rows_modified = conn.execute(
+            "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);",
+            (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference)
+        ).unwrap();
+
+        assert_eq!(1, rows_modified);
+
+        let run_id = conn.last_insert_rowid() as u64;
+
+        Ok(PendingRun {
+            id: run_id,
+            job_id,
+            create_time: created_time,
+        })
+    }
+
+    pub fn reap_task(&self, task_id: u64) -> Result<(), String> {
+        let conn = self.conn.lock().unwrap();
+
+        conn.execute(
+            "update runs set final_status=\"lost signal\", state=4 where id=?1;",
+            [task_id]
+        ).unwrap();
+
+        Ok(())
+    }
+
+    pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> {
+        let conn = self.conn.lock().unwrap();
+
+        let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap();
+        let mut result = metrics_query.query([run]).unwrap();
+        let mut metrics = Vec::new();
+
+        while let Some(row) = result.next().unwrap() {
+            let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap();
+            metrics.push(MetricRecord { id, run_id, name, value });
+        }
+
+        Ok(metrics)
+    }
+
+    pub fn artifacts_for_run(&self, run: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> {
+        let conn = self.conn.lock().unwrap();
+
+        let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap();
+        let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap();
+        let mut artifacts = Vec::new();
+
+        while let Some(row) = result.next().unwrap() {
+            let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap();
+            artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time });
+        }
+
+        Ok(artifacts)
+    }
+
+    pub fn repo_by_id(&self, id: u64) -> Result<Option<Repo>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| {
+                let (id, repo_name, default_run_preference) = row.try_into().unwrap();
+                Ok(Repo {
+                    id,
+                    name: repo_name,
+                    default_run_preference,
+                })
+            })
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn get_repos(&self) -> Result<Vec<Repo>, String> {
+        let conn = self.conn.lock().unwrap();
+
+        let mut repos_query = conn.prepare(sql::ALL_REPOS).unwrap();
+        let mut repos = repos_query.query([]).unwrap();
+        let mut result = Vec::new();
+
+        while let Some(row) = repos.next().unwrap() {
+            let (id, repo_name, default_run_preference) = row.try_into().unwrap();
+            result.push(Repo {
+                id,
+                name: repo_name,
+                default_run_preference,
+            });
+        }
+
+        Ok(result)
+    }
+
+    pub fn last_job_from_remote(&self, id: u64) -> Result<Option<Job>, String> {
+        self.recent_jobs_from_remote(id, 1)
+            .map(|mut jobs| jobs.pop())
+    }
+
+    pub fn job_by_commit_id(&self, commit_id: u64) -> Result<Option<Job>, String> {
+        let conn = self.conn.lock().unwrap();
+
+        conn
+            .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| {
+                let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
+                Ok(Job {
+                    id,
+                    remote_id,
+                    commit_id,
+                    created_time,
+                    source,
+                    run_preferences,
+                })
+            })
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn recent_jobs_from_remote(&self, id: u64, limit: u64) -> Result<Vec<Job>, String> {
+        let conn = self.conn.lock().unwrap();
+
+        let mut job_query = conn.prepare(sql::LAST_JOBS_FROM_REMOTE).unwrap();
+        let mut result = job_query.query([id, limit]).unwrap();
+
+        let mut jobs = Vec::new();
+
+        while let Some(row) = result.next().unwrap() {
+            let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
+            jobs.push(Job {
+                id,
+                remote_id,
+                commit_id,
+                created_time,
+                source,
+                run_preferences,
+            });
+        }
+
+        Ok(jobs)
+    }
+
+    pub fn get_active_runs(&self) -> Result<Vec<Run>, String> {
+        let conn = self.conn.lock().unwrap();
+
+        let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap();
+        let mut runs = started_query.query([]).unwrap();
+        let mut started = Vec::new();
+
+        while let Some(row) = runs.next().unwrap() {
+            started.push(Self::row2run(row));
+        }
+
+        Ok(started)
+    }
+
+    pub fn get_pending_runs(&self, host_id: Option<u32>) -> Result<Vec<PendingRun>, String> {
+        let conn = self.conn.lock().unwrap();
+
+        let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap();
+        let mut runs = pending_query.query([host_id]).unwrap();
+        let mut pending = Vec::new();
+
+        while let Some(row) = runs.next().unwrap() {
+            let (id, job_id, create_time) = row.try_into().unwrap();
+            let run = PendingRun {
+                id,
+                job_id,
+                create_time,
+            };
+            pending.push(run);
+        }
+
+        Ok(pending)
+    }
+
+    pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result<Vec<Job>, String> {
+        // for jobs that this host has not run, we'll arbitrarily say that we won't generate new
+        // runs for jobs more than a day old.
+        //
+        // we don't want to rebuild the entire history every time we see a new host by default; if
+        // you really want to rebuild all of history on a new host, use `ci_ctl` to prepare the
+        // runs.
+        let cutoff = crate::now_ms() - 24 * 3600 * 1000;
+
+        let conn = self.conn.lock().unwrap();
+
+        let mut jobs_needing_task_runs = conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap();
+        let mut job_rows = jobs_needing_task_runs.query([cutoff, host_id]).unwrap();
+        let mut jobs = Vec::new();
+
+        while let Some(row) = job_rows.next().unwrap() {
+            let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
+
+            jobs.push(Job {
+                id, source, created_time, remote_id, commit_id, run_preferences,
+            });
+        }
+
+        Ok(jobs)
+    }
+
+
+    pub fn remotes_by_repo(&self, repo_id: u64) -> Result<Vec<Remote>, String> {
+        let mut remotes: Vec<Remote> = Vec::new();
+
+        let conn = self.conn.lock().unwrap();
+        let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap();
+        let mut remote_results = remotes_query.query([repo_id]).unwrap();
+
+        while let Some(row) = remote_results.next().unwrap() {
+            let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
+            remotes.push(Remote { id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path });
+        }
+
+        Ok(remotes)
+    }
+
+    /// try to find a host close to `host_info`, but maybe not an exact match.
+    ///
+    /// specifically, we'll ignore microcode and family/os - enough that measurements ought to be
+    /// comparable but maybe not perfectly so.
+    pub fn find_id_like_host(&self, host_info: &crate::protocol::HostInfo) -> Result<Option<u32>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row(
+                "select id from hosts where \
+                 hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \
+                 cpu_model=?5 and cpu_max_freq_khz=?6 and cpu_cores=?7 and mem_total=?8 and \
+                 arch=?9;",
+                (
+                    &host_info.hostname,
+                    &host_info.cpu_info.vendor_id,
+                    &host_info.cpu_info.model_name,
+                    &host_info.cpu_info.family,
+                    &host_info.cpu_info.model,
+                    &host_info.cpu_info.max_freq,
+                    &host_info.cpu_info.cores,
+                    &host_info.memory_info.total,
+                    &host_info.env_info.arch,
+                ),
+                |row| { row.get(0) }
+            )
+            .map_err(|e| e.to_string())
+    }
+
+    /// get an id for the host described by `host_info`. this may create a new record if no such
+    /// host exists.
+    pub fn id_for_host(&self, host_info: &crate::protocol::HostInfo) -> Result<u32, String> {
+        let conn = self.conn.lock().unwrap();
+
+        conn
+            .execute(
+                "insert or ignore into hosts \
+                 (\
+                    hostname, cpu_vendor_id, cpu_model_name, cpu_family, \
+                    cpu_model, cpu_microcode, cpu_max_freq_khz, cpu_cores, \
+                    mem_total, arch, family, os\
+                 ) values (\
+                    ?1, ?2, ?3, ?4, \
+                    ?5, ?6, ?7, ?8, \
+                    ?9, ?10, ?11, ?12 \
+                 );",
+                (
+                    &host_info.hostname,
+                    &host_info.cpu_info.vendor_id,
+                    &host_info.cpu_info.model_name,
+                    &host_info.cpu_info.family,
+                    &host_info.cpu_info.model,
+                    &host_info.cpu_info.microcode,
+                    &host_info.cpu_info.max_freq,
+                    &host_info.cpu_info.cores,
+                    &host_info.memory_info.total,
+                    &host_info.env_info.arch,
+                    &host_info.env_info.family,
+                    &host_info.env_info.os,
+                )
+            )
+            .expect("can insert");
+
+        conn
+            .query_row(
+                "select id from hosts where \
+                 hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \
+                 cpu_model=?5 and cpu_microcode=?6 and cpu_max_freq_khz=?7 and \
+                 cpu_cores=?8 and mem_total=?9 and arch=?10 and family=?11 and os=?12;",
+                (
+                    &host_info.hostname,
+                    &host_info.cpu_info.vendor_id,
+                    &host_info.cpu_info.model_name,
+                    &host_info.cpu_info.family,
+                    &host_info.cpu_info.model,
+                    &host_info.cpu_info.microcode,
+                    &host_info.cpu_info.max_freq,
+                    &host_info.cpu_info.cores,
+                    &host_info.memory_info.total,
+                    &host_info.env_info.arch,
+                    &host_info.env_info.family,
+                    &host_info.env_info.os,
+                ),
+                |row| { row.get(0) }
+            )
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn host_model_info(&self, host_id: u64) -> Result<(String, String, String, String, u64), String> {
+        let conn = self.conn.lock().unwrap();
+        conn
+            .query_row("select hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz from hosts where id=?1;", [host_id], |row| {
+                Ok((
+                    row.get(0).unwrap(),
+                    row.get(1).unwrap(),
+                    row.get(2).unwrap(),
+                    row.get(3).unwrap(),
+                    row.get(4).unwrap(),
+                ))
+            })
+            .map_err(|e| e.to_string())
+    }
+
+    pub fn runs_for_job_one_per_host(&self, job_id: u64) -> Result<Vec<Run>, String> {
+        let conn = self.conn.lock().unwrap();
+        let mut runs_query = conn.prepare(crate::sql::RUNS_FOR_JOB).unwrap();
+        let mut runs_results = runs_query.query([job_id]).unwrap();
+
+        let mut results = Vec::new();
+
+        while let Some(row) = runs_results.next().unwrap() {
+            results.push(Self::row2run(row));
+        }
+
+        Ok(results)
+    }
+
+    pub fn last_run_for_job(&self, job_id: u64) -> Result<Option<Run>, String> {
+        let conn = self.conn.lock().unwrap();
+
+        conn
+            .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| {
+                Ok(Self::row2run(row))
+            })
+            .optional()
+            .map_err(|e| e.to_string())
+    }
+
+    pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
+        let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
+        let state: u8 = state;
+        Run {
+            id,
+            job_id,
+            artifacts_path,
+            state: state.try_into().unwrap(),
+            host_id,
+            create_time,
+            start_time,
+            complete_time,
+            build_token,
+            run_timeout,
+            build_result,
+            final_text,
+        }
+    }
+}
diff --git a/ci-lib-core/src/lib.rs b/ci-lib-core/src/lib.rs
new file mode 100644
index 0000000..c20ce8e
--- /dev/null
+++ b/ci-lib-core/src/lib.rs
@@ -0,0 +1,13 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+
+
+pub mod protocol;
+pub mod sql;
+pub mod dbctx;
+
+pub fn now_ms() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("now is later than epoch")
+        .as_millis() as u64
+}
diff --git a/ci-lib-core/src/protocol.rs b/ci-lib-core/src/protocol.rs
new file mode 100644
index 0000000..c7a9318
--- /dev/null
+++ b/ci-lib-core/src/protocol.rs
@@ -0,0 +1,114 @@
+use serde::{Serialize, Deserialize};
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "kind")]
+#[serde(rename_all = "snake_case")]
+pub enum ClientProto {
+    Started,
+    ArtifactCreate,
+    NewTask(RequestedJob),
+    NewTaskPlease { allowed_pushers: Option<Vec<String>>, host_info: HostInfo },
+    Metric { name: String, value: String },
+    Command(CommandInfo),
+    TaskStatus(TaskInfo),
+    Ping,
+    Pong,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "command_info")]
+#[serde(rename_all = "snake_case")]
+pub enum CommandInfo {
+    Started { command: Vec<String>, cwd: Option<String>, id: u32 },
+    Finished { exit_code: Option<i32>, id: u32 },
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "task_info")]
+#[serde(rename_all = "snake_case")]
+pub enum TaskInfo {
+    Finished { status: String },
+    Interrupted { status: String, description: Option<String> },
+}
+
+impl ClientProto {
+    pub fn metric(name: impl Into<String>, value: impl Into<String>) -> Self {
+        ClientProto::Metric { name: name.into(), value: value.into() }
+    }
+
+    pub fn command(state: CommandInfo) -> Self {
+        ClientProto::Command(state)
+    }
+
+    pub fn new_task_please(allowed_pushers: Option<Vec<String>>, host_info: HostInfo) -> Self {
+        ClientProto::NewTaskPlease { allowed_pushers, host_info }
+    }
+
+    pub fn task_status(state: TaskInfo) -> Self {
+        ClientProto::TaskStatus(state)
+    }
+
+    pub fn new_task(task: RequestedJob) -> Self {
+        ClientProto::NewTask(task)
+    }
+}
+
+impl CommandInfo {
+    pub fn started(command: impl Into<Vec<String>>, cwd: Option<&str>, id: u32) -> Self {
+        CommandInfo::Started { command: command.into(), cwd: cwd.map(ToOwned::to_owned), id }
+    }
+
+    pub fn finished(exit_code: Option<i32>, id: u32) -> Self {
+        CommandInfo::Finished { exit_code, id }
+    }
+}
+
+impl TaskInfo {
+    pub fn finished(status: impl Into<String>) -> Self {
+        TaskInfo::Finished { status: status.into() }
+    }
+
+    pub fn interrupted(status: impl Into<String>, description: impl Into<Option<String>>) -> Self {
+        TaskInfo::Interrupted { status: status.into(), description: description.into() }
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct HostInfo {
+    pub hostname: String,
+    pub cpu_info: CpuInfo,
+    pub memory_info: MemoryInfo,
+    pub env_info: EnvInfo,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct CpuInfo {
+    pub model_name: String,
+    pub microcode: String,
+    pub cores: u32,
+    pub vendor_id: String,
+    pub family: String,
+    pub model: String,
+    // clock speed in khz
+    pub max_freq: u64,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct MemoryInfo {
+    pub total: String,
+    pub available: String,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct EnvInfo {
+    pub arch: String,
+    pub family: String,
+    pub os: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RequestedJob {
+    pub commit: String,
+    pub remote_url: String,
+    pub build_token: String,
+}
diff --git a/ci-lib-core/src/sql.rs b/ci-lib-core/src/sql.rs
new file mode 100644
index 0000000..2aeb52b
--- /dev/null
+++ b/ci-lib-core/src/sql.rs
@@ -0,0 +1,319 @@
+#![allow(dead_code)]
+
+use std::convert::TryFrom;
+
+#[derive(Debug, Clone)]
+pub struct PendingRun {
+    pub id: u64,
+    pub job_id: u64,
+    pub create_time: u64,
+}
+
+impl Run {
+    fn into_pending_run(self) -> PendingRun {
+        PendingRun {
+            id: self.id,
+            job_id: self.job_id,
+            create_time: self.create_time,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TokenValidity {
+    Expired,
+    Invalid,
+    Valid,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct MetricRecord {
+    pub id: u64,
+    pub run_id: u64,
+    pub name: String,
+    pub value: String
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ArtifactRecord {
+    pub id: u64,
+    pub run_id: u64,
+    pub name: String,
+    pub desc: String,
+    pub created_time: u64,
+    pub completed_time: Option<u64>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Repo {
+    pub id: u64,
+    pub name: String,
+    pub default_run_preference: Option<String>,
+}
+
+#[derive(Debug)]
+pub struct Remote {
+    pub id: u64,
+    pub repo_id: u64,
+    pub remote_path: String,
+    pub remote_api: String,
+    pub remote_url: String,
+    pub remote_git_url: String,
+    pub notifier_config_path: String,
+}
+
+// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1
+// relationship with commits, and potentially many *runs* of that job.
+#[derive(Debug, Clone)]
+pub struct Job {
+    pub id: u64,
+    pub remote_id: u64,
+    pub commit_id: u64,
+    pub created_time: u64,
+    pub source: Option<String>,
+    pub run_preferences: Option<String>,
+}
+
+// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report
+// results. a job may have many runs from many different hosts rebuilding history, or reruns of the
+// same job on the same hardware to collect more datapoints on the operation.
+#[derive(Debug, Clone)]
+pub struct Run {
+    pub id: u64,
+    pub job_id: u64,
+    pub artifacts_path: Option<String>,
+    pub state: RunState,
+    pub host_id: Option<u32>,
+    pub create_time: u64,
+    pub start_time: Option<u64>,
+    pub complete_time: Option<u64>,
+    pub build_token: Option<String>,
+    pub run_timeout: Option<u64>,
+    pub build_result: Option<u8>,
+    pub final_text: Option<String>,
+}
+
+#[derive(Debug, Clone)]
+pub enum JobResult {
+    Pass = 0,
+    Fail = 1,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub enum RunState {
+    Pending = 0,
+    Started = 1,
+    Finished = 2,
+    Error = 3,
+    Invalid = 4,
+}
+
+impl TryFrom<u8> for RunState {
+    type Error = String;
+
+    fn try_from(value: u8) -> Result<Self, String> {
+        match value {
+            0 => Ok(RunState::Pending),
+            1 => Ok(RunState::Started),
+            2 => Ok(RunState::Finished),
+            3 => Ok(RunState::Error),
+            4 => Ok(RunState::Invalid),
+            other => Err(format!("invalid job state: {}", other)),
+        }
+    }
+}
+
+/*
+pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
+    let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
+    let state: u8 = state;
+    Run {
+        id,
+        job_id,
+        artifacts_path,
+        state: state.try_into().unwrap(),
+        host_id,
+        create_time,
+        start_time,
+        complete_time,
+        build_token,
+        run_timeout,
+        build_result,
+        final_text,
+    }
+}
+*/
+
+// remote_id is the remote from which we were notified. this is necessary so we know which remote
+// to pull from to actually run the job.
+pub const CREATE_JOBS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT, + source TEXT, + created_time INTEGER, + remote_id INTEGER, + commit_id INTEGER, + run_preferences TEXT);"; + +pub const CREATE_METRICS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER, + name TEXT, + value TEXT, + UNIQUE(job_id, name) + );"; + +pub const CREATE_COMMITS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; + +pub const CREATE_REPOS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT, + repo_name TEXT, + default_run_preference TEXT);"; + +// remote_api is `github` or NULL for now. hopefully a future cgit-style notifier one day. +// remote_path is some unique identifier for the relevant remote. +// * for `github` remotes, this will be `owner/repo`. +// * for others.. who knows. +// remote_url is a url for human interaction with the remote (think https://git.iximeow.net/zvm) +// remote_git_url is a url that can be `git clone`'d to fetch sources +pub const CREATE_REMOTES_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS remotes (id INTEGER PRIMARY KEY AUTOINCREMENT, + repo_id INTEGER, + remote_path TEXT, + remote_api TEXT, + remote_url TEXT, + remote_git_url TEXT, + notifier_config_path TEXT);"; + +pub const CREATE_ARTIFACTS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id INTEGER, + name TEXT, + desc TEXT, + created_time INTEGER, + completed_time INTEGER);"; + +pub const CREATE_RUNS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER, + artifacts_path TEXT, + state INTEGER NOT NULL, + host_id INTEGER, + build_token TEXT, + created_time INTEGER, + started_time INTEGER, + complete_time INTEGER, + run_timeout INTEGER, + build_result INTEGER, + final_status TEXT);"; + +pub const CREATE_HOSTS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY AUTOINCREMENT, + hostname TEXT, + cpu_vendor_id TEXT, + cpu_model_name TEXT, + cpu_family TEXT, + cpu_model TEXT, + cpu_microcode TEXT, + cpu_max_freq_khz INTEGER, + cpu_cores INTEGER, + mem_total TEXT, + arch TEXT, + family TEXT, + os TEXT, + UNIQUE(hostname, cpu_vendor_id, cpu_model_name, cpu_family, cpu_model, cpu_microcode, cpu_cores, mem_total, arch, family, os));"; + +pub const CREATE_REMOTES_INDEX: &'static str = "\ + CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; + +pub const CREATE_REPO_NAME_INDEX: &'static str = "\ + CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; + +pub const PENDING_RUNS: &'static str = "\ + select id, job_id, created_time, host_preference from runs where state=0 and (host_preference=?1 or host_preference is null) order by created_time desc;"; + +pub const JOBS_NEEDING_HOST_RUN: &'static str = "\ + select jobs.id, jobs.source, jobs.created_time, jobs.remote_id, jobs.commit_id, jobs.run_preferences from jobs \ + where jobs.run_preferences=\"all\" and jobs.created_time > ?1 \ + and not exists \ + (select 1 from runs r2 where r2.job_id = jobs.id and r2.host_id = ?2);"; + +pub const ACTIVE_RUNS: &'static str = "\ + select id, + job_id, + artifacts_path, + state, + host_id, + build_token, + created_time, + started_time, + complete_time, + run_timeout, + build_result, + final_status from runs where state=1 or state=0;"; + 
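(dbctx.rs, which actually runs these statements, is elsewhere in this patch; a minimal sketch of schema setup, assuming plain rusqlite and that `init_schema` is our name for it, not the crate's:)

    use rusqlite::Connection;

    // every statement is CREATE ... IF NOT EXISTS, so re-running this at startup is harmless.
    fn init_schema(conn: &Connection) -> Result<(), rusqlite::Error> {
        for stmt in [
            CREATE_JOBS_TABLE, CREATE_METRICS_TABLE, CREATE_COMMITS_TABLE,
            CREATE_REPOS_TABLE, CREATE_REMOTES_TABLE, CREATE_ARTIFACTS_TABLE,
            CREATE_RUNS_TABLE, CREATE_HOSTS_TABLE,
            CREATE_REMOTES_INDEX, CREATE_REPO_NAME_INDEX,
        ] {
            conn.execute(stmt, ())?;
        }
        Ok(())
    }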
+pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\
+    select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;";
+
+pub const JOB_BY_COMMIT_ID: &'static str = "\
+    select id, source, created_time, remote_id, commit_id, run_preferences from jobs where commit_id=?1;";
+
+pub const ARTIFACT_BY_ID: &'static str = "\
+    select * from artifacts where id=?1 and run_id=?2;";
+
+pub const JOB_BY_ID: &'static str = "\
+    select id, source, created_time, remote_id, commit_id, run_preferences from jobs where id=?1";
+
+pub const METRICS_FOR_RUN: &'static str = "\
+    select * from metrics where run_id=?1 order by id asc;";
+
+pub const METRICS_FOR_JOB: &'static str = "\
+    select metrics.id, metrics.run_id, metrics.name, metrics.value from metrics \
+    join runs on runs.id=metrics.run_id \
+    where runs.job_id=?1 \
+    order by metrics.run_id desc, metrics.id desc;";
+
+pub const COMMIT_TO_ID: &'static str = "\
+    select id from commits where sha=?1;";
+
+pub const REMOTES_FOR_REPO: &'static str = "\
+    select * from remotes where repo_id=?1;";
+
+pub const ALL_REPOS: &'static str = "\
+    select id, repo_name, default_run_preference from repos;";
+
+pub const LAST_JOBS_FROM_REMOTE: &'static str = "\
+    select id, source, created_time, remote_id, commit_id, run_preferences from jobs where remote_id=?1 order by created_time desc limit ?2;";
+
+pub const LAST_RUN_FOR_JOB: &'static str = "\
+    select id,
+        job_id,
+        artifacts_path,
+        state,
+        host_id,
+        build_token,
+        created_time,
+        started_time,
+        complete_time,
+        run_timeout,
+        build_result,
+        final_status from runs where job_id=?1 order by started_time desc limit 1;";
+
+pub const RUNS_FOR_JOB: &'static str = "\
+    select id,
+        job_id,
+        artifacts_path,
+        state,
+        host_id,
+        build_token,
+        created_time,
+        started_time,
+        complete_time,
+        run_timeout,
+        build_result,
+        final_status from runs where job_id=?1 group by host_id order by started_time desc, state asc;";
+
+pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\
+    select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id, jobs.run_preferences
+    from jobs join runs on jobs.id=runs.job_id
+    order by runs.created_time asc;";
diff --git a/ci-lib-native/Cargo.toml b/ci-lib-native/Cargo.toml
new file mode 100644
index 0000000..7a4e665
--- /dev/null
+++ b/ci-lib-native/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "ci-lib-native"
+version = "0.0.1"
+authors = [ "iximeow " ]
+license = "0BSD"
+edition = "2021"
+description = "shared code across the ci project that is applicable for native targets (uses tokio, etc)"
+
+[lib]
+
+[dependencies]
+ci-lib-core = { path = "../ci-lib-core" }
+tokio = { version = "*", features = ["full"] }
+futures-util = "*"
+axum = "*"
+hyper = "*"
+serde_json = "*"
+serde = { version = "*", features = ["derive"] }
+lettre = "*"
+reqwest = "*"
diff --git a/ci-lib-native/src/dbctx_ext.rs b/ci-lib-native/src/dbctx_ext.rs
new file mode 100644
index 0000000..44436fc
--- /dev/null
+++ b/ci-lib-native/src/dbctx_ext.rs
@@ -0,0 +1,62 @@
+use crate::io::ArtifactDescriptor;
+use crate::notifier::{RemoteNotifier, NotifierConfig};
+use tokio::fs::{File, OpenOptions};
+
+use ci_lib_core::dbctx::DbCtx;
+
+pub fn notifiers_by_repo(ctx: &DbCtx, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> {
+    let remotes = ctx.remotes_by_repo(repo_id)?;
+
+    let mut notifiers: Vec<RemoteNotifier> = Vec::new();
+
+    for remote in remotes.into_iter() {
+        match remote.remote_api.as_str() {
+            "github" => {
+                let mut notifier_path =
ctx.config_path.clone(); + notifier_path.push(&remote.notifier_config_path); + + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::github_from_file(¬ifier_path) + .expect("can load notifier config") + }; + notifiers.push(notifier); + }, + "email" => { + let mut notifier_path = ctx.config_path.clone(); + notifier_path.push(&remote.notifier_config_path); + + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::email_from_file(¬ifier_path) + .expect("can load notifier config") + }; + notifiers.push(notifier); + } + other => { + eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) + } + } + } + + Ok(notifiers) +} + +pub async fn reserve_artifact(ctx: &DbCtx, run_id: u64, name: &str, desc: &str) -> Result { + let artifact_id = { + let created_time = ci_lib_core::now_ms(); + let conn = ctx.conn.lock().unwrap(); + conn + .execute( + "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", + (run_id, name, desc, created_time) + ) + .map_err(|e| { + format!("{:?}", e) + })?; + + conn.last_insert_rowid() as u64 + }; + + ArtifactDescriptor::new(run_id, artifact_id).await +} diff --git a/ci-lib-native/src/io.rs b/ci-lib-native/src/io.rs new file mode 100644 index 0000000..d41349c --- /dev/null +++ b/ci-lib-native/src/io.rs @@ -0,0 +1,174 @@ +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures_util::StreamExt; +use tokio::fs::File; +use std::io::Write; +use tokio::fs::OpenOptions; +use std::task::{Poll, Context}; +use std::pin::Pin; +use std::time::{UNIX_EPOCH, SystemTime}; +use std::sync::{Arc, Mutex}; + +#[derive(Clone)] +pub struct VecSink { + body: Arc>>, +} + +impl VecSink { + pub fn new() -> Self { + Self { body: Arc::new(Mutex::new(Vec::new())) } + } + + pub fn take_buf(&self) -> Vec { + std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new()) + } +} + +impl tokio::io::AsyncWrite for VecSink { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + self.body.lock().unwrap().extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + +pub struct ArtifactStream { + sender: hyper::body::Sender, +} + +impl ArtifactStream { + pub fn new(sender: hyper::body::Sender) -> Self { + Self { sender } + } +} + +impl tokio::io::AsyncWrite for ArtifactStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + match self.get_mut().sender.try_send_data(buf.to_vec().into()) { + Ok(()) => { + Poll::Ready(Ok(buf.len())) + }, + _ => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + + +pub struct ArtifactDescriptor { + job_id: u64, + pub artifact_id: u64, + file: File, +} + +impl ArtifactDescriptor { + pub async fn new(job_id: u64, artifact_id: u64) -> Result { + // TODO: jobs should be a configurable path + let path = format!("artifacts/{}/{}", job_id, artifact_id); + let file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path) + .await + .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + + 
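+        // note: create_new(true) above makes this reservation exclusive: if a file for
+        // this (run, artifact) pair already exists, the open fails with AlreadyExists
+        // instead of truncating data that may still be in use.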
Ok(ArtifactDescriptor { + job_id, + artifact_id, + file, + }) + } + + pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { + loop { + let chunk = data.next().await; + + let chunk = match chunk { + Some(Ok(chunk)) => chunk, + Some(Err(e)) => { + eprintln!("error: {:?}", e); + return Err(format!("error reading: {:?}", e)); + } + None => { + eprintln!("body done?"); + return Ok(()); + } + }; + + let chunk = chunk.as_ref(); + + self.file.write_all(chunk).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } + } +} + +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + + if n_read == 0 { + eprintln!("done reading!"); + return Ok(()); + } + + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } +} +/* +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + + if n_read == 0 { + eprintln!("done reading!"); + return Ok(()); + } + + dest.sender.send_data(buf[..n_read].to_vec().into()).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } +} +*/ diff --git a/ci-lib-native/src/lib.rs b/ci-lib-native/src/lib.rs new file mode 100644 index 0000000..74cb710 --- /dev/null +++ b/ci-lib-native/src/lib.rs @@ -0,0 +1,3 @@ +pub mod io; +pub mod dbctx_ext; +pub mod notifier; diff --git a/ci-lib-native/src/notifier.rs b/ci-lib-native/src/notifier.rs new file mode 100644 index 0000000..dd4a35c --- /dev/null +++ b/ci-lib-native/src/notifier.rs @@ -0,0 +1,181 @@ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use axum::http::StatusCode; +use lettre::transport::smtp::authentication::{Credentials, Mechanism}; +use lettre::{Message, Transport}; +use lettre::transport::smtp::extension::ClientId; +use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder}; +use std::time::Duration; +use std::path::Path; + +use ci_lib_core::dbctx::DbCtx; + +pub struct RemoteNotifier { + pub remote_path: String, + pub notifier: NotifierConfig, +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +pub enum NotifierConfig { + GitHub { + token: String, + }, + Email { + username: String, + password: String, + mailserver: String, + from: String, + to: String, + } +} + +impl NotifierConfig { + pub fn github_from_file>(path: P) -> Result { + let path = path.as_ref(); + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; + + if matches!(config, NotifierConfig::GitHub { .. 
}) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path.display())) + } + } + + pub fn email_from_file>(path: P) -> Result { + let path = path.as_ref(); + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; + + if matches!(config, NotifierConfig::Email { .. }) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path.display())) + } + } +} + +impl RemoteNotifier { + pub async fn tell_pending_job(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + } + + pub async fn tell_complete_job(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64, desc: Result) -> Result<(), String> { + match desc { + Ok(status) => { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + }, + Err(status) => { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + } + } + } + + pub async fn tell_job_status(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { + match &self.notifier { + NotifierConfig::GitHub { token } => { + let status_info = serde_json::json!({ + "state": state, + "description": desc, + "target_url": target_url, + "context": "actuallyinspace runner", + }); + + // TODO: should pool (probably in ctx?) to have an upper bound in concurrent + // connections. + let client = reqwest::Client::new(); + let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) + .body(serde_json::to_string(&status_info).expect("can stringify json")) + .header("content-type", "application/json") + .header("user-agent", "iximeow") + .header("authorization", format!("Bearer {}", token)) + .header("accept", "application/vnd.github+json"); + eprintln!("sending {:?}", req); + eprintln!(" body: {}", serde_json::to_string(&status_info).expect("can stringify json")); + let res = req + .send() + .await; + + match res { + Ok(res) => { + if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{ + Ok(()) + } else { + Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) + } + } + Err(e) => { + Err(format!("failure sending request: {:?}", e)) + } + } + } + NotifierConfig::Email { username, password, mailserver, from, to } => { + eprintln!("[.] emailing {} for job {} via {}", state, &self.remote_path, mailserver); + + let subject = format!("{}: job for {}", state, &self.remote_path); + + let body = format!("{}", subject); + + // TODO: when ci.butactuallyin.space has valid certs again, ... fix this. 
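+                // note: dangerous_accept_invalid_certs(true) below turns off certificate
+                // validation for this SMTP connection entirely, so STARTTLS here only
+                // guards against passive snooping, not an active man-in-the-middle.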
+ let tls = TlsParametersBuilder::new(mailserver.to_string()) + .dangerous_accept_invalid_certs(true) + .build() + .unwrap(); + + let mut mailer = SmtpConnection::connect( + mailserver, + Some(Duration::from_millis(5000)), + &ClientId::Domain("ci.butactuallyin.space".to_string()), + None, + None, + ).unwrap(); + + mailer.starttls( + &tls, + &ClientId::Domain("ci.butactuallyin.space".to_string()), + ).unwrap(); + + let resp = mailer.auth( + &[Mechanism::Plain, Mechanism::Login], + &Credentials::new(username.to_owned(), password.to_owned()) + ).unwrap(); + assert!(resp.is_positive()); + + let email = Message::builder() + .from(from.parse().unwrap()) + .to(to.parse().unwrap()) + .subject(&subject) + .body(body) + .unwrap(); + + match mailer.send(email.envelope(), &email.formatted()) { + Ok(_) => { + eprintln!("[+] notified {}@{}", username, mailserver); + Ok(()) + } + Err(e) => { + eprintln!("[-] could not send email: {:?}", e); + Err(e.to_string()) + } + } + } + } + } +} diff --git a/ci-runner/Cargo.toml b/ci-runner/Cargo.toml new file mode 100644 index 0000000..038ed14 --- /dev/null +++ b/ci-runner/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "ci-runner" +version = "0.0.1" +authors = [ "iximeow " ] +license = "0BSD" +edition = "2021" + +[[bin]] +name = "ci_runner" +path = "src/main.rs" + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +ci-lib-native = { path = "../ci-lib-native" } + +libc = "*" +serde = "*" +serde_derive = "*" +serde_json = "*" +tokio = { version = "*", features = ["full"] } +reqwest = "*" +rlua = "*" +hyper = "*" +tracing = "*" +tracing-subscriber = "*" diff --git a/ci-runner/src/lua/mod.rs b/ci-runner/src/lua/mod.rs new file mode 100644 index 0000000..62ac68b --- /dev/null +++ b/ci-runner/src/lua/mod.rs @@ -0,0 +1,411 @@ +use crate::RunningJob; + +use rlua::prelude::*; + +use std::sync::{Arc, Mutex}; +use std::path::PathBuf; + +pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config/goodfiles/rust.lua"); + +pub struct BuildEnv { + lua: Lua, + job: Arc>, +} + +#[derive(Debug)] +pub struct RunParams { + step: Option, + name: Option, + cwd: Option, +} + +pub struct CommandOutput { + pub exit_status: std::process::ExitStatus, + pub stdout: Vec, + pub stderr: Vec, +} + +mod lua_exports { + use crate::RunningJob; + use crate::lua::{CommandOutput, RunParams}; + + use std::sync::{Arc, Mutex}; + use std::path::PathBuf; + + use rlua::prelude::*; + + pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec, RunParams), rlua::Error> { + let args = match command { + LuaValue::Table(table) => { + let len = table.len().expect("command table has a length"); + let mut command_args = Vec::new(); + for i in 0..len { + let value = table.get(i + 1).expect("command arg is gettble"); + match value { + LuaValue::String(s) => { + command_args.push(s.to_str().unwrap().to_owned()); + }, + other => { + return Err(LuaError::RuntimeError(format!("argument {} was not a string, was {:?}", i, other))); + } + }; + } + + command_args + }, + other => { + return Err(LuaError::RuntimeError(format!("argument 1 was not a table: {:?}", other))); + } + }; + + let params = match params { + LuaValue::Table(table) => { + let step = match table.get("step").expect("can get from table") { + LuaValue::String(v) => { + Some(v.to_str()?.to_owned()) + }, + LuaValue::Nil => { + None + }, + other => { + return Err(LuaError::RuntimeError(format!("params[\"step\"] must be a string"))); + } + }; + let name = match table.get("name").expect("can get from table") { + 
LuaValue::String(v) => { + Some(v.to_str()?.to_owned()) + }, + LuaValue::Nil => { + None + }, + other => { + return Err(LuaError::RuntimeError(format!("params[\"name\"] must be a string"))); + } + }; + let cwd = match table.get("cwd").expect("can get from table") { + LuaValue::String(v) => { + Some(v.to_str()?.to_owned()) + }, + LuaValue::Nil => { + None + }, + other => { + return Err(LuaError::RuntimeError(format!("params[\"cwd\"] must be a string"))); + } + }; + + RunParams { + step, + name, + cwd, + } + }, + LuaValue::Nil => { + RunParams { + step: None, + name: None, + cwd: None, + } + } + other => { + return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other))); + } + }; + + Ok((args, params)) + } + + pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc>) -> Result<(), rlua::Error> { + let (args, params) = collect_build_args(command, params)?; + eprintln!("args: {:?}", args); + eprintln!(" params: {:?}", params); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + job_ctx.lock().unwrap().run_command(&args, params.cwd.as_ref().map(|x| x.as_str())).await + .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) + }) + } + + pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc>) -> Result, rlua::Error> { + let (args, params) = collect_build_args(command, params)?; + eprintln!("args: {:?}", args); + eprintln!(" params: {:?}", params); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let command_output = rt.block_on(async move { + job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await + .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) + })?; + + let stdout = ctx.create_string(command_output.stdout.as_slice())?; + let stderr = ctx.create_string(command_output.stderr.as_slice())?; + + let result = ctx.create_table()?; + result.set("stdout", stdout)?; + result.set("stderr", stderr)?; + result.set("status", command_output.exit_status.code())?; + Ok(result) + } + + pub fn check_dependencies(commands: Vec) -> Result<(), rlua::Error> { + let mut missing_deps = Vec::new(); + for command in commands.iter() { + if !has_cmd(command)? { + missing_deps.push(command.clone()); + } + } + + if missing_deps.len() > 0 { + return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", ")))); + } + + Ok(()) + } + + pub fn metric(name: String, value: String, job_ctx: Arc>) -> Result<(), rlua::Error> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + job_ctx.lock().unwrap().send_metric(&name, value).await + .map_err(|e| LuaError::RuntimeError(format!("send_metric error: {:?}", e))) + }) + } + + pub fn artifact(path: String, name: Option, job_ctx: Arc>) -> Result<(), rlua::Error> { + let path: PathBuf = path.into(); + + let default_name: String = match (path.file_name(), path.parent()) { + (Some(name), _) => name + .to_str() + .ok_or(LuaError::RuntimeError("artifact name is not a unicode string".to_string()))? 
+ .to_string(), + (_, Some(parent)) => format!("{}", parent.display()), + (None, None) => { + // one day using directories for artifacts might work but / is still not going + // to be accepted + return Err(LuaError::RuntimeError(format!("cannot infer a default path name for {}", path.display()))); + } + }; + + let name: String = match name { + Some(name) => name, + None => default_name, + }; + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + let mut artifact = job_ctx.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await + .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e))) + .unwrap(); + let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(); + eprintln!("uploading..."); + crate::io::forward_data(&mut file, &mut artifact).await + .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?; + std::mem::drop(artifact); + Ok(()) + }) + } + + pub fn has_cmd(name: &str) -> Result { + Ok(std::process::Command::new("which") + .arg(name) + .status() + .map_err(|e| LuaError::RuntimeError(format!("could not fork which? {:?}", e)))? + .success()) + } + + pub fn file_size(path: &str) -> Result { + Ok(std::fs::metadata(&format!("tmpdir/{}", path)) + .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", path)))? + .len()) + } + + pub mod step { + use crate::RunningJob; + use std::sync::{Arc, Mutex}; + + pub fn start(job_ref: Arc>, name: String) -> Result<(), rlua::Error> { + let mut job = job_ref.lock().unwrap(); + job.current_step.clear(); + job.current_step.push(name); + Ok(()) + } + + pub fn push(job_ref: Arc>, name: String) -> Result<(), rlua::Error> { + let mut job = job_ref.lock().unwrap(); + job.current_step.push(name); + Ok(()) + } + + pub fn advance(job_ref: Arc>, name: String) -> Result<(), rlua::Error> { + let mut job = job_ref.lock().unwrap(); + job.current_step.pop(); + job.current_step.push(name); + Ok(()) + } + } +} + +struct DeclEnv<'lua, 'env> { + lua_ctx: &'env rlua::Context<'lua>, + job_ref: &'env Arc>, +} +impl<'lua, 'env> DeclEnv<'lua, 'env> { + fn create_function(&self, name: &str, f: F) -> Result, String> + where + A: FromLuaMulti<'lua>, + R: ToLuaMulti<'lua>, + F: 'static + Send + Fn(rlua::Context<'lua>, Arc>, A) -> Result { + + let job_ref = Arc::clone(self.job_ref); + self.lua_ctx.create_function(move |ctx, args| { + let job_ref = Arc::clone(&job_ref); + f(ctx, job_ref, args) + }) + .map_err(|e| format!("problem defining {} function: {:?}", name, e)) + } +} + +impl BuildEnv { + pub fn new(job: &Arc>) -> Self { + let env = BuildEnv { + lua: Lua::new(), + job: Arc::clone(job), + }; + env.lua.context(|lua_ctx| { + env.define_env(lua_ctx) + }).expect("can define context"); + env + } + + fn define_env(&self, lua_ctx: rlua::Context) -> Result<(), String> { + let decl_env = DeclEnv { + lua_ctx: &lua_ctx, + job_ref: &self.job, + }; + + let hello = decl_env.create_function("hello", |_, _, ()| { + eprintln!("hello from lua!!!"); + Ok(()) + })?; + + let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec| { + lua_exports::check_dependencies(commands) + })?; + + let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| { + lua_exports::build_command_impl(command, params, job_ref) + })?; + + let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, 
params): (LuaValue, LuaValue)| { + lua_exports::check_output_impl(ctx, command, params, job_ref) + })?; + + let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| { + lua_exports::metric(name, value, job_ref) + })?; + + let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(ci_lib_core::now_ms()))?; + + let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option)| { + lua_exports::artifact(path, name, job_ref) + })?; + + let error = decl_env.create_function("error", move |_, job_ref, msg: String| { + Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg))) + })?; + + let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, job_ref, name: String| { + lua_exports::has_cmd(&name) + })?; + + let size_of_file = decl_env.create_function("size_of_file", move |_, job_ref, name: String| { + lua_exports::file_size(&name) + })?; + + let native_rust_triple = match std::env::consts::ARCH { + "x86_64" => "x86_64-unknown-linux-gnu", + "aarch64" => "aarch64-unknown-linux-gnu", + other => { panic!("dunno native rust triple for arch {}", other); } + }; + let native_rust_triple = lua_ctx.create_string(native_rust_triple).unwrap(); + let build_env_vars = lua_ctx.create_table_from( + vec![ + ("native_rust_triple", native_rust_triple) + ] + ).unwrap(); + + let build_environment = lua_ctx.create_table_from( + vec![ + ("has", path_has_cmd), + ("size", size_of_file), + ] + ).unwrap(); + build_environment.set("vars", build_env_vars).unwrap(); + + let build_functions = lua_ctx.create_table_from( + vec![ + ("hello", hello), + ("run", build), + ("dependencies", check_dependencies), + ("metric", metric), + ("error", error), + ("artifact", artifact), + ("now_ms", now_ms), + ("check_output", check_output), + ] + ).unwrap(); + build_functions.set("environment", build_environment).unwrap(); + let current_commit = self.job.lock().unwrap().job.commit.clone(); + build_functions.set("sha", lua_ctx.create_string(current_commit.as_bytes()).unwrap()).unwrap(); + let globals = lua_ctx.globals(); + globals.set("Build", build_functions).unwrap(); + + + let step_start = decl_env.create_function("step_start", move |_, job_ref, name: String| { + lua_exports::step::start(job_ref, name) + })?; + + let step_push = decl_env.create_function("step_push", move |_, job_ref, name: String| { + lua_exports::step::push(job_ref, name) + })?; + + let step_advance = decl_env.create_function("step_advance", move |_, job_ref, name: String| { + lua_exports::step::advance(job_ref, name) + })?; + + let step_functions = lua_ctx.create_table_from( + vec![ + ("start", step_start), + ("push", step_push), + ("advance", step_advance), + ] + ).unwrap(); + globals.set("Step", step_functions).unwrap(); + Ok(()) + } + + pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> { + let script = script.to_vec(); + let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| { + std::thread::spawn(move || { + self.lua.context(|lua_ctx| { + lua_ctx.load(&script) + .set_name("goodfile")? 
+ .exec() + }) + }).join().unwrap() + }).await.unwrap(); + eprintln!("lua res: {:?}", res); + res + } +} diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs new file mode 100644 index 0000000..41f5594 --- /dev/null +++ b/ci-runner/src/main.rs @@ -0,0 +1,668 @@ +use std::time::Duration; +use std::os::unix::process::ExitStatusExt; +use rlua::prelude::LuaError; +use std::sync::{Arc, Mutex}; +use reqwest::{StatusCode, Response}; +use tokio::process::Command; +use std::process::Stdio; +use std::process::ExitStatus; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use serde_json::json; +use serde::{Deserialize, de::DeserializeOwned, Serialize}; +use std::task::{Context, Poll}; +use std::pin::Pin; +use std::marker::Unpin; + +use ci_lib_native::io; +use ci_lib_native::io::{ArtifactStream, VecSink}; +use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; + +mod lua; + +use crate::lua::CommandOutput; + +#[derive(Debug)] +enum WorkAcquireError { + Reqwest(reqwest::Error), + EarlyEof, + Protocol(String), +} + +struct RunnerClient { + http: reqwest::Client, + host: String, + tx: hyper::body::Sender, + rx: Response, + current_job: Option, +} + +impl RunningJob { + fn from_job(job: RequestedJob, client: RunnerClient) -> Self { + Self { + job, + client, + current_step: StepTracker::new(), + } + } +} + +struct JobEnv { + lua: lua::BuildEnv, + job: Arc>, +} + +impl JobEnv { + fn new(job: &Arc>) -> Self { + let lua = lua::BuildEnv::new(job); + JobEnv { + lua, + job: Arc::clone(job) + } + } + + async fn default_goodfile(self) -> Result<(), LuaError> { + self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await + } + + async fn exec_goodfile(self) -> Result<(), LuaError> { + let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap(); + self.lua.run_build(script.as_bytes()).await + } +} + +pub struct RunningJob { + job: RequestedJob, + client: RunnerClient, + current_step: StepTracker, +} + +enum RepoError { + CloneFailedIdk { exit_code: ExitStatus }, + CheckoutFailedIdk { exit_code: ExitStatus }, + CheckoutFailedMissingRef, +} + +pub struct StepTracker { + scopes: Vec +} + +impl StepTracker { + pub fn new() -> Self { + StepTracker { + scopes: Vec::new() + } + } + + pub fn push(&mut self, name: String) { + self.scopes.push(name); + } + + pub fn pop(&mut self) { + self.scopes.pop(); + } + + pub fn clear(&mut self) { + self.scopes.clear(); + } + + pub fn full_step_path(&self) -> &[String] { + self.scopes.as_slice() + } +} + +impl RunningJob { + async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { + self.client.send_typed(&ClientProto::metric(name, value)) + .await + .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) + } + + // TODO: panics if hyper finds the channel is closed. 
hum + async fn create_artifact(&self, name: &str, desc: &str) -> Result { + let (mut sender, body) = hyper::Body::channel(); + let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("x-task-token", &self.job.build_token) + .header("x-artifact-name", name) + .header("x-artifact-desc", desc) + .body(body) + .send() + .await + .map_err(|e| format!("unable to send request: {:?}", e))?; + + if resp.status() == StatusCode::OK { + eprintln!("[+] artifact '{}' started", name); + Ok(ArtifactStream::new(sender)) + } else { + Err(format!("[-] unable to create artifact: {:?}", resp)) + } + } + + async fn clone_remote(&self) -> Result<(), RepoError> { + let mut git_clone = Command::new("git"); + git_clone + .arg("clone") + .arg(&self.job.remote_url) + .arg("tmpdir"); + + let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await + .map_err(|e| { + eprintln!("stringy error (exec failed?) for clone: {}", e); + RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) } + })?; + + if !clone_res.success() { + return Err(RepoError::CloneFailedIdk { exit_code: clone_res }); + } + + let mut git_checkout = Command::new("git"); + git_checkout + .current_dir("tmpdir") + .arg("checkout") + .arg(&self.job.commit); + + let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await + .map_err(|e| { + eprintln!("stringy error (exec failed?) for checkout: {}", e); + RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) } + })?; + + if !checkout_res.success() { + if checkout_res.code() == Some(128) { + return Err(RepoError::CheckoutFailedIdk { exit_code: checkout_res }); + } else { + return Err(RepoError::CheckoutFailedMissingRef); + } + } + + Ok(()) + } + + async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result { + let stdout_artifact = self.create_artifact( + &format!("{} (stdout)", name), + &format!("{} (stdout)", desc) + ).await.expect("works"); + let stderr_artifact = self.create_artifact( + &format!("{} (stderr)", name), + &format!("{} (stderr)", desc) + ).await.expect("works"); + + let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?; + + Ok(exit_status) + } + + async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result { + let stdout_collector = VecSink::new(); + let stderr_collector = VecSink::new(); + + let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?; + + Ok(CommandOutput { + exit_status, + stdout: stdout_collector.take_buf(), + stderr: stderr_collector.take_buf(), + }) + } + + async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result { + eprintln!("[.] running {}", name); + + let mut child = command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; + + let mut child_stdout = child.stdout.take().unwrap(); + let mut child_stderr = child.stderr.take().unwrap(); + + eprintln!("[.] 
'{}': forwarding stdout", name); + tokio::spawn(async move { io::forward_data(&mut child_stdout, &mut stdout_reporter).await }); + eprintln!("[.] '{}': forwarding stderr", name); + tokio::spawn(async move { io::forward_data(&mut child_stderr, &mut stderr_reporter).await }); + + let res = child.wait().await + .map_err(|e| format!("failed to wait? {:?}", e))?; + + if res.success() { + eprintln!("[+] '{}' success", name); + } else { + eprintln!("[-] '{}' fail: {:?}", name, res); + } + + Ok(res) + } + + async fn run(mut self) { + self.client.send_typed(&ClientProto::Started).await.unwrap(); + + std::fs::remove_dir_all("tmpdir").unwrap(); + std::fs::create_dir("tmpdir").unwrap(); + + let ctx = Arc::new(Mutex::new(self)); + + let checkout_res = ctx.lock().unwrap().clone_remote().await; + + if let Err(e) = checkout_res { + let status = "bad_ref"; + let status = ClientProto::task_status(TaskInfo::finished(status)); + eprintln!("checkout failed, reporting status: {:?}", status); + + let res = ctx.lock().unwrap().client.send_typed(&status).await; + if let Err(e) = res { + eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); + } + + return; + } + + let lua_env = JobEnv::new(&ctx); + + let metadata = std::fs::metadata("./tmpdir/goodfile"); + let res: Result = match metadata { + Ok(_) => { + match lua_env.exec_goodfile().await { + Ok(()) => { + Ok("pass".to_string()) + }, + Err(lua_err) => { + Err(("failed".to_string(), lua_err.to_string())) + } + } + }, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + match lua_env.default_goodfile().await { + Ok(()) => { + Ok("pass".to_string()) + }, + Err(lua_err) => { + Err(("failed".to_string(), lua_err.to_string())) + } + } + }, + Err(e) => { + eprintln!("[-] error finding goodfile: {:?}", e); + Err(("failed".to_string(), "inaccessible goodfile".to_string())) + } + }; + + match res { + Ok(status) => { + eprintln!("[+] job success!"); + let status = ClientProto::task_status(TaskInfo::finished(status)); + eprintln!("reporting status: {:?}", status); + + let res = ctx.lock().unwrap().client.send_typed(&status).await; + if let Err(e) = res { + eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); + } + } + Err((status, lua_err)) => { + eprintln!("[-] job error: {}", status); + + let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string())); + let res = ctx.lock().unwrap().client.send_typed(&status).await; + if let Err(e) = res { + eprintln!("[!] 
FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e); + } + } + } + } + + fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) { + let mut cmd = Command::new(&command[0]); + let cwd = match working_dir { + Some(dir) => { + format!("tmpdir/{}", dir) + }, + None => { + "tmpdir".to_string() + } + }; + eprintln!("prepared {:?} to run in {}", &command, &cwd); + let human_name = command.join(" "); + cmd + .current_dir(cwd) + .args(&command[1..]); + (cmd, human_name) + } + + async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result { + let (cmd, human_name) = Self::prep_command(command, working_dir); + + let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?; + + if !cmd_res.exit_status.success() { + return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status)); + } + Ok(cmd_res) + } + + async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { + self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) + .await.unwrap(); + + let (cmd, human_name) = Self::prep_command(command, working_dir); + + let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?; + + self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) + .await.unwrap(); + + + if !cmd_res.success() { + return Err(format!("{} failed: {:?}", &human_name, cmd_res)); + } + + Ok(()) + } +} + +impl RunnerClient { + async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result { + if res.status() != StatusCode::OK { + return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); + } + + let hello = res.chunk().await.expect("chunk"); + if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { + return Err(format!("bad hello: {:?}", hello)); + } + + Ok(Self { + http: reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"), + host: host.to_string(), + tx: sender, + rx: res, + current_job: None, + }) + } + + async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result, WorkAcquireError> { + loop { + let message = self.recv_typed::().await; + match message { + Ok(Some(ClientProto::NewTask(new_task))) => { + return Ok(Some(new_task)); + }, + Ok(Some(ClientProto::Ping)) => { + self.send_typed(&ClientProto::Pong).await + .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; + }, + Ok(Some(other)) => { + return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); + }, + Ok(None) => { + return Ok(None); + }, + Err(e) => { + return Err(WorkAcquireError::Protocol(e)); + } + } + } + } + + async fn recv(&mut self) -> Result, String> { + self.recv_typed().await + } + + async fn recv_typed(&mut self) -> Result, String> { + match self.rx.chunk().await { + Ok(Some(chunk)) => { + serde_json::from_slice(&chunk) + .map(Option::Some) + .map_err(|e| { + format!("not json: {:?}", e) + }) + }, + Ok(None) => Ok(None), + Err(e) => { + Err(format!("error in recv: {:?}", e)) + } + } + } + + async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { + self.send_typed(&value).await + } + + async fn send_typed(&mut self, t: &T) -> Result<(), String> { + self.tx.send_data( + serde_json::to_vec(t) + .map_err(|e| format!("json error: {:?}", e))? 
+ .into() + ).await + .map_err(|e| format!("send error: {:?}", e)) + } +} + +#[derive(Deserialize, Serialize)] +struct RunnerConfig { + server_address: String, + auth_secret: String, + allowed_pushers: Option>, +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let mut args = std::env::args(); + args.next().expect("first arg exists"); + let config_path = args.next().unwrap_or("./runner_config.json".to_string()); + let runner_config: RunnerConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for RunnerConfig"); + let client = reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"); + + let host_info = host_info::collect_host_info(); + eprintln!("host info: {:?}", host_info); + + loop { + let (mut sender, body) = hyper::Body::channel(); + + sender.send_data(serde_json::to_string(&ClientProto::new_task_please( + runner_config.allowed_pushers.clone(), + host_info.clone(), + )).unwrap().into()).await.expect("req"); + + let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("authorization", runner_config.auth_secret.trim()) + .body(body) + .send() + .await; + + match poll { + Ok(mut res) => { + let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { + Ok(client) => client, + Err(e) => { + eprintln!("failed to initialize client: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + let job = match client.wait_for_work(runner_config.allowed_pushers.as_ref().map(|x| x.as_ref())).await { + Ok(Some(request)) => request, + Ok(None) => { + eprintln!("no work to do (yet)"); + std::thread::sleep(Duration::from_millis(2000)); + continue; + } + Err(e) => { + eprintln!("failed to get work: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + eprintln!("requested work: {:?}", job); + + eprintln!("doing {:?}", job); + + let mut job = RunningJob::from_job(job, client); + job.run().await; + std::thread::sleep(Duration::from_millis(10000)); + }, + Err(e) => { + let message = format!("{}", e); + + if message.contains("tcp connect error") { + eprintln!("could not reach server. sleeping a bit and retrying."); + std::thread::sleep(Duration::from_millis(5000)); + continue; + } + + eprintln!("unhandled error: {}", message); + + std::thread::sleep(Duration::from_millis(1000)); + } + } + } +} + +mod host_info { + use ci_lib_core::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; + + // get host model name, microcode, and how many cores + fn collect_cpu_info() -> CpuInfo { + fn find_line(lines: &[String], prefix: &str) -> String { + lines.iter() + .find(|line| line.starts_with(prefix)) + .expect(&format!("{} line is present", prefix)) + .split(":") + .last() + .unwrap() + .trim() + .to_string() + } + + /// try finding core `cpu`'s max frequency in khz. we'll assume this is the actual speed a + /// build would run at.. fingers crossed. + fn try_finding_cpu_freq(cpu: u32) -> Result { + if let Ok(freq_str) = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq") { + Ok(freq_str.trim().parse().unwrap()) + } else { + // so cpufreq probably isn't around, maybe /proc/cpuinfo's mhz figure is present? 
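+                // note: a "cpu MHz" line in /proc/cpuinfo looks like
+                //   cpu MHz         : 2400.000
+                // so the code below takes the text after ':', parses it as f64 MHz, and
+                // multiplies by 1000 to get the kHz figure used everywhere else.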
+ let cpu_lines: Vec = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); + let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect(); + match cpu_mhzes.get(cpu as usize) { + Some(mhz) => { + let mut line_parts = cpu_mhzes[cpu as usize].split(":"); + let _ = line_parts.next(); + let mhz = line_parts.next().unwrap().trim(); + let mhz: f64 = mhz.parse().unwrap(); + Ok((mhz * 1000.0) as u64) + }, + None => { + panic!("could not get cpu freq either from cpufreq or /proc/cpuinfo?"); + } + } + } + } + + // we'll have to deploy one of a few techniques, because x86/x86_64 is internally + // consistent, but aarch64 is different. who knows what other CPUs think. + match std::env::consts::ARCH { + "x86" | "x86_64" => { + let cpu_lines: Vec = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); + let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect(); + let cores = model_names.len() as u32; + let model_name = find_line(&cpu_lines, "model name"); + let vendor_id = find_line(&cpu_lines, "vendor_id"); + let family = find_line(&cpu_lines, "cpu family"); + let model = find_line(&cpu_lines, "model\t"); + let microcode = find_line(&cpu_lines, "microcode"); + let max_freq = try_finding_cpu_freq(0).unwrap(); + + CpuInfo { model_name, microcode, cores, vendor_id, family, model, max_freq } + } + "aarch64" => { + let cpu_lines: Vec = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); + let processors: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("processor")).collect(); + let cores = processors.len() as u32; + + // alternate possible path: /sys/firmware/devicetree/base/compatible + let model_name = std::fs::read_to_string("/proc/device-tree/compatible").unwrap(); + let model_name = model_name.replace("\x00", ";"); + let vendor_id = find_line(&cpu_lines, "CPU implementer"); + let vendor_name = match vendor_id.as_str() { + "0x41" => "Arm Limited".to_string(), + "0x42" => "Broadcom Corporation".to_string(), + "0x43" => "Cavium Inc".to_string(), + "0x44" => "Digital Equipment Corporation".to_string(), + "0x46" => "Fujitsu Ltd".to_string(), + "0x49" => "Infineon Technologies AG".to_string(), + "0x4d" => "Motorola".to_string(), + "0x4e" => "NVIDIA Corporation".to_string(), + "0x50" => "Applied Micro Circuits Corporation".to_string(), + "0x51" => "Qualcomm Inc".to_string(), + "0x56" => "Marvell International Ltd".to_string(), + "0x69" => "Intel Corporation".to_string(), + "0xc0" => "Ampere Computing".to_string(), + other => format!("unknown aarch64 vendor {}", other), + }; + let family = find_line(&cpu_lines, "CPU architecture"); + let model = find_line(&cpu_lines, "CPU part"); + let microcode = String::new(); + let max_freq = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq").unwrap().trim().parse().unwrap(); + + CpuInfo { model_name, microcode, cores, vendor_id: vendor_name, family, model, max_freq } + } + other => { + panic!("dunno how to find cpu info for {}, panik", other); + } + } + } + + fn collect_mem_info() -> MemoryInfo { + let mem_lines: Vec = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect(); + let total = mem_lines[0].split(":").last().unwrap().trim().to_string(); + let available = mem_lines[2].split(":").last().unwrap().trim().to_string(); + + MemoryInfo { total, 
available } + } + + fn hostname() -> String { + let mut bytes = [0u8; 4096]; + let res = unsafe { + libc::gethostname(bytes.as_mut_ptr() as *mut std::ffi::c_char, bytes.len()) + }; + if res != 0 { + panic!("gethostname failed {:?}", res); + } + let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated"); + std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string() + } + + pub fn collect_env_info() -> EnvInfo { + EnvInfo { + arch: std::env::consts::ARCH.to_string(), + family: std::env::consts::FAMILY.to_string(), + os: std::env::consts::OS.to_string(), + } + } + + pub fn collect_host_info() -> HostInfo { + let cpu_info = collect_cpu_info(); + let memory_info = collect_mem_info(); + let hostname = hostname(); + let env_info = collect_env_info(); + + HostInfo { + hostname, + cpu_info, + memory_info, + env_info, + } + } +} + diff --git a/ci-web-server/Cargo.toml b/ci-web-server/Cargo.toml new file mode 100644 index 0000000..2771e3a --- /dev/null +++ b/ci-web-server/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "ci-web-server" +version = "0.0.1" +authors = [ "iximeow " ] +license = "0BSD" +edition = "2021" + +[[bin]] +name = "ci_web_server" +path = "src/main.rs" + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +ci-lib-native = { path = "../ci-lib-native" } + +tokio = { features = ["full"] } +tokio-stream = "*" +serde_json = "*" +serde = { version = "*", features = ["derive"] } +axum-server = { version = "*", features = ["tls-rustls"] } +axum-extra = { version = "*", features = ["async-read-body"] } +axum = "*" +hex = "*" +tracing-subscriber = "*" +hmac = "*" +http = "*" +http-body = "*" +chrono = "*" +lazy_static = "*" +sha2 = "*" +rusqlite = { version = "*" } diff --git a/ci-web-server/src/main.rs b/ci-web-server/src/main.rs new file mode 100644 index 0000000..e2be54f --- /dev/null +++ b/ci-web-server/src/main.rs @@ -0,0 +1,1089 @@ +#![allow(dead_code)] +#![allow(unused_variables)] +#![allow(unused_imports)] + +use chrono::{Utc, TimeZone}; +use lazy_static::lazy_static; +use std::sync::RwLock; +use std::collections::HashMap; +use serde::{Deserialize, Serialize}; +use tokio::spawn; +use std::path::PathBuf; +use axum_server::tls_rustls::RustlsConfig; +use axum::routing::*; +use axum::Router; +use axum::response::{IntoResponse, Response, Html}; +use std::net::SocketAddr; +use axum::extract::{Path, State}; +use http_body::combinators::UnsyncBoxBody; +use axum::{Error, Json}; +use axum::extract::rejection::JsonRejection; +use axum::body::Bytes; +use axum::http::{StatusCode, Uri}; +use http::header::HeaderMap; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use axum::body::StreamBody; + +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use hmac::{Hmac, Mac}; +use sha2::Sha256; + +// mod protocol; + +use ci_lib_core::sql::RunState; + +use ci_lib_core::dbctx::DbCtx; +use ci_lib_core::sql::{ArtifactRecord, Job, Run}; + +use rusqlite::OptionalExtension; + +#[derive(Serialize, Deserialize)] +struct WebserverConfig { + psks: Vec, + jobs_path: PathBuf, + config_path: PathBuf, + db_path: PathBuf, + debug_addr: Option, + server_addr: Option, +} + +#[derive(Clone)] +struct WebserverState { + jobs_path: PathBuf, + dbctx: Arc, +} + +#[derive(Clone, Serialize, Deserialize)] +struct GithubPsk { + key: String, + gh_user: String, +} + +lazy_static! 
{ + static ref PSKS: RwLock> = RwLock::new(Vec::new()); +} + +#[derive(Copy, Clone, Debug)] +enum GithubHookError { + BodyNotObject, + MissingElement { path: &'static str }, + BadType { path: &'static str, expected: &'static str }, +} + +#[derive(Debug)] +enum GithubEvent { + Push { tip: String, repo_name: String, head_commit: serde_json::Map, pusher: serde_json::Map }, + Other {} +} + +/// return a duration rendered as the largest two non-zero units. +/// +/// 60000ms -> 1m +/// 60001ms -> 1m +/// 61000ms -> 1m1s +/// 1030ms -> 1.03s +fn duration_as_human_string(duration_ms: u64) -> String { + let duration_sec = duration_ms / 1000; + let duration_min = duration_sec / 60; + let duration_hours = duration_min / 60; + + let duration_ms = duration_ms % 1000; + let duration_sec = duration_sec % 60; + let duration_min = duration_min % 60; + // no need to clamp hours, we're gonna just hope that it's a reasonable number of hours + + if duration_hours != 0 { + let mut res = format!("{}h", duration_hours); + if duration_min != 0 { + res.push_str(&format!("{}m", duration_min)); + } + res + } else if duration_min != 0 { + let mut res = format!("{}m", duration_min); + if duration_min != 0 { + res.push_str(&format!("{}s", duration_sec)); + } + res + } else { + let mut res = format!("{}", duration_sec); + if duration_ms != 0 { + res.push_str(&format!(".{:03}", duration_ms)); + } + res.push('s'); + res + } +} + +/// try producing a url for whatever caused this job to be started, if possible +fn commit_url(job: &Job, commit_sha: &str, ctx: &Arc) -> Option { + let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote"); + + match remote.remote_api.as_str() { + "github" => { + Some(format!("{}/commit/{}", remote.remote_url, commit_sha)) + }, + "email" => { + None + }, + _ => { + None + } + } +} + +/// produce a url to the ci.butactuallyin.space job details page +fn job_url(job: &Job, commit_sha: &str, ctx: &Arc) -> String { + let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote"); + + if remote.remote_api != "github" { + eprintln!("job url for remote type {} can't be constructed, i think", &remote.remote_api); + } + + format!("{}/{}", &remote.remote_path, commit_sha) +} + +/// render how long a run took, or is taking, in a human-friendly way +fn display_run_time(run: &Run) -> String { + if let Some(start_time) = run.start_time { + if let Some(complete_time) = run.complete_time { + if complete_time < start_time { + if run.state == RunState::Started { + // this run has been restarted. the completed time is stale. + // further, this is a currently active run. + let now_ms = ci_lib_core::now_ms(); + let mut duration = duration_as_human_string(now_ms - start_time); + duration.push_str(" (ongoing)"); + duration + } else { + "invalid data".to_string() + } + } else { + let duration_ms = complete_time - start_time; + let duration = duration_as_human_string(duration_ms); + duration + } + } else { + if run.state != RunState::Invalid { + let now_ms = ci_lib_core::now_ms(); + let mut duration = duration_as_human_string(now_ms - start_time); + duration.push_str(" (ongoing)"); + duration + } else { + "n/a".to_string() + } + } + } else { + "not yet run".to_owned() + } +} + +fn parse_push_event(body: serde_json::Value) -> Result { + let body = body.as_object() + .ok_or(GithubHookError::BodyNotObject)?; + + let tip = body.get("after") + .ok_or(GithubHookError::MissingElement { path: "after" })? 
+ .as_str() + .ok_or(GithubHookError::BadType { path: "after", expected: "str" })? + .to_owned(); + + let repo_name = body.get("repository") + .ok_or(GithubHookError::MissingElement { path: "repository" })? + .as_object() + .ok_or(GithubHookError::BadType { path: "repository", expected: "obj" })? + .get("full_name") + .ok_or(GithubHookError::MissingElement { path: "repository/full_name" })? + .as_str() + .ok_or(GithubHookError::BadType { path: "repository/full_name", expected: "str" })? + .to_owned(); + + let head_commit = body.get("head_commit") + .ok_or(GithubHookError::MissingElement { path: "head_commit" })? + .as_object() + .ok_or(GithubHookError::BadType { path: "head_commit", expected: "obj" })? + .to_owned(); + + let pusher = body.get("pusher") + .ok_or(GithubHookError::MissingElement { path: "pusher" })? + .as_object() + .ok_or(GithubHookError::BadType { path: "pusher", expected: "obj" })? + .to_owned(); + + Ok(GithubEvent::Push { tip, repo_name, head_commit, pusher }) +} + +async fn process_push_event(ctx: Arc, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse { + let (sha, repo, head_commit, pusher) = if let GithubEvent::Push { tip, repo_name, head_commit, pusher } = event { + (tip, repo_name, head_commit, pusher) + } else { + panic!("process push event on non-push event"); + }; + + println!("handling push event to {}/{}: sha {} in repo {}, {:?}\n pusher: {:?}", owner, repo, sha, repo, head_commit, pusher); + + // push event is in terms of a ref, but we don't know if it's a new commit (yet). + // in terms of CI jobs, we care mainly about new commits. + // so... + // * look up the commit, + // * if it known, bail out (new ref for existing commit we've already handled some way) + // * create a new commit ref + // * create a new job (state=pending) for the commit ref + let commit_id: Option = ctx.conn.lock().unwrap() + .query_row(ci_lib_core::sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0)) + .optional() + .expect("can run query"); + + if commit_id.is_some() { + eprintln!("commit already exists"); + return (StatusCode::OK, String::new()); + } + + let remote_url = format!("https://www.github.com/{}.git", repo); + eprintln!("looking for remote url: {}", remote_url); + let (remote_id, repo_id): (u64, u64) = match ctx.conn.lock().unwrap() + .query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) + .optional() + .unwrap() { + Some(elems) => elems, + None => { + eprintln!("no remote registered for url {} (repo {})", remote_url, repo); + return (StatusCode::NOT_FOUND, String::new()); + } + }; + + let repo_default_run_pref: Option = ctx.conn.lock().unwrap() + .query_row("select default_run_preference from repos where id=?1;", [repo_id], |row| { + Ok((row.get(0)).unwrap()) + }) + .expect("can query"); + + let pusher_email = pusher + .get("email") + .expect("has email") + .as_str() + .expect("is str"); + + let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap(); + let _ = ctx.new_run(job_id, None).unwrap(); + + let notifiers = ci_lib_native::dbctx_ext::notifiers_by_repo(&ctx, repo_id).expect("can get notifiers"); + + for notifier in notifiers { + notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify"); + } + + (StatusCode::OK, String::new()) +} + +async fn handle_github_event(ctx: Arc, owner: String, repo: String, event_kind: String, body: serde_json::Value) -> Response> { + println!("got github event: {}, {}, {}", owner, 
+async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event_kind: String, body: serde_json::Value) -> Response<axum::body::BoxBody> {
+    println!("got github event: {}, {}, {}", owner, repo, event_kind);
+    match event_kind.as_str() {
+        "push" => {
+            let push_event = parse_push_event(body)
+                .map_err(|e| {
+                    eprintln!("TODO: handle push event error: {:?}", e);
+                    panic!()
+                })
+                .expect("parse works");
+            let res = process_push_event(ctx, owner, repo, push_event).await;
+            "ok".into_response()
+        },
+        "status" => {
+            eprintln!("[.] status update");
+            "ok".into_response()
+        }
+        other => {
+            eprintln!("unhandled event kind: {}, repo {}/{}. content: {:?}", other, owner, repo, body);
+            "".into_response()
+        }
+    }
+}
+
+async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse {
+    eprintln!("root index");
+    let repos = match ctx.dbctx.get_repos() {
+        Ok(repos) => repos,
+        Err(e) => {
+            eprintln!("failed to get repos: {:?}", e);
+            return (StatusCode::INTERNAL_SERVER_ERROR, Html("gonna feel that one tomorrow".to_string()));
+        }
+    };
+
+    let mut response = String::new();
+
+    response.push_str("<html>\n");
+    response.push_str("<body>\n");
+    response.push_str("<h1>builds and build accessories</h1>\n");
+
+    match repos.len() {
+        0 => { response.push_str(&format!("<p>no repos configured, so there are no builds</p>\n")); },
+        1 => { response.push_str("<p>1 repo configured</p>\n"); },
+        other => { response.push_str(&format!("<p>{} repos configured</p>\n", other)); },
+    }
+
+    response.push_str("<table>");
+    response.push_str("<tr>\n");
+    let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"];
+    for heading in headings {
+        response.push_str(&format!("<th>{}</th>", heading));
+    }
+    response.push_str("</tr>\n");
+
+    let mut row_num = 0;
+
+    for repo in repos {
+        let mut most_recent_run: Option<(Job, Run)> = None;
+
+        for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") {
+            let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works");
+            if let Some(last_job) = last_job {
+                if let Some(last_run) = ctx.dbctx.last_run_for_job(last_job.id).expect("can query") {
+                    if most_recent_run.as_ref().map(|run| run.1.create_time < last_run.create_time).unwrap_or(true) {
+                        most_recent_run = Some((last_job, last_run));
+                    }
+                }
+            }
+        }
+
+        let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name);
+
+        let row_html: String = match most_recent_run {
+            Some((job, run)) => {
+                let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
+                let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
+                    Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
+                    None => job_commit.clone()
+                };
+
+                let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
+
+                let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
+                let duration = display_run_time(&run);
+
+                let status = format!("{:?}", run.state).to_lowercase();
+
+                let result = match run.build_result {
+                    Some(0) => "pass",
+                    Some(_) => "fail",
+                    None => match run.state {
+                        RunState::Pending => { "unstarted" },
+                        RunState::Started => { "in progress" },
+                        _ => { "unreported" }
+                    }
+                };
+
+                let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result];
+                let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
+
+                let mut row_html = String::new();
+                for entry in entries {
+                    row_html.push_str(&format!("<td>{}</td>", entry));
+                }
+                row_html
+            }
+            None => {
+                let entries = [repo_html.as_str()];
+                let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
+
+                let mut row_html = String::new();
+                for entry in entries {
+                    row_html.push_str(&format!("<td>{}</td>", entry));
+                }
+                row_html
+            }
+        };
+
+        let row_index = row_num % 2;
+        response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index]));
+        response.push_str(&row_html);
+        response.push_str("</tr>");
+        response.push('\n');
+
+        row_num += 1;
+    }
+    response.push_str("</table>");
+
+    response.push_str("<h2>active tasks</h2>\n");
+
+    let runs = ctx.dbctx.get_active_runs().expect("can query");
+    if runs.len() == 0 {
+        response.push_str("<p>(none)</p>\n");
+    } else {
+        response.push_str("<table>");
+        response.push_str("<tr>\n");
+        let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"];
+        for heading in headings {
+            response.push_str(&format!("<th>{}</th>", heading));
+        }
+        response.push_str("</tr>\n");
+
+        let mut row_num = 0;
+
+        for run in runs.iter() {
+            let row_index = row_num % 2;
+
+            let job = ctx.dbctx.job_by_id(run.job_id).expect("query succeeds").expect("job id is valid");
+            let remote = ctx.dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("remote id is valid");
+            let repo = ctx.dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("repo id is valid");
+
+            let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name);
+
+            let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
+            let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
+                Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
+                None => job_commit.clone()
+            };
+
+            let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
+
+            let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
+            let duration = display_run_time(&run);
+
+            let status = format!("{:?}", run.state).to_lowercase();
+
+            let result = match run.build_result {
+                Some(0) => "pass",
+                Some(_) => "fail",
+                None => match run.state {
+                    RunState::Pending => { "unstarted" },
+                    RunState::Started => { "in progress" },
+                    _ => { "unreported" }
+                }
+            };
+
+            let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result];
+            let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
+
+            let mut row_html = String::new();
+            for entry in entries {
+                row_html.push_str(&format!("<td>{}</td>", entry));
+            }
+
+            response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index]));
+            response.push_str(&row_html);
+            response.push_str("</tr>");
+            response.push('\n');
+
+            row_num += 1;
+        }
+
+        response.push_str("</table>\n");
+    }
+
+    response.push_str("</html>");
+
+    (StatusCode::OK, Html(response))
+}
+
+async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse {
+    eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2);
+    let remote_path = format!("{}/{}", path.0, path.1);
+    let sha = path.2;
+
+    let (commit_id, sha): (u64, String) = if sha.len() >= 7 {
+        match ctx.dbctx.conn.lock().unwrap()
+            .query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
+            .optional()
+            .expect("can query") {
+            Some((commit_id, sha)) => (commit_id, sha),
+            None => {
+                return (StatusCode::NOT_FOUND, Html("no such commit".to_string()));
+            }
+        }
+    } else {
+        return (StatusCode::NOT_FOUND, Html("no such commit".to_string()));
+    };
+
+    let short_sha = &sha[0..9];
+
+    let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap()
+        .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
+        .expect("can query");
+
+    let job = ctx.dbctx.job_by_commit_id(commit_id).expect("can query").expect("job exists");
+
+    let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists");
+
+    let complete_time = run.complete_time.unwrap_or_else(ci_lib_core::now_ms);
+
+    let (status_elem, status_desc) = match run.state {
+        RunState::Pending | RunState::Started => {
+            ("pending", "⌛ in progress")
+        },
+        RunState::Finished => {
+            if let Some(build_result) = run.build_result {
+                if build_result == 0 {
+                    ("pass", "✅ passed")
+                } else {
+                    ("failed", "❌ failed")
+                }
+            } else {
+                eprintln!("run {} for commit {} is missing a build result but is reportedly finished (old data)?", run.id, commit_id);
+                ("unreported", "❔ missing status")
+            }
+        },
+        RunState::Error => {
+            ("error", "🧯 error, uncompleted")
+        }
+        RunState::Invalid => {
+            ("(server error)", "dude even i don't know")
+        }
+    };
+    let debug_info = run.state == RunState::Finished && run.build_result == Some(1) || run.state == RunState::Error;
+
+    let repo_name: String = ctx.dbctx.conn.lock().unwrap()
+        .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0))
+        .expect("can query");
+
+    let deployed = false;
+
+    let mut head = String::new();
+    head.push_str("<head>");
+    head.push_str(&format!("<title>ci.butactuallyin.space - {}</title>", repo_name));
+    let include_og_tags = true;
+    if include_og_tags {
+        head.push_str("<meta property=\"og:type\" content=\"website\">\n");
+        head.push_str(&format!("<meta property=\"og:site_name\" content=\"ci.butactuallyin.space\">\n"));
+        head.push_str(&format!("<meta name=\"twitter:card\" content=\"summary\">\n"));
+        head.push_str(&format!("<meta property=\"og:url\" content=\"https://ci.butactuallyin.space/{}/{}/{}\">\n", &path.0, &path.1, &sha));
+        head.push_str(&format!("<meta property=\"og:title\" content=\"{}/{} commit {}\">", &path.0, &path.1, &short_sha));
+        let build_og_description = format!("commit {} of {}/{}, {} after {}",
+            short_sha,
+            path.0, path.1,
+            status_desc,
+            display_run_time(&run)
+        );
+        head.push_str(&format!("<meta name=\"description\" content=\"{}\">", build_og_description));
+        head.push_str(&format!("<meta property=\"og:description\" content=\"{}\">", build_og_description));
+    }
+    head.push_str("</head>\n");
+    let repo_html = format!("<a href=\"/{}\">{}</a>", &repo_name, &repo_name);
+    let remote_commit_elem = format!("<a href=\"https://www.github.com/{}/commit/{}\">{}</a>", &remote_path, &sha, &sha);
+
+    let mut artifacts_fragment = String::new();
+    let mut artifacts: Vec<_> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap()
+        .into_iter() // HACK: filter out artifacts for previous runs of a job. artifacts should be attached to a run, and runs should be distinct from jobs. but i'm sleepy.
+        .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(ci_lib_core::now_ms))
+        .collect();
+
+    artifacts.sort_by_key(|artifact| artifact.created_time);
+
+    fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 {
+        let artifact_completed = artifact_completed.unwrap_or_else(ci_lib_core::now_ms);
+        let run_completed = std::cmp::max(run_completed, artifact_completed);
+        run_completed - artifact_completed
+    }
+
+    let recent_artifacts: Vec<_> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) <= 60_000).cloned().collect();
+    let old_artifacts: Vec<_> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) > 60_000).cloned().collect();
+
+    for artifact in old_artifacts.iter() {
+        let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822();
+        artifacts_fragment.push_str(&format!("<div><span>{}</span> step: <span>{}</span></div>\n", created_time_str, &artifact.name));
+        let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(ci_lib_core::now_ms) - artifact.created_time);
+        let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string();
+        artifacts_fragment.push_str(&format!("<pre>&nbsp;&nbsp;{}kb in {}&nbsp;</pre>\n", size_str, duration_str));
+    }
+
+    for artifact in recent_artifacts.iter() {
+        let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822();
+        artifacts_fragment.push_str(&format!("<div><span>{}</span> step: <span>{}</span></div>\n", created_time_str, &artifact.name));
+        if debug_info {
+            artifacts_fragment.push_str("<pre>");
+            artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap());
+            artifacts_fragment.push_str("</pre>\n");
+        } else {
+            let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(ci_lib_core::now_ms) - artifact.created_time);
+            let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| {
+                (md.len() / 1024).to_string()
+            }).unwrap_or_else(|e| format!("[{}]", e));
+            artifacts_fragment.push_str(&format!("<pre>&nbsp;&nbsp;{}kb in {}&nbsp;</pre>\n", size_str, duration_str));
+        }
+    }
+
+    let metrics = summarize_job_metrics(&ctx.dbctx, run.id, run.job_id).unwrap();
+
+    let mut html = String::new();
+    html.push_str("<html>\n");
+    html.push_str(&format!("  {}\n", head));
+    html.push_str("  <body>\n");
+    html.push_str("    <pre>\n");
+    html.push_str(&format!("repo: {}\n", repo_html));
+    html.push_str(&format!("commit: {}, run: {}\n", remote_commit_elem, run.id));
+    html.push_str(&format!("status: {} in {}\n", status_elem, display_run_time(&run)));
+    if let Some(desc) = run.final_text.as_ref() {
+        html.push_str(&format!("  description: {}\n  ", desc));
+    }
+    html.push_str(&format!("deployed: {}\n", deployed));
+    html.push_str("    </pre>\n");
+    if artifacts_fragment.len() > 0 {
+        html.push_str("<h3>artifacts</h3>\n");
+        html.push_str(&artifacts_fragment);
+    }
+    if let Some(metrics) = metrics {
+        html.push_str(&metrics);
+    }
+    html.push_str("  </body>\n");
+    html.push_str("</html>");
+
+    (StatusCode::OK, Html(html))
+}
\n"); + section.push_str("

metrics

\n"); + section.push_str("\n"); + + if runs.len() == 1 { + let metrics = dbctx.metrics_for_run(run_id).unwrap(); + if metrics.is_empty() { + return Ok(None); + } + + section.push_str(""); + for metric in metrics { + section.push_str(&format!("", &metric.name, &metric.value)); + } + } else { + // very silly ordering issue: need an authoritative ordering of metrics to display metrics + // in a consistent order across runs (though they SHOULD all be ordered the same). + // + // the first run might not have all metrics (first run could be on the slowest build host + // f.ex), so for now just assume that metrics *will* be consistently ordered and build a + // list of metrics from the longest list of metrics we've seen. builders do not support + // concurrency so at least the per-run metrics-order-consistency assumption should hold.. + let mut all_names: Vec = Vec::new(); + + let all_metrics: Vec<(HashMap, HostDesc)> = runs.iter().map(|run| { + let metrics = dbctx.metrics_for_run(run.id).unwrap(); + + let mut metrics_map = HashMap::new(); + for metric in metrics.into_iter() { + if !all_names.contains(&metric.name) { + all_names.push(metric.name.clone()); + } + metrics_map.insert(metric.name, metric.value); + } + + let (hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz) = match run.host_id { + Some(host_id) => { + dbctx.host_model_info(host_id).unwrap() + } + None => { + ("unknown".to_string(), "unknown".to_string(), "0".to_string(), "0".to_string(), 0) + } + }; + + (metrics_map, HostDesc::from_parts(hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz)) + }).collect(); + + if all_metrics.is_empty() { + return Ok(None); + } + + let mut header = "".to_string(); + for (_, host) in all_metrics.iter() { + header.push_str(&format!("", &host.hostname, &host.cpu_desc, (host.cpu_max_freq_khz as f64) / 1000_000.0)); + } + header.push_str("\n"); + section.push_str(&header); + + for name in all_names.iter() { + let mut row = format!("", &name); + for (metrics, _) in all_metrics.iter() { + let value = metrics.get(name) + .map(|x| x.clone()) + .unwrap_or_else(String::new); + row.push_str(&format!("", value)); + } + row.push_str("\n"); + section.push_str(&row); + } + }; + section.push_str("
namevalue
{}{}
name{}
{} @ {:.3}GHz
{}{}
\n"); + section.push_str("
\n"); + + Ok(Some(section)) +} + +async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State) -> impl IntoResponse { + eprintln!("get artifact, run={}, artifact={}", path.0, path.1); + let run: u64 = path.0.parse().unwrap(); + let artifact_id: u64 = path.1.parse().unwrap(); + + let artifact_descriptor = match ctx.dbctx.lookup_artifact(run, artifact_id).unwrap() { + Some(artifact) => artifact, + None => { + return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); + } + }; + + let mut live_artifact = false; + + if let Some(completed_time) = artifact_descriptor.completed_time { + if completed_time < artifact_descriptor.created_time { + live_artifact = true; + } + } else { + live_artifact = true; + } + + if live_artifact { + let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536); + let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver); + let mut artifact_path = ctx.jobs_path.clone(); + artifact_path.push(artifact_descriptor.run_id.to_string()); + artifact_path.push(artifact_descriptor.id.to_string()); + spawn(async move { + let mut artifact = artifact_descriptor; + + let mut artifact_file = tokio::fs::File::open(&artifact_path) + .await + .expect("artifact file exists?"); + while artifact.completed_time.is_none() { + match ci_lib_native::io::forward_data(&mut artifact_file, &mut tx_sender).await { + Ok(()) => { + // reached the current EOF, wait and then commit an unspeakable sin + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + // this would be much implemented as yielding on a condvar woken when an + // inotify event on the file indicates a write has occurred. but i am + // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. + artifact = ctx.dbctx.lookup_artifact(artifact.run_id, artifact.id) + .expect("can query db") + .expect("artifact still exists"); + } + Err(e) => { + eprintln!("artifact file streaming failed: {}", e); + } + } + } + + eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id); + }); + (StatusCode::OK, resp_body).into_response() + } else { + (StatusCode::OK, Html("all done")).into_response() + } +} + +async fn handle_repo_summary(Path(path): Path, State(ctx): State) -> impl IntoResponse { + eprintln!("get repo summary: {:?}", path); + + let mut last_builds = Vec::new(); + + let (repo_id, repo_name, default_run_preference): (u64, String, Option) = match ctx.dbctx.conn.lock().unwrap() + .query_row("select id, repo_name, default_run_preference from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap(), row.get(2).unwrap()))) + .optional() + .unwrap() { + Some(elem) => elem, + None => { + eprintln!("no repo named {}", path); + return (StatusCode::NOT_FOUND, Html(String::new())); + } + }; + + // TODO: display default_run_preference somehow on the web summary? + + for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") { + let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo"); + last_builds.extend(last_ten_jobs.drain(..)); + } + last_builds.sort_by_key(|job| -(job.created_time as i64)); + + let mut response = String::new(); + response.push_str("\n"); + response.push_str(&format!(" ci.butactuallyin.space - {} \n", repo_name)); + response.push_str("\n"); + response.push_str(&format!("
+
+async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<WebserverState>) -> impl IntoResponse {
+    eprintln!("get repo summary: {:?}", path);
+
+    let mut last_builds = Vec::new();
+
+    let (repo_id, repo_name, default_run_preference): (u64, String, Option<u32>) = match ctx.dbctx.conn.lock().unwrap()
+        .query_row("select id, repo_name, default_run_preference from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap(), row.get(2).unwrap())))
+        .optional()
+        .unwrap() {
+        Some(elem) => elem,
+        None => {
+            eprintln!("no repo named {}", path);
+            return (StatusCode::NOT_FOUND, Html(String::new()));
+        }
+    };
+
+    // TODO: display default_run_preference somehow on the web summary?
+
+    for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") {
+        let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo");
+        last_builds.extend(last_ten_jobs.drain(..));
+    }
+    last_builds.sort_by_key(|job| -(job.created_time as i64));
+
+    let mut response = String::new();
+    response.push_str("<html>\n");
+    response.push_str(&format!("  <head><title>ci.butactuallyin.space - {}</title></head>\n", repo_name));
+    response.push_str("<body>\n");
+    response.push_str(&format!("<h1>{} build history</h1>\n", repo_name));
+    response.push_str("<a href=\"/\">full repos index</a><p>\n");
+
+    response.push_str("<table>");
+    response.push_str("<tr>\n");
+    let headings = ["last build", "job", "build commit", "duration", "status", "result"];
+    for heading in headings {
+        response.push_str(&format!("<th>{}</th>", heading));
+    }
+    response.push_str("</tr>\n");
+
+    let mut row_num = 0;
+
+    for job in last_builds.iter().take(10) {
+        let run = ctx.dbctx.last_run_for_job(job.id).expect("query succeeds").expect("TODO: run exists if job exists (small race if querying while creating job ....");
+        let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
+        let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
+            Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
+            None => job_commit.clone()
+        };
+
+        let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
+
+        let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
+        let duration = display_run_time(&run);
+
+        let status = format!("{:?}", run.state).to_lowercase();
+
+        let result = match run.build_result {
+            Some(0) => "pass",
+            Some(_) => "fail",
+            None => match run.state {
+                RunState::Pending => { "unstarted" },
+                RunState::Started => { "in progress" },
+                _ => { "unreported" }
+            }
+        };
+
+        let entries = [last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result];
+        let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
+
+        let mut row_html = String::new();
+        for entry in entries {
+            row_html.push_str(&format!("<td>{}</td>", entry));
+        }
+
+        let row_index = row_num % 2;
+        response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index]));
+        response.push_str(&row_html);
+        response.push_str("</tr>\n");
+
+        row_num += 1;
+    }
+    response.push_str("</table>");
+
+    (StatusCode::OK, Html(response))
+}
+
+async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<WebserverState>, body: Bytes) -> impl IntoResponse {
+    let json: Result<serde_json::Value, serde_json::Error> = serde_json::from_slice(&body);
+    eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers);
+
+    let payload = match json {
+        Ok(payload) => { payload },
+        Err(e) => {
+            eprintln!("bad request: path={}/{}\nheaders: {:?}\nbody err: {:?}", path.0, path.1, headers, e);
+            return (StatusCode::BAD_REQUEST, "").into_response();
+        }
+    };
+
+    let sent_hmac = match headers.get("x-hub-signature-256") {
+        Some(sent_hmac) => { sent_hmac.to_str().expect("valid ascii string").to_owned() },
+        None => {
+            eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-hub-signature-256", path.0, path.1, headers);
+            return (StatusCode::BAD_REQUEST, "").into_response();
+        }
+    };
+
+    let mut hmac_ok = false;
+
+    for psk in PSKS.read().unwrap().iter() {
+        let mut mac = Hmac::<Sha256>::new_from_slice(psk.key.as_bytes())
+            .expect("hmac can be constructed");
+        mac.update(&body);
+        let result = mac.finalize().into_bytes().to_vec();
+
+        // hack: skip the "sha256=" prefix on the sent signature
+        let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex");
+        if decoded == result {
+            hmac_ok = true;
+            break;
+        }
+    }
+
+    if !hmac_ok {
+        eprintln!("bad hmac by all psks");
+        return (StatusCode::BAD_REQUEST, "").into_response();
+    }
+
+    let kind = match headers.get("x-github-event") {
+        Some(kind) => { kind.to_str().expect("valid ascii string").to_owned() },
+        None => {
+            eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-github-event", path.0, path.1, headers);
+            return (StatusCode::BAD_REQUEST, "").into_response();
+        }
+    };
+
+    handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await
+}
+
+async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router {
+    /*
+
+    // GET /hello/warp => 200 OK with body "Hello, warp!"
+    let hello = warp::path!("hello" / String)
+        .map(|name| format!("Hello, {}!\n", name));
+
+    let github_event = warp::post()
+        .and(warp::path!(String / String))
+        .and_then(|owner, repo| {
+            warp::header::<String>("x-github-event")
+                .and(warp::body::content_length_limit(1024 * 1024))
+                .and(warp::body::json())
+                .and_then(|event, json| handle_github_event(owner, repo, event, json))
+                .recover(|e| {
+                    async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
+                        Ok(warp::reply::with_status("65308", StatusCode::BAD_REQUEST))
+                    }
+                    handle_rejection(e)
+                })
+        });
+
+    let repo_status = warp::get()
+        .and(warp::path!(String / String / String))
+        .map(|owner, repo, sha| format!("CI status for {}/{} commit {}\n", owner, repo, sha));
+
+    let other =
+        warp::post()
+            .and(warp::path::full())
+            .and(warp::addr::remote())
+            .and(warp::body::content_length_limit(1024 * 1024))
+            .and(warp::body::bytes())
+            .map(move |path, addr: Option<SocketAddr>, body| {
+                println!("{}: lets see what i got {:?}, {:?}", addr.unwrap(), path, body);
+                "hello :)\n"
+            })
+            .or(
+                warp::get()
+                    .and(warp::path::full())
+                    .and(warp::addr::remote())
+                    .map(move |path, addr: Option<SocketAddr>| {
+                        println!("{}: GET to {:?}", addr.unwrap(), path);
+                        "hello!\n"
+                    })
+            )
+            .recover(|e| {
+                async fn handle_rejection(err: Rejection) -> Result<impl Reply, Infallible> {
+                    Ok(warp::reply::with_status("50834", StatusCode::BAD_REQUEST))
+                }
+                handle_rejection(e)
+            });
+    */
+
+    async fn fallback_get(uri: Uri) -> impl IntoResponse {
+        (StatusCode::OK, "get resp")
+    }
+
+    async fn fallback_post(Path(path): Path<String>) -> impl IntoResponse {
+        "post resp"
+    }
+
+    Router::new()
+        .route("/:owner/:repo/:sha", get(handle_commit_status))
+        .route("/:owner", get(handle_repo_summary))
+        .route("/:owner/:repo", post(handle_repo_event))
+        .route("/artifact/:b/:artifact_id", get(handle_get_artifact))
+        .route("/", get(handle_ci_index))
+        .fallback(fallback_get)
+        .with_state(WebserverState {
+            jobs_path,
+            dbctx: Arc::new(DbCtx::new(cfg_path, db_path))
+        })
+}
+
+async fn bind_server(conf: serde_json::Value, jobs_path: PathBuf, config_path: PathBuf, db_path: PathBuf) -> std::io::Result<()> {
+    let server = make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service();
+    use serde_json::Value;
+    match conf {
+        Value::String(address) => {
+            axum_server::bind(address.parse().unwrap())
+                .serve(server).await
+        },
+        Value::Object(map) => {
+            let address = match map.get("address") {
+                Some(Value::String(address)) => address.clone(),
+                None => {
+                    panic!("no local address");
+                },
+                other => {
+                    panic!("invalid local address: {:?}", other);
+                }
+            };
+
+            match (map.get("cert_path"), map.get("key_path")) {
+                (Some(Value::String(cert_path)), Some(Value::String(key_path))) => {
+                    let config = RustlsConfig::from_pem_file(
+                        cert_path.clone(),
+                        key_path.clone(),
+                    ).await.unwrap();
+                    axum_server::bind_rustls(address.parse().unwrap(), config)
+                        .serve(server).await
+                },
+                (Some(_), _) | (_, Some(_)) => {
+                    panic!("invalid local tls config: only one of `cert_path` or `key_path` has been provided");
+                },
+                (None, None) => {
+                    axum_server::bind(address.parse().unwrap())
+                        .serve(server).await
+                }
+            }
+        },
+        other => {
+            panic!("invalid server bind config: {:?}", other);
+        }
+    }
+}
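+
+// The two shapes bind_server accepts for an address, written out (the values
+// are examples, not defaults shipped with this change):
+//
+//     // plain string: bind without TLS
+//     let plain = serde_json::json!("0.0.0.0:8080");
+//     // object: bind with TLS when both cert_path and key_path are present
+//     let tls = serde_json::json!({
+//         "address": "0.0.0.0:443",
+//         "cert_path": "/etc/letsencrypt/live/example/fullchain.pem",
+//         "key_path": "/etc/letsencrypt/live/example/privkey.pem"
+//     });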
+
+#[tokio::main]
+async fn main() {
+    tracing_subscriber::fmt::init();
+
+    let mut args = std::env::args();
+    args.next().expect("first arg exists");
+    let config_path = args.next().unwrap_or("./webserver_config.json".to_string());
+    let web_config: WebserverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for WebserverConfig");
+    let mut psks = PSKS.write().expect("can write lock");
+    *psks = web_config.psks.clone();
+    // drop write lock so we can read PSKS elsewhere WITHOUT deadlocking.
+    std::mem::drop(psks);
+
+    let jobs_path = web_config.jobs_path.clone();
+    let config_path = web_config.config_path.clone();
+    let db_path = web_config.db_path.clone();
+    if let Some(addr_conf) = web_config.debug_addr.as_ref() {
+        spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone()));
+    }
+    if let Some(addr_conf) = web_config.server_addr.as_ref() {
+        spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone()));
+    }
+    loop {
+        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
+    }
+}
+
+struct HostDesc {
+    hostname: String,
+    cpu_desc: String,
+    cpu_max_freq_khz: u64,
+}
+impl HostDesc {
+    fn from_parts(hostname: String, vendor_id: String, cpu_family: String, model: String, cpu_max_freq_khz: u64) -> Self {
+        let cpu_desc = match (vendor_id.as_str(), cpu_family.as_str(), model.as_str()) {
+            ("Arm Limited", "8", "0xd03") => "aarch64 A53".to_string(),
+            ("GenuineIntel", "6", "85") => "x86_64 Skylake".to_string(),
+            ("AuthenticAMD", "23", "113") => "x86_64 Matisse".to_string(),
+            (vendor, family, model) => format!("unknown {}:{}:{}", vendor, family, model)
+        };
+
+        HostDesc {
+            hostname,
+            cpu_desc,
+            cpu_max_freq_khz,
+        }
+    }
+}
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs
deleted file mode 100644
index f0ffa62..0000000
--- a/src/ci_ctl.rs
+++ /dev/null
@@ -1,235 +0,0 @@
-use clap::{Parser, Subcommand};
-
-mod sql;
-mod dbctx;
-mod notifier;
-mod io;
-mod protocol;
-
-use dbctx::DbCtx;
-use notifier::NotifierConfig;
-
-#[derive(Parser)]
-#[command(version, about, long_about = None)]
-struct Args {
-    /// path to a database to manage (defaults to "./state.db")
-    db_path: Option<String>,
-
-    /// path to where configs should be found (defaults to "./config")
-    config_path: Option<String>,
-
-    #[command(subcommand)]
-    command: Command,
-}
-
-#[derive(Subcommand)]
-enum Command {
-    /// add _something_ to the db
-    Add {
-        #[command(subcommand)]
-        what: AddItem,
-    },
-
-    /// make sure that the state looks reasonable.
-    ///
-    /// currently, ensure that all notifiers have a config, that config references an existing
-    /// file, and that the referenced file is valid.
- Validate, - - /// do something with jobs - Job { - #[command(subcommand)] - what: JobAction, - }, -} - -#[derive(Subcommand)] -enum JobAction { - List, - Rerun { - which: u32 - }, - RerunCommit { - commit: String - }, - Create { - repo: String, - commit: String, - pusher_email: String, - } -} - -#[derive(Subcommand)] -enum AddItem { - Repo { - name: String, - remote: Option, - remote_kind: Option, - config: Option, - }, - Remote { - repo_name: String, - remote: String, - remote_kind: String, - config: String, - }, -} - -fn main() { - let args = Args::parse(); - - let db_path = args.db_path.unwrap_or_else(|| "./state.db".to_owned()); - let config_path = args.config_path.unwrap_or_else(|| "./config".to_owned()); - - match args.command { - Command::Job { what } => { - match what { - JobAction::List => { - let db = DbCtx::new(&config_path, &db_path); - let mut conn = db.conn.lock().unwrap(); - let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); - let mut jobs = query.query([]).unwrap(); - while let Some(row) = jobs.next().unwrap() { - let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option) = row.try_into().unwrap(); - - eprint!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id); - if let Some(run_preferences) = run_preferences { - eprintln!(" | run preference: {}", run_preferences); - } else { - eprintln!(""); - } - } - eprintln!("jobs"); - }, - JobAction::Rerun { which } => { - let db = DbCtx::new(&config_path, &db_path); - let task_id = db.new_run(which as u64, None).expect("db can be queried").id; - eprintln!("[+] rerunning job {} as task {}", which, task_id); - } - JobAction::RerunCommit { commit } => { - let db = DbCtx::new(&config_path, &db_path); - let job_id = db.job_for_commit(&commit).unwrap(); - if let Some(job_id) = job_id { - let task_id = db.new_run(job_id, None).expect("db can be queried").id; - eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id); - } else { - eprintln!("[-] no job for commit {}", commit); - } - } - JobAction::Create { repo, commit, pusher_email } => { - let db = DbCtx::new(&config_path, &db_path); - let parts = repo.split(":").collect::>(); - let (remote_kind, repo_path) = (parts[0], parts[1]); - let remote = match db.remote_by_path_and_api(&remote_kind, &repo_path).expect("can query") { - Some(remote) => remote, - None => { - eprintln!("[-] no remote registered as {}:{}", remote_kind, repo_path); - return; - } - }; - - let repo_default_run_pref: Option = db.conn.lock().unwrap() - .query_row("select default_run_preference from repos where id=?1;", [remote.repo_id], |row| { - Ok((row.get(0)).unwrap()) - }) - .expect("can query"); - - let job_id = db.new_job(remote.id, &commit, Some(&pusher_email), repo_default_run_pref).expect("can create"); - let _ = db.new_run(job_id, None).unwrap(); - } - } - }, - Command::Add { what } => { - match what { - AddItem::Repo { name, remote, remote_kind, config } => { - let remote_config = match (remote, remote_kind, config) { - (Some(remote), Some(remote_kind), Some(config_path)) => { - // do something - if remote_kind != "github" { - eprintln!("unknown remote kind: {}", remote); - return; - } - Some((remote, remote_kind, config_path)) - }, - (None, None, None) => { - None - }, - _ => { - eprintln!("when specifying a remote, `remote`, `remote_kind`, and `config_path` must either all be provided together or not at all"); - return; - } - }; - - let db = DbCtx::new(&config_path, 
&db_path); - let repo_id = match db.new_repo(&name) { - Ok(repo_id) => repo_id, - Err(e) => { - if e.contains("UNIQUE constraint failed") { - eprintln!("[!] repo '{}' already exists", name); - return; - } else { - eprintln!("[!] failed to create repo entry: {}", e); - return; - } - } - }; - println!("[+] new repo created: '{}' id {}", &name, repo_id); - if let Some((remote, remote_kind, config_path)) = remote_config { - let full_config_file_path = format!("{}/{}", &db.config_path.display(), config_path); - let config = match remote_kind.as_ref() { - "github" => { - assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok()); - } - "github-email" => { - assert!(NotifierConfig::email_from_file(&full_config_file_path).is_ok()); - } - other => { - panic!("[-] notifiers for '{}' remotes are not supported", other); - } - }; - db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config_path.as_str()).unwrap(); - println!("[+] new remote created: repo '{}', {} remote at {}", &name, remote_kind, remote); - match remote_kind.as_str() { - "github" => { - println!("[!] now go make sure your github repo has a webhook set for `https://ci.butactuallyin.space/{}` to receive at least the `push` event.", remote.as_str()); - println!(" the secret sent with calls to this webhook should be the same preshared secret as the CI server is configured to know."); - } - _ => { } - } - } - }, - AddItem::Remote { repo_name, remote, remote_kind, config } => { - let db = DbCtx::new(&config_path, &db_path); - let repo_id = match db.repo_id_by_name(&repo_name) { - Ok(Some(id)) => id, - Ok(None) => { - eprintln!("[-] repo '{}' does not exist", repo_name); - return; - }, - Err(e) => { - eprintln!("[!] couldn't look up repo '{}': {:?}", repo_name, e); - return; - } - }; - let config_file = format!("{}/{}", config_path, config); - match remote_kind.as_ref() { - "github" => { - NotifierConfig::github_from_file(&config_file).unwrap(); - } - "github-email" => { - NotifierConfig::email_from_file(&config_file).unwrap(); - } - other => { - panic!("notifiers for '{}' remotes are not supported", other); - } - }; - db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config.as_str()).unwrap(); - println!("[+] new remote created: repo '{}', {} remote at {}", &repo_name, remote_kind, remote); - }, - } - }, - Command::Validate => { - println!("ok"); - } - } -} diff --git a/src/ci_driver.rs b/src/ci_driver.rs deleted file mode 100644 index f6c1828..0000000 --- a/src/ci_driver.rs +++ /dev/null @@ -1,630 +0,0 @@ -use std::process::Command; -use std::collections::HashMap; -use std::sync::{Mutex, RwLock}; -use lazy_static::lazy_static; -use std::io::Read; -use serde_derive::{Deserialize, Serialize}; -use futures_util::StreamExt; -use std::fmt; -use std::path::{Path, PathBuf}; -use tokio::spawn; -use tokio_stream::wrappers::ReceiverStream; -use std::sync::{Arc, Weak}; -use std::time::{SystemTime, UNIX_EPOCH}; -use axum_server::tls_rustls::RustlsConfig; -use axum::body::StreamBody; -use axum::http::{StatusCode}; -use hyper::HeaderMap; -use axum::Router; -use axum::routing::*; -use axum::extract::State; -use axum::extract::BodyStream; -use axum::response::IntoResponse; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; -use serde_json::json; - -mod dbctx; -mod sql; -mod notifier; -mod io; -mod protocol; - -use crate::dbctx::{DbCtx, PendingRun, Job, Run}; -use crate::sql::JobResult; -use crate::sql::RunState; -use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; - -lazy_static! 
{ - static ref AUTH_SECRET: RwLock> = RwLock::new(None); - static ref ACTIVE_TASKS: Mutex>> = Mutex::new(HashMap::new()); -} - -fn reserve_artifacts_dir(run: u64) -> std::io::Result { - let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into(); - path.push(run.to_string()); - match std::fs::create_dir(&path) { - Ok(()) => { - Ok(path) - }, - Err(e) => { - if e.kind() == std::io::ErrorKind::AlreadyExists { - Ok(path) - } else { - Err(e) - } - } - } -} - -async fn activate_run(dbctx: Arc, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> { - eprintln!("activating task {:?}", run); - - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis(); - - let remote = dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("job has remote"); - let repo = dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("remote has repo"); - - let commit_sha = dbctx.commit_sha(job.commit_id).expect("query succeeds"); - - let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts"); - - eprintln!("running {}", &repo.name); - - let res = candidate.submit(&dbctx, &run, &remote.remote_git_url, &commit_sha).await; - - let mut client_job = match res { - Ok(Some(mut client_job)) => { client_job } - Ok(None) => { - return Err("client hung up instead of acking task".to_string()); - } - Err(e) => { - // failed to submit job, move on for now - return Err(format!("failed to submit task: {:?}", e)); - } - }; - - let host_id = client_job.client.host_id; - - let connection = dbctx.conn.lock().unwrap(); - connection.execute( - "update runs set started_time=?1, host_id=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5", - (now as u64, host_id, format!("{}", artifacts.display()), &client_job.client.build_token, run.id) - ) - .expect("can update"); - std::mem::drop(connection); - - spawn(async move { - client_job.run().await - }); - - Ok(()) -} - -struct RunnerClient { - tx: mpsc::Sender>, - rx: BodyStream, - host_id: u32, - build_token: String, - accepted_sources: Option>, -} - -fn token_for_job() -> String { - let mut data = [0u8; 32]; - std::fs::File::open("/dev/urandom") - .unwrap() - .read_exact(&mut data) - .unwrap(); - - base64::encode(data) -} - -struct ClientJob { - dbctx: Arc, - remote_git_url: String, - sha: String, - task: PendingRun, - client: RunnerClient, - // exists only as confirmation this `ClientJob` is somewhere, still alive and being processed. - task_witness: Arc<()>, -} - -impl ClientJob { - pub async fn run(&mut self) { - loop { - eprintln!("waiting on response.."); - let msg = match self.client.recv_typed::().await.expect("recv works") { - Some(msg) => msg, - None => { - eprintln!("client hung up. 
task's done, i hope?"); - return; - } - }; - eprintln!("got {:?}", msg); - match msg { - ClientProto::NewTaskPlease { allowed_pushers, host_info } => { - eprintln!("misdirected task request (after handshake?)"); - return; - } - ClientProto::TaskStatus(task_info) => { - let (result, state): (Result, RunState) = match task_info { - TaskInfo::Finished { status } => { - eprintln!("task update: state is finished and result is {}", status); - match status.as_str() { - "pass" => { - (Ok("success".to_string()), RunState::Finished) - }, - other => { - eprintln!("unhandled task completion status: {}", other); - (Err(other.to_string()), RunState::Error) - } - } - }, - TaskInfo::Interrupted { status, description } => { - eprintln!("task update: state is interrupted and result is {}", status); - let desc = description.unwrap_or_else(|| status.clone()); - (Err(desc), RunState::Error) - } - }; - - let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); - let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists"); - - for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { - if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await { - eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e); - } - } - - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis(); - - let build_result = if result.is_ok() { - JobResult::Pass - } else { - JobResult::Fail - }; - let result_desc = match result { - Ok(msg) => msg, - Err(msg) => msg, - }; - - self.dbctx.conn.lock().unwrap().execute( - "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", - (now as u64, state as u64, build_result as u8, result_desc, self.task.id) - ) - .expect("can update"); - } - ClientProto::ArtifactCreate => { - eprintln!("creating artifact"); - self.client.send(serde_json::json!({ - "status": "ok", - "object_id": "10", - })).await.unwrap(); - }, - ClientProto::Metric { name, value } => { - self.dbctx.insert_metric(self.task.id, &name, &value) - .expect("TODO handle metric insert error?"); - } - ClientProto::Command(_command_info) => { - // record information about commands, start/stop, etc. probably also allow - // artifacts to be attached to commands and default to attaching stdout/stderr? 
- } - other => { - eprintln!("unhandled message {:?}", other); - } - } - } - } -} - -impl RunnerClient { - async fn new(sender: mpsc::Sender>, resp: BodyStream, accepted_sources: Option>, host_id: u32) -> Result { - let token = token_for_job(); - let client = RunnerClient { - tx: sender, - rx: resp, - host_id, - build_token: token, - accepted_sources, - }; - Ok(client) - } - - async fn test_connection(&mut self) -> Result<(), String> { - self.send_typed(&ClientProto::Ping).await?; - let resp = self.recv_typed::().await?; - match resp { - Some(ClientProto::Pong) => { - Ok(()) - } - Some(other) => { - Err(format!("unexpected connection test response: {:?}", other)) - } - None => { - Err("client hung up".to_string()) - } - } - } - - async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { - self.send_typed(&msg).await - } - - async fn send_typed(&mut self, msg: &T) -> Result<(), String> { - self.tx.send(Ok(serde_json::to_string(msg).unwrap())) - .await - .map_err(|e| e.to_string()) - } - - async fn recv(&mut self) -> Result, String> { - match self.rx.next().await { - Some(Ok(bytes)) => { - serde_json::from_slice(&bytes) - .map(Option::Some) - .map_err(|e| e.to_string()) - }, - Some(Err(e)) => { - eprintln!("e: {:?}", e); - Err(format!("no client job: {:?}", e)) - }, - None => { - Ok(None) - } - } - } - - async fn recv_typed(&mut self) -> Result, String> { - let json = self.recv().await?; - Ok(json.map(|v| serde_json::from_value(v).unwrap())) - } - - // is this client willing to run the job based on what it has told us so far? - fn will_accept(&self, job: &Job) -> bool { - match (job.source.as_ref(), self.accepted_sources.as_ref()) { - (_, None) => true, - (None, Some(_)) => false, - (Some(source), Some(accepted_sources)) => { - accepted_sources.contains(source) - } - } - } - - async fn submit(mut self, dbctx: &Arc, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result, String> { - self.send_typed(&ClientProto::new_task(RequestedJob { - commit: sha.to_string(), - remote_url: remote_git_url.to_string(), - build_token: self.build_token.to_string(), - })).await?; - match self.recv_typed::().await { - Ok(Some(ClientProto::Started)) => { - let task_witness = Arc::new(()); - ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness)); - Ok(Some(ClientJob { - task: job.clone(), - dbctx: Arc::clone(dbctx), - sha: sha.to_string(), - remote_git_url: remote_git_url.to_string(), - client: self, - task_witness, - })) - } - Ok(Some(resp)) => { - eprintln!("invalid response: {:?}", resp); - Err("client rejected job".to_string()) - } - Ok(None) => { - Ok(None) - } - Err(e) => { - eprintln!("e: {:?}", e); - Err(e) - } - } - } -} - -impl fmt::Debug for RunnerClient { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("RunnerClient { .. 
}") - } -} - -#[axum_macros::debug_handler] -async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { - eprintln!("artifact request"); - let run_token = match headers.get("x-task-token") { - Some(run_token) => run_token.to_str().expect("valid string"), - None => { - eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() { - Some(result) => result, - None => { - eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - if token_validity != dbctx::TokenValidity::Valid { - eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - - let artifact_path = if let Some(artifact_path) = artifact_path { - artifact_path - } else { - eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - }; - - let artifact_name = match headers.get("x-artifact-name") { - Some(artifact_name) => artifact_name.to_str().expect("valid string"), - None => { - eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let artifact_desc = match headers.get("x-artifact-desc") { - Some(artifact_desc) => artifact_desc.to_str().expect("valid string"), - None => { - eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await { - Ok(artifact) => artifact, - Err(err) => { - eprintln!("failure to reserve artifact: {:?}", err); - return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response(); - } - }; - - eprintln!("spawning task..."); - let dbctx_ref = Arc::clone(&ctx.0); - spawn(async move { - artifact.store_all(artifact_content).await.unwrap(); - dbctx_ref.finalize_artifact(artifact.artifact_id).await.unwrap(); - }); - eprintln!("done?"); - - (StatusCode::OK, "").into_response() -} - -#[derive(Serialize, Deserialize)] -struct WorkRequest { - kind: String, - accepted_pushers: Option> -} - -async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse { - let auth_token = match headers.get("authorization") { - Some(token) => { - if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) { - eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - } - None => { - eprintln!("bad artifact post: headers: {:?}\nno authorization", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let (tx_sender, tx_receiver) = mpsc::channel(8); - let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); - tx_sender.send(Ok("hello".to_string())).await.expect("works"); - - let request = job_resp.next().await.expect("request chunk").expect("chunk exists"); - let request = std::str::from_utf8(&request).unwrap(); - let request: ClientProto = match serde_json::from_str(&request) { - Ok(v) => v, - Err(e) => { - eprintln!("couldn't parse work request: {:?}", e); - return 
(StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); - } - }; - let (accepted_pushers, host_info) = match request { - ClientProto::NewTaskPlease { allowed_pushers, host_info } => (allowed_pushers, host_info), - other => { - eprintln!("bad request kind: {:?}", &other); - return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); - } - }; - - eprintln!("client identifies itself as {:?}", host_info); - - let host_info_id = ctx.0.id_for_host(&host_info).expect("can get a host info id"); - - let client = match RunnerClient::new(tx_sender, job_resp, accepted_pushers, host_info_id).await { - Ok(v) => v, - Err(e) => { - eprintln!("unable to register client"); - return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); - } - }; - - match ctx.1.try_send(client) { - Ok(()) => { - eprintln!("client requested work..."); - return (StatusCode::OK, resp_body).into_response(); - } - Err(TrySendError::Full(client)) => { - return (StatusCode::IM_A_TEAPOT, resp_body).into_response(); - } - Err(TrySendError::Closed(client)) => { - panic!("client holder is gone?"); - } - } -} - -async fn make_api_server(dbctx: Arc) -> (Router, mpsc::Receiver) { - let (pending_client_sender, pending_client_receiver) = mpsc::channel(8); - - let router = Router::new() - .route("/api/next_job", post(handle_next_job)) - .route("/api/artifact", post(handle_artifact)) - .with_state((dbctx, pending_client_sender)); - (router, pending_client_receiver) -} - -#[derive(Deserialize, Serialize)] -struct DriverConfig { - cert_path: PathBuf, - key_path: PathBuf, - config_path: PathBuf, - db_path: PathBuf, - server_addr: String, - auth_secret: String, -} - -#[tokio::main] -async fn main() { - tracing_subscriber::fmt::init(); - - let mut args = std::env::args(); - args.next().expect("first arg exists"); - let config_path = args.next().unwrap_or("./driver_config.json".to_string()); - let driver_config: DriverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for DriverConfig"); - let mut auth_secret = AUTH_SECRET.write().unwrap(); - *auth_secret = Some(driver_config.auth_secret.clone()); - std::mem::drop(auth_secret); - - let config = RustlsConfig::from_pem_file( - driver_config.cert_path.clone(), - driver_config.key_path.clone(), - ).await.unwrap(); - - let dbctx = Arc::new(DbCtx::new(&driver_config.config_path, &driver_config.db_path)); - - dbctx.create_tables().unwrap(); - - let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await; - spawn(axum_server::bind_rustls(driver_config.server_addr.parse().unwrap(), config) - .serve(api_server.into_make_service())); - - spawn(old_task_reaper(Arc::clone(&dbctx))); - - loop { - let mut candidate = match channel.recv().await - .ok_or_else(|| "client channel disconnected".to_string()) { - - Ok(candidate) => { candidate }, - Err(e) => { eprintln!("client error: {}", e); continue; } - }; - - let dbctx = Arc::clone(&dbctx); - spawn(async move { - let host_id = candidate.host_id; - let res = find_client_task(dbctx, candidate).await; - eprintln!("task client for {}: {:?}", host_id, res); - }); - } -} - -async fn find_client_task(dbctx: Arc, mut candidate: RunnerClient) -> Result<(), String> { - let find_client_task_start = std::time::Instant::now(); - - let (run, job) = 'find_work: loop { - // try to find a job for this candidate: - // * start with pending runs - these need *some* client to run them, but do not care which - // * if no new jobs, maybe an existing job still needs a rerun on 
this client? - // * otherwise, um, i dunno. do nothing? - - let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap(); - - if runs.len() > 0 { - println!("{} new runs", runs.len()); - - for run in runs.into_iter() { - let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); - - if candidate.will_accept(&job) { - break 'find_work (run, job); - } - - } - } - - let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query"); - - for job in alt_run_jobs.into_iter() { - if candidate.will_accept(&job) { - let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap(); - break 'find_work (run, job); - } - } - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - if candidate.test_connection().await.is_err() { - return Err("lost client connection".to_string()); - } - - if find_client_task_start.elapsed().as_secs() > 300 { - return Err("5min new task deadline elapsed".to_string()); - } - }; - - eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); - activate_run(Arc::clone(&dbctx), candidate, &job, &run).await?; - - Ok(()) -} - -async fn old_task_reaper(dbctx: Arc) { - let mut potentially_stale_tasks = dbctx.get_active_runs().unwrap(); - - let active_tasks = ACTIVE_TASKS.lock().unwrap(); - - for (id, witness) in active_tasks.iter() { - if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) { - potentially_stale_tasks.swap_remove(idx); - } - } - - std::mem::drop(active_tasks); - - // ok, so we have tasks that are not active, now if the task is started we know someone should - // be running it and they are not. retain only those tasks, as they are ones we may want to - // mark dead. - // - // further, filter out any tasks created in the last 60 seconds. this is a VERY generous grace - // period for clients that have accepted a job but for some reason we have not recorded them as - // active (perhaps they are slow to ack somehow). - - let stale_threshold = crate::io::now_ms() - 60_000; - - let stale_tasks: Vec = potentially_stale_tasks.into_iter().filter(|task| { - match (task.state, task.start_time) { - // `run` is atomically set to `Started` and adorned with a `start_time`. disagreement - // between the two means this run is corrupt and should be reaped. - (RunState::Started, None) => { - true - }, - (RunState::Started, Some(start_time)) => { - start_time < stale_threshold - } - // and if it's not `started`, it's either pending (not assigned yet, so not stale), or - // one of the complete statuses. 
- _ => { - false - } - } - }).collect(); - - for task in stale_tasks.iter() { - eprintln!("looks like task {} is stale, reaping", task.id); - dbctx.reap_task(task.id).expect("works"); - } -} diff --git a/src/ci_runner.rs b/src/ci_runner.rs deleted file mode 100644 index 9aaf33d..0000000 --- a/src/ci_runner.rs +++ /dev/null @@ -1,668 +0,0 @@ -use std::time::Duration; -use std::os::unix::process::ExitStatusExt; -use rlua::prelude::LuaError; -use std::sync::{Arc, Mutex}; -use reqwest::{StatusCode, Response}; -use tokio::process::Command; -use std::process::Stdio; -use std::process::ExitStatus; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use serde_json::json; -use serde::{Deserialize, de::DeserializeOwned, Serialize}; -use std::task::{Context, Poll}; -use std::pin::Pin; -use std::marker::Unpin; - -mod protocol; -mod lua; -mod io; - -use crate::io::{ArtifactStream, VecSink}; -use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; -use crate::lua::CommandOutput; - -#[derive(Debug)] -enum WorkAcquireError { - Reqwest(reqwest::Error), - EarlyEof, - Protocol(String), -} - -struct RunnerClient { - http: reqwest::Client, - host: String, - tx: hyper::body::Sender, - rx: Response, - current_job: Option, -} - -impl RequestedJob { - pub fn into_running(self, client: RunnerClient) -> RunningJob { - RunningJob { - job: self, - client, - current_step: StepTracker::new(), - } - } -} - -struct JobEnv { - lua: lua::BuildEnv, - job: Arc>, -} - -impl JobEnv { - fn new(job: &Arc>) -> Self { - let lua = lua::BuildEnv::new(job); - JobEnv { - lua, - job: Arc::clone(job) - } - } - - async fn default_goodfile(self) -> Result<(), LuaError> { - self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await - } - - async fn exec_goodfile(self) -> Result<(), LuaError> { - let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap(); - self.lua.run_build(script.as_bytes()).await - } -} - -pub struct RunningJob { - job: RequestedJob, - client: RunnerClient, - current_step: StepTracker, -} - -enum RepoError { - CloneFailedIdk { exit_code: ExitStatus }, - CheckoutFailedIdk { exit_code: ExitStatus }, - CheckoutFailedMissingRef, -} - -pub struct StepTracker { - scopes: Vec -} - -impl StepTracker { - pub fn new() -> Self { - StepTracker { - scopes: Vec::new() - } - } - - pub fn push(&mut self, name: String) { - self.scopes.push(name); - } - - pub fn pop(&mut self) { - self.scopes.pop(); - } - - pub fn clear(&mut self) { - self.scopes.clear(); - } - - pub fn full_step_path(&self) -> &[String] { - self.scopes.as_slice() - } -} - -impl RunningJob { - async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { - self.client.send_typed(&ClientProto::metric(name, value)) - .await - .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) - } - - // TODO: panics if hyper finds the channel is closed. 
hum - async fn create_artifact(&self, name: &str, desc: &str) -> Result { - let (mut sender, body) = hyper::Body::channel(); - let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") - .header("user-agent", "ci-butactuallyin-space-runner") - .header("x-task-token", &self.job.build_token) - .header("x-artifact-name", name) - .header("x-artifact-desc", desc) - .body(body) - .send() - .await - .map_err(|e| format!("unable to send request: {:?}", e))?; - - if resp.status() == StatusCode::OK { - eprintln!("[+] artifact '{}' started", name); - Ok(ArtifactStream::new(sender)) - } else { - Err(format!("[-] unable to create artifact: {:?}", resp)) - } - } - - async fn clone_remote(&self) -> Result<(), RepoError> { - let mut git_clone = Command::new("git"); - git_clone - .arg("clone") - .arg(&self.job.remote_url) - .arg("tmpdir"); - - let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await - .map_err(|e| { - eprintln!("stringy error (exec failed?) for clone: {}", e); - RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) } - })?; - - if !clone_res.success() { - return Err(RepoError::CloneFailedIdk { exit_code: clone_res }); - } - - let mut git_checkout = Command::new("git"); - git_checkout - .current_dir("tmpdir") - .arg("checkout") - .arg(&self.job.commit); - - let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await - .map_err(|e| { - eprintln!("stringy error (exec failed?) for checkout: {}", e); - RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) } - })?; - - if !checkout_res.success() { - if checkout_res.code() == Some(128) { - return Err(RepoError::CheckoutFailedIdk { exit_code: checkout_res }); - } else { - return Err(RepoError::CheckoutFailedMissingRef); - } - } - - Ok(()) - } - - async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result { - let stdout_artifact = self.create_artifact( - &format!("{} (stdout)", name), - &format!("{} (stdout)", desc) - ).await.expect("works"); - let stderr_artifact = self.create_artifact( - &format!("{} (stderr)", name), - &format!("{} (stderr)", desc) - ).await.expect("works"); - - let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?; - - Ok(exit_status) - } - - async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result { - let stdout_collector = VecSink::new(); - let stderr_collector = VecSink::new(); - - let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?; - - Ok(CommandOutput { - exit_status, - stdout: stdout_collector.take_buf(), - stderr: stderr_collector.take_buf(), - }) - } - - async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result { - eprintln!("[.] running {}", name); - - let mut child = command - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; - - let mut child_stdout = child.stdout.take().unwrap(); - let mut child_stderr = child.stderr.take().unwrap(); - - eprintln!("[.] 
'{}': forwarding stdout", name);
- tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_reporter).await });
- eprintln!("[.] '{}': forwarding stderr", name);
- tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_reporter).await });
-
- let res = child.wait().await
- .map_err(|e| format!("failed to wait? {:?}", e))?;
-
- if res.success() {
- eprintln!("[+] '{}' success", name);
- } else {
- eprintln!("[-] '{}' fail: {:?}", name, res);
- }
-
- Ok(res)
- }
-
- async fn run(mut self) {
- self.client.send_typed(&ClientProto::Started).await.unwrap();
-
- std::fs::remove_dir_all("tmpdir").unwrap();
- std::fs::create_dir("tmpdir").unwrap();
-
- let ctx = Arc::new(Mutex::new(self));
-
- let checkout_res = ctx.lock().unwrap().clone_remote().await;
-
- if let Err(e) = checkout_res {
- let status = "bad_ref";
- let status = ClientProto::task_status(TaskInfo::finished(status));
- eprintln!("checkout failed, reporting status: {:?}", status);
-
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
- if let Err(e) = res {
- eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "bad_ref", e);
- }
-
- return;
- }
-
- let lua_env = JobEnv::new(&ctx);
-
- let metadata = std::fs::metadata("./tmpdir/goodfile");
- let res: Result<String, (String, String)> = match metadata {
- Ok(_) => {
- match lua_env.exec_goodfile().await {
- Ok(()) => {
- Ok("pass".to_string())
- },
- Err(lua_err) => {
- Err(("failed".to_string(), lua_err.to_string()))
- }
- }
- },
- Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
- match lua_env.default_goodfile().await {
- Ok(()) => {
- Ok("pass".to_string())
- },
- Err(lua_err) => {
- Err(("failed".to_string(), lua_err.to_string()))
- }
- }
- },
- Err(e) => {
- eprintln!("[-] error finding goodfile: {:?}", e);
- Err(("failed".to_string(), "inaccessible goodfile".to_string()))
- }
- };
-
- match res {
- Ok(status) => {
- eprintln!("[+] job success!");
- let status = ClientProto::task_status(TaskInfo::finished(status));
- eprintln!("reporting status: {:?}", status);
-
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
- if let Err(e) = res {
- eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
- }
- }
- Err((status, lua_err)) => {
- eprintln!("[-] job error: {}", status);
-
- let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string()));
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
- if let Err(e) = res {
- eprintln!("[!]
FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e); - } - } - } - } - - fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) { - let mut cmd = Command::new(&command[0]); - let cwd = match working_dir { - Some(dir) => { - format!("tmpdir/{}", dir) - }, - None => { - "tmpdir".to_string() - } - }; - eprintln!("prepared {:?} to run in {}", &command, &cwd); - let human_name = command.join(" "); - cmd - .current_dir(cwd) - .args(&command[1..]); - (cmd, human_name) - } - - async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result { - let (cmd, human_name) = Self::prep_command(command, working_dir); - - let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?; - - if !cmd_res.exit_status.success() { - return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status)); - } - Ok(cmd_res) - } - - async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { - self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) - .await.unwrap(); - - let (cmd, human_name) = Self::prep_command(command, working_dir); - - let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?; - - self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) - .await.unwrap(); - - - if !cmd_res.success() { - return Err(format!("{} failed: {:?}", &human_name, cmd_res)); - } - - Ok(()) - } -} - -impl RunnerClient { - async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result { - if res.status() != StatusCode::OK { - return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); - } - - let hello = res.chunk().await.expect("chunk"); - if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { - return Err(format!("bad hello: {:?}", hello)); - } - - Ok(Self { - http: reqwest::ClientBuilder::new() - .connect_timeout(Duration::from_millis(1000)) - .timeout(Duration::from_millis(600000)) - .build() - .expect("can build client"), - host: host.to_string(), - tx: sender, - rx: res, - current_job: None, - }) - } - - async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result, WorkAcquireError> { - loop { - let message = self.recv_typed::().await; - match message { - Ok(Some(ClientProto::NewTask(new_task))) => { - return Ok(Some(new_task)); - }, - Ok(Some(ClientProto::Ping)) => { - self.send_typed(&ClientProto::Pong).await - .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; - }, - Ok(Some(other)) => { - return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); - }, - Ok(None) => { - return Ok(None); - }, - Err(e) => { - return Err(WorkAcquireError::Protocol(e)); - } - } - } - } - - async fn recv(&mut self) -> Result, String> { - self.recv_typed().await - } - - async fn recv_typed(&mut self) -> Result, String> { - match self.rx.chunk().await { - Ok(Some(chunk)) => { - serde_json::from_slice(&chunk) - .map(Option::Some) - .map_err(|e| { - format!("not json: {:?}", e) - }) - }, - Ok(None) => Ok(None), - Err(e) => { - Err(format!("error in recv: {:?}", e)) - } - } - } - - async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { - self.send_typed(&value).await - } - - async fn send_typed(&mut self, t: &T) -> Result<(), String> { - self.tx.send_data( - serde_json::to_vec(t) - .map_err(|e| format!("json error: {:?}", e))? 
- .into() - ).await - .map_err(|e| format!("send error: {:?}", e)) - } -} - -#[derive(Deserialize, Serialize)] -struct RunnerConfig { - server_address: String, - auth_secret: String, - allowed_pushers: Option>, -} - -#[tokio::main] -async fn main() { - tracing_subscriber::fmt::init(); - let mut args = std::env::args(); - args.next().expect("first arg exists"); - let config_path = args.next().unwrap_or("./runner_config.json".to_string()); - let runner_config: RunnerConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for RunnerConfig"); - let client = reqwest::ClientBuilder::new() - .connect_timeout(Duration::from_millis(1000)) - .timeout(Duration::from_millis(600000)) - .build() - .expect("can build client"); - - let host_info = host_info::collect_host_info(); - eprintln!("host info: {:?}", host_info); - - loop { - let (mut sender, body) = hyper::Body::channel(); - - sender.send_data(serde_json::to_string(&ClientProto::new_task_please( - runner_config.allowed_pushers.clone(), - host_info.clone(), - )).unwrap().into()).await.expect("req"); - - let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") - .header("user-agent", "ci-butactuallyin-space-runner") - .header("authorization", runner_config.auth_secret.trim()) - .body(body) - .send() - .await; - - match poll { - Ok(mut res) => { - let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { - Ok(client) => client, - Err(e) => { - eprintln!("failed to initialize client: {:?}", e); - std::thread::sleep(Duration::from_millis(10000)); - continue; - } - }; - let job = match client.wait_for_work(runner_config.allowed_pushers.as_ref().map(|x| x.as_ref())).await { - Ok(Some(request)) => request, - Ok(None) => { - eprintln!("no work to do (yet)"); - std::thread::sleep(Duration::from_millis(2000)); - continue; - } - Err(e) => { - eprintln!("failed to get work: {:?}", e); - std::thread::sleep(Duration::from_millis(10000)); - continue; - } - }; - eprintln!("requested work: {:?}", job); - - eprintln!("doing {:?}", job); - - let mut job = job.into_running(client); - job.run().await; - std::thread::sleep(Duration::from_millis(10000)); - }, - Err(e) => { - let message = format!("{}", e); - - if message.contains("tcp connect error") { - eprintln!("could not reach server. sleeping a bit and retrying."); - std::thread::sleep(Duration::from_millis(5000)); - continue; - } - - eprintln!("unhandled error: {}", message); - - std::thread::sleep(Duration::from_millis(1000)); - } - } - } -} - -mod host_info { - use crate::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; - - // get host model name, microcode, and how many cores - fn collect_cpu_info() -> CpuInfo { - fn find_line(lines: &[String], prefix: &str) -> String { - lines.iter() - .find(|line| line.starts_with(prefix)) - .expect(&format!("{} line is present", prefix)) - .split(":") - .last() - .unwrap() - .trim() - .to_string() - } - - /// try finding core `cpu`'s max frequency in khz. we'll assume this is the actual speed a - /// build would run at.. fingers crossed. - fn try_finding_cpu_freq(cpu: u32) -> Result { - if let Ok(freq_str) = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq") { - Ok(freq_str.trim().parse().unwrap()) - } else { - // so cpufreq probably isn't around, maybe /proc/cpuinfo's mhz figure is present? 
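- // (a typical /proc/cpuinfo line here looks like
- //   cpu MHz         : 2400.000
- // - a megahertz float after the colon, scaled by 1000 below to get khz.)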
- let cpu_lines: Vec = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); - let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect(); - match cpu_mhzes.get(cpu as usize) { - Some(mhz) => { - let mut line_parts = cpu_mhzes[cpu as usize].split(":"); - let _ = line_parts.next(); - let mhz = line_parts.next().unwrap().trim(); - let mhz: f64 = mhz.parse().unwrap(); - Ok((mhz * 1000.0) as u64) - }, - None => { - panic!("could not get cpu freq either from cpufreq or /proc/cpuinfo?"); - } - } - } - } - - // we'll have to deploy one of a few techniques, because x86/x86_64 is internally - // consistent, but aarch64 is different. who knows what other CPUs think. - match std::env::consts::ARCH { - "x86" | "x86_64" => { - let cpu_lines: Vec = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); - let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect(); - let cores = model_names.len() as u32; - let model_name = find_line(&cpu_lines, "model name"); - let vendor_id = find_line(&cpu_lines, "vendor_id"); - let family = find_line(&cpu_lines, "cpu family"); - let model = find_line(&cpu_lines, "model\t"); - let microcode = find_line(&cpu_lines, "microcode"); - let max_freq = try_finding_cpu_freq(0).unwrap(); - - CpuInfo { model_name, microcode, cores, vendor_id, family, model, max_freq } - } - "aarch64" => { - let cpu_lines: Vec = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); - let processors: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("processor")).collect(); - let cores = processors.len() as u32; - - // alternate possible path: /sys/firmware/devicetree/base/compatible - let model_name = std::fs::read_to_string("/proc/device-tree/compatible").unwrap(); - let model_name = model_name.replace("\x00", ";"); - let vendor_id = find_line(&cpu_lines, "CPU implementer"); - let vendor_name = match vendor_id.as_str() { - "0x41" => "Arm Limited".to_string(), - "0x42" => "Broadcom Corporation".to_string(), - "0x43" => "Cavium Inc".to_string(), - "0x44" => "Digital Equipment Corporation".to_string(), - "0x46" => "Fujitsu Ltd".to_string(), - "0x49" => "Infineon Technologies AG".to_string(), - "0x4d" => "Motorola".to_string(), - "0x4e" => "NVIDIA Corporation".to_string(), - "0x50" => "Applied Micro Circuits Corporation".to_string(), - "0x51" => "Qualcomm Inc".to_string(), - "0x56" => "Marvell International Ltd".to_string(), - "0x69" => "Intel Corporation".to_string(), - "0xc0" => "Ampere Computing".to_string(), - other => format!("unknown aarch64 vendor {}", other), - }; - let family = find_line(&cpu_lines, "CPU architecture"); - let model = find_line(&cpu_lines, "CPU part"); - let microcode = String::new(); - let max_freq = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq").unwrap().trim().parse().unwrap(); - - CpuInfo { model_name, microcode, cores, vendor_id: vendor_name, family, model, max_freq } - } - other => { - panic!("dunno how to find cpu info for {}, panik", other); - } - } - } - - fn collect_mem_info() -> MemoryInfo { - let mem_lines: Vec = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect(); - let total = mem_lines[0].split(":").last().unwrap().trim().to_string(); - let available = mem_lines[2].split(":").last().unwrap().trim().to_string(); - - MemoryInfo { total, 
available }
- }
-
- fn hostname() -> String {
- let mut bytes = [0u8; 4096];
- let res = unsafe {
- libc::gethostname(bytes.as_mut_ptr() as *mut std::ffi::c_char, bytes.len())
- };
- if res != 0 {
- panic!("gethostname failed {:?}", res);
- }
- let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated");
- std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string()
- }
-
- pub fn collect_env_info() -> EnvInfo {
- EnvInfo {
- arch: std::env::consts::ARCH.to_string(),
- family: std::env::consts::FAMILY.to_string(),
- os: std::env::consts::OS.to_string(),
- }
- }
-
- pub fn collect_host_info() -> HostInfo {
- let cpu_info = collect_cpu_info();
- let memory_info = collect_mem_info();
- let hostname = hostname();
- let env_info = collect_env_info();
-
- HostInfo {
- hostname,
- cpu_info,
- memory_info,
- env_info,
- }
- }
-}
-
diff --git a/src/dbctx.rs b/src/dbctx.rs
deleted file mode 100644
index 7378b2e..0000000
--- a/src/dbctx.rs
+++ /dev/null
@@ -1,772 +0,0 @@
-use std::sync::Mutex;
-use futures_util::StreamExt;
-use rusqlite::{Connection, OptionalExtension};
-use std::time::{SystemTime, UNIX_EPOCH};
-use tokio::fs::{File, OpenOptions};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use std::path::Path;
-use std::path::PathBuf;
-
-use crate::io::ArtifactDescriptor;
-use crate::notifier::{RemoteNotifier, NotifierConfig};
-use crate::sql;
-
-const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30;
-
-pub struct DbCtx {
- pub config_path: PathBuf,
- // don't love this but.. for now...
- pub conn: Mutex<Connection>,
-}
-
-#[derive(Debug, Clone)]
-pub struct Repo {
- pub id: u64,
- pub name: String,
- pub default_run_preference: Option<String>,
-}
-
-#[derive(Debug)]
-pub struct Remote {
- pub id: u64,
- pub repo_id: u64,
- pub remote_path: String,
- pub remote_api: String,
- pub remote_url: String,
- pub remote_git_url: String,
- pub notifier_config_path: String,
-}
-
-// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1
-// relationship with commits, and potentially many *runs* of that job.
-#[derive(Debug, Clone)]
-pub struct Job {
- pub id: u64,
- pub remote_id: u64,
- pub commit_id: u64,
- pub created_time: u64,
- pub source: Option<String>,
- pub run_preferences: Option<String>,
-}
-
-// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report
-// results. a job may have many runs from many different hosts rebuilding history, or reruns of the
-#[derive(Debug, Clone)] -pub struct Run { - pub id: u64, - pub job_id: u64, - pub artifacts_path: Option, - pub state: sql::RunState, - pub host_id: Option, - pub create_time: u64, - pub start_time: Option, - pub complete_time: Option, - pub build_token: Option, - pub run_timeout: Option, - pub build_result: Option, - pub final_text: Option, -} - -impl Run { - fn into_pending_run(self) -> PendingRun { - PendingRun { - id: self.id, - job_id: self.job_id, - create_time: self.create_time, - } - } -} - -#[derive(Debug, Clone)] -pub struct PendingRun { - pub id: u64, - pub job_id: u64, - pub create_time: u64, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum TokenValidity { - Expired, - Invalid, - Valid, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct MetricRecord { - pub id: u64, - pub run_id: u64, - pub name: String, - pub value: String -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ArtifactRecord { - pub id: u64, - pub run_id: u64, - pub name: String, - pub desc: String, - pub created_time: u64, - pub completed_time: Option, -} - -impl DbCtx { - pub fn new>(config_path: P, db_path: P) -> Self { - DbCtx { - config_path: config_path.as_ref().to_owned(), - conn: Mutex::new(Connection::open(db_path).unwrap()) - } - } - - pub fn create_tables(&self) -> Result<(), ()> { - let conn = self.conn.lock().unwrap(); - conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_METRICS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap(); - conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap(); - conn.execute(sql::CREATE_RUNS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_HOSTS_TABLE, ()).unwrap(); - - Ok(()) - } - - pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value", - (run_id, name, value) - ) - .expect("can upsert"); - Ok(()) - } - - pub fn new_commit(&self, sha: &str) -> Result { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into commits (sha) values (?1)", - [sha.clone()] - ) - .expect("can insert"); - - Ok(conn.last_insert_rowid() as u64) - } - - pub fn new_repo(&self, name: &str) -> Result { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into repos (repo_name) values (?1)", - [name.clone()] - ) - .map_err(|e| { - format!("{:?}", e) - })?; - - Ok(conn.last_insert_rowid() as u64) - } - - pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "update artifacts set completed_time=?1 where id=?2", - (crate::io::now_ms(), artifact_id) - ) - .map(|_| ()) - .map_err(|e| { - format!("{:?}", e) - }) - } - - pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result { - let artifact_id = { - let created_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis() as u64; - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", - (run_id, name, desc, created_time) - ) - .map_err(|e| { - format!("{:?}", e) - })?; - - 
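- // rusqlite's last_insert_rowid() reports the id sqlite assigned to the row
- // inserted just above; that id names the artifact from here on.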
conn.last_insert_rowid() as u64 - }; - - ArtifactDescriptor::new(run_id, artifact_id).await - } - - pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result, String> { - let conn = self.conn.lock().unwrap(); - conn - .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| { - let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); - - Ok(ArtifactRecord { - id, run_id, name, desc, created_time, completed_time - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn commit_sha(&self, commit_id: u64) -> Result { - self.conn.lock() - .unwrap() - .query_row( - "select sha from commits where id=?1", - [commit_id], - |row| { row.get(0) } - ) - .map_err(|e| e.to_string()) - } - - pub fn job_for_commit(&self, sha: &str) -> Result, String> { - self.conn.lock() - .unwrap() - .query_row( - "select id from commits where sha=?1", - [sha], - |row| { row.get(0) } - ) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn run_for_token(&self, token: &str) -> Result, TokenValidity)>, String> { - self.conn.lock() - .unwrap() - .query_row( - "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1", - [token], - |row| { - let timeout: Option = row.get(3).unwrap(); - let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS); - - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis(); - - let time: Option = row.get(2).unwrap(); - let validity = if let Some(time) = time { - if now > time as u128 + timeout as u128 { - TokenValidity::Expired - } else { - TokenValidity::Valid - } - } else { - TokenValidity::Invalid - }; - Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity)) - } - ) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn job_by_id(&self, id: u64) -> Result, String> { - self.conn.lock() - .unwrap() - .query_row(crate::sql::JOB_BY_ID, [id], |row| { - let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); - - Ok(Job { - id, source, created_time, remote_id, commit_id, run_preferences - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn remote_by_path_and_api(&self, api: &str, path: &str) -> Result, String> { - self.conn.lock() - .unwrap() - .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where remote_api=?1 and remote_path=?2", [api, path], |row| { - let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); - - Ok(Remote { - id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn remote_by_id(&self, id: u64) -> Result, String> { - self.conn.lock() - .unwrap() - .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where id=?1", [id], |row| { - let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); - - Ok(Remote { - id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn repo_id_by_remote(&self, remote_id: u64) -> Result, String> { - self.conn.lock() - .unwrap() - .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0)) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn repo_id_by_name(&self, repo_name: &str) -> 
Result, String> { - self.conn.lock() - .unwrap() - .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0)) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result { - let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind { - "github" => { - (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote)) - }, - "github-email" => { - (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote)) - }, - other => { - panic!("unsupported remote kind: {}", other); - } - }; - - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);", - (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path) - ) - .expect("can insert"); - - Ok(conn.last_insert_rowid() as u64) - } - - pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option) -> Result { - // TODO: potential race: if two remotes learn about a commit at the same time and we decide - // to create two jobs at the same time, this might return an incorrect id if the insert - // didn't actually insert a new row. - let commit_id = self.new_commit(sha).expect("can create commit record"); - - let created_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis() as u64; - - let conn = self.conn.lock().unwrap(); - - let rows_modified = conn.execute( - "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);", - (remote_id, commit_id, created_time, pusher, repo_default_run_pref) - ).unwrap(); - - assert_eq!(1, rows_modified); - - let job_id = conn.last_insert_rowid() as u64; - - Ok(job_id) - } - - pub fn new_run(&self, job_id: u64, host_preference: Option) -> Result { - let created_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis() as u64; - - let conn = self.conn.lock().unwrap(); - - let rows_modified = conn.execute( - "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);", - (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference) - ).unwrap(); - - assert_eq!(1, rows_modified); - - let run_id = conn.last_insert_rowid() as u64; - - Ok(PendingRun { - id: run_id, - job_id, - create_time: created_time, - }) - } - - pub fn reap_task(&self, task_id: u64) -> Result<(), String> { - let conn = self.conn.lock().unwrap(); - - conn.execute( - "update runs set final_status=\"lost signal\", state=4 where id=?1;", - [task_id] - ).unwrap(); - - Ok(()) - } - - pub fn metrics_for_run(&self, run: u64) -> Result, String> { - let conn = self.conn.lock().unwrap(); - - let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap(); - let mut result = metrics_query.query([run]).unwrap(); - let mut metrics = Vec::new(); - - while let Some(row) = result.next().unwrap() { - let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); - metrics.push(MetricRecord { id, run_id, name, value }); - } - - Ok(metrics) - } - - pub fn artifacts_for_run(&self, run: u64, limit: Option) -> Result, String> { - let conn = self.conn.lock().unwrap(); - - let mut 
artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap(); - let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap(); - let mut artifacts = Vec::new(); - - while let Some(row) = result.next().unwrap() { - let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option) = row.try_into().unwrap(); - artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time }); - } - - Ok(artifacts) - } - - pub fn repo_by_id(&self, id: u64) -> Result, String> { - self.conn.lock() - .unwrap() - .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| { - let (id, repo_name, default_run_preference) = row.try_into().unwrap(); - Ok(Repo { - id, - name: repo_name, - default_run_preference, - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn get_repos(&self) -> Result, String> { - let conn = self.conn.lock().unwrap(); - - let mut repos_query = conn.prepare(sql::ALL_REPOS).unwrap(); - let mut repos = repos_query.query([]).unwrap(); - let mut result = Vec::new(); - - while let Some(row) = repos.next().unwrap() { - let (id, repo_name, default_run_preference) = row.try_into().unwrap(); - result.push(Repo { - id, - name: repo_name, - default_run_preference, - }); - } - - Ok(result) - } - - pub fn last_job_from_remote(&self, id: u64) -> Result, String> { - self.recent_jobs_from_remote(id, 1) - .map(|mut jobs| jobs.pop()) - } - - pub fn job_by_commit_id(&self, commit_id: u64) -> Result, String> { - let conn = self.conn.lock().unwrap(); - - conn - .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { - let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); - Ok(Job { - id, - remote_id, - commit_id, - created_time, - source, - run_preferences, - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn recent_jobs_from_remote(&self, id: u64, limit: u64) -> Result, String> { - let conn = self.conn.lock().unwrap(); - - let mut job_query = conn.prepare(sql::LAST_JOBS_FROM_REMOTE).unwrap(); - let mut result = job_query.query([id, limit]).unwrap(); - - let mut jobs = Vec::new(); - - while let Some(row) = result.next().unwrap() { - let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); - jobs.push(Job { - id, - remote_id, - commit_id, - created_time, - source, - run_preferences, - }); - } - - Ok(jobs) - } - - pub fn get_active_runs(&self) -> Result, String> { - let conn = self.conn.lock().unwrap(); - - let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap(); - let mut runs = started_query.query([]).unwrap(); - let mut started = Vec::new(); - - while let Some(row) = runs.next().unwrap() { - started.push(crate::sql::row2run(row)); - } - - Ok(started) - } - - pub fn get_pending_runs(&self, host_id: Option) -> Result, String> { - let conn = self.conn.lock().unwrap(); - - let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); - let mut runs = pending_query.query([host_id]).unwrap(); - let mut pending = Vec::new(); - - while let Some(row) = runs.next().unwrap() { - let (id, job_id, create_time) = row.try_into().unwrap(); - let run = PendingRun { - id, - job_id, - create_time, - }; - pending.push(run); - } - - Ok(pending) - } - - pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result, String> { - // for jobs that this host has not run, we'll arbitrarily say that we won't generate new - // runs for jobs more than a day old. 
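- // (concretely: only jobs whose created_time falls within the last
- // 24 * 3600 * 1000 = 86_400_000 ms are eligible for a new run here.)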
- //
- // we don't want to rebuild the entire history every time we see a new host by default; if
- // you really want to rebuild all of history on a new host, use `ci_ctl` to prepare the
- // runs.
- let cutoff = crate::io::now_ms() - 24 * 3600 * 1000;
-
- let conn = self.conn.lock().unwrap();
-
- let mut jobs_needing_task_runs = conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap();
- let mut job_rows = jobs_needing_task_runs.query([cutoff, host_id]).unwrap();
- let mut jobs = Vec::new();
-
- while let Some(row) = job_rows.next().unwrap() {
- let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
-
- jobs.push(Job {
- id, source, created_time, remote_id, commit_id, run_preferences,
- });
- }
-
- Ok(jobs)
- }
-
-
- pub fn remotes_by_repo(&self, repo_id: u64) -> Result<Vec<Remote>, String> {
- let mut remotes: Vec<Remote> = Vec::new();
-
- let conn = self.conn.lock().unwrap();
- let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap();
- let mut remote_results = remotes_query.query([repo_id]).unwrap();
-
- while let Some(row) = remote_results.next().unwrap() {
- let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
- remotes.push(Remote { id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path });
- }
-
- Ok(remotes)
- }
-
- /// try to find a host close to `host_info`, but maybe not an exact match.
- ///
- /// specifically, we'll ignore microcode and family/os - enough that measurements ought to be
- /// comparable but maybe not perfectly so.
- pub fn find_id_like_host(&self, host_info: &crate::protocol::HostInfo) -> Result<Option<u32>, String> {
- self.conn.lock()
- .unwrap()
- .query_row(
- "select id from hosts where \
- hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \
- cpu_model=?5 and cpu_max_freq_khz=?6 and cpu_cores=?7 and mem_total=?8 and \
- arch=?9;",
- (
- &host_info.hostname,
- &host_info.cpu_info.vendor_id,
- &host_info.cpu_info.model_name,
- &host_info.cpu_info.family,
- &host_info.cpu_info.model,
- &host_info.cpu_info.max_freq,
- &host_info.cpu_info.cores,
- &host_info.memory_info.total,
- &host_info.env_info.arch,
- ),
- |row| { row.get(0) }
- )
- .map_err(|e| e.to_string())
- }
-
- /// get an id for the host described by `host_info`. this may create a new record if no such
- /// host exists.
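- ///
- /// a minimal usage sketch (assuming a `HostInfo` collected by a runner at
- /// startup):
- ///
- /// ```ignore
- /// let host_id = dbctx.id_for_host(&host_info).expect("can upsert host");
- /// ```
- ///
- /// the `insert or ignore` plus follow-up `select` below make this an
- /// idempotent upsert keyed on the full hardware/os description.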
- pub fn id_for_host(&self, host_info: &crate::protocol::HostInfo) -> Result { - let conn = self.conn.lock().unwrap(); - - conn - .execute( - "insert or ignore into hosts \ - (\ - hostname, cpu_vendor_id, cpu_model_name, cpu_family, \ - cpu_model, cpu_microcode, cpu_max_freq_khz, cpu_cores, \ - mem_total, arch, family, os\ - ) values (\ - ?1, ?2, ?3, ?4, \ - ?5, ?6, ?7, ?8, \ - ?9, ?10, ?11, ?12 \ - );", - ( - &host_info.hostname, - &host_info.cpu_info.vendor_id, - &host_info.cpu_info.model_name, - &host_info.cpu_info.family, - &host_info.cpu_info.model, - &host_info.cpu_info.microcode, - &host_info.cpu_info.max_freq, - &host_info.cpu_info.cores, - &host_info.memory_info.total, - &host_info.env_info.arch, - &host_info.env_info.family, - &host_info.env_info.os, - ) - ) - .expect("can insert"); - - conn - .query_row( - "select id from hosts where \ - hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \ - cpu_model=?5 and cpu_microcode=?6 and cpu_max_freq_khz=?7 and \ - cpu_cores=?8 and mem_total=?9 and arch=?10 and family=?11 and os=?12;", - ( - &host_info.hostname, - &host_info.cpu_info.vendor_id, - &host_info.cpu_info.model_name, - &host_info.cpu_info.family, - &host_info.cpu_info.model, - &host_info.cpu_info.microcode, - &host_info.cpu_info.max_freq, - &host_info.cpu_info.cores, - &host_info.memory_info.total, - &host_info.env_info.arch, - &host_info.env_info.family, - &host_info.env_info.os, - ), - |row| { row.get(0) } - ) - .map_err(|e| e.to_string()) - } - - pub fn host_model_info(&self, host_id: u64) -> Result<(String, String, String, String, u64), String> { - let conn = self.conn.lock().unwrap(); - conn - .query_row("select hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz from hosts where id=?1;", [host_id], |row| { - Ok(( - row.get(0).unwrap(), - row.get(1).unwrap(), - row.get(2).unwrap(), - row.get(3).unwrap(), - row.get(4).unwrap(), - )) - }) - .map_err(|e| e.to_string()) - } - - pub fn runs_for_job_one_per_host(&self, job_id: u64) -> Result, String> { - let conn = self.conn.lock().unwrap(); - let mut runs_query = conn.prepare(crate::sql::RUNS_FOR_JOB).unwrap(); - let mut runs_results = runs_query.query([job_id]).unwrap(); - - let mut results = Vec::new(); - - while let Some(row) = runs_results.next().unwrap() { - results.push(crate::sql::row2run(row)); - } - - Ok(results) - } - - pub fn last_run_for_job(&self, job_id: u64) -> Result, String> { - let conn = self.conn.lock().unwrap(); - - conn - .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| { - Ok(crate::sql::row2run(row)) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn notifiers_by_repo(&self, repo_id: u64) -> Result, String> { - let remotes = self.remotes_by_repo(repo_id)?; - - let mut notifiers: Vec = Vec::new(); - - for remote in remotes.into_iter() { - match remote.remote_api.as_str() { - "github" => { - let mut notifier_path = self.config_path.clone(); - notifier_path.push(&remote.notifier_config_path); - - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::github_from_file(¬ifier_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - }, - "email" => { - let mut notifier_path = self.config_path.clone(); - notifier_path.push(&remote.notifier_config_path); - - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::email_from_file(¬ifier_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - } - other => { - eprintln!("unknown remote api 
kind: {:?}, remote is {:?}", other, &remote) - } - } - } - - Ok(notifiers) - } -} - diff --git a/src/io.rs b/src/io.rs deleted file mode 100644 index f9f407f..0000000 --- a/src/io.rs +++ /dev/null @@ -1,181 +0,0 @@ -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use futures_util::StreamExt; -use tokio::fs::File; -use std::io::Write; -use tokio::fs::OpenOptions; -use std::task::{Poll, Context}; -use std::pin::Pin; -use std::time::{UNIX_EPOCH, SystemTime}; -use std::sync::{Arc, Mutex}; - -pub fn now_ms() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is later than epoch") - .as_millis() as u64 -} - -#[derive(Clone)] -pub struct VecSink { - body: Arc>>, -} - -impl VecSink { - pub fn new() -> Self { - Self { body: Arc::new(Mutex::new(Vec::new())) } - } - - pub fn take_buf(&self) -> Vec { - std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new()) - } -} - -impl tokio::io::AsyncWrite for VecSink { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8] - ) -> Poll> { - self.body.lock().unwrap().extend_from_slice(buf); - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll> { - Poll::Ready(Ok(())) - } -} - -pub struct ArtifactStream { - sender: hyper::body::Sender, -} - -impl ArtifactStream { - pub fn new(sender: hyper::body::Sender) -> Self { - Self { sender } - } -} - -impl tokio::io::AsyncWrite for ArtifactStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8] - ) -> Poll> { - match self.get_mut().sender.try_send_data(buf.to_vec().into()) { - Ok(()) => { - Poll::Ready(Ok(buf.len())) - }, - _ => { - cx.waker().wake_by_ref(); - Poll::Pending - } - } - } - - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll> { - Poll::Ready(Ok(())) - } -} - - -pub struct ArtifactDescriptor { - job_id: u64, - pub artifact_id: u64, - file: File, -} - -impl ArtifactDescriptor { - pub async fn new(job_id: u64, artifact_id: u64) -> Result { - // TODO: jobs should be a configurable path - let path = format!("artifacts/{}/{}", job_id, artifact_id); - let file = OpenOptions::new() - .read(true) - .write(true) - .create_new(true) - .open(&path) - .await - .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; - - Ok(ArtifactDescriptor { - job_id, - artifact_id, - file, - }) - } - - pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { - loop { - let chunk = data.next().await; - - let chunk = match chunk { - Some(Ok(chunk)) => chunk, - Some(Err(e)) => { - eprintln!("error: {:?}", e); - return Err(format!("error reading: {:?}", e)); - } - None => { - eprintln!("body done?"); - return Ok(()); - } - }; - - let chunk = chunk.as_ref(); - - self.file.write_all(chunk).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } - } -} - -pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { - let mut buf = vec![0; 1024 * 1024]; - loop { - let n_read = source.read(&mut buf).await - .map_err(|e| format!("failed to read: {:?}", e))?; - - if n_read == 0 { - eprintln!("done reading!"); - return Ok(()); - } - - dest.write_all(&buf[..n_read]).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } -} -/* -pub async 
fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { - let mut buf = vec![0; 1024 * 1024]; - loop { - let n_read = source.read(&mut buf).await - .map_err(|e| format!("failed to read: {:?}", e))?; - - if n_read == 0 { - eprintln!("done reading!"); - return Ok(()); - } - - dest.sender.send_data(buf[..n_read].to_vec().into()).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } -} -*/ diff --git a/src/lua/mod.rs b/src/lua/mod.rs deleted file mode 100644 index 6c4a281..0000000 --- a/src/lua/mod.rs +++ /dev/null @@ -1,411 +0,0 @@ -use crate::RunningJob; - -use rlua::prelude::*; - -use std::sync::{Arc, Mutex}; -use std::path::PathBuf; - -pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua"); - -pub struct BuildEnv { - lua: Lua, - job: Arc>, -} - -#[derive(Debug)] -pub struct RunParams { - step: Option, - name: Option, - cwd: Option, -} - -pub struct CommandOutput { - pub exit_status: std::process::ExitStatus, - pub stdout: Vec, - pub stderr: Vec, -} - -mod lua_exports { - use crate::RunningJob; - use crate::lua::{CommandOutput, RunParams}; - - use std::sync::{Arc, Mutex}; - use std::path::PathBuf; - - use rlua::prelude::*; - - pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec, RunParams), rlua::Error> { - let args = match command { - LuaValue::Table(table) => { - let len = table.len().expect("command table has a length"); - let mut command_args = Vec::new(); - for i in 0..len { - let value = table.get(i + 1).expect("command arg is gettble"); - match value { - LuaValue::String(s) => { - command_args.push(s.to_str().unwrap().to_owned()); - }, - other => { - return Err(LuaError::RuntimeError(format!("argument {} was not a string, was {:?}", i, other))); - } - }; - } - - command_args - }, - other => { - return Err(LuaError::RuntimeError(format!("argument 1 was not a table: {:?}", other))); - } - }; - - let params = match params { - LuaValue::Table(table) => { - let step = match table.get("step").expect("can get from table") { - LuaValue::String(v) => { - Some(v.to_str()?.to_owned()) - }, - LuaValue::Nil => { - None - }, - other => { - return Err(LuaError::RuntimeError(format!("params[\"step\"] must be a string"))); - } - }; - let name = match table.get("name").expect("can get from table") { - LuaValue::String(v) => { - Some(v.to_str()?.to_owned()) - }, - LuaValue::Nil => { - None - }, - other => { - return Err(LuaError::RuntimeError(format!("params[\"name\"] must be a string"))); - } - }; - let cwd = match table.get("cwd").expect("can get from table") { - LuaValue::String(v) => { - Some(v.to_str()?.to_owned()) - }, - LuaValue::Nil => { - None - }, - other => { - return Err(LuaError::RuntimeError(format!("params[\"cwd\"] must be a string"))); - } - }; - - RunParams { - step, - name, - cwd, - } - }, - LuaValue::Nil => { - RunParams { - step: None, - name: None, - cwd: None, - } - } - other => { - return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other))); - } - }; - - Ok((args, params)) - } - - pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc>) -> Result<(), rlua::Error> { - let (args, params) = collect_build_args(command, params)?; - eprintln!("args: {:?}", args); - eprintln!(" params: {:?}", params); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async move { - job_ctx.lock().unwrap().run_command(&args, params.cwd.as_ref().map(|x| 
x.as_str())).await - .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) - }) - } - - pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc>) -> Result, rlua::Error> { - let (args, params) = collect_build_args(command, params)?; - eprintln!("args: {:?}", args); - eprintln!(" params: {:?}", params); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let command_output = rt.block_on(async move { - job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await - .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) - })?; - - let stdout = ctx.create_string(command_output.stdout.as_slice())?; - let stderr = ctx.create_string(command_output.stderr.as_slice())?; - - let result = ctx.create_table()?; - result.set("stdout", stdout)?; - result.set("stderr", stderr)?; - result.set("status", command_output.exit_status.code())?; - Ok(result) - } - - pub fn check_dependencies(commands: Vec) -> Result<(), rlua::Error> { - let mut missing_deps = Vec::new(); - for command in commands.iter() { - if !has_cmd(command)? { - missing_deps.push(command.clone()); - } - } - - if missing_deps.len() > 0 { - return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", ")))); - } - - Ok(()) - } - - pub fn metric(name: String, value: String, job_ctx: Arc>) -> Result<(), rlua::Error> { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async move { - job_ctx.lock().unwrap().send_metric(&name, value).await - .map_err(|e| LuaError::RuntimeError(format!("send_metric error: {:?}", e))) - }) - } - - pub fn artifact(path: String, name: Option, job_ctx: Arc>) -> Result<(), rlua::Error> { - let path: PathBuf = path.into(); - - let default_name: String = match (path.file_name(), path.parent()) { - (Some(name), _) => name - .to_str() - .ok_or(LuaError::RuntimeError("artifact name is not a unicode string".to_string()))? - .to_string(), - (_, Some(parent)) => format!("{}", parent.display()), - (None, None) => { - // one day using directories for artifacts might work but / is still not going - // to be accepted - return Err(LuaError::RuntimeError(format!("cannot infer a default path name for {}", path.display()))); - } - }; - - let name: String = match name { - Some(name) => name, - None => default_name, - }; - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async move { - let mut artifact = job_ctx.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await - .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e))) - .unwrap(); - let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(); - eprintln!("uploading..."); - crate::io::forward_data(&mut file, &mut artifact).await - .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?; - std::mem::drop(artifact); - Ok(()) - }) - } - - pub fn has_cmd(name: &str) -> Result { - Ok(std::process::Command::new("which") - .arg(name) - .status() - .map_err(|e| LuaError::RuntimeError(format!("could not fork which? {:?}", e)))? - .success()) - } - - pub fn file_size(path: &str) -> Result { - Ok(std::fs::metadata(&format!("tmpdir/{}", path)) - .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", path)))? 
- .len()) - } - - pub mod step { - use crate::RunningJob; - use std::sync::{Arc, Mutex}; - - pub fn start(job_ref: Arc>, name: String) -> Result<(), rlua::Error> { - let mut job = job_ref.lock().unwrap(); - job.current_step.clear(); - job.current_step.push(name); - Ok(()) - } - - pub fn push(job_ref: Arc>, name: String) -> Result<(), rlua::Error> { - let mut job = job_ref.lock().unwrap(); - job.current_step.push(name); - Ok(()) - } - - pub fn advance(job_ref: Arc>, name: String) -> Result<(), rlua::Error> { - let mut job = job_ref.lock().unwrap(); - job.current_step.pop(); - job.current_step.push(name); - Ok(()) - } - } -} - -struct DeclEnv<'lua, 'env> { - lua_ctx: &'env rlua::Context<'lua>, - job_ref: &'env Arc>, -} -impl<'lua, 'env> DeclEnv<'lua, 'env> { - fn create_function(&self, name: &str, f: F) -> Result, String> - where - A: FromLuaMulti<'lua>, - R: ToLuaMulti<'lua>, - F: 'static + Send + Fn(rlua::Context<'lua>, Arc>, A) -> Result { - - let job_ref = Arc::clone(self.job_ref); - self.lua_ctx.create_function(move |ctx, args| { - let job_ref = Arc::clone(&job_ref); - f(ctx, job_ref, args) - }) - .map_err(|e| format!("problem defining {} function: {:?}", name, e)) - } -} - -impl BuildEnv { - pub fn new(job: &Arc>) -> Self { - let env = BuildEnv { - lua: Lua::new(), - job: Arc::clone(job), - }; - env.lua.context(|lua_ctx| { - env.define_env(lua_ctx) - }).expect("can define context"); - env - } - - fn define_env(&self, lua_ctx: rlua::Context) -> Result<(), String> { - let decl_env = DeclEnv { - lua_ctx: &lua_ctx, - job_ref: &self.job, - }; - - let hello = decl_env.create_function("hello", |_, _, ()| { - eprintln!("hello from lua!!!"); - Ok(()) - })?; - - let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec| { - lua_exports::check_dependencies(commands) - })?; - - let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| { - lua_exports::build_command_impl(command, params, job_ref) - })?; - - let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, params): (LuaValue, LuaValue)| { - lua_exports::check_output_impl(ctx, command, params, job_ref) - })?; - - let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| { - lua_exports::metric(name, value, job_ref) - })?; - - let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?; - - let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option)| { - lua_exports::artifact(path, name, job_ref) - })?; - - let error = decl_env.create_function("error", move |_, job_ref, msg: String| { - Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg))) - })?; - - let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, job_ref, name: String| { - lua_exports::has_cmd(&name) - })?; - - let size_of_file = decl_env.create_function("size_of_file", move |_, job_ref, name: String| { - lua_exports::file_size(&name) - })?; - - let native_rust_triple = match std::env::consts::ARCH { - "x86_64" => "x86_64-unknown-linux-gnu", - "aarch64" => "aarch64-unknown-linux-gnu", - other => { panic!("dunno native rust triple for arch {}", other); } - }; - let native_rust_triple = lua_ctx.create_string(native_rust_triple).unwrap(); - let build_env_vars = lua_ctx.create_table_from( - vec![ - ("native_rust_triple", native_rust_triple) - ] - ).unwrap(); - - let build_environment = lua_ctx.create_table_from( - 
vec![ - ("has", path_has_cmd), - ("size", size_of_file), - ] - ).unwrap(); - build_environment.set("vars", build_env_vars).unwrap(); - - let build_functions = lua_ctx.create_table_from( - vec![ - ("hello", hello), - ("run", build), - ("dependencies", check_dependencies), - ("metric", metric), - ("error", error), - ("artifact", artifact), - ("now_ms", now_ms), - ("check_output", check_output), - ] - ).unwrap(); - build_functions.set("environment", build_environment).unwrap(); - let current_commit = self.job.lock().unwrap().job.commit.clone(); - build_functions.set("sha", lua_ctx.create_string(current_commit.as_bytes()).unwrap()).unwrap(); - let globals = lua_ctx.globals(); - globals.set("Build", build_functions).unwrap(); - - - let step_start = decl_env.create_function("step_start", move |_, job_ref, name: String| { - lua_exports::step::start(job_ref, name) - })?; - - let step_push = decl_env.create_function("step_push", move |_, job_ref, name: String| { - lua_exports::step::push(job_ref, name) - })?; - - let step_advance = decl_env.create_function("step_advance", move |_, job_ref, name: String| { - lua_exports::step::advance(job_ref, name) - })?; - - let step_functions = lua_ctx.create_table_from( - vec![ - ("start", step_start), - ("push", step_push), - ("advance", step_advance), - ] - ).unwrap(); - globals.set("Step", step_functions).unwrap(); - Ok(()) - } - - pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> { - let script = script.to_vec(); - let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| { - std::thread::spawn(move || { - self.lua.context(|lua_ctx| { - lua_ctx.load(&script) - .set_name("goodfile")? - .exec() - }) - }).join().unwrap() - }).await.unwrap(); - eprintln!("lua res: {:?}", res); - res - } -} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index d11df1f..0000000 --- a/src/main.rs +++ /dev/null @@ -1,1092 +0,0 @@ -#![allow(dead_code)] -#![allow(unused_variables)] -#![allow(unused_imports)] - -use chrono::{Utc, TimeZone}; -use lazy_static::lazy_static; -use std::sync::RwLock; -use std::collections::HashMap; -use serde_derive::{Deserialize, Serialize}; -use tokio::spawn; -use std::path::PathBuf; -use axum_server::tls_rustls::RustlsConfig; -use axum::routing::*; -use axum::Router; -use axum::response::{IntoResponse, Response, Html}; -use std::net::SocketAddr; -use axum::extract::{Path, State}; -use http_body::combinators::UnsyncBoxBody; -use axum::{Error, Json}; -use axum::extract::rejection::JsonRejection; -use axum::body::Bytes; -use axum::http::{StatusCode, Uri}; -use http::header::HeaderMap; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use axum::body::StreamBody; - -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; - -use hmac::{Hmac, Mac}; -use sha2::Sha256; - -mod io; -mod sql; -mod notifier; -mod dbctx; -mod protocol; - -use sql::RunState; - -use dbctx::{DbCtx, Job, Run, ArtifactRecord}; - -use rusqlite::OptionalExtension; - -#[derive(Serialize, Deserialize)] -struct WebserverConfig { - psks: Vec, - jobs_path: PathBuf, - config_path: PathBuf, - db_path: PathBuf, - debug_addr: Option, - server_addr: Option, -} - -#[derive(Clone)] -struct WebserverState { - jobs_path: PathBuf, - dbctx: Arc, -} - -#[derive(Clone, Serialize, Deserialize)] -struct GithubPsk { - key: String, - gh_user: String, -} - -lazy_static! 
{
- static ref PSKS: RwLock<Vec<GithubPsk>> = RwLock::new(Vec::new());
-}
-
-#[derive(Copy, Clone, Debug)]
-enum GithubHookError {
- BodyNotObject,
- MissingElement { path: &'static str },
- BadType { path: &'static str, expected: &'static str },
-}
-
-#[derive(Debug)]
-enum GithubEvent {
- Push { tip: String, repo_name: String, head_commit: serde_json::Map<String, serde_json::Value>, pusher: serde_json::Map<String, serde_json::Value> },
- Other {}
-}
-
-/// return a duration rendered as the largest two non-zero units.
-///
-/// 60000ms -> 1m
-/// 60001ms -> 1m
-/// 61000ms -> 1m1s
-/// 1030ms -> 1.030s
-fn duration_as_human_string(duration_ms: u64) -> String {
- let duration_sec = duration_ms / 1000;
- let duration_min = duration_sec / 60;
- let duration_hours = duration_min / 60;
-
- let duration_ms = duration_ms % 1000;
- let duration_sec = duration_sec % 60;
- let duration_min = duration_min % 60;
- // no need to clamp hours, we're gonna just hope that it's a reasonable number of hours
-
- if duration_hours != 0 {
- let mut res = format!("{}h", duration_hours);
- if duration_min != 0 {
- res.push_str(&format!("{}m", duration_min));
- }
- res
- } else if duration_min != 0 {
- let mut res = format!("{}m", duration_min);
- if duration_sec != 0 {
- res.push_str(&format!("{}s", duration_sec));
- }
- res
- } else {
- let mut res = format!("{}", duration_sec);
- if duration_ms != 0 {
- res.push_str(&format!(".{:03}", duration_ms));
- }
- res.push('s');
- res
- }
-}
-
-/// try producing a url for whatever caused this job to be started, if possible
-fn commit_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> Option<String> {
- let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote");
-
- match remote.remote_api.as_str() {
- "github" => {
- Some(format!("{}/commit/{}", remote.remote_url, commit_sha))
- },
- "email" => {
- None
- },
- _ => {
- None
- }
- }
-}
-
-/// produce a url to the ci.butactuallyin.space job details page
-fn job_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> String {
- let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote");
-
- if remote.remote_api != "github" {
- eprintln!("job url for remote type {} can't be constructed, i think", &remote.remote_api);
- }
-
- format!("{}/{}", &remote.remote_path, commit_sha)
-}
-
-/// render how long a run took, or is taking, in a human-friendly way
-fn display_run_time(run: &Run) -> String {
- if let Some(start_time) = run.start_time {
- if let Some(complete_time) = run.complete_time {
- if complete_time < start_time {
- if run.state == RunState::Started {
- // this run has been restarted. the completed time is stale.
- // further, this is a currently active run.
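- // so, time the run from its start until now instead: a run restarted
- // 90 seconds ago renders as "1m30s (ongoing)" below.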
- let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; - let mut duration = duration_as_human_string(now_ms - start_time); - duration.push_str(" (ongoing)"); - duration - } else { - "invalid data".to_string() - } - } else { - let duration_ms = complete_time - start_time; - let duration = duration_as_human_string(duration_ms); - duration - } - } else { - if run.state != RunState::Invalid { - let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; - let mut duration = duration_as_human_string(now_ms - start_time); - duration.push_str(" (ongoing)"); - duration - } else { - "n/a".to_string() - } - } - } else { - "not yet run".to_owned() - } -} - -fn parse_push_event(body: serde_json::Value) -> Result { - let body = body.as_object() - .ok_or(GithubHookError::BodyNotObject)?; - - let tip = body.get("after") - .ok_or(GithubHookError::MissingElement { path: "after" })? - .as_str() - .ok_or(GithubHookError::BadType { path: "after", expected: "str" })? - .to_owned(); - - let repo_name = body.get("repository") - .ok_or(GithubHookError::MissingElement { path: "repository" })? - .as_object() - .ok_or(GithubHookError::BadType { path: "repository", expected: "obj" })? - .get("full_name") - .ok_or(GithubHookError::MissingElement { path: "repository/full_name" })? - .as_str() - .ok_or(GithubHookError::BadType { path: "repository/full_name", expected: "str" })? - .to_owned(); - - let head_commit = body.get("head_commit") - .ok_or(GithubHookError::MissingElement { path: "head_commit" })? - .as_object() - .ok_or(GithubHookError::BadType { path: "head_commit", expected: "obj" })? - .to_owned(); - - let pusher = body.get("pusher") - .ok_or(GithubHookError::MissingElement { path: "pusher" })? - .as_object() - .ok_or(GithubHookError::BadType { path: "pusher", expected: "obj" })? - .to_owned(); - - Ok(GithubEvent::Push { tip, repo_name, head_commit, pusher }) -} - -async fn process_push_event(ctx: Arc, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse { - let (sha, repo, head_commit, pusher) = if let GithubEvent::Push { tip, repo_name, head_commit, pusher } = event { - (tip, repo_name, head_commit, pusher) - } else { - panic!("process push event on non-push event"); - }; - - println!("handling push event to {}/{}: sha {} in repo {}, {:?}\n pusher: {:?}", owner, repo, sha, repo, head_commit, pusher); - - // push event is in terms of a ref, but we don't know if it's a new commit (yet). - // in terms of CI jobs, we care mainly about new commits. - // so... 
-
-async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse {
-    let (sha, repo, head_commit, pusher) = if let GithubEvent::Push { tip, repo_name, head_commit, pusher } = event {
-        (tip, repo_name, head_commit, pusher)
-    } else {
-        panic!("process push event on non-push event");
-    };
-
-    println!("handling push event to {}/{}: sha {} in repo {}, {:?}\n  pusher: {:?}", owner, repo, sha, repo, head_commit, pusher);
-
-    // push event is in terms of a ref, but we don't know if it's a new commit (yet).
-    // in terms of CI jobs, we care mainly about new commits.
-    // so:
-    // * look up the commit,
-    // * if it is known, bail out (new ref for an existing commit we've already handled some way)
-    // * create a new commit ref
-    // * create a new job (state=pending) for the commit ref
-    let commit_id: Option<u64> = ctx.conn.lock().unwrap()
-        .query_row(sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0))
-        .optional()
-        .expect("can run query");
-
-    if commit_id.is_some() {
-        eprintln!("commit already exists");
-        return (StatusCode::OK, String::new());
-    }
-
-    let remote_url = format!("https://www.github.com/{}.git", repo);
-    eprintln!("looking for remote url: {}", remote_url);
-    let (remote_id, repo_id): (u64, u64) = match ctx.conn.lock().unwrap()
-        .query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap())))
-        .optional()
-        .unwrap() {
-        Some(elems) => elems,
-        None => {
-            eprintln!("no remote registered for url {} (repo {})", remote_url, repo);
-            return (StatusCode::NOT_FOUND, String::new());
-        }
-    };
-
-    let repo_default_run_pref: Option<String> = ctx.conn.lock().unwrap()
-        .query_row("select default_run_preference from repos where id=?1;", [repo_id], |row| {
-            Ok((row.get(0)).unwrap())
-        })
-        .expect("can query");
-
-    let pusher_email = pusher
-        .get("email")
-        .expect("has email")
-        .as_str()
-        .expect("is str");
-
-    let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap();
-    let _ = ctx.new_run(job_id, None).unwrap();
-
-    let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers");
-
-    for notifier in notifiers {
-        notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify");
-    }
-
-    (StatusCode::OK, String::new())
-}
-
-async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event_kind: String, body: serde_json::Value) -> Response<BoxBody> {
-    println!("got github event: {}, {}, {}", owner, repo, event_kind);
-    match event_kind.as_str() {
-        "push" => {
-            let push_event = parse_push_event(body)
-                .map_err(|e| {
-                    eprintln!("TODO: handle push event error: {:?}", e);
-                    panic!()
-                })
-                .expect("parse works");
-            let res = process_push_event(ctx, owner, repo, push_event).await;
-            "ok".into_response()
-        },
-        "status" => {
-            eprintln!("[.] status update");
-            "ok".into_response()
-        }
-        other => {
-            eprintln!("unhandled event kind: {}, repo {}/{}. content: {:?}", other, owner, repo, body);
-            "".into_response()
-        }
-    }
-}
-
-async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse {
-    eprintln!("root index");
-    let repos = match ctx.dbctx.get_repos() {
-        Ok(repos) => repos,
-        Err(e) => {
-            eprintln!("failed to get repos: {:?}", e);
-            return (StatusCode::INTERNAL_SERVER_ERROR, Html("gonna feel that one tomorrow".to_string()));
-        }
-    };
-
-    let mut response = String::new();
-
-    response.push_str("<html>\n");
-    response.push_str("<head><title>ci.butactuallyin.space</title></head>\n");
-    response.push_str("<h1>builds and build accessories</h1>\n");
-
-    match repos.len() {
-        0 => { response.push_str(&format!("<p>no repos configured, so there are no builds</p>\n")); },
-        1 => { response.push_str("<p>1 repo configured</p>\n"); },
-        other => { response.push_str(&format!("<p>{} repos configured</p>\n", other)); },
-    }
-
-    response.push_str("<style>\n.even-row {}\n.odd-row {}\n</style>\n");
-    response.push_str("<table><tr>\n");
-    let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"];
-    for heading in headings {
-        response.push_str(&format!("<th>{}</th>", heading));
-    }
-    response.push_str("</tr>\n");
-
-    let mut row_num = 0;
-
-    for repo in repos {
-        let mut most_recent_run: Option<(Job, Run)> = None;
-
-        for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") {
-            let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works");
-            if let Some(last_job) = last_job {
-                if let Some(last_run) = ctx.dbctx.last_run_for_job(last_job.id).expect("can query") {
-                    if most_recent_run.as_ref().map(|run| run.1.create_time < last_run.create_time).unwrap_or(true) {
-                        most_recent_run = Some((last_job, last_run));
-                    }
-                }
-            }
-        }
-
-        let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name);
-
-        let row_html: String = match most_recent_run {
-            Some((job, run)) => {
-                let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
-                let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
-                    Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
-                    None => job_commit.clone()
-                };
-
-                let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
-
-                let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
-                let duration = display_run_time(&run);
-
-                let status = format!("{:?}", run.state).to_lowercase();
-
-                let result = match run.build_result {
-                    Some(0) => "pass",
-                    Some(_) => "fail",
-                    None => match run.state {
-                        RunState::Pending => { "unstarted" },
-                        RunState::Started => { "in progress" },
-                        _ => { "unreported" }
-                    }
-                };
-
-                let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result];
-                let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
-
-                let mut row_html = String::new();
-                for entry in entries {
-                    row_html.push_str(&format!("<td>{}</td>", entry));
-                }
-                row_html
-            }
-            None => {
-                let entries = [repo_html.as_str()];
-                let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
-
-                let mut row_html = String::new();
-                for entry in entries {
-                    row_html.push_str(&format!("<td>{}</td>", entry));
-                }
-                row_html
-            }
-        };
-
-        let row_index = row_num % 2;
-        response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index]));
-        response.push_str(&row_html);
-        response.push_str("</tr>");
-        response.push('\n');
-
-        row_num += 1;
-    }
-    response.push_str("</table>");
-
-    response.push_str("<h3>active tasks</h3>\n");
-
-    let runs = ctx.dbctx.get_active_runs().expect("can query");
-    if runs.len() == 0 {
-        response.push_str("<p>(none)</p>\n");
-    } else {
-        response.push_str("<table>");
-        response.push_str("<tr>\n");
-        let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"];
-        for heading in headings {
-            response.push_str(&format!("<th>{}</th>", heading));
-        }
-        response.push_str("</tr>\n");
-
-        let mut row_num = 0;
-
-        for run in runs.iter() {
-            let row_index = row_num % 2;
-
-            let job = ctx.dbctx.job_by_id(run.job_id).expect("query succeeds").expect("job id is valid");
-            let remote = ctx.dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("remote id is valid");
-            let repo = ctx.dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("repo id is valid");
-
-            let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name);
-
-            let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
-            let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
-                Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
-                None => job_commit.clone()
-            };
-
-            let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
-
-            let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
-            let duration = display_run_time(&run);
-
-            let status = format!("{:?}", run.state).to_lowercase();
-
-            let result = match run.build_result {
-                Some(0) => "pass",
-                Some(_) => "fail",
-                None => match run.state {
-                    RunState::Pending => { "unstarted" },
-                    RunState::Started => { "in progress" },
-                    _ => { "unreported" }
-                }
-            };
-
-            let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result];
-            let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
-
-            let mut row_html = String::new();
-            for entry in entries {
-                row_html.push_str(&format!("<td>{}</td>", entry));
-            }
-
-            response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index]));
-            response.push_str(&row_html);
-            response.push_str("</tr>");
-            response.push('\n');
-
-            row_num += 1;
-        }
-
-        response.push_str("</table>\n");
-    }
-
-    response.push_str("</html>");
-
-    (StatusCode::OK, Html(response))
-}
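Both tables above pad short rows out to the full heading width with the same iterator idiom; in isolation it looks like this (standalone sketch, values invented):

    let headings = ["repo", "last build", "job"];
    let entries = ["zvm"];
    // chain an infinite supply of empty cells, then cut at the table width
    let padded: Vec<&str> = entries.iter()
        .chain(std::iter::repeat(&""))
        .take(headings.len())
        .cloned()
        .collect();
    assert_eq!(padded, vec!["zvm", "", ""]);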
-
-async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse {
-    eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2);
-    let remote_path = format!("{}/{}", path.0, path.1);
-    let sha = path.2;
-
-    let (commit_id, sha): (u64, String) = if sha.len() >= 7 {
-        match ctx.dbctx.conn.lock().unwrap()
-            .query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
-            .optional()
-            .expect("can query") {
-            Some((commit_id, sha)) => (commit_id, sha),
-            None => {
-                return (StatusCode::NOT_FOUND, Html("no such commit".to_string()));
-            }
-        }
-    } else {
-        return (StatusCode::NOT_FOUND, Html("no such commit".to_string()));
-    };
-
-    let short_sha = &sha[0..9];
-
-    let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap()
-        .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
-        .expect("can query");
-
-    let job = ctx.dbctx.job_by_commit_id(commit_id).expect("can query").expect("job exists");
-
-    let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists");
-
-    let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms);
-
-    let (status_elem, status_desc) = match run.state {
-        RunState::Pending | RunState::Started => {
-            ("pending", "⌛ in progress")
-        },
-        RunState::Finished => {
-            if let Some(build_result) = run.build_result {
-                if build_result == 0 {
-                    ("pass", "✅ passed")
-                } else {
-                    ("failed", "❌ failed")
-                }
-            } else {
-                eprintln!("run {} for commit {} is missing a build result but is reportedly finished (old data)?", run.id, commit_id);
-                ("unreported", "❔ missing status")
-            }
-        },
-        RunState::Error => {
-            ("error", "🧯 error, uncompleted")
-        }
-        RunState::Invalid => {
-            ("(server error)", "dude even i don't know")
-        }
-    };
-    let debug_info = run.state == RunState::Finished && run.build_result == Some(1) || run.state == RunState::Error;
-
-    let repo_name: String = ctx.dbctx.conn.lock().unwrap()
-        .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0))
-        .expect("can query");
-
-    let deployed = false;
-
-    let mut head = String::new();
-    head.push_str("<head>");
-    head.push_str(&format!("<title>ci.butactuallyin.space - {}</title>", repo_name));
-    let include_og_tags = true;
-    if include_og_tags {
-        head.push_str("\n");
-        head.push_str(&format!("<meta property=\"og:type\" content=\"website\">\n"));
-        head.push_str(&format!("<meta property=\"og:site_name\" content=\"ci.butactuallyin.space\">\n"));
-        head.push_str(&format!("<meta property=\"og:url\" content=\"https://ci.butactuallyin.space/{}/{}/{}\">\n", &path.0, &path.1, &sha));
-        head.push_str(&format!("<meta property=\"og:title\" content=\"{}/{} commit {}\">", &path.0, &path.1, &short_sha));
-        let build_og_description = format!("commit {} of {}/{}, {} after {}",
-            short_sha,
-            path.0, path.1,
-            status_desc,
-            display_run_time(&run)
-        );
-        head.push_str(&format!("<meta property=\"og:description\" content=\"{}\">", build_og_description));
-        head.push_str(&format!("<meta name=\"description\" content=\"{}\">", build_og_description));
-    }
-    head.push_str("</head>\n");
-    let repo_html = format!("<a href=\"/{}\">{}</a>", &repo_name, &repo_name);
-    let remote_commit_elem = format!("<a href=\"https://www.github.com/{}/commit/{}\">{}</a>", &remote_path, &sha, &sha);
-
-    let mut artifacts_fragment = String::new();
-    let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap()
-        .into_iter()
-        // HACK: filter out artifacts for previous runs of a job. artifacts should be attached
-        // to a run, runs should be distinct from jobs. but i'm sleepy.
-        .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms))
-        .collect();
-
-    artifacts.sort_by_key(|artifact| artifact.created_time);
-
-    fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 {
-        let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms);
-        let run_completed = std::cmp::max(run_completed, artifact_completed);
-        run_completed - artifact_completed
-    }
-
-    let recent_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) <= 60_000).cloned().collect();
-    let old_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) > 60_000).cloned().collect();
-
-    for artifact in old_artifacts.iter() {
-        let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822();
-        artifacts_fragment.push_str(&format!("<div>{} step: {}</div>\n", created_time_str, &artifact.name));
-        let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time);
-        let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string();
-        artifacts_fragment.push_str(&format!("<pre>  {}kb in {} </pre>\n", size_str, duration_str));
-    }
-
-    for artifact in recent_artifacts.iter() {
-        let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822();
-        artifacts_fragment.push_str(&format!("<div>{} step: {}</div>\n", created_time_str, &artifact.name));
-        if debug_info {
-            artifacts_fragment.push_str("<pre>");
-            artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap());
-            artifacts_fragment.push_str("</pre>\n");
-        } else {
-            let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time);
-            let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| {
-                (md.len() / 1024).to_string()
-            }).unwrap_or_else(|e| format!("[{}]", e));
-            artifacts_fragment.push_str(&format!("<pre>  {}kb in {} </pre>\n", size_str, duration_str));
-        }
-    }
-
-    let metrics = summarize_job_metrics(&ctx.dbctx, run.id, run.job_id).unwrap();
-
-    let mut html = String::new();
-    html.push_str("<html>\n");
-    html.push_str(&format!("  {}\n", head));
-    html.push_str("  <body>\n");
-    html.push_str("    <pre>\n");
-    html.push_str(&format!("repo: {}\n", repo_html));
-    html.push_str(&format!("commit: {}, run: {}\n", remote_commit_elem, run.id));
-    html.push_str(&format!("status: {} in {}\n", status_elem, display_run_time(&run)));
-    if let Some(desc) = run.final_text.as_ref() {
-        html.push_str(&format!("  description: {}\n  ", desc));
-    }
-    html.push_str(&format!("deployed: {}\n", deployed));
-    html.push_str("    
\n"); - if artifacts_fragment.len() > 0 { - html.push_str("
artifacts
\n"); - html.push_str(&artifacts_fragment); - } - if let Some(metrics) = metrics { - html.push_str(&metrics); - } - html.push_str(" \n"); - html.push_str(""); - - (StatusCode::OK, Html(html)) -} - -fn summarize_job_metrics(dbctx: &Arc, run_id: u64, job_id: u64) -> Result, String> { - let runs = dbctx.runs_for_job_one_per_host(job_id)?; - - let mut section = String::new(); - section.push_str("
\n"); - section.push_str("

metrics

\n"); - section.push_str("\n"); - - if runs.len() == 1 { - let metrics = dbctx.metrics_for_run(run_id).unwrap(); - if metrics.is_empty() { - return Ok(None); - } - - section.push_str(""); - for metric in metrics { - section.push_str(&format!("", &metric.name, &metric.value)); - } - } else { - // very silly ordering issue: need an authoritative ordering of metrics to display metrics - // in a consistent order across runs (though they SHOULD all be ordered the same). - // - // the first run might not have all metrics (first run could be on the slowest build host - // f.ex), so for now just assume that metrics *will* be consistently ordered and build a - // list of metrics from the longest list of metrics we've seen. builders do not support - // concurrency so at least the per-run metrics-order-consistency assumption should hold.. - let mut all_names: Vec = Vec::new(); - - let all_metrics: Vec<(HashMap, HostDesc)> = runs.iter().map(|run| { - let metrics = dbctx.metrics_for_run(run.id).unwrap(); - - let mut metrics_map = HashMap::new(); - for metric in metrics.into_iter() { - if !all_names.contains(&metric.name) { - all_names.push(metric.name.clone()); - } - metrics_map.insert(metric.name, metric.value); - } - - let (hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz) = match run.host_id { - Some(host_id) => { - dbctx.host_model_info(host_id).unwrap() - } - None => { - ("unknown".to_string(), "unknown".to_string(), "0".to_string(), "0".to_string(), 0) - } - }; - - (metrics_map, HostDesc::from_parts(hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz)) - }).collect(); - - if all_metrics.is_empty() { - return Ok(None); - } - - let mut header = "".to_string(); - for (_, host) in all_metrics.iter() { - header.push_str(&format!("", &host.hostname, &host.cpu_desc, (host.cpu_max_freq_khz as f64) / 1000_000.0)); - } - header.push_str("\n"); - section.push_str(&header); - - for name in all_names.iter() { - let mut row = format!("", &name); - for (metrics, _) in all_metrics.iter() { - let value = metrics.get(name) - .map(|x| x.clone()) - .unwrap_or_else(String::new); - row.push_str(&format!("", value)); - } - row.push_str("\n"); - section.push_str(&row); - } - }; - section.push_str("
namevalue
{}{}
name{}
{} @ {:.3}GHz
{}{}
\n"); - section.push_str("
\n"); - - Ok(Some(section)) -} - -async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State) -> impl IntoResponse { - eprintln!("get artifact, run={}, artifact={}", path.0, path.1); - let run: u64 = path.0.parse().unwrap(); - let artifact_id: u64 = path.1.parse().unwrap(); - - let artifact_descriptor = match ctx.dbctx.lookup_artifact(run, artifact_id).unwrap() { - Some(artifact) => artifact, - None => { - return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); - } - }; - - let mut live_artifact = false; - - if let Some(completed_time) = artifact_descriptor.completed_time { - if completed_time < artifact_descriptor.created_time { - live_artifact = true; - } - } else { - live_artifact = true; - } - - if live_artifact { - let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536); - let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver); - let mut artifact_path = ctx.jobs_path.clone(); - artifact_path.push(artifact_descriptor.run_id.to_string()); - artifact_path.push(artifact_descriptor.id.to_string()); - spawn(async move { - let mut artifact = artifact_descriptor; - - let mut artifact_file = tokio::fs::File::open(&artifact_path) - .await - .expect("artifact file exists?"); - while artifact.completed_time.is_none() { - match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await { - Ok(()) => { - // reached the current EOF, wait and then commit an unspeakable sin - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - // this would be much implemented as yielding on a condvar woken when an - // inotify event on the file indicates a write has occurred. but i am - // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. - artifact = ctx.dbctx.lookup_artifact(artifact.run_id, artifact.id) - .expect("can query db") - .expect("artifact still exists"); - } - Err(e) => { - eprintln!("artifact file streaming failed: {}", e); - } - } - } - - eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id); - }); - (StatusCode::OK, resp_body).into_response() - } else { - (StatusCode::OK, Html("all done")).into_response() - } -} - -async fn handle_repo_summary(Path(path): Path, State(ctx): State) -> impl IntoResponse { - eprintln!("get repo summary: {:?}", path); - - let mut last_builds = Vec::new(); - - let (repo_id, repo_name, default_run_preference): (u64, String, Option) = match ctx.dbctx.conn.lock().unwrap() - .query_row("select id, repo_name, default_run_preference from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap(), row.get(2).unwrap()))) - .optional() - .unwrap() { - Some(elem) => elem, - None => { - eprintln!("no repo named {}", path); - return (StatusCode::NOT_FOUND, Html(String::new())); - } - }; - - // TODO: display default_run_preference somehow on the web summary? - - for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") { - let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo"); - last_builds.extend(last_ten_jobs.drain(..)); - } - last_builds.sort_by_key(|job| -(job.created_time as i64)); - - let mut response = String::new(); - response.push_str("\n"); - response.push_str(&format!(" ci.butactuallyin.space - {} \n", repo_name)); - response.push_str("\n"); - response.push_str(&format!("
-
-async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<WebserverState>) -> impl IntoResponse {
-    eprintln!("get repo summary: {:?}", path);
-
-    let mut last_builds = Vec::new();
-
-    let (repo_id, repo_name, default_run_preference): (u64, String, Option<String>) = match ctx.dbctx.conn.lock().unwrap()
-        .query_row("select id, repo_name, default_run_preference from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap(), row.get(2).unwrap())))
-        .optional()
-        .unwrap() {
-        Some(elem) => elem,
-        None => {
-            eprintln!("no repo named {}", path);
-            return (StatusCode::NOT_FOUND, Html(String::new()));
-        }
-    };
-
-    // TODO: display default_run_preference somehow on the web summary?
-
-    for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") {
-        let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo");
-        last_builds.extend(last_ten_jobs.drain(..));
-    }
-    last_builds.sort_by_key(|job| -(job.created_time as i64));
-
-    let mut response = String::new();
-    response.push_str("<html>\n");
-    response.push_str(&format!("<title> ci.butactuallyin.space - {} </title>\n", repo_name));
-    response.push_str("<body>\n");
-    response.push_str(&format!("<h1>{} build history</h1>\n", repo_name));
-    response.push_str("<a href=\"/\">full repos index</a>\n");
-
-    response.push_str("<table>");
-    response.push_str("<tr>\n");
-    let headings = ["last build", "job", "build commit", "duration", "status", "result"];
-    for heading in headings {
-        response.push_str(&format!("<th>{}</th>", heading));
-    }
-    response.push_str("</tr>\n");
-
-    let mut row_num = 0;
-
-    for job in last_builds.iter().take(10) {
-        let run = ctx.dbctx.last_run_for_job(job.id).expect("query succeeds").expect("TODO: run exists if job exists (small race if querying while creating job ....");
-        let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
-        let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
-            Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
-            None => job_commit.clone()
-        };
-
-        let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
-
-        let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
-        let duration = display_run_time(&run);
-
-        let status = format!("{:?}", run.state).to_lowercase();
-
-        let result = match run.build_result {
-            Some(0) => "pass",
-            Some(_) => "fail",
-            None => match run.state {
-                RunState::Pending => { "unstarted" },
-                RunState::Started => { "in progress" },
-                _ => { "unreported" }
-            }
-        };
-
-        let entries = [last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result];
-        let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
-
-        let mut row_html = String::new();
-        for entry in entries {
-            row_html.push_str(&format!("<td>{}</td>", entry));
-        }
-
-        let row_index = row_num % 2;
-        response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index]));
-        response.push_str(&row_html);
-        response.push_str("</tr>\n");
-
-        row_num += 1;
-    }
-    response.push_str("</table></html>");
-
-    (StatusCode::OK, Html(response))
-}
-
-async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<WebserverState>, body: Bytes) -> impl IntoResponse {
-    let json: Result<serde_json::Value, serde_json::Error> = serde_json::from_slice(&body);
-    eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers);
-
-    let payload = match json {
-        Ok(payload) => { payload },
-        Err(e) => {
-            eprintln!("bad request: path={}/{}\nheaders: {:?}\nbody err: {:?}", path.0, path.1, headers, e);
-            return (StatusCode::BAD_REQUEST, "").into_response();
-        }
-    };
-
-    let sent_hmac = match headers.get("x-hub-signature-256") {
-        Some(sent_hmac) => { sent_hmac.to_str().expect("valid ascii string").to_owned() },
-        None => {
-            eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-hub-signature-256", path.0, path.1, headers);
-            return (StatusCode::BAD_REQUEST, "").into_response();
-        }
-    };
-
-    let mut hmac_ok = false;
-
-    for psk in PSKS.read().unwrap().iter() {
-        let mut mac = Hmac::<Sha256>::new_from_slice(psk.key.as_bytes())
-            .expect("hmac can be constructed");
-        mac.update(&body);
-        let result = mac.finalize().into_bytes().to_vec();
-
-        // hack: skip sha256=
-        let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex");
-        if decoded == result {
-            hmac_ok = true;
-            break;
-        }
-    }
-
-    if !hmac_ok {
-        eprintln!("bad hmac by all psks");
-        return (StatusCode::BAD_REQUEST, "").into_response();
-    }
-
-    let kind = match headers.get("x-github-event") {
-        Some(kind) => { kind.to_str().expect("valid ascii string").to_owned() },
-        None => {
-            eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-github-event", path.0, path.1, headers);
-            return (StatusCode::BAD_REQUEST, "").into_response();
-        }
-    };
-
-    handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await
-}
-
-
-async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router {
-    /*
-
-    // GET /hello/warp => 200 OK with body "Hello, warp!"
-    let hello = warp::path!("hello" / String)
-        .map(|name| format!("Hello, {}!\n", name));
-
-    let github_event = warp::post()
-        .and(warp::path!(String / String))
-        .and_then(|owner, repo| {
-            warp::header::<String>("x-github-event")
-                .and(warp::body::content_length_limit(1024 * 1024))
-                .and(warp::body::json())
-                .and_then(|event, json| handle_github_event(owner, repo, event, json))
-                .recover(|e| {
-                    async fn handle_rejection(err: Rejection) -> Result<impl Reply, Rejection> {
-                        Ok(warp::reply::with_status("65308", StatusCode::BAD_REQUEST))
-                    }
-                    handle_rejection(e)
-                })
-        });
-
-    let repo_status = warp::get()
-        .and(warp::path!(String / String / String))
-        .map(|owner, repo, sha| format!("CI status for {}/{} commit {}\n", owner, repo, sha));
-
-    let other =
-        warp::post()
-            .and(warp::path::full())
-            .and(warp::addr::remote())
-            .and(warp::body::content_length_limit(1024 * 1024))
-            .and(warp::body::bytes())
-            .map(move |path, addr: Option<SocketAddr>, body| {
-                println!("{}: lets see what i got {:?}, {:?}", addr.unwrap(), path, body);
-                "hello :)\n"
-            })
-            .or(
-                warp::get()
-                    .and(warp::path::full())
-                    .and(warp::addr::remote())
-                    .map(move |path, addr: Option<SocketAddr>| {
-                        println!("{}: GET to {:?}", addr.unwrap(), path);
-                        "hello!\n"
-                    })
-            )
-            .recover(|e| {
-                async fn handle_rejection(err: Rejection) -> Result<impl Reply, Rejection> {
-                    Ok(warp::reply::with_status("50834", StatusCode::BAD_REQUEST))
-                }
-                handle_rejection(e)
-            });
-    */
-
-    async fn fallback_get(uri: Uri) -> impl IntoResponse {
-        (StatusCode::OK, "get resp")
-    }
-
-    async fn fallback_post(Path(path): Path<String>) -> impl IntoResponse {
-        "post resp"
-    }
-
-    Router::new()
-        .route("/:owner/:repo/:sha", get(handle_commit_status))
-        .route("/:owner", get(handle_repo_summary))
-        .route("/:owner/:repo", post(handle_repo_event))
-        .route("/artifact/:b/:artifact_id", get(handle_get_artifact))
-        .route("/", get(handle_ci_index))
-        .fallback(fallback_get)
-        .with_state(WebserverState {
-            jobs_path,
-            dbctx: Arc::new(DbCtx::new(cfg_path, db_path))
-        })
-}
-
-async fn bind_server(conf: serde_json::Value, jobs_path: PathBuf, config_path: PathBuf, db_path: PathBuf) -> std::io::Result<()> {
-    let server = make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service();
-    use serde_json::Value;
-    match conf {
-        Value::String(address) => {
-            axum_server::bind(address.parse().unwrap())
-                .serve(server).await
-        },
-        Value::Object(map) => {
-            let address = match map.get("address") {
-                Some(Value::String(address)) => address.clone(),
-                None => {
-                    panic!("no local address");
-                },
-                other => {
-                    panic!("invalid local address: {:?}", other);
-                }
-            };
-
-            match (map.get("cert_path"), map.get("key_path")) {
-                (Some(Value::String(cert_path)), Some(Value::String(key_path))) => {
-                    let config = RustlsConfig::from_pem_file(
-                        cert_path.clone(),
-                        key_path.clone(),
-                    ).await.unwrap();
-                    axum_server::bind_rustls(address.parse().unwrap(), config)
-                        .serve(server).await
-                },
-                (Some(_), _) | (_, Some(_)) => {
-                    panic!("invalid local tls config: only one of `cert_path` or `key_path` has been provided");
-                },
-                (None, None) => {
-                    axum_server::bind(address.parse().unwrap())
-                        .serve(server).await
-                }
-            }
-        },
-        other => {
-            panic!("invalid server bind config: {:?}", other);
-        }
-    }
-}
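bind_server accepts either a bare address string or an object carrying optional TLS material, so a webserver_config.json along these lines would exercise both branches (a hypothetical example; field names follow how main() below reads WebserverConfig, all values invented):

    {
      "psks": [],
      "jobs_path": "./jobs",
      "config_path": "./config",
      "db_path": "./state.db",
      "debug_addr": "127.0.0.1:8080",
      "server_addr": {
        "address": "0.0.0.0:443",
        "cert_path": "./certs/fullchain.pem",
        "key_path": "./certs/privkey.pem"
      }
    }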
-
-#[tokio::main]
-async fn main() {
-    tracing_subscriber::fmt::init();
-
-    let mut args = std::env::args();
-    args.next().expect("first arg exists");
-    let config_path = args.next().unwrap_or("./webserver_config.json".to_string());
-    let web_config: WebserverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for WebserverConfig");
-    let mut psks = PSKS.write().expect("can write lock");
-    *psks = web_config.psks.clone();
-    // drop the write lock so we can read PSKS elsewhere WITHOUT deadlocking.
-    std::mem::drop(psks);
-
-    let jobs_path = web_config.jobs_path.clone();
-    let config_path = web_config.config_path.clone();
-    let db_path = web_config.db_path.clone();
-    if let Some(addr_conf) = web_config.debug_addr.as_ref() {
-        spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone()));
-    }
-    if let Some(addr_conf) = web_config.server_addr.as_ref() {
-        spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone()));
-    }
-    loop {
-        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
-    }
-}
-
-struct HostDesc {
-    hostname: String,
-    cpu_desc: String,
-    cpu_max_freq_khz: u64,
-}
-impl HostDesc {
-    fn from_parts(hostname: String, vendor_id: String, cpu_family: String, model: String, cpu_max_freq_khz: u64) -> Self {
-        let cpu_desc = match (vendor_id.as_str(), cpu_family.as_str(), model.as_str()) {
-            ("Arm Limited", "8", "0xd03") => "aarch64 A53".to_string(),
-            ("GenuineIntel", "6", "85") => "x86_64 Skylake".to_string(),
-            ("AuthenticAMD", "23", "113") => "x86_64 Matisse".to_string(),
-            (vendor, family, model) => format!("unknown {}:{}:{}", vendor, family, model)
-        };
-
-        HostDesc {
-            hostname,
-            cpu_desc,
-            cpu_max_freq_khz,
-        }
-    }
-}
diff --git a/src/notifier.rs b/src/notifier.rs
deleted file mode 100644
index f9d6084..0000000
--- a/src/notifier.rs
+++ /dev/null
@@ -1,181 +0,0 @@
-use serde_derive::{Deserialize, Serialize};
-use std::sync::Arc;
-use axum::http::StatusCode;
-use lettre::transport::smtp::authentication::{Credentials, Mechanism};
-use lettre::{Message, Transport};
-use lettre::transport::smtp::extension::ClientId;
-use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder};
-use std::time::Duration;
-use std::path::Path;
-
-use crate::DbCtx;
-
-pub struct RemoteNotifier {
-    pub remote_path: String,
-    pub notifier: NotifierConfig,
-}
-
-#[derive(Serialize, Deserialize)]
-#[serde(untagged)]
-pub enum NotifierConfig {
-    GitHub {
-        token: String,
-    },
-    Email {
-        username: String,
-        password: String,
-        mailserver: String,
-        from: String,
-        to: String,
-    }
-}
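Because NotifierConfig is #[serde(untagged)], the two shapes are told apart purely by their fields; hypothetical config files for each variant (values invented):

    { "token": "ghp_..." }

    {
      "username": "ci@example.com",
      "password": "hunter2",
      "mailserver": "smtp.example.com",
      "from": "ci@example.com",
      "to": "builds@example.com"
    }

The github_from_file and email_from_file constructors below exist to reject a file that parses cleanly but as the wrong variant.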
-impl NotifierConfig {
-    pub fn github_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
-        let path = path.as_ref();
-        let bytes = std::fs::read(path)
-            .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?;
-        let config = serde_json::from_slice(&bytes)
-            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?;
-
-        if matches!(config, NotifierConfig::GitHub { .. }) {
-            Ok(config)
-        } else {
-            Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path.display()))
-        }
-    }
-
-    pub fn email_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
-        let path = path.as_ref();
-        let bytes = std::fs::read(path)
-            .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?;
-        let config = serde_json::from_slice(&bytes)
-            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?;
-
-        if matches!(config, NotifierConfig::Email { .. }) {
-            Ok(config)
-        } else {
-            Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path.display()))
-        }
-    }
-}
-
-impl RemoteNotifier {
-    pub async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> {
-        self.tell_job_status(
-            ctx,
-            repo_id, sha, job_id,
-            "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
-        ).await
-    }
-
-    pub async fn tell_complete_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, desc: Result<String, String>) -> Result<(), String> {
-        match desc {
-            Ok(status) => {
-                self.tell_job_status(
-                    ctx,
-                    repo_id, sha, job_id,
-                    "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
-                ).await
-            },
-            Err(status) => {
-                self.tell_job_status(
-                    ctx,
-                    repo_id, sha, job_id,
-                    "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
-                ).await
-            }
-        }
-    }
-
-    pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> {
-        match &self.notifier {
-            NotifierConfig::GitHub { token } => {
-                let status_info = serde_json::json!({
-                    "state": state,
-                    "description": desc,
-                    "target_url": target_url,
-                    "context": "actuallyinspace runner",
-                });
-
-                // TODO: should pool (probably in ctx?) to have an upper bound in concurrent
-                // connections.
-                let client = reqwest::Client::new();
-                let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha))
-                    .body(serde_json::to_string(&status_info).expect("can stringify json"))
-                    .header("content-type", "application/json")
-                    .header("user-agent", "iximeow")
-                    .header("authorization", format!("Bearer {}", token))
-                    .header("accept", "application/vnd.github+json");
-                eprintln!("sending {:?}", req);
-                eprintln!("  body: {}", serde_json::to_string(&status_info).expect("can stringify json"));
-                let res = req
-                    .send()
-                    .await;
-
-                match res {
-                    Ok(res) => {
-                        if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED {
-                            Ok(())
-                        } else {
-                            Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res))
-                        }
-                    }
-                    Err(e) => {
-                        Err(format!("failure sending request: {:?}", e))
-                    }
-                }
-            }
-            NotifierConfig::Email { username, password, mailserver, from, to } => {
-                eprintln!("[.] emailing {} for job {} via {}", state, &self.remote_path, mailserver);
-
-                let subject = format!("{}: job for {}", state, &self.remote_path);
-
-                let body = format!("{}", subject);
-
-                // TODO: when ci.butactuallyin.space has valid certs again, ... fix this.
-                let tls = TlsParametersBuilder::new(mailserver.to_string())
-                    .dangerous_accept_invalid_certs(true)
-                    .build()
-                    .unwrap();
-
-                let mut mailer = SmtpConnection::connect(
-                    mailserver,
-                    Some(Duration::from_millis(5000)),
-                    &ClientId::Domain("ci.butactuallyin.space".to_string()),
-                    None,
-                    None,
-                ).unwrap();
-
-                mailer.starttls(
-                    &tls,
-                    &ClientId::Domain("ci.butactuallyin.space".to_string()),
-                ).unwrap();
-
-                let resp = mailer.auth(
-                    &[Mechanism::Plain, Mechanism::Login],
-                    &Credentials::new(username.to_owned(), password.to_owned())
-                ).unwrap();
-                assert!(resp.is_positive());
-
-                let email = Message::builder()
-                    .from(from.parse().unwrap())
-                    .to(to.parse().unwrap())
-                    .subject(&subject)
-                    .body(body)
-                    .unwrap();
-
-                match mailer.send(email.envelope(), &email.formatted()) {
-                    Ok(_) => {
-                        eprintln!("[+] notified {}@{}", username, mailserver);
-                        Ok(())
-                    }
-                    Err(e) => {
-                        eprintln!("[-] could not send email: {:?}", e);
-                        Err(e.to_string())
-                    }
-                }
-            }
-        }
-    }
-}
diff --git a/src/protocol.rs b/src/protocol.rs
deleted file mode 100644
index c7a9318..0000000
--- a/src/protocol.rs
+++ /dev/null
@@ -1,114 +0,0 @@
-use serde::{Serialize, Deserialize};
-
-#[derive(Serialize, Deserialize, Debug)]
-#[serde(tag = "kind")]
-#[serde(rename_all = "snake_case")]
-pub enum ClientProto {
-    Started,
-    ArtifactCreate,
-    NewTask(RequestedJob),
-    NewTaskPlease { allowed_pushers: Option<Vec<String>>, host_info: HostInfo },
-    Metric { name: String, value: String },
-    Command(CommandInfo),
-    TaskStatus(TaskInfo),
-    Ping,
-    Pong,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-#[serde(tag = "command_info")]
-#[serde(rename_all = "snake_case")]
-pub enum CommandInfo {
-    Started { command: Vec<String>, cwd: Option<String>, id: u32 },
-    Finished { exit_code: Option<i32>, id: u32 },
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-#[serde(tag = "task_info")]
-#[serde(rename_all = "snake_case")]
-pub enum TaskInfo {
-    Finished { status: String },
-    Interrupted { status: String, description: Option<String> },
-}
-
-impl ClientProto {
-    pub fn metric(name: impl Into<String>, value: impl Into<String>) -> Self {
-        ClientProto::Metric { name: name.into(), value: value.into() }
-    }
-
-    pub fn command(state: CommandInfo) -> Self {
-        ClientProto::Command(state)
-    }
-
-    pub fn new_task_please(allowed_pushers: Option<Vec<String>>, host_info: HostInfo) -> Self {
-        ClientProto::NewTaskPlease { allowed_pushers, host_info }
-    }
-
-    pub fn task_status(state: TaskInfo) -> Self {
-        ClientProto::TaskStatus(state)
-    }
-
-    pub fn new_task(task: RequestedJob) -> Self {
-        ClientProto::NewTask(task)
-    }
-}
-
-impl CommandInfo {
-    pub fn started(command: impl Into<Vec<String>>, cwd: Option<&str>, id: u32) -> Self {
-        CommandInfo::Started { command: command.into(), cwd: cwd.map(ToOwned::to_owned), id }
-    }
-
-    pub fn finished(exit_code: Option<i32>, id: u32) -> Self {
-        CommandInfo::Finished { exit_code, id }
-    }
-}
-
-impl TaskInfo {
-    pub fn finished(status: impl Into<String>) -> Self {
-        TaskInfo::Finished { status: status.into() }
-    }
-
-    pub fn interrupted(status: impl Into<String>, description: impl Into<Option<String>>) -> Self {
-        TaskInfo::Interrupted { status: status.into(), description: description.into() }
-    }
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct HostInfo {
-    pub hostname: String,
-    pub cpu_info: CpuInfo,
-    pub memory_info: MemoryInfo,
-    pub env_info: EnvInfo,
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct CpuInfo {
-    pub model_name: String,
-    pub microcode: String,
-    pub cores: u32,
-    pub vendor_id: String,
-    pub family: String,
-    pub model: String,
-    // clock speed in khz
-    pub max_freq: u64,
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct MemoryInfo {
-    pub total: String,
-    pub available: String,
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct EnvInfo {
-    pub arch: String,
-    pub family: String,
-    pub os: String,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct RequestedJob {
-    pub commit: String,
-    pub remote_url: String,
-    pub build_token: String,
-}
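Given the serde attributes on ClientProto (tag = "kind", snake_case variant names), the runner's handshake message serializes along these lines (a sketch; host values invented):

    // hypothetical: what ClientProto::new_task_please(None, host_info) looks like on the wire
    // {"kind":"new_task_please",
    //  "allowed_pushers":null,
    //  "host_info":{"hostname":"builder1","cpu_info":{...},"memory_info":{...},"env_info":{...}}}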
diff --git a/src/sql.rs b/src/sql.rs
deleted file mode 100644
index 1071279..0000000
--- a/src/sql.rs
+++ /dev/null
@@ -1,229 +0,0 @@
-#![allow(dead_code)]
-
-use std::convert::TryFrom;
-
-use crate::dbctx::Run;
-use crate::dbctx::Job;
-
-#[derive(Debug, Clone)]
-pub enum JobResult {
-    Pass = 0,
-    Fail = 1,
-}
-
-#[derive(Debug, Copy, Clone, PartialEq)]
-pub enum RunState {
-    Pending = 0,
-    Started = 1,
-    Finished = 2,
-    Error = 3,
-    Invalid = 4,
-}
-
-impl TryFrom<u8> for RunState {
-    type Error = String;
-
-    fn try_from(value: u8) -> Result<Self, Self::Error> {
-        match value {
-            0 => Ok(RunState::Pending),
-            1 => Ok(RunState::Started),
-            2 => Ok(RunState::Finished),
-            3 => Ok(RunState::Error),
-            4 => Ok(RunState::Invalid),
-            other => Err(format!("invalid job state: {}", other)),
-        }
-    }
-}
-
-pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
-    let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
-    let state: u8 = state;
-    Run {
-        id,
-        job_id,
-        artifacts_path,
-        state: state.try_into().unwrap(),
-        host_id,
-        create_time,
-        start_time,
-        complete_time,
-        build_token,
-        run_timeout,
-        build_result,
-        final_text,
-    }
-}
-
-// remote_id is the remote from which we were notified. this is necessary so we know which remote
-// to pull from to actually run the job.
-pub const CREATE_JOBS_TABLE: &'static str = "\
-    CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT,
-        source TEXT,
-        created_time INTEGER,
-        remote_id INTEGER,
-        commit_id INTEGER,
-        run_preferences TEXT);";
-
-pub const CREATE_METRICS_TABLE: &'static str = "\
-    CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT,
-        job_id INTEGER,
-        name TEXT,
-        value TEXT,
-        UNIQUE(job_id, name)
-    );";
-
-pub const CREATE_COMMITS_TABLE: &'static str = "\
-    CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);";
-
-pub const CREATE_REPOS_TABLE: &'static str = "\
-    CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT,
-        repo_name TEXT,
-        default_run_preference TEXT);";
-
-// remote_api is `github` or NULL for now. hopefully a future cgit-style notifier one day.
-// remote_path is some unique identifier for the relevant remote.
-// * for `github` remotes, this will be `owner/repo`.
-// * for others.. who knows.
-// remote_url is a url for human interaction with the remote (think https://git.iximeow.net/zvm)
-// remote_git_url is a url that can be `git clone`'d to fetch sources
-pub const CREATE_REMOTES_TABLE: &'static str = "\
-    CREATE TABLE IF NOT EXISTS remotes (id INTEGER PRIMARY KEY AUTOINCREMENT,
-        repo_id INTEGER,
-        remote_path TEXT,
-        remote_api TEXT,
-        remote_url TEXT,
-        remote_git_url TEXT,
-        notifier_config_path TEXT);";
-
-pub const CREATE_ARTIFACTS_TABLE: &'static str = "\
-    CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT,
-        run_id INTEGER,
-        name TEXT,
-        desc TEXT,
-        created_time INTEGER,
-        completed_time INTEGER);";
-
-pub const CREATE_RUNS_TABLE: &'static str = "\
-    CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT,
-        job_id INTEGER,
-        artifacts_path TEXT,
-        state INTEGER NOT NULL,
-        host_id INTEGER,
-        build_token TEXT,
-        created_time INTEGER,
-        started_time INTEGER,
-        complete_time INTEGER,
-        run_timeout INTEGER,
-        build_result INTEGER,
-        final_status TEXT);";
-
-pub const CREATE_HOSTS_TABLE: &'static str = "\
-    CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY AUTOINCREMENT,
-        hostname TEXT,
-        cpu_vendor_id TEXT,
-        cpu_model_name TEXT,
-        cpu_family TEXT,
-        cpu_model TEXT,
-        cpu_microcode TEXT,
-        cpu_max_freq_khz INTEGER,
-        cpu_cores INTEGER,
-        mem_total TEXT,
-        arch TEXT,
-        family TEXT,
-        os TEXT,
-        UNIQUE(hostname, cpu_vendor_id, cpu_model_name, cpu_family, cpu_model, cpu_microcode, cpu_cores, mem_total, arch, family, os));";
-
-pub const CREATE_REMOTES_INDEX: &'static str = "\
-    CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";
-
-pub const CREATE_REPO_NAME_INDEX: &'static str = "\
-    CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);";
-
-pub const PENDING_RUNS: &'static str = "\
-    select id, job_id, created_time, host_preference from runs where state=0 and (host_preference=?1 or host_preference is null) order by created_time desc;";
-
-pub const JOBS_NEEDING_HOST_RUN: &'static str = "\
-    select jobs.id, jobs.source, jobs.created_time, jobs.remote_id, jobs.commit_id, jobs.run_preferences from jobs \
-    where jobs.run_preferences=\"all\" and jobs.created_time > ?1 \
-    and not exists \
-        (select 1 from runs r2 where r2.job_id = jobs.id and r2.host_id = ?2);";
-
-pub const ACTIVE_RUNS: &'static str = "\
-    select id,
-        job_id,
-        artifacts_path,
-        state,
-        host_id,
-        build_token,
-        created_time,
-        started_time,
-        complete_time,
-        run_timeout,
-        build_result,
-        final_status from runs where state=1 or state=0;";
-
-pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\
-    select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;";
-
-pub const JOB_BY_COMMIT_ID: &'static str = "\
-    select id, source, created_time, remote_id, commit_id, run_preferences from jobs where commit_id=?1;";
-
-pub const ARTIFACT_BY_ID: &'static str = "\
-    select * from artifacts where id=?1 and run_id=?2;";
-
-pub const JOB_BY_ID: &'static str = "\
-    select id, source, created_time, remote_id, commit_id, run_preferences from jobs where id=?1";
-
-pub const METRICS_FOR_RUN: &'static str = "\
-    select * from metrics where run_id=?1 order by id asc;";
-
-pub const METRICS_FOR_JOB: &'static str = "\
-    select metrics.id, metrics.run_id, metrics.name, metrics.value from metrics \
-    join runs on runs.id=metrics.run_id \
-    where runs.job_id=?1 \
-    order by metrics.run_id desc, metrics.id desc;";
sha=?1;"; - -pub const REMOTES_FOR_REPO: &'static str = "\ - select * from remotes where repo_id=?1;"; - -pub const ALL_REPOS: &'static str = "\ - select id, repo_name, default_run_preference from repos;"; - -pub const LAST_JOBS_FROM_REMOTE: &'static str = "\ - select id, source, created_time, remote_id, commit_id, run_preferences from jobs where remote_id=?1 order by created_time desc limit ?2;"; - -pub const LAST_RUN_FOR_JOB: &'static str = "\ - select id, - job_id, - artifacts_path, - state, - host_id, - build_token, - created_time, - started_time, - complete_time, - run_timeout, - build_result, - final_status from runs where job_id=?1 order by started_time desc limit 1;"; - -pub const RUNS_FOR_JOB: &'static str = "\ - select id, - job_id, - artifacts_path, - state, - host_id, - build_token, - created_time, - started_time, - complete_time, - run_timeout, - build_result, - final_status from runs where job_id=?1 group by host_id order by started_time desc, state asc;"; - -pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\ - select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id, jobs.run_preferences - from jobs join runs on jobs.id=runs.job_id - oder by runs.created_time asc;"; -- cgit v1.1