diff options
| author | iximeow <me@iximeow.net> | 2023-07-13 00:51:51 -0700 | 
|---|---|---|
| committer | iximeow <me@iximeow.net> | 2023-07-13 00:51:51 -0700 | 
| commit | 9e6906c00c49186189d211dc96e132d85e7ff641 (patch) | |
| tree | 05c20145ebc306313e3a12dc73c34b5dea40bbdc /src | |
| parent | 543150f1666690351d4698421cc6ceb115c1e251 (diff) | |
reorganize the whole thing into crates/ packages
Diffstat (limited to 'src')
| -rw-r--r-- | src/ci_ctl.rs | 235 | ||||
| -rw-r--r-- | src/ci_driver.rs | 630 | ||||
| -rw-r--r-- | src/ci_runner.rs | 668 | ||||
| -rw-r--r-- | src/dbctx.rs | 772 | ||||
| -rw-r--r-- | src/io.rs | 181 | ||||
| -rw-r--r-- | src/lua/mod.rs | 411 | ||||
| -rw-r--r-- | src/main.rs | 1092 | ||||
| -rw-r--r-- | src/notifier.rs | 181 | ||||
| -rw-r--r-- | src/protocol.rs | 114 | ||||
| -rw-r--r-- | src/sql.rs | 229 | 
10 files changed, 0 insertions, 4513 deletions
| diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs deleted file mode 100644 index f0ffa62..0000000 --- a/src/ci_ctl.rs +++ /dev/null @@ -1,235 +0,0 @@ -use clap::{Parser, Subcommand}; - -mod sql; -mod dbctx; -mod notifier; -mod io; -mod protocol; - -use dbctx::DbCtx; -use notifier::NotifierConfig; - -#[derive(Parser)] -#[command(version, about, long_about = None)] -struct Args { -    /// path to a database to manage (defaults to "./state.db") -    db_path: Option<String>, - -    /// path to where configs should be found (defaults to "./config") -    config_path: Option<String>, - -    #[command(subcommand)] -    command: Command, -} - -#[derive(Subcommand)] -enum Command { -    /// add _something_ to the db -    Add { -        #[command(subcommand)] -        what: AddItem, -    }, - -    /// make sure that the state looks reasonable. -    /// -    /// currently, ensure that all notifiers have a config, that config references an existing -    /// file, and that the referenced file is valid. -    Validate, - -    /// do something with jobs -    Job { -        #[command(subcommand)] -        what: JobAction, -    }, -} - -#[derive(Subcommand)] -enum JobAction { -    List, -    Rerun { -        which: u32 -    }, -    RerunCommit { -        commit: String -    }, -    Create { -        repo: String, -        commit: String, -        pusher_email: String, -    } -} - -#[derive(Subcommand)] -enum AddItem { -    Repo { -        name: String, -        remote: Option<String>, -        remote_kind: Option<String>, -        config: Option<String>, -    }, -    Remote { -        repo_name: String, -        remote: String, -        remote_kind: String, -        config: String, -    }, -} - -fn main() { -    let args = Args::parse(); - -    let db_path = args.db_path.unwrap_or_else(|| "./state.db".to_owned()); -    let config_path = args.config_path.unwrap_or_else(|| "./config".to_owned()); - -    match args.command { -        Command::Job { what } => { -            match what { -                JobAction::List => { -                    let db = DbCtx::new(&config_path, &db_path); -                    let mut conn = db.conn.lock().unwrap(); -                    let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); -                    let mut jobs = query.query([]).unwrap(); -                    while let Some(row) = jobs.next().unwrap() { -                        let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option<String>) = row.try_into().unwrap(); - -                        eprint!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id); -                        if let Some(run_preferences) = run_preferences { -                            eprintln!(" | run preference: {}", run_preferences); -                        } else { -                            eprintln!(""); -                        } -                    } -                    eprintln!("jobs"); -                }, -                JobAction::Rerun { which } => { -                    let db = DbCtx::new(&config_path, &db_path); -                    let task_id = db.new_run(which as u64, None).expect("db can be queried").id; -                    eprintln!("[+] rerunning job {} as task {}", which, task_id); -                } -                JobAction::RerunCommit { commit } => { -                    let db = DbCtx::new(&config_path, &db_path); -                    let job_id = db.job_for_commit(&commit).unwrap(); -                    if let Some(job_id) = job_id { -                        let task_id = db.new_run(job_id, None).expect("db can be queried").id; -                        eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id); -                    } else { -                        eprintln!("[-] no job for commit {}", commit); -                    } -                } -                JobAction::Create { repo, commit, pusher_email } => { -                    let db = DbCtx::new(&config_path, &db_path); -                    let parts = repo.split(":").collect::<Vec<&str>>(); -                    let (remote_kind, repo_path) = (parts[0], parts[1]); -                    let remote = match db.remote_by_path_and_api(&remote_kind, &repo_path).expect("can query") { -                        Some(remote) => remote, -                        None => { -                            eprintln!("[-] no remote registered as {}:{}", remote_kind, repo_path); -                            return; -                        } -                    }; - -                    let repo_default_run_pref: Option<String> = db.conn.lock().unwrap() -                        .query_row("select default_run_preference from repos where id=?1;", [remote.repo_id], |row| { -                            Ok((row.get(0)).unwrap()) -                        }) -                        .expect("can query"); - -                    let job_id = db.new_job(remote.id, &commit, Some(&pusher_email), repo_default_run_pref).expect("can create"); -                    let _ = db.new_run(job_id, None).unwrap(); -                } -            } -        }, -        Command::Add { what } => { -            match what { -                AddItem::Repo { name, remote, remote_kind, config } => { -                    let remote_config = match (remote, remote_kind, config) { -                        (Some(remote), Some(remote_kind), Some(config_path)) => { -                            // do something -                            if remote_kind != "github" { -                                eprintln!("unknown remote kind: {}", remote); -                                return; -                            } -                            Some((remote, remote_kind, config_path)) -                        }, -                        (None, None, None) => { -                            None -                        }, -                        _ => { -                            eprintln!("when specifying a remote, `remote`, `remote_kind`, and `config_path` must either all be provided together or not at all"); -                            return; -                        } -                    }; - -                    let db = DbCtx::new(&config_path, &db_path); -                    let repo_id = match db.new_repo(&name) { -                        Ok(repo_id) => repo_id, -                        Err(e) => { -                            if e.contains("UNIQUE constraint failed") { -                                eprintln!("[!] repo '{}' already exists", name); -                                return; -                            } else { -                                eprintln!("[!] failed to create repo entry: {}", e); -                                return; -                            } -                        } -                    }; -                    println!("[+] new repo created: '{}' id {}", &name, repo_id); -                    if let Some((remote, remote_kind, config_path)) = remote_config { -                        let full_config_file_path = format!("{}/{}", &db.config_path.display(), config_path); -                        let config = match remote_kind.as_ref() { -                            "github" => { -                                assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok()); -                            } -                            "github-email" => { -                                assert!(NotifierConfig::email_from_file(&full_config_file_path).is_ok()); -                            } -                            other => { -                                panic!("[-] notifiers for '{}' remotes are not supported", other); -                            } -                        }; -                        db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config_path.as_str()).unwrap(); -                        println!("[+] new remote created: repo '{}', {} remote at {}", &name, remote_kind, remote); -                        match remote_kind.as_str() { -                            "github" => { -                                println!("[!] now go make sure your github repo has a webhook set for `https://ci.butactuallyin.space/{}` to receive at least the `push` event.", remote.as_str()); -                                println!("      the secret sent with calls to this webhook should be the same preshared secret as the CI server is configured to know."); -                            } -                            _ => { } -                        } -                    } -                }, -                AddItem::Remote { repo_name, remote, remote_kind, config } => { -                    let db = DbCtx::new(&config_path, &db_path); -                    let repo_id = match db.repo_id_by_name(&repo_name) { -                        Ok(Some(id)) => id, -                        Ok(None) => { -                            eprintln!("[-] repo '{}' does not exist", repo_name); -                            return; -                        }, -                        Err(e) => { -                            eprintln!("[!] couldn't look up repo '{}': {:?}", repo_name, e); -                            return; -                        } -                    }; -                    let config_file = format!("{}/{}", config_path, config); -                    match remote_kind.as_ref() { -                        "github" => { -                            NotifierConfig::github_from_file(&config_file).unwrap(); -                        } -                        "github-email" => { -                            NotifierConfig::email_from_file(&config_file).unwrap(); -                        } -                        other => { -                            panic!("notifiers for '{}' remotes are not supported", other); -                        } -                    }; -                    db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config.as_str()).unwrap(); -                    println!("[+] new remote created: repo '{}', {} remote at {}", &repo_name, remote_kind, remote); -                }, -            } -        }, -        Command::Validate => { -            println!("ok"); -        } -    } -} diff --git a/src/ci_driver.rs b/src/ci_driver.rs deleted file mode 100644 index f6c1828..0000000 --- a/src/ci_driver.rs +++ /dev/null @@ -1,630 +0,0 @@ -use std::process::Command; -use std::collections::HashMap; -use std::sync::{Mutex, RwLock}; -use lazy_static::lazy_static; -use std::io::Read; -use serde_derive::{Deserialize, Serialize}; -use futures_util::StreamExt; -use std::fmt; -use std::path::{Path, PathBuf}; -use tokio::spawn; -use tokio_stream::wrappers::ReceiverStream; -use std::sync::{Arc, Weak}; -use std::time::{SystemTime, UNIX_EPOCH}; -use axum_server::tls_rustls::RustlsConfig; -use axum::body::StreamBody; -use axum::http::{StatusCode}; -use hyper::HeaderMap; -use axum::Router; -use axum::routing::*; -use axum::extract::State; -use axum::extract::BodyStream; -use axum::response::IntoResponse; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; -use serde_json::json; - -mod dbctx; -mod sql; -mod notifier; -mod io; -mod protocol; - -use crate::dbctx::{DbCtx, PendingRun, Job, Run}; -use crate::sql::JobResult; -use crate::sql::RunState; -use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; - -lazy_static! { -    static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); -    static ref ACTIVE_TASKS: Mutex<HashMap<u64, Weak<()>>> = Mutex::new(HashMap::new()); -} - -fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> { -    let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into(); -    path.push(run.to_string()); -    match std::fs::create_dir(&path) { -        Ok(()) => { -            Ok(path) -        }, -        Err(e) => { -            if e.kind() == std::io::ErrorKind::AlreadyExists { -                Ok(path) -            } else { -                Err(e) -            } -        } -    } -} - -async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> { -    eprintln!("activating task {:?}", run); - -    let now = SystemTime::now() -        .duration_since(UNIX_EPOCH) -        .expect("now is before epoch") -        .as_millis(); - -    let remote = dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("job has remote"); -    let repo = dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("remote has repo"); - -    let commit_sha = dbctx.commit_sha(job.commit_id).expect("query succeeds"); - -    let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts"); - -    eprintln!("running {}", &repo.name); - -    let res = candidate.submit(&dbctx, &run, &remote.remote_git_url, &commit_sha).await; - -    let mut client_job = match res { -        Ok(Some(mut client_job)) => { client_job } -        Ok(None) => { -            return Err("client hung up instead of acking task".to_string()); -        } -        Err(e) => { -            // failed to submit job, move on for now -            return Err(format!("failed to submit task: {:?}", e)); -        } -    }; - -    let host_id = client_job.client.host_id; - -    let connection = dbctx.conn.lock().unwrap(); -    connection.execute( -        "update runs set started_time=?1, host_id=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5", -        (now as u64, host_id, format!("{}", artifacts.display()), &client_job.client.build_token, run.id) -    ) -        .expect("can update"); -    std::mem::drop(connection); - -    spawn(async move { -        client_job.run().await -    }); - -    Ok(()) -} - -struct RunnerClient { -    tx: mpsc::Sender<Result<String, String>>, -    rx: BodyStream, -    host_id: u32, -    build_token: String, -    accepted_sources: Option<Vec<String>>, -} - -fn token_for_job() -> String { -    let mut data = [0u8; 32]; -    std::fs::File::open("/dev/urandom") -        .unwrap() -        .read_exact(&mut data) -        .unwrap(); - -    base64::encode(data) -} - -struct ClientJob { -    dbctx: Arc<DbCtx>, -    remote_git_url: String, -    sha: String, -    task: PendingRun, -    client: RunnerClient, -    // exists only as confirmation this `ClientJob` is somewhere, still alive and being processed. -    task_witness: Arc<()>, -} - -impl ClientJob { -    pub async fn run(&mut self) { -        loop { -            eprintln!("waiting on response.."); -            let msg = match self.client.recv_typed::<ClientProto>().await.expect("recv works") { -                Some(msg) => msg, -                None => { -                    eprintln!("client hung up. task's done, i hope?"); -                    return; -                } -            }; -            eprintln!("got {:?}", msg); -            match msg { -                ClientProto::NewTaskPlease { allowed_pushers, host_info } => { -                    eprintln!("misdirected task request (after handshake?)"); -                    return; -                } -                ClientProto::TaskStatus(task_info) => { -                    let (result, state): (Result<String, String>, RunState) = match task_info { -                        TaskInfo::Finished { status } => { -                            eprintln!("task update: state is finished and result is {}", status); -                            match status.as_str() { -                                "pass" => { -                                    (Ok("success".to_string()), RunState::Finished) -                                }, -                                other => { -                                    eprintln!("unhandled task completion status: {}", other); -                                    (Err(other.to_string()), RunState::Error) -                                } -                            } -                        }, -                        TaskInfo::Interrupted { status, description } => { -                            eprintln!("task update: state is interrupted and result is {}", status); -                            let desc = description.unwrap_or_else(|| status.clone()); -                            (Err(desc), RunState::Error) -                        } -                    }; - -                    let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); -                    let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists"); - -                    for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { -                        if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await { -                            eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e); -                        } -                    } - -                    let now = SystemTime::now() -                        .duration_since(UNIX_EPOCH) -                        .expect("now is before epoch") -                        .as_millis(); - -                    let build_result = if result.is_ok() { -                        JobResult::Pass -                    } else { -                        JobResult::Fail -                    }; -                    let result_desc = match result { -                        Ok(msg) => msg, -                        Err(msg) => msg, -                    }; - -                    self.dbctx.conn.lock().unwrap().execute( -                        "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", -                        (now as u64, state as u64, build_result as u8, result_desc, self.task.id) -                    ) -                        .expect("can update"); -                } -                ClientProto::ArtifactCreate => { -                    eprintln!("creating artifact"); -                    self.client.send(serde_json::json!({ -                        "status": "ok", -                        "object_id": "10", -                    })).await.unwrap(); -                }, -                ClientProto::Metric { name, value } => { -                    self.dbctx.insert_metric(self.task.id, &name, &value) -                        .expect("TODO handle metric insert error?"); -                } -                ClientProto::Command(_command_info) => { -                    // record information about commands, start/stop, etc. probably also allow -                    // artifacts to be attached to commands and default to attaching stdout/stderr? -                } -                other => { -                    eprintln!("unhandled message {:?}", other); -                } -            } -        } -    } -} - -impl RunnerClient { -    async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream, accepted_sources: Option<Vec<String>>, host_id: u32) -> Result<Self, String> { -        let token = token_for_job(); -        let client = RunnerClient { -            tx: sender, -            rx: resp, -            host_id, -            build_token: token, -            accepted_sources, -        }; -        Ok(client) -    } - -    async fn test_connection(&mut self) -> Result<(), String> { -        self.send_typed(&ClientProto::Ping).await?; -        let resp = self.recv_typed::<ClientProto>().await?; -        match resp { -            Some(ClientProto::Pong) => { -                Ok(()) -            } -            Some(other) => { -                Err(format!("unexpected connection test response: {:?}", other)) -            } -            None => { -                Err("client hung up".to_string()) -            } -        } -    } - -    async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { -        self.send_typed(&msg).await -    } - -    async fn send_typed<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), String> { -        self.tx.send(Ok(serde_json::to_string(msg).unwrap())) -            .await -            .map_err(|e| e.to_string()) -    } - -    async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { -        match self.rx.next().await { -            Some(Ok(bytes)) => { -                serde_json::from_slice(&bytes) -                    .map(Option::Some) -                    .map_err(|e| e.to_string()) -            }, -            Some(Err(e)) => { -                eprintln!("e: {:?}", e); -                Err(format!("no client job: {:?}", e)) -            }, -            None => { -                Ok(None) -            } -        } -    } - -    async fn recv_typed<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, String> { -        let json = self.recv().await?; -        Ok(json.map(|v| serde_json::from_value(v).unwrap())) -    } - -    // is this client willing to run the job based on what it has told us so far? -    fn will_accept(&self, job: &Job) -> bool { -        match (job.source.as_ref(), self.accepted_sources.as_ref()) { -            (_, None) => true, -            (None, Some(_)) => false, -            (Some(source), Some(accepted_sources)) => { -                accepted_sources.contains(source) -            } -        } -    } - -    async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> { -        self.send_typed(&ClientProto::new_task(RequestedJob { -            commit: sha.to_string(), -            remote_url: remote_git_url.to_string(), -            build_token: self.build_token.to_string(), -        })).await?; -        match self.recv_typed::<ClientProto>().await { -            Ok(Some(ClientProto::Started)) => { -                let task_witness = Arc::new(()); -                ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness)); -                Ok(Some(ClientJob { -                    task: job.clone(), -                    dbctx: Arc::clone(dbctx), -                    sha: sha.to_string(), -                    remote_git_url: remote_git_url.to_string(), -                    client: self, -                    task_witness, -                })) -            } -            Ok(Some(resp)) => { -                eprintln!("invalid response: {:?}", resp); -                Err("client rejected job".to_string()) -            } -            Ok(None) => { -                Ok(None) -            } -            Err(e) => { -                eprintln!("e: {:?}", e); -                Err(e) -            } -        } -    } -} - -impl fmt::Debug for RunnerClient { -    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { -        f.write_str("RunnerClient { .. }") -    } -} - -#[axum_macros::debug_handler] -async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { -    eprintln!("artifact request"); -    let run_token = match headers.get("x-task-token") { -        Some(run_token) => run_token.to_str().expect("valid string"), -        None => { -            eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers); -            return (StatusCode::BAD_REQUEST, "").into_response(); -        } -    }; - -    let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() { -        Some(result) => result, -        None => { -            eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers); -            return (StatusCode::BAD_REQUEST, "").into_response(); -        } -    }; - -    if token_validity != dbctx::TokenValidity::Valid { -        eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity); -        return (StatusCode::BAD_REQUEST, "").into_response(); -    } - -    let artifact_path = if let Some(artifact_path) = artifact_path { -        artifact_path -    } else { -        eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers); -        return (StatusCode::BAD_REQUEST, "").into_response(); -    }; - -    let artifact_name = match headers.get("x-artifact-name") { -        Some(artifact_name) => artifact_name.to_str().expect("valid string"), -        None => { -            eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers); -            return (StatusCode::BAD_REQUEST, "").into_response(); -        } -    }; - -    let artifact_desc = match headers.get("x-artifact-desc") { -        Some(artifact_desc) => artifact_desc.to_str().expect("valid string"), -        None => { -            eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers); -            return (StatusCode::BAD_REQUEST, "").into_response(); -        } -    }; - -    let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await { -        Ok(artifact) => artifact, -        Err(err) => { -            eprintln!("failure to reserve artifact: {:?}", err); -            return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response(); -        } -    }; - -    eprintln!("spawning task..."); -    let dbctx_ref = Arc::clone(&ctx.0); -    spawn(async move { -        artifact.store_all(artifact_content).await.unwrap(); -        dbctx_ref.finalize_artifact(artifact.artifact_id).await.unwrap(); -    }); -    eprintln!("done?"); - -    (StatusCode::OK, "").into_response() -} - -#[derive(Serialize, Deserialize)] -struct WorkRequest { -    kind: String, -    accepted_pushers: Option<Vec<String>> -} - -async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse { -    let auth_token = match headers.get("authorization") { -        Some(token) => { -            if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) { -                eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token); -                return (StatusCode::BAD_REQUEST, "").into_response(); -            } -        } -        None => { -            eprintln!("bad artifact post: headers: {:?}\nno authorization", headers); -            return (StatusCode::BAD_REQUEST, "").into_response(); -        } -    }; - -    let (tx_sender, tx_receiver) = mpsc::channel(8); -    let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); -    tx_sender.send(Ok("hello".to_string())).await.expect("works"); - -    let request = job_resp.next().await.expect("request chunk").expect("chunk exists"); -    let request = std::str::from_utf8(&request).unwrap(); -    let request: ClientProto = match serde_json::from_str(&request) { -        Ok(v) => v, -        Err(e) => { -            eprintln!("couldn't parse work request: {:?}", e); -            return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); -        } -    }; -    let (accepted_pushers, host_info) = match request { -        ClientProto::NewTaskPlease { allowed_pushers, host_info } => (allowed_pushers, host_info), -        other => { -            eprintln!("bad request kind: {:?}", &other); -            return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); -        } -    }; - -    eprintln!("client identifies itself as {:?}", host_info); - -    let host_info_id = ctx.0.id_for_host(&host_info).expect("can get a host info id"); - -    let client = match RunnerClient::new(tx_sender, job_resp, accepted_pushers, host_info_id).await { -        Ok(v) => v, -        Err(e) => { -            eprintln!("unable to register client"); -            return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); -        } -    }; - -    match ctx.1.try_send(client) { -        Ok(()) => { -            eprintln!("client requested work..."); -            return (StatusCode::OK, resp_body).into_response(); -        } -        Err(TrySendError::Full(client)) => { -            return (StatusCode::IM_A_TEAPOT, resp_body).into_response(); -        } -        Err(TrySendError::Closed(client)) => { -            panic!("client holder is gone?"); -        } -    } -} - -async fn make_api_server(dbctx: Arc<DbCtx>) -> (Router, mpsc::Receiver<RunnerClient>) { -    let (pending_client_sender, pending_client_receiver) = mpsc::channel(8); - -    let router = Router::new() -        .route("/api/next_job", post(handle_next_job)) -        .route("/api/artifact", post(handle_artifact)) -        .with_state((dbctx, pending_client_sender)); -    (router, pending_client_receiver) -} - -#[derive(Deserialize, Serialize)] -struct DriverConfig { -    cert_path: PathBuf, -    key_path: PathBuf, -    config_path: PathBuf, -    db_path: PathBuf, -    server_addr: String, -    auth_secret: String, -} - -#[tokio::main] -async fn main() { -    tracing_subscriber::fmt::init(); - -    let mut args = std::env::args(); -    args.next().expect("first arg exists"); -    let config_path = args.next().unwrap_or("./driver_config.json".to_string()); -    let driver_config: DriverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for DriverConfig"); -    let mut auth_secret = AUTH_SECRET.write().unwrap(); -    *auth_secret = Some(driver_config.auth_secret.clone()); -    std::mem::drop(auth_secret); - -    let config = RustlsConfig::from_pem_file( -        driver_config.cert_path.clone(), -        driver_config.key_path.clone(), -    ).await.unwrap(); - -    let dbctx = Arc::new(DbCtx::new(&driver_config.config_path, &driver_config.db_path)); - -    dbctx.create_tables().unwrap(); - -    let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await; -    spawn(axum_server::bind_rustls(driver_config.server_addr.parse().unwrap(), config) -          .serve(api_server.into_make_service())); - -    spawn(old_task_reaper(Arc::clone(&dbctx))); - -    loop { -        let mut candidate = match channel.recv().await -            .ok_or_else(|| "client channel disconnected".to_string()) { - -            Ok(candidate) => { candidate }, -            Err(e) => { eprintln!("client error: {}", e); continue; } -        }; - -        let dbctx = Arc::clone(&dbctx); -        spawn(async move { -            let host_id = candidate.host_id; -            let res = find_client_task(dbctx, candidate).await; -            eprintln!("task client for {}: {:?}", host_id, res); -        }); -    } -} - -async fn find_client_task(dbctx: Arc<DbCtx>, mut candidate: RunnerClient) -> Result<(), String> { -    let find_client_task_start = std::time::Instant::now(); - -    let (run, job) = 'find_work: loop { -        // try to find a job for this candidate: -        // * start with pending runs - these need *some* client to run them, but do not care which -        // * if no new jobs, maybe an existing job still needs a rerun on this client? -        // * otherwise, um, i dunno. do nothing? - -        let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap(); - -        if runs.len() > 0 { -            println!("{} new runs", runs.len()); - -            for run in runs.into_iter() { -                let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); - -                if candidate.will_accept(&job) { -                    break 'find_work (run, job); -                } - -            } -        } - -        let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query"); - -        for job in alt_run_jobs.into_iter() { -            if candidate.will_accept(&job) { -                let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap(); -                break 'find_work (run, job); -            } -        } - -        tokio::time::sleep(std::time::Duration::from_millis(100)).await; - -        if candidate.test_connection().await.is_err() { -            return Err("lost client connection".to_string()); -        } - -        if find_client_task_start.elapsed().as_secs() > 300 { -            return Err("5min new task deadline elapsed".to_string()); -        } -    }; - -    eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); -    activate_run(Arc::clone(&dbctx), candidate, &job, &run).await?; - -    Ok(()) -} - -async fn old_task_reaper(dbctx: Arc<DbCtx>) { -    let mut potentially_stale_tasks = dbctx.get_active_runs().unwrap(); - -    let active_tasks = ACTIVE_TASKS.lock().unwrap(); - -    for (id, witness) in active_tasks.iter() { -        if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) { -            potentially_stale_tasks.swap_remove(idx); -        } -    } - -    std::mem::drop(active_tasks); - -    // ok, so we have tasks that are not active, now if the task is started we know someone should -    // be running it and they are not. retain only those tasks, as they are ones we may want to -    // mark dead. -    // -    // further, filter out any tasks created in the last 60 seconds. this is a VERY generous grace -    // period for clients that have accepted a job but for some reason we have not recorded them as -    // active (perhaps they are slow to ack somehow). - -    let stale_threshold = crate::io::now_ms() - 60_000; - -    let stale_tasks: Vec<Run> = potentially_stale_tasks.into_iter().filter(|task| { -        match (task.state, task.start_time) { -            // `run` is atomically set to `Started` and adorned with a `start_time`. disagreement -            // between the two means this run is corrupt and should be reaped. -            (RunState::Started, None) => { -                true -            }, -            (RunState::Started, Some(start_time)) => { -                start_time < stale_threshold -            } -            // and if it's not `started`, it's either pending (not assigned yet, so not stale), or -            // one of the complete statuses. -            _ => { -                false -            } -        } -    }).collect(); - -    for task in stale_tasks.iter() { -        eprintln!("looks like task {} is stale, reaping", task.id); -        dbctx.reap_task(task.id).expect("works"); -    } -} diff --git a/src/ci_runner.rs b/src/ci_runner.rs deleted file mode 100644 index 9aaf33d..0000000 --- a/src/ci_runner.rs +++ /dev/null @@ -1,668 +0,0 @@ -use std::time::Duration; -use std::os::unix::process::ExitStatusExt; -use rlua::prelude::LuaError; -use std::sync::{Arc, Mutex}; -use reqwest::{StatusCode, Response}; -use tokio::process::Command; -use std::process::Stdio; -use std::process::ExitStatus; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use serde_json::json; -use serde::{Deserialize, de::DeserializeOwned, Serialize}; -use std::task::{Context, Poll}; -use std::pin::Pin; -use std::marker::Unpin; - -mod protocol; -mod lua; -mod io; - -use crate::io::{ArtifactStream, VecSink}; -use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; -use crate::lua::CommandOutput; - -#[derive(Debug)] -enum WorkAcquireError { -    Reqwest(reqwest::Error), -    EarlyEof, -    Protocol(String), -} - -struct RunnerClient { -    http: reqwest::Client, -    host: String, -    tx: hyper::body::Sender, -    rx: Response, -    current_job: Option<RequestedJob>, -} - -impl RequestedJob { -    pub fn into_running(self, client: RunnerClient) -> RunningJob { -        RunningJob { -            job: self, -            client, -            current_step: StepTracker::new(), -        } -    } -} - -struct JobEnv { -    lua: lua::BuildEnv, -    job: Arc<Mutex<RunningJob>>, -} - -impl JobEnv { -    fn new(job: &Arc<Mutex<RunningJob>>) -> Self { -        let lua = lua::BuildEnv::new(job); -        JobEnv { -            lua, -            job: Arc::clone(job) -        } -    } - -    async fn default_goodfile(self) -> Result<(), LuaError> { -        self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await -    } - -    async fn exec_goodfile(self) -> Result<(), LuaError> { -        let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap(); -        self.lua.run_build(script.as_bytes()).await -    } -} - -pub struct RunningJob { -    job: RequestedJob, -    client: RunnerClient, -    current_step: StepTracker, -} - -enum RepoError { -    CloneFailedIdk { exit_code: ExitStatus }, -    CheckoutFailedIdk { exit_code: ExitStatus }, -    CheckoutFailedMissingRef, -} - -pub struct StepTracker { -    scopes: Vec<String> -} - -impl StepTracker { -    pub fn new() -> Self { -        StepTracker { -            scopes: Vec::new() -        } -    } - -    pub fn push(&mut self, name: String) { -        self.scopes.push(name); -    } - -    pub fn pop(&mut self) { -        self.scopes.pop(); -    } - -    pub fn clear(&mut self) { -        self.scopes.clear(); -    } - -    pub fn full_step_path(&self) -> &[String] { -        self.scopes.as_slice() -    } -} - -impl RunningJob { -    async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { -        self.client.send_typed(&ClientProto::metric(name, value)) -            .await -            .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) -    } - -    // TODO: panics if hyper finds the channel is closed. hum -    async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> { -        let (mut sender, body) = hyper::Body::channel(); -        let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") -            .header("user-agent", "ci-butactuallyin-space-runner") -            .header("x-task-token", &self.job.build_token) -            .header("x-artifact-name", name) -            .header("x-artifact-desc", desc) -            .body(body) -            .send() -            .await -            .map_err(|e| format!("unable to send request: {:?}", e))?; - -        if resp.status() == StatusCode::OK { -            eprintln!("[+] artifact '{}' started", name); -            Ok(ArtifactStream::new(sender)) -        } else { -            Err(format!("[-] unable to create artifact: {:?}", resp)) -        } -    } - -    async fn clone_remote(&self) -> Result<(), RepoError> { -        let mut git_clone = Command::new("git"); -        git_clone -            .arg("clone") -            .arg(&self.job.remote_url) -            .arg("tmpdir"); - -        let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await -            .map_err(|e| { -                eprintln!("stringy error (exec failed?) for clone: {}", e); -                RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) } -            })?; - -        if !clone_res.success() { -            return Err(RepoError::CloneFailedIdk { exit_code: clone_res }); -        } - -        let mut git_checkout = Command::new("git"); -        git_checkout -            .current_dir("tmpdir") -            .arg("checkout") -            .arg(&self.job.commit); - -        let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await -            .map_err(|e| { -                eprintln!("stringy error (exec failed?) for checkout: {}", e); -                RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) } -            })?; - -        if !checkout_res.success() { -            if checkout_res.code() == Some(128) { -                return Err(RepoError::CheckoutFailedIdk { exit_code: checkout_res }); -            } else { -                return Err(RepoError::CheckoutFailedMissingRef); -            } -        } - -        Ok(()) -    } - -    async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { -        let stdout_artifact = self.create_artifact( -            &format!("{} (stdout)", name), -            &format!("{} (stdout)", desc) -        ).await.expect("works"); -        let stderr_artifact = self.create_artifact( -            &format!("{} (stderr)", name), -            &format!("{} (stderr)", desc) -        ).await.expect("works"); - -        let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?; - -        Ok(exit_status) -    } - -    async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> { -        let stdout_collector = VecSink::new(); -        let stderr_collector = VecSink::new(); - -        let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?; - -        Ok(CommandOutput { -            exit_status, -            stdout: stdout_collector.take_buf(), -            stderr: stderr_collector.take_buf(), -        }) -    } - -    async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> { -        eprintln!("[.] running {}", name); - -        let mut child = command -            .stdin(Stdio::null()) -            .stdout(Stdio::piped()) -            .stderr(Stdio::piped()) -            .spawn() -            .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; - -        let mut child_stdout = child.stdout.take().unwrap(); -        let mut child_stderr = child.stderr.take().unwrap(); - -        eprintln!("[.] '{}': forwarding stdout", name); -        tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_reporter).await }); -        eprintln!("[.] '{}': forwarding stderr", name); -        tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_reporter).await }); - -        let res = child.wait().await -            .map_err(|e| format!("failed to wait? {:?}", e))?; - -        if res.success() { -            eprintln!("[+] '{}' success", name); -        } else { -            eprintln!("[-] '{}' fail: {:?}", name, res); -        } - -        Ok(res) -    } - -    async fn run(mut self) { -        self.client.send_typed(&ClientProto::Started).await.unwrap(); - -        std::fs::remove_dir_all("tmpdir").unwrap(); -        std::fs::create_dir("tmpdir").unwrap(); - -        let ctx = Arc::new(Mutex::new(self)); - -        let checkout_res = ctx.lock().unwrap().clone_remote().await; - -        if let Err(e) = checkout_res { -            let status = "bad_ref"; -            let status = ClientProto::task_status(TaskInfo::finished(status)); -            eprintln!("checkout failed, reporting status: {:?}", status); - -            let res = ctx.lock().unwrap().client.send_typed(&status).await; -            if let Err(e) = res { -                eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); -            } - -            return; -        } - -        let lua_env = JobEnv::new(&ctx); - -        let metadata = std::fs::metadata("./tmpdir/goodfile"); -        let res: Result<String, (String, String)> = match metadata { -            Ok(_) => { -                match lua_env.exec_goodfile().await { -                    Ok(()) => { -                        Ok("pass".to_string()) -                    }, -                    Err(lua_err) => { -                        Err(("failed".to_string(), lua_err.to_string())) -                    } -                } -            }, -            Err(e) if e.kind() == std::io::ErrorKind::NotFound => { -                match lua_env.default_goodfile().await { -                    Ok(()) => { -                        Ok("pass".to_string()) -                    }, -                    Err(lua_err) => { -                        Err(("failed".to_string(), lua_err.to_string())) -                    } -                } -            }, -            Err(e) => { -                eprintln!("[-] error finding goodfile: {:?}", e); -                Err(("failed".to_string(), "inaccessible goodfile".to_string())) -            } -        }; - -        match res { -            Ok(status) => { -                eprintln!("[+] job success!"); -                let status = ClientProto::task_status(TaskInfo::finished(status)); -                eprintln!("reporting status: {:?}", status); - -                let res = ctx.lock().unwrap().client.send_typed(&status).await; -                if let Err(e) = res { -                    eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); -                } -            } -            Err((status, lua_err)) => { -                eprintln!("[-] job error: {}", status); - -                let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string())); -                let res = ctx.lock().unwrap().client.send_typed(&status).await; -                if let Err(e) = res { -                    eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e); -                } -            } -        } -    } - -    fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) { -        let mut cmd = Command::new(&command[0]); -        let cwd = match working_dir { -            Some(dir) => { -                format!("tmpdir/{}", dir) -            }, -            None => { -                "tmpdir".to_string() -            } -        }; -        eprintln!("prepared {:?} to run in {}", &command, &cwd); -        let human_name = command.join(" "); -        cmd -            .current_dir(cwd) -            .args(&command[1..]); -        (cmd, human_name) -    } - -    async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result<CommandOutput, String> { -        let (cmd, human_name) = Self::prep_command(command, working_dir); - -        let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?; - -        if !cmd_res.exit_status.success() { -            return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status)); -        } -        Ok(cmd_res) -    } - -    async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { -        self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) -            .await.unwrap(); - -        let (cmd, human_name) = Self::prep_command(command, working_dir); - -        let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?; - -        self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) -            .await.unwrap(); - - -        if !cmd_res.success() { -            return Err(format!("{} failed: {:?}", &human_name, cmd_res)); -        } - -        Ok(()) -    } -} - -impl RunnerClient { -    async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { -        if res.status() != StatusCode::OK { -            return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); -        } - -        let hello = res.chunk().await.expect("chunk"); -        if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { -            return Err(format!("bad hello: {:?}", hello)); -        } - -        Ok(Self { -            http: reqwest::ClientBuilder::new() -                .connect_timeout(Duration::from_millis(1000)) -                .timeout(Duration::from_millis(600000)) -                .build() -                .expect("can build client"), -            host: host.to_string(), -            tx: sender, -            rx: res, -            current_job: None, -        }) -    } - -    async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { -        loop { -            let message = self.recv_typed::<ClientProto>().await; -            match message { -                Ok(Some(ClientProto::NewTask(new_task))) => { -                    return Ok(Some(new_task)); -                }, -                Ok(Some(ClientProto::Ping)) => { -                    self.send_typed(&ClientProto::Pong).await -                        .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; -                }, -                Ok(Some(other)) => { -                    return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); -                }, -                Ok(None) => { -                    return Ok(None); -                }, -                Err(e) => { -                    return Err(WorkAcquireError::Protocol(e)); -                } -            } -        } -    } - -    async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { -        self.recv_typed().await -    } - -    async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> { -        match self.rx.chunk().await { -            Ok(Some(chunk)) => { -                serde_json::from_slice(&chunk) -                    .map(Option::Some) -                    .map_err(|e| { -                        format!("not json: {:?}", e) -                    }) -            }, -            Ok(None) => Ok(None), -            Err(e) => { -                Err(format!("error in recv: {:?}", e)) -            } -        } -    } - -    async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { -        self.send_typed(&value).await -    } - -    async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> { -        self.tx.send_data( -            serde_json::to_vec(t) -                .map_err(|e| format!("json error: {:?}", e))? -                .into() -        ).await -            .map_err(|e| format!("send error: {:?}", e)) -    } -} - -#[derive(Deserialize, Serialize)] -struct RunnerConfig { -    server_address: String, -    auth_secret: String, -    allowed_pushers: Option<Vec<String>>, -} - -#[tokio::main] -async fn main() { -    tracing_subscriber::fmt::init(); -    let mut args = std::env::args(); -    args.next().expect("first arg exists"); -    let config_path = args.next().unwrap_or("./runner_config.json".to_string()); -    let runner_config: RunnerConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for RunnerConfig"); -    let client = reqwest::ClientBuilder::new() -        .connect_timeout(Duration::from_millis(1000)) -        .timeout(Duration::from_millis(600000)) -        .build() -        .expect("can build client"); - -    let host_info = host_info::collect_host_info(); -    eprintln!("host info: {:?}", host_info); - -    loop { -        let (mut sender, body) = hyper::Body::channel(); - -        sender.send_data(serde_json::to_string(&ClientProto::new_task_please( -            runner_config.allowed_pushers.clone(), -            host_info.clone(), -        )).unwrap().into()).await.expect("req"); - -        let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") -            .header("user-agent", "ci-butactuallyin-space-runner") -            .header("authorization", runner_config.auth_secret.trim()) -            .body(body) -            .send() -            .await; - -        match poll { -            Ok(mut res) => { -                let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { -                    Ok(client) => client, -                    Err(e) => { -                        eprintln!("failed to initialize client: {:?}", e); -                        std::thread::sleep(Duration::from_millis(10000)); -                        continue; -                    } -                }; -                let job = match client.wait_for_work(runner_config.allowed_pushers.as_ref().map(|x| x.as_ref())).await { -                    Ok(Some(request)) => request, -                    Ok(None) => { -                        eprintln!("no work to do (yet)"); -                        std::thread::sleep(Duration::from_millis(2000)); -                        continue; -                    } -                    Err(e) => { -                        eprintln!("failed to get work: {:?}", e); -                        std::thread::sleep(Duration::from_millis(10000)); -                        continue; -                    } -                }; -                eprintln!("requested work: {:?}", job); - -                eprintln!("doing {:?}", job); - -                let mut job = job.into_running(client); -                job.run().await; -                std::thread::sleep(Duration::from_millis(10000)); -            }, -            Err(e) => { -                let message = format!("{}", e); - -                if message.contains("tcp connect error") { -                    eprintln!("could not reach server. sleeping a bit and retrying."); -                    std::thread::sleep(Duration::from_millis(5000)); -                    continue; -                } - -                eprintln!("unhandled error: {}", message); - -                std::thread::sleep(Duration::from_millis(1000)); -            } -        } -    } -} - -mod host_info { -    use crate::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; - -    // get host model name, microcode, and how many cores -    fn collect_cpu_info() -> CpuInfo { -        fn find_line(lines: &[String], prefix: &str) -> String { -            lines.iter() -                .find(|line| line.starts_with(prefix)) -                .expect(&format!("{} line is present", prefix)) -                .split(":") -                .last() -                .unwrap() -                .trim() -                .to_string() -        } - -        /// try finding core `cpu`'s max frequency in khz. we'll assume this is the actual speed a -        /// build would run at.. fingers crossed. -        fn try_finding_cpu_freq(cpu: u32) -> Result<u64, String> { -            if let Ok(freq_str) = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq") { -                Ok(freq_str.trim().parse().unwrap()) -            } else { -                // so cpufreq probably isn't around, maybe /proc/cpuinfo's mhz figure is present? -                let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); -                let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect(); -                match cpu_mhzes.get(cpu as usize) { -                    Some(mhz) => { -                        let mut line_parts = cpu_mhzes[cpu as usize].split(":"); -                        let _ = line_parts.next(); -                        let mhz = line_parts.next().unwrap().trim(); -                        let mhz: f64 = mhz.parse().unwrap(); -                        Ok((mhz * 1000.0) as u64) -                    }, -                    None => { -                        panic!("could not get cpu freq either from cpufreq or /proc/cpuinfo?"); -                    } -                } -            } -        } - -        // we'll have to deploy one of a few techniques, because x86/x86_64 is internally -        // consistent, but aarch64 is different. who knows what other CPUs think. -        match std::env::consts::ARCH { -            "x86" | "x86_64" => { -                let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); -                let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect(); -                let cores = model_names.len() as u32; -                let model_name = find_line(&cpu_lines, "model name"); -                let vendor_id = find_line(&cpu_lines, "vendor_id"); -                let family = find_line(&cpu_lines, "cpu family"); -                let model = find_line(&cpu_lines, "model\t"); -                let microcode = find_line(&cpu_lines, "microcode"); -                let max_freq = try_finding_cpu_freq(0).unwrap(); - -                CpuInfo { model_name, microcode, cores, vendor_id, family, model, max_freq } -            } -            "aarch64" => { -                let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); -                let processors: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("processor")).collect(); -                let cores = processors.len() as u32; - -                // alternate possible path: /sys/firmware/devicetree/base/compatible -                let model_name = std::fs::read_to_string("/proc/device-tree/compatible").unwrap(); -                let model_name = model_name.replace("\x00", ";"); -                let vendor_id = find_line(&cpu_lines, "CPU implementer"); -                let vendor_name = match vendor_id.as_str() { -                    "0x41" => "Arm Limited".to_string(), -                    "0x42" => "Broadcom Corporation".to_string(), -                    "0x43" => "Cavium Inc".to_string(), -                    "0x44" => "Digital Equipment Corporation".to_string(), -                    "0x46" => "Fujitsu Ltd".to_string(), -                    "0x49" => "Infineon Technologies AG".to_string(), -                    "0x4d" => "Motorola".to_string(), -                    "0x4e" => "NVIDIA Corporation".to_string(), -                    "0x50" => "Applied Micro Circuits Corporation".to_string(), -                    "0x51" => "Qualcomm Inc".to_string(), -                    "0x56" => "Marvell International Ltd".to_string(), -                    "0x69" => "Intel Corporation".to_string(), -                    "0xc0" => "Ampere Computing".to_string(), -                    other => format!("unknown aarch64 vendor {}", other), -                }; -                let family = find_line(&cpu_lines, "CPU architecture"); -                let model = find_line(&cpu_lines, "CPU part"); -                let microcode = String::new(); -                let max_freq = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq").unwrap().trim().parse().unwrap(); - -                CpuInfo { model_name, microcode, cores, vendor_id: vendor_name, family, model, max_freq } -            } -            other => { -                panic!("dunno how to find cpu info for {}, panik", other); -            } -        } -    } - -    fn collect_mem_info() -> MemoryInfo { -        let mem_lines: Vec<String> = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect(); -        let total = mem_lines[0].split(":").last().unwrap().trim().to_string(); -        let available = mem_lines[2].split(":").last().unwrap().trim().to_string(); - -        MemoryInfo { total, available } -    } - -    fn hostname() -> String { -        let mut bytes = [0u8; 4096]; -        let res = unsafe { -            libc::gethostname(bytes.as_mut_ptr() as *mut std::ffi::c_char, bytes.len()) -        }; -        if res != 0 { -            panic!("gethostname failed {:?}", res); -        } -        let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated"); -        std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string() -    } - -    pub fn collect_env_info() -> EnvInfo { -        EnvInfo { -            arch: std::env::consts::ARCH.to_string(), -            family: std::env::consts::FAMILY.to_string(), -            os: std::env::consts::OS.to_string(), -        } -    } - -    pub fn collect_host_info() -> HostInfo { -        let cpu_info = collect_cpu_info(); -        let memory_info = collect_mem_info(); -        let hostname = hostname(); -        let env_info = collect_env_info(); - -        HostInfo { -            hostname, -            cpu_info, -            memory_info, -            env_info, -        } -    } -} - diff --git a/src/dbctx.rs b/src/dbctx.rs deleted file mode 100644 index 7378b2e..0000000 --- a/src/dbctx.rs +++ /dev/null @@ -1,772 +0,0 @@ -use std::sync::Mutex; -use futures_util::StreamExt; -use rusqlite::{Connection, OptionalExtension}; -use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use std::path::Path; -use std::path::PathBuf; - -use crate::io::ArtifactDescriptor; -use crate::notifier::{RemoteNotifier, NotifierConfig}; -use crate::sql; - -const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30; - -pub struct DbCtx { -    pub config_path: PathBuf, -    // don't love this but.. for now... -    pub conn: Mutex<Connection>, -} - -#[derive(Debug, Clone)] -pub struct Repo { -    pub id: u64, -    pub name: String, -    pub default_run_preference: Option<String>, -} - -#[derive(Debug)] -pub struct Remote { -    pub id: u64, -    pub repo_id: u64, -    pub remote_path: String, -    pub remote_api: String, -    pub remote_url: String, -    pub remote_git_url: String, -    pub notifier_config_path: String, -} - -// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1 -// relationship with commits, and potentially many *runs* of that job. -#[derive(Debug, Clone)] -pub struct Job { -    pub id: u64, -    pub remote_id: u64, -    pub commit_id: u64, -    pub created_time: u64, -    pub source: Option<String>, -    pub run_preferences: Option<String>, -} - -// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report -// results. a job may have many runs from many different hosts rebuliding history, or reruns of the -// same job on the same hardware to collect more datapoints on the operation. -#[derive(Debug, Clone)] -pub struct Run { -    pub id: u64, -    pub job_id: u64, -    pub artifacts_path: Option<String>, -    pub state: sql::RunState, -    pub host_id: Option<u64>, -    pub create_time: u64, -    pub start_time: Option<u64>, -    pub complete_time: Option<u64>, -    pub build_token: Option<String>, -    pub run_timeout: Option<u64>, -    pub build_result: Option<u8>, -    pub final_text: Option<String>, -} - -impl Run { -    fn into_pending_run(self) -> PendingRun { -        PendingRun { -            id: self.id, -            job_id: self.job_id, -            create_time: self.create_time, -        } -    } -} - -#[derive(Debug, Clone)] -pub struct PendingRun { -    pub id: u64, -    pub job_id: u64, -    pub create_time: u64, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum TokenValidity { -    Expired, -    Invalid, -    Valid, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct MetricRecord { -    pub id: u64, -    pub run_id: u64, -    pub name: String, -    pub value: String -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ArtifactRecord { -    pub id: u64, -    pub run_id: u64, -    pub name: String, -    pub desc: String, -    pub created_time: u64, -    pub completed_time: Option<u64>, -} - -impl DbCtx { -    pub fn new<P: AsRef<Path>>(config_path: P, db_path: P) -> Self { -        DbCtx { -            config_path: config_path.as_ref().to_owned(), -            conn: Mutex::new(Connection::open(db_path).unwrap()) -        } -    } - -    pub fn create_tables(&self) -> Result<(), ()> { -        let conn = self.conn.lock().unwrap(); -        conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap(); -        conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); -        conn.execute(sql::CREATE_METRICS_TABLE, ()).unwrap(); -        conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); -        conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); -        conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap(); -        conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); -        conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap(); -        conn.execute(sql::CREATE_RUNS_TABLE, ()).unwrap(); -        conn.execute(sql::CREATE_HOSTS_TABLE, ()).unwrap(); - -        Ok(()) -    } - -    pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> { -        let conn = self.conn.lock().unwrap(); -        conn -            .execute( -                "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value", -                (run_id, name, value) -            ) -            .expect("can upsert"); -        Ok(()) -    } - -    pub fn new_commit(&self, sha: &str) -> Result<u64, String> { -        let conn = self.conn.lock().unwrap(); -        conn -            .execute( -                "insert into commits (sha) values (?1)", -                [sha.clone()] -            ) -            .expect("can insert"); - -        Ok(conn.last_insert_rowid() as u64) -    } - -    pub fn new_repo(&self, name: &str) -> Result<u64, String> { -        let conn = self.conn.lock().unwrap(); -        conn -            .execute( -                "insert into repos (repo_name) values (?1)", -                [name.clone()] -            ) -            .map_err(|e| { -                format!("{:?}", e) -            })?; - -        Ok(conn.last_insert_rowid() as u64) -    } - -    pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> { -        let conn = self.conn.lock().unwrap(); -        conn -            .execute( -                "update artifacts set completed_time=?1 where id=?2", -                (crate::io::now_ms(), artifact_id) -            ) -            .map(|_| ()) -            .map_err(|e| { -                format!("{:?}", e) -            }) -    } - -    pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { -        let artifact_id = { -            let created_time = SystemTime::now() -                .duration_since(UNIX_EPOCH) -                .expect("now is before epoch") -                .as_millis() as u64; -            let conn = self.conn.lock().unwrap(); -            conn -                .execute( -                    "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", -                    (run_id, name, desc, created_time) -                ) -                .map_err(|e| { -                    format!("{:?}", e) -                })?; - -            conn.last_insert_rowid() as u64 -        }; - -        ArtifactDescriptor::new(run_id, artifact_id).await -    } - -    pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> { -        let conn = self.conn.lock().unwrap(); -        conn -            .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| { -                let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); - -                Ok(ArtifactRecord { -                    id, run_id, name, desc, created_time, completed_time -                }) -            }) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn commit_sha(&self, commit_id: u64) -> Result<String, String> { -        self.conn.lock() -            .unwrap() -            .query_row( -                "select sha from commits where id=?1", -                [commit_id], -                |row| { row.get(0) } -            ) -            .map_err(|e| e.to_string()) -    } - -    pub fn job_for_commit(&self, sha: &str) -> Result<Option<u64>, String> { -        self.conn.lock() -            .unwrap() -            .query_row( -                "select id from commits where sha=?1", -                [sha], -                |row| { row.get(0) } -            ) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn run_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> { -        self.conn.lock() -            .unwrap() -            .query_row( -                "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1", -                [token], -                |row| { -                    let timeout: Option<u64> = row.get(3).unwrap(); -                    let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS); - -                    let now = SystemTime::now() -                        .duration_since(UNIX_EPOCH) -                        .expect("now is before epoch") -                        .as_millis(); - -                    let time: Option<u64> = row.get(2).unwrap(); -                    let validity = if let Some(time) = time { -                        if now > time as u128 + timeout as u128 { -                            TokenValidity::Expired -                        } else { -                            TokenValidity::Valid -                        } -                    } else { -                        TokenValidity::Invalid -                    }; -                    Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity)) -                } -            ) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn job_by_id(&self, id: u64) -> Result<Option<Job>, String> { -        self.conn.lock() -            .unwrap() -            .query_row(crate::sql::JOB_BY_ID, [id], |row| { -                let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); - -                Ok(Job { -                    id, source, created_time, remote_id, commit_id, run_preferences -                }) -            }) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn remote_by_path_and_api(&self, api: &str, path: &str) -> Result<Option<Remote>, String> { -        self.conn.lock() -            .unwrap() -            .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where remote_api=?1 and remote_path=?2", [api, path], |row| { -                let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); - -                Ok(Remote { -                    id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path -                }) -            }) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn remote_by_id(&self, id: u64) -> Result<Option<Remote>, String> { -        self.conn.lock() -            .unwrap() -            .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where id=?1", [id], |row| { -                let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); - -                Ok(Remote { -                    id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path -                }) -            }) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> { -        self.conn.lock() -            .unwrap() -            .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0)) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn repo_id_by_name(&self, repo_name: &str) -> Result<Option<u64>, String> { -        self.conn.lock() -            .unwrap() -            .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0)) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result<u64, String> { -        let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind { -            "github" => { -                (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote)) -            }, -            "github-email" => { -                (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote)) -            }, -            other => { -                panic!("unsupported remote kind: {}", other); -            } -        }; - -        let conn = self.conn.lock().unwrap(); -        conn -            .execute( -                "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);", -                (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path) -            ) -            .expect("can insert"); - -        Ok(conn.last_insert_rowid() as u64) -    } - -    pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option<String>) -> Result<u64, String> { -        // TODO: potential race: if two remotes learn about a commit at the same time and we decide -        // to create two jobs at the same time, this might return an incorrect id if the insert -        // didn't actually insert a new row. -        let commit_id = self.new_commit(sha).expect("can create commit record"); - -        let created_time = SystemTime::now() -            .duration_since(UNIX_EPOCH) -            .expect("now is before epoch") -            .as_millis() as u64; - -        let conn = self.conn.lock().unwrap(); - -        let rows_modified = conn.execute( -            "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);", -            (remote_id, commit_id, created_time, pusher, repo_default_run_pref) -        ).unwrap(); - -        assert_eq!(1, rows_modified); - -        let job_id = conn.last_insert_rowid() as u64; - -        Ok(job_id) -    } - -    pub fn new_run(&self, job_id: u64, host_preference: Option<u32>) -> Result<PendingRun, String> { -        let created_time = SystemTime::now() -            .duration_since(UNIX_EPOCH) -            .expect("now is before epoch") -            .as_millis() as u64; - -        let conn = self.conn.lock().unwrap(); - -        let rows_modified = conn.execute( -            "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);", -            (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference) -        ).unwrap(); - -        assert_eq!(1, rows_modified); - -        let run_id = conn.last_insert_rowid() as u64; - -        Ok(PendingRun { -            id: run_id, -            job_id, -            create_time: created_time, -        }) -    } - -    pub fn reap_task(&self, task_id: u64) -> Result<(), String> { -        let conn = self.conn.lock().unwrap(); - -        conn.execute( -            "update runs set final_status=\"lost signal\", state=4 where id=?1;", -            [task_id] -        ).unwrap(); - -        Ok(()) -    } - -    pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> { -        let conn = self.conn.lock().unwrap(); - -        let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap(); -        let mut result = metrics_query.query([run]).unwrap(); -        let mut metrics = Vec::new(); - -        while let Some(row) = result.next().unwrap() { -            let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); -            metrics.push(MetricRecord { id, run_id, name, value }); -        } - -        Ok(metrics) -    } - -    pub fn artifacts_for_run(&self, run: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> { -        let conn = self.conn.lock().unwrap(); - -        let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap(); -        let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap(); -        let mut artifacts = Vec::new(); - -        while let Some(row) = result.next().unwrap() { -            let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap(); -            artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time }); -        } - -        Ok(artifacts) -    } - -    pub fn repo_by_id(&self, id: u64) -> Result<Option<Repo>, String> { -        self.conn.lock() -            .unwrap() -            .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| { -                let (id, repo_name, default_run_preference) = row.try_into().unwrap(); -                Ok(Repo { -                    id, -                    name: repo_name, -                    default_run_preference, -                }) -            }) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn get_repos(&self) -> Result<Vec<Repo>, String> { -        let conn = self.conn.lock().unwrap(); - -        let mut repos_query = conn.prepare(sql::ALL_REPOS).unwrap(); -        let mut repos = repos_query.query([]).unwrap(); -        let mut result = Vec::new(); - -        while let Some(row) = repos.next().unwrap() { -            let (id, repo_name, default_run_preference) = row.try_into().unwrap(); -            result.push(Repo { -                id, -                name: repo_name, -                default_run_preference, -            }); -        } - -        Ok(result) -    } - -    pub fn last_job_from_remote(&self, id: u64) -> Result<Option<Job>, String> { -        self.recent_jobs_from_remote(id, 1) -            .map(|mut jobs| jobs.pop()) -    } - -    pub fn job_by_commit_id(&self, commit_id: u64) -> Result<Option<Job>, String> { -        let conn = self.conn.lock().unwrap(); - -        conn -            .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { -                let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); -                Ok(Job { -                    id, -                    remote_id, -                    commit_id, -                    created_time, -                    source, -                    run_preferences, -                }) -            }) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn recent_jobs_from_remote(&self, id: u64, limit: u64) -> Result<Vec<Job>, String> { -        let conn = self.conn.lock().unwrap(); - -        let mut job_query = conn.prepare(sql::LAST_JOBS_FROM_REMOTE).unwrap(); -        let mut result = job_query.query([id, limit]).unwrap(); - -        let mut jobs = Vec::new(); - -        while let Some(row) = result.next().unwrap() { -            let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); -            jobs.push(Job { -                id, -                remote_id, -                commit_id, -                created_time, -                source, -                run_preferences, -            }); -        } - -        Ok(jobs) -    } - -    pub fn get_active_runs(&self) -> Result<Vec<Run>, String> { -        let conn = self.conn.lock().unwrap(); - -        let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap(); -        let mut runs = started_query.query([]).unwrap(); -        let mut started = Vec::new(); - -        while let Some(row) = runs.next().unwrap() { -            started.push(crate::sql::row2run(row)); -        } - -        Ok(started) -    } - -    pub fn get_pending_runs(&self, host_id: Option<u32>) -> Result<Vec<PendingRun>, String> { -        let conn = self.conn.lock().unwrap(); - -        let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); -        let mut runs = pending_query.query([host_id]).unwrap(); -        let mut pending = Vec::new(); - -        while let Some(row) = runs.next().unwrap() { -            let (id, job_id, create_time) = row.try_into().unwrap(); -            let run = PendingRun { -                id, -                job_id, -                create_time, -            }; -            pending.push(run); -        } - -        Ok(pending) -    } - -    pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result<Vec<Job>, String> { -        // for jobs that this host has not run, we'll arbitrarily say that we won't generate new -        // runs for jobs more than a day old. -        // -        // we don't want to rebuild the entire history every time we see a new host by default; if -        // you really want to rebuild all of history on a new host, use `ci_ctl` to prepare the -        // runs. -        let cutoff = crate::io::now_ms() - 24 * 3600 * 1000; - -        let conn = self.conn.lock().unwrap(); - -        let mut jobs_needing_task_runs = conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap(); -        let mut job_rows = jobs_needing_task_runs.query([cutoff, host_id]).unwrap(); -        let mut jobs = Vec::new(); - -        while let Some(row) = job_rows.next().unwrap() { -            let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); - -            jobs.push(Job { -                id, source, created_time, remote_id, commit_id, run_preferences, -            }); -        } - -        Ok(jobs) -    } - - -    pub fn remotes_by_repo(&self, repo_id: u64) -> Result<Vec<Remote>, String> { -        let mut remotes: Vec<Remote> = Vec::new(); - -        let conn = self.conn.lock().unwrap(); -        let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap(); -        let mut remote_results = remotes_query.query([repo_id]).unwrap(); - -        while let Some(row) = remote_results.next().unwrap() { -            let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); -            remotes.push(Remote { id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path }); -        } - -        Ok(remotes) -    } - -    /// try to find a host close to `host_info`, but maybe not an exact match. -    /// -    /// specifically, we'll ignore microcode and family/os - enough that measurements ought to be -    /// comparable but maybe not perfectly so. -    pub fn find_id_like_host(&self, host_info: &crate::protocol::HostInfo) -> Result<Option<u32>, String> { -        self.conn.lock() -            .unwrap() -            .query_row( -                "select id from hosts where \ -                    hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \ -                    cpu_model=?5 and cpu_max_freq_khz=?6 and cpu_cores=?7 and mem_total=?8 and \ -                    arch=?9);", -                ( -                    &host_info.hostname, -                    &host_info.cpu_info.vendor_id, -                    &host_info.cpu_info.model_name, -                    &host_info.cpu_info.family, -                    &host_info.cpu_info.model, -                    &host_info.cpu_info.max_freq, -                    &host_info.cpu_info.cores, -                    &host_info.memory_info.total, -                    &host_info.env_info.arch, -                ), -                |row| { row.get(0) } -            ) -            .map_err(|e| e.to_string()) -    } - -    /// get an id for the host described by `host_info`. this may create a new record if no such -    /// host exists. -    pub fn id_for_host(&self, host_info: &crate::protocol::HostInfo) -> Result<u32, String> { -        let conn = self.conn.lock().unwrap(); - -        conn -            .execute( -                "insert or ignore into hosts \ -                 (\ -                     hostname, cpu_vendor_id, cpu_model_name, cpu_family, \ -                     cpu_model, cpu_microcode, cpu_max_freq_khz, cpu_cores, \ -                     mem_total, arch, family, os\ -                 ) values (\ -                     ?1, ?2, ?3, ?4, \ -                     ?5, ?6, ?7, ?8, \ -                     ?9, ?10, ?11, ?12 \ -                 );", -                ( -                    &host_info.hostname, -                    &host_info.cpu_info.vendor_id, -                    &host_info.cpu_info.model_name, -                    &host_info.cpu_info.family, -                    &host_info.cpu_info.model, -                    &host_info.cpu_info.microcode, -                    &host_info.cpu_info.max_freq, -                    &host_info.cpu_info.cores, -                    &host_info.memory_info.total, -                    &host_info.env_info.arch, -                    &host_info.env_info.family, -                    &host_info.env_info.os, -                ) -            ) -            .expect("can insert"); - -        conn -            .query_row( -                "select id from hosts where \ -                    hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \ -                    cpu_model=?5 and cpu_microcode=?6 and cpu_max_freq_khz=?7 and \ -                    cpu_cores=?8 and mem_total=?9 and arch=?10 and family=?11 and os=?12;", -                ( -                    &host_info.hostname, -                    &host_info.cpu_info.vendor_id, -                    &host_info.cpu_info.model_name, -                    &host_info.cpu_info.family, -                    &host_info.cpu_info.model, -                    &host_info.cpu_info.microcode, -                    &host_info.cpu_info.max_freq, -                    &host_info.cpu_info.cores, -                    &host_info.memory_info.total, -                    &host_info.env_info.arch, -                    &host_info.env_info.family, -                    &host_info.env_info.os, -                ), -                |row| { row.get(0) } -            ) -            .map_err(|e| e.to_string()) -    } - -    pub fn host_model_info(&self, host_id: u64) -> Result<(String, String, String, String, u64), String> { -        let conn = self.conn.lock().unwrap(); -        conn -            .query_row("select hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz from hosts where id=?1;", [host_id], |row| { -                Ok(( -                    row.get(0).unwrap(), -                    row.get(1).unwrap(), -                    row.get(2).unwrap(), -                    row.get(3).unwrap(), -                    row.get(4).unwrap(), -                )) -            }) -            .map_err(|e| e.to_string()) -    } - -    pub fn runs_for_job_one_per_host(&self, job_id: u64) -> Result<Vec<Run>, String> { -        let conn = self.conn.lock().unwrap(); -        let mut runs_query = conn.prepare(crate::sql::RUNS_FOR_JOB).unwrap(); -        let mut runs_results = runs_query.query([job_id]).unwrap(); - -        let mut results = Vec::new(); - -        while let Some(row) = runs_results.next().unwrap() { -            results.push(crate::sql::row2run(row)); -        } - -        Ok(results) -    } - -    pub fn last_run_for_job(&self, job_id: u64) -> Result<Option<Run>, String> { -        let conn = self.conn.lock().unwrap(); - -        conn -            .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| { -                Ok(crate::sql::row2run(row)) -            }) -            .optional() -            .map_err(|e| e.to_string()) -    } - -    pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> { -        let remotes = self.remotes_by_repo(repo_id)?; - -        let mut notifiers: Vec<RemoteNotifier> = Vec::new(); - -        for remote in remotes.into_iter() { -            match remote.remote_api.as_str() { -                "github" => { -                    let mut notifier_path = self.config_path.clone(); -                    notifier_path.push(&remote.notifier_config_path); - -                    let notifier = RemoteNotifier { -                        remote_path: remote.remote_path, -                        notifier: NotifierConfig::github_from_file(¬ifier_path) -                            .expect("can load notifier config") -                    }; -                    notifiers.push(notifier); -                }, -                "email" => { -                    let mut notifier_path = self.config_path.clone(); -                    notifier_path.push(&remote.notifier_config_path); - -                    let notifier = RemoteNotifier { -                        remote_path: remote.remote_path, -                        notifier: NotifierConfig::email_from_file(¬ifier_path) -                            .expect("can load notifier config") -                    }; -                    notifiers.push(notifier); -                } -                other => { -                    eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) -                } -            } -        } - -        Ok(notifiers) -    } -} - diff --git a/src/io.rs b/src/io.rs deleted file mode 100644 index f9f407f..0000000 --- a/src/io.rs +++ /dev/null @@ -1,181 +0,0 @@ -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use futures_util::StreamExt; -use tokio::fs::File; -use std::io::Write; -use tokio::fs::OpenOptions; -use std::task::{Poll, Context}; -use std::pin::Pin; -use std::time::{UNIX_EPOCH, SystemTime}; -use std::sync::{Arc, Mutex}; - -pub fn now_ms() -> u64 { -    SystemTime::now() -        .duration_since(UNIX_EPOCH) -        .expect("now is later than epoch") -        .as_millis() as u64 -} - -#[derive(Clone)] -pub struct VecSink { -    body: Arc<Mutex<Vec<u8>>>, -} - -impl VecSink { -    pub fn new() -> Self { -        Self { body: Arc::new(Mutex::new(Vec::new())) } -    } - -    pub fn take_buf(&self) -> Vec<u8> { -        std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new()) -    } -} - -impl tokio::io::AsyncWrite for VecSink { -    fn poll_write( -        self: Pin<&mut Self>, -        cx: &mut Context, -        buf: &[u8] -    ) -> Poll<Result<usize, std::io::Error>> { -        self.body.lock().unwrap().extend_from_slice(buf); -        Poll::Ready(Ok(buf.len())) -    } - -    fn poll_flush( -        self: Pin<&mut Self>, -        _cx: &mut Context -    ) -> Poll<Result<(), std::io::Error>> { -        Poll::Ready(Ok(())) -    } - -    fn poll_shutdown( -        self: Pin<&mut Self>, -        _cx: &mut Context -    ) -> Poll<Result<(), std::io::Error>> { -        Poll::Ready(Ok(())) -    } -} - -pub struct ArtifactStream { -    sender: hyper::body::Sender, -} - -impl ArtifactStream { -    pub fn new(sender: hyper::body::Sender) -> Self { -        Self { sender } -    } -} - -impl tokio::io::AsyncWrite for ArtifactStream { -    fn poll_write( -        self: Pin<&mut Self>, -        cx: &mut Context, -        buf: &[u8] -    ) -> Poll<Result<usize, std::io::Error>> { -        match self.get_mut().sender.try_send_data(buf.to_vec().into()) { -            Ok(()) => { -                Poll::Ready(Ok(buf.len())) -            }, -            _ => { -                cx.waker().wake_by_ref(); -                Poll::Pending -            } -        } -    } - -    fn poll_flush( -        self: Pin<&mut Self>, -        _cx: &mut Context -    ) -> Poll<Result<(), std::io::Error>> { -        Poll::Ready(Ok(())) -    } - -    fn poll_shutdown( -        self: Pin<&mut Self>, -        _cx: &mut Context -    ) -> Poll<Result<(), std::io::Error>> { -        Poll::Ready(Ok(())) -    } -} - - -pub struct ArtifactDescriptor { -    job_id: u64, -    pub artifact_id: u64, -    file: File, -} - -impl ArtifactDescriptor { -    pub async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> { -        // TODO: jobs should be a configurable path -        let path = format!("artifacts/{}/{}", job_id, artifact_id); -        let file = OpenOptions::new() -            .read(true) -            .write(true) -            .create_new(true) -            .open(&path) -            .await -            .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; - -        Ok(ArtifactDescriptor { -            job_id, -            artifact_id, -            file, -        }) -    } - -    pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { -        loop { -            let chunk = data.next().await; - -            let chunk = match chunk { -                Some(Ok(chunk)) => chunk, -                Some(Err(e)) => { -                    eprintln!("error: {:?}", e); -                    return Err(format!("error reading: {:?}", e)); -                } -                None => { -                    eprintln!("body done?"); -                    return Ok(()); -                } -            }; - -            let chunk = chunk.as_ref(); - -            self.file.write_all(chunk).await -                .map_err(|e| format!("failed to write: {:?}", e))?; -        } -    } -} - -pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { -    let mut buf = vec![0; 1024 * 1024]; -    loop { -        let n_read = source.read(&mut buf).await -            .map_err(|e| format!("failed to read: {:?}", e))?; - -        if n_read == 0 { -            eprintln!("done reading!"); -            return Ok(()); -        } - -        dest.write_all(&buf[..n_read]).await -            .map_err(|e| format!("failed to write: {:?}", e))?; -    } -} -/* -pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { -    let mut buf = vec![0; 1024 * 1024]; -    loop { -        let n_read = source.read(&mut buf).await -            .map_err(|e| format!("failed to read: {:?}", e))?; - -        if n_read == 0 { -            eprintln!("done reading!"); -            return Ok(()); -        } - -        dest.sender.send_data(buf[..n_read].to_vec().into()).await -            .map_err(|e| format!("failed to write: {:?}", e))?; -    } -} -*/ diff --git a/src/lua/mod.rs b/src/lua/mod.rs deleted file mode 100644 index 6c4a281..0000000 --- a/src/lua/mod.rs +++ /dev/null @@ -1,411 +0,0 @@ -use crate::RunningJob; - -use rlua::prelude::*; - -use std::sync::{Arc, Mutex}; -use std::path::PathBuf; - -pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua"); - -pub struct BuildEnv { -    lua: Lua, -    job: Arc<Mutex<RunningJob>>, -} - -#[derive(Debug)] -pub struct RunParams { -    step: Option<String>, -    name: Option<String>, -    cwd: Option<String>, -} - -pub struct CommandOutput { -    pub exit_status: std::process::ExitStatus, -    pub stdout: Vec<u8>, -    pub stderr: Vec<u8>, -} - -mod lua_exports { -    use crate::RunningJob; -    use crate::lua::{CommandOutput, RunParams}; - -    use std::sync::{Arc, Mutex}; -    use std::path::PathBuf; - -    use rlua::prelude::*; - -    pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec<String>, RunParams), rlua::Error> { -        let args = match command { -            LuaValue::Table(table) => { -                let len = table.len().expect("command table has a length"); -                let mut command_args = Vec::new(); -                for i in 0..len { -                    let value = table.get(i + 1).expect("command arg is gettble"); -                    match value { -                        LuaValue::String(s) => { -                            command_args.push(s.to_str().unwrap().to_owned()); -                        }, -                        other => { -                            return Err(LuaError::RuntimeError(format!("argument {} was not a string, was {:?}", i, other))); -                        } -                    }; -                } - -                command_args -            }, -            other => { -                return Err(LuaError::RuntimeError(format!("argument 1 was not a table: {:?}", other))); -            } -        }; - -        let params = match params { -            LuaValue::Table(table) => { -                let step = match table.get("step").expect("can get from table") { -                    LuaValue::String(v) => { -                        Some(v.to_str()?.to_owned()) -                    }, -                    LuaValue::Nil => { -                        None -                    }, -                    other => { -                        return Err(LuaError::RuntimeError(format!("params[\"step\"] must be a string"))); -                    } -                }; -                let name = match table.get("name").expect("can get from table") { -                    LuaValue::String(v) => { -                        Some(v.to_str()?.to_owned()) -                    }, -                    LuaValue::Nil => { -                        None -                    }, -                    other => { -                        return Err(LuaError::RuntimeError(format!("params[\"name\"] must be a string"))); -                    } -                }; -                let cwd = match table.get("cwd").expect("can get from table") { -                    LuaValue::String(v) => { -                        Some(v.to_str()?.to_owned()) -                    }, -                    LuaValue::Nil => { -                        None -                    }, -                    other => { -                        return Err(LuaError::RuntimeError(format!("params[\"cwd\"] must be a string"))); -                    } -                }; - -                RunParams { -                    step, -                    name, -                    cwd, -                } -            }, -            LuaValue::Nil => { -                RunParams { -                    step: None, -                    name: None, -                    cwd: None, -                } -            } -            other => { -                return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other))); -            } -        }; - -        Ok((args, params)) -    } - -    pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { -        let (args, params) = collect_build_args(command, params)?; -        eprintln!("args: {:?}", args); -        eprintln!("  params: {:?}", params); -        let rt = tokio::runtime::Builder::new_current_thread() -            .enable_all() -            .build() -            .unwrap(); -        rt.block_on(async move { -            job_ctx.lock().unwrap().run_command(&args, params.cwd.as_ref().map(|x| x.as_str())).await -                .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) -        }) -    } - -    pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<rlua::Table<'lua>, rlua::Error> { -        let (args, params) = collect_build_args(command, params)?; -        eprintln!("args: {:?}", args); -        eprintln!("  params: {:?}", params); -        let rt = tokio::runtime::Builder::new_current_thread() -            .enable_all() -            .build() -            .unwrap(); -        let command_output = rt.block_on(async move { -            job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await -                .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) -        })?; - -        let stdout = ctx.create_string(command_output.stdout.as_slice())?; -        let stderr = ctx.create_string(command_output.stderr.as_slice())?; - -        let result = ctx.create_table()?; -        result.set("stdout", stdout)?; -        result.set("stderr", stderr)?; -        result.set("status", command_output.exit_status.code())?; -        Ok(result) -    } - -    pub fn check_dependencies(commands: Vec<String>) -> Result<(), rlua::Error> { -        let mut missing_deps = Vec::new(); -        for command in commands.iter() { -            if !has_cmd(command)? { -                missing_deps.push(command.clone()); -            } -        } - -        if missing_deps.len() > 0 { -            return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", ")))); -        } - -        Ok(()) -    } - -    pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { -        let rt = tokio::runtime::Builder::new_current_thread() -            .enable_all() -            .build() -            .unwrap(); -        rt.block_on(async move { -            job_ctx.lock().unwrap().send_metric(&name, value).await -                .map_err(|e| LuaError::RuntimeError(format!("send_metric error: {:?}", e))) -        }) -    } - -    pub fn artifact(path: String, name: Option<String>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { -        let path: PathBuf = path.into(); - -        let default_name: String = match (path.file_name(), path.parent()) { -            (Some(name), _) => name -                .to_str() -                .ok_or(LuaError::RuntimeError("artifact name is not a unicode string".to_string()))? -                .to_string(), -            (_, Some(parent)) => format!("{}", parent.display()), -            (None, None) => { -                // one day using directories for artifacts might work but / is still not going -                // to be accepted -                return Err(LuaError::RuntimeError(format!("cannot infer a default path name for {}", path.display()))); -            } -        }; - -        let name: String = match name { -            Some(name) => name, -            None => default_name, -        }; -        let rt = tokio::runtime::Builder::new_current_thread() -            .enable_all() -            .build() -            .unwrap(); -        rt.block_on(async move { -            let mut artifact = job_ctx.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await -                .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e))) -                .unwrap(); -            let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(); -            eprintln!("uploading..."); -            crate::io::forward_data(&mut file, &mut artifact).await -                .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?; -            std::mem::drop(artifact); -            Ok(()) -        }) -    } - -    pub fn has_cmd(name: &str) -> Result<bool, rlua::Error> { -        Ok(std::process::Command::new("which") -            .arg(name) -            .status() -            .map_err(|e| LuaError::RuntimeError(format!("could not fork which? {:?}", e)))? -            .success()) -    } - -    pub fn file_size(path: &str) -> Result<u64, rlua::Error> { -        Ok(std::fs::metadata(&format!("tmpdir/{}", path)) -            .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", path)))? -            .len()) -    } - -    pub mod step { -        use crate::RunningJob; -        use std::sync::{Arc, Mutex}; - -        pub fn start(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { -            let mut job = job_ref.lock().unwrap(); -            job.current_step.clear(); -            job.current_step.push(name); -            Ok(()) -        } - -        pub fn push(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { -            let mut job = job_ref.lock().unwrap(); -            job.current_step.push(name); -            Ok(()) -        } - -        pub fn advance(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { -            let mut job = job_ref.lock().unwrap(); -            job.current_step.pop(); -            job.current_step.push(name); -            Ok(()) -        } -    } -} - -struct DeclEnv<'lua, 'env> { -    lua_ctx: &'env rlua::Context<'lua>, -    job_ref: &'env Arc<Mutex<RunningJob>>, -} -impl<'lua, 'env> DeclEnv<'lua, 'env> { -    fn create_function<A, R, F>(&self, name: &str, f: F) ->  Result<rlua::Function<'lua>, String> -        where -        A: FromLuaMulti<'lua>, -        R: ToLuaMulti<'lua>, -        F: 'static + Send + Fn(rlua::Context<'lua>, Arc<Mutex<RunningJob>>, A) -> Result<R, rlua::Error> { - -        let job_ref = Arc::clone(self.job_ref); -        self.lua_ctx.create_function(move |ctx, args| { -            let job_ref = Arc::clone(&job_ref); -            f(ctx, job_ref, args) -        }) -            .map_err(|e| format!("problem defining {} function: {:?}", name, e)) -    } -} - -impl BuildEnv { -    pub fn new(job: &Arc<Mutex<RunningJob>>) -> Self { -        let env = BuildEnv { -            lua: Lua::new(), -            job: Arc::clone(job), -        }; -        env.lua.context(|lua_ctx| { -            env.define_env(lua_ctx) -        }).expect("can define context"); -        env -    } - -    fn define_env(&self, lua_ctx: rlua::Context) -> Result<(), String> { -        let decl_env = DeclEnv { -            lua_ctx: &lua_ctx, -            job_ref: &self.job, -        }; - -        let hello = decl_env.create_function("hello", |_, _, ()| { -            eprintln!("hello from lua!!!"); -            Ok(()) -        })?; - -        let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec<String>| { -            lua_exports::check_dependencies(commands) -        })?; - -        let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| { -            lua_exports::build_command_impl(command, params, job_ref) -        })?; - -        let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, params): (LuaValue, LuaValue)| { -            lua_exports::check_output_impl(ctx, command, params, job_ref) -        })?; - -        let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| { -            lua_exports::metric(name, value, job_ref) -        })?; - -        let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?; - -        let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| { -            lua_exports::artifact(path, name, job_ref) -        })?; - -        let error = decl_env.create_function("error", move |_, job_ref, msg: String| { -            Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg))) -        })?; - -        let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, job_ref, name: String| { -            lua_exports::has_cmd(&name) -        })?; - -        let size_of_file = decl_env.create_function("size_of_file", move |_, job_ref, name: String| { -            lua_exports::file_size(&name) -        })?; - -        let native_rust_triple = match std::env::consts::ARCH { -            "x86_64" => "x86_64-unknown-linux-gnu", -            "aarch64" => "aarch64-unknown-linux-gnu", -            other => { panic!("dunno native rust triple for arch {}", other); } -        }; -        let native_rust_triple = lua_ctx.create_string(native_rust_triple).unwrap(); -        let build_env_vars = lua_ctx.create_table_from( -            vec![ -                ("native_rust_triple", native_rust_triple) -            ] -        ).unwrap(); - -        let build_environment = lua_ctx.create_table_from( -            vec![ -                ("has", path_has_cmd), -                ("size", size_of_file), -            ] -        ).unwrap(); -        build_environment.set("vars", build_env_vars).unwrap(); - -        let build_functions = lua_ctx.create_table_from( -            vec![ -                ("hello", hello), -                ("run", build), -                ("dependencies", check_dependencies), -                ("metric", metric), -                ("error", error), -                ("artifact", artifact), -                ("now_ms", now_ms), -                ("check_output", check_output), -            ] -        ).unwrap(); -        build_functions.set("environment", build_environment).unwrap(); -        let current_commit = self.job.lock().unwrap().job.commit.clone(); -        build_functions.set("sha", lua_ctx.create_string(current_commit.as_bytes()).unwrap()).unwrap(); -        let globals = lua_ctx.globals(); -        globals.set("Build", build_functions).unwrap(); - - -        let step_start = decl_env.create_function("step_start", move |_, job_ref, name: String| { -            lua_exports::step::start(job_ref, name) -        })?; - -        let step_push = decl_env.create_function("step_push", move |_, job_ref, name: String| { -            lua_exports::step::push(job_ref, name) -        })?; - -        let step_advance = decl_env.create_function("step_advance", move |_, job_ref, name: String| { -            lua_exports::step::advance(job_ref, name) -        })?; - -        let step_functions = lua_ctx.create_table_from( -            vec![ -                ("start", step_start), -                ("push", step_push), -                ("advance", step_advance), -            ] -        ).unwrap(); -        globals.set("Step", step_functions).unwrap(); -        Ok(()) -    } - -    pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> { -        let script = script.to_vec(); -        let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| { -            std::thread::spawn(move || { -                self.lua.context(|lua_ctx| { -                    lua_ctx.load(&script) -                        .set_name("goodfile")? -                        .exec() -                }) -            }).join().unwrap() -        }).await.unwrap(); -        eprintln!("lua res: {:?}", res); -        res -    } -} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index d11df1f..0000000 --- a/src/main.rs +++ /dev/null @@ -1,1092 +0,0 @@ -#![allow(dead_code)] -#![allow(unused_variables)] -#![allow(unused_imports)] - -use chrono::{Utc, TimeZone}; -use lazy_static::lazy_static; -use std::sync::RwLock; -use std::collections::HashMap; -use serde_derive::{Deserialize, Serialize}; -use tokio::spawn; -use std::path::PathBuf; -use axum_server::tls_rustls::RustlsConfig; -use axum::routing::*; -use axum::Router; -use axum::response::{IntoResponse, Response, Html}; -use std::net::SocketAddr; -use axum::extract::{Path, State}; -use http_body::combinators::UnsyncBoxBody; -use axum::{Error, Json}; -use axum::extract::rejection::JsonRejection; -use axum::body::Bytes; -use axum::http::{StatusCode, Uri}; -use http::header::HeaderMap; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use axum::body::StreamBody; - -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; - -use hmac::{Hmac, Mac}; -use sha2::Sha256; - -mod io; -mod sql; -mod notifier; -mod dbctx; -mod protocol; - -use sql::RunState; - -use dbctx::{DbCtx, Job, Run, ArtifactRecord}; - -use rusqlite::OptionalExtension; - -#[derive(Serialize, Deserialize)] -struct WebserverConfig { -    psks: Vec<GithubPsk>, -    jobs_path: PathBuf, -    config_path: PathBuf, -    db_path: PathBuf, -    debug_addr: Option<serde_json::Value>, -    server_addr: Option<serde_json::Value>, -} - -#[derive(Clone)] -struct WebserverState { -    jobs_path: PathBuf, -    dbctx: Arc<DbCtx>, -} - -#[derive(Clone, Serialize, Deserialize)] -struct GithubPsk { -    key: String, -    gh_user: String, -} - -lazy_static! { -    static ref PSKS: RwLock<Vec<GithubPsk>> = RwLock::new(Vec::new()); -} - -#[derive(Copy, Clone, Debug)] -enum GithubHookError { -    BodyNotObject, -    MissingElement { path: &'static str }, -    BadType { path: &'static str, expected: &'static str }, -} - -#[derive(Debug)] -enum GithubEvent { -    Push { tip: String, repo_name: String, head_commit: serde_json::Map<String, serde_json::Value>, pusher: serde_json::Map<String, serde_json::Value> }, -    Other {} -} - -/// return a duration rendered as the largest two non-zero units. -/// -/// 60000ms -> 1m -/// 60001ms -> 1m -/// 61000ms -> 1m1s -///  1030ms -> 1.03s -fn duration_as_human_string(duration_ms: u64) -> String { -    let duration_sec = duration_ms / 1000; -    let duration_min = duration_sec / 60; -    let duration_hours = duration_min / 60; - -    let duration_ms = duration_ms % 1000; -    let duration_sec = duration_sec % 60; -    let duration_min = duration_min % 60; -    // no need to clamp hours, we're gonna just hope that it's a reasonable number of hours - -    if duration_hours != 0 { -        let mut res = format!("{}h", duration_hours); -        if duration_min != 0 { -            res.push_str(&format!("{}m", duration_min)); -        } -        res -    } else if duration_min != 0 { -        let mut res = format!("{}m", duration_min); -        if duration_min != 0 { -            res.push_str(&format!("{}s", duration_sec)); -        } -        res -    } else { -        let mut res = format!("{}", duration_sec); -        if duration_ms != 0 { -            res.push_str(&format!(".{:03}", duration_ms)); -        } -        res.push('s'); -        res -    } -} - -/// try producing a url for whatever caused this job to be started, if possible -fn commit_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> Option<String> { -    let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote"); - -    match remote.remote_api.as_str() { -        "github" => { -            Some(format!("{}/commit/{}", remote.remote_url, commit_sha)) -        }, -        "email" => { -            None -        }, -        _ => { -            None -        } -    } -} - -/// produce a url to the ci.butactuallyin.space job details page -fn job_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> String { -    let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote"); - -    if remote.remote_api != "github" { -        eprintln!("job url for remote type {} can't be constructed, i think", &remote.remote_api); -    } - -    format!("{}/{}", &remote.remote_path, commit_sha) -} - -/// render how long a run took, or is taking, in a human-friendly way -fn display_run_time(run: &Run) -> String { -    if let Some(start_time) = run.start_time { -        if let Some(complete_time) = run.complete_time { -            if complete_time < start_time { -                if run.state == RunState::Started { -                    // this run has been restarted. the completed time is stale. -                    // further, this is a currently active run. -                    let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; -                    let mut duration = duration_as_human_string(now_ms - start_time); -                    duration.push_str(" (ongoing)"); -                    duration -                } else { -                    "invalid data".to_string() -                } -            } else { -                let duration_ms = complete_time - start_time; -                let duration = duration_as_human_string(duration_ms); -                duration -            } -        } else { -            if run.state != RunState::Invalid { -                let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; -                let mut duration = duration_as_human_string(now_ms - start_time); -                duration.push_str(" (ongoing)"); -                duration -            } else { -                "n/a".to_string() -            } -        } -    } else { -        "not yet run".to_owned() -    } -} - -fn parse_push_event(body: serde_json::Value) -> Result<GithubEvent, GithubHookError> { -    let body = body.as_object() -        .ok_or(GithubHookError::BodyNotObject)?; - -    let tip = body.get("after") -        .ok_or(GithubHookError::MissingElement { path: "after" })? -        .as_str() -        .ok_or(GithubHookError::BadType { path: "after", expected: "str" })? -        .to_owned(); - -    let repo_name = body.get("repository") -        .ok_or(GithubHookError::MissingElement { path: "repository" })? -        .as_object() -        .ok_or(GithubHookError::BadType { path: "repository", expected: "obj" })? -        .get("full_name") -        .ok_or(GithubHookError::MissingElement { path: "repository/full_name" })? -        .as_str() -        .ok_or(GithubHookError::BadType { path: "repository/full_name", expected: "str" })? -        .to_owned(); - -    let head_commit = body.get("head_commit") -        .ok_or(GithubHookError::MissingElement { path: "head_commit" })? -        .as_object() -        .ok_or(GithubHookError::BadType { path: "head_commit", expected: "obj" })? -        .to_owned(); - -    let pusher = body.get("pusher") -        .ok_or(GithubHookError::MissingElement { path: "pusher" })? -        .as_object() -        .ok_or(GithubHookError::BadType { path: "pusher", expected: "obj" })? -        .to_owned(); - -    Ok(GithubEvent::Push { tip, repo_name, head_commit, pusher }) -} - -async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse { -    let (sha, repo, head_commit, pusher) = if let GithubEvent::Push { tip, repo_name, head_commit, pusher } = event { -        (tip, repo_name, head_commit, pusher) -    } else { -        panic!("process push event on non-push event"); -    }; - -    println!("handling push event to {}/{}: sha {} in repo {}, {:?}\n  pusher: {:?}", owner, repo, sha, repo, head_commit, pusher); - -    // push event is in terms of a ref, but we don't know if it's a new commit (yet). -    // in terms of CI jobs, we care mainly about new commits. -    // so... -    // * look up the commit, -    // * if it known, bail out (new ref for existing commit we've already handled some way) -    // * create a new commit ref -    // * create a new job (state=pending) for the commit ref -    let commit_id: Option<u64> = ctx.conn.lock().unwrap() -        .query_row(sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0)) -        .optional() -        .expect("can run query"); - -    if commit_id.is_some() { -        eprintln!("commit already exists"); -        return (StatusCode::OK, String::new()); -    } - -    let remote_url = format!("https://www.github.com/{}.git", repo); -    eprintln!("looking for remote url: {}", remote_url); -    let (remote_id, repo_id): (u64, u64) = match ctx.conn.lock().unwrap() -        .query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) -        .optional() -        .unwrap() { -        Some(elems) => elems, -        None => { -            eprintln!("no remote registered for url {} (repo {})", remote_url, repo); -            return (StatusCode::NOT_FOUND, String::new()); -        } -    }; - -    let repo_default_run_pref: Option<String> = ctx.conn.lock().unwrap() -        .query_row("select default_run_preference from repos where id=?1;", [repo_id], |row| { -            Ok((row.get(0)).unwrap()) -        }) -        .expect("can query"); - -    let pusher_email = pusher -        .get("email") -        .expect("has email") -        .as_str() -        .expect("is str"); - -    let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap(); -    let _ = ctx.new_run(job_id, None).unwrap(); - -    let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers"); - -    for notifier in notifiers { -        notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify"); -    } - -    (StatusCode::OK, String::new()) -} - -async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event_kind: String, body: serde_json::Value) -> Response<UnsyncBoxBody<Bytes, Error>> { -    println!("got github event: {}, {}, {}", owner, repo, event_kind); -    match event_kind.as_str() { -        "push" => { -            let push_event = parse_push_event(body) -                .map_err(|e| { -                    eprintln!("TODO: handle push event error: {:?}", e); -                    panic!() -                }) -                .expect("parse works"); -            let res = process_push_event(ctx, owner, repo, push_event).await; -            "ok".into_response() -        }, -        "status" => { -            eprintln!("[.] status update"); -            "ok".into_response() -        } -        other => { -            eprintln!("unhandled event kind: {}, repo {}/{}. content: {:?}", other, owner, repo, body); -            "".into_response() -        } -    } -} - -async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse { -    eprintln!("root index"); -    let repos = match ctx.dbctx.get_repos() { -        Ok(repos) => repos, -        Err(e) => { -            eprintln!("failed to get repos: {:?}", e); -            return (StatusCode::INTERNAL_SERVER_ERROR, Html("gonna feel that one tomorrow".to_string())); -        } -    }; - -    let mut response = String::new(); - -    response.push_str("<html>\n"); -    response.push_str("<style>\n"); -    response.push_str(".build-table { font-family: monospace; border: 1px solid black; border-collapse: collapse; }\n"); -    response.push_str(".row-item { padding-left: 4px; padding-right: 4px; border-right: 1px solid black; }\n"); -    response.push_str(".odd-row { background: #eee; }\n"); -    response.push_str(".even-row { background: #ddd; }\n"); -    response.push_str("</style>\n"); -    response.push_str("<h1>builds and build accessories</h1>\n"); - -    match repos.len() { -        0 => { response.push_str(&format!("<p>no repos configured, so there are no builds</p>\n")); }, -        1 => { response.push_str("<p>1 repo configured</p>\n"); }, -        other => { response.push_str(&format!("<p>{} repos configured</p>\n", other)); }, -    } - -    response.push_str("<table class='build-table'>"); -    response.push_str("<tr>\n"); -    let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"]; -    for heading in headings { -        response.push_str(&format!("<th class='row-item'>{}</th>", heading)); -    } -    response.push_str("</tr>\n"); - -    let mut row_num = 0; - -    for repo in repos { -        let mut most_recent_run: Option<(Job, Run)> = None; - -        for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") { -            let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works"); -            if let Some(last_job) = last_job { -                if let Some(last_run) = ctx.dbctx.last_run_for_job(last_job.id).expect("can query") { -                    if most_recent_run.as_ref().map(|run| run.1.create_time < last_run.create_time).unwrap_or(true) { -                        most_recent_run = Some((last_job, last_run)); -                    } -                } -            } -        } - -        let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name); - -        let row_html: String = match most_recent_run { -            Some((job, run)) => { -                let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); -                let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { -                    Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), -                    None => job_commit.clone() -                }; - -                let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); - -                let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); -                let duration = display_run_time(&run); - -                let status = format!("{:?}", run.state).to_lowercase(); - -                let result = match run.build_result { -                    Some(0) => "<span style='color:green;'>pass</span>", -                    Some(_) => "<span style='color:red;'>fail</span>", -                    None => match run.state { -                        RunState::Pending => { "unstarted" }, -                        RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, -                        _ => { "<span style='color:red;'>unreported</span>" } -                    } -                }; - -                let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result]; -                let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len()); - -                let mut row_html = String::new(); -                for entry in entries { -                    row_html.push_str(&format!("<td class='row-item'>{}</td>", entry)); -                } -                row_html -            } -            None => { -                let entries = [repo_html.as_str()]; -                let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len()); - -                let mut row_html = String::new(); -                for entry in entries { -                    row_html.push_str(&format!("<td class='row-item'>{}</td>", entry)); -                } -                row_html -            } -        }; - -        let row_index = row_num % 2; -        response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index])); -        response.push_str(&row_html); -        response.push_str("</tr>"); -        response.push('\n'); - -        row_num += 1; -    } -    response.push_str("</table>"); - -    response.push_str("<h4>active tasks</h4>\n"); - -    let runs = ctx.dbctx.get_active_runs().expect("can query"); -    if runs.len() == 0 { -        response.push_str("<p>(none)</p>\n"); -    } else { -        response.push_str("<table class='build-table'>"); -        response.push_str("<tr>\n"); -        let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"]; -        for heading in headings { -            response.push_str(&format!("<th class='row-item'>{}</th>", heading)); -        } -        response.push_str("</tr>\n"); - -        let mut row_num = 0; - -        for run in runs.iter() { -            let row_index = row_num % 2; - -            let job = ctx.dbctx.job_by_id(run.job_id).expect("query succeeds").expect("job id is valid"); -            let remote = ctx.dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("remote id is valid"); -            let repo = ctx.dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("repo id is valid"); - -            let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name); - -            let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); -            let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { -                Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), -                None => job_commit.clone() -            }; - -            let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); - -            let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); -            let duration = display_run_time(&run); - -            let status = format!("{:?}", run.state).to_lowercase(); - -            let result = match run.build_result { -                Some(0) => "<span style='color:green;'>pass</span>", -                Some(_) => "<span style='color:red;'>fail</span>", -                None => match run.state { -                    RunState::Pending => { "unstarted" }, -                    RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, -                    _ => { "<span style='color:red;'>unreported</span>" } -                } -            }; - -            let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result]; -            let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len()); - -            let mut row_html = String::new(); -            for entry in entries { -                row_html.push_str(&format!("<td class='row-item'>{}</td>", entry)); -            } - - -            response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index])); -            response.push_str(&row_html); -            response.push_str("</tr>"); -            response.push('\n'); - -            row_num += 1; -        } - -        response.push_str("</table>\n"); -    } - -    response.push_str("</html>"); - -    (StatusCode::OK, Html(response)) -} - -async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse { -    eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2); -    let remote_path = format!("{}/{}", path.0, path.1); -    let sha = path.2; - -    let (commit_id, sha): (u64, String) = if sha.len() >= 7 { -        match ctx.dbctx.conn.lock().unwrap() -            .query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) -            .optional() -            .expect("can query") { -            Some((commit_id, sha)) => (commit_id, sha), -            None => { -                return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string())); -            } -        } -    } else { -        return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string())); -    }; - -    let short_sha = &sha[0..9]; - -    let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap() -        .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) -        .expect("can query"); - -    let job = ctx.dbctx.job_by_commit_id(commit_id).expect("can query").expect("job exists"); - -    let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists"); - -    let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms); - -    let (status_elem, status_desc) = match run.state { -        RunState::Pending | RunState::Started => { -            ("<span style='color:#660;'>pending</span>", "⌛in progress") -        }, -        RunState::Finished => { -            if let Some(build_result) = run.build_result { -                if build_result == 0 { -                    ("<span style='color:green;'>pass</span>", "✅ passed") -                } else { -                    ("<span style='color:red;'>failed</span>", "❌ failed") -                } -            } else { -                eprintln!("run {} for commit {} is missing a build result but is reportedly finished (old data)?", run.id, commit_id); -                ("<span style='color:red;'>unreported</span>", "❔ missing status") -            } -        }, -        RunState::Error => { -            ("<span style='color:red;'>error</span>", "🧯 error, uncompleted") -        } -        RunState::Invalid => { -            ("<span style='color:red;'>(server error)</span>", "dude even i don't know") -        } -    }; -    let debug_info = run.state == RunState::Finished && run.build_result == Some(1) || run.state == RunState::Error; - -    let repo_name: String = ctx.dbctx.conn.lock().unwrap() -        .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) -        .expect("can query"); - -    let deployed = false; - -    let mut head = String::new(); -    head.push_str("<head>"); -    head.push_str(&format!("<title>ci.butactuallyin.space - {}</title>", repo_name)); -    let include_og_tags = true; -    if include_og_tags { -        head.push_str("\n"); -        head.push_str(&format!("<meta property=\"og:type\" content=\"website\">\n")); -        head.push_str(&format!("<meta property=\"og:site_name\" content=\"ci.butactuallyin.space\">\n")); -        head.push_str(&format!("<meta property=\"og:url\" content=\"/{}/{}/{}\">\n", &path.0, &path.1, &sha)); -        head.push_str(&format!("<meta property=\"og:title\" contents=\"{}/{} commit {}\">", &path.0, &path.1, &short_sha)); -        let build_og_description = format!("commit {} of {}/{}, {} after {}", -            short_sha, -            path.0, path.1, -            status_desc, -            display_run_time(&run) -        ); -        head.push_str(&format!("<meta name=\"description\" content=\"{}\"\n>", build_og_description)); -        head.push_str(&format!("<meta property=\"og:description\" content=\"{}\"\n>", build_og_description)); -    } -    head.push_str("</head>\n"); -    let repo_html = format!("<a href=\"/{}\">{}</a>", &repo_name, &repo_name); -    let remote_commit_elem = format!("<a href=\"https://www.github.com/{}/commit/{}\">{}</a>", &remote_path, &sha, &sha); - -    let mut artifacts_fragment = String::new(); -    let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap() -        .into_iter() // HACK: filter out artifacts for previous runs of a run. artifacts should be attached to a run, runs should be distinct from run. but i'm sleepy. -        .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms)) -        .collect(); - -    artifacts.sort_by_key(|artifact| artifact.created_time); - -    fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 { -        let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms); -        let run_completed = std::cmp::max(run_completed, artifact_completed); -        run_completed - artifact_completed -    } - -    let recent_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) <= 60_000).cloned().collect(); -    let old_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) > 60_000).cloned().collect(); - -    for artifact in old_artifacts.iter() { -        let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822(); -        artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name)); -        let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); -        let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string(); -        artifacts_fragment.push_str(&format!("<pre>  {}kb in {} </pre>\n", size_str, duration_str)); -    } - -    for artifact in recent_artifacts.iter() { -        let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822(); -        artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name)); -        if debug_info { -            artifacts_fragment.push_str("<pre>"); -            artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap()); -            artifacts_fragment.push_str("</pre>\n"); -        } else { -            let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); -            let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| { -                (md.len() / 1024).to_string() -            }).unwrap_or_else(|e| format!("[{}]", e)); -            artifacts_fragment.push_str(&format!("<pre>  {}kb in {} </pre>\n", size_str, duration_str)); -        } -    } - -    let metrics = summarize_job_metrics(&ctx.dbctx, run.id, run.job_id).unwrap(); - -    let mut html = String::new(); -    html.push_str("<html>\n"); -    html.push_str(&format!("  {}\n", head)); -    html.push_str("  <body>\n"); -    html.push_str("    <pre>\n"); -    html.push_str(&format!("repo: {}\n", repo_html)); -    html.push_str(&format!("commit: {}, run: {}\n", remote_commit_elem, run.id)); -    html.push_str(&format!("status: {} in {}\n", status_elem, display_run_time(&run))); -    if let Some(desc) = run.final_text.as_ref() { -        html.push_str(&format!("  description: {}\n  ", desc)); -    } -    html.push_str(&format!("deployed: {}\n", deployed)); -    html.push_str("    </pre>\n"); -    if artifacts_fragment.len() > 0 { -        html.push_str("    <div>artifacts</div>\n"); -        html.push_str(&artifacts_fragment); -    } -    if let Some(metrics) = metrics { -        html.push_str(&metrics); -    } -    html.push_str("  </body>\n"); -    html.push_str("</html>"); - -    (StatusCode::OK, Html(html)) -} - -fn summarize_job_metrics(dbctx: &Arc<DbCtx>, run_id: u64, job_id: u64) -> Result<Option<String>, String> { -    let runs = dbctx.runs_for_job_one_per_host(job_id)?; - -    let mut section = String::new(); -    section.push_str("<div>\n"); -    section.push_str("<h3>metrics</h3>\n"); -    section.push_str("<table style='font-family: monospace;'>\n"); - -    if runs.len() == 1 { -        let metrics = dbctx.metrics_for_run(run_id).unwrap(); -        if metrics.is_empty() { -            return Ok(None); -        } - -        section.push_str("<tr><th>name</th><th>value</th></tr>"); -        for metric in metrics { -            section.push_str(&format!("<tr><td>{}</td><td>{}</td></tr>", &metric.name, &metric.value)); -        } -    } else { -        // very silly ordering issue: need an authoritative ordering of metrics to display metrics -        // in a consistent order across runs (though they SHOULD all be ordered the same). -        // -        // the first run might not have all metrics (first run could be on the slowest build host -        // f.ex), so for now just assume that metrics *will* be consistently ordered and build a -        // list of metrics from the longest list of metrics we've seen. builders do not support -        // concurrency so at least the per-run metrics-order-consistency assumption should hold.. -        let mut all_names: Vec<String> = Vec::new(); - -        let all_metrics: Vec<(HashMap<String, String>, HostDesc)> = runs.iter().map(|run| { -            let metrics = dbctx.metrics_for_run(run.id).unwrap(); - -            let mut metrics_map = HashMap::new(); -            for metric in metrics.into_iter() { -                if !all_names.contains(&metric.name) { -                    all_names.push(metric.name.clone()); -                } -                metrics_map.insert(metric.name, metric.value); -            } - -            let (hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz) = match run.host_id { -                Some(host_id) => { -                    dbctx.host_model_info(host_id).unwrap() -                } -                None => { -                    ("unknown".to_string(), "unknown".to_string(), "0".to_string(), "0".to_string(), 0) -                } -            }; - -            (metrics_map, HostDesc::from_parts(hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz)) -        }).collect(); - -        if all_metrics.is_empty() { -            return Ok(None); -        } - -        let mut header = "<tr><th>name</th>".to_string(); -        for (_, host) in all_metrics.iter() { -            header.push_str(&format!("<th>{}</br>{} @ {:.3}GHz</th>", &host.hostname, &host.cpu_desc, (host.cpu_max_freq_khz as f64) / 1000_000.0)); -        } -        header.push_str("</tr>\n"); -        section.push_str(&header); - -        for name in all_names.iter() { -            let mut row = format!("<tr><td>{}</td>", &name); -            for (metrics, _) in all_metrics.iter() { -                let value = metrics.get(name) -                    .map(|x| x.clone()) -                    .unwrap_or_else(String::new); -                row.push_str(&format!("<td>{}</td>", value)); -            } -            row.push_str("</tr>\n"); -            section.push_str(&row); -        } -    }; -    section.push_str("</table>\n"); -    section.push_str("</div>\n"); - -    Ok(Some(section)) -} - -async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse { -    eprintln!("get artifact, run={}, artifact={}", path.0, path.1); -    let run: u64 = path.0.parse().unwrap(); -    let artifact_id: u64 = path.1.parse().unwrap(); - -    let artifact_descriptor = match ctx.dbctx.lookup_artifact(run, artifact_id).unwrap() { -        Some(artifact) => artifact, -        None => { -            return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); -        } -    }; - -    let mut live_artifact = false; - -    if let Some(completed_time) = artifact_descriptor.completed_time { -        if completed_time < artifact_descriptor.created_time { -            live_artifact = true; -        } -    } else { -        live_artifact = true; -    } - -    if live_artifact { -        let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536); -        let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver); -        let mut artifact_path = ctx.jobs_path.clone(); -        artifact_path.push(artifact_descriptor.run_id.to_string()); -        artifact_path.push(artifact_descriptor.id.to_string()); -        spawn(async move { -            let mut artifact = artifact_descriptor; - -            let mut artifact_file = tokio::fs::File::open(&artifact_path) -                .await -                .expect("artifact file exists?"); -            while artifact.completed_time.is_none() { -                match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await { -                    Ok(()) => { -                        // reached the current EOF, wait and then commit an unspeakable sin -                        tokio::time::sleep(std::time::Duration::from_millis(250)).await; -                        // this would be much implemented as yielding on a condvar woken when an -                        // inotify event on the file indicates a write has occurred. but i am -                        // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. -                        artifact = ctx.dbctx.lookup_artifact(artifact.run_id, artifact.id) -                            .expect("can query db") -                            .expect("artifact still exists"); -                    } -                    Err(e) => { -                        eprintln!("artifact file streaming failed: {}", e); -                    } -                } -            } - -            eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id); -        }); -        (StatusCode::OK, resp_body).into_response() -    } else { -        (StatusCode::OK, Html("all done")).into_response() -    } -} - -async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<WebserverState>) -> impl IntoResponse { -    eprintln!("get repo summary: {:?}", path); - -    let mut last_builds = Vec::new(); - -    let (repo_id, repo_name, default_run_preference): (u64, String, Option<String>) = match ctx.dbctx.conn.lock().unwrap() -        .query_row("select id, repo_name, default_run_preference from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap(), row.get(2).unwrap()))) -        .optional() -        .unwrap() { -        Some(elem) => elem, -        None => { -            eprintln!("no repo named {}", path); -            return (StatusCode::NOT_FOUND, Html(String::new())); -        } -    }; - -    // TODO: display default_run_preference somehow on the web summary? - -    for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") { -        let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo"); -        last_builds.extend(last_ten_jobs.drain(..)); -    } -    last_builds.sort_by_key(|job| -(job.created_time as i64)); - -    let mut response = String::new(); -    response.push_str("<html>\n"); -    response.push_str(&format!("<title> ci.butactuallyin.space - {} </title>\n", repo_name)); -    response.push_str("<style>\n"); -    response.push_str(".build-table { font-family: monospace; border: 1px solid black; border-collapse: collapse; }\n"); -    response.push_str(".row-item { padding-left: 4px; padding-right: 4px; border-right: 1px solid black; }\n"); -    response.push_str(".odd-row { background: #eee; }\n"); -    response.push_str(".even-row { background: #ddd; }\n"); -    response.push_str("</style>\n"); -    response.push_str(&format!("<h1>{} build history</h1>\n", repo_name)); -    response.push_str("<a href=/>full repos index</a><p> </p>\n"); - -    response.push_str("<table class='build-table'>"); -    response.push_str("<tr>\n"); -    let headings = ["last build", "job", "build commit", "duration", "status", "result"]; -    for heading in headings { -        response.push_str(&format!("<th class='row-item'>{}</th>", heading)); -    } -    response.push_str("</tr>\n"); - -    let mut row_num = 0; - -    for job in last_builds.iter().take(10) { -        let run = ctx.dbctx.last_run_for_job(job.id).expect("query succeeds").expect("TODO: run exists if job exists (small race if querying while creating job ...."); -        let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); -        let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { -            Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), -            None => job_commit.clone() -        }; - -        let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); - -        let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); -        let duration = display_run_time(&run); - -        let status = format!("{:?}", run.state).to_lowercase(); - -        let result = match run.build_result { -            Some(0) => "<span style='color:green;'>pass</span>", -            Some(_) => "<span style='color:red;'>fail</span>", -            None => match run.state { -                RunState::Pending => { "unstarted" }, -                RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, -                _ => { "<span style='color:red;'>unreported</span>" } -            } -        }; - -        let entries = [last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result]; -        let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len()); - -        let mut row_html = String::new(); -        for entry in entries { -            row_html.push_str(&format!("<td class='row-item'>{}</td>", entry)); -        } - -        let row_index = row_num % 2; -        response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index])); -        response.push_str(&row_html); -        response.push_str("</tr>\n"); - -        row_num += 1; -    } -    response.push_str("</html>"); - -    (StatusCode::OK, Html(response)) -} - -async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<WebserverState>, body: Bytes) -> impl IntoResponse { -    let json: Result<serde_json::Value, _> = serde_json::from_slice(&body); -    eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers); - -    let payload = match json { -        Ok(payload) => { payload }, -        Err(e) => { -            eprintln!("bad request: path={}/{}\nheaders: {:?}\nbody err: {:?}", path.0, path.1, headers, e);  -            return (StatusCode::BAD_REQUEST, "").into_response(); -        } -    }; - -    let sent_hmac = match headers.get("x-hub-signature-256") { -        Some(sent_hmac) => { sent_hmac.to_str().expect("valid ascii string").to_owned() }, -        None => { -            eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-hub-signature-256", path.0, path.1, headers);  -            return (StatusCode::BAD_REQUEST, "").into_response(); -        } -    }; - -    let mut hmac_ok = false; - -    for psk in PSKS.read().unwrap().iter() { -        let mut mac = Hmac::<Sha256>::new_from_slice(psk.key.as_bytes()) -            .expect("hmac can be constructed"); -        mac.update(&body); -        let result = mac.finalize().into_bytes().to_vec(); - -        // hack: skip sha256= -        let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex"); -        if decoded == result { -            hmac_ok = true; -            break; -        } -    } - -    if !hmac_ok { -        eprintln!("bad hmac by all psks"); -        return (StatusCode::BAD_REQUEST, "").into_response(); -    } - -    let kind = match headers.get("x-github-event") { -        Some(kind) => { kind.to_str().expect("valid ascii string").to_owned() }, -        None => { -            eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-github-event", path.0, path.1, headers);  -            return (StatusCode::BAD_REQUEST, "").into_response(); -        } -    }; - -    handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await -} - - -async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router { -    /* - -    // GET /hello/warp => 200 OK with body "Hello, warp!" -    let hello = warp::path!("hello" / String) -        .map(|name| format!("Hello, {}!\n", name)); - -    let github_event = warp::post() -        .and(warp::path!(String / String)) -        .and_then(|owner, repo| { -            warp::header::<String>("x-github-event") -                .and(warp::body::content_length_limit(1024 * 1024)) -                .and(warp::body::json()) -                .and_then(|event, json| handle_github_event(owner, repo, event, json)) -                .recover(|e| { -                    async fn handle_rejection(err: Rejection) -> Result<impl Reply, Rejection> { -                       Ok(warp::reply::with_status("65308", StatusCode::BAD_REQUEST)) -                    } -                    handle_rejection(e) -                }) -        }); - -    let repo_status = warp::get() -        .and(warp::path!(String / String / String)) -        .map(|owner, repo, sha| format!("CI status for {}/{} commit {}\n", owner, repo, sha)); - -    let other = -            warp::post() -                .and(warp::path::full()) -                .and(warp::addr::remote()) -                .and(warp::body::content_length_limit(1024 * 1024)) -                .and(warp::body::bytes()) -                .map(move |path, addr: Option<std::net::SocketAddr>, body| { -                    println!("{}: lets see what i got {:?}, {:?}", addr.unwrap(), path, body); -                    "hello :)\n" -                }) -            .or( -                warp::get() -                    .and(warp::path::full()) -                    .and(warp::addr::remote()) -                    .map(move |path, addr: Option<std::net::SocketAddr>| { -                        println!("{}: GET to {:?}", addr.unwrap(), path); -                        "hello!\n" -                    }) -            ) -        .recover(|e| { -            async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> { -               Ok(warp::reply::with_status("50834", StatusCode::BAD_REQUEST)) -            } -            handle_rejection(e) -        }); -    */ - -    async fn fallback_get(uri: Uri) -> impl IntoResponse { -        (StatusCode::OK, "get resp") -    } - -    async fn fallback_post(Path(path): Path<String>) -> impl IntoResponse { -        "post resp" -    } - -    Router::new() -        .route("/:owner/:repo/:sha", get(handle_commit_status)) -        .route("/:owner", get(handle_repo_summary)) -        .route("/:owner/:repo", post(handle_repo_event)) -        .route("/artifact/:b/:artifact_id", get(handle_get_artifact)) -        .route("/", get(handle_ci_index)) -        .fallback(fallback_get) -        .with_state(WebserverState { -            jobs_path, -            dbctx: Arc::new(DbCtx::new(cfg_path, db_path)) -        }) -} - -async fn bind_server(conf: serde_json::Value, jobs_path: PathBuf, config_path: PathBuf, db_path: PathBuf) -> std::io::Result<()> { -    let server = make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service(); -    use serde_json::Value; -    match conf { -        Value::String(address) => { -            axum_server::bind(address.parse().unwrap()) -                .serve(server).await -        }, -        Value::Object(map) => { -            let address = match map.get("address") { -                Some(Value::String(address)) => address.clone(), -                None => { -                    panic!("no local address"); -                }, -                other => { -                    panic!("invalid local address: {:?}", other); -                } -            }; - -            match (map.get("cert_path"), map.get("key_path")) { -                (Some(Value::String(cert_path)), Some(Value::String(key_path))) => { -                    let config = RustlsConfig::from_pem_file( -                        cert_path.clone(), -                        key_path.clone(), -                    ).await.unwrap(); -                    axum_server::bind_rustls(address.parse().unwrap(), config) -                        .serve(server).await -                }, -                (Some(_), _) | (_, Some(_)) => { -                    panic!("invalid local tls config: only one of `cert_path` or `key_path` has been provided"); -                }, -                (None, None) => { -                    axum_server::bind(address.parse().unwrap()) -                        .serve(server).await -                } -            } -        }, -        other => { -            panic!("invalid server bind config: {:?}", other); -        } -    } -} - -#[tokio::main] -async fn main() { -    tracing_subscriber::fmt::init(); - -    let mut args = std::env::args(); -    args.next().expect("first arg exists"); -    let config_path = args.next().unwrap_or("./webserver_config.json".to_string()); -    let web_config: WebserverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for WebserverConfig"); -    let mut psks = PSKS.write().expect("can write lock"); -    *psks = web_config.psks.clone(); -    // drop write lock so we can read PSKS elsewhere WITHOUT deadlocking. -    std::mem::drop(psks); - -    let jobs_path = web_config.jobs_path.clone(); -    let config_path = web_config.config_path.clone(); -    let db_path = web_config.db_path.clone(); -    if let Some(addr_conf) = web_config.debug_addr.as_ref() { -        spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone())); -    } -    if let Some(addr_conf) = web_config.server_addr.as_ref() { -        spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone())); -    } -    loop { -        tokio::time::sleep(std::time::Duration::from_millis(1000)).await; -    } -} - -struct HostDesc { -    hostname: String, -    cpu_desc: String, -    cpu_max_freq_khz: u64, -} -impl HostDesc { -    fn from_parts(hostname: String, vendor_id: String, cpu_family: String, model: String, cpu_max_freq_khz: u64) -> Self { -        let cpu_desc = match (vendor_id.as_str(), cpu_family.as_str(), model.as_str()) { -            ("Arm Limited", "8", "0xd03") => "aarch64 A53".to_string(), -            ("GenuineIntel", "6", "85") => "x86_64 Skylake".to_string(), -            ("AuthenticAMD", "23", "113") => "x86_64 Matisse".to_string(), -            (vendor, family, model) => format!("unknown {}:{}:{}", vendor, family, model) -        }; - -        HostDesc { -            hostname, -            cpu_desc, -            cpu_max_freq_khz, -        } -    } -} diff --git a/src/notifier.rs b/src/notifier.rs deleted file mode 100644 index f9d6084..0000000 --- a/src/notifier.rs +++ /dev/null @@ -1,181 +0,0 @@ -use serde_derive::{Deserialize, Serialize}; -use std::sync::Arc; -use axum::http::StatusCode; -use lettre::transport::smtp::authentication::{Credentials, Mechanism}; -use lettre::{Message, Transport}; -use lettre::transport::smtp::extension::ClientId; -use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder}; -use std::time::Duration; -use std::path::Path; - -use crate::DbCtx; - -pub struct RemoteNotifier { -    pub remote_path: String, -    pub notifier: NotifierConfig, -} - -#[derive(Serialize, Deserialize)] -#[serde(untagged)] -pub enum NotifierConfig { -    GitHub { -        token: String, -    }, -    Email { -        username: String, -        password: String, -        mailserver: String, -        from: String, -        to: String, -    } -} - -impl NotifierConfig { -    pub fn github_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> { -        let path = path.as_ref(); -        let bytes = std::fs::read(path) -            .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; -        let config = serde_json::from_slice(&bytes) -            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; - -        if matches!(config, NotifierConfig::GitHub { .. }) { -            Ok(config) -        } else { -            Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path.display())) -        } -    } - -    pub fn email_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> { -        let path = path.as_ref(); -        let bytes = std::fs::read(path) -            .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; -        let config = serde_json::from_slice(&bytes) -            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; - -        if matches!(config, NotifierConfig::Email { .. }) { -            Ok(config) -        } else { -            Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path.display())) -        } -    } -} - -impl RemoteNotifier { -    pub async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { -        self.tell_job_status( -            ctx, -            repo_id, sha, job_id, -            "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) -        ).await -    } - -    pub async fn tell_complete_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, desc: Result<String, String>) -> Result<(), String> { -        match desc { -            Ok(status) => { -                self.tell_job_status( -                    ctx, -                    repo_id, sha, job_id, -                    "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) -                ).await -            }, -            Err(status) => { -                self.tell_job_status( -                    ctx, -                    repo_id, sha, job_id, -                    "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) -                ).await -            } -        } -    } - -    pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { -        match &self.notifier { -            NotifierConfig::GitHub { token } => { -                let status_info = serde_json::json!({ -                    "state": state, -                    "description": desc, -                    "target_url": target_url, -                    "context": "actuallyinspace runner", -                }); - -                // TODO: should pool (probably in ctx?) to have an upper bound in concurrent -                // connections. -                let client = reqwest::Client::new(); -                let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) -                    .body(serde_json::to_string(&status_info).expect("can stringify json")) -                    .header("content-type", "application/json") -                    .header("user-agent", "iximeow") -                    .header("authorization", format!("Bearer {}", token)) -                    .header("accept", "application/vnd.github+json"); -                eprintln!("sending {:?}", req); -                eprintln!("  body: {}", serde_json::to_string(&status_info).expect("can stringify json")); -                let res = req -                    .send() -                    .await; - -                match res { -                    Ok(res) => { -                        if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{ -                            Ok(()) -                        } else { -                            Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) -                        } -                    } -                    Err(e) => { -                        Err(format!("failure sending request: {:?}", e)) -                    } -                } -            } -            NotifierConfig::Email { username, password, mailserver, from, to } => { -                eprintln!("[.] emailing {} for job {} via {}", state, &self.remote_path, mailserver); - -                let subject = format!("{}: job for {}", state, &self.remote_path); - -                let body = format!("{}", subject); - -                // TODO: when ci.butactuallyin.space has valid certs again, ... fix this. -                let tls = TlsParametersBuilder::new(mailserver.to_string()) -                    .dangerous_accept_invalid_certs(true) -                    .build() -                    .unwrap(); - -                let mut mailer = SmtpConnection::connect( -                    mailserver, -                    Some(Duration::from_millis(5000)), -                    &ClientId::Domain("ci.butactuallyin.space".to_string()), -                    None, -                    None, -                ).unwrap(); - -                mailer.starttls( -                    &tls, -                    &ClientId::Domain("ci.butactuallyin.space".to_string()), -                ).unwrap(); - -                let resp = mailer.auth( -                    &[Mechanism::Plain, Mechanism::Login], -                    &Credentials::new(username.to_owned(), password.to_owned()) -                ).unwrap(); -                assert!(resp.is_positive()); - -                let email = Message::builder() -                    .from(from.parse().unwrap()) -                    .to(to.parse().unwrap()) -                    .subject(&subject) -                    .body(body) -                    .unwrap(); - -                match mailer.send(email.envelope(), &email.formatted()) { -                    Ok(_) => { -                        eprintln!("[+] notified {}@{}", username, mailserver); -                        Ok(()) -                    } -                    Err(e) => { -                        eprintln!("[-] could not send email: {:?}", e); -                        Err(e.to_string()) -                    } -                } -            } -        } -    } -} diff --git a/src/protocol.rs b/src/protocol.rs deleted file mode 100644 index c7a9318..0000000 --- a/src/protocol.rs +++ /dev/null @@ -1,114 +0,0 @@ -use serde::{Serialize, Deserialize}; - -#[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "kind")] -#[serde(rename_all = "snake_case")] -pub enum ClientProto { -    Started, -    ArtifactCreate, -    NewTask(RequestedJob), -    NewTaskPlease { allowed_pushers: Option<Vec<String>>, host_info: HostInfo }, -    Metric { name: String, value: String }, -    Command(CommandInfo), -    TaskStatus(TaskInfo), -    Ping, -    Pong, -} - -#[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "command_info")] -#[serde(rename_all = "snake_case")] -pub enum CommandInfo { -    Started { command: Vec<String>, cwd: Option<String>, id: u32 }, -    Finished { exit_code: Option<i32>, id: u32 }, -} - -#[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "task_info")] -#[serde(rename_all = "snake_case")] -pub enum TaskInfo { -    Finished { status: String }, -    Interrupted { status: String, description: Option<String> }, -} - -impl ClientProto { -    pub fn metric(name: impl Into<String>, value: impl Into<String>) -> Self { -        ClientProto::Metric { name: name.into(), value: value.into() } -    } - -    pub fn command(state: CommandInfo) -> Self { -        ClientProto::Command(state) -    } - -    pub fn new_task_please(allowed_pushers: Option<Vec<String>>, host_info: HostInfo) -> Self { -        ClientProto::NewTaskPlease { allowed_pushers, host_info } -    } - -    pub fn task_status(state: TaskInfo) -> Self { -        ClientProto::TaskStatus(state) -    } - -    pub fn new_task(task: RequestedJob) -> Self { -        ClientProto::NewTask(task) -    } -} - -impl CommandInfo { -    pub fn started(command: impl Into<Vec<String>>, cwd: Option<&str>, id: u32) -> Self { -        CommandInfo::Started { command: command.into(), cwd: cwd.map(ToOwned::to_owned), id } -    } - -    pub fn finished(exit_code: Option<i32>, id: u32) -> Self { -        CommandInfo::Finished { exit_code, id } -    } -} - -impl TaskInfo { -    pub fn finished(status: impl Into<String>) -> Self { -        TaskInfo::Finished { status: status.into() } -    } - -    pub fn interrupted(status: impl Into<String>, description: impl Into<Option<String>>) -> Self { -        TaskInfo::Interrupted { status: status.into(), description: description.into() } -    } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct HostInfo { -    pub hostname: String, -    pub cpu_info: CpuInfo, -    pub memory_info: MemoryInfo, -    pub env_info: EnvInfo, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct CpuInfo { -    pub model_name: String, -    pub microcode: String, -    pub cores: u32, -    pub vendor_id: String, -    pub family: String, -    pub model: String, -    // clock speed in khz -    pub max_freq: u64, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct MemoryInfo { -    pub total: String, -    pub available: String, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EnvInfo { -    pub arch: String, -    pub family: String, -    pub os: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RequestedJob { -    pub commit: String, -    pub remote_url: String, -    pub build_token: String, -} diff --git a/src/sql.rs b/src/sql.rs deleted file mode 100644 index 1071279..0000000 --- a/src/sql.rs +++ /dev/null @@ -1,229 +0,0 @@ -#![allow(dead_code)] - -use std::convert::TryFrom; - -use crate::dbctx::Run; -use crate::dbctx::Job; - -#[derive(Debug, Clone)] -pub enum JobResult { -    Pass = 0, -    Fail = 1, -} - -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum RunState { -    Pending = 0, -    Started = 1, -    Finished = 2, -    Error = 3, -    Invalid = 4, -} - -impl TryFrom<u8> for RunState { -    type Error = String; - -    fn try_from(value: u8) -> Result<Self, String> { -        match value { -            0 => Ok(RunState::Pending), -            1 => Ok(RunState::Started), -            2 => Ok(RunState::Finished), -            3 => Ok(RunState::Error), -            4 => Ok(RunState::Invalid), -            other => Err(format!("invalid job state: {}", other)), -        } -    } -} - -pub(crate) fn row2run(row: &rusqlite::Row) -> Run { -    let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap(); -    let state: u8 = state; -    Run { -        id, -        job_id, -        artifacts_path, -        state: state.try_into().unwrap(), -        host_id, -        create_time, -        start_time, -        complete_time, -        build_token, -        run_timeout, -        build_result, -        final_text, -    } -} - -// remote_id is the remote from which we were notified. this is necessary so we know which remote -// to pull from to actually run the job. -pub const CREATE_JOBS_TABLE: &'static str = "\ -    CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT, -        source TEXT, -        created_time INTEGER, -        remote_id INTEGER, -        commit_id INTEGER, -        run_preferences TEXT);"; - -pub const CREATE_METRICS_TABLE: &'static str = "\ -    CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT, -        job_id INTEGER, -        name TEXT, -        value TEXT, -        UNIQUE(job_id, name) -    );"; - -pub const CREATE_COMMITS_TABLE: &'static str = "\ -    CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; - -pub const CREATE_REPOS_TABLE: &'static str = "\ -    CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT, -        repo_name TEXT, -        default_run_preference TEXT);"; - -// remote_api is `github` or NULL for now. hopefully a future cgit-style notifier one day. -// remote_path is some unique identifier for the relevant remote. -// * for `github` remotes, this will be `owner/repo`. -// * for others.. who knows. -// remote_url is a url for human interaction with the remote (think https://git.iximeow.net/zvm) -// remote_git_url is a url that can be `git clone`'d to fetch sources -pub const CREATE_REMOTES_TABLE: &'static str = "\ -    CREATE TABLE IF NOT EXISTS remotes (id INTEGER PRIMARY KEY AUTOINCREMENT, -        repo_id INTEGER, -        remote_path TEXT, -        remote_api TEXT, -        remote_url TEXT, -        remote_git_url TEXT, -        notifier_config_path TEXT);"; - -pub const CREATE_ARTIFACTS_TABLE: &'static str = "\ -    CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT, -        run_id INTEGER, -        name TEXT, -        desc TEXT, -        created_time INTEGER, -        completed_time INTEGER);"; - -pub const CREATE_RUNS_TABLE: &'static str = "\ -    CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT, -        job_id INTEGER, -        artifacts_path TEXT, -        state INTEGER NOT NULL, -        host_id INTEGER, -        build_token TEXT, -        created_time INTEGER, -        started_time INTEGER, -        complete_time INTEGER, -        run_timeout INTEGER, -        build_result INTEGER, -        final_status TEXT);"; - -pub const CREATE_HOSTS_TABLE: &'static str = "\ -    CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY AUTOINCREMENT, -        hostname TEXT, -        cpu_vendor_id TEXT, -        cpu_model_name TEXT, -        cpu_family TEXT, -        cpu_model TEXT, -        cpu_microcode TEXT, -        cpu_max_freq_khz INTEGER, -        cpu_cores INTEGER, -        mem_total TEXT, -        arch TEXT, -        family TEXT, -        os TEXT, -        UNIQUE(hostname, cpu_vendor_id, cpu_model_name, cpu_family, cpu_model, cpu_microcode, cpu_cores, mem_total, arch, family, os));"; - -pub const CREATE_REMOTES_INDEX: &'static str = "\ -    CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; - -pub const CREATE_REPO_NAME_INDEX: &'static str = "\ -    CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; - -pub const PENDING_RUNS: &'static str = "\ -    select id, job_id, created_time, host_preference from runs where state=0 and (host_preference=?1 or host_preference is null) order by created_time desc;"; - -pub const JOBS_NEEDING_HOST_RUN: &'static str = "\ -    select jobs.id, jobs.source, jobs.created_time, jobs.remote_id, jobs.commit_id, jobs.run_preferences from jobs \ -    where jobs.run_preferences=\"all\" and jobs.created_time > ?1 \ -    and not exists \ -        (select 1 from runs r2 where r2.job_id = jobs.id and r2.host_id = ?2);"; - -pub const ACTIVE_RUNS: &'static str = "\ -    select id, -        job_id, -        artifacts_path, -        state, -        host_id, -        build_token, -        created_time, -        started_time, -        complete_time, -        run_timeout, -        build_result, -        final_status from runs where state=1 or state=0;"; - -pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\ -    select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;"; - -pub const JOB_BY_COMMIT_ID: &'static str = "\ -    select id, source, created_time, remote_id, commit_id, run_preferences from jobs where commit_id=?1;"; - -pub const ARTIFACT_BY_ID: &'static str = "\ -    select * from artifacts where id=?1 and run_id=?2;"; - -pub const JOB_BY_ID: &'static str = "\ -    select id, source, created_time, remote_id, commit_id, run_preferences from jobs where id=?1"; - -pub const METRICS_FOR_RUN: &'static str = "\ -    select * from metrics where run_id=?1 order by id asc;"; - -pub const METRICS_FOR_JOB: &'static str = "\ -    select metrics.id, metrics.run_id, metrics.name, metrics.value from metrics \ -    join runs on runs.id=metrics.run_id \ -    where runs.job_id=?1 \ -    order by metrics.run_id desc, metrics.id desc;"; - -pub const COMMIT_TO_ID: &'static str = "\ -    select id from commits where sha=?1;"; - -pub const REMOTES_FOR_REPO: &'static str = "\ -    select * from remotes where repo_id=?1;"; - -pub const ALL_REPOS: &'static str = "\ -    select id, repo_name, default_run_preference from repos;"; - -pub const LAST_JOBS_FROM_REMOTE: &'static str = "\ -    select id, source, created_time, remote_id, commit_id, run_preferences from jobs where remote_id=?1 order by created_time desc limit ?2;"; - -pub const LAST_RUN_FOR_JOB: &'static str = "\ -    select id, -        job_id, -        artifacts_path, -        state, -        host_id, -        build_token, -        created_time, -        started_time, -        complete_time, -        run_timeout, -        build_result, -        final_status from runs where job_id=?1 order by started_time desc limit 1;"; - -pub const RUNS_FOR_JOB: &'static str = "\ -    select id, -        job_id, -        artifacts_path, -        state, -        host_id, -        build_token, -        created_time, -        started_time, -        complete_time, -        run_timeout, -        build_result, -        final_status from runs where job_id=?1 group by host_id order by started_time desc, state asc;"; - -pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\ -    select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id, jobs.run_preferences -    from jobs join runs on jobs.id=runs.job_id -    oder by runs.created_time asc;"; | 
