From 7734472faa9e0ecb18bab286055aceaa13c9c947 Mon Sep 17 00:00:00 2001 From: iximeow Date: Sun, 4 Dec 2022 21:07:57 +0000 Subject: add ci driver, webserver. they work. "work" is a strong word. --- src/ci_driver.rs | 93 +++++++++++ src/main.rs | 476 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/sql.rs | 55 +++++++ 3 files changed, 624 insertions(+) create mode 100644 src/ci_driver.rs create mode 100644 src/main.rs create mode 100644 src/sql.rs (limited to 'src') diff --git a/src/ci_driver.rs b/src/ci_driver.rs new file mode 100644 index 0000000..fd6f813 --- /dev/null +++ b/src/ci_driver.rs @@ -0,0 +1,93 @@ +use rusqlite::Connection; +use std::process::Command; +use std::path::{Path, PathBuf}; + +mod sql; + +use std::time::{SystemTime, UNIX_EPOCH}; + +fn reserve_artifacts_dir(job: u64) -> std::io::Result { + let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); + path.push(job.to_string()); + std::fs::create_dir(&path)?; + Ok(path) +} + +fn activate_job(connection: &mut Connection, job: u64, artifacts: Option, state: u8, run_host: Option, commit_id: u64, repo_url: String, repo_name: String) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis(); + + let commit_sha: String = connection + .query_row("select sha from commits where id=?1", [commit_id], |row| row.get(0)) + .expect("can run query"); + + let artifacts: PathBuf = match artifacts { + Some(artifacts) => PathBuf::from(artifacts), + None => reserve_artifacts_dir(job).expect("can reserve a directory for artifacts") + }; + + if run_host == None { + eprintln!("need to find a host to run the job"); + } + + eprintln!("cloning {}", repo_url); + let mut repo_dir = artifacts.clone(); + repo_dir.push("repo"); + eprintln!(" ... 
into {}", repo_dir.display()); + + Command::new("git") + .arg("clone") + .arg(repo_url) + .arg(&format!("{}", repo_dir.display())) + .status() + .expect("can clone the repo"); + + eprintln!("checking out {}", commit_sha); + Command::new("git") + .current_dir(&repo_dir) + .arg("checkout") + .arg(commit_sha) + .status() + .expect("can checkout hash"); + + eprintln!("running {}", repo_name); + /* + * find the CI script, figure out how to run it + */ + + connection.execute( + "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4", + (now as u64, "test host".to_string(), format!("{}", artifacts.display()), job) + ) + .expect("can update"); +} + +fn main() { + let mut connection = Connection::open("/root/ixi_ci_server/state.db").unwrap(); + connection.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); + connection.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); + connection.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); + connection.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); + + loop { + let mut pending_query = connection.prepare(sql::PENDING_JOBS).unwrap(); + let mut jobs = pending_query.query([]).unwrap(); + let mut to_start = Vec::new(); + while let Some(row) = jobs.next().unwrap() { + let (id, artifacts, state, run_host, commit_id, repo_url, repo_name): (u64, Option, u8, Option, u64, String, String)= TryInto::try_into(row).unwrap(); + to_start.push((id, artifacts, state, run_host, commit_id, repo_url, repo_name)); + } + std::mem::drop(jobs); + std::mem::drop(pending_query); + if to_start.len() > 0 { + println!("{} new jobs", to_start.len()); + + for job in to_start.into_iter() { + activate_job(&mut connection, job.0, job.1, job.2, job.3, job.4, job.5, job.6); + } + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..1b6d9e8 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,476 @@ +#![allow(dead_code)] +#![allow(unused_variables)] + +use 
tokio::spawn; +use std::path::PathBuf; +use serde_derive::{Deserialize, Serialize}; +use axum_server::tls_rustls::RustlsConfig; +use axum::routing::*; +use axum::Router; +use axum::response::{IntoResponse, Response}; +use std::net::SocketAddr; +use axum::extract::{Path, State}; +use http_body::combinators::UnsyncBoxBody; +use axum::{Error, Json}; +use axum::extract::rejection::JsonRejection; +use axum::body::Bytes; +use axum::http::{StatusCode, Uri}; +use http::header::HeaderMap; + +use std::sync::Arc; +use std::sync::Mutex; +use std::time::{SystemTime, UNIX_EPOCH}; + +use hmac::{Hmac, Mac}; +use sha2::Sha256; + +mod sql; + +use rusqlite::{Connection, OptionalExtension}; + +#[derive(Copy, Clone, Debug)] +enum GithubHookError { + BodyNotObject, + MissingElement { path: &'static str }, + BadType { path: &'static str, expected: &'static str }, +} + +#[derive(Debug)] +enum GithubEvent { + Push { tip: String, repo_name: String, head_commit: serde_json::Map }, + Other {} +} + +fn parse_push_event(body: serde_json::Value) -> Result { + let body = body.as_object() + .ok_or(GithubHookError::BodyNotObject)?; + + let tip = body.get("after") + .ok_or(GithubHookError::MissingElement { path: "after" })? + .as_str() + .ok_or(GithubHookError::BadType { path: "after", expected: "str" })? + .to_owned(); + + let repo_name = body.get("repository") + .ok_or(GithubHookError::MissingElement { path: "repository" })? + .as_object() + .ok_or(GithubHookError::BadType { path: "repository", expected: "obj" })? + .get("full_name") + .ok_or(GithubHookError::MissingElement { path: "repository/full_name" })? + .as_str() + .ok_or(GithubHookError::BadType { path: "repository/full_name", expected: "str" })? + .to_owned(); + + let head_commit = body.get("head_commit") + .ok_or(GithubHookError::MissingElement { path: "head_commit" })? + .as_object() + .ok_or(GithubHookError::BadType { path: "head_commit", expected: "obj" })? 
+ .to_owned(); + + Ok(GithubEvent::Push { tip, repo_name, head_commit }) +} + +async fn process_push_event(ctx: Arc, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse { + let (sha, repo, head_commit) = if let GithubEvent::Push { tip, repo_name, head_commit } = event { + (tip, repo_name, head_commit) + } else { + panic!("process push event on non-push event"); + }; + + println!("handling push event to {}/{}: sha {} in repo {}, {:?}", owner, repo, sha, repo, head_commit); + + // push event is in terms of a ref, but we don't know if it's a new commit (yet). + // in terms of CI jobs, we care mainly about new commits. + // so... + // * look up the commit, + // * if it known, bail out (new ref for existing commit we've already handled some way) + // * create a new commit ref + // * create a new job (state=pending) for the commit ref + let commit_id: Option = ctx.conn.lock().unwrap() + .query_row(sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0)) + .optional() + .expect("can run query"); + + if commit_id.is_some() { + eprintln!("commit already exists"); + return (StatusCode::OK, String::new()); + } + + let remote_url = format!("https://github.com/{}.git", repo); + let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() + .query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) + .unwrap(); + + let job_id = ctx.new_job(remote_id, &sha).unwrap(); + + let notifiers = ctx.notifiers_by_name(&repo).expect("can get notifiers"); + + for notifier in notifiers { + notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify"); + } + + (StatusCode::OK, String::new()) +} + +async fn handle_github_event(ctx: Arc, owner: String, repo: String, event_kind: String, body: serde_json::Value) -> Response> { + println!("got github event: {}, {}, {}", owner, repo, event_kind); + match event_kind.as_str() { + "push" => { + let push_event = parse_push_event(body) + 
.map_err(|e| { + eprintln!("TODO: handle push event error: {:?}", e); + panic!() + }) + .expect("parse works"); + let res = process_push_event(ctx, owner, repo, push_event).await; + "ok".into_response() + }, + other => { + eprintln!("unhandled event kind: {}, repo {}/{}. content: {:?}", other, owner, repo, body); + "".into_response() + } + } +} + +async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State>) -> impl IntoResponse { + eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2); + let time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch"); + + format!("requested: {:?}", path) +} + +async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State>, body: Bytes) -> impl IntoResponse { + let json: Result = serde_json::from_slice(&body); + eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers); + + let payload = match json { + Ok(payload) => { payload }, + Err(e) => { + eprintln!("bad request: path={}/{}\nheaders: {:?}\nbody err: {:?}", path.0, path.1, headers, e); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let sent_hmac = match headers.get("x-hub-signature-256") { + Some(sent_hmac) => { sent_hmac.to_str().expect("valid ascii string").to_owned() }, + None => { + eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-hub-signature-256", path.0, path.1, headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let mut mac = Hmac::::new_from_slice(GITHUB_PSK) + .expect("hmac can be constructed"); + mac.update(&body); + let result = mac.finalize().into_bytes().to_vec(); + + // hack: skip sha256= + let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex"); + if decoded != result { + eprintln!("bad hmac:\n\ + got: {:?}\n\ + expected: {:?}", decoded, result); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + + let kind = match headers.get("x-github-event") { + Some(kind) => { 
kind.to_str().expect("valid ascii string").to_owned() }, + None => { + eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-github-event", path.0, path.1, headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + handle_github_event(ctx, path.0, path.1, kind, payload).await +} + +struct DbCtx { + conn: Mutex, +} + +struct RemoteNotifier { + remote_path: String, + notifier: NotifierConfig, +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +enum NotifierConfig { + GitHub { + token: String, + }, + Email { + username: String, + password: String, + mailserver: String, + from: String, + to: String, + } +} + +impl NotifierConfig { + fn github_from_file(path: &str) -> Result { + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; + + if matches!(config, NotifierConfig::GitHub { .. }) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path)) + } + } + + fn email_from_file(path: &str) -> Result { + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; + + if matches!(config, NotifierConfig::Email { .. 
}) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path)) + } + } +} + +impl RemoteNotifier { + async fn tell_pending_job(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { + match &self.notifier { + NotifierConfig::GitHub { token } => { + let status_info = serde_json::json!({ + "state": "pending", + "target_url": format!( + "https://{}/{}/{}", + "ci.butactuallyin.space", + &self.remote_path, + sha, + ), + "description": "build is queued", + "context": "actuallyinspace runner", + }); + + // TODO: should pool (probably in ctx?) to have an upper bound in concurrent + // connections. + let client = reqwest::Client::new(); + let res = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) + .body(serde_json::to_string(&status_info).expect("can stringify json")) + .header("authorization", format!("Bearer {}", token)) + .header("accept", "application/vnd.github+json") + .send() + .await; + + match res { + Ok(res) => { + if res.status() == StatusCode::OK { + Ok(()) + } else { + Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) + } + } + Err(e) => { + Err(format!("failure sending request: {:?}", e)) + } + } + } + NotifierConfig::Email { username, password, mailserver, from, to } => { + panic!("should send an email saying that a job is now pending for `sha`") + } + } + } +} + +impl DbCtx { + fn new(db_path: &'static str) -> Self { + DbCtx { + conn: Mutex::new(Connection::open(db_path).unwrap()) + } + } + + fn new_commit(&self, sha: &str) -> Result { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into commits (sha) values (?1)", + [sha.clone()] + ) + .expect("can insert"); + + Ok(conn.last_insert_rowid() as u64) + } + + fn new_job(&self, remote_id: u64, sha: &str) -> Result { + // TODO: potential race: if two remotes learn about a commit at the same time and we decide + // to create two 
jobs at the same time, this might return an incorrect id if the insert + // didn't actually insert a new row. + let commit_id = self.new_commit(sha).expect("can create commit record"); + + let created_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis() as u64; + + let conn = self.conn.lock().unwrap(); + + let rows_modified = conn.execute( + "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);", + (sql::JobState::Pending as u64, remote_id, commit_id, created_time) + ).unwrap(); + + assert_eq!(1, rows_modified); + + Ok(conn.last_insert_rowid() as u64) + } + + fn notifiers_by_name(&self, repo: &str) -> Result, String> { + let maybe_repo_id: Option = self.conn.lock() + .unwrap() + .query_row("select * from repos where repo_name=?1", [repo], |row| row.get(0)) + .optional() + .expect("query succeeds"); + match maybe_repo_id { + Some(repo_id) => { + // get remotes + + #[derive(Debug)] + #[allow(dead_code)] + struct Remote { + id: u64, + repo_id: u64, + remote_path: String, + remote_api: String, + notifier_config_path: String, + } + + let mut remotes: Vec = Vec::new(); + + let conn = self.conn.lock().unwrap(); + let mut remotes_query = conn.prepare(sql::REMOTES_FOR_REPO).unwrap(); + let mut remote_results = remotes_query.query([repo_id]).unwrap(); + + while let Some(row) = remote_results.next().unwrap() { + let (id, repo_id, remote_path, remote_api, notifier_config_path) = row.try_into().unwrap(); + remotes.push(Remote { id, repo_id, remote_path, remote_api, notifier_config_path }); + } + + let mut notifiers: Vec = Vec::new(); + + for remote in remotes.into_iter() { + match remote.remote_api.as_str() { + "github" => { + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::github_from_file(&remote.notifier_config_path) + .expect("can load notifier config") + }; + notifiers.push(notifier); + }, + "email" => { + let notifier = RemoteNotifier { + 
remote_path: remote.remote_path, + notifier: NotifierConfig::email_from_file(&remote.notifier_config_path) + .expect("can load notifier config") + }; + notifiers.push(notifier); + } + other => { + eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) + } + } + } + + Ok(notifiers) + } + None => { + return Err(format!("repo '{}' is not known", repo)); + } + } + } +} + +async fn make_app_server(db_path: &'static str) -> Router { + /* + + // GET /hello/warp => 200 OK with body "Hello, warp!" + let hello = warp::path!("hello" / String) + .map(|name| format!("Hello, {}!\n", name)); + + let github_event = warp::post() + .and(warp::path!(String / String)) + .and_then(|owner, repo| { + warp::header::("x-github-event") + .and(warp::body::content_length_limit(1024 * 1024)) + .and(warp::body::json()) + .and_then(|event, json| handle_github_event(owner, repo, event, json)) + .recover(|e| { + async fn handle_rejection(err: Rejection) -> Result { + Ok(warp::reply::with_status("65308", StatusCode::BAD_REQUEST)) + } + handle_rejection(e) + }) + }); + + let repo_status = warp::get() + .and(warp::path!(String / String / String)) + .map(|owner, repo, sha| format!("CI status for {}/{} commit {}\n", owner, repo, sha)); + + let other = + warp::post() + .and(warp::path::full()) + .and(warp::addr::remote()) + .and(warp::body::content_length_limit(1024 * 1024)) + .and(warp::body::bytes()) + .map(move |path, addr: Option, body| { + println!("{}: lets see what i got {:?}, {:?}", addr.unwrap(), path, body); + "hello :)\n" + }) + .or( + warp::get() + .and(warp::path::full()) + .and(warp::addr::remote()) + .map(move |path, addr: Option| { + println!("{}: GET to {:?}", addr.unwrap(), path); + "hello!\n" + }) + ) + .recover(|e| { + async fn handle_rejection(err: Rejection) -> Result { + Ok(warp::reply::with_status("50834", StatusCode::BAD_REQUEST)) + } + handle_rejection(e) + }); + */ + + async fn fallback_get(uri: Uri) -> impl IntoResponse { + (StatusCode::OK, "get resp") + } 
+ + async fn fallback_post(Path(path): Path) -> impl IntoResponse { + "post resp" + } + + Router::new() + .route("/:owner/:repo/:sha", get(handle_commit_status)) + .route("/:owner/:repo", post(handle_repo_event)) + .fallback(fallback_get) + .with_state(Arc::new(DbCtx::new(db_path))) +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let config = RustlsConfig::from_pem_file( + PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/fullchain.pem"), + PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"), + ).await.unwrap(); + spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone()) + .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service())); + axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config) + .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service()) + .await + .unwrap(); +} diff --git a/src/sql.rs b/src/sql.rs new file mode 100644 index 0000000..ee334c1 --- /dev/null +++ b/src/sql.rs @@ -0,0 +1,55 @@ +#![allow(dead_code)] + +pub enum JobState { + Pending = 0, + Started = 1, + Complete = 2, + Error = 3, +} + +// remote_id is the remote from which we were notified. this is necessary so we know which remote +// to pull from to actually run the job. +pub const CREATE_JOBS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT, + artifacts_path TEXT, + state INTEGER NOT NULL, + run_host TEXT, + remote_id INTEGER, + commit_id INTEGER, + created_time INTEGER, + started_time INTEGER, + complete_time INTEGER);"; + +pub const CREATE_COMMITS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; + +pub const CREATE_REPOS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT, + repo_name TEXT);"; + +// remote_api is `github` or NULL for now. hopefully a future cgit-style notifier one day. 
// ---- src/sql.rs, continued: remotes schema and shared queries ----
// (`'static` dropped from the consts — const &str is implicitly 'static;
// clippy::redundant_static_lifetimes.)

// remote_path is some unique identifier for the relevant remote.
// * for `github` remotes, this will be `owner/repo`.
// * for others.. who knows.
// remote_url is a url for human interaction with the remote (think https://git.iximeow.net/zvm)
// remote_git_url is a url that can be `git clone`'d to fetch sources
pub const CREATE_REMOTES_TABLE: &str = "\
    CREATE TABLE IF NOT EXISTS remotes (id INTEGER PRIMARY KEY AUTOINCREMENT,
        repo_id INTEGER,
        remote_path TEXT,
        remote_api TEXT,
        remote_url TEXT,
        remote_git_url TEXT,
        notifier_config_path TEXT);";

// NOTE(review): defined but never executed by the ci driver's table setup in
// this patch — presumably an oversight; confirm the index gets created somewhere.
pub const CREATE_REMOTES_INDEX: &str = "\
    CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";

// Jobs awaiting a runner: state=0 is JobState::Pending.
pub const PENDING_JOBS: &str = "\
    select * from jobs where state=0;";

// Map a commit sha to its commits.id, if we have seen it.
pub const COMMIT_TO_ID: &str = "\
    select id from commits where sha=?1;";

// All remotes configured for a repo (used to fan out notifications).
pub const REMOTES_FOR_REPO: &str = "\
    select * from remotes where repo_id=?1;";