summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ci_ctl.rs235
-rw-r--r--src/ci_driver.rs630
-rw-r--r--src/ci_runner.rs668
-rw-r--r--src/dbctx.rs772
-rw-r--r--src/io.rs181
-rw-r--r--src/lua/mod.rs411
-rw-r--r--src/main.rs1092
-rw-r--r--src/notifier.rs181
-rw-r--r--src/protocol.rs114
-rw-r--r--src/sql.rs229
10 files changed, 0 insertions, 4513 deletions
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs
deleted file mode 100644
index f0ffa62..0000000
--- a/src/ci_ctl.rs
+++ /dev/null
@@ -1,235 +0,0 @@
-use clap::{Parser, Subcommand};
-
-mod sql;
-mod dbctx;
-mod notifier;
-mod io;
-mod protocol;
-
-use dbctx::DbCtx;
-use notifier::NotifierConfig;
-
-#[derive(Parser)]
-#[command(version, about, long_about = None)]
-struct Args {
- /// path to a database to manage (defaults to "./state.db")
- db_path: Option<String>,
-
- /// path to where configs should be found (defaults to "./config")
- config_path: Option<String>,
-
- #[command(subcommand)]
- command: Command,
-}
-
-#[derive(Subcommand)]
-enum Command {
- /// add _something_ to the db
- Add {
- #[command(subcommand)]
- what: AddItem,
- },
-
- /// make sure that the state looks reasonable.
- ///
- /// currently, ensure that all notifiers have a config, that config references an existing
- /// file, and that the referenced file is valid.
- Validate,
-
- /// do something with jobs
- Job {
- #[command(subcommand)]
- what: JobAction,
- },
-}
-
-#[derive(Subcommand)]
-enum JobAction {
- List,
- Rerun {
- which: u32
- },
- RerunCommit {
- commit: String
- },
- Create {
- repo: String,
- commit: String,
- pusher_email: String,
- }
-}
-
-#[derive(Subcommand)]
-enum AddItem {
- Repo {
- name: String,
- remote: Option<String>,
- remote_kind: Option<String>,
- config: Option<String>,
- },
- Remote {
- repo_name: String,
- remote: String,
- remote_kind: String,
- config: String,
- },
-}
-
-fn main() {
- let args = Args::parse();
-
- let db_path = args.db_path.unwrap_or_else(|| "./state.db".to_owned());
- let config_path = args.config_path.unwrap_or_else(|| "./config".to_owned());
-
- match args.command {
- Command::Job { what } => {
- match what {
- JobAction::List => {
- let db = DbCtx::new(&config_path, &db_path);
- let mut conn = db.conn.lock().unwrap();
- let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap();
- let mut jobs = query.query([]).unwrap();
- while let Some(row) = jobs.next().unwrap() {
- let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option<String>) = row.try_into().unwrap();
-
- eprint!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id);
- if let Some(run_preferences) = run_preferences {
- eprintln!(" | run preference: {}", run_preferences);
- } else {
- eprintln!("");
- }
- }
- eprintln!("jobs");
- },
- JobAction::Rerun { which } => {
- let db = DbCtx::new(&config_path, &db_path);
- let task_id = db.new_run(which as u64, None).expect("db can be queried").id;
- eprintln!("[+] rerunning job {} as task {}", which, task_id);
- }
- JobAction::RerunCommit { commit } => {
- let db = DbCtx::new(&config_path, &db_path);
- let job_id = db.job_for_commit(&commit).unwrap();
- if let Some(job_id) = job_id {
- let task_id = db.new_run(job_id, None).expect("db can be queried").id;
- eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id);
- } else {
- eprintln!("[-] no job for commit {}", commit);
- }
- }
- JobAction::Create { repo, commit, pusher_email } => {
- let db = DbCtx::new(&config_path, &db_path);
- let parts = repo.split(":").collect::<Vec<&str>>();
- let (remote_kind, repo_path) = (parts[0], parts[1]);
- let remote = match db.remote_by_path_and_api(&remote_kind, &repo_path).expect("can query") {
- Some(remote) => remote,
- None => {
- eprintln!("[-] no remote registered as {}:{}", remote_kind, repo_path);
- return;
- }
- };
-
- let repo_default_run_pref: Option<String> = db.conn.lock().unwrap()
- .query_row("select default_run_preference from repos where id=?1;", [remote.repo_id], |row| {
- Ok((row.get(0)).unwrap())
- })
- .expect("can query");
-
- let job_id = db.new_job(remote.id, &commit, Some(&pusher_email), repo_default_run_pref).expect("can create");
- let _ = db.new_run(job_id, None).unwrap();
- }
- }
- },
- Command::Add { what } => {
- match what {
- AddItem::Repo { name, remote, remote_kind, config } => {
- let remote_config = match (remote, remote_kind, config) {
- (Some(remote), Some(remote_kind), Some(config_path)) => {
- // do something
- if remote_kind != "github" {
- eprintln!("unknown remote kind: {}", remote);
- return;
- }
- Some((remote, remote_kind, config_path))
- },
- (None, None, None) => {
- None
- },
- _ => {
- eprintln!("when specifying a remote, `remote`, `remote_kind`, and `config_path` must either all be provided together or not at all");
- return;
- }
- };
-
- let db = DbCtx::new(&config_path, &db_path);
- let repo_id = match db.new_repo(&name) {
- Ok(repo_id) => repo_id,
- Err(e) => {
- if e.contains("UNIQUE constraint failed") {
- eprintln!("[!] repo '{}' already exists", name);
- return;
- } else {
- eprintln!("[!] failed to create repo entry: {}", e);
- return;
- }
- }
- };
- println!("[+] new repo created: '{}' id {}", &name, repo_id);
- if let Some((remote, remote_kind, config_path)) = remote_config {
- let full_config_file_path = format!("{}/{}", &db.config_path.display(), config_path);
- let config = match remote_kind.as_ref() {
- "github" => {
- assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok());
- }
- "github-email" => {
- assert!(NotifierConfig::email_from_file(&full_config_file_path).is_ok());
- }
- other => {
- panic!("[-] notifiers for '{}' remotes are not supported", other);
- }
- };
- db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config_path.as_str()).unwrap();
- println!("[+] new remote created: repo '{}', {} remote at {}", &name, remote_kind, remote);
- match remote_kind.as_str() {
- "github" => {
- println!("[!] now go make sure your github repo has a webhook set for `https://ci.butactuallyin.space/{}` to receive at least the `push` event.", remote.as_str());
- println!(" the secret sent with calls to this webhook should be the same preshared secret as the CI server is configured to know.");
- }
- _ => { }
- }
- }
- },
- AddItem::Remote { repo_name, remote, remote_kind, config } => {
- let db = DbCtx::new(&config_path, &db_path);
- let repo_id = match db.repo_id_by_name(&repo_name) {
- Ok(Some(id)) => id,
- Ok(None) => {
- eprintln!("[-] repo '{}' does not exist", repo_name);
- return;
- },
- Err(e) => {
- eprintln!("[!] couldn't look up repo '{}': {:?}", repo_name, e);
- return;
- }
- };
- let config_file = format!("{}/{}", config_path, config);
- match remote_kind.as_ref() {
- "github" => {
- NotifierConfig::github_from_file(&config_file).unwrap();
- }
- "github-email" => {
- NotifierConfig::email_from_file(&config_file).unwrap();
- }
- other => {
- panic!("notifiers for '{}' remotes are not supported", other);
- }
- };
- db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config.as_str()).unwrap();
- println!("[+] new remote created: repo '{}', {} remote at {}", &repo_name, remote_kind, remote);
- },
- }
- },
- Command::Validate => {
- println!("ok");
- }
- }
-}
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
deleted file mode 100644
index f6c1828..0000000
--- a/src/ci_driver.rs
+++ /dev/null
@@ -1,630 +0,0 @@
-use std::process::Command;
-use std::collections::HashMap;
-use std::sync::{Mutex, RwLock};
-use lazy_static::lazy_static;
-use std::io::Read;
-use serde_derive::{Deserialize, Serialize};
-use futures_util::StreamExt;
-use std::fmt;
-use std::path::{Path, PathBuf};
-use tokio::spawn;
-use tokio_stream::wrappers::ReceiverStream;
-use std::sync::{Arc, Weak};
-use std::time::{SystemTime, UNIX_EPOCH};
-use axum_server::tls_rustls::RustlsConfig;
-use axum::body::StreamBody;
-use axum::http::{StatusCode};
-use hyper::HeaderMap;
-use axum::Router;
-use axum::routing::*;
-use axum::extract::State;
-use axum::extract::BodyStream;
-use axum::response::IntoResponse;
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::error::TrySendError;
-use serde_json::json;
-
-mod dbctx;
-mod sql;
-mod notifier;
-mod io;
-mod protocol;
-
-use crate::dbctx::{DbCtx, PendingRun, Job, Run};
-use crate::sql::JobResult;
-use crate::sql::RunState;
-use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
-
-lazy_static! {
- static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
- static ref ACTIVE_TASKS: Mutex<HashMap<u64, Weak<()>>> = Mutex::new(HashMap::new());
-}
-
-fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> {
- let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into();
- path.push(run.to_string());
- match std::fs::create_dir(&path) {
- Ok(()) => {
- Ok(path)
- },
- Err(e) => {
- if e.kind() == std::io::ErrorKind::AlreadyExists {
- Ok(path)
- } else {
- Err(e)
- }
- }
- }
-}
-
-async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run: &PendingRun) -> Result<(), String> {
- eprintln!("activating task {:?}", run);
-
- let now = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is before epoch")
- .as_millis();
-
- let remote = dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("job has remote");
- let repo = dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("remote has repo");
-
- let commit_sha = dbctx.commit_sha(job.commit_id).expect("query succeeds");
-
- let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts");
-
- eprintln!("running {}", &repo.name);
-
- let res = candidate.submit(&dbctx, &run, &remote.remote_git_url, &commit_sha).await;
-
- let mut client_job = match res {
- Ok(Some(mut client_job)) => { client_job }
- Ok(None) => {
- return Err("client hung up instead of acking task".to_string());
- }
- Err(e) => {
- // failed to submit job, move on for now
- return Err(format!("failed to submit task: {:?}", e));
- }
- };
-
- let host_id = client_job.client.host_id;
-
- let connection = dbctx.conn.lock().unwrap();
- connection.execute(
- "update runs set started_time=?1, host_id=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5",
- (now as u64, host_id, format!("{}", artifacts.display()), &client_job.client.build_token, run.id)
- )
- .expect("can update");
- std::mem::drop(connection);
-
- spawn(async move {
- client_job.run().await
- });
-
- Ok(())
-}
-
-struct RunnerClient {
- tx: mpsc::Sender<Result<String, String>>,
- rx: BodyStream,
- host_id: u32,
- build_token: String,
- accepted_sources: Option<Vec<String>>,
-}
-
-fn token_for_job() -> String {
- let mut data = [0u8; 32];
- std::fs::File::open("/dev/urandom")
- .unwrap()
- .read_exact(&mut data)
- .unwrap();
-
- base64::encode(data)
-}
-
-struct ClientJob {
- dbctx: Arc<DbCtx>,
- remote_git_url: String,
- sha: String,
- task: PendingRun,
- client: RunnerClient,
- // exists only as confirmation this `ClientJob` is somewhere, still alive and being processed.
- task_witness: Arc<()>,
-}
-
-impl ClientJob {
- pub async fn run(&mut self) {
- loop {
- eprintln!("waiting on response..");
- let msg = match self.client.recv_typed::<ClientProto>().await.expect("recv works") {
- Some(msg) => msg,
- None => {
- eprintln!("client hung up. task's done, i hope?");
- return;
- }
- };
- eprintln!("got {:?}", msg);
- match msg {
- ClientProto::NewTaskPlease { allowed_pushers, host_info } => {
- eprintln!("misdirected task request (after handshake?)");
- return;
- }
- ClientProto::TaskStatus(task_info) => {
- let (result, state): (Result<String, String>, RunState) = match task_info {
- TaskInfo::Finished { status } => {
- eprintln!("task update: state is finished and result is {}", status);
- match status.as_str() {
- "pass" => {
- (Ok("success".to_string()), RunState::Finished)
- },
- other => {
- eprintln!("unhandled task completion status: {}", other);
- (Err(other.to_string()), RunState::Error)
- }
- }
- },
- TaskInfo::Interrupted { status, description } => {
- eprintln!("task update: state is interrupted and result is {}", status);
- let desc = description.unwrap_or_else(|| status.clone());
- (Err(desc), RunState::Error)
- }
- };
-
- let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists");
- let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists");
-
- for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") {
- if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await {
- eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e);
- }
- }
-
- let now = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is before epoch")
- .as_millis();
-
- let build_result = if result.is_ok() {
- JobResult::Pass
- } else {
- JobResult::Fail
- };
- let result_desc = match result {
- Ok(msg) => msg,
- Err(msg) => msg,
- };
-
- self.dbctx.conn.lock().unwrap().execute(
- "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5",
- (now as u64, state as u64, build_result as u8, result_desc, self.task.id)
- )
- .expect("can update");
- }
- ClientProto::ArtifactCreate => {
- eprintln!("creating artifact");
- self.client.send(serde_json::json!({
- "status": "ok",
- "object_id": "10",
- })).await.unwrap();
- },
- ClientProto::Metric { name, value } => {
- self.dbctx.insert_metric(self.task.id, &name, &value)
- .expect("TODO handle metric insert error?");
- }
- ClientProto::Command(_command_info) => {
- // record information about commands, start/stop, etc. probably also allow
- // artifacts to be attached to commands and default to attaching stdout/stderr?
- }
- other => {
- eprintln!("unhandled message {:?}", other);
- }
- }
- }
- }
-}
-
-impl RunnerClient {
- async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream, accepted_sources: Option<Vec<String>>, host_id: u32) -> Result<Self, String> {
- let token = token_for_job();
- let client = RunnerClient {
- tx: sender,
- rx: resp,
- host_id,
- build_token: token,
- accepted_sources,
- };
- Ok(client)
- }
-
- async fn test_connection(&mut self) -> Result<(), String> {
- self.send_typed(&ClientProto::Ping).await?;
- let resp = self.recv_typed::<ClientProto>().await?;
- match resp {
- Some(ClientProto::Pong) => {
- Ok(())
- }
- Some(other) => {
- Err(format!("unexpected connection test response: {:?}", other))
- }
- None => {
- Err("client hung up".to_string())
- }
- }
- }
-
- async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> {
- self.send_typed(&msg).await
- }
-
- async fn send_typed<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), String> {
- self.tx.send(Ok(serde_json::to_string(msg).unwrap()))
- .await
- .map_err(|e| e.to_string())
- }
-
- async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> {
- match self.rx.next().await {
- Some(Ok(bytes)) => {
- serde_json::from_slice(&bytes)
- .map(Option::Some)
- .map_err(|e| e.to_string())
- },
- Some(Err(e)) => {
- eprintln!("e: {:?}", e);
- Err(format!("no client job: {:?}", e))
- },
- None => {
- Ok(None)
- }
- }
- }
-
- async fn recv_typed<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, String> {
- let json = self.recv().await?;
- Ok(json.map(|v| serde_json::from_value(v).unwrap()))
- }
-
- // is this client willing to run the job based on what it has told us so far?
- fn will_accept(&self, job: &Job) -> bool {
- match (job.source.as_ref(), self.accepted_sources.as_ref()) {
- (_, None) => true,
- (None, Some(_)) => false,
- (Some(source), Some(accepted_sources)) => {
- accepted_sources.contains(source)
- }
- }
- }
-
- async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
- self.send_typed(&ClientProto::new_task(RequestedJob {
- commit: sha.to_string(),
- remote_url: remote_git_url.to_string(),
- build_token: self.build_token.to_string(),
- })).await?;
- match self.recv_typed::<ClientProto>().await {
- Ok(Some(ClientProto::Started)) => {
- let task_witness = Arc::new(());
- ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness));
- Ok(Some(ClientJob {
- task: job.clone(),
- dbctx: Arc::clone(dbctx),
- sha: sha.to_string(),
- remote_git_url: remote_git_url.to_string(),
- client: self,
- task_witness,
- }))
- }
- Ok(Some(resp)) => {
- eprintln!("invalid response: {:?}", resp);
- Err("client rejected job".to_string())
- }
- Ok(None) => {
- Ok(None)
- }
- Err(e) => {
- eprintln!("e: {:?}", e);
- Err(e)
- }
- }
- }
-}
-
-impl fmt::Debug for RunnerClient {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- f.write_str("RunnerClient { .. }")
- }
-}
-
-#[axum_macros::debug_handler]
-async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse {
- eprintln!("artifact request");
- let run_token = match headers.get("x-task-token") {
- Some(run_token) => run_token.to_str().expect("valid string"),
- None => {
- eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
- };
-
- let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() {
- Some(result) => result,
- None => {
- eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
- };
-
- if token_validity != dbctx::TokenValidity::Valid {
- eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
-
- let artifact_path = if let Some(artifact_path) = artifact_path {
- artifact_path
- } else {
- eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers);
- return (StatusCode::BAD_REQUEST, "").into_response();
- };
-
- let artifact_name = match headers.get("x-artifact-name") {
- Some(artifact_name) => artifact_name.to_str().expect("valid string"),
- None => {
- eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
- };
-
- let artifact_desc = match headers.get("x-artifact-desc") {
- Some(artifact_desc) => artifact_desc.to_str().expect("valid string"),
- None => {
- eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
- };
-
- let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await {
- Ok(artifact) => artifact,
- Err(err) => {
- eprintln!("failure to reserve artifact: {:?}", err);
- return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response();
- }
- };
-
- eprintln!("spawning task...");
- let dbctx_ref = Arc::clone(&ctx.0);
- spawn(async move {
- artifact.store_all(artifact_content).await.unwrap();
- dbctx_ref.finalize_artifact(artifact.artifact_id).await.unwrap();
- });
- eprintln!("done?");
-
- (StatusCode::OK, "").into_response()
-}
-
-#[derive(Serialize, Deserialize)]
-struct WorkRequest {
- kind: String,
- accepted_pushers: Option<Vec<String>>
-}
-
-async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse {
- let auth_token = match headers.get("authorization") {
- Some(token) => {
- if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) {
- eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
- }
- None => {
- eprintln!("bad artifact post: headers: {:?}\nno authorization", headers);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
- };
-
- let (tx_sender, tx_receiver) = mpsc::channel(8);
- let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver));
- tx_sender.send(Ok("hello".to_string())).await.expect("works");
-
- let request = job_resp.next().await.expect("request chunk").expect("chunk exists");
- let request = std::str::from_utf8(&request).unwrap();
- let request: ClientProto = match serde_json::from_str(&request) {
- Ok(v) => v,
- Err(e) => {
- eprintln!("couldn't parse work request: {:?}", e);
- return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response();
- }
- };
- let (accepted_pushers, host_info) = match request {
- ClientProto::NewTaskPlease { allowed_pushers, host_info } => (allowed_pushers, host_info),
- other => {
- eprintln!("bad request kind: {:?}", &other);
- return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response();
- }
- };
-
- eprintln!("client identifies itself as {:?}", host_info);
-
- let host_info_id = ctx.0.id_for_host(&host_info).expect("can get a host info id");
-
- let client = match RunnerClient::new(tx_sender, job_resp, accepted_pushers, host_info_id).await {
- Ok(v) => v,
- Err(e) => {
- eprintln!("unable to register client");
- return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response();
- }
- };
-
- match ctx.1.try_send(client) {
- Ok(()) => {
- eprintln!("client requested work...");
- return (StatusCode::OK, resp_body).into_response();
- }
- Err(TrySendError::Full(client)) => {
- return (StatusCode::IM_A_TEAPOT, resp_body).into_response();
- }
- Err(TrySendError::Closed(client)) => {
- panic!("client holder is gone?");
- }
- }
-}
-
-async fn make_api_server(dbctx: Arc<DbCtx>) -> (Router, mpsc::Receiver<RunnerClient>) {
- let (pending_client_sender, pending_client_receiver) = mpsc::channel(8);
-
- let router = Router::new()
- .route("/api/next_job", post(handle_next_job))
- .route("/api/artifact", post(handle_artifact))
- .with_state((dbctx, pending_client_sender));
- (router, pending_client_receiver)
-}
-
-#[derive(Deserialize, Serialize)]
-struct DriverConfig {
- cert_path: PathBuf,
- key_path: PathBuf,
- config_path: PathBuf,
- db_path: PathBuf,
- server_addr: String,
- auth_secret: String,
-}
-
-#[tokio::main]
-async fn main() {
- tracing_subscriber::fmt::init();
-
- let mut args = std::env::args();
- args.next().expect("first arg exists");
- let config_path = args.next().unwrap_or("./driver_config.json".to_string());
- let driver_config: DriverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for DriverConfig");
- let mut auth_secret = AUTH_SECRET.write().unwrap();
- *auth_secret = Some(driver_config.auth_secret.clone());
- std::mem::drop(auth_secret);
-
- let config = RustlsConfig::from_pem_file(
- driver_config.cert_path.clone(),
- driver_config.key_path.clone(),
- ).await.unwrap();
-
- let dbctx = Arc::new(DbCtx::new(&driver_config.config_path, &driver_config.db_path));
-
- dbctx.create_tables().unwrap();
-
- let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await;
- spawn(axum_server::bind_rustls(driver_config.server_addr.parse().unwrap(), config)
- .serve(api_server.into_make_service()));
-
- spawn(old_task_reaper(Arc::clone(&dbctx)));
-
- loop {
- let mut candidate = match channel.recv().await
- .ok_or_else(|| "client channel disconnected".to_string()) {
-
- Ok(candidate) => { candidate },
- Err(e) => { eprintln!("client error: {}", e); continue; }
- };
-
- let dbctx = Arc::clone(&dbctx);
- spawn(async move {
- let host_id = candidate.host_id;
- let res = find_client_task(dbctx, candidate).await;
- eprintln!("task client for {}: {:?}", host_id, res);
- });
- }
-}
-
-async fn find_client_task(dbctx: Arc<DbCtx>, mut candidate: RunnerClient) -> Result<(), String> {
- let find_client_task_start = std::time::Instant::now();
-
- let (run, job) = 'find_work: loop {
- // try to find a job for this candidate:
- // * start with pending runs - these need *some* client to run them, but do not care which
- // * if no new jobs, maybe an existing job still needs a rerun on this client?
- // * otherwise, um, i dunno. do nothing?
-
- let runs = dbctx.get_pending_runs(Some(candidate.host_id)).unwrap();
-
- if runs.len() > 0 {
- println!("{} new runs", runs.len());
-
- for run in runs.into_iter() {
- let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
-
- if candidate.will_accept(&job) {
- break 'find_work (run, job);
- }
-
- }
- }
-
- let alt_run_jobs = dbctx.jobs_needing_task_runs_for_host(candidate.host_id as u64).expect("can query");
-
- for job in alt_run_jobs.into_iter() {
- if candidate.will_accept(&job) {
- let run = dbctx.new_run(job.id, Some(candidate.host_id)).unwrap();
- break 'find_work (run, job);
- }
- }
-
- tokio::time::sleep(std::time::Duration::from_millis(100)).await;
-
- if candidate.test_connection().await.is_err() {
- return Err("lost client connection".to_string());
- }
-
- if find_client_task_start.elapsed().as_secs() > 300 {
- return Err("5min new task deadline elapsed".to_string());
- }
- };
-
- eprintln!("enqueueing job {} for alternate run under host id {}", job.id, candidate.host_id);
- activate_run(Arc::clone(&dbctx), candidate, &job, &run).await?;
-
- Ok(())
-}
-
-async fn old_task_reaper(dbctx: Arc<DbCtx>) {
- let mut potentially_stale_tasks = dbctx.get_active_runs().unwrap();
-
- let active_tasks = ACTIVE_TASKS.lock().unwrap();
-
- for (id, witness) in active_tasks.iter() {
- if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) {
- potentially_stale_tasks.swap_remove(idx);
- }
- }
-
- std::mem::drop(active_tasks);
-
- // ok, so we have tasks that are not active, now if the task is started we know someone should
- // be running it and they are not. retain only those tasks, as they are ones we may want to
- // mark dead.
- //
- // further, filter out any tasks created in the last 60 seconds. this is a VERY generous grace
- // period for clients that have accepted a job but for some reason we have not recorded them as
- // active (perhaps they are slow to ack somehow).
-
- let stale_threshold = crate::io::now_ms() - 60_000;
-
- let stale_tasks: Vec<Run> = potentially_stale_tasks.into_iter().filter(|task| {
- match (task.state, task.start_time) {
- // `run` is atomically set to `Started` and adorned with a `start_time`. disagreement
- // between the two means this run is corrupt and should be reaped.
- (RunState::Started, None) => {
- true
- },
- (RunState::Started, Some(start_time)) => {
- start_time < stale_threshold
- }
- // and if it's not `started`, it's either pending (not assigned yet, so not stale), or
- // one of the complete statuses.
- _ => {
- false
- }
- }
- }).collect();
-
- for task in stale_tasks.iter() {
- eprintln!("looks like task {} is stale, reaping", task.id);
- dbctx.reap_task(task.id).expect("works");
- }
-}
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
deleted file mode 100644
index 9aaf33d..0000000
--- a/src/ci_runner.rs
+++ /dev/null
@@ -1,668 +0,0 @@
-use std::time::Duration;
-use std::os::unix::process::ExitStatusExt;
-use rlua::prelude::LuaError;
-use std::sync::{Arc, Mutex};
-use reqwest::{StatusCode, Response};
-use tokio::process::Command;
-use std::process::Stdio;
-use std::process::ExitStatus;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-use serde_json::json;
-use serde::{Deserialize, de::DeserializeOwned, Serialize};
-use std::task::{Context, Poll};
-use std::pin::Pin;
-use std::marker::Unpin;
-
-mod protocol;
-mod lua;
-mod io;
-
-use crate::io::{ArtifactStream, VecSink};
-use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
-use crate::lua::CommandOutput;
-
-#[derive(Debug)]
-enum WorkAcquireError {
- Reqwest(reqwest::Error),
- EarlyEof,
- Protocol(String),
-}
-
-struct RunnerClient {
- http: reqwest::Client,
- host: String,
- tx: hyper::body::Sender,
- rx: Response,
- current_job: Option<RequestedJob>,
-}
-
-impl RequestedJob {
- pub fn into_running(self, client: RunnerClient) -> RunningJob {
- RunningJob {
- job: self,
- client,
- current_step: StepTracker::new(),
- }
- }
-}
-
-struct JobEnv {
- lua: lua::BuildEnv,
- job: Arc<Mutex<RunningJob>>,
-}
-
-impl JobEnv {
- fn new(job: &Arc<Mutex<RunningJob>>) -> Self {
- let lua = lua::BuildEnv::new(job);
- JobEnv {
- lua,
- job: Arc::clone(job)
- }
- }
-
- async fn default_goodfile(self) -> Result<(), LuaError> {
- self.lua.run_build(crate::lua::DEFAULT_RUST_GOODFILE).await
- }
-
- async fn exec_goodfile(self) -> Result<(), LuaError> {
- let script = std::fs::read_to_string("./tmpdir/goodfile").unwrap();
- self.lua.run_build(script.as_bytes()).await
- }
-}
-
-pub struct RunningJob {
- job: RequestedJob,
- client: RunnerClient,
- current_step: StepTracker,
-}
-
-enum RepoError {
- CloneFailedIdk { exit_code: ExitStatus },
- CheckoutFailedIdk { exit_code: ExitStatus },
- CheckoutFailedMissingRef,
-}
-
-pub struct StepTracker {
- scopes: Vec<String>
-}
-
-impl StepTracker {
- pub fn new() -> Self {
- StepTracker {
- scopes: Vec::new()
- }
- }
-
- pub fn push(&mut self, name: String) {
- self.scopes.push(name);
- }
-
- pub fn pop(&mut self) {
- self.scopes.pop();
- }
-
- pub fn clear(&mut self) {
- self.scopes.clear();
- }
-
- pub fn full_step_path(&self) -> &[String] {
- self.scopes.as_slice()
- }
-}
-
-impl RunningJob {
- async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> {
- self.client.send_typed(&ClientProto::metric(name, value))
- .await
- .map_err(|e| format!("failed to send metric {}: {:?})", name, e))
- }
-
- // TODO: panics if hyper finds the channel is closed. hum
- async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> {
- let (mut sender, body) = hyper::Body::channel();
- let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact")
- .header("user-agent", "ci-butactuallyin-space-runner")
- .header("x-task-token", &self.job.build_token)
- .header("x-artifact-name", name)
- .header("x-artifact-desc", desc)
- .body(body)
- .send()
- .await
- .map_err(|e| format!("unable to send request: {:?}", e))?;
-
- if resp.status() == StatusCode::OK {
- eprintln!("[+] artifact '{}' started", name);
- Ok(ArtifactStream::new(sender))
- } else {
- Err(format!("[-] unable to create artifact: {:?}", resp))
- }
- }
-
- async fn clone_remote(&self) -> Result<(), RepoError> {
- let mut git_clone = Command::new("git");
- git_clone
- .arg("clone")
- .arg(&self.job.remote_url)
- .arg("tmpdir");
-
- let clone_res = self.execute_command_and_report(git_clone, "git clone log", &format!("git clone {} tmpdir", &self.job.remote_url)).await
- .map_err(|e| {
- eprintln!("stringy error (exec failed?) for clone: {}", e);
- RepoError::CloneFailedIdk { exit_code: ExitStatus::from_raw(0) }
- })?;
-
- if !clone_res.success() {
- return Err(RepoError::CloneFailedIdk { exit_code: clone_res });
- }
-
- let mut git_checkout = Command::new("git");
- git_checkout
- .current_dir("tmpdir")
- .arg("checkout")
- .arg(&self.job.commit);
-
- let checkout_res = self.execute_command_and_report(git_checkout, "git checkout log", &format!("git checkout {}", &self.job.commit)).await
- .map_err(|e| {
- eprintln!("stringy error (exec failed?) for checkout: {}", e);
- RepoError::CheckoutFailedIdk { exit_code: ExitStatus::from_raw(0) }
- })?;
-
- if !checkout_res.success() {
- if checkout_res.code() == Some(128) {
- return Err(RepoError::CheckoutFailedIdk { exit_code: checkout_res });
- } else {
- return Err(RepoError::CheckoutFailedMissingRef);
- }
- }
-
- Ok(())
- }
-
- async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
- let stdout_artifact = self.create_artifact(
- &format!("{} (stdout)", name),
- &format!("{} (stdout)", desc)
- ).await.expect("works");
- let stderr_artifact = self.create_artifact(
- &format!("{} (stderr)", name),
- &format!("{} (stderr)", desc)
- ).await.expect("works");
-
- let exit_status = self.execute_command(command, name, desc, stdout_artifact, stderr_artifact).await?;
-
- Ok(exit_status)
- }
-
- async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> {
- let stdout_collector = VecSink::new();
- let stderr_collector = VecSink::new();
-
- let exit_status = self.execute_command(command, name, desc, stdout_collector.clone(), stderr_collector.clone()).await?;
-
- Ok(CommandOutput {
- exit_status,
- stdout: stdout_collector.take_buf(),
- stderr: stderr_collector.take_buf(),
- })
- }
-
- async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> {
- eprintln!("[.] running {}", name);
-
- let mut child = command
- .stdin(Stdio::null())
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .spawn()
- .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?;
-
- let mut child_stdout = child.stdout.take().unwrap();
- let mut child_stderr = child.stderr.take().unwrap();
-
- eprintln!("[.] '{}': forwarding stdout", name);
- tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_reporter).await });
- eprintln!("[.] '{}': forwarding stderr", name);
- tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_reporter).await });
-
- let res = child.wait().await
- .map_err(|e| format!("failed to wait? {:?}", e))?;
-
- if res.success() {
- eprintln!("[+] '{}' success", name);
- } else {
- eprintln!("[-] '{}' fail: {:?}", name, res);
- }
-
- Ok(res)
- }
-
- async fn run(mut self) {
- self.client.send_typed(&ClientProto::Started).await.unwrap();
-
- std::fs::remove_dir_all("tmpdir").unwrap();
- std::fs::create_dir("tmpdir").unwrap();
-
- let ctx = Arc::new(Mutex::new(self));
-
- let checkout_res = ctx.lock().unwrap().clone_remote().await;
-
- if let Err(e) = checkout_res {
- let status = "bad_ref";
- let status = ClientProto::task_status(TaskInfo::finished(status));
- eprintln!("checkout failed, reporting status: {:?}", status);
-
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
- if let Err(e) = res {
- eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
- }
-
- return;
- }
-
- let lua_env = JobEnv::new(&ctx);
-
- let metadata = std::fs::metadata("./tmpdir/goodfile");
- let res: Result<String, (String, String)> = match metadata {
- Ok(_) => {
- match lua_env.exec_goodfile().await {
- Ok(()) => {
- Ok("pass".to_string())
- },
- Err(lua_err) => {
- Err(("failed".to_string(), lua_err.to_string()))
- }
- }
- },
- Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
- match lua_env.default_goodfile().await {
- Ok(()) => {
- Ok("pass".to_string())
- },
- Err(lua_err) => {
- Err(("failed".to_string(), lua_err.to_string()))
- }
- }
- },
- Err(e) => {
- eprintln!("[-] error finding goodfile: {:?}", e);
- Err(("failed".to_string(), "inaccessible goodfile".to_string()))
- }
- };
-
- match res {
- Ok(status) => {
- eprintln!("[+] job success!");
- let status = ClientProto::task_status(TaskInfo::finished(status));
- eprintln!("reporting status: {:?}", status);
-
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
- if let Err(e) = res {
- eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
- }
- }
- Err((status, lua_err)) => {
- eprintln!("[-] job error: {}", status);
-
- let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string()));
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
- if let Err(e) = res {
- eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e);
- }
- }
- }
- }
-
- fn prep_command(command: &[String], working_dir: Option<&str>) -> (Command, String) {
- let mut cmd = Command::new(&command[0]);
- let cwd = match working_dir {
- Some(dir) => {
- format!("tmpdir/{}", dir)
- },
- None => {
- "tmpdir".to_string()
- }
- };
- eprintln!("prepared {:?} to run in {}", &command, &cwd);
- let human_name = command.join(" ");
- cmd
- .current_dir(cwd)
- .args(&command[1..]);
- (cmd, human_name)
- }
-
- async fn run_with_output(&mut self, command: &[String], working_dir: Option<&str>) -> Result<CommandOutput, String> {
- let (cmd, human_name) = Self::prep_command(command, working_dir);
-
- let cmd_res = self.execute_command_capture_output(cmd, &format!("{} log", human_name), &human_name).await?;
-
- if !cmd_res.exit_status.success() {
- return Err(format!("{} failed: {:?}", &human_name, cmd_res.exit_status));
- }
- Ok(cmd_res)
- }
-
- async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> {
- self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1)))
- .await.unwrap();
-
- let (cmd, human_name) = Self::prep_command(command, working_dir);
-
- let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?;
-
- self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1)))
- .await.unwrap();
-
-
- if !cmd_res.success() {
- return Err(format!("{} failed: {:?}", &human_name, cmd_res));
- }
-
- Ok(())
- }
-}
-
-impl RunnerClient {
- async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
- if res.status() != StatusCode::OK {
- return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res));
- }
-
- let hello = res.chunk().await.expect("chunk");
- if hello.as_ref().map(|x| &x[..]) != Some(b"hello") {
- return Err(format!("bad hello: {:?}", hello));
- }
-
- Ok(Self {
- http: reqwest::ClientBuilder::new()
- .connect_timeout(Duration::from_millis(1000))
- .timeout(Duration::from_millis(600000))
- .build()
- .expect("can build client"),
- host: host.to_string(),
- tx: sender,
- rx: res,
- current_job: None,
- })
- }
-
- async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> {
- loop {
- let message = self.recv_typed::<ClientProto>().await;
- match message {
- Ok(Some(ClientProto::NewTask(new_task))) => {
- return Ok(Some(new_task));
- },
- Ok(Some(ClientProto::Ping)) => {
- self.send_typed(&ClientProto::Pong).await
- .map_err(|e| WorkAcquireError::Protocol(format!("failed to pong: {}", e)))?;
- },
- Ok(Some(other)) => {
- return Err(WorkAcquireError::Protocol(format!("unexpected message: {:?}", other)));
- },
- Ok(None) => {
- return Ok(None);
- },
- Err(e) => {
- return Err(WorkAcquireError::Protocol(e));
- }
- }
- }
- }
-
- async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> {
- self.recv_typed().await
- }
-
- async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> {
- match self.rx.chunk().await {
- Ok(Some(chunk)) => {
- serde_json::from_slice(&chunk)
- .map(Option::Some)
- .map_err(|e| {
- format!("not json: {:?}", e)
- })
- },
- Ok(None) => Ok(None),
- Err(e) => {
- Err(format!("error in recv: {:?}", e))
- }
- }
- }
-
- async fn send(&mut self, value: serde_json::Value) -> Result<(), String> {
- self.send_typed(&value).await
- }
-
- async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> {
- self.tx.send_data(
- serde_json::to_vec(t)
- .map_err(|e| format!("json error: {:?}", e))?
- .into()
- ).await
- .map_err(|e| format!("send error: {:?}", e))
- }
-}
-
-#[derive(Deserialize, Serialize)]
-struct RunnerConfig {
- server_address: String,
- auth_secret: String,
- allowed_pushers: Option<Vec<String>>,
-}
-
-#[tokio::main]
-async fn main() {
- tracing_subscriber::fmt::init();
- let mut args = std::env::args();
- args.next().expect("first arg exists");
- let config_path = args.next().unwrap_or("./runner_config.json".to_string());
- let runner_config: RunnerConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for RunnerConfig");
- let client = reqwest::ClientBuilder::new()
- .connect_timeout(Duration::from_millis(1000))
- .timeout(Duration::from_millis(600000))
- .build()
- .expect("can build client");
-
- let host_info = host_info::collect_host_info();
- eprintln!("host info: {:?}", host_info);
-
- loop {
- let (mut sender, body) = hyper::Body::channel();
-
- sender.send_data(serde_json::to_string(&ClientProto::new_task_please(
- runner_config.allowed_pushers.clone(),
- host_info.clone(),
- )).unwrap().into()).await.expect("req");
-
- let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job")
- .header("user-agent", "ci-butactuallyin-space-runner")
- .header("authorization", runner_config.auth_secret.trim())
- .body(body)
- .send()
- .await;
-
- match poll {
- Ok(mut res) => {
- let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await {
- Ok(client) => client,
- Err(e) => {
- eprintln!("failed to initialize client: {:?}", e);
- std::thread::sleep(Duration::from_millis(10000));
- continue;
- }
- };
- let job = match client.wait_for_work(runner_config.allowed_pushers.as_ref().map(|x| x.as_ref())).await {
- Ok(Some(request)) => request,
- Ok(None) => {
- eprintln!("no work to do (yet)");
- std::thread::sleep(Duration::from_millis(2000));
- continue;
- }
- Err(e) => {
- eprintln!("failed to get work: {:?}", e);
- std::thread::sleep(Duration::from_millis(10000));
- continue;
- }
- };
- eprintln!("requested work: {:?}", job);
-
- eprintln!("doing {:?}", job);
-
- let mut job = job.into_running(client);
- job.run().await;
- std::thread::sleep(Duration::from_millis(10000));
- },
- Err(e) => {
- let message = format!("{}", e);
-
- if message.contains("tcp connect error") {
- eprintln!("could not reach server. sleeping a bit and retrying.");
- std::thread::sleep(Duration::from_millis(5000));
- continue;
- }
-
- eprintln!("unhandled error: {}", message);
-
- std::thread::sleep(Duration::from_millis(1000));
- }
- }
- }
-}
-
-mod host_info {
- use crate::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo};
-
- // get host model name, microcode, and how many cores
- fn collect_cpu_info() -> CpuInfo {
- fn find_line(lines: &[String], prefix: &str) -> String {
- lines.iter()
- .find(|line| line.starts_with(prefix))
- .expect(&format!("{} line is present", prefix))
- .split(":")
- .last()
- .unwrap()
- .trim()
- .to_string()
- }
-
- /// try finding core `cpu`'s max frequency in khz. we'll assume this is the actual speed a
- /// build would run at.. fingers crossed.
- fn try_finding_cpu_freq(cpu: u32) -> Result<u64, String> {
- if let Ok(freq_str) = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq") {
- Ok(freq_str.trim().parse().unwrap())
- } else {
- // so cpufreq probably isn't around, maybe /proc/cpuinfo's mhz figure is present?
- let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect();
- let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect();
- match cpu_mhzes.get(cpu as usize) {
- Some(mhz) => {
- let mut line_parts = cpu_mhzes[cpu as usize].split(":");
- let _ = line_parts.next();
- let mhz = line_parts.next().unwrap().trim();
- let mhz: f64 = mhz.parse().unwrap();
- Ok((mhz * 1000.0) as u64)
- },
- None => {
- panic!("could not get cpu freq either from cpufreq or /proc/cpuinfo?");
- }
- }
- }
- }
-
- // we'll have to deploy one of a few techniques, because x86/x86_64 is internally
- // consistent, but aarch64 is different. who knows what other CPUs think.
- match std::env::consts::ARCH {
- "x86" | "x86_64" => {
- let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect();
- let model_names: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("model name")).collect();
- let cores = model_names.len() as u32;
- let model_name = find_line(&cpu_lines, "model name");
- let vendor_id = find_line(&cpu_lines, "vendor_id");
- let family = find_line(&cpu_lines, "cpu family");
- let model = find_line(&cpu_lines, "model\t");
- let microcode = find_line(&cpu_lines, "microcode");
- let max_freq = try_finding_cpu_freq(0).unwrap();
-
- CpuInfo { model_name, microcode, cores, vendor_id, family, model, max_freq }
- }
- "aarch64" => {
- let cpu_lines: Vec<String> = std::fs::read_to_string("/proc/cpuinfo").unwrap().split("\n").map(|line| line.to_string()).collect();
- let processors: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("processor")).collect();
- let cores = processors.len() as u32;
-
- // alternate possible path: /sys/firmware/devicetree/base/compatible
- let model_name = std::fs::read_to_string("/proc/device-tree/compatible").unwrap();
- let model_name = model_name.replace("\x00", ";");
- let vendor_id = find_line(&cpu_lines, "CPU implementer");
- let vendor_name = match vendor_id.as_str() {
- "0x41" => "Arm Limited".to_string(),
- "0x42" => "Broadcom Corporation".to_string(),
- "0x43" => "Cavium Inc".to_string(),
- "0x44" => "Digital Equipment Corporation".to_string(),
- "0x46" => "Fujitsu Ltd".to_string(),
- "0x49" => "Infineon Technologies AG".to_string(),
- "0x4d" => "Motorola".to_string(),
- "0x4e" => "NVIDIA Corporation".to_string(),
- "0x50" => "Applied Micro Circuits Corporation".to_string(),
- "0x51" => "Qualcomm Inc".to_string(),
- "0x56" => "Marvell International Ltd".to_string(),
- "0x69" => "Intel Corporation".to_string(),
- "0xc0" => "Ampere Computing".to_string(),
- other => format!("unknown aarch64 vendor {}", other),
- };
- let family = find_line(&cpu_lines, "CPU architecture");
- let model = find_line(&cpu_lines, "CPU part");
- let microcode = String::new();
- let max_freq = std::fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq").unwrap().trim().parse().unwrap();
-
- CpuInfo { model_name, microcode, cores, vendor_id: vendor_name, family, model, max_freq }
- }
- other => {
- panic!("dunno how to find cpu info for {}, panik", other);
- }
- }
- }
-
- fn collect_mem_info() -> MemoryInfo {
- let mem_lines: Vec<String> = std::fs::read_to_string("/proc/meminfo").unwrap().split("\n").map(|line| line.to_string()).collect();
- let total = mem_lines[0].split(":").last().unwrap().trim().to_string();
- let available = mem_lines[2].split(":").last().unwrap().trim().to_string();
-
- MemoryInfo { total, available }
- }
-
- fn hostname() -> String {
- let mut bytes = [0u8; 4096];
- let res = unsafe {
- libc::gethostname(bytes.as_mut_ptr() as *mut std::ffi::c_char, bytes.len())
- };
- if res != 0 {
- panic!("gethostname failed {:?}", res);
- }
- let end = bytes.iter().position(|b| *b == 0).expect("hostname is null-terminated");
- std::ffi::CStr::from_bytes_with_nul(&bytes[..end+1]).expect("null-terminated string").to_str().expect("is utf-8").to_string()
- }
-
- pub fn collect_env_info() -> EnvInfo {
- EnvInfo {
- arch: std::env::consts::ARCH.to_string(),
- family: std::env::consts::FAMILY.to_string(),
- os: std::env::consts::OS.to_string(),
- }
- }
-
- pub fn collect_host_info() -> HostInfo {
- let cpu_info = collect_cpu_info();
- let memory_info = collect_mem_info();
- let hostname = hostname();
- let env_info = collect_env_info();
-
- HostInfo {
- hostname,
- cpu_info,
- memory_info,
- env_info,
- }
- }
-}
-
diff --git a/src/dbctx.rs b/src/dbctx.rs
deleted file mode 100644
index 7378b2e..0000000
--- a/src/dbctx.rs
+++ /dev/null
@@ -1,772 +0,0 @@
-use std::sync::Mutex;
-use futures_util::StreamExt;
-use rusqlite::{Connection, OptionalExtension};
-use std::time::{SystemTime, UNIX_EPOCH};
-use tokio::fs::{File, OpenOptions};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use std::path::Path;
-use std::path::PathBuf;
-
-use crate::io::ArtifactDescriptor;
-use crate::notifier::{RemoteNotifier, NotifierConfig};
-use crate::sql;
-
-const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30;
-
-pub struct DbCtx {
- pub config_path: PathBuf,
- // don't love this but.. for now...
- pub conn: Mutex<Connection>,
-}
-
-#[derive(Debug, Clone)]
-pub struct Repo {
- pub id: u64,
- pub name: String,
- pub default_run_preference: Option<String>,
-}
-
-#[derive(Debug)]
-pub struct Remote {
- pub id: u64,
- pub repo_id: u64,
- pub remote_path: String,
- pub remote_api: String,
- pub remote_url: String,
- pub remote_git_url: String,
- pub notifier_config_path: String,
-}
-
-// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1
-// relationship with commits, and potentially many *runs* of that job.
-#[derive(Debug, Clone)]
-pub struct Job {
- pub id: u64,
- pub remote_id: u64,
- pub commit_id: u64,
- pub created_time: u64,
- pub source: Option<String>,
- pub run_preferences: Option<String>,
-}
-
-// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report
-// results. a job may have many runs from many different hosts rebuliding history, or reruns of the
-// same job on the same hardware to collect more datapoints on the operation.
-#[derive(Debug, Clone)]
-pub struct Run {
- pub id: u64,
- pub job_id: u64,
- pub artifacts_path: Option<String>,
- pub state: sql::RunState,
- pub host_id: Option<u64>,
- pub create_time: u64,
- pub start_time: Option<u64>,
- pub complete_time: Option<u64>,
- pub build_token: Option<String>,
- pub run_timeout: Option<u64>,
- pub build_result: Option<u8>,
- pub final_text: Option<String>,
-}
-
-impl Run {
- fn into_pending_run(self) -> PendingRun {
- PendingRun {
- id: self.id,
- job_id: self.job_id,
- create_time: self.create_time,
- }
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct PendingRun {
- pub id: u64,
- pub job_id: u64,
- pub create_time: u64,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum TokenValidity {
- Expired,
- Invalid,
- Valid,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct MetricRecord {
- pub id: u64,
- pub run_id: u64,
- pub name: String,
- pub value: String
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct ArtifactRecord {
- pub id: u64,
- pub run_id: u64,
- pub name: String,
- pub desc: String,
- pub created_time: u64,
- pub completed_time: Option<u64>,
-}
-
-impl DbCtx {
- pub fn new<P: AsRef<Path>>(config_path: P, db_path: P) -> Self {
- DbCtx {
- config_path: config_path.as_ref().to_owned(),
- conn: Mutex::new(Connection::open(db_path).unwrap())
- }
- }
-
- pub fn create_tables(&self) -> Result<(), ()> {
- let conn = self.conn.lock().unwrap();
- conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap();
- conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap();
- conn.execute(sql::CREATE_METRICS_TABLE, ()).unwrap();
- conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap();
- conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap();
- conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap();
- conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap();
- conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap();
- conn.execute(sql::CREATE_RUNS_TABLE, ()).unwrap();
- conn.execute(sql::CREATE_HOSTS_TABLE, ()).unwrap();
-
- Ok(())
- }
-
- pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> {
- let conn = self.conn.lock().unwrap();
- conn
- .execute(
- "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value",
- (run_id, name, value)
- )
- .expect("can upsert");
- Ok(())
- }
-
- pub fn new_commit(&self, sha: &str) -> Result<u64, String> {
- let conn = self.conn.lock().unwrap();
- conn
- .execute(
- "insert into commits (sha) values (?1)",
- [sha.clone()]
- )
- .expect("can insert");
-
- Ok(conn.last_insert_rowid() as u64)
- }
-
- pub fn new_repo(&self, name: &str) -> Result<u64, String> {
- let conn = self.conn.lock().unwrap();
- conn
- .execute(
- "insert into repos (repo_name) values (?1)",
- [name.clone()]
- )
- .map_err(|e| {
- format!("{:?}", e)
- })?;
-
- Ok(conn.last_insert_rowid() as u64)
- }
-
- pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> {
- let conn = self.conn.lock().unwrap();
- conn
- .execute(
- "update artifacts set completed_time=?1 where id=?2",
- (crate::io::now_ms(), artifact_id)
- )
- .map(|_| ())
- .map_err(|e| {
- format!("{:?}", e)
- })
- }
-
- pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
- let artifact_id = {
- let created_time = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is before epoch")
- .as_millis() as u64;
- let conn = self.conn.lock().unwrap();
- conn
- .execute(
- "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)",
- (run_id, name, desc, created_time)
- )
- .map_err(|e| {
- format!("{:?}", e)
- })?;
-
- conn.last_insert_rowid() as u64
- };
-
- ArtifactDescriptor::new(run_id, artifact_id).await
- }
-
- pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {
- let conn = self.conn.lock().unwrap();
- conn
- .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| {
- let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap();
-
- Ok(ArtifactRecord {
- id, run_id, name, desc, created_time, completed_time
- })
- })
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn commit_sha(&self, commit_id: u64) -> Result<String, String> {
- self.conn.lock()
- .unwrap()
- .query_row(
- "select sha from commits where id=?1",
- [commit_id],
- |row| { row.get(0) }
- )
- .map_err(|e| e.to_string())
- }
-
- pub fn job_for_commit(&self, sha: &str) -> Result<Option<u64>, String> {
- self.conn.lock()
- .unwrap()
- .query_row(
- "select id from commits where sha=?1",
- [sha],
- |row| { row.get(0) }
- )
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn run_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {
- self.conn.lock()
- .unwrap()
- .query_row(
- "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1",
- [token],
- |row| {
- let timeout: Option<u64> = row.get(3).unwrap();
- let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS);
-
- let now = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is before epoch")
- .as_millis();
-
- let time: Option<u64> = row.get(2).unwrap();
- let validity = if let Some(time) = time {
- if now > time as u128 + timeout as u128 {
- TokenValidity::Expired
- } else {
- TokenValidity::Valid
- }
- } else {
- TokenValidity::Invalid
- };
- Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity))
- }
- )
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn job_by_id(&self, id: u64) -> Result<Option<Job>, String> {
- self.conn.lock()
- .unwrap()
- .query_row(crate::sql::JOB_BY_ID, [id], |row| {
- let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
-
- Ok(Job {
- id, source, created_time, remote_id, commit_id, run_preferences
- })
- })
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn remote_by_path_and_api(&self, api: &str, path: &str) -> Result<Option<Remote>, String> {
- self.conn.lock()
- .unwrap()
- .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where remote_api=?1 and remote_path=?2", [api, path], |row| {
- let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
-
- Ok(Remote {
- id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path
- })
- })
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn remote_by_id(&self, id: u64) -> Result<Option<Remote>, String> {
- self.conn.lock()
- .unwrap()
- .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where id=?1", [id], |row| {
- let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
-
- Ok(Remote {
- id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path
- })
- })
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> {
- self.conn.lock()
- .unwrap()
- .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0))
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn repo_id_by_name(&self, repo_name: &str) -> Result<Option<u64>, String> {
- self.conn.lock()
- .unwrap()
- .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0))
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result<u64, String> {
- let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind {
- "github" => {
- (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote))
- },
- "github-email" => {
- (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote))
- },
- other => {
- panic!("unsupported remote kind: {}", other);
- }
- };
-
- let conn = self.conn.lock().unwrap();
- conn
- .execute(
- "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);",
- (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path)
- )
- .expect("can insert");
-
- Ok(conn.last_insert_rowid() as u64)
- }
-
- pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option<String>) -> Result<u64, String> {
- // TODO: potential race: if two remotes learn about a commit at the same time and we decide
- // to create two jobs at the same time, this might return an incorrect id if the insert
- // didn't actually insert a new row.
- let commit_id = self.new_commit(sha).expect("can create commit record");
-
- let created_time = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is before epoch")
- .as_millis() as u64;
-
- let conn = self.conn.lock().unwrap();
-
- let rows_modified = conn.execute(
- "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);",
- (remote_id, commit_id, created_time, pusher, repo_default_run_pref)
- ).unwrap();
-
- assert_eq!(1, rows_modified);
-
- let job_id = conn.last_insert_rowid() as u64;
-
- Ok(job_id)
- }
-
- pub fn new_run(&self, job_id: u64, host_preference: Option<u32>) -> Result<PendingRun, String> {
- let created_time = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is before epoch")
- .as_millis() as u64;
-
- let conn = self.conn.lock().unwrap();
-
- let rows_modified = conn.execute(
- "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);",
- (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference)
- ).unwrap();
-
- assert_eq!(1, rows_modified);
-
- let run_id = conn.last_insert_rowid() as u64;
-
- Ok(PendingRun {
- id: run_id,
- job_id,
- create_time: created_time,
- })
- }
-
- pub fn reap_task(&self, task_id: u64) -> Result<(), String> {
- let conn = self.conn.lock().unwrap();
-
- conn.execute(
- "update runs set final_status=\"lost signal\", state=4 where id=?1;",
- [task_id]
- ).unwrap();
-
- Ok(())
- }
-
- pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> {
- let conn = self.conn.lock().unwrap();
-
- let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap();
- let mut result = metrics_query.query([run]).unwrap();
- let mut metrics = Vec::new();
-
- while let Some(row) = result.next().unwrap() {
- let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap();
- metrics.push(MetricRecord { id, run_id, name, value });
- }
-
- Ok(metrics)
- }
-
- pub fn artifacts_for_run(&self, run: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> {
- let conn = self.conn.lock().unwrap();
-
- let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap();
- let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap();
- let mut artifacts = Vec::new();
-
- while let Some(row) = result.next().unwrap() {
- let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap();
- artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time });
- }
-
- Ok(artifacts)
- }
-
- pub fn repo_by_id(&self, id: u64) -> Result<Option<Repo>, String> {
- self.conn.lock()
- .unwrap()
- .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| {
- let (id, repo_name, default_run_preference) = row.try_into().unwrap();
- Ok(Repo {
- id,
- name: repo_name,
- default_run_preference,
- })
- })
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn get_repos(&self) -> Result<Vec<Repo>, String> {
- let conn = self.conn.lock().unwrap();
-
- let mut repos_query = conn.prepare(sql::ALL_REPOS).unwrap();
- let mut repos = repos_query.query([]).unwrap();
- let mut result = Vec::new();
-
- while let Some(row) = repos.next().unwrap() {
- let (id, repo_name, default_run_preference) = row.try_into().unwrap();
- result.push(Repo {
- id,
- name: repo_name,
- default_run_preference,
- });
- }
-
- Ok(result)
- }
-
- pub fn last_job_from_remote(&self, id: u64) -> Result<Option<Job>, String> {
- self.recent_jobs_from_remote(id, 1)
- .map(|mut jobs| jobs.pop())
- }
-
- pub fn job_by_commit_id(&self, commit_id: u64) -> Result<Option<Job>, String> {
- let conn = self.conn.lock().unwrap();
-
- conn
- .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| {
- let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
- Ok(Job {
- id,
- remote_id,
- commit_id,
- created_time,
- source,
- run_preferences,
- })
- })
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn recent_jobs_from_remote(&self, id: u64, limit: u64) -> Result<Vec<Job>, String> {
- let conn = self.conn.lock().unwrap();
-
- let mut job_query = conn.prepare(sql::LAST_JOBS_FROM_REMOTE).unwrap();
- let mut result = job_query.query([id, limit]).unwrap();
-
- let mut jobs = Vec::new();
-
- while let Some(row) = result.next().unwrap() {
- let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
- jobs.push(Job {
- id,
- remote_id,
- commit_id,
- created_time,
- source,
- run_preferences,
- });
- }
-
- Ok(jobs)
- }
-
- pub fn get_active_runs(&self) -> Result<Vec<Run>, String> {
- let conn = self.conn.lock().unwrap();
-
- let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap();
- let mut runs = started_query.query([]).unwrap();
- let mut started = Vec::new();
-
- while let Some(row) = runs.next().unwrap() {
- started.push(crate::sql::row2run(row));
- }
-
- Ok(started)
- }
-
- pub fn get_pending_runs(&self, host_id: Option<u32>) -> Result<Vec<PendingRun>, String> {
- let conn = self.conn.lock().unwrap();
-
- let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap();
- let mut runs = pending_query.query([host_id]).unwrap();
- let mut pending = Vec::new();
-
- while let Some(row) = runs.next().unwrap() {
- let (id, job_id, create_time) = row.try_into().unwrap();
- let run = PendingRun {
- id,
- job_id,
- create_time,
- };
- pending.push(run);
- }
-
- Ok(pending)
- }
-
- pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result<Vec<Job>, String> {
- // for jobs that this host has not run, we'll arbitrarily say that we won't generate new
- // runs for jobs more than a day old.
- //
- // we don't want to rebuild the entire history every time we see a new host by default; if
- // you really want to rebuild all of history on a new host, use `ci_ctl` to prepare the
- // runs.
- let cutoff = crate::io::now_ms() - 24 * 3600 * 1000;
-
- let conn = self.conn.lock().unwrap();
-
- let mut jobs_needing_task_runs = conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap();
- let mut job_rows = jobs_needing_task_runs.query([cutoff, host_id]).unwrap();
- let mut jobs = Vec::new();
-
- while let Some(row) = job_rows.next().unwrap() {
- let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
-
- jobs.push(Job {
- id, source, created_time, remote_id, commit_id, run_preferences,
- });
- }
-
- Ok(jobs)
- }
-
-
- pub fn remotes_by_repo(&self, repo_id: u64) -> Result<Vec<Remote>, String> {
- let mut remotes: Vec<Remote> = Vec::new();
-
- let conn = self.conn.lock().unwrap();
- let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap();
- let mut remote_results = remotes_query.query([repo_id]).unwrap();
-
- while let Some(row) = remote_results.next().unwrap() {
- let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
- remotes.push(Remote { id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path });
- }
-
- Ok(remotes)
- }
-
- /// try to find a host close to `host_info`, but maybe not an exact match.
- ///
- /// specifically, we'll ignore microcode and family/os - enough that measurements ought to be
- /// comparable but maybe not perfectly so.
- pub fn find_id_like_host(&self, host_info: &crate::protocol::HostInfo) -> Result<Option<u32>, String> {
- self.conn.lock()
- .unwrap()
- .query_row(
- "select id from hosts where \
- hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \
- cpu_model=?5 and cpu_max_freq_khz=?6 and cpu_cores=?7 and mem_total=?8 and \
- arch=?9);",
- (
- &host_info.hostname,
- &host_info.cpu_info.vendor_id,
- &host_info.cpu_info.model_name,
- &host_info.cpu_info.family,
- &host_info.cpu_info.model,
- &host_info.cpu_info.max_freq,
- &host_info.cpu_info.cores,
- &host_info.memory_info.total,
- &host_info.env_info.arch,
- ),
- |row| { row.get(0) }
- )
- .map_err(|e| e.to_string())
- }
-
- /// get an id for the host described by `host_info`. this may create a new record if no such
- /// host exists.
- pub fn id_for_host(&self, host_info: &crate::protocol::HostInfo) -> Result<u32, String> {
- let conn = self.conn.lock().unwrap();
-
- conn
- .execute(
- "insert or ignore into hosts \
- (\
- hostname, cpu_vendor_id, cpu_model_name, cpu_family, \
- cpu_model, cpu_microcode, cpu_max_freq_khz, cpu_cores, \
- mem_total, arch, family, os\
- ) values (\
- ?1, ?2, ?3, ?4, \
- ?5, ?6, ?7, ?8, \
- ?9, ?10, ?11, ?12 \
- );",
- (
- &host_info.hostname,
- &host_info.cpu_info.vendor_id,
- &host_info.cpu_info.model_name,
- &host_info.cpu_info.family,
- &host_info.cpu_info.model,
- &host_info.cpu_info.microcode,
- &host_info.cpu_info.max_freq,
- &host_info.cpu_info.cores,
- &host_info.memory_info.total,
- &host_info.env_info.arch,
- &host_info.env_info.family,
- &host_info.env_info.os,
- )
- )
- .expect("can insert");
-
- conn
- .query_row(
- "select id from hosts where \
- hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \
- cpu_model=?5 and cpu_microcode=?6 and cpu_max_freq_khz=?7 and \
- cpu_cores=?8 and mem_total=?9 and arch=?10 and family=?11 and os=?12;",
- (
- &host_info.hostname,
- &host_info.cpu_info.vendor_id,
- &host_info.cpu_info.model_name,
- &host_info.cpu_info.family,
- &host_info.cpu_info.model,
- &host_info.cpu_info.microcode,
- &host_info.cpu_info.max_freq,
- &host_info.cpu_info.cores,
- &host_info.memory_info.total,
- &host_info.env_info.arch,
- &host_info.env_info.family,
- &host_info.env_info.os,
- ),
- |row| { row.get(0) }
- )
- .map_err(|e| e.to_string())
- }
-
- pub fn host_model_info(&self, host_id: u64) -> Result<(String, String, String, String, u64), String> {
- let conn = self.conn.lock().unwrap();
- conn
- .query_row("select hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz from hosts where id=?1;", [host_id], |row| {
- Ok((
- row.get(0).unwrap(),
- row.get(1).unwrap(),
- row.get(2).unwrap(),
- row.get(3).unwrap(),
- row.get(4).unwrap(),
- ))
- })
- .map_err(|e| e.to_string())
- }
-
- pub fn runs_for_job_one_per_host(&self, job_id: u64) -> Result<Vec<Run>, String> {
- let conn = self.conn.lock().unwrap();
- let mut runs_query = conn.prepare(crate::sql::RUNS_FOR_JOB).unwrap();
- let mut runs_results = runs_query.query([job_id]).unwrap();
-
- let mut results = Vec::new();
-
- while let Some(row) = runs_results.next().unwrap() {
- results.push(crate::sql::row2run(row));
- }
-
- Ok(results)
- }
-
- pub fn last_run_for_job(&self, job_id: u64) -> Result<Option<Run>, String> {
- let conn = self.conn.lock().unwrap();
-
- conn
- .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| {
- Ok(crate::sql::row2run(row))
- })
- .optional()
- .map_err(|e| e.to_string())
- }
-
- pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> {
- let remotes = self.remotes_by_repo(repo_id)?;
-
- let mut notifiers: Vec<RemoteNotifier> = Vec::new();
-
- for remote in remotes.into_iter() {
- match remote.remote_api.as_str() {
- "github" => {
- let mut notifier_path = self.config_path.clone();
- notifier_path.push(&remote.notifier_config_path);
-
- let notifier = RemoteNotifier {
- remote_path: remote.remote_path,
- notifier: NotifierConfig::github_from_file(&notifier_path)
- .expect("can load notifier config")
- };
- notifiers.push(notifier);
- },
- "email" => {
- let mut notifier_path = self.config_path.clone();
- notifier_path.push(&remote.notifier_config_path);
-
- let notifier = RemoteNotifier {
- remote_path: remote.remote_path,
- notifier: NotifierConfig::email_from_file(&notifier_path)
- .expect("can load notifier config")
- };
- notifiers.push(notifier);
- }
- other => {
- eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote)
- }
- }
- }
-
- Ok(notifiers)
- }
-}
-
diff --git a/src/io.rs b/src/io.rs
deleted file mode 100644
index f9f407f..0000000
--- a/src/io.rs
+++ /dev/null
@@ -1,181 +0,0 @@
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-use futures_util::StreamExt;
-use tokio::fs::File;
-use std::io::Write;
-use tokio::fs::OpenOptions;
-use std::task::{Poll, Context};
-use std::pin::Pin;
-use std::time::{UNIX_EPOCH, SystemTime};
-use std::sync::{Arc, Mutex};
-
-pub fn now_ms() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is later than epoch")
- .as_millis() as u64
-}
-
-#[derive(Clone)]
-pub struct VecSink {
- body: Arc<Mutex<Vec<u8>>>,
-}
-
-impl VecSink {
- pub fn new() -> Self {
- Self { body: Arc::new(Mutex::new(Vec::new())) }
- }
-
- pub fn take_buf(&self) -> Vec<u8> {
- std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new())
- }
-}
-
-impl tokio::io::AsyncWrite for VecSink {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &[u8]
- ) -> Poll<Result<usize, std::io::Error>> {
- self.body.lock().unwrap().extend_from_slice(buf);
- Poll::Ready(Ok(buf.len()))
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-pub struct ArtifactStream {
- sender: hyper::body::Sender,
-}
-
-impl ArtifactStream {
- pub fn new(sender: hyper::body::Sender) -> Self {
- Self { sender }
- }
-}
-
-impl tokio::io::AsyncWrite for ArtifactStream {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &[u8]
- ) -> Poll<Result<usize, std::io::Error>> {
- match self.get_mut().sender.try_send_data(buf.to_vec().into()) {
- Ok(()) => {
- Poll::Ready(Ok(buf.len()))
- },
- _ => {
- cx.waker().wake_by_ref();
- Poll::Pending
- }
- }
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-
-pub struct ArtifactDescriptor {
- job_id: u64,
- pub artifact_id: u64,
- file: File,
-}
-
-impl ArtifactDescriptor {
- pub async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> {
- // TODO: jobs should be a configurable path
- let path = format!("artifacts/{}/{}", job_id, artifact_id);
- let file = OpenOptions::new()
- .read(true)
- .write(true)
- .create_new(true)
- .open(&path)
- .await
- .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?;
-
- Ok(ArtifactDescriptor {
- job_id,
- artifact_id,
- file,
- })
- }
-
- pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> {
- loop {
- let chunk = data.next().await;
-
- let chunk = match chunk {
- Some(Ok(chunk)) => chunk,
- Some(Err(e)) => {
- eprintln!("error: {:?}", e);
- return Err(format!("error reading: {:?}", e));
- }
- None => {
- eprintln!("body done?");
- return Ok(());
- }
- };
-
- let chunk = chunk.as_ref();
-
- self.file.write_all(chunk).await
- .map_err(|e| format!("failed to write: {:?}", e))?;
- }
- }
-}
-
-pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> {
- let mut buf = vec![0; 1024 * 1024];
- loop {
- let n_read = source.read(&mut buf).await
- .map_err(|e| format!("failed to read: {:?}", e))?;
-
- if n_read == 0 {
- eprintln!("done reading!");
- return Ok(());
- }
-
- dest.write_all(&buf[..n_read]).await
- .map_err(|e| format!("failed to write: {:?}", e))?;
- }
-}
-/*
-pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> {
- let mut buf = vec![0; 1024 * 1024];
- loop {
- let n_read = source.read(&mut buf).await
- .map_err(|e| format!("failed to read: {:?}", e))?;
-
- if n_read == 0 {
- eprintln!("done reading!");
- return Ok(());
- }
-
- dest.sender.send_data(buf[..n_read].to_vec().into()).await
- .map_err(|e| format!("failed to write: {:?}", e))?;
- }
-}
-*/
diff --git a/src/lua/mod.rs b/src/lua/mod.rs
deleted file mode 100644
index 6c4a281..0000000
--- a/src/lua/mod.rs
+++ /dev/null
@@ -1,411 +0,0 @@
-use crate::RunningJob;
-
-use rlua::prelude::*;
-
-use std::sync::{Arc, Mutex};
-use std::path::PathBuf;
-
-pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua");
-
-pub struct BuildEnv {
- lua: Lua,
- job: Arc<Mutex<RunningJob>>,
-}
-
-#[derive(Debug)]
-pub struct RunParams {
- step: Option<String>,
- name: Option<String>,
- cwd: Option<String>,
-}
-
-pub struct CommandOutput {
- pub exit_status: std::process::ExitStatus,
- pub stdout: Vec<u8>,
- pub stderr: Vec<u8>,
-}
-
-mod lua_exports {
- use crate::RunningJob;
- use crate::lua::{CommandOutput, RunParams};
-
- use std::sync::{Arc, Mutex};
- use std::path::PathBuf;
-
- use rlua::prelude::*;
-
- pub fn collect_build_args(command: LuaValue, params: LuaValue) -> Result<(Vec<String>, RunParams), rlua::Error> {
- let args = match command {
- LuaValue::Table(table) => {
- let len = table.len().expect("command table has a length");
- let mut command_args = Vec::new();
- for i in 0..len {
- let value = table.get(i + 1).expect("command arg is gettble");
- match value {
- LuaValue::String(s) => {
- command_args.push(s.to_str().unwrap().to_owned());
- },
- other => {
- return Err(LuaError::RuntimeError(format!("argument {} was not a string, was {:?}", i, other)));
- }
- };
- }
-
- command_args
- },
- other => {
- return Err(LuaError::RuntimeError(format!("argument 1 was not a table: {:?}", other)));
- }
- };
-
- let params = match params {
- LuaValue::Table(table) => {
- let step = match table.get("step").expect("can get from table") {
- LuaValue::String(v) => {
- Some(v.to_str()?.to_owned())
- },
- LuaValue::Nil => {
- None
- },
- other => {
- return Err(LuaError::RuntimeError(format!("params[\"step\"] must be a string")));
- }
- };
- let name = match table.get("name").expect("can get from table") {
- LuaValue::String(v) => {
- Some(v.to_str()?.to_owned())
- },
- LuaValue::Nil => {
- None
- },
- other => {
- return Err(LuaError::RuntimeError(format!("params[\"name\"] must be a string")));
- }
- };
- let cwd = match table.get("cwd").expect("can get from table") {
- LuaValue::String(v) => {
- Some(v.to_str()?.to_owned())
- },
- LuaValue::Nil => {
- None
- },
- other => {
- return Err(LuaError::RuntimeError(format!("params[\"cwd\"] must be a string")));
- }
- };
-
- RunParams {
- step,
- name,
- cwd,
- }
- },
- LuaValue::Nil => {
- RunParams {
- step: None,
- name: None,
- cwd: None,
- }
- }
- other => {
- return Err(LuaError::RuntimeError(format!("argument 2 was not a table: {:?}", other)));
- }
- };
-
- Ok((args, params))
- }
-
- pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
- let (args, params) = collect_build_args(command, params)?;
- eprintln!("args: {:?}", args);
- eprintln!(" params: {:?}", params);
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- rt.block_on(async move {
- job_ctx.lock().unwrap().run_command(&args, params.cwd.as_ref().map(|x| x.as_str())).await
- .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e)))
- })
- }
-
- pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<rlua::Table<'lua>, rlua::Error> {
- let (args, params) = collect_build_args(command, params)?;
- eprintln!("args: {:?}", args);
- eprintln!(" params: {:?}", params);
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- let command_output = rt.block_on(async move {
- job_ctx.lock().unwrap().run_with_output(&args, params.cwd.as_ref().map(|x| x.as_str())).await
- .map_err(|e| LuaError::RuntimeError(format!("run_command error: {:?}", e)))
- })?;
-
- let stdout = ctx.create_string(command_output.stdout.as_slice())?;
- let stderr = ctx.create_string(command_output.stderr.as_slice())?;
-
- let result = ctx.create_table()?;
- result.set("stdout", stdout)?;
- result.set("stderr", stderr)?;
- result.set("status", command_output.exit_status.code())?;
- Ok(result)
- }
-
- pub fn check_dependencies(commands: Vec<String>) -> Result<(), rlua::Error> {
- let mut missing_deps = Vec::new();
- for command in commands.iter() {
- if !has_cmd(command)? {
- missing_deps.push(command.clone());
- }
- }
-
- if missing_deps.len() > 0 {
- return Err(LuaError::RuntimeError(format!("missing dependencies: {}", missing_deps.join(", "))));
- }
-
- Ok(())
- }
-
- pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- rt.block_on(async move {
- job_ctx.lock().unwrap().send_metric(&name, value).await
- .map_err(|e| LuaError::RuntimeError(format!("send_metric error: {:?}", e)))
- })
- }
-
- pub fn artifact(path: String, name: Option<String>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
- let path: PathBuf = path.into();
-
- let default_name: String = match (path.file_name(), path.parent()) {
- (Some(name), _) => name
- .to_str()
- .ok_or(LuaError::RuntimeError("artifact name is not a unicode string".to_string()))?
- .to_string(),
- (_, Some(parent)) => format!("{}", parent.display()),
- (None, None) => {
- // one day using directories for artifacts might work but / is still not going
- // to be accepted
- return Err(LuaError::RuntimeError(format!("cannot infer a default path name for {}", path.display())));
- }
- };
-
- let name: String = match name {
- Some(name) => name,
- None => default_name,
- };
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- rt.block_on(async move {
- let mut artifact = job_ctx.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await
- .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e)))
- .unwrap();
- let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap();
- eprintln!("uploading...");
- crate::io::forward_data(&mut file, &mut artifact).await
- .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?;
- std::mem::drop(artifact);
- Ok(())
- })
- }
-
- pub fn has_cmd(name: &str) -> Result<bool, rlua::Error> {
- Ok(std::process::Command::new("which")
- .arg(name)
- .status()
- .map_err(|e| LuaError::RuntimeError(format!("could not fork which? {:?}", e)))?
- .success())
- }
-
- pub fn file_size(path: &str) -> Result<u64, rlua::Error> {
- Ok(std::fs::metadata(&format!("tmpdir/{}", path))
- .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", path)))?
- .len())
- }
-
- pub mod step {
- use crate::RunningJob;
- use std::sync::{Arc, Mutex};
-
- pub fn start(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> {
- let mut job = job_ref.lock().unwrap();
- job.current_step.clear();
- job.current_step.push(name);
- Ok(())
- }
-
- pub fn push(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> {
- let mut job = job_ref.lock().unwrap();
- job.current_step.push(name);
- Ok(())
- }
-
- pub fn advance(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> {
- let mut job = job_ref.lock().unwrap();
- job.current_step.pop();
- job.current_step.push(name);
- Ok(())
- }
- }
-}
-
-struct DeclEnv<'lua, 'env> {
- lua_ctx: &'env rlua::Context<'lua>,
- job_ref: &'env Arc<Mutex<RunningJob>>,
-}
-impl<'lua, 'env> DeclEnv<'lua, 'env> {
- fn create_function<A, R, F>(&self, name: &str, f: F) -> Result<rlua::Function<'lua>, String>
- where
- A: FromLuaMulti<'lua>,
- R: ToLuaMulti<'lua>,
- F: 'static + Send + Fn(rlua::Context<'lua>, Arc<Mutex<RunningJob>>, A) -> Result<R, rlua::Error> {
-
- let job_ref = Arc::clone(self.job_ref);
- self.lua_ctx.create_function(move |ctx, args| {
- let job_ref = Arc::clone(&job_ref);
- f(ctx, job_ref, args)
- })
- .map_err(|e| format!("problem defining {} function: {:?}", name, e))
- }
-}
-
-impl BuildEnv {
- pub fn new(job: &Arc<Mutex<RunningJob>>) -> Self {
- let env = BuildEnv {
- lua: Lua::new(),
- job: Arc::clone(job),
- };
- env.lua.context(|lua_ctx| {
- env.define_env(lua_ctx)
- }).expect("can define context");
- env
- }
-
- fn define_env(&self, lua_ctx: rlua::Context) -> Result<(), String> {
- let decl_env = DeclEnv {
- lua_ctx: &lua_ctx,
- job_ref: &self.job,
- };
-
- let hello = decl_env.create_function("hello", |_, _, ()| {
- eprintln!("hello from lua!!!");
- Ok(())
- })?;
-
- let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec<String>| {
- lua_exports::check_dependencies(commands)
- })?;
-
- let build = decl_env.create_function("build", move |_, job_ref, (command, params): (LuaValue, LuaValue)| {
- lua_exports::build_command_impl(command, params, job_ref)
- })?;
-
- let check_output = decl_env.create_function("check_output", move |ctx, job_ref, (command, params): (LuaValue, LuaValue)| {
- lua_exports::check_output_impl(ctx, command, params, job_ref)
- })?;
-
- let metric = decl_env.create_function("metric", move |_, job_ref, (name, value): (String, String)| {
- lua_exports::metric(name, value, job_ref)
- })?;
-
- let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?;
-
- let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| {
- lua_exports::artifact(path, name, job_ref)
- })?;
-
- let error = decl_env.create_function("error", move |_, job_ref, msg: String| {
- Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg)))
- })?;
-
- let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, job_ref, name: String| {
- lua_exports::has_cmd(&name)
- })?;
-
- let size_of_file = decl_env.create_function("size_of_file", move |_, job_ref, name: String| {
- lua_exports::file_size(&name)
- })?;
-
- let native_rust_triple = match std::env::consts::ARCH {
- "x86_64" => "x86_64-unknown-linux-gnu",
- "aarch64" => "aarch64-unknown-linux-gnu",
- other => { panic!("dunno native rust triple for arch {}", other); }
- };
- let native_rust_triple = lua_ctx.create_string(native_rust_triple).unwrap();
- let build_env_vars = lua_ctx.create_table_from(
- vec![
- ("native_rust_triple", native_rust_triple)
- ]
- ).unwrap();
-
- let build_environment = lua_ctx.create_table_from(
- vec![
- ("has", path_has_cmd),
- ("size", size_of_file),
- ]
- ).unwrap();
- build_environment.set("vars", build_env_vars).unwrap();
-
- let build_functions = lua_ctx.create_table_from(
- vec![
- ("hello", hello),
- ("run", build),
- ("dependencies", check_dependencies),
- ("metric", metric),
- ("error", error),
- ("artifact", artifact),
- ("now_ms", now_ms),
- ("check_output", check_output),
- ]
- ).unwrap();
- build_functions.set("environment", build_environment).unwrap();
- let current_commit = self.job.lock().unwrap().job.commit.clone();
- build_functions.set("sha", lua_ctx.create_string(current_commit.as_bytes()).unwrap()).unwrap();
- let globals = lua_ctx.globals();
- globals.set("Build", build_functions).unwrap();
-
-
- let step_start = decl_env.create_function("step_start", move |_, job_ref, name: String| {
- lua_exports::step::start(job_ref, name)
- })?;
-
- let step_push = decl_env.create_function("step_push", move |_, job_ref, name: String| {
- lua_exports::step::push(job_ref, name)
- })?;
-
- let step_advance = decl_env.create_function("step_advance", move |_, job_ref, name: String| {
- lua_exports::step::advance(job_ref, name)
- })?;
-
- let step_functions = lua_ctx.create_table_from(
- vec![
- ("start", step_start),
- ("push", step_push),
- ("advance", step_advance),
- ]
- ).unwrap();
- globals.set("Step", step_functions).unwrap();
- Ok(())
- }
-
- pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> {
- let script = script.to_vec();
- let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| {
- std::thread::spawn(move || {
- self.lua.context(|lua_ctx| {
- lua_ctx.load(&script)
- .set_name("goodfile")?
- .exec()
- })
- }).join().unwrap()
- }).await.unwrap();
- eprintln!("lua res: {:?}", res);
- res
- }
-}
diff --git a/src/main.rs b/src/main.rs
deleted file mode 100644
index d11df1f..0000000
--- a/src/main.rs
+++ /dev/null
@@ -1,1092 +0,0 @@
-#![allow(dead_code)]
-#![allow(unused_variables)]
-#![allow(unused_imports)]
-
-use chrono::{Utc, TimeZone};
-use lazy_static::lazy_static;
-use std::sync::RwLock;
-use std::collections::HashMap;
-use serde_derive::{Deserialize, Serialize};
-use tokio::spawn;
-use std::path::PathBuf;
-use axum_server::tls_rustls::RustlsConfig;
-use axum::routing::*;
-use axum::Router;
-use axum::response::{IntoResponse, Response, Html};
-use std::net::SocketAddr;
-use axum::extract::{Path, State};
-use http_body::combinators::UnsyncBoxBody;
-use axum::{Error, Json};
-use axum::extract::rejection::JsonRejection;
-use axum::body::Bytes;
-use axum::http::{StatusCode, Uri};
-use http::header::HeaderMap;
-use tokio::sync::mpsc;
-use tokio_stream::wrappers::ReceiverStream;
-use axum::body::StreamBody;
-
-use std::sync::Arc;
-use std::time::{SystemTime, UNIX_EPOCH};
-
-use hmac::{Hmac, Mac};
-use sha2::Sha256;
-
-mod io;
-mod sql;
-mod notifier;
-mod dbctx;
-mod protocol;
-
-use sql::RunState;
-
-use dbctx::{DbCtx, Job, Run, ArtifactRecord};
-
-use rusqlite::OptionalExtension;
-
-#[derive(Serialize, Deserialize)]
-struct WebserverConfig {
- psks: Vec<GithubPsk>,
- jobs_path: PathBuf,
- config_path: PathBuf,
- db_path: PathBuf,
- debug_addr: Option<serde_json::Value>,
- server_addr: Option<serde_json::Value>,
-}
-
-#[derive(Clone)]
-struct WebserverState {
- jobs_path: PathBuf,
- dbctx: Arc<DbCtx>,
-}
-
-#[derive(Clone, Serialize, Deserialize)]
-struct GithubPsk {
- key: String,
- gh_user: String,
-}
-
-lazy_static! {
- static ref PSKS: RwLock<Vec<GithubPsk>> = RwLock::new(Vec::new());
-}
-
-#[derive(Copy, Clone, Debug)]
-enum GithubHookError {
- BodyNotObject,
- MissingElement { path: &'static str },
- BadType { path: &'static str, expected: &'static str },
-}
-
-#[derive(Debug)]
-enum GithubEvent {
- Push { tip: String, repo_name: String, head_commit: serde_json::Map<String, serde_json::Value>, pusher: serde_json::Map<String, serde_json::Value> },
- Other {}
-}
-
-/// return a duration rendered as the largest two non-zero units.
-///
-/// 60000ms -> 1m
-/// 60001ms -> 1m
-/// 61000ms -> 1m1s
-/// 1030ms -> 1.03s
-fn duration_as_human_string(duration_ms: u64) -> String {
- let duration_sec = duration_ms / 1000;
- let duration_min = duration_sec / 60;
- let duration_hours = duration_min / 60;
-
- let duration_ms = duration_ms % 1000;
- let duration_sec = duration_sec % 60;
- let duration_min = duration_min % 60;
- // no need to clamp hours, we're gonna just hope that it's a reasonable number of hours
-
- if duration_hours != 0 {
- let mut res = format!("{}h", duration_hours);
- if duration_min != 0 {
- res.push_str(&format!("{}m", duration_min));
- }
- res
- } else if duration_min != 0 {
- let mut res = format!("{}m", duration_min);
- if duration_min != 0 {
- res.push_str(&format!("{}s", duration_sec));
- }
- res
- } else {
- let mut res = format!("{}", duration_sec);
- if duration_ms != 0 {
- res.push_str(&format!(".{:03}", duration_ms));
- }
- res.push('s');
- res
- }
-}
-
-/// try producing a url for whatever caused this job to be started, if possible
-fn commit_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> Option<String> {
- let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote");
-
- match remote.remote_api.as_str() {
- "github" => {
- Some(format!("{}/commit/{}", remote.remote_url, commit_sha))
- },
- "email" => {
- None
- },
- _ => {
- None
- }
- }
-}
-
-/// produce a url to the ci.butactuallyin.space job details page
-fn job_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> String {
- let remote = ctx.remote_by_id(job.remote_id).expect("query succeeds").expect("existing job references existing remote");
-
- if remote.remote_api != "github" {
- eprintln!("job url for remote type {} can't be constructed, i think", &remote.remote_api);
- }
-
- format!("{}/{}", &remote.remote_path, commit_sha)
-}
-
-/// render how long a run took, or is taking, in a human-friendly way
-fn display_run_time(run: &Run) -> String {
- if let Some(start_time) = run.start_time {
- if let Some(complete_time) = run.complete_time {
- if complete_time < start_time {
- if run.state == RunState::Started {
- // this run has been restarted. the completed time is stale.
- // further, this is a currently active run.
- let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64;
- let mut duration = duration_as_human_string(now_ms - start_time);
- duration.push_str(" (ongoing)");
- duration
- } else {
- "invalid data".to_string()
- }
- } else {
- let duration_ms = complete_time - start_time;
- let duration = duration_as_human_string(duration_ms);
- duration
- }
- } else {
- if run.state != RunState::Invalid {
- let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64;
- let mut duration = duration_as_human_string(now_ms - start_time);
- duration.push_str(" (ongoing)");
- duration
- } else {
- "n/a".to_string()
- }
- }
- } else {
- "not yet run".to_owned()
- }
-}
-
-fn parse_push_event(body: serde_json::Value) -> Result<GithubEvent, GithubHookError> {
- let body = body.as_object()
- .ok_or(GithubHookError::BodyNotObject)?;
-
- let tip = body.get("after")
- .ok_or(GithubHookError::MissingElement { path: "after" })?
- .as_str()
- .ok_or(GithubHookError::BadType { path: "after", expected: "str" })?
- .to_owned();
-
- let repo_name = body.get("repository")
- .ok_or(GithubHookError::MissingElement { path: "repository" })?
- .as_object()
- .ok_or(GithubHookError::BadType { path: "repository", expected: "obj" })?
- .get("full_name")
- .ok_or(GithubHookError::MissingElement { path: "repository/full_name" })?
- .as_str()
- .ok_or(GithubHookError::BadType { path: "repository/full_name", expected: "str" })?
- .to_owned();
-
- let head_commit = body.get("head_commit")
- .ok_or(GithubHookError::MissingElement { path: "head_commit" })?
- .as_object()
- .ok_or(GithubHookError::BadType { path: "head_commit", expected: "obj" })?
- .to_owned();
-
- let pusher = body.get("pusher")
- .ok_or(GithubHookError::MissingElement { path: "pusher" })?
- .as_object()
- .ok_or(GithubHookError::BadType { path: "pusher", expected: "obj" })?
- .to_owned();
-
- Ok(GithubEvent::Push { tip, repo_name, head_commit, pusher })
-}
-
-async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event: GithubEvent) -> impl IntoResponse {
- let (sha, repo, head_commit, pusher) = if let GithubEvent::Push { tip, repo_name, head_commit, pusher } = event {
- (tip, repo_name, head_commit, pusher)
- } else {
- panic!("process push event on non-push event");
- };
-
- println!("handling push event to {}/{}: sha {} in repo {}, {:?}\n pusher: {:?}", owner, repo, sha, repo, head_commit, pusher);
-
- // push event is in terms of a ref, but we don't know if it's a new commit (yet).
- // in terms of CI jobs, we care mainly about new commits.
- // so...
- // * look up the commit,
- // * if it known, bail out (new ref for existing commit we've already handled some way)
- // * create a new commit ref
- // * create a new job (state=pending) for the commit ref
- let commit_id: Option<u64> = ctx.conn.lock().unwrap()
- .query_row(sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0))
- .optional()
- .expect("can run query");
-
- if commit_id.is_some() {
- eprintln!("commit already exists");
- return (StatusCode::OK, String::new());
- }
-
- let remote_url = format!("https://www.github.com/{}.git", repo);
- eprintln!("looking for remote url: {}", remote_url);
- let (remote_id, repo_id): (u64, u64) = match ctx.conn.lock().unwrap()
- .query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap())))
- .optional()
- .unwrap() {
- Some(elems) => elems,
- None => {
- eprintln!("no remote registered for url {} (repo {})", remote_url, repo);
- return (StatusCode::NOT_FOUND, String::new());
- }
- };
-
- let repo_default_run_pref: Option<String> = ctx.conn.lock().unwrap()
- .query_row("select default_run_preference from repos where id=?1;", [repo_id], |row| {
- Ok((row.get(0)).unwrap())
- })
- .expect("can query");
-
- let pusher_email = pusher
- .get("email")
- .expect("has email")
- .as_str()
- .expect("is str");
-
- let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap();
- let _ = ctx.new_run(job_id, None).unwrap();
-
- let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers");
-
- for notifier in notifiers {
- notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify");
- }
-
- (StatusCode::OK, String::new())
-}
-
-async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event_kind: String, body: serde_json::Value) -> Response<UnsyncBoxBody<Bytes, Error>> {
- println!("got github event: {}, {}, {}", owner, repo, event_kind);
- match event_kind.as_str() {
- "push" => {
- let push_event = parse_push_event(body)
- .map_err(|e| {
- eprintln!("TODO: handle push event error: {:?}", e);
- panic!()
- })
- .expect("parse works");
- let res = process_push_event(ctx, owner, repo, push_event).await;
- "ok".into_response()
- },
- "status" => {
- eprintln!("[.] status update");
- "ok".into_response()
- }
- other => {
- eprintln!("unhandled event kind: {}, repo {}/{}. content: {:?}", other, owner, repo, body);
- "".into_response()
- }
- }
-}
-
-async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse {
- eprintln!("root index");
- let repos = match ctx.dbctx.get_repos() {
- Ok(repos) => repos,
- Err(e) => {
- eprintln!("failed to get repos: {:?}", e);
- return (StatusCode::INTERNAL_SERVER_ERROR, Html("gonna feel that one tomorrow".to_string()));
- }
- };
-
- let mut response = String::new();
-
- response.push_str("<html>\n");
- response.push_str("<style>\n");
- response.push_str(".build-table { font-family: monospace; border: 1px solid black; border-collapse: collapse; }\n");
- response.push_str(".row-item { padding-left: 4px; padding-right: 4px; border-right: 1px solid black; }\n");
- response.push_str(".odd-row { background: #eee; }\n");
- response.push_str(".even-row { background: #ddd; }\n");
- response.push_str("</style>\n");
- response.push_str("<h1>builds and build accessories</h1>\n");
-
- match repos.len() {
- 0 => { response.push_str(&format!("<p>no repos configured, so there are no builds</p>\n")); },
- 1 => { response.push_str("<p>1 repo configured</p>\n"); },
- other => { response.push_str(&format!("<p>{} repos configured</p>\n", other)); },
- }
-
- response.push_str("<table class='build-table'>");
- response.push_str("<tr>\n");
- let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"];
- for heading in headings {
- response.push_str(&format!("<th class='row-item'>{}</th>", heading));
- }
- response.push_str("</tr>\n");
-
- let mut row_num = 0;
-
- for repo in repos {
- let mut most_recent_run: Option<(Job, Run)> = None;
-
- for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") {
- let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works");
- if let Some(last_job) = last_job {
- if let Some(last_run) = ctx.dbctx.last_run_for_job(last_job.id).expect("can query") {
- if most_recent_run.as_ref().map(|run| run.1.create_time < last_run.create_time).unwrap_or(true) {
- most_recent_run = Some((last_job, last_run));
- }
- }
- }
- }
-
- let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name);
-
- let row_html: String = match most_recent_run {
- Some((job, run)) => {
- let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
- let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
- Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
- None => job_commit.clone()
- };
-
- let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
-
- let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
- let duration = display_run_time(&run);
-
- let status = format!("{:?}", run.state).to_lowercase();
-
- let result = match run.build_result {
- Some(0) => "<span style='color:green;'>pass</span>",
- Some(_) => "<span style='color:red;'>fail</span>",
- None => match run.state {
- RunState::Pending => { "unstarted" },
- RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },
- _ => { "<span style='color:red;'>unreported</span>" }
- }
- };
-
- let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result];
- let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
-
- let mut row_html = String::new();
- for entry in entries {
- row_html.push_str(&format!("<td class='row-item'>{}</td>", entry));
- }
- row_html
- }
- None => {
- let entries = [repo_html.as_str()];
- let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
-
- let mut row_html = String::new();
- for entry in entries {
- row_html.push_str(&format!("<td class='row-item'>{}</td>", entry));
- }
- row_html
- }
- };
-
- let row_index = row_num % 2;
- response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index]));
- response.push_str(&row_html);
- response.push_str("</tr>");
- response.push('\n');
-
- row_num += 1;
- }
- response.push_str("</table>");
-
- response.push_str("<h4>active tasks</h4>\n");
-
- let runs = ctx.dbctx.get_active_runs().expect("can query");
- if runs.len() == 0 {
- response.push_str("<p>(none)</p>\n");
- } else {
- response.push_str("<table class='build-table'>");
- response.push_str("<tr>\n");
- let headings = ["repo", "last build", "job", "build commit", "duration", "status", "result"];
- for heading in headings {
- response.push_str(&format!("<th class='row-item'>{}</th>", heading));
- }
- response.push_str("</tr>\n");
-
- let mut row_num = 0;
-
- for run in runs.iter() {
- let row_index = row_num % 2;
-
- let job = ctx.dbctx.job_by_id(run.job_id).expect("query succeeds").expect("job id is valid");
- let remote = ctx.dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("remote id is valid");
- let repo = ctx.dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("repo id is valid");
-
- let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name);
-
- let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
- let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
- Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
- None => job_commit.clone()
- };
-
- let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
-
- let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
- let duration = display_run_time(&run);
-
- let status = format!("{:?}", run.state).to_lowercase();
-
- let result = match run.build_result {
- Some(0) => "<span style='color:green;'>pass</span>",
- Some(_) => "<span style='color:red;'>fail</span>",
- None => match run.state {
- RunState::Pending => { "unstarted" },
- RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },
- _ => { "<span style='color:red;'>unreported</span>" }
- }
- };
-
- let entries = [repo_html.as_str(), last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result];
- let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
-
- let mut row_html = String::new();
- for entry in entries {
- row_html.push_str(&format!("<td class='row-item'>{}</td>", entry));
- }
-
-
- response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index]));
- response.push_str(&row_html);
- response.push_str("</tr>");
- response.push('\n');
-
- row_num += 1;
- }
-
- response.push_str("</table>\n");
- }
-
- response.push_str("</html>");
-
- (StatusCode::OK, Html(response))
-}
-
-async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse {
- eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2);
- let remote_path = format!("{}/{}", path.0, path.1);
- let sha = path.2;
-
- let (commit_id, sha): (u64, String) = if sha.len() >= 7 {
- match ctx.dbctx.conn.lock().unwrap()
- .query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
- .optional()
- .expect("can query") {
- Some((commit_id, sha)) => (commit_id, sha),
- None => {
- return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string()));
- }
- }
- } else {
- return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string()));
- };
-
- let short_sha = &sha[0..9];
-
- let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap()
- .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
- .expect("can query");
-
- let job = ctx.dbctx.job_by_commit_id(commit_id).expect("can query").expect("job exists");
-
- let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists");
-
- let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms);
-
- let (status_elem, status_desc) = match run.state {
- RunState::Pending | RunState::Started => {
- ("<span style='color:#660;'>pending</span>", "⌛in progress")
- },
- RunState::Finished => {
- if let Some(build_result) = run.build_result {
- if build_result == 0 {
- ("<span style='color:green;'>pass</span>", "✅ passed")
- } else {
- ("<span style='color:red;'>failed</span>", "❌ failed")
- }
- } else {
- eprintln!("run {} for commit {} is missing a build result but is reportedly finished (old data)?", run.id, commit_id);
- ("<span style='color:red;'>unreported</span>", "❔ missing status")
- }
- },
- RunState::Error => {
- ("<span style='color:red;'>error</span>", "🧯 error, uncompleted")
- }
- RunState::Invalid => {
- ("<span style='color:red;'>(server error)</span>", "dude even i don't know")
- }
- };
- let debug_info = run.state == RunState::Finished && run.build_result == Some(1) || run.state == RunState::Error;
-
- let repo_name: String = ctx.dbctx.conn.lock().unwrap()
- .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0))
- .expect("can query");
-
- let deployed = false;
-
- let mut head = String::new();
- head.push_str("<head>");
- head.push_str(&format!("<title>ci.butactuallyin.space - {}</title>", repo_name));
- let include_og_tags = true;
- if include_og_tags {
- head.push_str("\n");
- head.push_str(&format!("<meta property=\"og:type\" content=\"website\">\n"));
- head.push_str(&format!("<meta property=\"og:site_name\" content=\"ci.butactuallyin.space\">\n"));
- head.push_str(&format!("<meta property=\"og:url\" content=\"/{}/{}/{}\">\n", &path.0, &path.1, &sha));
- head.push_str(&format!("<meta property=\"og:title\" contents=\"{}/{} commit {}\">", &path.0, &path.1, &short_sha));
- let build_og_description = format!("commit {} of {}/{}, {} after {}",
- short_sha,
- path.0, path.1,
- status_desc,
- display_run_time(&run)
- );
- head.push_str(&format!("<meta name=\"description\" content=\"{}\"\n>", build_og_description));
- head.push_str(&format!("<meta property=\"og:description\" content=\"{}\"\n>", build_og_description));
- }
- head.push_str("</head>\n");
- let repo_html = format!("<a href=\"/{}\">{}</a>", &repo_name, &repo_name);
- let remote_commit_elem = format!("<a href=\"https://www.github.com/{}/commit/{}\">{}</a>", &remote_path, &sha, &sha);
-
- let mut artifacts_fragment = String::new();
- let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap()
- .into_iter() // HACK: filter out artifacts for previous runs of a run. artifacts should be attached to a run, runs should be distinct from run. but i'm sleepy.
- .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms))
- .collect();
-
- artifacts.sort_by_key(|artifact| artifact.created_time);
-
- fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 {
- let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms);
- let run_completed = std::cmp::max(run_completed, artifact_completed);
- run_completed - artifact_completed
- }
-
- let recent_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) <= 60_000).cloned().collect();
- let old_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) > 60_000).cloned().collect();
-
- for artifact in old_artifacts.iter() {
- let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822();
- artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name));
- let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time);
- let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string();
- artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str));
- }
-
- for artifact in recent_artifacts.iter() {
- let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822();
- artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name));
- if debug_info {
- artifacts_fragment.push_str("<pre>");
- artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap());
- artifacts_fragment.push_str("</pre>\n");
- } else {
- let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time);
- let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| {
- (md.len() / 1024).to_string()
- }).unwrap_or_else(|e| format!("[{}]", e));
- artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str));
- }
- }
-
- let metrics = summarize_job_metrics(&ctx.dbctx, run.id, run.job_id).unwrap();
-
- let mut html = String::new();
- html.push_str("<html>\n");
- html.push_str(&format!(" {}\n", head));
- html.push_str(" <body>\n");
- html.push_str(" <pre>\n");
- html.push_str(&format!("repo: {}\n", repo_html));
- html.push_str(&format!("commit: {}, run: {}\n", remote_commit_elem, run.id));
- html.push_str(&format!("status: {} in {}\n", status_elem, display_run_time(&run)));
- if let Some(desc) = run.final_text.as_ref() {
- html.push_str(&format!(" description: {}\n ", desc));
- }
- html.push_str(&format!("deployed: {}\n", deployed));
- html.push_str(" </pre>\n");
- if artifacts_fragment.len() > 0 {
- html.push_str(" <div>artifacts</div>\n");
- html.push_str(&artifacts_fragment);
- }
- if let Some(metrics) = metrics {
- html.push_str(&metrics);
- }
- html.push_str(" </body>\n");
- html.push_str("</html>");
-
- (StatusCode::OK, Html(html))
-}
-
-fn summarize_job_metrics(dbctx: &Arc<DbCtx>, run_id: u64, job_id: u64) -> Result<Option<String>, String> {
- let runs = dbctx.runs_for_job_one_per_host(job_id)?;
-
- let mut section = String::new();
- section.push_str("<div>\n");
- section.push_str("<h3>metrics</h3>\n");
- section.push_str("<table style='font-family: monospace;'>\n");
-
- if runs.len() == 1 {
- let metrics = dbctx.metrics_for_run(run_id).unwrap();
- if metrics.is_empty() {
- return Ok(None);
- }
-
- section.push_str("<tr><th>name</th><th>value</th></tr>");
- for metric in metrics {
- section.push_str(&format!("<tr><td>{}</td><td>{}</td></tr>", &metric.name, &metric.value));
- }
- } else {
- // very silly ordering issue: need an authoritative ordering of metrics to display metrics
- // in a consistent order across runs (though they SHOULD all be ordered the same).
- //
- // the first run might not have all metrics (first run could be on the slowest build host
- // f.ex), so for now just assume that metrics *will* be consistently ordered and build a
- // list of metrics from the longest list of metrics we've seen. builders do not support
- // concurrency so at least the per-run metrics-order-consistency assumption should hold..
- let mut all_names: Vec<String> = Vec::new();
-
- let all_metrics: Vec<(HashMap<String, String>, HostDesc)> = runs.iter().map(|run| {
- let metrics = dbctx.metrics_for_run(run.id).unwrap();
-
- let mut metrics_map = HashMap::new();
- for metric in metrics.into_iter() {
- if !all_names.contains(&metric.name) {
- all_names.push(metric.name.clone());
- }
- metrics_map.insert(metric.name, metric.value);
- }
-
- let (hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz) = match run.host_id {
- Some(host_id) => {
- dbctx.host_model_info(host_id).unwrap()
- }
- None => {
- ("unknown".to_string(), "unknown".to_string(), "0".to_string(), "0".to_string(), 0)
- }
- };
-
- (metrics_map, HostDesc::from_parts(hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz))
- }).collect();
-
- if all_metrics.is_empty() {
- return Ok(None);
- }
-
- let mut header = "<tr><th>name</th>".to_string();
- for (_, host) in all_metrics.iter() {
- header.push_str(&format!("<th>{}</br>{} @ {:.3}GHz</th>", &host.hostname, &host.cpu_desc, (host.cpu_max_freq_khz as f64) / 1000_000.0));
- }
- header.push_str("</tr>\n");
- section.push_str(&header);
-
- for name in all_names.iter() {
- let mut row = format!("<tr><td>{}</td>", &name);
- for (metrics, _) in all_metrics.iter() {
- let value = metrics.get(name)
- .map(|x| x.clone())
- .unwrap_or_else(String::new);
- row.push_str(&format!("<td>{}</td>", value));
- }
- row.push_str("</tr>\n");
- section.push_str(&row);
- }
- };
- section.push_str("</table>\n");
- section.push_str("</div>\n");
-
- Ok(Some(section))
-}
-
-async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse {
- eprintln!("get artifact, run={}, artifact={}", path.0, path.1);
- let run: u64 = path.0.parse().unwrap();
- let artifact_id: u64 = path.1.parse().unwrap();
-
- let artifact_descriptor = match ctx.dbctx.lookup_artifact(run, artifact_id).unwrap() {
- Some(artifact) => artifact,
- None => {
- return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response();
- }
- };
-
- let mut live_artifact = false;
-
- if let Some(completed_time) = artifact_descriptor.completed_time {
- if completed_time < artifact_descriptor.created_time {
- live_artifact = true;
- }
- } else {
- live_artifact = true;
- }
-
- if live_artifact {
- let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536);
- let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver);
- let mut artifact_path = ctx.jobs_path.clone();
- artifact_path.push(artifact_descriptor.run_id.to_string());
- artifact_path.push(artifact_descriptor.id.to_string());
- spawn(async move {
- let mut artifact = artifact_descriptor;
-
- let mut artifact_file = tokio::fs::File::open(&artifact_path)
- .await
- .expect("artifact file exists?");
- while artifact.completed_time.is_none() {
- match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await {
- Ok(()) => {
- // reached the current EOF, wait and then commit an unspeakable sin
- tokio::time::sleep(std::time::Duration::from_millis(250)).await;
- // this would be much implemented as yielding on a condvar woken when an
- // inotify event on the file indicates a write has occurred. but i am
- // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao.
- artifact = ctx.dbctx.lookup_artifact(artifact.run_id, artifact.id)
- .expect("can query db")
- .expect("artifact still exists");
- }
- Err(e) => {
- eprintln!("artifact file streaming failed: {}", e);
- }
- }
- }
-
- eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id);
- });
- (StatusCode::OK, resp_body).into_response()
- } else {
- (StatusCode::OK, Html("all done")).into_response()
- }
-}
-
-async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<WebserverState>) -> impl IntoResponse {
- eprintln!("get repo summary: {:?}", path);
-
- let mut last_builds = Vec::new();
-
- let (repo_id, repo_name, default_run_preference): (u64, String, Option<String>) = match ctx.dbctx.conn.lock().unwrap()
- .query_row("select id, repo_name, default_run_preference from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap(), row.get(2).unwrap())))
- .optional()
- .unwrap() {
- Some(elem) => elem,
- None => {
- eprintln!("no repo named {}", path);
- return (StatusCode::NOT_FOUND, Html(String::new()));
- }
- };
-
- // TODO: display default_run_preference somehow on the web summary?
-
- for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") {
- let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo");
- last_builds.extend(last_ten_jobs.drain(..));
- }
- last_builds.sort_by_key(|job| -(job.created_time as i64));
-
- let mut response = String::new();
- response.push_str("<html>\n");
- response.push_str(&format!("<title> ci.butactuallyin.space - {} </title>\n", repo_name));
- response.push_str("<style>\n");
- response.push_str(".build-table { font-family: monospace; border: 1px solid black; border-collapse: collapse; }\n");
- response.push_str(".row-item { padding-left: 4px; padding-right: 4px; border-right: 1px solid black; }\n");
- response.push_str(".odd-row { background: #eee; }\n");
- response.push_str(".even-row { background: #ddd; }\n");
- response.push_str("</style>\n");
- response.push_str(&format!("<h1>{} build history</h1>\n", repo_name));
- response.push_str("<a href=/>full repos index</a><p> </p>\n");
-
- response.push_str("<table class='build-table'>");
- response.push_str("<tr>\n");
- let headings = ["last build", "job", "build commit", "duration", "status", "result"];
- for heading in headings {
- response.push_str(&format!("<th class='row-item'>{}</th>", heading));
- }
- response.push_str("</tr>\n");
-
- let mut row_num = 0;
-
- for job in last_builds.iter().take(10) {
- let run = ctx.dbctx.last_run_for_job(job.id).expect("query succeeds").expect("TODO: run exists if job exists (small race if querying while creating job ....");
- let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
- let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
- Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
- None => job_commit.clone()
- };
-
- let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
-
- let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822();
- let duration = display_run_time(&run);
-
- let status = format!("{:?}", run.state).to_lowercase();
-
- let result = match run.build_result {
- Some(0) => "<span style='color:green;'>pass</span>",
- Some(_) => "<span style='color:red;'>fail</span>",
- None => match run.state {
- RunState::Pending => { "unstarted" },
- RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },
- _ => { "<span style='color:red;'>unreported</span>" }
- }
- };
-
- let entries = [last_build_time.as_str(), job_html.as_str(), commit_html.as_str(), &duration, &status, &result];
- let entries = entries.iter().chain(std::iter::repeat(&"")).take(headings.len());
-
- let mut row_html = String::new();
- for entry in entries {
- row_html.push_str(&format!("<td class='row-item'>{}</td>", entry));
- }
-
- let row_index = row_num % 2;
- response.push_str(&format!("<tr class=\"{}\">", ["even-row", "odd-row"][row_index]));
- response.push_str(&row_html);
- response.push_str("</tr>\n");
-
- row_num += 1;
- }
- response.push_str("</html>");
-
- (StatusCode::OK, Html(response))
-}
-
-async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<WebserverState>, body: Bytes) -> impl IntoResponse {
- let json: Result<serde_json::Value, _> = serde_json::from_slice(&body);
- eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers);
-
- let payload = match json {
- Ok(payload) => { payload },
- Err(e) => {
- eprintln!("bad request: path={}/{}\nheaders: {:?}\nbody err: {:?}", path.0, path.1, headers, e);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
- };
-
- let sent_hmac = match headers.get("x-hub-signature-256") {
- Some(sent_hmac) => { sent_hmac.to_str().expect("valid ascii string").to_owned() },
- None => {
- eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-hub-signature-256", path.0, path.1, headers);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
- };
-
- let mut hmac_ok = false;
-
- for psk in PSKS.read().unwrap().iter() {
- let mut mac = Hmac::<Sha256>::new_from_slice(psk.key.as_bytes())
- .expect("hmac can be constructed");
- mac.update(&body);
- let result = mac.finalize().into_bytes().to_vec();
-
- // hack: skip sha256=
- let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex");
- if decoded == result {
- hmac_ok = true;
- break;
- }
- }
-
- if !hmac_ok {
- eprintln!("bad hmac by all psks");
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
-
- let kind = match headers.get("x-github-event") {
- Some(kind) => { kind.to_str().expect("valid ascii string").to_owned() },
- None => {
- eprintln!("bad request: path={}/{}\nheaders: {:?}\nno x-github-event", path.0, path.1, headers);
- return (StatusCode::BAD_REQUEST, "").into_response();
- }
- };
-
- handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await
-}
-
-
-async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router {
- /*
-
- // GET /hello/warp => 200 OK with body "Hello, warp!"
- let hello = warp::path!("hello" / String)
- .map(|name| format!("Hello, {}!\n", name));
-
- let github_event = warp::post()
- .and(warp::path!(String / String))
- .and_then(|owner, repo| {
- warp::header::<String>("x-github-event")
- .and(warp::body::content_length_limit(1024 * 1024))
- .and(warp::body::json())
- .and_then(|event, json| handle_github_event(owner, repo, event, json))
- .recover(|e| {
- async fn handle_rejection(err: Rejection) -> Result<impl Reply, Rejection> {
- Ok(warp::reply::with_status("65308", StatusCode::BAD_REQUEST))
- }
- handle_rejection(e)
- })
- });
-
- let repo_status = warp::get()
- .and(warp::path!(String / String / String))
- .map(|owner, repo, sha| format!("CI status for {}/{} commit {}\n", owner, repo, sha));
-
- let other =
- warp::post()
- .and(warp::path::full())
- .and(warp::addr::remote())
- .and(warp::body::content_length_limit(1024 * 1024))
- .and(warp::body::bytes())
- .map(move |path, addr: Option<std::net::SocketAddr>, body| {
- println!("{}: lets see what i got {:?}, {:?}", addr.unwrap(), path, body);
- "hello :)\n"
- })
- .or(
- warp::get()
- .and(warp::path::full())
- .and(warp::addr::remote())
- .map(move |path, addr: Option<std::net::SocketAddr>| {
- println!("{}: GET to {:?}", addr.unwrap(), path);
- "hello!\n"
- })
- )
- .recover(|e| {
- async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
- Ok(warp::reply::with_status("50834", StatusCode::BAD_REQUEST))
- }
- handle_rejection(e)
- });
- */
-
- async fn fallback_get(uri: Uri) -> impl IntoResponse {
- (StatusCode::OK, "get resp")
- }
-
- async fn fallback_post(Path(path): Path<String>) -> impl IntoResponse {
- "post resp"
- }
-
- Router::new()
- .route("/:owner/:repo/:sha", get(handle_commit_status))
- .route("/:owner", get(handle_repo_summary))
- .route("/:owner/:repo", post(handle_repo_event))
- .route("/artifact/:b/:artifact_id", get(handle_get_artifact))
- .route("/", get(handle_ci_index))
- .fallback(fallback_get)
- .with_state(WebserverState {
- jobs_path,
- dbctx: Arc::new(DbCtx::new(cfg_path, db_path))
- })
-}
-
-async fn bind_server(conf: serde_json::Value, jobs_path: PathBuf, config_path: PathBuf, db_path: PathBuf) -> std::io::Result<()> {
- let server = make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service();
- use serde_json::Value;
- match conf {
- Value::String(address) => {
- axum_server::bind(address.parse().unwrap())
- .serve(server).await
- },
- Value::Object(map) => {
- let address = match map.get("address") {
- Some(Value::String(address)) => address.clone(),
- None => {
- panic!("no local address");
- },
- other => {
- panic!("invalid local address: {:?}", other);
- }
- };
-
- match (map.get("cert_path"), map.get("key_path")) {
- (Some(Value::String(cert_path)), Some(Value::String(key_path))) => {
- let config = RustlsConfig::from_pem_file(
- cert_path.clone(),
- key_path.clone(),
- ).await.unwrap();
- axum_server::bind_rustls(address.parse().unwrap(), config)
- .serve(server).await
- },
- (Some(_), _) | (_, Some(_)) => {
- panic!("invalid local tls config: only one of `cert_path` or `key_path` has been provided");
- },
- (None, None) => {
- axum_server::bind(address.parse().unwrap())
- .serve(server).await
- }
- }
- },
- other => {
- panic!("invalid server bind config: {:?}", other);
- }
- }
-}
-
-#[tokio::main]
-async fn main() {
- tracing_subscriber::fmt::init();
-
- let mut args = std::env::args();
- args.next().expect("first arg exists");
- let config_path = args.next().unwrap_or("./webserver_config.json".to_string());
- let web_config: WebserverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for WebserverConfig");
- let mut psks = PSKS.write().expect("can write lock");
- *psks = web_config.psks.clone();
- // drop write lock so we can read PSKS elsewhere WITHOUT deadlocking.
- std::mem::drop(psks);
-
- let jobs_path = web_config.jobs_path.clone();
- let config_path = web_config.config_path.clone();
- let db_path = web_config.db_path.clone();
- if let Some(addr_conf) = web_config.debug_addr.as_ref() {
- spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone()));
- }
- if let Some(addr_conf) = web_config.server_addr.as_ref() {
- spawn(bind_server(addr_conf.clone(), jobs_path.clone(), config_path.clone(), db_path.clone()));
- }
- loop {
- tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
- }
-}
-
-struct HostDesc {
- hostname: String,
- cpu_desc: String,
- cpu_max_freq_khz: u64,
-}
-impl HostDesc {
- fn from_parts(hostname: String, vendor_id: String, cpu_family: String, model: String, cpu_max_freq_khz: u64) -> Self {
- let cpu_desc = match (vendor_id.as_str(), cpu_family.as_str(), model.as_str()) {
- ("Arm Limited", "8", "0xd03") => "aarch64 A53".to_string(),
- ("GenuineIntel", "6", "85") => "x86_64 Skylake".to_string(),
- ("AuthenticAMD", "23", "113") => "x86_64 Matisse".to_string(),
- (vendor, family, model) => format!("unknown {}:{}:{}", vendor, family, model)
- };
-
- HostDesc {
- hostname,
- cpu_desc,
- cpu_max_freq_khz,
- }
- }
-}
diff --git a/src/notifier.rs b/src/notifier.rs
deleted file mode 100644
index f9d6084..0000000
--- a/src/notifier.rs
+++ /dev/null
@@ -1,181 +0,0 @@
-use serde_derive::{Deserialize, Serialize};
-use std::sync::Arc;
-use axum::http::StatusCode;
-use lettre::transport::smtp::authentication::{Credentials, Mechanism};
-use lettre::{Message, Transport};
-use lettre::transport::smtp::extension::ClientId;
-use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder};
-use std::time::Duration;
-use std::path::Path;
-
-use crate::DbCtx;
-
-pub struct RemoteNotifier {
- pub remote_path: String,
- pub notifier: NotifierConfig,
-}
-
-#[derive(Serialize, Deserialize)]
-#[serde(untagged)]
-pub enum NotifierConfig {
- GitHub {
- token: String,
- },
- Email {
- username: String,
- password: String,
- mailserver: String,
- from: String,
- to: String,
- }
-}
-
-impl NotifierConfig {
- pub fn github_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
- let path = path.as_ref();
- let bytes = std::fs::read(path)
- .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?;
- let config = serde_json::from_slice(&bytes)
- .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?;
-
- if matches!(config, NotifierConfig::GitHub { .. }) {
- Ok(config)
- } else {
- Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path.display()))
- }
- }
-
- pub fn email_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
- let path = path.as_ref();
- let bytes = std::fs::read(path)
- .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?;
- let config = serde_json::from_slice(&bytes)
- .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?;
-
- if matches!(config, NotifierConfig::Email { .. }) {
- Ok(config)
- } else {
- Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path.display()))
- }
- }
-}
-
-impl RemoteNotifier {
- pub async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> {
- self.tell_job_status(
- ctx,
- repo_id, sha, job_id,
- "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
- ).await
- }
-
- pub async fn tell_complete_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, desc: Result<String, String>) -> Result<(), String> {
- match desc {
- Ok(status) => {
- self.tell_job_status(
- ctx,
- repo_id, sha, job_id,
- "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
- ).await
- },
- Err(status) => {
- self.tell_job_status(
- ctx,
- repo_id, sha, job_id,
- "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
- ).await
- }
- }
- }
-
- pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> {
- match &self.notifier {
- NotifierConfig::GitHub { token } => {
- let status_info = serde_json::json!({
- "state": state,
- "description": desc,
- "target_url": target_url,
- "context": "actuallyinspace runner",
- });
-
- // TODO: should pool (probably in ctx?) to have an upper bound in concurrent
- // connections.
- let client = reqwest::Client::new();
- let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha))
- .body(serde_json::to_string(&status_info).expect("can stringify json"))
- .header("content-type", "application/json")
- .header("user-agent", "iximeow")
- .header("authorization", format!("Bearer {}", token))
- .header("accept", "application/vnd.github+json");
- eprintln!("sending {:?}", req);
- eprintln!(" body: {}", serde_json::to_string(&status_info).expect("can stringify json"));
- let res = req
- .send()
- .await;
-
- match res {
- Ok(res) => {
- if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{
- Ok(())
- } else {
- Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res))
- }
- }
- Err(e) => {
- Err(format!("failure sending request: {:?}", e))
- }
- }
- }
- NotifierConfig::Email { username, password, mailserver, from, to } => {
- eprintln!("[.] emailing {} for job {} via {}", state, &self.remote_path, mailserver);
-
- let subject = format!("{}: job for {}", state, &self.remote_path);
-
- let body = format!("{}", subject);
-
- // TODO: when ci.butactuallyin.space has valid certs again, ... fix this.
- let tls = TlsParametersBuilder::new(mailserver.to_string())
- .dangerous_accept_invalid_certs(true)
- .build()
- .unwrap();
-
- let mut mailer = SmtpConnection::connect(
- mailserver,
- Some(Duration::from_millis(5000)),
- &ClientId::Domain("ci.butactuallyin.space".to_string()),
- None,
- None,
- ).unwrap();
-
- mailer.starttls(
- &tls,
- &ClientId::Domain("ci.butactuallyin.space".to_string()),
- ).unwrap();
-
- let resp = mailer.auth(
- &[Mechanism::Plain, Mechanism::Login],
- &Credentials::new(username.to_owned(), password.to_owned())
- ).unwrap();
- assert!(resp.is_positive());
-
- let email = Message::builder()
- .from(from.parse().unwrap())
- .to(to.parse().unwrap())
- .subject(&subject)
- .body(body)
- .unwrap();
-
- match mailer.send(email.envelope(), &email.formatted()) {
- Ok(_) => {
- eprintln!("[+] notified {}@{}", username, mailserver);
- Ok(())
- }
- Err(e) => {
- eprintln!("[-] could not send email: {:?}", e);
- Err(e.to_string())
- }
- }
- }
- }
- }
-}
diff --git a/src/protocol.rs b/src/protocol.rs
deleted file mode 100644
index c7a9318..0000000
--- a/src/protocol.rs
+++ /dev/null
@@ -1,114 +0,0 @@
-use serde::{Serialize, Deserialize};
-
-#[derive(Serialize, Deserialize, Debug)]
-#[serde(tag = "kind")]
-#[serde(rename_all = "snake_case")]
-pub enum ClientProto {
- Started,
- ArtifactCreate,
- NewTask(RequestedJob),
- NewTaskPlease { allowed_pushers: Option<Vec<String>>, host_info: HostInfo },
- Metric { name: String, value: String },
- Command(CommandInfo),
- TaskStatus(TaskInfo),
- Ping,
- Pong,
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-#[serde(tag = "command_info")]
-#[serde(rename_all = "snake_case")]
-pub enum CommandInfo {
- Started { command: Vec<String>, cwd: Option<String>, id: u32 },
- Finished { exit_code: Option<i32>, id: u32 },
-}
-
-#[derive(Serialize, Deserialize, Debug)]
-#[serde(tag = "task_info")]
-#[serde(rename_all = "snake_case")]
-pub enum TaskInfo {
- Finished { status: String },
- Interrupted { status: String, description: Option<String> },
-}
-
-impl ClientProto {
- pub fn metric(name: impl Into<String>, value: impl Into<String>) -> Self {
- ClientProto::Metric { name: name.into(), value: value.into() }
- }
-
- pub fn command(state: CommandInfo) -> Self {
- ClientProto::Command(state)
- }
-
- pub fn new_task_please(allowed_pushers: Option<Vec<String>>, host_info: HostInfo) -> Self {
- ClientProto::NewTaskPlease { allowed_pushers, host_info }
- }
-
- pub fn task_status(state: TaskInfo) -> Self {
- ClientProto::TaskStatus(state)
- }
-
- pub fn new_task(task: RequestedJob) -> Self {
- ClientProto::NewTask(task)
- }
-}
-
-impl CommandInfo {
- pub fn started(command: impl Into<Vec<String>>, cwd: Option<&str>, id: u32) -> Self {
- CommandInfo::Started { command: command.into(), cwd: cwd.map(ToOwned::to_owned), id }
- }
-
- pub fn finished(exit_code: Option<i32>, id: u32) -> Self {
- CommandInfo::Finished { exit_code, id }
- }
-}
-
-impl TaskInfo {
- pub fn finished(status: impl Into<String>) -> Self {
- TaskInfo::Finished { status: status.into() }
- }
-
- pub fn interrupted(status: impl Into<String>, description: impl Into<Option<String>>) -> Self {
- TaskInfo::Interrupted { status: status.into(), description: description.into() }
- }
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct HostInfo {
- pub hostname: String,
- pub cpu_info: CpuInfo,
- pub memory_info: MemoryInfo,
- pub env_info: EnvInfo,
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct CpuInfo {
- pub model_name: String,
- pub microcode: String,
- pub cores: u32,
- pub vendor_id: String,
- pub family: String,
- pub model: String,
- // clock speed in khz
- pub max_freq: u64,
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct MemoryInfo {
- pub total: String,
- pub available: String,
-}
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-pub struct EnvInfo {
- pub arch: String,
- pub family: String,
- pub os: String,
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct RequestedJob {
- pub commit: String,
- pub remote_url: String,
- pub build_token: String,
-}
diff --git a/src/sql.rs b/src/sql.rs
deleted file mode 100644
index 1071279..0000000
--- a/src/sql.rs
+++ /dev/null
@@ -1,229 +0,0 @@
-#![allow(dead_code)]
-
-use std::convert::TryFrom;
-
-use crate::dbctx::Run;
-use crate::dbctx::Job;
-
-#[derive(Debug, Clone)]
-pub enum JobResult {
- Pass = 0,
- Fail = 1,
-}
-
-#[derive(Debug, Copy, Clone, PartialEq)]
-pub enum RunState {
- Pending = 0,
- Started = 1,
- Finished = 2,
- Error = 3,
- Invalid = 4,
-}
-
-impl TryFrom<u8> for RunState {
- type Error = String;
-
- fn try_from(value: u8) -> Result<Self, String> {
- match value {
- 0 => Ok(RunState::Pending),
- 1 => Ok(RunState::Started),
- 2 => Ok(RunState::Finished),
- 3 => Ok(RunState::Error),
- 4 => Ok(RunState::Invalid),
- other => Err(format!("invalid job state: {}", other)),
- }
- }
-}
-
-pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
- let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
- let state: u8 = state;
- Run {
- id,
- job_id,
- artifacts_path,
- state: state.try_into().unwrap(),
- host_id,
- create_time,
- start_time,
- complete_time,
- build_token,
- run_timeout,
- build_result,
- final_text,
- }
-}
-
-// remote_id is the remote from which we were notified. this is necessary so we know which remote
-// to pull from to actually run the job.
-pub const CREATE_JOBS_TABLE: &'static str = "\
- CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT,
- source TEXT,
- created_time INTEGER,
- remote_id INTEGER,
- commit_id INTEGER,
- run_preferences TEXT);";
-
-pub const CREATE_METRICS_TABLE: &'static str = "\
- CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT,
- job_id INTEGER,
- name TEXT,
- value TEXT,
- UNIQUE(job_id, name)
- );";
-
-pub const CREATE_COMMITS_TABLE: &'static str = "\
- CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);";
-
-pub const CREATE_REPOS_TABLE: &'static str = "\
- CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT,
- repo_name TEXT,
- default_run_preference TEXT);";
-
-// remote_api is `github` or NULL for now. hopefully a future cgit-style notifier one day.
-// remote_path is some unique identifier for the relevant remote.
-// * for `github` remotes, this will be `owner/repo`.
-// * for others.. who knows.
-// remote_url is a url for human interaction with the remote (think https://git.iximeow.net/zvm)
-// remote_git_url is a url that can be `git clone`'d to fetch sources
-pub const CREATE_REMOTES_TABLE: &'static str = "\
- CREATE TABLE IF NOT EXISTS remotes (id INTEGER PRIMARY KEY AUTOINCREMENT,
- repo_id INTEGER,
- remote_path TEXT,
- remote_api TEXT,
- remote_url TEXT,
- remote_git_url TEXT,
- notifier_config_path TEXT);";
-
-pub const CREATE_ARTIFACTS_TABLE: &'static str = "\
- CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT,
- run_id INTEGER,
- name TEXT,
- desc TEXT,
- created_time INTEGER,
- completed_time INTEGER);";
-
-pub const CREATE_RUNS_TABLE: &'static str = "\
- CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT,
- job_id INTEGER,
- artifacts_path TEXT,
- state INTEGER NOT NULL,
- host_id INTEGER,
- build_token TEXT,
- created_time INTEGER,
- started_time INTEGER,
- complete_time INTEGER,
- run_timeout INTEGER,
- build_result INTEGER,
- final_status TEXT);";
-
-pub const CREATE_HOSTS_TABLE: &'static str = "\
- CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY AUTOINCREMENT,
- hostname TEXT,
- cpu_vendor_id TEXT,
- cpu_model_name TEXT,
- cpu_family TEXT,
- cpu_model TEXT,
- cpu_microcode TEXT,
- cpu_max_freq_khz INTEGER,
- cpu_cores INTEGER,
- mem_total TEXT,
- arch TEXT,
- family TEXT,
- os TEXT,
- UNIQUE(hostname, cpu_vendor_id, cpu_model_name, cpu_family, cpu_model, cpu_microcode, cpu_cores, mem_total, arch, family, os));";
-
-pub const CREATE_REMOTES_INDEX: &'static str = "\
- CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";
-
-pub const CREATE_REPO_NAME_INDEX: &'static str = "\
- CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);";
-
-pub const PENDING_RUNS: &'static str = "\
- select id, job_id, created_time, host_preference from runs where state=0 and (host_preference=?1 or host_preference is null) order by created_time desc;";
-
-pub const JOBS_NEEDING_HOST_RUN: &'static str = "\
- select jobs.id, jobs.source, jobs.created_time, jobs.remote_id, jobs.commit_id, jobs.run_preferences from jobs \
- where jobs.run_preferences=\"all\" and jobs.created_time > ?1 \
- and not exists \
- (select 1 from runs r2 where r2.job_id = jobs.id and r2.host_id = ?2);";
-
-pub const ACTIVE_RUNS: &'static str = "\
- select id,
- job_id,
- artifacts_path,
- state,
- host_id,
- build_token,
- created_time,
- started_time,
- complete_time,
- run_timeout,
- build_result,
- final_status from runs where state=1 or state=0;";
-
-pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\
- select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;";
-
-pub const JOB_BY_COMMIT_ID: &'static str = "\
- select id, source, created_time, remote_id, commit_id, run_preferences from jobs where commit_id=?1;";
-
-pub const ARTIFACT_BY_ID: &'static str = "\
- select * from artifacts where id=?1 and run_id=?2;";
-
-pub const JOB_BY_ID: &'static str = "\
- select id, source, created_time, remote_id, commit_id, run_preferences from jobs where id=?1";
-
-pub const METRICS_FOR_RUN: &'static str = "\
- select * from metrics where run_id=?1 order by id asc;";
-
-pub const METRICS_FOR_JOB: &'static str = "\
- select metrics.id, metrics.run_id, metrics.name, metrics.value from metrics \
- join runs on runs.id=metrics.run_id \
- where runs.job_id=?1 \
- order by metrics.run_id desc, metrics.id desc;";
-
-pub const COMMIT_TO_ID: &'static str = "\
- select id from commits where sha=?1;";
-
-pub const REMOTES_FOR_REPO: &'static str = "\
- select * from remotes where repo_id=?1;";
-
-pub const ALL_REPOS: &'static str = "\
- select id, repo_name, default_run_preference from repos;";
-
-pub const LAST_JOBS_FROM_REMOTE: &'static str = "\
- select id, source, created_time, remote_id, commit_id, run_preferences from jobs where remote_id=?1 order by created_time desc limit ?2;";
-
-pub const LAST_RUN_FOR_JOB: &'static str = "\
- select id,
- job_id,
- artifacts_path,
- state,
- host_id,
- build_token,
- created_time,
- started_time,
- complete_time,
- run_timeout,
- build_result,
- final_status from runs where job_id=?1 order by started_time desc limit 1;";
-
-pub const RUNS_FOR_JOB: &'static str = "\
- select id,
- job_id,
- artifacts_path,
- state,
- host_id,
- build_token,
- created_time,
- started_time,
- complete_time,
- run_timeout,
- build_result,
- final_status from runs where job_id=?1 group by host_id order by started_time desc, state asc;";
-
-pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\
- select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id, jobs.run_preferences
- from jobs join runs on jobs.id=runs.job_id
- oder by runs.created_time asc;";