diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ci_ctl.rs | 235 | ||||
-rw-r--r-- | src/ci_driver.rs | 630 | ||||
-rw-r--r-- | src/ci_runner.rs | 668 | ||||
-rw-r--r-- | src/dbctx.rs | 772 | ||||
-rw-r--r-- | src/io.rs | 181 | ||||
-rw-r--r-- | src/lua/mod.rs | 411 | ||||
-rw-r--r-- | src/main.rs | 1092 | ||||
-rw-r--r-- | src/notifier.rs | 181 | ||||
-rw-r--r-- | src/protocol.rs | 114 | ||||
-rw-r--r-- | src/sql.rs | 229 |
10 files changed, 0 insertions, 4513 deletions
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs deleted file mode 100644 index f0ffa62..0000000 --- a/src/ci_ctl.rs +++ /dev/null @@ -1,235 +0,0 @@ -use clap::{Parser, Subcommand}; - -mod sql; -mod dbctx; -mod notifier; -mod io; -mod protocol; - -use dbctx::DbCtx; -use notifier::NotifierConfig; - -#[derive(Parser)] -#[command(version, about, long_about = None)] -struct Args { - /// path to a database to manage (defaults to "./state.db") - db_path: Option<String>, - - /// path to where configs should be found (defaults to "./config") - config_path: Option<String>, - - #[command(subcommand)] - command: Command, -} - -#[derive(Subcommand)] -enum Command { - /// add _something_ to the db - Add { - #[command(subcommand)] - what: AddItem, - }, - - /// make sure that the state looks reasonable. - /// - /// currently, ensure that all notifiers have a config, that config references an existing - /// file, and that the referenced file is valid. - Validate, - - /// do something with jobs - Job { - #[command(subcommand)] - what: JobAction, - }, -} - -#[derive(Subcommand)] -enum JobAction { - List, - Rerun { - which: u32 - }, - RerunCommit { - commit: String - }, - Create { - repo: String, - commit: String, - pusher_email: String, - } -} - -#[derive(Subcommand)] -enum AddItem { - Repo { - name: String, - remote: Option<String>, - remote_kind: Option<String>, - config: Option<String>, - }, - Remote { - repo_name: String, - remote: String, - remote_kind: String, - config: String, - }, -} - -fn main() { - let args = Args::parse(); - - let db_path = args.db_path.unwrap_or_else(|| "./state.db".to_owned()); - let config_path = args.config_path.unwrap_or_else(|| "./config".to_owned()); - - match args.command { - Command::Job { what } => { - match what { - JobAction::List => { - let db = DbCtx::new(&config_path, &db_path); - let mut conn = db.conn.lock().unwrap(); - let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); - let mut jobs = query.query([]).unwrap(); - while let Some(row) = jobs.next().unwrap() { - let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option<String>) = row.try_into().unwrap(); - - eprint!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id); - if let Some(run_preferences) = run_preferences { - eprintln!(" | run preference: {}", run_preferences); - } else { - eprintln!(""); - } - } - eprintln!("jobs"); - }, - JobAction::Rerun { which } => { - let db = DbCtx::new(&config_path, &db_path); - let task_id = db.new_run(which as u64, None).expect("db can be queried").id; - eprintln!("[+] rerunning job {} as task {}", which, task_id); - } - JobAction::RerunCommit { commit } => { - let db = DbCtx::new(&config_path, &db_path); - let job_id = db.job_for_commit(&commit).unwrap(); - if let Some(job_id) = job_id { - let task_id = db.new_run(job_id, None).expect("db can be queried").id; - eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id); - } else { - eprintln!("[-] no job for commit {}", commit); - } - } - JobAction::Create { repo, commit, pusher_email } => { - let db = DbCtx::new(&config_path, &db_path); - let parts = repo.split(":").collect::<Vec<&str>>(); - let (remote_kind, repo_path) = (parts[0], parts[1]); - let remote = match db.remote_by_path_and_api(&remote_kind, &repo_path).expect("can query") { - Some(remote) => remote, - None => { - eprintln!("[-] no remote registered as {}:{}", remote_kind, repo_path); - return; - } - }; - - let repo_default_run_pref: Option<String> = db.conn.lock().unwrap() - .query_row("select default_run_preference from repos where id=?1;", [remote.repo_id], |row| { - Ok((row.get(0)).unwrap()) - }) - .expect("can query"); - - let job_id = db.new_job(remote.id, &commit, Some(&pusher_email), repo_default_run_pref).expect("can create"); - let _ = db.new_run(job_id, None).unwrap(); - } - } - }, - Command::Add { what } => { - match what { - AddItem::Repo { name, remote, remote_kind, config } => { - let remote_config = match (remote, remote_kind, config) { - (Some(remote), Some(remote_kind), Some(config_path)) => { - // do something - if remote_kind != "github" { - eprintln!("unknown remote kind: {}", remote); - return; - } - Some((remote, remote_kind, config_path)) - }, - (None, None, None) => { - None - }, - _ => { - eprintln!("when specifying a remote, `remote`, `remote_kind`, and `config_path` must either all be provided together or not at all"); - return; - } - }; - - let db = DbCtx::new(&config_path, &db_path); - let repo_id = match db.new_repo(&name) { - Ok(repo_id) => repo_id, - Err(e) => { - if e.contains("UNIQUE constraint failed") { - eprintln!("[!] repo '{}' already exists", name); - return; - } else { - eprintln!("[!] failed to create repo entry: {}", e); - return; - } - } - }; - println!("[+] new repo created: '{}' id {}", &name, repo_id); - if let Some((remote, remote_kind, config_path)) = remote_config { - let full_config_file_path = format!("{}/{}", &db.config_path.display(), config_path); - let config = match remote_kind.as_ref() { - "github" => { - assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok()); - } - "github-email" => { - assert!(NotifierConfig::email_from_file(&full_config_file_path).is_ok()); - } - other => { - panic!("[-] notifiers for '{}' remotes are not supported", other); - } - }; - db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config_path.as_str()).unwrap(); - println!("[+] new remote created: repo '{}', {} remote at {}", &name, remote_kind, remote); - match remote_kind.as_str() { - "github" => { - println!("[!] now go make sure your github repo has a webhook set for `https://ci.butactuallyin.space/{}` to receive at least the `push` event.", remote.as_str()); - println!(" the secret sent with calls to this webhook should be the same preshared secret as the CI server is configured to know."); - } - _ => { } - } - } - }, - AddItem::Remote { repo_name, remote, remote_kind, config } => { - let db = DbCtx::new(&config_path, &db_path); - let repo_id = match db.repo_id_by_name(&repo_name) { - Ok(Some(id)) => id, - Ok(None) => { - eprintln!("[-] repo '{}' does not exist", repo_name); - return; - }, - Err(e) => { - eprintln!("[!] couldn't look up repo '{}': {:?}", repo_name, e); - return; - } - }; - let config_file = format!("{}/{}", config_path, config); - match remote_kind.as_ref() { - "github" => { - NotifierConfig::github_from_file(&config_file).unwrap(); - } - "github-email" => { - NotifierConfig::email_from_file(&config_file).unwrap(); - } - other => { - panic!("notifiers for '{}' remotes are not supported", other); - } - }; - db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config.as_str()).unwrap(); - println!("[+] new remote created: repo '{}', {} remote at {}", &repo_name, remote_kind, remote); - }, - } - }, - Command::Validate => { - println!("ok"); - } - } -} diff --git a/src/ci_driver.rs b/src/ci_driver.rs deleted file mode 100644 index f6c1828..0000000 --- a/src/ci_driver.rs +++ /dev/null @@ -1,630 +0,0 @@ -use std::process::Command; -use std::collections::HashMap; -use std::sync::{Mutex, RwLock}; -use lazy_static::lazy_static; -use std::io::Read; -use serde_derive::{Deserialize, Serialize}; -use futures_util::StreamExt; -use std::fmt; -use std::path::{Path, PathBuf}; -use tokio::spawn; -use tokio_stream::wrappers::ReceiverStream; -use std::sync::{Arc, Weak}; -use std::time::{SystemTime, UNIX_EPOCH}; -use axum_server::tls_rustls::RustlsConfig; -use axum::body::StreamBody; -use axum::http::{StatusCode}; -use hyper::HeaderMap; -use axum::Router; -use axum::routing::*; -use axum::extract::State; -use axum::extract::BodyStream; -use axum::response::IntoResponse; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; -use serde_json::json; - -mod dbctx; -mod sql; -mod notifier; -mod io; -mod protocol; - -use crate::dbctx::{DbCtx, PendingRun, Job, Run}; -use crate::sql::JobResult; -use crate::sql::RunState; -use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; - -lazy_static! { - static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); - static ref ACTIVE_TASKS: Mutex<HashMap<u64, Weak<()>>> = Mutex::new(HashMap::new()); -} - -fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> { - let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into(); - path.push(run.to_string()); - match std::fs::create_dir(&path) { - Ok(()) => { - Ok(path) - }, - Err(e) => { - if e.kind() == std::io::ErrorKind::AlreadyExists { - Ok(path) - } else { - Err(e) - } - } - } -} - -async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> { - eprintln!("activating task {:?}", run); - - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis(); - - let remote = dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("job has remote"); - let repo = dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("remote has repo"); - - let commit_sha = dbctx.commit_sha(job.commit_id).expect("query succeeds"); - - let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts"); - - eprintln!("running {}", &repo.name); - - let res = candidate.submit(&dbctx, &run, &remote.remote_git_url, &commit_sha).await; - - let mut client_job = match res { - Ok(Some(mut client_job)) => { client_job } - Ok(None) => { - return Err("client hung up instead of acking task".to_string()); - } - Err(e) => { - // failed to submit job, move on for now - return Err(format!("failed to submit task: {:?}", e)); - } - }; - - let host_id = client_job.client.host_id; - - let connection = dbctx.conn.lock().unwrap(); - connection.execute( - "update runs set started_time=?1, host_id=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5", - (now as u64, host_id, format!("{}", artifacts.display()), &client_job.client.build_token, run.id) - ) - .expect("can update"); - std::mem::drop(connection); - - spawn(async move { - client_job.run().await - }); - - Ok(()) -} - -struct RunnerClient { - tx: mpsc::Sender<Result<String, String>>, - rx: BodyStream, - host_id: u32, - build_token: String, - accepted_sources: Option<Vec<String>>, -} - -fn token_for_job() -> String { - let mut data = [0u8; 32]; - std::fs::File::open("/dev/urandom") - .unwrap() - .read_exact(&mut data) - .unwrap(); - - base64::encode(data) -} - -struct ClientJob { - dbctx: Arc<DbCtx>, - remote_git_url: String, - sha: String, - task: PendingRun, - client: RunnerClient, - // exists only as confirmation this `ClientJob` is somewhere, still alive and being processed. - task_witness: Arc<()>, -} - -impl ClientJob { - pub async fn run(&mut self) { - loop { - eprintln!("waiting on response.."); - let msg = match self.client.recv_typed::<ClientProto>().await.expect("recv works") { - Some(msg) => msg, - None => { - eprintln!("client hung up. task's done, i hope?"); - return; - } - }; - eprintln!("got {:?}", msg); - match msg { - ClientProto::NewTaskPlease { allowed_pushers, host_info } => { - eprintln!("misdirected task request (after handshake?)"); - return; - } - ClientProto::TaskStatus(task_info) => { - let (result, state): (Result<String, String>, RunState) = match task_info { - TaskInfo::Finished { status } => { - eprintln!("task update: state is finished and result is {}", status); - match status.as_str() { - "pass" => { - (Ok("success".to_string()), RunState::Finished) - }, - other => { - eprintln!("unhandled task completion status: {}", other); - (Err(other.to_string()), RunState::Error) - } - } - }, - TaskInfo::Interrupted { status, description } => { - eprintln!("task update: state is interrupted and result is {}", status); - let desc = description.unwrap_or_else(|| status.clone()); - (Err(desc), RunState::Error) - } - }; - - let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); - let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists"); - - for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { - if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await { - eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e); - } - } - - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis(); - - let build_result = if result.is_ok() { - JobResult::Pass - } else { - JobResult::Fail - }; - let result_desc = match result { - Ok(msg) => msg, - Err(msg) => msg, - }; - - self.dbctx.conn.lock().unwrap().execute( - "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", - (now as u64, state as u64, build_result as u8, result_desc, self.task.id) - ) - .expect("can update"); - } - ClientProto::ArtifactCreate => { - eprintln!("creating artifact"); - self.client.send(serde_json::json!({ - "status": "ok", - "object_id": "10", - })).await.unwrap(); - }, - ClientProto::Metric { name, value } => { - self.dbctx.insert_metric(self.task.id, &name, &value) - .expect("TODO handle metric insert error?"); - } - ClientProto::Command(_command_info) => { - // record information about commands, start/stop, etc. probably also allow - // artifacts to be attached to commands and default to attaching stdout/stderr? - } - other => { - eprintln!("unhandled message {:?}", other); - } - } - } - } -} - -impl RunnerClient { - async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream, accepted_sources: Option<Vec<String>>, host_id: u32) -> Result<Self, String> { - let token = token_for_job(); - let client = RunnerClient { - tx: sender, - rx: resp, - host_id, - build_token: token, - accepted_sources, - }; - Ok(client) - } - - async fn test_connection(&mut self) -> Result<(), String> { - self.send_typed(&ClientProto::Ping).await?; - let resp = self.recv_typed::<ClientProto>().await?; - match resp { - Some(ClientProto::Pong) => { - Ok(()) - } - Some(other) => { - Err(format!("unexpected connection test response: {:?}", other)) - } - None => { - Err("client hung up".to_string()) - } - } - } - - async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { - self.send_typed(&msg).await - } - - async fn send_typed<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), String> { - self.tx.send(Ok(serde_json::to_string(msg).unwrap())) - .await - .map_err(|e| e.to_string()) - } - - async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { - match self.rx.next().await { - Some(Ok(bytes)) => { - serde_json::from_slice(&bytes) - .map(Option::Some) - .map_err(|e| e.to_string()) - }, - Some(Err(e)) => { - eprintln!("e: {:?}", e); - Err(format!("no client job: {:?}", e)) - }, - None => { - Ok(None) - } - } - } - - async fn recv_typed<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, String> { - let json = self.recv().await?; - Ok(json.map(|v| serde_json::from_value(v).unwrap())) - } - - // is this client willing to run the job based on what it has told us so far? - fn will_accept(&self, job: &Job) -> bool { - match (job.source.as_ref(), self.accepted_sources.as_ref()) { - (_, None) => true, - (None, Some(_)) => false, - (Some(source), Some(accepted_sources)) => { - accepted_sources.contains(source) - } - } - } - - async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> { - self.send_typed(&ClientProto::new_task(RequestedJob { - commit: sha.to_string(), - remote_url: remote_git_url.to_string(), - build_token: self.build_token.to_string(), - })).await?; - match self.recv_typed::<ClientProto>().await { - Ok(Some(ClientProto::Started)) => { - let task_witness = Arc::new(()); - ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness)); - Ok(Some(ClientJob { - task: job.clone(), - dbctx: Arc::clone(dbctx), - sha: sha.to_string(), - remote_git_url: remote_git_url.to_string(), - client: self, - task_witness, - })) - } - Ok(Some(resp)) => { - eprintln!("invalid response: {:?}", resp); - Err("client rejected job".to_string()) - } - Ok(None) => { - Ok(None) - } - Err(e) => { - eprintln!("e: {:?}", e); - Err(e) - } - } - } -} - -impl fmt::Debug for RunnerClient { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("RunnerClient { .. }") - } -} - -#[axum_macros::debug_handler] -async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { - eprintln!("artifact request"); - let run_token = match headers.get("x-task-token") { - Some(run_token) => run_token.to_str().expect("valid string"), - None => { - eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() { - Some(result) => result, - None => { - eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - if token_validity != dbctx::TokenValidity::Valid { - eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - - let artifact_path = if let Some(artifact_path) = artifact_path { - artifact_path - } else { - eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - }; - - let artifact_name = match headers.get("x-artifact-name") { - Some(artifact_name) => artifact_name.to_str().expect("valid string"), - None => { - eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let artifact_desc = match headers.get("x-artifact-desc") { - Some(artifact_desc) => artifact_desc.to_str().expect("valid string"), - None => { - eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await { - Ok(artifact) => artifact, - Err(err) => { - eprintln!("failure to reserve artifact: {:?}", err); - return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response(); - } - }; - - eprintln!("spawning task..."); - let dbctx_ref = Arc::clone(&ctx.0); - spawn(async move { - artifact.store_all(artifact_content).await.unwrap(); - dbctx_ref.finalize_artifact(artifact.artifact_id).await.unwrap(); - }); - eprintln!("done?"); - - (StatusCode::OK, "").into_response() -} - -#[derive(Serialize, Deserialize)] -struct WorkRequest { - kind: String, - accepted_pushers: Option<Vec<String>> -} - -async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse { - let auth_token = match headers.get("authorization") { - Some(token) => { - if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) { - eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - } - None => { - eprintln!("bad artifact post: headers: {:?}\nno authorization", headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let (tx_sender, tx_receiver) = mpsc::channel(8); - let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); - tx_sender.send(Ok("hello".to_string())).await.expect("works"); - - let request = job_resp.next().await.expect("request chunk").expect("chunk exists"); - let request = std::str::from_utf8(&request).unwrap(); - let request: ClientProto = match serde_json::from_str(&request) { - Ok(v) => v, - Err(e) => { - eprintln!("couldn't parse work request: {:?}", e); - return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); - } - }; - let (accepted_pushers, host_info) = match request { - ClientProto::NewTaskPlease { allowed_pushers, host_info } => (allowed_pushers, host_info), - other => { - eprintln!("bad request kind: {:?}", &other); - return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); - } - }; - - eprintln!("client identifies itself as {:?}", host_info); - - let host_info_id = ctx.0.id_for_host(&host_info).expect("can get a host info id"); - - let client = match RunnerClient::new(tx_sender, job_resp, accepted_pushers, host_info_id).await { - Ok(v) => v, - Err(e) => { - eprintln!("unable to register client"); - return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); - } - }; - - match ctx.1.try_send(client) { - Ok(()) => { - eprintln!("client requested work..."); - return (StatusCode::OK, resp_body).into_response(); - } - Err(TrySendError::Full(client)) => { - return (StatusCode::IM_A_TEAPOT, resp_body).into_response(); - } - Err(TrySendError::Closed(client)) => { - panic!("client holder is gone?"); - } - } -} - -async fn make_api_server(dbctx: Arc<DbCtx>) -> (Router, mpsc::Receiver<RunnerClient>) { - let (pending_client_sender, pending_client_receiver) = mpsc::channel(8); - - let router = Router::new() - .route("/api/next_job", post(handle_next_job)) - .route("/api/artifact", post(handle_artifact)) - .with_state((dbctx, pending_client_sender)); - (router, pending_client_receiver) -} - -#[derive(Deserialize, Serialize)] -struct DriverConfig { - cert_path: PathBuf, - key_path: PathBuf, - config_path: PathBuf, - db_path: PathBuf, - server_addr: String, - auth_secret: String, -} - -#[tokio::main] -async fn main() { - tracing_subscriber::fmt::init(); - - let mut args = std::env::args(); - args.next().expect("first arg exists"); - let config_path = args.next().unwrap_or("./driver_config.json".to_string()); - let driver_config: DriverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for DriverConfig"); - let mut auth_secret = AUTH_SECRET.write().unwrap(); - *auth_secret = Some(driver_config.auth_secret.clone()); - std::mem::drop(auth_secret); - - let config = RustlsConfig::from_pem_file( - driver_config.cert_path.clone(), - driver_config.key_path.clone(), - ).await.unwrap(); - - let dbctx = Arc::new(DbCtx::new(&driver_config.config_path, &driver_config.db_path)); - - dbctx.create_tables().unwrap(); - - let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await; - spawn(axum_server::bind_rustls(driver_config.server_addr.parse().unwrap(), config) - .serve(api_server.into_make_service())); - - spawn(old_task_reaper(Arc::clone(&dbctx))); - - loop { - let mut candidate = match channel.recv().await - .ok_or_else(|| "client channel disconnected".to_string()) { - - Ok(candidate) => { candidate }, - Err(e) => { eprintln!("client error: {}", e); continue; } - }; - - let dbctx = Arc::clone(&dbctx); - spawn(async move { - let host_id = candidate.host_id; - let res = find_client_task(dbctx, candidate).await; - eprintln!("task client for {}: {:?}", host_id, res); - }); - } -} - -async fn find_client_task(dbctx: Arc<DbCtx>, mut candidate: RunnerClient) -> Result<(), String> { - let find_client_task_start = std::time::Instant::now(); - - let (run, job) = 'find_work: loop { - // try to find a job for this candidate: - // * start with pending runs - these need *some* client to run them, but do not care which - // * if no new jobs, maybe an existing job still needs a rerun on this client? - // * otherwise, um, i dunno. do nothing? - - let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap(); - - if runs.len() > 0 { - println!("{} new runs", runs.len()); - - for run in runs.into_iter() { - let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); - - if candidate.will_accept(&job) { - break 'find_work (run, job); - } - - } - } - - let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query"); - - for job in alt_run_jobs.into_iter() { - if candidate.will_accept(&job) { - let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap(); - break 'find_work (run, job); - } - } - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - if candidate.test_connection().await.is_err() { - return Err("lost client connection".to_string()); - } - - if find_client_task_start.elapsed().as_secs() > 300 { - return Err("5min new task deadline elapsed".to_string()); - } - }; - - eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id); - activate_run(Arc::clone(&dbctx), candidate, &job, &run).await?; - - Ok(()) -} - -async fn old_task_reaper(dbctx: Arc<DbCtx>) { - let mut potentially_stale_tasks = dbctx.get_active_runs().unwrap(); - - let active_tasks = ACTIVE_TASKS.lock().unwrap(); - - for (id, witness) in active_tasks.iter() { - if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) { - potentially_stale_tasks.swap_remove(idx); - } - } - - std::mem::drop(active_tasks); - - // ok, so we have tasks that are not active, now if the task is started we know someone should - // be running it and they are not. retain only those tasks, as they are ones we may want to - // mark dead. - // - // further, filter out any tasks created in the last 60 seconds. this is a VERY generous grace - // period for clients that have accepted a job but for some reason we have not recorded them as - // active (perhaps they are slow to ack somehow). - - let stale_threshold = crate::io::now_ms() - 60_000; - - let stale_tasks: Vec<Run> = potentially_stale_tasks.into_iter().filter(|task| { - match (task.state, task.start_time) { - // `run` is atomically set to `Started` and adorned with a `start_time`. disagreement - // between the two means this run is corrupt and should be reaped. - (RunState::Started, None) => { - true - }, - (RunState::Started, Some(start_time)) => { - start_time < stale_threshold - } - // and if it's not `started`, it's either pending (not assigned yet, so not stale), or - // one of the complete statuses. - _ => { - false - } - } - }).collect(); - - for task in stale_tasks.iter() { - eprintln!("looks like task {} is stale, reaping", task.id); - dbctx.reap_task(task.id).expect("works"); - } -} diff --git a/src/ci_runner.rs b/src/ci_runner.rs deleted file mode 100644 index 9aaf33d..0000000 --- a/src/ci_runner.rs +++ /dev/null @@ -1,668 +0,0 @@ -use std::time::Duration; -use std::os::unix::process::ExitStatusExt; -use rlua::prelude::LuaError; -use std::sync::{Arc, Mutex}; -use reqwest::{StatusCode, Response}; -use tokio::process::Command; -use std::process::Stdio; -use std::process::ExitStatus; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use serde_json::json; -use serde::{Deserialize, de::DeserializeOwned, Serialize}; -use std::task::{Context, Poll}; -use std::pin::Pin; -use std::marker::Unpin; - -mod protocol; -mod lua; -mod io; - -use crate::io::{ArtifactStream, VecSink}; -use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; -use crate::lua::CommandOutput; - -#[derive(Debug)] -enum WorkAcquireError { - Reqwest(reqwest::Error), - EarlyEof, - Protocol(String), -} - -struct RunnerClient { - http: reqwest::Client, - host: String, - tx: hyper::body::Sender, - rx: Response, - current_job: Option<RequestedJob>, -} - -impl RequestedJob { - pub fn into_running(self, client: RunnerClient) -> RunningJob { - RunningJob { - job: self, - client, - current_step: StepTracker::new(), - } - } -} - -struct JobEnv { - lua: lua::BuildEnv, - job: Arc<Mutex<RunningJob>>, -} - -impl JobEnv { - fn new(job: &Arc<Mutex<RunningJob>>) -> Self { - let lua = lua::BuildEnv::new(job); - JobEnv { - lua, - job: Arc::clone(job) - } - } - - async fn default_goodfile(self) -> Result<(), LuaError> { - self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await - } - - async fn exec_goodfile(self) -> Result<(), LuaError> { - let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap(); - self.lua.run_build(script.as_bytes()).await - } -} - -pub struct RunningJob { - job: RequestedJob, - client: RunnerClient, - current_step: StepTracker, -} - -enum RepoError { - CloneFailedIdk { exit_code: ExitStatus }, - CheckoutFailedIdk { exit_code: ExitStatus }, - CheckoutFailedMissingRef, -} - -pub struct StepTracker { - scopes: Vec<String> -} - -impl StepTracker { - pub fn new() -> Self { - StepTracker { - scopes: Vec::new() - } - } - - pub fn push(&mut self, name: String) { - self.scopes.push(name); - } - - pub fn pop(&mut self) { - self.scopes.pop(); - } - - pub fn clear(&mut self) { - self.scopes.clear(); - } - - pub fn full_step_path(&self) -> &[String] { - self.scopes.as_slice() - } -} - -impl RunningJob { - async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { - self.client.send_typed(&ClientProto::metric(name, value)) - .await - .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) - } - - // TODO: panics if hyper finds the channel is closed. hum - async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> { - let (mut sender, body) = hyper::Body::channel(); - let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") - .header("user-agent", "ci-butactuallyin-space-runner") - .header("x-task-token", &self.job.build_token) - .header("x-artifact-name", name) - .header("x-artifact-desc", desc) - .body(body) - .send() - .await - .map_err(|e| format!("unable to send request: {:?}", e))?; - - if resp.status() == StatusCode::OK { - eprintln!("[+] artifact '{}' started", name); - Ok(ArtifactStream::new(sender)) - } else { - Err(format!("[-] unable to create artifact: {:?}", resp)) - } - } - - async fn clone_remote(&self) -> Result<(), RepoError> { - let mut git_clone = Command::new("git"); - git_clone - .arg("clone") - .arg(&self.job.remote_url) - .arg("tmpdir"); - - let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await - .map_err(|e| { - eprintln!("stringy error (exec failed?) for clone: {}", e); - RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) } - })?; - - if !clone_res.success() { - return Err(RepoError::CloneFailedIdk { exit_code: clone_res }); - } - - let mut git_checkout = Command::new("git"); - git_checkout - .current_dir("tmpdir") - .arg("checkout") - .arg(&self.job.commit); - - let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await - .map_err(|e| { - eprintln!("stringy error (exec failed?) for checkout: {}", e); - RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) } - })?; - - if !checkout_res.success() { - if checkout_res.code() == Some(128) { - return Err(RepoError::CheckoutFailedIdk { exit_code: checkout_res }); - } else { - return Err(RepoError::CheckoutFailedMissingRef); - } - } - - Ok(()) - } - - async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { - let stdout_artifact = self.create_artifact( - &format!("{} (stdout)", name), - &format!("{} (stdout)", desc) - ).await.expect("works"); - let stderr_artifact = self.create_artifact( - &format!("{} (stderr)", name), - &format!("{} (stderr)", desc) - ).await.expect("works"); - - let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?; - - Ok(exit_status) - } - - async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> { - let stdout_collector = VecSink::new(); - let stderr_collector = VecSink::new(); - - let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?; - - Ok(CommandOutput { - exit_status, - stdout: stdout_collector.take_buf(), - stderr: stderr_collector.take_buf(), - }) - } - - async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> { - eprintln!("[.] running {}", name); - - let mut child = command - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; - - let mut child_stdout = child.stdout.take().unwrap(); - let mut child_stderr = child.stderr.take().unwrap(); - - eprintln!("[.] '{}': forwarding stdout", name); - tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_reporter).await }); - eprintln!("[.] '{}': forwarding stderr", name); - tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_reporter).await }); - - let res = child.wait().await - .map_err(|e| format!("failed to wait? {:?}", e))?; - - if res.success() { - eprintln!("[+] '{}' success", name); - } else { - eprintln!("[-] '{}' fail: {:?}", name, res); - } - - Ok(res) - } - - async fn run(mut self) { - self.client.send_typed(&ClientProto::Started).await.unwrap(); - - std::fs::remove_dir_all("tmpdir").unwrap(); - std::fs::create_dir("tmpdir").unwrap(); - - let ctx = Arc::new(Mutex::new(self)); - - let checkout_res = ctx.lock().unwrap().clone_remote().await; - - if let Err(e) = checkout_res { - let status = "bad_ref"; - let status = ClientProto::task_status(TaskInfo::finished(status)); - eprintln!("checkout failed, reporting status: {:?}", status); - - let res = ctx.lock().unwrap().client.send_typed(&status).await; - if let Err(e) = res { - eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); - } - - return; - } - - let lua_env = JobEnv::new(&ctx); - - let metadata = std::fs::metadata("./tmpdir/goodfile"); - let res: Result<String, (String, String)> = match metadata { - Ok(_) => { - match lua_env.exec_goodfile().await { - Ok(()) => { - Ok("pass".to_string()) - }, - Err(lua_err) => { - Err(("failed".to_string(), lua_err.to_string())) - } - } - }, - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - match lua_env.default_goodfile().await { - Ok(()) => { - Ok("pass".to_string()) - }, - Err(lua_err) => { - Err(("failed".to_string(), lua_err.to_string())) - } - } - }, - Err(e) => { - eprintln!("[-] error finding goodfile: {:?}", e); - Err(("failed".to_string(), "inaccessible goodfile".to_string())) - } - }; - - match res { - Ok(status) => { - eprintln!("[+] job success!"); - let status = ClientProto::task_status(TaskInfo::finished(status)); - eprintln!("reporting status: {:?}", status); - - let res = ctx.lock().unwrap().client.send_typed(&status).await; - if let Err(e) = res { - eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); - } - } - Err((status, lua_err)) => { - eprintln!("[-] job error: {}", status); - - let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string())); - let res = ctx.lock().unwrap().client.send_typed(&status).await; - if let Err(e) = res { - eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e); - } - } - } - } - - fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) { - let mut cmd = Command::new(&command[0]); - let cwd = match working_dir { - Some(dir) => { - format!("tmpdir/{}", dir) - }, - None => { - "tmpdir".to_string() - } - }; - eprintln!("prepared {:?} to run in {}", &command, &cwd); - let human_name = command.join(" "); - cmd - .current_dir(cwd) - .args(&command[1..]); - (cmd, human_name) - } - - async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result<CommandOutput, String> { - let (cmd, human_name) = Self::prep_command(command, working_dir); - - let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?; - - if !cmd_res.exit_status.success() { - return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status)); - } - Ok(cmd_res) - } - - async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { - self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) - .await.unwrap(); - - let (cmd, human_name) = Self::prep_command(command, working_dir); - - let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?; - - self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) - .await.unwrap(); - - - if !cmd_res.success() { - return Err(format!("{} failed: {:?}", &human_name, cmd_res)); - } - - Ok(()) - } -} - -impl RunnerClient { - async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { - if res.status() != StatusCode::OK { - return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); - } - - let hello = res.chunk().await.expect("chunk"); - if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { - return Err(format!("bad hello: {:?}", hello)); - } - - Ok(Self { - http: reqwest::ClientBuilder::new() - .connect_timeout(Duration::from_millis(1000)) - .timeout(Duration::from_millis(600000)) - .build() - .expect("can build client"), - host: host.to_string(), - tx: sender, - rx: res, - current_job: None, - }) - } - - async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { - loop { - let message = self.recv_typed::<ClientProto>().await; - match message { - Ok(Some(ClientProto::NewTask(new_task))) => { - return Ok(Some(new_task)); - }, - Ok(Some(ClientProto::Ping)) => { - self.send_typed(&ClientProto::Pong).await - .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?; - }, - Ok(Some(other)) => { - return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other))); - }, - Ok(None) => { - return Ok(None); - }, - Err(e) => { - return Err(WorkAcquireError::Protocol(e)); - } - } - } - } - - async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { - self.recv_typed().await - } - - async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> { - match self.rx.chunk().await { - Ok(Some(chunk)) => { - serde_json::from_slice(&chunk) - .map(Option::Some) - .map_err(|e| { - format!("not json: {:?}", e) - }) - }, - Ok(None) => Ok(None), - Err(e) => { - Err(format!("error in recv: {:?}", e)) - } - } - } - - async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { - self.send_typed(&value).await - } - - async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> { - self.tx.send_data( - serde_json::to_vec(t) - .map_err(|e| format!("json error: {:?}", e))? - .into() - ).await - .map_err(|e| format!("send error: {:?}", e)) - } -} - -#[derive(Deserialize, Serialize)] -struct RunnerConfig { - server_address: String, - auth_secret: String, - allowed_pushers: Option<Vec<String>>, -} - -#[tokio::main] -async fn main() { - tracing_subscriber::fmt::init(); - let mut args = std::env::args(); - args.next().expect("first arg exists"); - let config_path = args.next().unwrap_or("./runner_config.json".to_string()); - let runner_config: RunnerConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for RunnerConfig"); - let client = reqwest::ClientBuilder::new() - .connect_timeout(Duration::from_millis(1000)) - .timeout(Duration::from_millis(600000)) - .build() - .expect("can build client"); - - let host_info = host_info::collect_host_info(); - eprintln!("host info: {:?}", host_info); - - loop { - let (mut sender, body) = hyper::Body::channel(); - - sender.send_data(serde_json::to_string(&ClientProto::new_task_please( - runner_config.allowed_pushers.clone(), - host_info.clone(), - )).unwrap().into()).await.expect("req"); - - let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") - .header("user-agent", "ci-butactuallyin-space-runner") - .header("authorization", runner_config.auth_secret.trim()) - .body(body) - .send() - .await; - - match poll { - Ok(mut res) => { - let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { - Ok(client) => client, - Err(e) => { - eprintln!("failed to initialize client: {:?}", e); - std::thread::sleep(Duration::from_millis(10000)); - continue; - } - }; - let job = match client.wait_for_work(runner_config.allowed_pushers.as_ref().map(|x| x.as_ref())).await { - Ok(Some(request)) => request, - Ok(None) => { - eprintln!("no work to do (yet)"); - std::thread::sleep(Duration::from_millis(2000)); - continue; - } - Err(e) => { - eprintln!("failed to get work: {:?}", e); - std::thread::sleep(Duration::from_millis(10000)); - continue; - } - }; - eprintln!("requested work: {:?}", job); - - eprintln!("doing {:?}", job); - - let mut job = job.into_running(client); - job.run().await; - std::thread::sleep(Duration::from_millis(10000)); - }, - Err(e) => { - let message = format!("{}", e); - - if message.contains("tcp connect error") { - eprintln!("could not reach server. sleeping a bit and retrying."); - std::thread::sleep(Duration::from_millis(5000)); - continue; - } - - eprintln!("unhandled error: {}", message); - - std::thread::sleep(Duration::from_millis(1000)); - } - } - } -} - -mod host_info { - use crate::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; - - // get host model name, microcode, and how many cores - fn collect_cpu_info() -> CpuInfo { - fn find_line(lines: &[String], prefix: &str) -> String { - lines.iter() - .find(|line| line.starts_with(prefix)) - .expect(&format!("{} line is present", prefix)) - .split(":") - .last() - .unwrap() - .trim() - .to_string() - } - - /// try finding core `cpu`'s max frequency in khz. we'll assume this is the actual speed a - /// build would run at.. fingers crossed. - fn try_finding_cpu_freq(cpu: u32) -> Result<u64, String> { - if let Ok(freq_str) = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq") { - Ok(freq_str.trim().parse().unwrap()) - } else { - // so cpufreq probably isn't around, maybe /proc/cpuinfo's mhz figure is present? - let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); - let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect(); - match cpu_mhzes.get(cpu as usize) { - Some(mhz) => { - let mut line_parts = cpu_mhzes[cpu as usize].split(":"); - let _ = line_parts.next(); - let mhz = line_parts.next().unwrap().trim(); - let mhz: f64 = mhz.parse().unwrap(); - Ok((mhz * 1000.0) as u64) - }, - None => { - panic!("could not get cpu freq either from cpufreq or /proc/cpuinfo?"); - } - } - } - } - - // we'll have to deploy one of a few techniques, because x86/x86_64 is internally - // consistent, but aarch64 is different. who knows what other CPUs think. - match std::env::consts::ARCH { - "x86" | "x86_64" => { - let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); - let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect(); - let cores = model_names.len() as u32; - let model_name = find_line(&cpu_lines, "model name"); - let vendor_id = find_line(&cpu_lines, "vendor_id"); - let family = find_line(&cpu_lines, "cpu family"); - let model = find_line(&cpu_lines, "model\t"); - let microcode = find_line(&cpu_lines, "microcode"); - let max_freq = try_finding_cpu_freq(0).unwrap(); - - CpuInfo { model_name, microcode, cores, vendor_id, family, model, max_freq } - } - "aarch64" => { - let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect(); - let processors: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("processor")).collect(); - let cores = processors.len() as u32; - - // alternate possible path: /sys/firmware/devicetree/base/compatible - let model_name = std::fs::read_to_string("/proc/device-tree/compatible").unwrap(); - let model_name = model_name.replace("\x00", ";"); - let vendor_id = find_line(&cpu_lines, "CPU implementer"); - let vendor_name = match vendor_id.as_str() { - "0x41" => "Arm Limited".to_string(), - "0x42" => "Broadcom Corporation".to_string(), - "0x43" => "Cavium Inc".to_string(), - "0x44" => "Digital Equipment Corporation".to_string(), - "0x46" => "Fujitsu Ltd".to_string(), - "0x49" => "Infineon Technologies AG".to_string(), - "0x4d" => "Motorola".to_string(), - "0x4e" => "NVIDIA Corporation".to_string(), - "0x50" => "Applied Micro Circuits Corporation".to_string(), - "0x51" => "Qualcomm Inc".to_string(), - "0x56" => "Marvell International Ltd".to_string(), - "0x69" => "Intel Corporation".to_string(), - "0xc0" => "Ampere Computing".to_string(), - other => format!("unknown aarch64 vendor {}", other), - }; - let family = find_line(&cpu_lines, "CPU architecture"); - let model = find_line(&cpu_lines, "CPU part"); - let microcode = String::new(); - let max_freq = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq").unwrap().trim().parse().unwrap(); - - CpuInfo { model_name, microcode, cores, vendor_id: vendor_name, family, model, max_freq } - } - other => { - panic!("dunno how to find cpu info for {}, panik", other); - } - } - } - - fn collect_mem_info() -> MemoryInfo { - let mem_lines: Vec<String> = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect(); - let total = mem_lines[0].split(":").last().unwrap().trim().to_string(); - let available = mem_lines[2].split(":").last().unwrap().trim().to_string(); - - MemoryInfo { total, available } - } - - fn hostname() -> String { - let mut bytes = [0u8; 4096]; - let res = unsafe { - libc::gethostname(bytes.as_mut_ptr() as *mut std::ffi::c_char, bytes.len()) - }; - if res != 0 { - panic!("gethostname failed {:?}", res); - } - let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated"); - std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string() - } - - pub fn collect_env_info() -> EnvInfo { - EnvInfo { - arch: std::env::consts::ARCH.to_string(), - family: std::env::consts::FAMILY.to_string(), - os: std::env::consts::OS.to_string(), - } - } - - pub fn collect_host_info() -> HostInfo { - let cpu_info = collect_cpu_info(); - let memory_info = collect_mem_info(); - let hostname = hostname(); - let env_info = collect_env_info(); - - HostInfo { - hostname, - cpu_info, - memory_info, - env_info, - } - } -} - diff --git a/src/dbctx.rs b/src/dbctx.rs deleted file mode 100644 index 7378b2e..0000000 --- a/src/dbctx.rs +++ /dev/null @@ -1,772 +0,0 @@ -use std::sync::Mutex; -use futures_util::StreamExt; -use rusqlite::{Connection, OptionalExtension}; -use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use std::path::Path; -use std::path::PathBuf; - -use crate::io::ArtifactDescriptor; -use crate::notifier::{RemoteNotifier, NotifierConfig}; -use crate::sql; - -const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30; - -pub struct DbCtx { - pub config_path: PathBuf, - // don't love this but.. for now... - pub conn: Mutex<Connection>, -} - -#[derive(Debug, Clone)] -pub struct Repo { - pub id: u64, - pub name: String, - pub default_run_preference: Option<String>, -} - -#[derive(Debug)] -pub struct Remote { - pub id: u64, - pub repo_id: u64, - pub remote_path: String, - pub remote_api: String, - pub remote_url: String, - pub remote_git_url: String, - pub notifier_config_path: String, -} - -// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1 -// relationship with commits, and potentially many *runs* of that job. -#[derive(Debug, Clone)] -pub struct Job { - pub id: u64, - pub remote_id: u64, - pub commit_id: u64, - pub created_time: u64, - pub source: Option<String>, - pub run_preferences: Option<String>, -} - -// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report -// results. a job may have many runs from many different hosts rebuliding history, or reruns of the -// same job on the same hardware to collect more datapoints on the operation. -#[derive(Debug, Clone)] -pub struct Run { - pub id: u64, - pub job_id: u64, - pub artifacts_path: Option<String>, - pub state: sql::RunState, - pub host_id: Option<u64>, - pub create_time: u64, - pub start_time: Option<u64>, - pub complete_time: Option<u64>, - pub build_token: Option<String>, - pub run_timeout: Option<u64>, - pub build_result: Option<u8>, - pub final_text: Option<String>, -} - -impl Run { - fn into_pending_run(self) -> PendingRun { - PendingRun { - id: self.id, - job_id: self.job_id, - create_time: self.create_time, - } - } -} - -#[derive(Debug, Clone)] -pub struct PendingRun { - pub id: u64, - pub job_id: u64, - pub create_time: u64, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum TokenValidity { - Expired, - Invalid, - Valid, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct MetricRecord { - pub id: u64, - pub run_id: u64, - pub name: String, - pub value: String -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ArtifactRecord { - pub id: u64, - pub run_id: u64, - pub name: String, - pub desc: String, - pub created_time: u64, - pub completed_time: Option<u64>, -} - -impl DbCtx { - pub fn new<P: AsRef<Path>>(config_path: P, db_path: P) -> Self { - DbCtx { - config_path: config_path.as_ref().to_owned(), - conn: Mutex::new(Connection::open(db_path).unwrap()) - } - } - - pub fn create_tables(&self) -> Result<(), ()> { - let conn = self.conn.lock().unwrap(); - conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_METRICS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap(); - conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap(); - conn.execute(sql::CREATE_RUNS_TABLE, ()).unwrap(); - conn.execute(sql::CREATE_HOSTS_TABLE, ()).unwrap(); - - Ok(()) - } - - pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value", - (run_id, name, value) - ) - .expect("can upsert"); - Ok(()) - } - - pub fn new_commit(&self, sha: &str) -> Result<u64, String> { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into commits (sha) values (?1)", - [sha.clone()] - ) - .expect("can insert"); - - Ok(conn.last_insert_rowid() as u64) - } - - pub fn new_repo(&self, name: &str) -> Result<u64, String> { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into repos (repo_name) values (?1)", - [name.clone()] - ) - .map_err(|e| { - format!("{:?}", e) - })?; - - Ok(conn.last_insert_rowid() as u64) - } - - pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> { - let conn = self.conn.lock().unwrap(); - conn - .execute( - "update artifacts set completed_time=?1 where id=?2", - (crate::io::now_ms(), artifact_id) - ) - .map(|_| ()) - .map_err(|e| { - format!("{:?}", e) - }) - } - - pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { - let artifact_id = { - let created_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis() as u64; - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", - (run_id, name, desc, created_time) - ) - .map_err(|e| { - format!("{:?}", e) - })?; - - conn.last_insert_rowid() as u64 - }; - - ArtifactDescriptor::new(run_id, artifact_id).await - } - - pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> { - let conn = self.conn.lock().unwrap(); - conn - .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| { - let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); - - Ok(ArtifactRecord { - id, run_id, name, desc, created_time, completed_time - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn commit_sha(&self, commit_id: u64) -> Result<String, String> { - self.conn.lock() - .unwrap() - .query_row( - "select sha from commits where id=?1", - [commit_id], - |row| { row.get(0) } - ) - .map_err(|e| e.to_string()) - } - - pub fn job_for_commit(&self, sha: &str) -> Result<Option<u64>, String> { - self.conn.lock() - .unwrap() - .query_row( - "select id from commits where sha=?1", - [sha], - |row| { row.get(0) } - ) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn run_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> { - self.conn.lock() - .unwrap() - .query_row( - "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1", - [token], - |row| { - let timeout: Option<u64> = row.get(3).unwrap(); - let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS); - - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis(); - - let time: Option<u64> = row.get(2).unwrap(); - let validity = if let Some(time) = time { - if now > time as u128 + timeout as u128 { - TokenValidity::Expired - } else { - TokenValidity::Valid - } - } else { - TokenValidity::Invalid - }; - Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity)) - } - ) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn job_by_id(&self, id: u64) -> Result<Option<Job>, String> { - self.conn.lock() - .unwrap() - .query_row(crate::sql::JOB_BY_ID, [id], |row| { - let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); - - Ok(Job { - id, source, created_time, remote_id, commit_id, run_preferences - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn remote_by_path_and_api(&self, api: &str, path: &str) -> Result<Option<Remote>, String> { - self.conn.lock() - .unwrap() - .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where remote_api=?1 and remote_path=?2", [api, path], |row| { - let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); - - Ok(Remote { - id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn remote_by_id(&self, id: u64) -> Result<Option<Remote>, String> { - self.conn.lock() - .unwrap() - .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where id=?1", [id], |row| { - let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); - - Ok(Remote { - id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> { - self.conn.lock() - .unwrap() - .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0)) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn repo_id_by_name(&self, repo_name: &str) -> Result<Option<u64>, String> { - self.conn.lock() - .unwrap() - .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0)) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result<u64, String> { - let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind { - "github" => { - (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote)) - }, - "github-email" => { - (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote)) - }, - other => { - panic!("unsupported remote kind: {}", other); - } - }; - - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);", - (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path) - ) - .expect("can insert"); - - Ok(conn.last_insert_rowid() as u64) - } - - pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option<String>) -> Result<u64, String> { - // TODO: potential race: if two remotes learn about a commit at the same time and we decide - // to create two jobs at the same time, this might return an incorrect id if the insert - // didn't actually insert a new row. - let commit_id = self.new_commit(sha).expect("can create commit record"); - - let created_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis() as u64; - - let conn = self.conn.lock().unwrap(); - - let rows_modified = conn.execute( - "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);", - (remote_id, commit_id, created_time, pusher, repo_default_run_pref) - ).unwrap(); - - assert_eq!(1, rows_modified); - - let job_id = conn.last_insert_rowid() as u64; - - Ok(job_id) - } - - pub fn new_run(&self, job_id: u64, host_preference: Option<u32>) -> Result<PendingRun, String> { - let created_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis() as u64; - - let conn = self.conn.lock().unwrap(); - - let rows_modified = conn.execute( - "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);", - (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference) - ).unwrap(); - - assert_eq!(1, rows_modified); - - let run_id = conn.last_insert_rowid() as u64; - - Ok(PendingRun { - id: run_id, - job_id, - create_time: created_time, - }) - } - - pub fn reap_task(&self, task_id: u64) -> Result<(), String> { - let conn = self.conn.lock().unwrap(); - - conn.execute( - "update runs set final_status=\"lost signal\", state=4 where id=?1;", - [task_id] - ).unwrap(); - - Ok(()) - } - - pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> { - let conn = self.conn.lock().unwrap(); - - let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap(); - let mut result = metrics_query.query([run]).unwrap(); - let mut metrics = Vec::new(); - - while let Some(row) = result.next().unwrap() { - let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); - metrics.push(MetricRecord { id, run_id, name, value }); - } - - Ok(metrics) - } - - pub fn artifacts_for_run(&self, run: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> { - let conn = self.conn.lock().unwrap(); - - let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap(); - let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap(); - let mut artifacts = Vec::new(); - - while let Some(row) = result.next().unwrap() { - let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap(); - artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time }); - } - - Ok(artifacts) - } - - pub fn repo_by_id(&self, id: u64) -> Result<Option<Repo>, String> { - self.conn.lock() - .unwrap() - .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| { - let (id, repo_name, default_run_preference) = row.try_into().unwrap(); - Ok(Repo { - id, - name: repo_name, - default_run_preference, - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn get_repos(&self) -> Result<Vec<Repo>, String> { - let conn = self.conn.lock().unwrap(); - - let mut repos_query = conn.prepare(sql::ALL_REPOS).unwrap(); - let mut repos = repos_query.query([]).unwrap(); - let mut result = Vec::new(); - - while let Some(row) = repos.next().unwrap() { - let (id, repo_name, default_run_preference) = row.try_into().unwrap(); - result.push(Repo { - id, - name: repo_name, - default_run_preference, - }); - } - - Ok(result) - } - - pub fn last_job_from_remote(&self, id: u64) -> Result<Option<Job>, String> { - self.recent_jobs_from_remote(id, 1) - .map(|mut jobs| jobs.pop()) - } - - pub fn job_by_commit_id(&self, commit_id: u64) -> Result<Option<Job>, String> { - let conn = self.conn.lock().unwrap(); - - conn - .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { - let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); - Ok(Job { - id, - remote_id, - commit_id, - created_time, - source, - run_preferences, - }) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn recent_jobs_from_remote(&self, id: u64, limit: u64) -> Result<Vec<Job>, String> { - let conn = self.conn.lock().unwrap(); - - let mut job_query = conn.prepare(sql::LAST_JOBS_FROM_REMOTE).unwrap(); - let mut result = job_query.query([id, limit]).unwrap(); - - let mut jobs = Vec::new(); - - while let Some(row) = result.next().unwrap() { - let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); - jobs.push(Job { - id, - remote_id, - commit_id, - created_time, - source, - run_preferences, - }); - } - - Ok(jobs) - } - - pub fn get_active_runs(&self) -> Result<Vec<Run>, String> { - let conn = self.conn.lock().unwrap(); - - let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap(); - let mut runs = started_query.query([]).unwrap(); - let mut started = Vec::new(); - - while let Some(row) = runs.next().unwrap() { - started.push(crate::sql::row2run(row)); - } - - Ok(started) - } - - pub fn get_pending_runs(&self, host_id: Option<u32>) -> Result<Vec<PendingRun>, String> { - let conn = self.conn.lock().unwrap(); - - let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); - let mut runs = pending_query.query([host_id]).unwrap(); - let mut pending = Vec::new(); - - while let Some(row) = runs.next().unwrap() { - let (id, job_id, create_time) = row.try_into().unwrap(); - let run = PendingRun { - id, - job_id, - create_time, - }; - pending.push(run); - } - - Ok(pending) - } - - pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result<Vec<Job>, String> { - // for jobs that this host has not run, we'll arbitrarily say that we won't generate new - // runs for jobs more than a day old. - // - // we don't want to rebuild the entire history every time we see a new host by default; if - // you really want to rebuild all of history on a new host, use `ci_ctl` to prepare the - // runs. - let cutoff = crate::io::now_ms() - 24 * 3600 * 1000; - - let conn = self.conn.lock().unwrap(); - - let mut jobs_needing_task_runs = conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap(); - let mut job_rows = jobs_needing_task_runs.query([cutoff, host_id]).unwrap(); - let mut jobs = Vec::new(); - - while let Some(row) = job_rows.next().unwrap() { - let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap(); - - jobs.push(Job { - id, source, created_time, remote_id, commit_id, run_preferences, - }); - } - - Ok(jobs) - } - - - pub fn remotes_by_repo(&self, repo_id: u64) -> Result<Vec<Remote>, String> { - let mut remotes: Vec<Remote> = Vec::new(); - - let conn = self.conn.lock().unwrap(); - let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap(); - let mut remote_results = remotes_query.query([repo_id]).unwrap(); - - while let Some(row) = remote_results.next().unwrap() { - let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap(); - remotes.push(Remote { id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path }); - } - - Ok(remotes) - } - - /// try to find a host close to `host_info`, but maybe not an exact match. - /// - /// specifically, we'll ignore microcode and family/os - enough that measurements ought to be - /// comparable but maybe not perfectly so. - pub fn find_id_like_host(&self, host_info: &crate::protocol::HostInfo) -> Result<Option<u32>, String> { - self.conn.lock() - .unwrap() - .query_row( - "select id from hosts where \ - hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \ - cpu_model=?5 and cpu_max_freq_khz=?6 and cpu_cores=?7 and mem_total=?8 and \ - arch=?9);", - ( - &host_info.hostname, - &host_info.cpu_info.vendor_id, - &host_info.cpu_info.model_name, - &host_info.cpu_info.family, - &host_info.cpu_info.model, - &host_info.cpu_info.max_freq, - &host_info.cpu_info.cores, - &host_info.memory_info.total, - &host_info.env_info.arch, - ), - |row| { row.get(0) } - ) - .map_err(|e| e.to_string()) - } - - /// get an id for the host described by `host_info`. this may create a new record if no such - /// host exists. - pub fn id_for_host(&self, host_info: &crate::protocol::HostInfo) -> Result<u32, String> { - let conn = self.conn.lock().unwrap(); - - conn - .execute( - "insert or ignore into hosts \ - (\ - hostname, cpu_vendor_id, cpu_model_name, cpu_family, \ - cpu_model, cpu_microcode, cpu_max_freq_khz, cpu_cores, \ - mem_total, arch, family, os\ - ) values (\ - ?1, ?2, ?3, ?4, \ - ?5, ?6, ?7, ?8, \ - ?9, ?10, ?11, ?12 \ - );", - ( - &host_info.hostname, - &host_info.cpu_info.vendor_id, - &host_info.cpu_info.model_name, - &host_info.cpu_info.family, - &host_info.cpu_info.model, - &host_info.cpu_info.microcode, - &host_info.cpu_info.max_freq, - &host_info.cpu_info.cores, - &host_info.memory_info.total, - &host_info.env_info.arch, - &host_info.env_info.family, - &host_info.env_info.os, - ) - ) - .expect("can insert"); - - conn - .query_row( - "select id from hosts where \ - hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \ - cpu_model=?5 and cpu_microcode=?6 and cpu_max_freq_khz=?7 and \ - cpu_cores=?8 and mem_total=?9 and arch=?10 and family=?11 and os=?12;", - ( - &host_info.hostname, - &host_info.cpu_info.vendor_id, - &host_info.cpu_info.model_name, - &host_info.cpu_info.family, - &host_info.cpu_info.model, - &host_info.cpu_info.microcode, - &host_info.cpu_info.max_freq, - &host_info.cpu_info.cores, - &host_info.memory_info.total, - &host_info.env_info.arch, - &host_info.env_info.family, - &host_info.env_info.os, - ), - |row| { row.get(0) } - ) - .map_err(|e| e.to_string()) - } - - pub fn host_model_info(&self, host_id: u64) -> Result<(String, String, String, String, u64), String> { - let conn = self.conn.lock().unwrap(); - conn - .query_row("select hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz from hosts where id=?1;", [host_id], |row| { - Ok(( - row.get(0).unwrap(), - row.get(1).unwrap(), - row.get(2).unwrap(), - row.get(3).unwrap(), - row.get(4).unwrap(), - )) - }) - .map_err(|e| e.to_string()) - } - - pub fn runs_for_job_one_per_host(&self, job_id: u64) -> Result<Vec<Run>, String> { - let conn = self.conn.lock().unwrap(); - let mut runs_query = conn.prepare(crate::sql::RUNS_FOR_JOB).unwrap(); - let mut runs_results = runs_query.query([job_id]).unwrap(); - - let mut results = Vec::new(); - - while let Some(row) = runs_results.next().unwrap() { - results.push(crate::sql::row2run(row)); - } - - Ok(results) - } - - pub fn last_run_for_job(&self, job_id: u64) -> Result<Option<Run>, String> { - let conn = self.conn.lock().unwrap(); - - conn - .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| { - Ok(crate::sql::row2run(row)) - }) - .optional() - .map_err(|e| e.to_string()) - } - - pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> { - let remotes = self.remotes_by_repo(repo_id)?; - - let mut notifiers: Vec<RemoteNotifier> = Vec::new(); - - for remote in remotes.into_iter() { - match remote.remote_api.as_str() { - "github" => { - let mut notifier_path = self.config_path.clone(); - notifier_path.push(&remote.notifier_config_path); - - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::github_from_file(¬ifier_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - }, - "email" => { - let mut notifier_path = self.config_path.clone(); - notifier_path.push(&remote.notifier_config_path); - - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::email_from_file(¬ifier_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - } - other => { - eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) - } - } - } - - Ok(notifiers) - } -} - diff --git a/src/io.rs b/src/io.rs deleted file mode 100644 index f9f407f..0000000 --- a/src/io.rs +++ /dev/null @@ -1,181 +0,0 @@ -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use futures_util::StreamExt; -use tokio::fs::File; -use std::io::Write; -use tokio::fs::OpenOptions; -use std::task::{Poll, Context}; -use std::pin::Pin; -use std::time::{UNIX_EPOCH, SystemTime}; -use std::sync::{Arc, Mutex}; - -pub fn now_ms() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is later than epoch") - .as_millis() as u64 -} - -#[derive(Clone)] -pub struct VecSink { - body: Arc<Mutex<Vec<u8>>>, -} - -impl VecSink { - pub fn new() -> Self { - Self { body: Arc::new(Mutex::new(Vec::new())) } - } - - pub fn take_buf(&self) -> Vec<u8> { - std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new()) - } -} - -impl tokio::io::AsyncWrite for VecSink { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8] - ) -> Poll<Result<usize, std::io::Error>> { - self.body.lock().unwrap().extend_from_slice(buf); - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll<Result<(), std::io::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll<Result<(), std::io::Error>> { - Poll::Ready(Ok(())) - } -} - -pub struct ArtifactStream { - sender: hyper::body::Sender, -} - -impl ArtifactStream { - pub fn new(sender: hyper::body::Sender) -> Self { - Self { sender } - } -} - -impl tokio::io::AsyncWrite for ArtifactStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8] - ) -> Poll<Result<usize, std::io::Error>> { - match self.get_mut().sender.try_send_data(buf.to_vec().into()) { - Ok(()) => { - Poll::Ready(Ok(buf.len())) - }, - _ => { - cx.waker().wake_by_ref(); - Poll::Pending - } - } - } - - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll<Result<(), std::io::Error>> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll<Result<(), std::io::Error>> { - Poll::Ready(Ok(())) - } -} - - -pub struct ArtifactDescriptor { - job_id: u64, - pub artifact_id: u64, - file: File, -} - -impl ArtifactDescriptor { - pub async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> { - // TODO: jobs should be a configurable path - let path = format!("artifacts/{}/{}", job_id, artifact_id); - let file = OpenOptions::new() - .read(true) - .write(true) - .create_new(true) - .open(&path) - .await - .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; - - Ok(ArtifactDescriptor { - job_id, - artifact_id, - file, - }) - } - - pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { - loop { - let chunk = data.next().await; - - let chunk = match chunk { - Some(Ok(chunk)) => chunk, - Some(Err(e)) => { - eprintln!("error: {:?}", e); - return Err(format!("error reading: {:?}", e)); - } - None => { - eprintln!("body done?"); - return Ok(()); - } - }; - - let chunk = chunk.as_ref(); - - self.file.write_all(chunk).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } - } -} - -pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { - let mut buf = vec![0; 1024 * 1024]; - loop { - let n_read = source.read(&mut buf).await - .map_err(|e| format!("failed to read: {:?}", e))?; - - if n_read == 0 { - eprintln!("done reading!"); - return Ok(()); - } - - dest.write_all(&buf[..n_read]).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } -} -/* -pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { - let mut buf = vec![0; 1024 * 1024]; - loop { - let n_read = source.read(&mut buf).await - .map_err(|e| format!("failed to read: {:?}", e))?; - - if n_read == 0 { - eprintln!("done reading!"); - return Ok(()); - } - - dest.sender.send_data(buf[..n_read].to_vec().into()).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } -} -*/ diff --git a/src/lua/mod.rs b/src/lua/mod.rs deleted file mode 100644 index 6c4a281..0000000 --- a/src/lua/mod.rs +++ /dev/null @@ -1,411 +0,0 @@ -use crate::RunningJob; - -use rlua::prelude::*; - -use std::sync::{Arc, Mutex}; -use std::path::PathBuf; - -pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua"); - -pub struct BuildEnv { - lua: Lua, - job: Arc<Mutex<RunningJob>>, -} - -#[derive(Debug)] -pub struct RunParams { - step: Option<String>, - name: Option<String>, - cwd: Option<String>, -} - -pub struct CommandOutput { - pub exit_status: std::process::ExitStatus, - pub stdout: Vec<u8>, - pub stderr: Vec<u8>, -} - -mod lua_exports { - use crate::RunningJob; - use crate::lua::{CommandOutput, RunParams}; - - use std::sync::{Arc, Mutex}; - use std::path::PathBuf; - - use rlua::prelude::*; - - pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec<String>, RunParams), rlua::Error> { - let args = match command { - LuaValue::Table(table) => { - let len = table.len().expect("command table has a length"); - let mut command_args = Vec::new(); - for i in 0..len { - let value = table.get(i + 1).expect("command arg is gettble"); - match value { - LuaValue::String(s) => { - command_args.push(s.to_str().unwrap().to_owned()); - }, - other => { - return Err(LuaError::RuntimeError(format!("argument {} was not a string, was {:?}", i, other))); - } - }; - } - - command_args - }, - other => { - return Err(LuaError::RuntimeError(format!("argument 1 was not a table: {:?}", other))); - } - }; - - let params = match params { - LuaValue::Table(table) => { - let step = match table.get("step").expect("can get from table") { - LuaValue::String(v) => { - Some(v.to_str()?.to_owned()) - }, - LuaValue::Nil => { - None - }, - other => { - return Err(LuaError::RuntimeError(format!("params[\"step\"] must be a string"))); - } - }; - let name = match table.get("name").expect("can get from table") { - LuaValue::String(v) => { - Some(v.to_str()?.to_owned()) - }, - LuaValue::Nil => { - None - }, - other => { - return Err(LuaError::RuntimeError(format!("params[\"name\"] must be a string"))); - } - }; - let cwd = match table.get("cwd").expect("can get from table") { - LuaValue::String(v) => { - Some(v.to_str()?.to_owned()) - }, - LuaValue::Nil => { - None - }, - other => { - return Err(LuaError::RuntimeError(format!("params[\"cwd\"] must be a string"))); - } - }; - - RunParams { - step, - name, - cwd, - } - }, - LuaValue::Nil => { - RunParams { - step: None, - name: None, - cwd: None, - } - } - other => { - return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other))); - } - }; - - Ok((args, params)) - } - - pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { - let (args, params) = collect_build_args(command, params)?; - eprintln!("args: {:?}", args); - eprintln!(" params: {:?}", params); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async move { - job_ctx.lock().unwrap().run_command(&args, params.cwd.as_ref().map(|x| x.as_str())).await - .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) - }) - } - - pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<rlua::Table<'lua>, rlua::Error> { - let (args, params) = collect_build_args(command, params)?; - eprintln!("args: {:?}", args); - eprintln!(" params: {:?}", params); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - let command_output = rt.block_on(async move { - job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await - .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e))) - })?; - - let stdout = ctx.create_string(command_output.stdout.as_slice())?; - let stderr = ctx.create_string(command_output.stderr.as_slice())?; - - let result = ctx.create_table()?; - result.set("stdout", stdout)?; - result.set("stderr", stderr)?; - result.set("status", command_output.exit_status.code())?; - Ok(result) - } - - pub fn check_dependencies(commands: Vec<String>) -> Result<(), rlua::Error> { - let mut missing_deps = Vec::new(); - for command in commands.iter() { - if !has_cmd(command)? { - missing_deps.push(command.clone()); - } - } - - if missing_deps.len() > 0 { - return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", ")))); - } - - Ok(()) - } - - pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async move { - job_ctx.lock().unwrap().send_metric(&name, value).await - .map_err(|e| LuaError::RuntimeError(format!("send_metric error: {:?}", e))) - }) - } - - pub fn artifact(path: String, name: Option<String>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { - let path: PathBuf = path.into(); - - let default_name: String = match (path.file_name(), path.parent()) { - (Some(name), _) => name - .to_str() - .ok_or(LuaError::RuntimeError("artifact name is not a unicode string".to_string()))? - .to_string(), - (_, Some(parent)) => format!("{}", parent.display()), - (None, None) => { - // one day using directories for artifacts might work but / is still not going - // to be accepted - return Err(LuaError::RuntimeError(format!("cannot infer a default path name for {}", path.display()))); - } - }; - - let name: String = match name { - Some(name) => name, - None => default_name, - }; - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async move { - let mut artifact = job_ctx.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await - .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e))) - .unwrap(); - let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(); - eprintln!("uploading..."); - crate::io::forward_data(&mut file, &mut artifact).await - .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?; - std::mem::drop(artifact); - Ok(()) - }) - } - - pub fn has_cmd(name: &str) -> Result<bool, rlua::Error> { - Ok(std::process::Command::new("which") - .arg(name) - .status() - .map_err(|e| LuaError::RuntimeError(format!("could not fork which? {:?}", e)))? - .success()) - } - - pub fn file_size(path: &str) -> Result<u64, rlua::Error> { - Ok(std::fs::metadata(&format!("tmpdir/{}", path)) - .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", path)))? - .len()) - } - - pub mod step { - use crate::RunningJob; - use std::sync::{Arc, Mutex}; - - pub fn start(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { - let mut job = job_ref.lock().unwrap(); - job.current_step.clear(); - job.current_step.push(name); - Ok(()) - } - - pub fn push(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { - let mut job = job_ref.lock().unwrap(); - job.current_step.push(name); - Ok(()) - } - - pub fn advance(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { - let mut job = job_ref.lock().unwrap(); - job.current_step.pop(); - job.current_step.push(name); - Ok(()) - } - } -} - -struct DeclEnv<'lua, 'env> { - lua_ctx: &'env rlua::Context<'lua>, - job_ref: &'env Arc<Mutex<RunningJob>>, -} -impl<'lua, 'env> DeclEnv<'lua, 'env> { - fn create_function<A, R, F>(&self, name: &str, f: F) -> Result<rlua::Function<'lua>, String> - where - A: FromLuaMulti<'lua>, - R: ToLuaMulti<'lua>, - F: 'static + Send + Fn(rlua::Context<'lua>, Arc<Mutex<RunningJob>>, A) -> Result<R, rlua::Error> { - - let job_ref = Arc::clone(self.job_ref); - self.lua_ctx.create_function(move |ctx, args| { - let job_ref = Arc::clone(&job_ref); - f(ctx, job_ref, args) - }) - .map_err(|e| format!("problem defining {} function: {:?}", name, e)) - } -} - -impl BuildEnv { - pub fn new(job: &Arc<Mutex<RunningJob>>) -> Self { - let env = BuildEnv { - lua: Lua::new(), - job: Arc::clone(job), - }; - env.lua.context(|lua_ctx| { - env.define_env(lua_ctx) - }).expect("can define context"); - env - } - - fn define_env(&self, lua_ctx: rlua::Context) -> Result<(), String> { - let decl_env = DeclEnv { - lua_ctx: &lua_ctx, - job_ref: &self.job, - }; - - let hello = decl_env.create_function("hello", |_, _, ()| { - eprintln!("hello from lua!!!"); - Ok(()) - })?; - - let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec<String>| { - lua_exports::check_dependencies(commands) - })?; - - let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| { - lua_exports::build_command_impl(command, params, job_ref) - })?; - - let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, params): (LuaValue, LuaValue)| { - lua_exports::check_output_impl(ctx, command, params, job_ref) - })?; - - let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| { - lua_exports::metric(name, value, job_ref) - })?; - - let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?; - - let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| { - lua_exports::artifact(path, name, job_ref) - })?; - - let error = decl_env.create_function("error", move |_, job_ref, msg: String| { - Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg))) - })?; - - let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, job_ref, name: String| { - lua_exports::has_cmd(&name) - })?; - - let size_of_file = decl_env.create_function("size_of_file", move |_, job_ref, name: String| { - lua_exports::file_size(&name) - })?; - - let native_rust_triple = match std::env::consts::ARCH { - "x86_64" => "x86_64-unknown-linux-gnu", - "aarch64" => "aarch64-unknown-linux-gnu", - other => { panic!("dunno native rust triple for arch {}", other); } - }; - let native_rust_triple = lua_ctx.create_string(native_rust_triple).unwrap(); - let build_env_vars = lua_ctx.create_table_from( - vec![ - ("native_rust_triple", native_rust_triple) - ] - ).unwrap(); - - let build_environment = lua_ctx.create_table_from( - vec![ - ("has", path_has_cmd), - ("size", size_of_file), - ] - ).unwrap(); - build_environment.set("vars", build_env_vars).unwrap(); - - let build_functions = lua_ctx.create_table_from( - vec![ - ("hello", hello), - ("run", build), - ("dependencies", check_dependencies), - ("metric", metric), - ("error", error), - ("artifact", artifact), - ("now_ms", now_ms), - ("check_output", check_output), - ] - ).unwrap(); - build_functions.set("environment", build_environment).unwrap(); - let current_commit = self.job.lock().unwrap().job.commit.clone(); - build_functions.set("sha", lua_ctx.create_string(current_commit.as_bytes()).unwrap()).unwrap(); - let globals = lua_ctx.globals(); - globals.set("Build", build_functions).unwrap(); - - - let step_start = decl_env.create_function("step_start", move |_, job_ref, name: String| { - lua_exports::step::start(job_ref, name) - })?; - - let step_push = decl_env.create_function("step_push", move |_, job_ref, name: String| { - lua_exports::step::push(job_ref, name) - })?; - - let step_advance = decl_env.create_function("step_advance", move |_, job_ref, name: String| { - lua_exports::step::advance(job_ref, name) - })?; - - let step_functions = lua_ctx.create_table_from( - vec![ - ("start", step_start), - ("push", step_push), - ("advance", step_advance), - ] - ).unwrap(); - globals.set("Step", step_functions).unwrap(); - Ok(()) - } - - pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> { - let script = script.to_vec(); - let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| { - std::thread::spawn(move || { - self.lua.context(|lua_ctx| { - lua_ctx.load(&script) - .set_name("goodfile")? - .exec() - }) - }).join().unwrap() - }).await.unwrap(); - eprintln!("lua res: {:?}", res); - res - } -} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index d11df1f..0000000 --- a/src/main.rs +++ /dev/null @@ -1,1092 +0,0 @@ -#![allow(dead_code)] -#![allow(unused_variables)] -#![allow(unused_imports)] - -use chrono::{Utc, TimeZone}; -use lazy_static::lazy_static; -use std::sync::RwLock; -use std::collections::HashMap; -use serde_derive::{Deserialize, Serialize}; -use tokio::spawn; -use std::path::PathBuf; -use axum_server::tls_rustls::RustlsConfig; -use axum::routing::*; -use axum::Router; -use axum::response::{IntoResponse, Response, Html}; -use std::net::SocketAddr; -use axum::extract::{Path, State}; -use http_body::combinators::UnsyncBoxBody; -use axum::{Error, Json}; -use axum::extract::rejection::JsonRejection; -use axum::body::Bytes; -use axum::http::{StatusCode, Uri}; -use http::header::HeaderMap; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use axum::body::StreamBody; - -use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; - -use hmac::{Hmac, Mac}; -use sha2::Sha256; - -mod io; -mod sql; -mod notifier; -mod dbctx; -mod protocol; - -use sql::RunState; - -use dbctx::{DbCtx, Job, Run, ArtifactRecord}; - -use rusqlite::OptionalExtension; - -#[derive(Serialize, Deserialize)] -struct WebserverConfig { - psks: Vec<GithubPsk>, - jobs_path: PathBuf, - config_path: PathBuf, - db_path: PathBuf, - debug_addr: Option<serde_json::Value>, - server_addr: Option<serde_json::Value>, -} - -#[derive(Clone)] -struct WebserverState { - jobs_path: PathBuf, - dbctx: Arc<DbCtx>, -} - -#[derive(Clone, Serialize, Deserialize)] -struct GithubPsk { - key: String, - gh_user: String, -} - -lazy_static! { - static ref PSKS: RwLock<Vec<GithubPsk>> = RwLock::new(Vec::new()); -} - -#[derive(Copy, Clone, Debug)] -enum GithubHookError { - BodyNotObject, - MissingElement { path: &'static str }, - BadType { path: &'static str, expected: &'static str }, -} - -#[derive(Debug)] -enum GithubEvent { - Push { tip: String, repo_name: String, head_commit: serde_json::Map<String, serde_json::Value>, pusher: serde_json::Map<String, serde_json::Value> }, - Other {} -} - -/// return a duration rendered as the largest two non-zero units. -/// -/// 60000ms -> 1m -/// 60001ms -> 1m -/// 61000ms -> 1m1s -/// 1030ms -> 1.03s -fn duration_as_human_string(duration_ms: u64) -> String { - let duration_sec = duration_ms / 1000; - let duration_min = duration_sec / 60; - let duration_hours = duration_min / 60; - - let duration_ms = duration_ms % 1000; - let duration_sec = duration_sec % 60; - let duration_min = duration_min % 60; - // no need to clamp hours, we're gonna just hope that it's a reasonable number of hours - - if duration_hours != 0 { - let mut res = format!("{}h", duration_hours); - if duration_min != 0 { - res.push_str(&format!("{}m", duration_min)); - } - res - } else if duration_min != 0 { - let mut res = format!("{}m", duration_min); - if duration_min != 0 { - res.push_str(&format!("{}s", duration_sec)); - } - res - } else { - let mut res = format!("{}", duration_sec); - if duration_ms != 0 { - res.push_str(&format!(".{:03}", duration_ms)); - } - res.push('s'); - res - } -} - -/// try producing a url for whatever caused this job to be started, if possible -fn commit_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> Option<String> { - let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote"); - - match remote.remote_api.as_str() { - "github" => { - Some(format!("{}/commit/{}", remote.remote_url, commit_sha)) - }, - "email" => { - None - }, - _ => { - None - } - } -} - -/// produce a url to the ci.butactuallyin.space job details page -fn job_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> String { - let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote"); - - if remote.remote_api != "github" { - eprintln!("job url for remote type {} can't be constructed, i think", &remote.remote_api); - } - - format!("{}/{}", &remote.remote_path, commit_sha) -} - -/// render how long a run took, or is taking, in a human-friendly way -fn display_run_time(run: &Run) -> String { - if let Some(start_time) = run.start_time { - if let Some(complete_time) = run.complete_time { - if complete_time < start_time { - if run.state == RunState::Started { - // this run has been restarted. the completed time is stale. - // further, this is a currently active run. - let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; - let mut duration = duration_as_human_string(now_ms - start_time); - duration.push_str(" (ongoing)"); - duration - } else { - "invalid data".to_string() - } - } else { - let duration_ms = complete_time - start_time; - let duration = duration_as_human_string(duration_ms); - duration - } - } else { - if run.state != RunState::Invalid { - let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; - let mut duration = duration_as_human_string(now_ms - start_time); - duration.push_str(" (ongoing)"); - duration - } else { - "n/a".to_string() - } - } - } else { - "not yet run".to_owned() - } -} - -fn parse_push_event(body: serde_json::Value) -> Result<GithubEvent, GithubHookError> { - let body = body.as_object() - .ok_or(GithubHookError::BodyNotObject)?; - - let tip = body.get("after") - .ok_or(GithubHookError::MissingElement { path: "after" })? - .as_str() - .ok_or(GithubHookError::BadType { path: "after", expected: "str" })? - .to_owned(); - - let repo_name = body.get("repository") - .ok_or(GithubHookError::MissingElement { path: "repository" })? - .as_object() - .ok_or(GithubHookError::BadType { path: "repository", expected: "obj" })? - .get("full_name") - .ok_or(GithubHookError::MissingElement { path: "repository/full_name" })? - .as_str() - .ok_or(GithubHookError::BadType { path: "repository/full_name", expected: "str" })? - .to_owned(); - - let head_commit = body.get("head_commit") - .ok_or(GithubHookError::MissingElement { path: "head_commit" })? - .as_object() - .ok_or(GithubHookError::BadType { path: "head_commit", expected: "obj" })? - .to_owned(); - - let pusher = body.get("pusher") - .ok_or(GithubHookError::MissingElement { path: "pusher" })? - .as_object() - .ok_or(GithubHookError::BadType { path: "pusher", expected: "obj" })? - .to_owned(); - - Ok(GithubEvent::Push { tip, repo_name, head_commit, pusher }) -} - -async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse { - let (sha, repo, head_commit, pusher) = if let GithubEvent::Push { tip, repo_name, head_commit, pusher } = event { - (tip, repo_name, head_commit, pusher) - } else { - panic!("process push event on non-push event"); - }; - - println!("handling push event to {}/{}: sha {} in repo {}, {:?}\n pusher: {:?}", owner, repo, sha, repo, head_commit, pusher); - - // push event is in terms of a ref, but we don't know if it's a new commit (yet). - // in terms of CI jobs, we care mainly about new commits. - // so... - // * look up the commit, - // * if it known, bail out (new ref for existing commit we've already handled some way) - // * create a new commit ref - // * create a new job (state=pending) for the commit ref - let commit_id: Option<u64> = ctx.conn.lock().unwrap() - .query_row(sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0)) - .optional() - .expect("can run query"); - - if commit_id.is_some() { - eprintln!("commit already exists"); - return (StatusCode::OK, String::new()); - } - - let remote_url = format!("https://www.github.com/{}.git", repo); - eprintln!("looking for remote url: {}", remote_url); - let (remote_id, repo_id): (u64, u64) = match ctx.conn.lock().unwrap() - .query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap()))) - .optional() - .unwrap() { - Some(elems) => elems, - None => { - eprintln!("no remote registered for url {} (repo {})", remote_url, repo); - return (StatusCode::NOT_FOUND, String::new()); - } - }; - - let repo_default_run_pref: Option<String> = ctx.conn.lock().unwrap() - .query_row("select default_run_preference from repos where id=?1;", [repo_id], |row| { - Ok((row.get(0)).unwrap()) - }) - .expect("can query"); - - let pusher_email = pusher - .get("email") - .expect("has email") - .as_str() - .expect("is str"); - - let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap(); - let _ = ctx.new_run(job_id, None).unwrap(); - - let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers"); - - for notifier in notifiers { - notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify"); - } - - (StatusCode::OK, String::new()) -} - -async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event_kind: String, body: serde_json::Value) -> Response<UnsyncBoxBody<Bytes, Error>> { - println!("got github event: {}, {}, {}", owner, repo, event_kind); - match event_kind.as_str() { - "push" => { - let push_event = parse_push_event(body) - .map_err(|e| { - eprintln!("TODO: handle push event error: {:?}", e); - panic!() - }) - .expect("parse works"); - let res = process_push_event(ctx, owner, repo, push_event).await; - "ok".into_response() - }, - "status" => { - eprintln!("[.] status update"); - "ok".into_response() - } - other => { - eprintln!("unhandled event kind: {}, repo {}/{}. content: {:?}", other, owner, repo, body); - "".into_response() - } - } -} - -async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse { - eprintln!("root index"); - let repos = match ctx.dbctx.get_repos() { - Ok(repos) => repos, - Err(e) => { - eprintln!("failed to get repos: {:?}", e); - return (StatusCode::INTERNAL_SERVER_ERROR, Html("gonna feel that one tomorrow".to_string())); - } - }; - - let mut response = String::new(); - - response.push_str("<html>\n"); - response.push_str("<style>\n"); - response.push_str(".build-table { font-family: monospace; border: 1px solid black; border-collapse: collapse; }\n"); - response.push_str(".row-item { padding-left: 4px; padding-right: 4px; border-right: 1px solid black; }\n"); - response.push_str(".odd-row { background: #eee; }\n"); - response.push_str(".even-row { background: #ddd; }\n"); - response.push_str("</style>\n"); - response.push_str("<h1>builds and build accessories</h1>\n"); - - match repos.len() { - 0 => { response.push_str(&format!("<p>no repos configured, so there are no builds</p>\n")); }, - 1 => { response.push_str("<p>1 repo configured</p>\n"); }, - other => { response.push_str(&format!("<p>{} repos configured</p>\n", other)); }, - } - - response.push_str("<table class='build-table'>"); - response.push_str("<tr>\n"); - let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"]; - for heading in headings { - response.push_str(&format!("<th class='row-item'>{}</th>", heading)); - } - response.push_str("</tr>\n"); - - let mut row_num = 0; - - for repo in repos { - let mut most_recent_run: Option<(Job, Run)> = None; - - for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") { - let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works"); - if let Some(last_job) = last_job { - if let Some(last_run) = ctx.dbctx.last_run_for_job(last_job.id).expect("can query") { - if most_recent_run.as_ref().map(|run| run.1.create_time < last_run.create_time).unwrap_or(true) { - most_recent_run = Some((last_job, last_run)); - } - } - } - } - - let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name); - - let row_html: String = match most_recent_run { - Some((job, run)) => { - let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); - let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { - Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), - None => job_commit.clone() - }; - - let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); - - let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); - let duration = display_run_time(&run); - - let status = format!("{:?}", run.state).to_lowercase(); - - let result = match run.build_result { - Some(0) => "<span style='color:green;'>pass</span>", - Some(_) => "<span style='color:red;'>fail</span>", - None => match run.state { - RunState::Pending => { "unstarted" }, - RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, - _ => { "<span style='color:red;'>unreported</span>" } - } - }; - - let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result]; - let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len()); - - let mut row_html = String::new(); - for entry in entries { - row_html.push_str(&format!("<td class='row-item'>{}</td>", entry)); - } - row_html - } - None => { - let entries = [repo_html.as_str()]; - let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len()); - - let mut row_html = String::new(); - for entry in entries { - row_html.push_str(&format!("<td class='row-item'>{}</td>", entry)); - } - row_html - } - }; - - let row_index = row_num % 2; - response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index])); - response.push_str(&row_html); - response.push_str("</tr>"); - response.push('\n'); - - row_num += 1; - } - response.push_str("</table>"); - - response.push_str("<h4>active tasks</h4>\n"); - - let runs = ctx.dbctx.get_active_runs().expect("can query"); - if runs.len() == 0 { - response.push_str("<p>(none)</p>\n"); - } else { - response.push_str("<table class='build-table'>"); - response.push_str("<tr>\n"); - let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"]; - for heading in headings { - response.push_str(&format!("<th class='row-item'>{}</th>", heading)); - } - response.push_str("</tr>\n"); - - let mut row_num = 0; - - for run in runs.iter() { - let row_index = row_num % 2; - - let job = ctx.dbctx.job_by_id(run.job_id).expect("query succeeds").expect("job id is valid"); - let remote = ctx.dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("remote id is valid"); - let repo = ctx.dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("repo id is valid"); - - let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name); - - let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); - let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { - Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), - None => job_commit.clone() - }; - - let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); - - let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); - let duration = display_run_time(&run); - - let status = format!("{:?}", run.state).to_lowercase(); - - let result = match run.build_result { - Some(0) => "<span style='color:green;'>pass</span>", - Some(_) => "<span style='color:red;'>fail</span>", - None => match run.state { - RunState::Pending => { "unstarted" }, - RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, - _ => { "<span style='color:red;'>unreported</span>" } - } - }; - - let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result]; - let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len()); - - let mut row_html = String::new(); - for entry in entries { - row_html.push_str(&format!("<td class='row-item'>{}</td>", entry)); - } - - - response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index])); - response.push_str(&row_html); - response.push_str("</tr>"); - response.push('\n'); - - row_num += 1; - } - - response.push_str("</table>\n"); - } - - response.push_str("</html>"); - - (StatusCode::OK, Html(response)) -} - -async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse { - eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2); - let remote_path = format!("{}/{}", path.0, path.1); - let sha = path.2; - - let (commit_id, sha): (u64, String) = if sha.len() >= 7 { - match ctx.dbctx.conn.lock().unwrap() - .query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) - .optional() - .expect("can query") { - Some((commit_id, sha)) => (commit_id, sha), - None => { - return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string())); - } - } - } else { - return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string())); - }; - - let short_sha = &sha[0..9]; - - let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap() - .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) - .expect("can query"); - - let job = ctx.dbctx.job_by_commit_id(commit_id).expect("can query").expect("job exists"); - - let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists"); - - let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms); - - let (status_elem, status_desc) = match run.state { - RunState::Pending | RunState::Started => { - ("<span style='color:#660;'>pending</span>", "⌛in progress") - }, - RunState::Finished => { - if let Some(build_result) = run.build_result { - if build_result == 0 { - ("<span style='color:green;'>pass</span>", "✅ passed") - } else { - ("<span style='color:red;'>failed</span>", "❌ failed") - } - } else { - eprintln!("run {} for commit {} is missing a build result but is reportedly finished (old data)?", run.id, commit_id); - ("<span style='color:red;'>unreported</span>", "❔ missing status") - } - }, - RunState::Error => { - ("<span style='color:red;'>error</span>", "🧯 error, uncompleted") - } - RunState::Invalid => { - ("<span style='color:red;'>(server error)</span>", "dude even i don't know") - } - }; - let debug_info = run.state == RunState::Finished && run.build_result == Some(1) || run.state == RunState::Error; - - let repo_name: String = ctx.dbctx.conn.lock().unwrap() - .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) - .expect("can query"); - - let deployed = false; - - let mut head = String::new(); - head.push_str("<head>"); - head.push_str(&format!("<title>ci.butactuallyin.space - {}</title>", repo_name)); - let include_og_tags = true; - if include_og_tags { - head.push_str("\n"); - head.push_str(&format!("<meta property=\"og:type\" content=\"website\">\n")); - head.push_str(&format!("<meta property=\"og:site_name\" content=\"ci.butactuallyin.space\">\n")); - head.push_str(&format!("<meta property=\"og:url\" content=\"/{}/{}/{}\">\n", &path.0, &path.1, &sha)); - head.push_str(&format!("<meta property=\"og:title\" contents=\"{}/{} commit {}\">", &path.0, &path.1, &short_sha)); - let build_og_description = format!("commit {} of {}/{}, {} after {}", - short_sha, - path.0, path.1, - status_desc, - display_run_time(&run) - ); - head.push_str(&format!("<meta name=\"description\" content=\"{}\"\n>", build_og_description)); - head.push_str(&format!("<meta property=\"og:description\" content=\"{}\"\n>", build_og_description)); - } - head.push_str("</head>\n"); - let repo_html = format!("<a href=\"/{}\">{}</a>", &repo_name, &repo_name); - let remote_commit_elem = format!("<a href=\"https://www.github.com/{}/commit/{}\">{}</a>", &remote_path, &sha, &sha); - - let mut artifacts_fragment = String::new(); - let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap() - .into_iter() // HACK: filter out artifacts for previous runs of a run. artifacts should be attached to a run, runs should be distinct from run. but i'm sleepy. - .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms)) - .collect(); - - artifacts.sort_by_key(|artifact| artifact.created_time); - - fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 { - let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms); - let run_completed = std::cmp::max(run_completed, artifact_completed); - run_completed - artifact_completed - } - - let recent_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) <= 60_000).cloned().collect(); - let old_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) > 60_000).cloned().collect(); - - for artifact in old_artifacts.iter() { - let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822(); - artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name)); - let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); - let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string(); - artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str)); - } - - for artifact in recent_artifacts.iter() { - let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822(); - artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name)); - if debug_info { - artifacts_fragment.push_str("<pre>"); - artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap()); - artifacts_fragment.push_str("</pre>\n"); - } else { - let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); - let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| { - (md.len() / 1024).to_string() - }).unwrap_or_else(|e| format!("[{}]", e)); - artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str)); - } - } - - let metrics = summarize_job_metrics(&ctx.dbctx, run.id, run.job_id).unwrap(); - - let mut html = String::new(); - html.push_str("<html>\n"); - html.push_str(&format!(" {}\n", head)); - html.push_str(" <body>\n"); - html.push_str(" <pre>\n"); - html.push_str(&format!("repo: {}\n", repo_html)); - html.push_str(&format!("commit: {}, run: {}\n", remote_commit_elem, run.id)); - html.push_str(&format!("status: {} in {}\n", status_elem, display_run_time(&run))); - if let Some(desc) = run.final_text.as_ref() { - html.push_str(&format!(" description: {}\n ", desc)); - } - html.push_str(&format!("deployed: {}\n", deployed)); - html.push_str(" </pre>\n"); - if artifacts_fragment.len() > 0 { - html.push_str(" <div>artifacts</div>\n"); - html.push_str(&artifacts_fragment); - } - if let Some(metrics) = metrics { - html.push_str(&metrics); - } - html.push_str(" </body>\n"); - html.push_str("</html>"); - - (StatusCode::OK, Html(html)) -} - -fn summarize_job_metrics(dbctx: &Arc<DbCtx>, run_id: u64, job_id: u64) -> Result<Option<String>, String> { - let runs = dbctx.runs_for_job_one_per_host(job_id)?; - - let mut section = String::new(); - section.push_str("<div>\n"); - section.push_str("<h3>metrics</h3>\n"); - section.push_str("<table style='font-family: monospace;'>\n"); - - if runs.len() == 1 { - let metrics = dbctx.metrics_for_run(run_id).unwrap(); - if metrics.is_empty() { - return Ok(None); - } - - section.push_str("<tr><th>name</th><th>value</th></tr>"); - for metric in metrics { - section.push_str(&format!("<tr><td>{}</td><td>{}</td></tr>", &metric.name, &metric.value)); - } - } else { - // very silly ordering issue: need an authoritative ordering of metrics to display metrics - // in a consistent order across runs (though they SHOULD all be ordered the same). - // - // the first run might not have all metrics (first run could be on the slowest build host - // f.ex), so for now just assume that metrics *will* be consistently ordered and build a - // list of metrics from the longest list of metrics we've seen. builders do not support - // concurrency so at least the per-run metrics-order-consistency assumption should hold.. - let mut all_names: Vec<String> = Vec::new(); - - let all_metrics: Vec<(HashMap<String, String>, HostDesc)> = runs.iter().map(|run| { - let metrics = dbctx.metrics_for_run(run.id).unwrap(); - - let mut metrics_map = HashMap::new(); - for metric in metrics.into_iter() { - if !all_names.contains(&metric.name) { - all_names.push(metric.name.clone()); - } - metrics_map.insert(metric.name, metric.value); - } - - let (hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz) = match run.host_id { - Some(host_id) => { - dbctx.host_model_info(host_id).unwrap() - } - None => { - ("unknown".to_string(), "unknown".to_string(), "0".to_string(), "0".to_string(), 0) - } - }; - - (metrics_map, HostDesc::from_parts(hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz)) - }).collect(); - - if all_metrics.is_empty() { - return Ok(None); - } - - let mut header = "<tr><th>name</th>".to_string(); - for (_, host) in all_metrics.iter() { - header.push_str(&format!("<th>{}</br>{} @ {:.3}GHz</th>", &host.hostname, &host.cpu_desc, (host.cpu_max_freq_khz as f64) / 1000_000.0)); - } - header.push_str("</tr>\n"); - section.push_str(&header); - - for name in all_names.iter() { - let mut row = format!("<tr><td>{}</td>", &name); - for (metrics, _) in all_metrics.iter() { - let value = metrics.get(name) - .map(|x| x.clone()) - .unwrap_or_else(String::new); - row.push_str(&format!("<td>{}</td>", value)); - } - row.push_str("</tr>\n"); - section.push_str(&row); - } - }; - section.push_str("</table>\n"); - section.push_str("</div>\n"); - - Ok(Some(section)) -} - -async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse { - eprintln!("get artifact, run={}, artifact={}", path.0, path.1); - let run: u64 = path.0.parse().unwrap(); - let artifact_id: u64 = path.1.parse().unwrap(); - - let artifact_descriptor = match ctx.dbctx.lookup_artifact(run, artifact_id).unwrap() { - Some(artifact) => artifact, - None => { - return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); - } - }; - - let mut live_artifact = false; - - if let Some(completed_time) = artifact_descriptor.completed_time { - if completed_time < artifact_descriptor.created_time { - live_artifact = true; - } - } else { - live_artifact = true; - } - - if live_artifact { - let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536); - let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver); - let mut artifact_path = ctx.jobs_path.clone(); - artifact_path.push(artifact_descriptor.run_id.to_string()); - artifact_path.push(artifact_descriptor.id.to_string()); - spawn(async move { - let mut artifact = artifact_descriptor; - - let mut artifact_file = tokio::fs::File::open(&artifact_path) - .await - .expect("artifact file exists?"); - while artifact.completed_time.is_none() { - match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await { - Ok(()) => { - // reached the current EOF, wait and then commit an unspeakable sin - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - // this would be much implemented as yielding on a condvar woken when an - // inotify event on the file indicates a write has occurred. but i am - // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. - artifact = ctx.dbctx.lookup_artifact(artifact.run_id, artifact.id) - .expect("can query db") - .expect("artifact still exists"); - } - Err(e) => { - eprintln!("artifact file streaming failed: {}", e); - } - } - } - - eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id); - }); - (StatusCode::OK, resp_body).into_response() - } else { - (StatusCode::OK, Html("all done")).into_response() - } -} - -async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<WebserverState>) -> impl IntoResponse { - eprintln!("get repo summary: {:?}", path); - - let mut last_builds = Vec::new(); - - let (repo_id, repo_name, default_run_preference): (u64, String, Option<String>) = match ctx.dbctx.conn.lock().unwrap() - .query_row("select id, repo_name, default_run_preference from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap(), row.get(2).unwrap()))) - .optional() - .unwrap() { - Some(elem) => elem, - None => { - eprintln!("no repo named {}", path); - return (StatusCode::NOT_FOUND, Html(String::new())); - } - }; - - // TODO: display default_run_preference somehow on the web summary? - - for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") { - let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo"); - last_builds.extend(last_ten_jobs.drain(..)); - } - last_builds.sort_by_key(|job| -(job.created_time as i64)); - - let mut response = String::new(); - response.push_str("<html>\n"); - response.push_str(&format!("<title> ci.butactuallyin.space - {} </title>\n", repo_name)); - response.push_str("<style>\n"); - response.push_str(".build-table { font-family: monospace; border: 1px solid black; border-collapse: collapse; }\n"); - response.push_str(".row-item { padding-left: 4px; padding-right: 4px; border-right: 1px solid black; }\n"); - response.push_str(".odd-row { background: #eee; }\n"); - response.push_str(".even-row { background: #ddd; }\n"); - response.push_str("</style>\n"); - response.push_str(&format!("<h1>{} build history</h1>\n", repo_name)); - response.push_str("<a href=/>full repos index</a><p> </p>\n"); - - response.push_str("<table class='build-table'>"); - response.push_str("<tr>\n"); - let headings = ["last build", "job", "build commit", "duration", "status", "result"]; - for heading in headings { - response.push_str(&format!("<th class='row-item'>{}</th>", heading)); - } - response.push_str("</tr>\n"); - - let mut row_num = 0; - - for job in last_builds.iter().take(10) { - let run = ctx.dbctx.last_run_for_job(job.id).expect("query succeeds").expect("TODO: run exists if job exists (small race if querying while creating job ...."); - let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit"); - let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) { - Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), - None => job_commit.clone() - }; - - let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); - - let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); - let duration = display_run_time(&run); - - let status = format!("{:?}", run.state).to_lowercase(); - - let result = match run.build_result { - Some(0) => "<span style='color:green;'>pass</span>", - Some(_) => "<span style='color:red;'>fail</span>", - None => match run.state { - RunState::Pending => { "unstarted" }, - RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, - _ => { "<span style='color:red;'>unreported</span>" } - } - }; - - let entries = [last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result]; - let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len()); - - let mut row_html = String::new(); - for entry in entries { - row_html.push_str(&format!("<td class='row-item'>{}</td>", entry)); - } - - let row_index = row_num % 2; - response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index])); - response.push_str(&row_html); - response.push_str("</tr>\n"); - - row_num += 1; - } - response.push_str("</html>"); - - (StatusCode::OK, Html(response)) -} - -async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<WebserverState>, body: Bytes) -> impl IntoResponse { - let json: Result<serde_json::Value, _> = serde_json::from_slice(&body); - eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers); - - let payload = match json { - Ok(payload) => { payload }, - Err(e) => { - eprintln!("bad request: path={}/{}\nheaders: {:?}\nbody err: {:?}", path.0, path.1, headers, e); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let sent_hmac = match headers.get("x-hub-signature-256") { - Some(sent_hmac) => { sent_hmac.to_str().expect("valid ascii string").to_owned() }, - None => { - eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-hub-signature-256", path.0, path.1, headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - let mut hmac_ok = false; - - for psk in PSKS.read().unwrap().iter() { - let mut mac = Hmac::<Sha256>::new_from_slice(psk.key.as_bytes()) - .expect("hmac can be constructed"); - mac.update(&body); - let result = mac.finalize().into_bytes().to_vec(); - - // hack: skip sha256= - let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex"); - if decoded == result { - hmac_ok = true; - break; - } - } - - if !hmac_ok { - eprintln!("bad hmac by all psks"); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - - let kind = match headers.get("x-github-event") { - Some(kind) => { kind.to_str().expect("valid ascii string").to_owned() }, - None => { - eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-github-event", path.0, path.1, headers); - return (StatusCode::BAD_REQUEST, "").into_response(); - } - }; - - handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await -} - - -async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router { - /* - - // GET /hello/warp => 200 OK with body "Hello, warp!" - let hello = warp::path!("hello" / String) - .map(|name| format!("Hello, {}!\n", name)); - - let github_event = warp::post() - .and(warp::path!(String / String)) - .and_then(|owner, repo| { - warp::header::<String>("x-github-event") - .and(warp::body::content_length_limit(1024 * 1024)) - .and(warp::body::json()) - .and_then(|event, json| handle_github_event(owner, repo, event, json)) - .recover(|e| { - async fn handle_rejection(err: Rejection) -> Result<impl Reply, Rejection> { - Ok(warp::reply::with_status("65308", StatusCode::BAD_REQUEST)) - } - handle_rejection(e) - }) - }); - - let repo_status = warp::get() - .and(warp::path!(String / String / String)) - .map(|owner, repo, sha| format!("CI status for {}/{} commit {}\n", owner, repo, sha)); - - let other = - warp::post() - .and(warp::path::full()) - .and(warp::addr::remote()) - .and(warp::body::content_length_limit(1024 * 1024)) - .and(warp::body::bytes()) - .map(move |path, addr: Option<std::net::SocketAddr>, body| { - println!("{}: lets see what i got {:?}, {:?}", addr.unwrap(), path, body); - "hello :)\n" - }) - .or( - warp::get() - .and(warp::path::full()) - .and(warp::addr::remote()) - .map(move |path, addr: Option<std::net::SocketAddr>| { - println!("{}: GET to {:?}", addr.unwrap(), path); - "hello!\n" - }) - ) - .recover(|e| { - async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> { - Ok(warp::reply::with_status("50834", StatusCode::BAD_REQUEST)) - } - handle_rejection(e) - }); - */ - - async fn fallback_get(uri: Uri) -> impl IntoResponse { - (StatusCode::OK, "get resp") - } - - async fn fallback_post(Path(path): Path<String>) -> impl IntoResponse { - "post resp" - } - - Router::new() - .route("/:owner/:repo/:sha", get(handle_commit_status)) - .route("/:owner", get(handle_repo_summary)) - .route("/:owner/:repo", post(handle_repo_event)) - .route("/artifact/:b/:artifact_id", get(handle_get_artifact)) - .route("/", get(handle_ci_index)) - .fallback(fallback_get) - .with_state(WebserverState { - jobs_path, - dbctx: Arc::new(DbCtx::new(cfg_path, db_path)) - }) -} - -async fn bind_server(conf: serde_json::Value, jobs_path: PathBuf, config_path: PathBuf, db_path: PathBuf) -> std::io::Result<()> { - let server = make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service(); - use serde_json::Value; - match conf { - Value::String(address) => { - axum_server::bind(address.parse().unwrap()) - .serve(server).await - }, - Value::Object(map) => { - let address = match map.get("address") { - Some(Value::String(address)) => address.clone(), - None => { - panic!("no local address"); - }, - other => { - panic!("invalid local address: {:?}", other); - } - }; - - match (map.get("cert_path"), map.get("key_path")) { - (Some(Value::String(cert_path)), Some(Value::String(key_path))) => { - let config = RustlsConfig::from_pem_file( - cert_path.clone(), - key_path.clone(), - ).await.unwrap(); - axum_server::bind_rustls(address.parse().unwrap(), config) - .serve(server).await - }, - (Some(_), _) | (_, Some(_)) => { - panic!("invalid local tls config: only one of `cert_path` or `key_path` has been provided"); - }, - (None, None) => { - axum_server::bind(address.parse().unwrap()) - .serve(server).await - } - } - }, - other => { - panic!("invalid server bind config: {:?}", other); - } - } -} - -#[tokio::main] -async fn main() { - tracing_subscriber::fmt::init(); - - let mut args = std::env::args(); - args.next().expect("first arg exists"); - let config_path = args.next().unwrap_or("./webserver_config.json".to_string()); - let web_config: WebserverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for WebserverConfig"); - let mut psks = PSKS.write().expect("can write lock"); - *psks = web_config.psks.clone(); - // drop write lock so we can read PSKS elsewhere WITHOUT deadlocking. - std::mem::drop(psks); - - let jobs_path = web_config.jobs_path.clone(); - let config_path = web_config.config_path.clone(); - let db_path = web_config.db_path.clone(); - if let Some(addr_conf) = web_config.debug_addr.as_ref() { - spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone())); - } - if let Some(addr_conf) = web_config.server_addr.as_ref() { - spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone())); - } - loop { - tokio::time::sleep(std::time::Duration::from_millis(1000)).await; - } -} - -struct HostDesc { - hostname: String, - cpu_desc: String, - cpu_max_freq_khz: u64, -} -impl HostDesc { - fn from_parts(hostname: String, vendor_id: String, cpu_family: String, model: String, cpu_max_freq_khz: u64) -> Self { - let cpu_desc = match (vendor_id.as_str(), cpu_family.as_str(), model.as_str()) { - ("Arm Limited", "8", "0xd03") => "aarch64 A53".to_string(), - ("GenuineIntel", "6", "85") => "x86_64 Skylake".to_string(), - ("AuthenticAMD", "23", "113") => "x86_64 Matisse".to_string(), - (vendor, family, model) => format!("unknown {}:{}:{}", vendor, family, model) - }; - - HostDesc { - hostname, - cpu_desc, - cpu_max_freq_khz, - } - } -} diff --git a/src/notifier.rs b/src/notifier.rs deleted file mode 100644 index f9d6084..0000000 --- a/src/notifier.rs +++ /dev/null @@ -1,181 +0,0 @@ -use serde_derive::{Deserialize, Serialize}; -use std::sync::Arc; -use axum::http::StatusCode; -use lettre::transport::smtp::authentication::{Credentials, Mechanism}; -use lettre::{Message, Transport}; -use lettre::transport::smtp::extension::ClientId; -use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder}; -use std::time::Duration; -use std::path::Path; - -use crate::DbCtx; - -pub struct RemoteNotifier { - pub remote_path: String, - pub notifier: NotifierConfig, -} - -#[derive(Serialize, Deserialize)] -#[serde(untagged)] -pub enum NotifierConfig { - GitHub { - token: String, - }, - Email { - username: String, - password: String, - mailserver: String, - from: String, - to: String, - } -} - -impl NotifierConfig { - pub fn github_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> { - let path = path.as_ref(); - let bytes = std::fs::read(path) - .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; - let config = serde_json::from_slice(&bytes) - .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; - - if matches!(config, NotifierConfig::GitHub { .. }) { - Ok(config) - } else { - Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path.display())) - } - } - - pub fn email_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> { - let path = path.as_ref(); - let bytes = std::fs::read(path) - .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; - let config = serde_json::from_slice(&bytes) - .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; - - if matches!(config, NotifierConfig::Email { .. }) { - Ok(config) - } else { - Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path.display())) - } - } -} - -impl RemoteNotifier { - pub async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { - self.tell_job_status( - ctx, - repo_id, sha, job_id, - "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) - ).await - } - - pub async fn tell_complete_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, desc: Result<String, String>) -> Result<(), String> { - match desc { - Ok(status) => { - self.tell_job_status( - ctx, - repo_id, sha, job_id, - "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) - ).await - }, - Err(status) => { - self.tell_job_status( - ctx, - repo_id, sha, job_id, - "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) - ).await - } - } - } - - pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { - match &self.notifier { - NotifierConfig::GitHub { token } => { - let status_info = serde_json::json!({ - "state": state, - "description": desc, - "target_url": target_url, - "context": "actuallyinspace runner", - }); - - // TODO: should pool (probably in ctx?) to have an upper bound in concurrent - // connections. - let client = reqwest::Client::new(); - let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) - .body(serde_json::to_string(&status_info).expect("can stringify json")) - .header("content-type", "application/json") - .header("user-agent", "iximeow") - .header("authorization", format!("Bearer {}", token)) - .header("accept", "application/vnd.github+json"); - eprintln!("sending {:?}", req); - eprintln!(" body: {}", serde_json::to_string(&status_info).expect("can stringify json")); - let res = req - .send() - .await; - - match res { - Ok(res) => { - if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{ - Ok(()) - } else { - Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) - } - } - Err(e) => { - Err(format!("failure sending request: {:?}", e)) - } - } - } - NotifierConfig::Email { username, password, mailserver, from, to } => { - eprintln!("[.] emailing {} for job {} via {}", state, &self.remote_path, mailserver); - - let subject = format!("{}: job for {}", state, &self.remote_path); - - let body = format!("{}", subject); - - // TODO: when ci.butactuallyin.space has valid certs again, ... fix this. - let tls = TlsParametersBuilder::new(mailserver.to_string()) - .dangerous_accept_invalid_certs(true) - .build() - .unwrap(); - - let mut mailer = SmtpConnection::connect( - mailserver, - Some(Duration::from_millis(5000)), - &ClientId::Domain("ci.butactuallyin.space".to_string()), - None, - None, - ).unwrap(); - - mailer.starttls( - &tls, - &ClientId::Domain("ci.butactuallyin.space".to_string()), - ).unwrap(); - - let resp = mailer.auth( - &[Mechanism::Plain, Mechanism::Login], - &Credentials::new(username.to_owned(), password.to_owned()) - ).unwrap(); - assert!(resp.is_positive()); - - let email = Message::builder() - .from(from.parse().unwrap()) - .to(to.parse().unwrap()) - .subject(&subject) - .body(body) - .unwrap(); - - match mailer.send(email.envelope(), &email.formatted()) { - Ok(_) => { - eprintln!("[+] notified {}@{}", username, mailserver); - Ok(()) - } - Err(e) => { - eprintln!("[-] could not send email: {:?}", e); - Err(e.to_string()) - } - } - } - } - } -} diff --git a/src/protocol.rs b/src/protocol.rs deleted file mode 100644 index c7a9318..0000000 --- a/src/protocol.rs +++ /dev/null @@ -1,114 +0,0 @@ -use serde::{Serialize, Deserialize}; - -#[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "kind")] -#[serde(rename_all = "snake_case")] -pub enum ClientProto { - Started, - ArtifactCreate, - NewTask(RequestedJob), - NewTaskPlease { allowed_pushers: Option<Vec<String>>, host_info: HostInfo }, - Metric { name: String, value: String }, - Command(CommandInfo), - TaskStatus(TaskInfo), - Ping, - Pong, -} - -#[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "command_info")] -#[serde(rename_all = "snake_case")] -pub enum CommandInfo { - Started { command: Vec<String>, cwd: Option<String>, id: u32 }, - Finished { exit_code: Option<i32>, id: u32 }, -} - -#[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "task_info")] -#[serde(rename_all = "snake_case")] -pub enum TaskInfo { - Finished { status: String }, - Interrupted { status: String, description: Option<String> }, -} - -impl ClientProto { - pub fn metric(name: impl Into<String>, value: impl Into<String>) -> Self { - ClientProto::Metric { name: name.into(), value: value.into() } - } - - pub fn command(state: CommandInfo) -> Self { - ClientProto::Command(state) - } - - pub fn new_task_please(allowed_pushers: Option<Vec<String>>, host_info: HostInfo) -> Self { - ClientProto::NewTaskPlease { allowed_pushers, host_info } - } - - pub fn task_status(state: TaskInfo) -> Self { - ClientProto::TaskStatus(state) - } - - pub fn new_task(task: RequestedJob) -> Self { - ClientProto::NewTask(task) - } -} - -impl CommandInfo { - pub fn started(command: impl Into<Vec<String>>, cwd: Option<&str>, id: u32) -> Self { - CommandInfo::Started { command: command.into(), cwd: cwd.map(ToOwned::to_owned), id } - } - - pub fn finished(exit_code: Option<i32>, id: u32) -> Self { - CommandInfo::Finished { exit_code, id } - } -} - -impl TaskInfo { - pub fn finished(status: impl Into<String>) -> Self { - TaskInfo::Finished { status: status.into() } - } - - pub fn interrupted(status: impl Into<String>, description: impl Into<Option<String>>) -> Self { - TaskInfo::Interrupted { status: status.into(), description: description.into() } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct HostInfo { - pub hostname: String, - pub cpu_info: CpuInfo, - pub memory_info: MemoryInfo, - pub env_info: EnvInfo, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct CpuInfo { - pub model_name: String, - pub microcode: String, - pub cores: u32, - pub vendor_id: String, - pub family: String, - pub model: String, - // clock speed in khz - pub max_freq: u64, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct MemoryInfo { - pub total: String, - pub available: String, -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct EnvInfo { - pub arch: String, - pub family: String, - pub os: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RequestedJob { - pub commit: String, - pub remote_url: String, - pub build_token: String, -} diff --git a/src/sql.rs b/src/sql.rs deleted file mode 100644 index 1071279..0000000 --- a/src/sql.rs +++ /dev/null @@ -1,229 +0,0 @@ -#![allow(dead_code)] - -use std::convert::TryFrom; - -use crate::dbctx::Run; -use crate::dbctx::Job; - -#[derive(Debug, Clone)] -pub enum JobResult { - Pass = 0, - Fail = 1, -} - -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum RunState { - Pending = 0, - Started = 1, - Finished = 2, - Error = 3, - Invalid = 4, -} - -impl TryFrom<u8> for RunState { - type Error = String; - - fn try_from(value: u8) -> Result<Self, String> { - match value { - 0 => Ok(RunState::Pending), - 1 => Ok(RunState::Started), - 2 => Ok(RunState::Finished), - 3 => Ok(RunState::Error), - 4 => Ok(RunState::Invalid), - other => Err(format!("invalid job state: {}", other)), - } - } -} - -pub(crate) fn row2run(row: &rusqlite::Row) -> Run { - let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap(); - let state: u8 = state; - Run { - id, - job_id, - artifacts_path, - state: state.try_into().unwrap(), - host_id, - create_time, - start_time, - complete_time, - build_token, - run_timeout, - build_result, - final_text, - } -} - -// remote_id is the remote from which we were notified. this is necessary so we know which remote -// to pull from to actually run the job. -pub const CREATE_JOBS_TABLE: &'static str = "\ - CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT, - source TEXT, - created_time INTEGER, - remote_id INTEGER, - commit_id INTEGER, - run_preferences TEXT);"; - -pub const CREATE_METRICS_TABLE: &'static str = "\ - CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT, - job_id INTEGER, - name TEXT, - value TEXT, - UNIQUE(job_id, name) - );"; - -pub const CREATE_COMMITS_TABLE: &'static str = "\ - CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; - -pub const CREATE_REPOS_TABLE: &'static str = "\ - CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT, - repo_name TEXT, - default_run_preference TEXT);"; - -// remote_api is `github` or NULL for now. hopefully a future cgit-style notifier one day. -// remote_path is some unique identifier for the relevant remote. -// * for `github` remotes, this will be `owner/repo`. -// * for others.. who knows. -// remote_url is a url for human interaction with the remote (think https://git.iximeow.net/zvm) -// remote_git_url is a url that can be `git clone`'d to fetch sources -pub const CREATE_REMOTES_TABLE: &'static str = "\ - CREATE TABLE IF NOT EXISTS remotes (id INTEGER PRIMARY KEY AUTOINCREMENT, - repo_id INTEGER, - remote_path TEXT, - remote_api TEXT, - remote_url TEXT, - remote_git_url TEXT, - notifier_config_path TEXT);"; - -pub const CREATE_ARTIFACTS_TABLE: &'static str = "\ - CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT, - run_id INTEGER, - name TEXT, - desc TEXT, - created_time INTEGER, - completed_time INTEGER);"; - -pub const CREATE_RUNS_TABLE: &'static str = "\ - CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT, - job_id INTEGER, - artifacts_path TEXT, - state INTEGER NOT NULL, - host_id INTEGER, - build_token TEXT, - created_time INTEGER, - started_time INTEGER, - complete_time INTEGER, - run_timeout INTEGER, - build_result INTEGER, - final_status TEXT);"; - -pub const CREATE_HOSTS_TABLE: &'static str = "\ - CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY AUTOINCREMENT, - hostname TEXT, - cpu_vendor_id TEXT, - cpu_model_name TEXT, - cpu_family TEXT, - cpu_model TEXT, - cpu_microcode TEXT, - cpu_max_freq_khz INTEGER, - cpu_cores INTEGER, - mem_total TEXT, - arch TEXT, - family TEXT, - os TEXT, - UNIQUE(hostname, cpu_vendor_id, cpu_model_name, cpu_family, cpu_model, cpu_microcode, cpu_cores, mem_total, arch, family, os));"; - -pub const CREATE_REMOTES_INDEX: &'static str = "\ - CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; - -pub const CREATE_REPO_NAME_INDEX: &'static str = "\ - CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; - -pub const PENDING_RUNS: &'static str = "\ - select id, job_id, created_time, host_preference from runs where state=0 and (host_preference=?1 or host_preference is null) order by created_time desc;"; - -pub const JOBS_NEEDING_HOST_RUN: &'static str = "\ - select jobs.id, jobs.source, jobs.created_time, jobs.remote_id, jobs.commit_id, jobs.run_preferences from jobs \ - where jobs.run_preferences=\"all\" and jobs.created_time > ?1 \ - and not exists \ - (select 1 from runs r2 where r2.job_id = jobs.id and r2.host_id = ?2);"; - -pub const ACTIVE_RUNS: &'static str = "\ - select id, - job_id, - artifacts_path, - state, - host_id, - build_token, - created_time, - started_time, - complete_time, - run_timeout, - build_result, - final_status from runs where state=1 or state=0;"; - -pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\ - select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;"; - -pub const JOB_BY_COMMIT_ID: &'static str = "\ - select id, source, created_time, remote_id, commit_id, run_preferences from jobs where commit_id=?1;"; - -pub const ARTIFACT_BY_ID: &'static str = "\ - select * from artifacts where id=?1 and run_id=?2;"; - -pub const JOB_BY_ID: &'static str = "\ - select id, source, created_time, remote_id, commit_id, run_preferences from jobs where id=?1"; - -pub const METRICS_FOR_RUN: &'static str = "\ - select * from metrics where run_id=?1 order by id asc;"; - -pub const METRICS_FOR_JOB: &'static str = "\ - select metrics.id, metrics.run_id, metrics.name, metrics.value from metrics \ - join runs on runs.id=metrics.run_id \ - where runs.job_id=?1 \ - order by metrics.run_id desc, metrics.id desc;"; - -pub const COMMIT_TO_ID: &'static str = "\ - select id from commits where sha=?1;"; - -pub const REMOTES_FOR_REPO: &'static str = "\ - select * from remotes where repo_id=?1;"; - -pub const ALL_REPOS: &'static str = "\ - select id, repo_name, default_run_preference from repos;"; - -pub const LAST_JOBS_FROM_REMOTE: &'static str = "\ - select id, source, created_time, remote_id, commit_id, run_preferences from jobs where remote_id=?1 order by created_time desc limit ?2;"; - -pub const LAST_RUN_FOR_JOB: &'static str = "\ - select id, - job_id, - artifacts_path, - state, - host_id, - build_token, - created_time, - started_time, - complete_time, - run_timeout, - build_result, - final_status from runs where job_id=?1 order by started_time desc limit 1;"; - -pub const RUNS_FOR_JOB: &'static str = "\ - select id, - job_id, - artifacts_path, - state, - host_id, - build_token, - created_time, - started_time, - complete_time, - run_timeout, - build_result, - final_status from runs where job_id=?1 group by host_id order by started_time desc, state asc;"; - -pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\ - select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id, jobs.run_preferences - from jobs join runs on jobs.id=runs.job_id - oder by runs.created_time asc;"; |