diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ci_ctl.rs | 184 | ||||
| -rw-r--r-- | src/ci_driver.rs | 310 | ||||
| -rw-r--r-- | src/ci_runner.rs | 269 | ||||
| -rw-r--r-- | src/dbctx.rs | 204 | ||||
| -rw-r--r-- | src/main.rs | 341 | ||||
| -rw-r--r-- | src/notifier.rs | 126 | ||||
| -rw-r--r-- | src/sql.rs | 23 | 
7 files changed, 1198 insertions, 259 deletions
| diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs new file mode 100644 index 0000000..3f48907 --- /dev/null +++ b/src/ci_ctl.rs @@ -0,0 +1,184 @@ +use clap::{Parser, Subcommand}; + +mod sql; +mod dbctx; +mod notifier; + +use sql::JobState; +use dbctx::DbCtx; +use notifier::NotifierConfig; + +#[derive(Parser)] +#[command(version, about, long_about = None)] +struct Args { +    /// path to a database to manage (defaults to "./state.db") +    db_path: Option<String>, + +    /// path to where configs should be found (defaults to "./config") +    config_path: Option<String>, + +    #[command(subcommand)] +    command: Command, +} + +#[derive(Subcommand)] +enum Command { +    /// add _something_ to the db +    Add { +        #[command(subcommand)] +        what: AddItem, +    }, + +    /// make sure that the state looks reasonable. +    /// +    /// currently, ensure that all notifiers have a config, that config references an existing +    /// file, and that the referenced file is valid. +    Validate, + +    /// do something with jobs +    Job { +        #[command(subcommand)] +        what: JobAction, +    }, +} + +#[derive(Subcommand)] +enum JobAction { +    List, +    Rerun { +        which: u32 +    } +} + +#[derive(Subcommand)] +enum AddItem { +    Repo { +        name: String, +        remote: Option<String>, +        remote_kind: Option<String>, +        config: Option<String>, +    }, +    Remote { +        repo_name: String, +        remote: String, +        remote_kind: String, +        config: String, +    }, +} + +fn main() { +    let args = Args::parse(); + +    let db_path = args.db_path.unwrap_or_else(|| "./state.db".to_owned()); +    let config_path = args.config_path.unwrap_or_else(|| "./config".to_owned()); + +    match args.command { +        Command::Job { what } => { +            match what { +                JobAction::List => { +                    let db = DbCtx::new(&config_path, &db_path); +                    let mut conn = db.conn.lock().unwrap(); +                    let mut query = conn.prepare("select id, artifacts_path, state, commit_id, created_time from jobs;").unwrap(); +                    let mut jobs = query.query([]).unwrap(); +                    while let Some(row) = jobs.next().unwrap() { +                        let (id, artifacts, state, commit_id, created_time): (u64, Option<String>, u64, u64, u64) = row.try_into().unwrap(); + +                        eprintln!("[+] {:04} | {: >8?} | {}", id, state, created_time); +                    } +                    eprintln!("jobs"); +                }, +                JobAction::Rerun { which } => { +                    let db = DbCtx::new(&config_path, &db_path); +                    db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [which]) +                        .expect("works"); +                    eprintln!("[+] job {} set to pending", which); +                } +            } +        }, +        Command::Add { what } => { +            match what { +                AddItem::Repo { name, remote, remote_kind, config } => { +                    let remote_config = match (remote, remote_kind, config) { +                        (Some(remote), Some(remote_kind), Some(config_path)) => { +                            // do something +                            if remote_kind != "github" { +                                eprintln!("unknown remote kind: {}", remote); +                                return; +                            } +                            Some((remote, remote_kind, config_path)) +                        }, +                        (None, None, None) => { +                            None +                        }, +                        _ => { +                            eprintln!("when specifying a remote, `remote`, `remote_kind`, and `config_path` must either all be provided together or not at all"); +                            return; +                        } +                    }; + +                    let db = DbCtx::new(&config_path, &db_path); +                    let repo_id = match db.new_repo(&name) { +                        Ok(repo_id) => repo_id, +                        Err(e) => { +                            if e.contains("UNIQUE constraint failed") { +                                eprintln!("[!] repo '{}' already exists", name); +                                return; +                            } else { +                                eprintln!("[!] failed to create repo entry: {}", e); +                                return; +                            } +                        } +                    }; +                    println!("[+] new repo created: '{}' id {}", &name, repo_id); +                    if let Some((remote, remote_kind, config_path)) = remote_config { +                        let full_config_file_path = format!("{}/{}", &db.config_path, config_path); +                        let config = match remote_kind.as_ref() { +                            "github" => { +                                assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok()); +                            } +                            "email" => { +                                assert!(NotifierConfig::email_from_file(&full_config_file_path).is_ok()); +                            } +                            other => { +                                panic!("[-] notifiers for '{}' remotes are not supported", other); +                            } +                        }; +                        db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config_path.as_str()).unwrap(); +                        println!("[+] new remote created: repo '{}', {} remote at {}", &name, remote_kind, remote); +                    } +                }, +                AddItem::Remote { repo_name, remote, remote_kind, config } => { +                    let db = DbCtx::new(&config_path, &db_path); +                    let repo_id = match db.repo_id_by_name(&repo_name) { +                        Ok(Some(id)) => id, +                        Ok(None) => { +                            eprintln!("[-] repo '{}' does not exist", repo_name); +                            return; +                        }, +                        Err(e) => { +                            eprintln!("[!] couldn't look up repo '{}': {:?}", repo_name, e); +                            return; +                        } +                    }; +                    let config_file = format!("{}/{}", config_path, config); +                    match remote_kind.as_ref() { +                        "github" => { +                            NotifierConfig::github_from_file(&config_file).unwrap(); +                        } +                        "email" => { +                            NotifierConfig::email_from_file(&config_file).unwrap(); +                        } +                        other => { +                            panic!("notifiers for '{}' remotes are not supported", other); +                        } +                    }; +                    db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config.as_str()).unwrap(); +                    println!("[+] new remote created: repo '{}', {} remote at {}", &repo_name, remote_kind, remote); +                }, +            } +        }, +        Command::Validate => { +            println!("ok"); +        } +    } +} diff --git a/src/ci_driver.rs b/src/ci_driver.rs index fd6f813..96e0d44 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -1,10 +1,29 @@ -use rusqlite::Connection;  use std::process::Command; +use futures_util::StreamExt; +use std::fmt;  use std::path::{Path, PathBuf}; +use tokio::spawn; +use tokio_stream::wrappers::ReceiverStream; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use axum_server::tls_rustls::RustlsConfig; +use axum::body::StreamBody; +use axum::http::{StatusCode}; +use axum::Router; +use axum::routing::*; +use axum::extract::State; +use axum::extract::BodyStream; +use axum::response::IntoResponse; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use serde_json::json; +mod dbctx;  mod sql; +mod notifier; -use std::time::{SystemTime, UNIX_EPOCH}; +use crate::dbctx::{DbCtx, PendingJob}; +use crate::sql::JobState;  fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> {      let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); @@ -13,33 +32,43 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> {      Ok(path)  } -fn activate_job(connection: &mut Connection, job: u64, artifacts: Option<String>, state: u8, run_host: Option<String>, commit_id: u64, repo_url: String, repo_name: String) { +async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> { +    let connection = dbctx.conn.lock().unwrap(); +    let (repo_id, remote_git_url): (u64, String) = connection +        .query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) +        .expect("query succeeds"); +    let repo_name: String = connection +        .query_row("select repo_name from repos where id=?1", [repo_id], |row| row.get(0)) +        .expect("query succeeds"); +      let now = SystemTime::now()          .duration_since(UNIX_EPOCH)          .expect("now is before epoch")          .as_millis();      let commit_sha: String = connection -        .query_row("select sha from commits where id=?1", [commit_id], |row| row.get(0)) +        .query_row("select sha from commits where id=?1", [job.commit_id], |row| row.get(0))          .expect("can run query"); -    let artifacts: PathBuf = match artifacts { +    let artifacts = PathBuf::from("/tmp"); +    /* +    let artifacts: PathBuf = match &job.artifacts {          Some(artifacts) => PathBuf::from(artifacts), -        None => reserve_artifacts_dir(job).expect("can reserve a directory for artifacts") +        None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts")      }; -    if run_host == None { +    if job.run_host.as_ref() == None {          eprintln!("need to find a host to run the job");      } -    eprintln!("cloning {}", repo_url); +    eprintln!("cloning {}", remote_git_url);      let mut repo_dir = artifacts.clone();      repo_dir.push("repo");      eprintln!(" ... into {}", repo_dir.display());      Command::new("git")          .arg("clone") -        .arg(repo_url) +        .arg(&remote_git_url)          .arg(&format!("{}", repo_dir.display()))          .status()          .expect("can clone the repo"); @@ -48,44 +77,265 @@ fn activate_job(connection: &mut Connection, job: u64, artifacts: Option<String>      Command::new("git")          .current_dir(&repo_dir)          .arg("checkout") -        .arg(commit_sha) +        .arg(&commit_sha)          .status()          .expect("can checkout hash"); +    */      eprintln!("running {}", repo_name);      /*       * find the CI script, figure out how to run it       */ +    let mut client_job = loop { +        let mut candidate = clients.recv().await +            .ok_or_else(|| "client channel disconnected".to_string())?; + +        if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { +            break client_job; +        } else { +            // failed to submit job, move on for now +        } +    }; + +    let run_host = client_job.client.name.clone(); +      connection.execute(          "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4", -        (now as u64, "test host".to_string(), format!("{}", artifacts.display()), job) +        (now as u64, run_host, format!("{}", artifacts.display()), job.id)      )          .expect("can update"); + +    spawn(async move { +        client_job.run().await +    }); + +    Ok(())  } -fn main() { -    let mut connection = Connection::open("/root/ixi_ci_server/state.db").unwrap(); -    connection.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); -    connection.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); -    connection.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); -    connection.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); +struct RunnerClient { +    tx: mpsc::Sender<Result<String, String>>, +    rx: BodyStream, +    name: String, +    build_token: String, +} -    loop { -        let mut pending_query = connection.prepare(sql::PENDING_JOBS).unwrap(); -        let mut jobs = pending_query.query([]).unwrap(); -        let mut to_start = Vec::new(); -        while let Some(row) = jobs.next().unwrap() { -            let (id, artifacts, state, run_host, commit_id, repo_url, repo_name): (u64, Option<String>, u8, Option<String>, u64, String, String)= TryInto::try_into(row).unwrap(); -            to_start.push((id, artifacts, state, run_host, commit_id, repo_url, repo_name)); +fn random_name() -> String { +    "random name".to_string() +} + +fn token_for_job() -> String { +    "very secret token do not share".to_string() +} + +struct ClientJob { +    dbctx: Arc<DbCtx>, +    remote_git_url: String, +    sha: String, +    job: PendingJob, +    client: RunnerClient     +} + +impl ClientJob { +    pub async fn run(&mut self) { +        loop { +            let msg = self.client.recv().await.expect("recv works").expect("client sent an object"); +            let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); +            match msg_kind { +                "job_status" => { +                    let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); +                    let (result, state): (Result<String, String>, JobState) = if state == "finished" { +                        let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); +                        eprintln!("job update: state is {} and result is {}", state, result); +                        match result { +                            "pass" => { +                                (Ok("success".to_string()), JobState::Complete) +                            }, +                            other => { +                                (Err(other.to_string()), JobState::Error) +                            } +                        } +                    } else if state == "interrupted" { +                        let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); +                        eprintln!("job update: state is {} and result is {}", state, result); +                        (Err(result.to_string()), JobState::Error) +                    } else { +                        eprintln!("job update: state is {}", state); +                        (Err(format!("atypical completion status: {}", state)), JobState::Invalid) +                    }; + +                    let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists"); + +                    for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { +                        notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await.expect("can notify"); +                    } + +                    let now = SystemTime::now() +                        .duration_since(UNIX_EPOCH) +                        .expect("now is before epoch") +                        .as_millis(); + +                    self.dbctx.conn.lock().unwrap().execute( +                        "update jobs set complete_time=?1, state=?2 where id=?3", +                        (now as u64, state as u64, self.job.id) +                    ) +                        .expect("can update"); +                } +                "artifact_create" => { +                    eprintln!("creating artifact {:?}", msg); +                    self.client.send(serde_json::json!({ +                        "status": "ok", +                        "object_id": "10", +                    })).await.unwrap(); +                }, +                other => { +                    eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg); +                    return; +                } +            } +        } +    } +} + +impl RunnerClient { +    async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream) -> Result<Self, String> { +        let name = random_name(); +        let token = token_for_job(); +        let client = RunnerClient { +            tx: sender, +            rx: resp, +            name, +            build_token: token, +        }; +        Ok(client) +    } + +    async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { +        self.tx.send(Ok(serde_json::to_string(&msg).unwrap())) +            .await +            .map_err(|e| e.to_string()) +    } + +    async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { +        match self.rx.next().await { +            Some(Ok(bytes)) => { +                serde_json::from_slice(&bytes) +                    .map(Option::Some) +                    .map_err(|e| e.to_string()) +            }, +            Some(Err(e)) => { +                eprintln!("e: {:?}", e); +                Err(format!("no client job: {:?}", e)) +            }, +            None => { +                eprintln!("no more body chunks? client hung up?"); +                Ok(None) +            } +        } +    } + +    async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> { +        self.send(serde_json::json!({ +            "commit": sha, +            "remote_url": remote_git_url, +            "build_token": &self.build_token, +        })).await?; +        match self.recv().await { +            Ok(Some(resp)) => { +                if resp == serde_json::json!({ +                    "status": "started" +                }) { +                    eprintln!("resp: {:?}", resp); +                    Ok(Some(ClientJob { +                        job: job.clone(), +                        dbctx: Arc::clone(dbctx), +                        sha: sha.to_string(), +                        remote_git_url: remote_git_url.to_string(), +                        client: self +                    })) +                } else { +                    Err("client rejected job".to_string()) +                } +            } +            Ok(None) => { +                eprintln!("no more body chunks? client hung up?"); +                Ok(None) +            } +            Err(e) => { +                eprintln!("e: {:?}", e); +                Err(e) +            } +        } +    } +} + +impl fmt::Debug for RunnerClient { +    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { +        f.write_str("RunnerClient { .. }") +    } +} + +async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse { +    let (tx_sender, tx_receiver) = mpsc::channel(8); +    let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); +    tx_sender.send(Ok("hello".to_string())).await.expect("works"); +    let client = RunnerClient::new(tx_sender, job_resp).await; +    match client { +        Ok(client) => { +            eprintln!("registering client"); +            match ctx.1.try_send(client) { +                Ok(()) => { +                    eprintln!("response established..."); +                    return (StatusCode::OK, resp_body); +                } +                Err(TrySendError::Full(client)) => { +                    return (StatusCode::IM_A_TEAPOT, resp_body); +                } +                Err(TrySendError::Closed(client)) => { +                    panic!("client holder is gone?"); +                } +            }          } -        std::mem::drop(jobs); -        std::mem::drop(pending_query); -        if to_start.len() > 0 { -            println!("{} new jobs", to_start.len()); +        Err(e) => { +            eprintln!("unable to register client"); +            return (StatusCode::MISDIRECTED_REQUEST, resp_body); +        } +    } +} + +async fn make_api_server(dbctx: Arc<DbCtx>) -> (Router, mpsc::Receiver<RunnerClient>) { +    let (pending_client_sender, pending_client_receiver) = mpsc::channel(8); + +    let router = Router::new() +        .route("/api/next_job", post(handle_next_job)) +        .with_state((dbctx, pending_client_sender)); +    (router, pending_client_receiver) +} + +#[tokio::main] +async fn main() { +    tracing_subscriber::fmt::init(); +    let config = RustlsConfig::from_pem_file( +        PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/fullchain.pem"), +        PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"), +    ).await.unwrap(); + +    let dbctx = Arc::new(DbCtx::new("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db")); + +    let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await; +    spawn(axum_server::bind_rustls("0.0.0.0:9876".parse().unwrap(), config) +          .serve(api_server.into_make_service())); + +    dbctx.create_tables().unwrap(); + +    loop { +        let jobs = dbctx.get_pending_jobs().unwrap(); + +        if jobs.len() > 0 { +            println!("{} new jobs", jobs.len()); -            for job in to_start.into_iter() { -                activate_job(&mut connection, job.0, job.1, job.2, job.3, job.4, job.5, job.6); +            for job in jobs.into_iter() { +                activate_job(Arc::clone(&dbctx), &job, &mut channel).await;              }          }          std::thread::sleep(std::time::Duration::from_millis(100)); diff --git a/src/ci_runner.rs b/src/ci_runner.rs new file mode 100644 index 0000000..6554f05 --- /dev/null +++ b/src/ci_runner.rs @@ -0,0 +1,269 @@ +use std::time::Duration; +use reqwest::{StatusCode, Response}; +use std::process::Command; +use serde_derive::{Deserialize, Serialize}; +use serde::{Deserialize, de::DeserializeOwned, Serialize}; + +#[derive(Debug)] +enum WorkAcquireError { +    Reqwest(reqwest::Error), +    EarlyEof, +    Protocol(String), +} + +struct RunnerClient { +    host: String, +    tx: hyper::body::Sender, +    rx: Response, +    current_job: Option<RequestedJob>, +} + +#[derive(Debug, Serialize, Deserialize)] +struct RequestedJob { +    commit: String, +    remote_url: String, +    build_token: String, +} + +impl RequestedJob { +    // TODO: panics if hyper finds the channel is closed. hum +    async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result<ArtifactStream, String> { +        let create_artifact_message = serde_json::json!({ +            "kind": "artifact_create", +            "name": name, +            "description": desc, +            "job_token": &self.build_token, +        }); +        client.send(create_artifact_message).await +            .map_err(|e| format!("create artifact send error: {:?}", e))?; +        let resp = client.recv().await +            .map_err(|e| format!("create artifact recv error: {:?}", e))?; +        eprintln!("resp: {:?}", resp); +        let object_id = resp.unwrap() +            .as_object().expect("is an object") +            .get("object_id").unwrap().as_str().expect("is str") +            .to_owned(); +        // POST to this object id... +        Ok(ArtifactStream { +            object_id, +        }) +    } + +    async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> { +        let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works"); + +        let clone_res = Command::new("git") +            .arg("clone") +            .arg(&self.remote_url) +            .arg("tmpdir") +            .status() +            .map_err(|e| format!("failed to run git clone? {:?}", e))?; + +        if !clone_res.success() { +            return Err(format!("git clone failed: {:?}", clone_res)); +        } + +        let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works"); + +        let checkout_res = Command::new("git") +            .current_dir("tmpdir") +            .arg("checkout") +            .arg(&self.commit) +            .status() +            .map_err(|e| format!("failed to run git checkout? {:?}", e))?; + +        if !checkout_res.success() { +            return Err(format!("git checkout failed: {:?}", checkout_res)); +        } + +        let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works"); + +        let build_res = Command::new("cargo") +            .current_dir("tmpdir") +            .arg("build") +            .status() +            .map_err(|e| format!("failed to run cargo build? {:?}", e))?; + +        if !build_res.success() { +            return Err(format!("cargo build failed: {:?}", build_res)); +        } + +        let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works"); + +        let test_result = Command::new("cargo") +            .current_dir("tmpdir") +            .arg("test") +            .status() +            .map_err(|e| format!("failed to run cargo test? {:?}", e))?; + +        match test_result.code() { +            Some(0) => Ok("pass".to_string()), +            Some(n) => Ok(format!("error: {}", n)), +            None => Ok(format!("abnormal exit")), +        } +    } +} + +struct ArtifactStream { +    object_id: String, +} + +impl RunnerClient { +    async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { +        if res.status() != StatusCode::OK { +            return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); +        } + +        let hello = res.chunk().await.expect("chunk"); +        if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { +            return Err(format!("bad hello: {:?}", hello)); +        } + +        Ok(Self { +            host: host.to_string(), +            tx: sender, +            rx: res, +            current_job: None, +        }) +    } + +    async fn wait_for_work(&mut self) -> Result<Option<RequestedJob>, WorkAcquireError> { +        match self.rx.chunk().await { +            Ok(Some(chunk)) => { +                eprintln!("got chunk: {:?}", &chunk); +                serde_json::from_slice(&chunk) +                    .map(Option::Some) +                    .map_err(|e| { +                        WorkAcquireError::Protocol(format!("not json: {:?}", e)) +                    }) +            } +            Ok(None) => { +                Ok(None) +            }, +            Err(e) => { +                Err(WorkAcquireError::Reqwest(e)) +            } +        } +    } + +    async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { +        self.recv_typed().await +    } + +    async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> { +        match self.rx.chunk().await { +            Ok(Some(chunk)) => { +                serde_json::from_slice(&chunk) +                    .map(Option::Some) +                    .map_err(|e| { +                        format!("not json: {:?}", e) +                    }) +            }, +            Ok(None) => Ok(None), +            Err(e) => { +                Err(format!("error in recv: {:?}", e)) +            } +        } +    } + +    async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { +        self.tx.send_data( +            serde_json::to_vec(&value) +                .map_err(|e| format!("json error: {:?}", e))? +                .into() +        ).await +            .map_err(|e| format!("send error: {:?}", e)) +    } + +    async fn run_job(&mut self, job: RequestedJob) { +        self.send(serde_json::json!({ +            "status": "started" +        })).await.unwrap(); + +        std::fs::remove_dir_all("tmpdir").unwrap(); +        std::fs::create_dir("tmpdir").unwrap(); + +        let res = job.execute_goodfile(self).await; + +        match res { +            Ok(status) => { +                self.send(serde_json::json!({ +                    "kind": "job_status", +                    "state": "finished", +                    "result": status +                })).await.unwrap(); +            } +            Err(status) => { +                self.send(serde_json::json!({ +                    "kind": "job_status", +                    "state": "interrupted", +                    "result": status +                })).await.unwrap(); +            } +        } +    } +} + +#[tokio::main] +async fn main() { +    let secret = std::fs::read_to_string("./auth_secret").unwrap(); +    let client = reqwest::ClientBuilder::new() +        .connect_timeout(Duration::from_millis(1000)) +        .timeout(Duration::from_millis(600000)) +        .build() +        .expect("can build client"); + +    loop { +        let (mut sender, body) = hyper::Body::channel(); +        let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") +            .header("user-agent", "ci-butactuallyin-space-runner") +            .header("authorization", &secret) +            .body(body) +            .send() +            .await; + +        match poll { +            Ok(mut res) => { +                let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { +                    Ok(client) => client, +                    Err(e) => { +                        eprintln!("failed to initialize client: {:?}", e); +                        std::thread::sleep(Duration::from_millis(10000)); +                        continue; +                    } +                }; +                let job = match client.wait_for_work().await { +                    Ok(Some(request)) => request, +                    Ok(None) => { +                        eprintln!("no work to do (yet)"); +                        std::thread::sleep(Duration::from_millis(2000)); +                        continue; +                    } +                    Err(e) => { +                        eprintln!("failed to get work: {:?}", e); +                        std::thread::sleep(Duration::from_millis(10000)); +                        continue; +                    } +                }; +                eprintln!("requested work: {:?}", job); + +                eprintln!("doing {:?}", job); +                client.run_job(job).await; +                std::thread::sleep(Duration::from_millis(10000)); +            }, +            Err(e) => { +                let message = format!("{}", e); + +                if message.contains("tcp connect error") { +                    eprintln!("could not reach server. sleeping a bit and retrying."); +                    std::thread::sleep(Duration::from_millis(5000)); +                    continue; +                } + +                eprintln!("unhandled error: {}", message); + +                std::thread::sleep(Duration::from_millis(1000)); +            } +        } +    } +} diff --git a/src/dbctx.rs b/src/dbctx.rs new file mode 100644 index 0000000..3af2d56 --- /dev/null +++ b/src/dbctx.rs @@ -0,0 +1,204 @@ +use std::sync::Mutex; +use rusqlite::{Connection, OptionalExtension}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::notifier::{RemoteNotifier, NotifierConfig}; +use crate::sql; + +pub struct DbCtx { +    pub config_path: String, +    // don't love this but.. for now... +    pub conn: Mutex<Connection>, +} + +#[derive(Debug, Clone)] +pub struct PendingJob { +    pub id: u64, +    pub artifacts: Option<String>, +    pub state: sql::JobState, +    pub run_host: Option<String>, +    pub remote_id: u64, +    pub commit_id: u64, +    pub created_time: u64, +} + +impl DbCtx { +    pub fn new(config_path: &str, db_path: &str) -> Self { +        DbCtx { +            config_path: config_path.to_owned(), +            conn: Mutex::new(Connection::open(db_path).unwrap()) +        } +    } + +    pub fn create_tables(&self) -> Result<(), ()> { +        let conn = self.conn.lock().unwrap(); +        conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); +        conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); +        conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); +        conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap(); +        conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); +        conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap(); + +        Ok(()) +    } + +    pub fn new_commit(&self, sha: &str) -> Result<u64, String> { +        let conn = self.conn.lock().unwrap(); +        conn +            .execute( +                "insert into commits (sha) values (?1)", +                [sha.clone()] +            ) +            .expect("can insert"); + +        Ok(conn.last_insert_rowid() as u64) +    } + +    pub fn new_repo(&self, name: &str) -> Result<u64, String> { +        let conn = self.conn.lock().unwrap(); +        conn +            .execute( +                "insert into repos (repo_name) values (?1)", +                [name.clone()] +            ) +            .map_err(|e| { +                format!("{:?}", e) +            })?; + +        Ok(conn.last_insert_rowid() as u64) +    } + +    pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> { +        self.conn.lock() +            .unwrap() +            .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0)) +            .optional() +            .map_err(|e| e.to_string()) +    } + +    pub fn repo_id_by_name(&self, repo_name: &str) -> Result<Option<u64>, String> { +        self.conn.lock() +            .unwrap() +            .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0)) +            .optional() +            .map_err(|e| e.to_string()) +    } + +    pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result<u64, String> { +        let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind { +            "github" => { +                (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote)) +            }, +            other => { +                panic!("unsupported remote kind: {}", other); +            } +        }; + +        let conn = self.conn.lock().unwrap(); +        conn +            .execute( +                "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);", +                (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path) +            ) +            .expect("can insert"); + +        Ok(conn.last_insert_rowid() as u64) +    } + +    pub fn new_job(&self, remote_id: u64, sha: &str) -> Result<u64, String> { +        // TODO: potential race: if two remotes learn about a commit at the same time and we decide +        // to create two jobs at the same time, this might return an incorrect id if the insert +        // didn't actually insert a new row. +        let commit_id = self.new_commit(sha).expect("can create commit record"); + +        let created_time = SystemTime::now() +            .duration_since(UNIX_EPOCH) +            .expect("now is before epoch") +            .as_millis() as u64; + +        let conn = self.conn.lock().unwrap(); + +        let rows_modified = conn.execute( +            "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);", +            (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time) +        ).unwrap(); + +        assert_eq!(1, rows_modified); + +        Ok(conn.last_insert_rowid() as u64) +    } + +    pub fn get_pending_jobs(&self) -> Result<Vec<PendingJob>, String> { +        let conn = self.conn.lock().unwrap(); + +        let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap(); +        let mut jobs = pending_query.query([]).unwrap(); +        let mut pending = Vec::new(); + +        while let Some(row) = jobs.next().unwrap() { +            let (id, artifacts, state, run_host, remote_id, commit_id, created_time) = row.try_into().unwrap(); +            let state: u8 = state; +            pending.push(PendingJob { +                id, artifacts, +                state: state.try_into().unwrap(), +                run_host, remote_id, commit_id, created_time +            }); +        } + +        Ok(pending) +    } + +    pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> { +        #[derive(Debug)] +        #[allow(dead_code)] +        struct Remote { +            id: u64, +            repo_id: u64, +            remote_path: String, +            remote_api: String, +            notifier_config_path: String, +        } + +        let mut remotes: Vec<Remote> = Vec::new(); + +        let conn = self.conn.lock().unwrap(); +        let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap(); +        let mut remote_results = remotes_query.query([repo_id]).unwrap(); + +        while let Some(row) = remote_results.next().unwrap() { +            let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); +            let _: String = remote_url; +            let _: String = remote_git_url; +            remotes.push(Remote { id, repo_id, remote_path, remote_api, notifier_config_path }); +        } + +        let mut notifiers: Vec<RemoteNotifier> = Vec::new(); + +        for remote in remotes.into_iter() { +            match remote.remote_api.as_str() { +                "github" => { +                    let notifier = RemoteNotifier { +                        remote_path: remote.remote_path, +                        notifier: NotifierConfig::github_from_file(&format!("{}/{}", self.config_path, remote.notifier_config_path)) +                            .expect("can load notifier config") +                    }; +                    notifiers.push(notifier); +                }, +                "email" => { +                    let notifier = RemoteNotifier { +                        remote_path: remote.remote_path, +                        notifier: NotifierConfig::email_from_file(&format!("{}/{}", self.config_path, remote.notifier_config_path)) +                            .expect("can load notifier config") +                    }; +                    notifiers.push(notifier); +                } +                other => { +                    eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) +                } +            } +        } + +        Ok(notifiers) +    } +} + diff --git a/src/main.rs b/src/main.rs index 1b6d9e8..56aca01 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,11 +3,10 @@  use tokio::spawn;  use std::path::PathBuf; -use serde_derive::{Deserialize, Serialize};  use axum_server::tls_rustls::RustlsConfig;  use axum::routing::*;  use axum::Router; -use axum::response::{IntoResponse, Response}; +use axum::response::{IntoResponse, Response, Html};  use std::net::SocketAddr;  use axum::extract::{Path, State};  use http_body::combinators::UnsyncBoxBody; @@ -18,15 +17,22 @@ use axum::http::{StatusCode, Uri};  use http::header::HeaderMap;  use std::sync::Arc; -use std::sync::Mutex;  use std::time::{SystemTime, UNIX_EPOCH};  use hmac::{Hmac, Mac};  use sha2::Sha256;  mod sql; +mod notifier; +mod dbctx; -use rusqlite::{Connection, OptionalExtension}; +use sql::JobState; + +use dbctx::DbCtx; + +use rusqlite::OptionalExtension; + +const PSKS: &'static [&'static [u8]] = &[];  #[derive(Copy, Clone, Debug)]  enum GithubHookError { @@ -96,14 +102,22 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event:          return (StatusCode::OK, String::new());      } -    let remote_url = format!("https://github.com/{}.git", repo); -    let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() +    let remote_url = format!("https://www.github.com/{}.git", repo); +    eprintln!("looking for remote url: {}", remote_url); +    let (remote_id, repo_id): (u64, u64) = match ctx.conn.lock().unwrap()          .query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) -        .unwrap(); +        .optional() +        .unwrap() { +        Some(elems) => elems, +        None => { +            eprintln!("no remote registered for url {} (repo {})", remote_url, repo); +            return (StatusCode::NOT_FOUND, String::new()); +        } +    };      let job_id = ctx.new_job(remote_id, &sha).unwrap(); -    let notifiers = ctx.notifiers_by_name(&repo).expect("can get notifiers"); +    let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers");      for notifier in notifiers {          notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify"); @@ -134,11 +148,78 @@ async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event  async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {      eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2); +    let remote_path = format!("{}/{}", path.0, path.1); +    let sha = path.2; + +    let commit_id: Option<u64> = ctx.conn.lock().unwrap() +        .query_row("select id from commits where sha=?1;", [&sha], |row| row.get(0)) +        .optional() +        .expect("can query"); + +    let commit_id: u64 = match commit_id { +        Some(commit_id) => { +            commit_id +        }, +        None => { +            return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string())); +        } +    }; + +    let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap() +        .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) +        .expect("can query"); + +    let (job_id, state): (u64, u8) = ctx.conn.lock().unwrap() +        .query_row("select id, state from jobs where commit_id=?1;", [commit_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) +        .expect("can query"); + +    let state: sql::JobState = unsafe { std::mem::transmute(state) }; + +    let repo_name: String = ctx.conn.lock().unwrap() +        .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) +        .expect("can query"); + +    let deployed = false; +      let time = SystemTime::now()          .duration_since(UNIX_EPOCH)          .expect("now is before epoch"); -    format!("requested: {:?}", path) +    let resp = format!("\ +        <html>\n\ +          <head>\n\ +            <title>ci.butactuallyin.space - {}</title>\n\ +          </head>\n\ +          <body>\n\ +            <pre>\n\ +            repo: {}\n\ +            commit: <a href='https://www.github.com/{}/commit/{}'>{}</a>\n  \ +            status: {}\n  \ +            deployed: {}\n\ +            </pre>\n\ +          </body>\n\ +        </html>\n", +        repo_name, +        repo_name, +        &remote_path, &sha, &sha, +        match state { +            JobState::Pending | JobState::Started => { +                "<span style='color:#660;'>pending</span>" +            }, +            JobState::Complete => { +                "<span style='color:green;'>pass</span>" +            }, +            JobState::Error => { +                "<span style='color:red;'>pass</span>" +            } +            JobState::Invalid => { +                "<span style='color:red;'>(server error)</span>" +            } +        }, +        deployed, +    ); + +    (StatusCode::OK, Html(resp))  }  async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<Arc<DbCtx>>, body: Bytes) -> impl IntoResponse { @@ -161,17 +242,24 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa          }      }; -    let mut mac = Hmac::<Sha256>::new_from_slice(GITHUB_PSK) -        .expect("hmac can be constructed"); -    mac.update(&body); -    let result = mac.finalize().into_bytes().to_vec(); - -    // hack: skip sha256= -    let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex"); -    if decoded != result { -        eprintln!("bad hmac:\n\ -           got:      {:?}\n\ -           expected: {:?}", decoded, result); +    let mut hmac_ok = false; + +    for psk in PSKS.iter() { +        let mut mac = Hmac::<Sha256>::new_from_slice(psk) +            .expect("hmac can be constructed"); +        mac.update(&body); +        let result = mac.finalize().into_bytes().to_vec(); + +        // hack: skip sha256= +        let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex"); +        if decoded == result { +            hmac_ok = true; +            break; +        } +    } + +    if !hmac_ok { +        eprintln!("bad hmac by all psks");          return (StatusCode::BAD_REQUEST, "").into_response();      } @@ -186,213 +274,8 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa      handle_github_event(ctx, path.0, path.1, kind, payload).await  } -struct DbCtx { -    conn: Mutex<Connection>, -} - -struct RemoteNotifier { -    remote_path: String, -    notifier: NotifierConfig, -} - -#[derive(Serialize, Deserialize)] -#[serde(untagged)] -enum NotifierConfig { -    GitHub { -        token: String, -    }, -    Email { -        username: String, -        password: String, -        mailserver: String, -        from: String, -        to: String, -    } -} - -impl NotifierConfig { -    fn github_from_file(path: &str) -> Result<Self, String> { -        let bytes = std::fs::read(path) -            .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; -        let config = serde_json::from_slice(&bytes) -            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; - -        if matches!(config, NotifierConfig::GitHub { .. }) { -            Ok(config) -        } else { -            Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path)) -        } -    } - -    fn email_from_file(path: &str) -> Result<Self, String> { -        let bytes = std::fs::read(path) -            .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; -        let config = serde_json::from_slice(&bytes) -            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; - -        if matches!(config, NotifierConfig::Email { .. }) { -            Ok(config) -        } else { -            Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path)) -        } -    } -} - -impl RemoteNotifier { -    async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { -        match &self.notifier { -            NotifierConfig::GitHub { token } => { -                let status_info = serde_json::json!({ -                    "state": "pending", -                    "target_url": format!( -                        "https://{}/{}/{}", -                        "ci.butactuallyin.space", -                        &self.remote_path, -                        sha, -                    ), -                    "description": "build is queued", -                    "context": "actuallyinspace runner", -                }); - -                // TODO: should pool (probably in ctx?) to have an upper bound in concurrent -                // connections. -                let client = reqwest::Client::new(); -                let res = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) -                    .body(serde_json::to_string(&status_info).expect("can stringify json")) -                    .header("authorization", format!("Bearer {}", token)) -                    .header("accept", "application/vnd.github+json") -                    .send() -                    .await; - -                match res { -                    Ok(res) => { -                        if res.status() == StatusCode::OK { -                            Ok(()) -                        } else { -                            Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) -                        } -                    } -                    Err(e) => { -                        Err(format!("failure sending request: {:?}", e)) -                    } -                } -            } -            NotifierConfig::Email { username, password, mailserver, from, to } => { -                panic!("should send an email saying that a job is now pending for `sha`") -            } -        } -    } -} - -impl DbCtx { -    fn new(db_path: &'static str) -> Self { -        DbCtx { -            conn: Mutex::new(Connection::open(db_path).unwrap()) -        } -    } - -    fn new_commit(&self, sha: &str) -> Result<u64, String> { -        let conn = self.conn.lock().unwrap(); -        conn -            .execute( -                "insert into commits (sha) values (?1)", -                [sha.clone()] -            ) -            .expect("can insert"); - -        Ok(conn.last_insert_rowid() as u64) -    } - -    fn new_job(&self, remote_id: u64, sha: &str) -> Result<u64, String> { -        // TODO: potential race: if two remotes learn about a commit at the same time and we decide -        // to create two jobs at the same time, this might return an incorrect id if the insert -        // didn't actually insert a new row. -        let commit_id = self.new_commit(sha).expect("can create commit record"); - -        let created_time = SystemTime::now() -            .duration_since(UNIX_EPOCH) -            .expect("now is before epoch") -            .as_millis() as u64; - -        let conn = self.conn.lock().unwrap(); - -        let rows_modified = conn.execute( -            "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);", -            (sql::JobState::Pending as u64, remote_id, commit_id, created_time) -        ).unwrap(); - -        assert_eq!(1, rows_modified); - -        Ok(conn.last_insert_rowid() as u64) -    } - -    fn notifiers_by_name(&self, repo: &str) -> Result<Vec<RemoteNotifier>, String> { -        let maybe_repo_id: Option<u64> = self.conn.lock() -            .unwrap() -            .query_row("select * from repos where repo_name=?1", [repo], |row| row.get(0)) -            .optional() -            .expect("query succeeds"); -        match maybe_repo_id { -            Some(repo_id) => { -                // get remotes -                 -                #[derive(Debug)] -                #[allow(dead_code)] -                struct Remote { -                    id: u64, -                    repo_id: u64, -                    remote_path: String, -                    remote_api: String, -                    notifier_config_path: String, -                } - -                let mut remotes: Vec<Remote> = Vec::new(); - -                let conn = self.conn.lock().unwrap(); -                let mut remotes_query = conn.prepare(sql::REMOTES_FOR_REPO).unwrap(); -                let mut remote_results = remotes_query.query([repo_id]).unwrap(); - -                while let Some(row) = remote_results.next().unwrap() { -                    let (id, repo_id, remote_path, remote_api, notifier_config_path) = row.try_into().unwrap(); -                    remotes.push(Remote { id, repo_id, remote_path, remote_api, notifier_config_path }); -                } - -                let mut notifiers: Vec<RemoteNotifier> = Vec::new(); - -                for remote in remotes.into_iter() { -                    match remote.remote_api.as_str() { -                        "github" => { -                            let notifier = RemoteNotifier { -                                remote_path: remote.remote_path, -                                notifier: NotifierConfig::github_from_file(&remote.notifier_config_path) -                                    .expect("can load notifier config") -                            }; -                            notifiers.push(notifier); -                        }, -                        "email" => { -                            let notifier = RemoteNotifier { -                                remote_path: remote.remote_path, -                                notifier: NotifierConfig::email_from_file(&remote.notifier_config_path) -                                    .expect("can load notifier config") -                            }; -                            notifiers.push(notifier); -                        } -                        other => { -                            eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) -                        } -                    } -                } - -                Ok(notifiers) -            } -            None => { -                return Err(format!("repo '{}' is not known", repo)); -            } -        } -    } -} -async fn make_app_server(db_path: &'static str) -> Router { +async fn make_app_server(cfg_path: &'static str, db_path: &'static str) -> Router {      /*      // GET /hello/warp => 200 OK with body "Hello, warp!" @@ -457,7 +340,7 @@ async fn make_app_server(db_path: &'static str) -> Router {          .route("/:owner/:repo/:sha", get(handle_commit_status))          .route("/:owner/:repo", post(handle_repo_event))          .fallback(fallback_get) -        .with_state(Arc::new(DbCtx::new(db_path))) +        .with_state(Arc::new(DbCtx::new(cfg_path, db_path)))  }  #[tokio::main] @@ -468,9 +351,9 @@ async fn main() {          PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"),      ).await.unwrap();      spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone()) -        .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service())); +        .serve(make_app_server("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db").await.into_make_service()));      axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config) -        .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service()) +        .serve(make_app_server("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db").await.into_make_service())          .await          .unwrap();  } diff --git a/src/notifier.rs b/src/notifier.rs new file mode 100644 index 0000000..4ddc91b --- /dev/null +++ b/src/notifier.rs @@ -0,0 +1,126 @@ +use serde_derive::{Deserialize, Serialize}; +use std::sync::Arc; +use axum::http::StatusCode; + +use crate::DbCtx; + +pub struct RemoteNotifier { +    pub remote_path: String, +    pub notifier: NotifierConfig, +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +pub enum NotifierConfig { +    GitHub { +        token: String, +    }, +    Email { +        username: String, +        password: String, +        mailserver: String, +        from: String, +        to: String, +    } +} + +impl NotifierConfig { +    pub fn github_from_file(path: &str) -> Result<Self, String> { +        let bytes = std::fs::read(path) +            .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; +        let config = serde_json::from_slice(&bytes) +            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; + +        if matches!(config, NotifierConfig::GitHub { .. }) { +            Ok(config) +        } else { +            Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path)) +        } +    } + +    pub fn email_from_file(path: &str) -> Result<Self, String> { +        let bytes = std::fs::read(path) +            .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?; +        let config = serde_json::from_slice(&bytes) +            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?; + +        if matches!(config, NotifierConfig::Email { .. }) { +            Ok(config) +        } else { +            Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path)) +        } +    } +} + +impl RemoteNotifier { +    pub async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { +        self.tell_job_status( +            ctx, +            repo_id, sha, job_id, +            "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) +        ).await +    } + +    pub async fn tell_complete_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, desc: Result<String, String>) -> Result<(), String> { +        match desc { +            Ok(status) => { +                self.tell_job_status( +                    ctx, +                    repo_id, sha, job_id, +                    "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) +                ).await +            }, +            Err(status) => { +                self.tell_job_status( +                    ctx, +                    repo_id, sha, job_id, +                    "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) +                ).await +            } +        } +    } + +    pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { +        match &self.notifier { +            NotifierConfig::GitHub { token } => { +                let status_info = serde_json::json!({ +                    "state": state, +                    "description": desc, +                    "target_url": target_url, +                    "context": "actuallyinspace runner", +                }); + +                // TODO: should pool (probably in ctx?) to have an upper bound in concurrent +                // connections. +                let client = reqwest::Client::new(); +                let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) +                    .body(serde_json::to_string(&status_info).expect("can stringify json")) +                    .header("content-type", "application/json") +                    .header("user-agent", "iximeow") +                    .header("authorization", format!("Bearer {}", token)) +                    .header("accept", "application/vnd.github+json"); +                eprintln!("sending {:?}", req); +                eprintln!("  body: {}", serde_json::to_string(&status_info).expect("can stringify json")); +                let res = req +                    .send() +                    .await; + +                match res { +                    Ok(res) => { +                        if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{ +                            Ok(()) +                        } else { +                            Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) +                        } +                    } +                    Err(e) => { +                        Err(format!("failure sending request: {:?}", e)) +                    } +                } +            } +            NotifierConfig::Email { username, password, mailserver, from, to } => { +                panic!("should send an email saying that a job is now pending for `sha`") +            } +        } +    } +} @@ -1,10 +1,29 @@  #![allow(dead_code)] +use std::convert::TryFrom; + +#[derive(Debug, Clone)]  pub enum JobState {      Pending = 0,      Started = 1,      Complete = 2,      Error = 3, +    Invalid = 4, +} + +impl TryFrom<u8> for JobState { +    type Error = String; + +    fn try_from(value: u8) -> Result<Self, String> { +        match value { +            0 => Ok(JobState::Pending), +            1 => Ok(JobState::Started), +            2 => Ok(JobState::Complete), +            3 => Ok(JobState::Error), +            4 => Ok(JobState::Invalid), +            other => Err(format!("invalid job state: {}", other)), +        } +    }  }  // remote_id is the remote from which we were notified. this is necessary so we know which remote @@ -14,6 +33,7 @@ pub const CREATE_JOBS_TABLE: &'static str = "\          artifacts_path TEXT,          state INTEGER NOT NULL,          run_host TEXT, +        build_token TEXT,          remote_id INTEGER,          commit_id INTEGER,          created_time INTEGER, @@ -45,6 +65,9 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\  pub const CREATE_REMOTES_INDEX: &'static str = "\      CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; +pub const CREATE_REPO_NAME_INDEX: &'static str = "\ +    CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; +  pub const PENDING_JOBS: &'static str = "\      select * from jobs where state=0;"; | 
