diff options
-rw-r--r-- | Cargo.lock | 98 | ||||
-rw-r--r-- | Cargo.toml | 31 | ||||
-rw-r--r-- | ci-ctl/Cargo.toml | 15 | ||||
-rw-r--r-- | ci-ctl/src/main.rs (renamed from src/ci_ctl.rs) | 12 | ||||
-rw-r--r-- | ci-driver/Cargo.toml | 30 | ||||
-rw-r--r-- | ci-driver/src/main.rs (renamed from src/ci_driver.rs) | 26 | ||||
-rw-r--r-- | ci-lib-core/Cargo.toml | 13 | ||||
-rw-r--r-- | ci-lib-core/src/dbctx.rs (renamed from src/dbctx.rs) | 203 | ||||
-rw-r--r-- | ci-lib-core/src/lib.rs | 13 | ||||
-rw-r--r-- | ci-lib-core/src/protocol.rs (renamed from src/protocol.rs) | 0 | ||||
-rw-r--r-- | ci-lib-core/src/sql.rs (renamed from src/sql.rs) | 94 | ||||
-rw-r--r-- | ci-lib-native/Cargo.toml | 20 | ||||
-rw-r--r-- | ci-lib-native/src/dbctx_ext.rs | 62 | ||||
-rw-r--r-- | ci-lib-native/src/io.rs (renamed from src/io.rs) | 7 | ||||
-rw-r--r-- | ci-lib-native/src/lib.rs | 3 | ||||
-rw-r--r-- | ci-lib-native/src/notifier.rs (renamed from src/notifier.rs) | 4 | ||||
-rw-r--r-- | ci-runner/Cargo.toml | 25 | ||||
-rw-r--r-- | ci-runner/src/lua/mod.rs (renamed from src/lua/mod.rs) | 4 | ||||
-rw-r--r-- | ci-runner/src/main.rs (renamed from src/ci_runner.rs) | 24 | ||||
-rw-r--r-- | ci-web-server/Cargo.toml | 31 | ||||
-rw-r--r-- | ci-web-server/src/main.rs (renamed from src/main.rs) | 33 |
21 files changed, 501 insertions, 247 deletions
@@ -312,6 +312,103 @@ dependencies = [ ] [[package]] +name = "ci-ctl" +version = "0.0.1" +dependencies = [ + "ci-lib-core", + "ci-lib-native", + "clap 4.0.29", +] + +[[package]] +name = "ci-driver" +version = "0.0.1" +dependencies = [ + "axum", + "axum-extra", + "axum-macros", + "axum-server", + "base64", + "ci-lib-core", + "ci-lib-native", + "futures-util", + "hyper", + "lazy_static", + "lettre", + "reqwest", + "serde", + "serde_json", + "tokio", + "tokio-stream", + "tracing-subscriber 0.3.16", +] + +[[package]] +name = "ci-lib-core" +version = "0.0.1" +dependencies = [ + "rusqlite", + "serde", +] + +[[package]] +name = "ci-lib-native" +version = "0.0.1" +dependencies = [ + "axum", + "ci-lib-core", + "futures-util", + "hyper", + "lettre", + "reqwest", + "serde", + "serde_json", + "tokio", +] + +[[package]] +name = "ci-runner" +version = "0.0.1" +dependencies = [ + "ci-lib-core", + "ci-lib-native", + "hyper", + "libc", + "reqwest", + "rlua", + "serde", + "serde_derive", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber 0.3.16", +] + +[[package]] +name = "ci-web-server" +version = "0.0.1" +dependencies = [ + "axum", + "axum-extra", + "axum-server", + "chrono", + "ci-lib-core", + "ci-lib-native", + "hex", + "hmac", + "http", + "http-body", + "lazy_static", + "rusqlite", + "serde", + "serde_json", + "sha2", + "tokio", + "tokio-stream", + "tracing-subscriber 0.3.16", +] + +[[package]] name = "clap" version = "3.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1294,6 +1391,7 @@ version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29f835d03d717946d28b1d1ed632eb6f0e24a299388ee623d0c23118d3e8a7fa" dependencies = [ + "cc", "pkg-config", "vcpkg", ] @@ -5,6 +5,21 @@ authors = [ "iximeow <me@iximeow.net>" ] license = "0BSD" edition = "2021" +[lib] + +[workspace] +members = [ + "ci-lib-core", + "ci-lib-native", + "ci-runner", + "ci-driver", + "ci-web-server", + "ci-ctl", +] +exclude = [ + "ci-wasm-frontend" +] + [dependencies] lazy_static = "*" axum = { version = "*" } @@ -37,19 +52,3 @@ reqwest = { version = "*", features = ["rustls-tls-native-roots"] } clap = { version = "*", features = ["derive"] } rlua = "*" chrono = "*" - -[[bin]] -name = "ci_webserver" -path = "src/main.rs" - -[[bin]] -name = "ci_driver" -path = "src/ci_driver.rs" - -[[bin]] -name = "ci_ctl" -path = "src/ci_ctl.rs" - -[[bin]] -name = "ci_runner" -path = "src/ci_runner.rs" diff --git a/ci-ctl/Cargo.toml b/ci-ctl/Cargo.toml new file mode 100644 index 0000000..e2662ad --- /dev/null +++ b/ci-ctl/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "ci-ctl" +version = "0.0.1" +authors = [ "iximeow <me@iximeow.net>" ] +license = "0BSD" +edition = "2021" + +[[bin]] +name = "ci_ctl" +path = "src/main.rs" + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +ci-lib-native = { path = "../ci-lib-native" } +clap = { version = "4", features = ["derive"] } diff --git a/src/ci_ctl.rs b/ci-ctl/src/main.rs index f0ffa62..bd2f733 100644 --- a/src/ci_ctl.rs +++ b/ci-ctl/src/main.rs @@ -1,13 +1,7 @@ use clap::{Parser, Subcommand}; -mod sql; -mod dbctx; -mod notifier; -mod io; -mod protocol; - -use dbctx::DbCtx; -use notifier::NotifierConfig; +use ci_lib_core::dbctx::DbCtx; +use ci_lib_native::notifier::NotifierConfig; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -87,7 +81,7 @@ fn main() { JobAction::List => { let db = DbCtx::new(&config_path, &db_path); let mut conn = db.conn.lock().unwrap(); - let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); + let mut query = conn.prepare(ci_lib_core::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); let mut jobs = query.query([]).unwrap(); while let Some(row) = jobs.next().unwrap() { let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option<String>) = row.try_into().unwrap(); diff --git a/ci-driver/Cargo.toml b/ci-driver/Cargo.toml new file mode 100644 index 0000000..697f929 --- /dev/null +++ b/ci-driver/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "ci-driver" +version = "0.0.1" +authors = [ "iximeow <me@iximeow.net>" ] +license = "0BSD" +edition = "2021" + +[[bin]] +name = "ci_driver" +path = "src/main.rs" + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +ci-lib-native = { path = "../ci-lib-native" } + +axum = { version = "*" } +axum-extra = { version = "*", features = ["async-read-body"] } +axum-server = { version = "*", features = ["tls-rustls"] } +axum-macros = "*" +serde_json = "*" +serde = { version = "*", features = ["derive"] } +base64 = "*" +tokio = { version = "*", features = ["full"] } +tokio-stream = "*" +tracing-subscriber = "*" +hyper = "*" +futures-util = "*" +lazy_static = "*" +lettre = "*" +reqwest = "*" diff --git a/src/ci_driver.rs b/ci-driver/src/main.rs index f6c1828..f8ff34c 100644 --- a/src/ci_driver.rs +++ b/ci-driver/src/main.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use std::sync::{Mutex, RwLock}; use lazy_static::lazy_static; use std::io::Read; -use serde_derive::{Deserialize, Serialize}; use futures_util::StreamExt; use std::fmt; use std::path::{Path, PathBuf}; @@ -23,17 +22,14 @@ use axum::response::IntoResponse; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use serde_json::json; +use serde::{Deserialize, Serialize}; -mod dbctx; -mod sql; -mod notifier; -mod io; -mod protocol; - -use crate::dbctx::{DbCtx, PendingRun, Job, Run}; -use crate::sql::JobResult; -use crate::sql::RunState; -use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; +use ci_lib_core::dbctx::DbCtx; +use ci_lib_core::sql; +use ci_lib_core::sql::{PendingRun, Job, Run}; +use ci_lib_core::sql::JobResult; +use ci_lib_core::sql::RunState; +use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; lazy_static! { static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); @@ -173,7 +169,7 @@ impl ClientJob { let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists"); - for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { + for notifier in ci_lib_native::dbctx_ext::notifiers_by_repo(&self.dbctx, repo_id).expect("can get notifiers") { if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await { eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e); } @@ -354,7 +350,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien } }; - if token_validity != dbctx::TokenValidity::Valid { + if token_validity != sql::TokenValidity::Valid { eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity); return (StatusCode::BAD_REQUEST, "").into_response(); } @@ -382,7 +378,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien } }; - let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await { + let mut artifact = match ci_lib_native::dbctx_ext::reserve_artifact(&ctx.0, run, artifact_name, artifact_desc).await { Ok(artifact) => artifact, Err(err) => { eprintln!("failure to reserve artifact: {:?}", err); @@ -603,7 +599,7 @@ async fn old_task_reaper(dbctx: Arc<DbCtx>) { // period for clients that have accepted a job but for some reason we have not recorded them as // active (perhaps they are slow to ack somehow). - let stale_threshold = crate::io::now_ms() - 60_000; + let stale_threshold = ci_lib_core::now_ms() - 60_000; let stale_tasks: Vec<Run> = potentially_stale_tasks.into_iter().filter(|task| { match (task.state, task.start_time) { diff --git a/ci-lib-core/Cargo.toml b/ci-lib-core/Cargo.toml new file mode 100644 index 0000000..5ec649a --- /dev/null +++ b/ci-lib-core/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "ci-lib-core" +version = "0.0.1" +authors = [ "iximeow <me@iximeow.net>" ] +license = "0BSD" +edition = "2021" +description = "shared code across the ci project that is applicable for all targets" + +[lib] + +[dependencies] +serde = { version = "*", features = ["derive"] } +rusqlite = { version = "*", features = ["bundled"] } diff --git a/src/dbctx.rs b/ci-lib-core/src/dbctx.rs index 7378b2e..7493030 100644 --- a/src/dbctx.rs +++ b/ci-lib-core/src/dbctx.rs @@ -1,16 +1,23 @@ use std::sync::Mutex; -use futures_util::StreamExt; +// use futures_util::StreamExt; use rusqlite::{Connection, OptionalExtension}; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +// use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::path::Path; use std::path::PathBuf; +use std::ops::Deref; -use crate::io::ArtifactDescriptor; -use crate::notifier::{RemoteNotifier, NotifierConfig}; use crate::sql; +use crate::sql::ArtifactRecord; +use crate::sql::Run; +use crate::sql::TokenValidity; +use crate::sql::MetricRecord; +use crate::sql::PendingRun; +use crate::sql::Job; +use crate::sql::Remote; +use crate::sql::Repo; + const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30; pub struct DbCtx { @@ -19,97 +26,6 @@ pub struct DbCtx { pub conn: Mutex<Connection>, } -#[derive(Debug, Clone)] -pub struct Repo { - pub id: u64, - pub name: String, - pub default_run_preference: Option<String>, -} - -#[derive(Debug)] -pub struct Remote { - pub id: u64, - pub repo_id: u64, - pub remote_path: String, - pub remote_api: String, - pub remote_url: String, - pub remote_git_url: String, - pub notifier_config_path: String, -} - -// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1 -// relationship with commits, and potentially many *runs* of that job. -#[derive(Debug, Clone)] -pub struct Job { - pub id: u64, - pub remote_id: u64, - pub commit_id: u64, - pub created_time: u64, - pub source: Option<String>, - pub run_preferences: Option<String>, -} - -// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report -// results. a job may have many runs from many different hosts rebuliding history, or reruns of the -// same job on the same hardware to collect more datapoints on the operation. -#[derive(Debug, Clone)] -pub struct Run { - pub id: u64, - pub job_id: u64, - pub artifacts_path: Option<String>, - pub state: sql::RunState, - pub host_id: Option<u64>, - pub create_time: u64, - pub start_time: Option<u64>, - pub complete_time: Option<u64>, - pub build_token: Option<String>, - pub run_timeout: Option<u64>, - pub build_result: Option<u8>, - pub final_text: Option<String>, -} - -impl Run { - fn into_pending_run(self) -> PendingRun { - PendingRun { - id: self.id, - job_id: self.job_id, - create_time: self.create_time, - } - } -} - -#[derive(Debug, Clone)] -pub struct PendingRun { - pub id: u64, - pub job_id: u64, - pub create_time: u64, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum TokenValidity { - Expired, - Invalid, - Valid, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct MetricRecord { - pub id: u64, - pub run_id: u64, - pub name: String, - pub value: String -} - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ArtifactRecord { - pub id: u64, - pub run_id: u64, - pub name: String, - pub desc: String, - pub created_time: u64, - pub completed_time: Option<u64>, -} - impl DbCtx { pub fn new<P: AsRef<Path>>(config_path: P, db_path: P) -> Self { DbCtx { @@ -118,6 +34,10 @@ impl DbCtx { } } + fn conn<'a>(&'a self) -> impl Deref<Target = Connection> + 'a { + self.conn.lock().unwrap() + } + pub fn create_tables(&self) -> Result<(), ()> { let conn = self.conn.lock().unwrap(); conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap(); @@ -176,7 +96,7 @@ impl DbCtx { conn .execute( "update artifacts set completed_time=?1 where id=?2", - (crate::io::now_ms(), artifact_id) + (crate::now_ms(), artifact_id) ) .map(|_| ()) .map_err(|e| { @@ -184,28 +104,6 @@ impl DbCtx { }) } - pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { - let artifact_id = { - let created_time = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis() as u64; - let conn = self.conn.lock().unwrap(); - conn - .execute( - "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", - (run_id, name, desc, created_time) - ) - .map_err(|e| { - format!("{:?}", e) - })?; - - conn.last_insert_rowid() as u64 - }; - - ArtifactDescriptor::new(run_id, artifact_id).await - } - pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> { let conn = self.conn.lock().unwrap(); conn @@ -253,14 +151,11 @@ impl DbCtx { let timeout: Option<u64> = row.get(3).unwrap(); let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS); - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is before epoch") - .as_millis(); + let now = crate::now_ms(); let time: Option<u64> = row.get(2).unwrap(); let validity = if let Some(time) = time { - if now > time as u128 + timeout as u128 { + if now > time + timeout { TokenValidity::Expired } else { TokenValidity::Valid @@ -536,7 +431,7 @@ impl DbCtx { let mut started = Vec::new(); while let Some(row) = runs.next().unwrap() { - started.push(crate::sql::row2run(row)); + started.push(Self::row2run(row)); } Ok(started) @@ -569,7 +464,7 @@ impl DbCtx { // we don't want to rebuild the entire history every time we see a new host by default; if // you really want to rebuild all of history on a new host, use `ci_ctl` to prepare the // runs. - let cutoff = crate::io::now_ms() - 24 * 3600 * 1000; + let cutoff = crate::now_ms() - 24 * 3600 * 1000; let conn = self.conn.lock().unwrap(); @@ -714,7 +609,7 @@ impl DbCtx { let mut results = Vec::new(); while let Some(row) = runs_results.next().unwrap() { - results.push(crate::sql::row2run(row)); + results.push(Self::row2run(row)); } Ok(results) @@ -725,48 +620,28 @@ impl DbCtx { conn .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| { - Ok(crate::sql::row2run(row)) + Ok(Self::row2run(row)) }) .optional() .map_err(|e| e.to_string()) } - pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> { - let remotes = self.remotes_by_repo(repo_id)?; - - let mut notifiers: Vec<RemoteNotifier> = Vec::new(); - - for remote in remotes.into_iter() { - match remote.remote_api.as_str() { - "github" => { - let mut notifier_path = self.config_path.clone(); - notifier_path.push(&remote.notifier_config_path); - - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::github_from_file(¬ifier_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - }, - "email" => { - let mut notifier_path = self.config_path.clone(); - notifier_path.push(&remote.notifier_config_path); - - let notifier = RemoteNotifier { - remote_path: remote.remote_path, - notifier: NotifierConfig::email_from_file(¬ifier_path) - .expect("can load notifier config") - }; - notifiers.push(notifier); - } - other => { - eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) - } - } + pub(crate) fn row2run(row: &rusqlite::Row) -> Run { + let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap(); + let state: u8 = state; + Run { + id, + job_id, + artifacts_path, + state: state.try_into().unwrap(), + host_id, + create_time, + start_time, + complete_time, + build_token, + run_timeout, + build_result, + final_text, } - - Ok(notifiers) } } - diff --git a/ci-lib-core/src/lib.rs b/ci-lib-core/src/lib.rs new file mode 100644 index 0000000..c20ce8e --- /dev/null +++ b/ci-lib-core/src/lib.rs @@ -0,0 +1,13 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + + +pub mod protocol; +pub mod sql; +pub mod dbctx; + +pub fn now_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is later than epoch") + .as_millis() as u64 +} diff --git a/src/protocol.rs b/ci-lib-core/src/protocol.rs index c7a9318..c7a9318 100644 --- a/src/protocol.rs +++ b/ci-lib-core/src/protocol.rs diff --git a/src/sql.rs b/ci-lib-core/src/sql.rs index 1071279..2aeb52b 100644 --- a/src/sql.rs +++ b/ci-lib-core/src/sql.rs @@ -2,8 +2,96 @@ use std::convert::TryFrom; -use crate::dbctx::Run; -use crate::dbctx::Job; +#[derive(Debug, Clone)] +pub struct PendingRun { + pub id: u64, + pub job_id: u64, + pub create_time: u64, +} + +impl Run { + fn into_pending_run(self) -> PendingRun { + PendingRun { + id: self.id, + job_id: self.job_id, + create_time: self.create_time, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TokenValidity { + Expired, + Invalid, + Valid, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct MetricRecord { + pub id: u64, + pub run_id: u64, + pub name: String, + pub value: String +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ArtifactRecord { + pub id: u64, + pub run_id: u64, + pub name: String, + pub desc: String, + pub created_time: u64, + pub completed_time: Option<u64>, +} + +#[derive(Debug, Clone)] +pub struct Repo { + pub id: u64, + pub name: String, + pub default_run_preference: Option<String>, +} + +#[derive(Debug)] +pub struct Remote { + pub id: u64, + pub repo_id: u64, + pub remote_path: String, + pub remote_api: String, + pub remote_url: String, + pub remote_git_url: String, + pub notifier_config_path: String, +} + +// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1 +// relationship with commits, and potentially many *runs* of that job. +#[derive(Debug, Clone)] +pub struct Job { + pub id: u64, + pub remote_id: u64, + pub commit_id: u64, + pub created_time: u64, + pub source: Option<String>, + pub run_preferences: Option<String>, +} + +// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report +// results. a job may have many runs from many different hosts rebuliding history, or reruns of the +// same job on the same hardware to collect more datapoints on the operation. +#[derive(Debug, Clone)] +pub struct Run { + pub id: u64, + pub job_id: u64, + pub artifacts_path: Option<String>, + pub state: RunState, + pub host_id: Option<u64>, + pub create_time: u64, + pub start_time: Option<u64>, + pub complete_time: Option<u64>, + pub build_token: Option<String>, + pub run_timeout: Option<u64>, + pub build_result: Option<u8>, + pub final_text: Option<String>, +} #[derive(Debug, Clone)] pub enum JobResult { @@ -35,6 +123,7 @@ impl TryFrom<u8> for RunState { } } +/* pub(crate) fn row2run(row: &rusqlite::Row) -> Run { let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap(); let state: u8 = state; @@ -53,6 +142,7 @@ pub(crate) fn row2run(row: &rusqlite::Row) -> Run { final_text, } } +*/ // remote_id is the remote from which we were notified. this is necessary so we know which remote // to pull from to actually run the job. diff --git a/ci-lib-native/Cargo.toml b/ci-lib-native/Cargo.toml new file mode 100644 index 0000000..7a4e665 --- /dev/null +++ b/ci-lib-native/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "ci-lib-native" +version = "0.0.1" +authors = [ "iximeow <me@iximeow.net>" ] +license = "0BSD" +edition = "2021" +description = "shared code across the ci project that is applicable for native targets (uses tokio, etc)" + +[lib] + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +tokio = { version = "*", features = ["full"] } +futures-util = "*" +axum = "*" +hyper = "*" +serde_json = "*" +serde = { version = "*", features = ["derive"] } +lettre = "*" +reqwest = "*" diff --git a/ci-lib-native/src/dbctx_ext.rs b/ci-lib-native/src/dbctx_ext.rs new file mode 100644 index 0000000..44436fc --- /dev/null +++ b/ci-lib-native/src/dbctx_ext.rs @@ -0,0 +1,62 @@ +use crate::io::ArtifactDescriptor; +use crate::notifier::{RemoteNotifier, NotifierConfig}; +use tokio::fs::{File, OpenOptions}; + +use ci_lib_core::dbctx::DbCtx; + +pub fn notifiers_by_repo(ctx: &DbCtx, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> { + let remotes = ctx.remotes_by_repo(repo_id)?; + + let mut notifiers: Vec<RemoteNotifier> = Vec::new(); + + for remote in remotes.into_iter() { + match remote.remote_api.as_str() { + "github" => { + let mut notifier_path = ctx.config_path.clone(); + notifier_path.push(&remote.notifier_config_path); + + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::github_from_file(¬ifier_path) + .expect("can load notifier config") + }; + notifiers.push(notifier); + }, + "email" => { + let mut notifier_path = ctx.config_path.clone(); + notifier_path.push(&remote.notifier_config_path); + + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::email_from_file(¬ifier_path) + .expect("can load notifier config") + }; + notifiers.push(notifier); + } + other => { + eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) + } + } + } + + Ok(notifiers) +} + +pub async fn reserve_artifact(ctx: &DbCtx, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { + let artifact_id = { + let created_time = ci_lib_core::now_ms(); + let conn = ctx.conn.lock().unwrap(); + conn + .execute( + "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", + (run_id, name, desc, created_time) + ) + .map_err(|e| { + format!("{:?}", e) + })?; + + conn.last_insert_rowid() as u64 + }; + + ArtifactDescriptor::new(run_id, artifact_id).await +} diff --git a/src/io.rs b/ci-lib-native/src/io.rs index f9f407f..d41349c 100644 --- a/src/io.rs +++ b/ci-lib-native/src/io.rs @@ -8,13 +8,6 @@ use std::pin::Pin; use std::time::{UNIX_EPOCH, SystemTime}; use std::sync::{Arc, Mutex}; -pub fn now_ms() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("now is later than epoch") - .as_millis() as u64 -} - #[derive(Clone)] pub struct VecSink { body: Arc<Mutex<Vec<u8>>>, diff --git a/ci-lib-native/src/lib.rs b/ci-lib-native/src/lib.rs new file mode 100644 index 0000000..74cb710 --- /dev/null +++ b/ci-lib-native/src/lib.rs @@ -0,0 +1,3 @@ +pub mod io; +pub mod dbctx_ext; +pub mod notifier; diff --git a/src/notifier.rs b/ci-lib-native/src/notifier.rs index f9d6084..dd4a35c 100644 --- a/src/notifier.rs +++ b/ci-lib-native/src/notifier.rs @@ -1,4 +1,4 @@ -use serde_derive::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use axum::http::StatusCode; use lettre::transport::smtp::authentication::{Credentials, Mechanism}; @@ -8,7 +8,7 @@ use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder}; use std::time::Duration; use std::path::Path; -use crate::DbCtx; +use ci_lib_core::dbctx::DbCtx; pub struct RemoteNotifier { pub remote_path: String, diff --git a/ci-runner/Cargo.toml b/ci-runner/Cargo.toml new file mode 100644 index 0000000..038ed14 --- /dev/null +++ b/ci-runner/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "ci-runner" +version = "0.0.1" +authors = [ "iximeow <me@iximeow.net>" ] +license = "0BSD" +edition = "2021" + +[[bin]] +name = "ci_runner" +path = "src/main.rs" + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +ci-lib-native = { path = "../ci-lib-native" } + +libc = "*" +serde = "*" +serde_derive = "*" +serde_json = "*" +tokio = { version = "*", features = ["full"] } +reqwest = "*" +rlua = "*" +hyper = "*" +tracing = "*" +tracing-subscriber = "*" diff --git a/src/lua/mod.rs b/ci-runner/src/lua/mod.rs index 6c4a281..62ac68b 100644 --- a/src/lua/mod.rs +++ b/ci-runner/src/lua/mod.rs @@ -5,7 +5,7 @@ use rlua::prelude::*; use std::sync::{Arc, Mutex}; use std::path::PathBuf; -pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua"); +pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config/goodfiles/rust.lua"); pub struct BuildEnv { lua: Lua, @@ -314,7 +314,7 @@ impl BuildEnv { lua_exports::metric(name, value, job_ref) })?; - let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?; + let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(ci_lib_core::now_ms()))?; let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| { lua_exports::artifact(path, name, job_ref) diff --git a/src/ci_runner.rs b/ci-runner/src/main.rs index 9aaf33d..41f5594 100644 --- a/src/ci_runner.rs +++ b/ci-runner/src/main.rs @@ -13,12 +13,12 @@ use std::task::{Context, Poll}; use std::pin::Pin; use std::marker::Unpin; -mod protocol; +use ci_lib_native::io; +use ci_lib_native::io::{ArtifactStream, VecSink}; +use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; + mod lua; -mod io; -use crate::io::{ArtifactStream, VecSink}; -use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; use crate::lua::CommandOutput; #[derive(Debug)] @@ -36,10 +36,10 @@ struct RunnerClient { current_job: Option<RequestedJob>, } -impl RequestedJob { - pub fn into_running(self, client: RunnerClient) -> RunningJob { - RunningJob { - job: self, +impl RunningJob { + fn from_job(job: RequestedJob, client: RunnerClient) -> Self { + Self { + job, client, current_step: StepTracker::new(), } @@ -220,9 +220,9 @@ impl RunningJob { let mut child_stderr = child.stderr.take().unwrap(); eprintln!("[.] '{}': forwarding stdout", name); - tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_reporter).await }); + tokio::spawn(async move { io::forward_data(&mut child_stdout, &mut stdout_reporter).await }); eprintln!("[.] '{}': forwarding stderr", name); - tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_reporter).await }); + tokio::spawn(async move { io::forward_data(&mut child_stderr, &mut stderr_reporter).await }); let res = child.wait().await .map_err(|e| format!("failed to wait? {:?}", e))?; @@ -508,7 +508,7 @@ async fn main() { eprintln!("doing {:?}", job); - let mut job = job.into_running(client); + let mut job = RunningJob::from_job(job, client); job.run().await; std::thread::sleep(Duration::from_millis(10000)); }, @@ -530,7 +530,7 @@ async fn main() { } mod host_info { - use crate::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; + use ci_lib_core::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo}; // get host model name, microcode, and how many cores fn collect_cpu_info() -> CpuInfo { diff --git a/ci-web-server/Cargo.toml b/ci-web-server/Cargo.toml new file mode 100644 index 0000000..2771e3a --- /dev/null +++ b/ci-web-server/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "ci-web-server" +version = "0.0.1" +authors = [ "iximeow <me@iximeow.net>" ] +license = "0BSD" +edition = "2021" + +[[bin]] +name = "ci_web_server" +path = "src/main.rs" + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +ci-lib-native = { path = "../ci-lib-native" } + +tokio = { features = ["full"] } +tokio-stream = "*" +serde_json = "*" +serde = { version = "*", features = ["derive"] } +axum-server = { version = "*", features = ["tls-rustls"] } +axum-extra = { version = "*", features = ["async-read-body"] } +axum = "*" +hex = "*" +tracing-subscriber = "*" +hmac = "*" +http = "*" +http-body = "*" +chrono = "*" +lazy_static = "*" +sha2 = "*" +rusqlite = { version = "*" } diff --git a/src/main.rs b/ci-web-server/src/main.rs index d11df1f..e2be54f 100644 --- a/src/main.rs +++ b/ci-web-server/src/main.rs @@ -6,7 +6,7 @@ use chrono::{Utc, TimeZone}; use lazy_static::lazy_static; use std::sync::RwLock; use std::collections::HashMap; -use serde_derive::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use tokio::spawn; use std::path::PathBuf; use axum_server::tls_rustls::RustlsConfig; @@ -31,15 +31,12 @@ use std::time::{SystemTime, UNIX_EPOCH}; use hmac::{Hmac, Mac}; use sha2::Sha256; -mod io; -mod sql; -mod notifier; -mod dbctx; -mod protocol; +// mod protocol; -use sql::RunState; +use ci_lib_core::sql::RunState; -use dbctx::{DbCtx, Job, Run, ArtifactRecord}; +use ci_lib_core::dbctx::DbCtx; +use ci_lib_core::sql::{ArtifactRecord, Job, Run}; use rusqlite::OptionalExtension; @@ -156,7 +153,7 @@ fn display_run_time(run: &Run) -> String { if run.state == RunState::Started { // this run has been restarted. the completed time is stale. // further, this is a currently active run. - let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; + let now_ms = ci_lib_core::now_ms(); let mut duration = duration_as_human_string(now_ms - start_time); duration.push_str(" (ongoing)"); duration @@ -170,7 +167,7 @@ fn display_run_time(run: &Run) -> String { } } else { if run.state != RunState::Invalid { - let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64; + let now_ms = ci_lib_core::now_ms(); let mut duration = duration_as_human_string(now_ms - start_time); duration.push_str(" (ongoing)"); duration @@ -235,7 +232,7 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event: // * create a new commit ref // * create a new job (state=pending) for the commit ref let commit_id: Option<u64> = ctx.conn.lock().unwrap() - .query_row(sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0)) + .query_row(ci_lib_core::sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0)) .optional() .expect("can run query"); @@ -272,7 +269,7 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event: let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap(); let _ = ctx.new_run(job_id, None).unwrap(); - let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers"); + let notifiers = ci_lib_native::dbctx_ext::notifiers_by_repo(&ctx, repo_id).expect("can get notifiers"); for notifier in notifiers { notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify"); @@ -516,7 +513,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists"); - let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms); + let complete_time = run.complete_time.unwrap_or_else(ci_lib_core::now_ms); let (status_elem, status_desc) = match run.state { RunState::Pending | RunState::Started => { @@ -575,13 +572,13 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( let mut artifacts_fragment = String::new(); let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap() .into_iter() // HACK: filter out artifacts for previous runs of a run. artifacts should be attached to a run, runs should be distinct from run. but i'm sleepy. - .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms)) + .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(ci_lib_core::now_ms)) .collect(); artifacts.sort_by_key(|artifact| artifact.created_time); fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 { - let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms); + let artifact_completed = artifact_completed.unwrap_or_else(ci_lib_core::now_ms); let run_completed = std::cmp::max(run_completed, artifact_completed); run_completed - artifact_completed } @@ -592,7 +589,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( for artifact in old_artifacts.iter() { let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822(); artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name)); - let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); + let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(ci_lib_core::now_ms) - artifact.created_time); let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string(); artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str)); } @@ -605,7 +602,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State( artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap()); artifacts_fragment.push_str("</pre>\n"); } else { - let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); + let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(ci_lib_core::now_ms) - artifact.created_time); let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| { (md.len() / 1024).to_string() }).unwrap_or_else(|e| format!("[{}]", e)); @@ -756,7 +753,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta .await .expect("artifact file exists?"); while artifact.completed_time.is_none() { - match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await { + match ci_lib_native::io::forward_data(&mut artifact_file, &mut tx_sender).await { Ok(()) => { // reached the current EOF, wait and then commit an unspeakable sin tokio::time::sleep(std::time::Duration::from_millis(250)).await; |