summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock98
-rw-r--r--Cargo.toml31
-rw-r--r--ci-ctl/Cargo.toml15
-rw-r--r--ci-ctl/src/main.rs (renamed from src/ci_ctl.rs)12
-rw-r--r--ci-driver/Cargo.toml30
-rw-r--r--ci-driver/src/main.rs (renamed from src/ci_driver.rs)26
-rw-r--r--ci-lib-core/Cargo.toml13
-rw-r--r--ci-lib-core/src/dbctx.rs (renamed from src/dbctx.rs)203
-rw-r--r--ci-lib-core/src/lib.rs13
-rw-r--r--ci-lib-core/src/protocol.rs (renamed from src/protocol.rs)0
-rw-r--r--ci-lib-core/src/sql.rs (renamed from src/sql.rs)94
-rw-r--r--ci-lib-native/Cargo.toml20
-rw-r--r--ci-lib-native/src/dbctx_ext.rs62
-rw-r--r--ci-lib-native/src/io.rs (renamed from src/io.rs)7
-rw-r--r--ci-lib-native/src/lib.rs3
-rw-r--r--ci-lib-native/src/notifier.rs (renamed from src/notifier.rs)4
-rw-r--r--ci-runner/Cargo.toml25
-rw-r--r--ci-runner/src/lua/mod.rs (renamed from src/lua/mod.rs)4
-rw-r--r--ci-runner/src/main.rs (renamed from src/ci_runner.rs)24
-rw-r--r--ci-web-server/Cargo.toml31
-rw-r--r--ci-web-server/src/main.rs (renamed from src/main.rs)33
21 files changed, 501 insertions, 247 deletions
diff --git a/Cargo.lock b/Cargo.lock
index fdb63dc..729b789 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -312,6 +312,103 @@ dependencies = [
]
[[package]]
+name = "ci-ctl"
+version = "0.0.1"
+dependencies = [
+ "ci-lib-core",
+ "ci-lib-native",
+ "clap 4.0.29",
+]
+
+[[package]]
+name = "ci-driver"
+version = "0.0.1"
+dependencies = [
+ "axum",
+ "axum-extra",
+ "axum-macros",
+ "axum-server",
+ "base64",
+ "ci-lib-core",
+ "ci-lib-native",
+ "futures-util",
+ "hyper",
+ "lazy_static",
+ "lettre",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tokio-stream",
+ "tracing-subscriber 0.3.16",
+]
+
+[[package]]
+name = "ci-lib-core"
+version = "0.0.1"
+dependencies = [
+ "rusqlite",
+ "serde",
+]
+
+[[package]]
+name = "ci-lib-native"
+version = "0.0.1"
+dependencies = [
+ "axum",
+ "ci-lib-core",
+ "futures-util",
+ "hyper",
+ "lettre",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "tokio",
+]
+
+[[package]]
+name = "ci-runner"
+version = "0.0.1"
+dependencies = [
+ "ci-lib-core",
+ "ci-lib-native",
+ "hyper",
+ "libc",
+ "reqwest",
+ "rlua",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "tokio",
+ "tracing",
+ "tracing-subscriber 0.3.16",
+]
+
+[[package]]
+name = "ci-web-server"
+version = "0.0.1"
+dependencies = [
+ "axum",
+ "axum-extra",
+ "axum-server",
+ "chrono",
+ "ci-lib-core",
+ "ci-lib-native",
+ "hex",
+ "hmac",
+ "http",
+ "http-body",
+ "lazy_static",
+ "rusqlite",
+ "serde",
+ "serde_json",
+ "sha2",
+ "tokio",
+ "tokio-stream",
+ "tracing-subscriber 0.3.16",
+]
+
+[[package]]
name = "clap"
version = "3.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1294,6 +1391,7 @@ version = "0.25.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29f835d03d717946d28b1d1ed632eb6f0e24a299388ee623d0c23118d3e8a7fa"
dependencies = [
+ "cc",
"pkg-config",
"vcpkg",
]
diff --git a/Cargo.toml b/Cargo.toml
index 7d1a602..b81e4b8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,6 +5,21 @@ authors = [ "iximeow <me@iximeow.net>" ]
license = "0BSD"
edition = "2021"
+[lib]
+
+[workspace]
+members = [
+ "ci-lib-core",
+ "ci-lib-native",
+ "ci-runner",
+ "ci-driver",
+ "ci-web-server",
+ "ci-ctl",
+]
+exclude = [
+ "ci-wasm-frontend"
+]
+
[dependencies]
lazy_static = "*"
axum = { version = "*" }
@@ -37,19 +52,3 @@ reqwest = { version = "*", features = ["rustls-tls-native-roots"] }
clap = { version = "*", features = ["derive"] }
rlua = "*"
chrono = "*"
-
-[[bin]]
-name = "ci_webserver"
-path = "src/main.rs"
-
-[[bin]]
-name = "ci_driver"
-path = "src/ci_driver.rs"
-
-[[bin]]
-name = "ci_ctl"
-path = "src/ci_ctl.rs"
-
-[[bin]]
-name = "ci_runner"
-path = "src/ci_runner.rs"
diff --git a/ci-ctl/Cargo.toml b/ci-ctl/Cargo.toml
new file mode 100644
index 0000000..e2662ad
--- /dev/null
+++ b/ci-ctl/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "ci-ctl"
+version = "0.0.1"
+authors = [ "iximeow <me@iximeow.net>" ]
+license = "0BSD"
+edition = "2021"
+
+[[bin]]
+name = "ci_ctl"
+path = "src/main.rs"
+
+[dependencies]
+ci-lib-core = { path = "../ci-lib-core" }
+ci-lib-native = { path = "../ci-lib-native" }
+clap = { version = "4", features = ["derive"] }
diff --git a/src/ci_ctl.rs b/ci-ctl/src/main.rs
index f0ffa62..bd2f733 100644
--- a/src/ci_ctl.rs
+++ b/ci-ctl/src/main.rs
@@ -1,13 +1,7 @@
use clap::{Parser, Subcommand};
-mod sql;
-mod dbctx;
-mod notifier;
-mod io;
-mod protocol;
-
-use dbctx::DbCtx;
-use notifier::NotifierConfig;
+use ci_lib_core::dbctx::DbCtx;
+use ci_lib_native::notifier::NotifierConfig;
#[derive(Parser)]
#[command(version, about, long_about = None)]
@@ -87,7 +81,7 @@ fn main() {
JobAction::List => {
let db = DbCtx::new(&config_path, &db_path);
let mut conn = db.conn.lock().unwrap();
- let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap();
+ let mut query = conn.prepare(ci_lib_core::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap();
let mut jobs = query.query([]).unwrap();
while let Some(row) = jobs.next().unwrap() {
let (job_id, run_id, state, created_time, commit_id, run_preferences): (u64, u64, u64, u64, u64, Option<String>) = row.try_into().unwrap();
diff --git a/ci-driver/Cargo.toml b/ci-driver/Cargo.toml
new file mode 100644
index 0000000..697f929
--- /dev/null
+++ b/ci-driver/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "ci-driver"
+version = "0.0.1"
+authors = [ "iximeow <me@iximeow.net>" ]
+license = "0BSD"
+edition = "2021"
+
+[[bin]]
+name = "ci_driver"
+path = "src/main.rs"
+
+[dependencies]
+ci-lib-core = { path = "../ci-lib-core" }
+ci-lib-native = { path = "../ci-lib-native" }
+
+axum = { version = "*" }
+axum-extra = { version = "*", features = ["async-read-body"] }
+axum-server = { version = "*", features = ["tls-rustls"] }
+axum-macros = "*"
+serde_json = "*"
+serde = { version = "*", features = ["derive"] }
+base64 = "*"
+tokio = { version = "*", features = ["full"] }
+tokio-stream = "*"
+tracing-subscriber = "*"
+hyper = "*"
+futures-util = "*"
+lazy_static = "*"
+lettre = "*"
+reqwest = "*"
diff --git a/src/ci_driver.rs b/ci-driver/src/main.rs
index f6c1828..f8ff34c 100644
--- a/src/ci_driver.rs
+++ b/ci-driver/src/main.rs
@@ -3,7 +3,6 @@ use std::collections::HashMap;
use std::sync::{Mutex, RwLock};
use lazy_static::lazy_static;
use std::io::Read;
-use serde_derive::{Deserialize, Serialize};
use futures_util::StreamExt;
use std::fmt;
use std::path::{Path, PathBuf};
@@ -23,17 +22,14 @@ use axum::response::IntoResponse;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use serde_json::json;
+use serde::{Deserialize, Serialize};
-mod dbctx;
-mod sql;
-mod notifier;
-mod io;
-mod protocol;
-
-use crate::dbctx::{DbCtx, PendingRun, Job, Run};
-use crate::sql::JobResult;
-use crate::sql::RunState;
-use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
+use ci_lib_core::dbctx::DbCtx;
+use ci_lib_core::sql;
+use ci_lib_core::sql::{PendingRun, Job, Run};
+use ci_lib_core::sql::JobResult;
+use ci_lib_core::sql::RunState;
+use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
lazy_static! {
static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
@@ -173,7 +169,7 @@ impl ClientJob {
let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists");
let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists");
- for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") {
+ for notifier in ci_lib_native::dbctx_ext::notifiers_by_repo(&self.dbctx, repo_id).expect("can get notifiers") {
if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await {
eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e);
}
@@ -354,7 +350,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
}
};
- if token_validity != dbctx::TokenValidity::Valid {
+ if token_validity != sql::TokenValidity::Valid {
eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity);
return (StatusCode::BAD_REQUEST, "").into_response();
}
@@ -382,7 +378,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
}
};
- let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await {
+ let mut artifact = match ci_lib_native::dbctx_ext::reserve_artifact(&ctx.0, run, artifact_name, artifact_desc).await {
Ok(artifact) => artifact,
Err(err) => {
eprintln!("failure to reserve artifact: {:?}", err);
@@ -603,7 +599,7 @@ async fn old_task_reaper(dbctx: Arc<DbCtx>) {
// period for clients that have accepted a job but for some reason we have not recorded them as
// active (perhaps they are slow to ack somehow).
- let stale_threshold = crate::io::now_ms() - 60_000;
+ let stale_threshold = ci_lib_core::now_ms() - 60_000;
let stale_tasks: Vec<Run> = potentially_stale_tasks.into_iter().filter(|task| {
match (task.state, task.start_time) {
diff --git a/ci-lib-core/Cargo.toml b/ci-lib-core/Cargo.toml
new file mode 100644
index 0000000..5ec649a
--- /dev/null
+++ b/ci-lib-core/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "ci-lib-core"
+version = "0.0.1"
+authors = [ "iximeow <me@iximeow.net>" ]
+license = "0BSD"
+edition = "2021"
+description = "shared code across the ci project that is applicable for all targets"
+
+[lib]
+
+[dependencies]
+serde = { version = "*", features = ["derive"] }
+rusqlite = { version = "*", features = ["bundled"] }
diff --git a/src/dbctx.rs b/ci-lib-core/src/dbctx.rs
index 7378b2e..7493030 100644
--- a/src/dbctx.rs
+++ b/ci-lib-core/src/dbctx.rs
@@ -1,16 +1,23 @@
use std::sync::Mutex;
-use futures_util::StreamExt;
+// use futures_util::StreamExt;
use rusqlite::{Connection, OptionalExtension};
use std::time::{SystemTime, UNIX_EPOCH};
-use tokio::fs::{File, OpenOptions};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+// use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::path::Path;
use std::path::PathBuf;
+use std::ops::Deref;
-use crate::io::ArtifactDescriptor;
-use crate::notifier::{RemoteNotifier, NotifierConfig};
use crate::sql;
+use crate::sql::ArtifactRecord;
+use crate::sql::Run;
+use crate::sql::TokenValidity;
+use crate::sql::MetricRecord;
+use crate::sql::PendingRun;
+use crate::sql::Job;
+use crate::sql::Remote;
+use crate::sql::Repo;
+
const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30;
pub struct DbCtx {
@@ -19,97 +26,6 @@ pub struct DbCtx {
pub conn: Mutex<Connection>,
}
-#[derive(Debug, Clone)]
-pub struct Repo {
- pub id: u64,
- pub name: String,
- pub default_run_preference: Option<String>,
-}
-
-#[derive(Debug)]
-pub struct Remote {
- pub id: u64,
- pub repo_id: u64,
- pub remote_path: String,
- pub remote_api: String,
- pub remote_url: String,
- pub remote_git_url: String,
- pub notifier_config_path: String,
-}
-
-// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1
-// relationship with commits, and potentially many *runs* of that job.
-#[derive(Debug, Clone)]
-pub struct Job {
- pub id: u64,
- pub remote_id: u64,
- pub commit_id: u64,
- pub created_time: u64,
- pub source: Option<String>,
- pub run_preferences: Option<String>,
-}
-
-// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report
-// results. a job may have many runs from many different hosts rebuliding history, or reruns of the
-// same job on the same hardware to collect more datapoints on the operation.
-#[derive(Debug, Clone)]
-pub struct Run {
- pub id: u64,
- pub job_id: u64,
- pub artifacts_path: Option<String>,
- pub state: sql::RunState,
- pub host_id: Option<u64>,
- pub create_time: u64,
- pub start_time: Option<u64>,
- pub complete_time: Option<u64>,
- pub build_token: Option<String>,
- pub run_timeout: Option<u64>,
- pub build_result: Option<u8>,
- pub final_text: Option<String>,
-}
-
-impl Run {
- fn into_pending_run(self) -> PendingRun {
- PendingRun {
- id: self.id,
- job_id: self.job_id,
- create_time: self.create_time,
- }
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct PendingRun {
- pub id: u64,
- pub job_id: u64,
- pub create_time: u64,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum TokenValidity {
- Expired,
- Invalid,
- Valid,
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct MetricRecord {
- pub id: u64,
- pub run_id: u64,
- pub name: String,
- pub value: String
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct ArtifactRecord {
- pub id: u64,
- pub run_id: u64,
- pub name: String,
- pub desc: String,
- pub created_time: u64,
- pub completed_time: Option<u64>,
-}
-
impl DbCtx {
pub fn new<P: AsRef<Path>>(config_path: P, db_path: P) -> Self {
DbCtx {
@@ -118,6 +34,10 @@ impl DbCtx {
}
}
+ fn conn<'a>(&'a self) -> impl Deref<Target = Connection> + 'a {
+ self.conn.lock().unwrap()
+ }
+
pub fn create_tables(&self) -> Result<(), ()> {
let conn = self.conn.lock().unwrap();
conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap();
@@ -176,7 +96,7 @@ impl DbCtx {
conn
.execute(
"update artifacts set completed_time=?1 where id=?2",
- (crate::io::now_ms(), artifact_id)
+ (crate::now_ms(), artifact_id)
)
.map(|_| ())
.map_err(|e| {
@@ -184,28 +104,6 @@ impl DbCtx {
})
}
- pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
- let artifact_id = {
- let created_time = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is before epoch")
- .as_millis() as u64;
- let conn = self.conn.lock().unwrap();
- conn
- .execute(
- "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)",
- (run_id, name, desc, created_time)
- )
- .map_err(|e| {
- format!("{:?}", e)
- })?;
-
- conn.last_insert_rowid() as u64
- };
-
- ArtifactDescriptor::new(run_id, artifact_id).await
- }
-
pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {
let conn = self.conn.lock().unwrap();
conn
@@ -253,14 +151,11 @@ impl DbCtx {
let timeout: Option<u64> = row.get(3).unwrap();
let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS);
- let now = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is before epoch")
- .as_millis();
+ let now = crate::now_ms();
let time: Option<u64> = row.get(2).unwrap();
let validity = if let Some(time) = time {
- if now > time as u128 + timeout as u128 {
+ if now > time + timeout {
TokenValidity::Expired
} else {
TokenValidity::Valid
@@ -536,7 +431,7 @@ impl DbCtx {
let mut started = Vec::new();
while let Some(row) = runs.next().unwrap() {
- started.push(crate::sql::row2run(row));
+ started.push(Self::row2run(row));
}
Ok(started)
@@ -569,7 +464,7 @@ impl DbCtx {
// we don't want to rebuild the entire history every time we see a new host by default; if
// you really want to rebuild all of history on a new host, use `ci_ctl` to prepare the
// runs.
- let cutoff = crate::io::now_ms() - 24 * 3600 * 1000;
+ let cutoff = crate::now_ms() - 24 * 3600 * 1000;
let conn = self.conn.lock().unwrap();
@@ -714,7 +609,7 @@ impl DbCtx {
let mut results = Vec::new();
while let Some(row) = runs_results.next().unwrap() {
- results.push(crate::sql::row2run(row));
+ results.push(Self::row2run(row));
}
Ok(results)
@@ -725,48 +620,28 @@ impl DbCtx {
conn
.query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| {
- Ok(crate::sql::row2run(row))
+ Ok(Self::row2run(row))
})
.optional()
.map_err(|e| e.to_string())
}
- pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> {
- let remotes = self.remotes_by_repo(repo_id)?;
-
- let mut notifiers: Vec<RemoteNotifier> = Vec::new();
-
- for remote in remotes.into_iter() {
- match remote.remote_api.as_str() {
- "github" => {
- let mut notifier_path = self.config_path.clone();
- notifier_path.push(&remote.notifier_config_path);
-
- let notifier = RemoteNotifier {
- remote_path: remote.remote_path,
- notifier: NotifierConfig::github_from_file(&notifier_path)
- .expect("can load notifier config")
- };
- notifiers.push(notifier);
- },
- "email" => {
- let mut notifier_path = self.config_path.clone();
- notifier_path.push(&remote.notifier_config_path);
-
- let notifier = RemoteNotifier {
- remote_path: remote.remote_path,
- notifier: NotifierConfig::email_from_file(&notifier_path)
- .expect("can load notifier config")
- };
- notifiers.push(notifier);
- }
- other => {
- eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote)
- }
- }
+ pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
+ let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
+ let state: u8 = state;
+ Run {
+ id,
+ job_id,
+ artifacts_path,
+ state: state.try_into().unwrap(),
+ host_id,
+ create_time,
+ start_time,
+ complete_time,
+ build_token,
+ run_timeout,
+ build_result,
+ final_text,
}
-
- Ok(notifiers)
}
}
-
diff --git a/ci-lib-core/src/lib.rs b/ci-lib-core/src/lib.rs
new file mode 100644
index 0000000..c20ce8e
--- /dev/null
+++ b/ci-lib-core/src/lib.rs
@@ -0,0 +1,13 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+
+
+pub mod protocol;
+pub mod sql;
+pub mod dbctx;
+
+pub fn now_ms() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is later than epoch")
+ .as_millis() as u64
+}
diff --git a/src/protocol.rs b/ci-lib-core/src/protocol.rs
index c7a9318..c7a9318 100644
--- a/src/protocol.rs
+++ b/ci-lib-core/src/protocol.rs
diff --git a/src/sql.rs b/ci-lib-core/src/sql.rs
index 1071279..2aeb52b 100644
--- a/src/sql.rs
+++ b/ci-lib-core/src/sql.rs
@@ -2,8 +2,96 @@
use std::convert::TryFrom;
-use crate::dbctx::Run;
-use crate::dbctx::Job;
+#[derive(Debug, Clone)]
+pub struct PendingRun {
+ pub id: u64,
+ pub job_id: u64,
+ pub create_time: u64,
+}
+
+impl Run {
+ fn into_pending_run(self) -> PendingRun {
+ PendingRun {
+ id: self.id,
+ job_id: self.job_id,
+ create_time: self.create_time,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TokenValidity {
+ Expired,
+ Invalid,
+ Valid,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct MetricRecord {
+ pub id: u64,
+ pub run_id: u64,
+ pub name: String,
+ pub value: String
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ArtifactRecord {
+ pub id: u64,
+ pub run_id: u64,
+ pub name: String,
+ pub desc: String,
+ pub created_time: u64,
+ pub completed_time: Option<u64>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Repo {
+ pub id: u64,
+ pub name: String,
+ pub default_run_preference: Option<String>,
+}
+
+#[derive(Debug)]
+pub struct Remote {
+ pub id: u64,
+ pub repo_id: u64,
+ pub remote_path: String,
+ pub remote_api: String,
+ pub remote_url: String,
+ pub remote_git_url: String,
+ pub notifier_config_path: String,
+}
+
+// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1
+// relationship with commits, and potentially many *runs* of that job.
+#[derive(Debug, Clone)]
+pub struct Job {
+ pub id: u64,
+ pub remote_id: u64,
+ pub commit_id: u64,
+ pub created_time: u64,
+ pub source: Option<String>,
+ pub run_preferences: Option<String>,
+}
+
+// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report
+// results. a job may have many runs from many different hosts rebuliding history, or reruns of the
+// same job on the same hardware to collect more datapoints on the operation.
+#[derive(Debug, Clone)]
+pub struct Run {
+ pub id: u64,
+ pub job_id: u64,
+ pub artifacts_path: Option<String>,
+ pub state: RunState,
+ pub host_id: Option<u64>,
+ pub create_time: u64,
+ pub start_time: Option<u64>,
+ pub complete_time: Option<u64>,
+ pub build_token: Option<String>,
+ pub run_timeout: Option<u64>,
+ pub build_result: Option<u8>,
+ pub final_text: Option<String>,
+}
#[derive(Debug, Clone)]
pub enum JobResult {
@@ -35,6 +123,7 @@ impl TryFrom<u8> for RunState {
}
}
+/*
pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
let state: u8 = state;
@@ -53,6 +142,7 @@ pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
final_text,
}
}
+*/
// remote_id is the remote from which we were notified. this is necessary so we know which remote
// to pull from to actually run the job.
diff --git a/ci-lib-native/Cargo.toml b/ci-lib-native/Cargo.toml
new file mode 100644
index 0000000..7a4e665
--- /dev/null
+++ b/ci-lib-native/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "ci-lib-native"
+version = "0.0.1"
+authors = [ "iximeow <me@iximeow.net>" ]
+license = "0BSD"
+edition = "2021"
+description = "shared code across the ci project that is applicable for native targets (uses tokio, etc)"
+
+[lib]
+
+[dependencies]
+ci-lib-core = { path = "../ci-lib-core" }
+tokio = { version = "*", features = ["full"] }
+futures-util = "*"
+axum = "*"
+hyper = "*"
+serde_json = "*"
+serde = { version = "*", features = ["derive"] }
+lettre = "*"
+reqwest = "*"
diff --git a/ci-lib-native/src/dbctx_ext.rs b/ci-lib-native/src/dbctx_ext.rs
new file mode 100644
index 0000000..44436fc
--- /dev/null
+++ b/ci-lib-native/src/dbctx_ext.rs
@@ -0,0 +1,62 @@
+use crate::io::ArtifactDescriptor;
+use crate::notifier::{RemoteNotifier, NotifierConfig};
+use tokio::fs::{File, OpenOptions};
+
+use ci_lib_core::dbctx::DbCtx;
+
+pub fn notifiers_by_repo(ctx: &DbCtx, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> {
+ let remotes = ctx.remotes_by_repo(repo_id)?;
+
+ let mut notifiers: Vec<RemoteNotifier> = Vec::new();
+
+ for remote in remotes.into_iter() {
+ match remote.remote_api.as_str() {
+ "github" => {
+ let mut notifier_path = ctx.config_path.clone();
+ notifier_path.push(&remote.notifier_config_path);
+
+ let notifier = RemoteNotifier {
+ remote_path: remote.remote_path,
+ notifier: NotifierConfig::github_from_file(&notifier_path)
+ .expect("can load notifier config")
+ };
+ notifiers.push(notifier);
+ },
+ "email" => {
+ let mut notifier_path = ctx.config_path.clone();
+ notifier_path.push(&remote.notifier_config_path);
+
+ let notifier = RemoteNotifier {
+ remote_path: remote.remote_path,
+ notifier: NotifierConfig::email_from_file(&notifier_path)
+ .expect("can load notifier config")
+ };
+ notifiers.push(notifier);
+ }
+ other => {
+ eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote)
+ }
+ }
+ }
+
+ Ok(notifiers)
+}
+
+pub async fn reserve_artifact(ctx: &DbCtx, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
+ let artifact_id = {
+ let created_time = ci_lib_core::now_ms();
+ let conn = ctx.conn.lock().unwrap();
+ conn
+ .execute(
+ "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)",
+ (run_id, name, desc, created_time)
+ )
+ .map_err(|e| {
+ format!("{:?}", e)
+ })?;
+
+ conn.last_insert_rowid() as u64
+ };
+
+ ArtifactDescriptor::new(run_id, artifact_id).await
+}
diff --git a/src/io.rs b/ci-lib-native/src/io.rs
index f9f407f..d41349c 100644
--- a/src/io.rs
+++ b/ci-lib-native/src/io.rs
@@ -8,13 +8,6 @@ use std::pin::Pin;
use std::time::{UNIX_EPOCH, SystemTime};
use std::sync::{Arc, Mutex};
-pub fn now_ms() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is later than epoch")
- .as_millis() as u64
-}
-
#[derive(Clone)]
pub struct VecSink {
body: Arc<Mutex<Vec<u8>>>,
diff --git a/ci-lib-native/src/lib.rs b/ci-lib-native/src/lib.rs
new file mode 100644
index 0000000..74cb710
--- /dev/null
+++ b/ci-lib-native/src/lib.rs
@@ -0,0 +1,3 @@
+pub mod io;
+pub mod dbctx_ext;
+pub mod notifier;
diff --git a/src/notifier.rs b/ci-lib-native/src/notifier.rs
index f9d6084..dd4a35c 100644
--- a/src/notifier.rs
+++ b/ci-lib-native/src/notifier.rs
@@ -1,4 +1,4 @@
-use serde_derive::{Deserialize, Serialize};
+use serde::{Deserialize, Serialize};
use std::sync::Arc;
use axum::http::StatusCode;
use lettre::transport::smtp::authentication::{Credentials, Mechanism};
@@ -8,7 +8,7 @@ use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder};
use std::time::Duration;
use std::path::Path;
-use crate::DbCtx;
+use ci_lib_core::dbctx::DbCtx;
pub struct RemoteNotifier {
pub remote_path: String,
diff --git a/ci-runner/Cargo.toml b/ci-runner/Cargo.toml
new file mode 100644
index 0000000..038ed14
--- /dev/null
+++ b/ci-runner/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "ci-runner"
+version = "0.0.1"
+authors = [ "iximeow <me@iximeow.net>" ]
+license = "0BSD"
+edition = "2021"
+
+[[bin]]
+name = "ci_runner"
+path = "src/main.rs"
+
+[dependencies]
+ci-lib-core = { path = "../ci-lib-core" }
+ci-lib-native = { path = "../ci-lib-native" }
+
+libc = "*"
+serde = "*"
+serde_derive = "*"
+serde_json = "*"
+tokio = { version = "*", features = ["full"] }
+reqwest = "*"
+rlua = "*"
+hyper = "*"
+tracing = "*"
+tracing-subscriber = "*"
diff --git a/src/lua/mod.rs b/ci-runner/src/lua/mod.rs
index 6c4a281..62ac68b 100644
--- a/src/lua/mod.rs
+++ b/ci-runner/src/lua/mod.rs
@@ -5,7 +5,7 @@ use rlua::prelude::*;
use std::sync::{Arc, Mutex};
use std::path::PathBuf;
-pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua");
+pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config/goodfiles/rust.lua");
pub struct BuildEnv {
lua: Lua,
@@ -314,7 +314,7 @@ impl BuildEnv {
lua_exports::metric(name, value, job_ref)
})?;
- let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?;
+ let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(ci_lib_core::now_ms()))?;
let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| {
lua_exports::artifact(path, name, job_ref)
diff --git a/src/ci_runner.rs b/ci-runner/src/main.rs
index 9aaf33d..41f5594 100644
--- a/src/ci_runner.rs
+++ b/ci-runner/src/main.rs
@@ -13,12 +13,12 @@ use std::task::{Context, Poll};
use std::pin::Pin;
use std::marker::Unpin;
-mod protocol;
+use ci_lib_native::io;
+use ci_lib_native::io::{ArtifactStream, VecSink};
+use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
+
mod lua;
-mod io;
-use crate::io::{ArtifactStream, VecSink};
-use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
use crate::lua::CommandOutput;
#[derive(Debug)]
@@ -36,10 +36,10 @@ struct RunnerClient {
current_job: Option<RequestedJob>,
}
-impl RequestedJob {
- pub fn into_running(self, client: RunnerClient) -> RunningJob {
- RunningJob {
- job: self,
+impl RunningJob {
+ fn from_job(job: RequestedJob, client: RunnerClient) -> Self {
+ Self {
+ job,
client,
current_step: StepTracker::new(),
}
@@ -220,9 +220,9 @@ impl RunningJob {
let mut child_stderr = child.stderr.take().unwrap();
eprintln!("[.] '{}': forwarding stdout", name);
- tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_reporter).await });
+ tokio::spawn(async move { io::forward_data(&mut child_stdout, &mut stdout_reporter).await });
eprintln!("[.] '{}': forwarding stderr", name);
- tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_reporter).await });
+ tokio::spawn(async move { io::forward_data(&mut child_stderr, &mut stderr_reporter).await });
let res = child.wait().await
.map_err(|e| format!("failed to wait? {:?}", e))?;
@@ -508,7 +508,7 @@ async fn main() {
eprintln!("doing {:?}", job);
- let mut job = job.into_running(client);
+ let mut job = RunningJob::from_job(job, client);
job.run().await;
std::thread::sleep(Duration::from_millis(10000));
},
@@ -530,7 +530,7 @@ async fn main() {
}
mod host_info {
- use crate::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo};
+ use ci_lib_core::protocol::{CpuInfo, EnvInfo, HostInfo, MemoryInfo};
// get host model name, microcode, and how many cores
fn collect_cpu_info() -> CpuInfo {
diff --git a/ci-web-server/Cargo.toml b/ci-web-server/Cargo.toml
new file mode 100644
index 0000000..2771e3a
--- /dev/null
+++ b/ci-web-server/Cargo.toml
@@ -0,0 +1,31 @@
+[package]
+name = "ci-web-server"
+version = "0.0.1"
+authors = [ "iximeow <me@iximeow.net>" ]
+license = "0BSD"
+edition = "2021"
+
+[[bin]]
+name = "ci_web_server"
+path = "src/main.rs"
+
+[dependencies]
+ci-lib-core = { path = "../ci-lib-core" }
+ci-lib-native = { path = "../ci-lib-native" }
+
+tokio = { features = ["full"] }
+tokio-stream = "*"
+serde_json = "*"
+serde = { version = "*", features = ["derive"] }
+axum-server = { version = "*", features = ["tls-rustls"] }
+axum-extra = { version = "*", features = ["async-read-body"] }
+axum = "*"
+hex = "*"
+tracing-subscriber = "*"
+hmac = "*"
+http = "*"
+http-body = "*"
+chrono = "*"
+lazy_static = "*"
+sha2 = "*"
+rusqlite = { version = "*" }
diff --git a/src/main.rs b/ci-web-server/src/main.rs
index d11df1f..e2be54f 100644
--- a/src/main.rs
+++ b/ci-web-server/src/main.rs
@@ -6,7 +6,7 @@ use chrono::{Utc, TimeZone};
use lazy_static::lazy_static;
use std::sync::RwLock;
use std::collections::HashMap;
-use serde_derive::{Deserialize, Serialize};
+use serde::{Deserialize, Serialize};
use tokio::spawn;
use std::path::PathBuf;
use axum_server::tls_rustls::RustlsConfig;
@@ -31,15 +31,12 @@ use std::time::{SystemTime, UNIX_EPOCH};
use hmac::{Hmac, Mac};
use sha2::Sha256;
-mod io;
-mod sql;
-mod notifier;
-mod dbctx;
-mod protocol;
+// mod protocol;
-use sql::RunState;
+use ci_lib_core::sql::RunState;
-use dbctx::{DbCtx, Job, Run, ArtifactRecord};
+use ci_lib_core::dbctx::DbCtx;
+use ci_lib_core::sql::{ArtifactRecord, Job, Run};
use rusqlite::OptionalExtension;
@@ -156,7 +153,7 @@ fn display_run_time(run: &Run) -> String {
if run.state == RunState::Started {
// this run has been restarted. the completed time is stale.
// further, this is a currently active run.
- let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64;
+ let now_ms = ci_lib_core::now_ms();
let mut duration = duration_as_human_string(now_ms - start_time);
duration.push_str(" (ongoing)");
duration
@@ -170,7 +167,7 @@ fn display_run_time(run: &Run) -> String {
}
} else {
if run.state != RunState::Invalid {
- let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64;
+ let now_ms = ci_lib_core::now_ms();
let mut duration = duration_as_human_string(now_ms - start_time);
duration.push_str(" (ongoing)");
duration
@@ -235,7 +232,7 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event:
// * create a new commit ref
// * create a new job (state=pending) for the commit ref
let commit_id: Option<u64> = ctx.conn.lock().unwrap()
- .query_row(sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0))
+ .query_row(ci_lib_core::sql::COMMIT_TO_ID, [sha.clone()], |row| row.get(0))
.optional()
.expect("can run query");
@@ -272,7 +269,7 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event:
let job_id = ctx.new_job(remote_id, &sha, Some(pusher_email), repo_default_run_pref).unwrap();
let _ = ctx.new_run(job_id, None).unwrap();
- let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers");
+ let notifiers = ci_lib_native::dbctx_ext::notifiers_by_repo(&ctx, repo_id).expect("can get notifiers");
for notifier in notifiers {
notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify");
@@ -516,7 +513,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists");
- let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms);
+ let complete_time = run.complete_time.unwrap_or_else(ci_lib_core::now_ms);
let (status_elem, status_desc) = match run.state {
RunState::Pending | RunState::Started => {
@@ -575,13 +572,13 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
let mut artifacts_fragment = String::new();
let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap()
.into_iter() // HACK: filter out artifacts for previous runs of a run. artifacts should be attached to a run, runs should be distinct from run. but i'm sleepy.
- .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms))
+ .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(ci_lib_core::now_ms))
.collect();
artifacts.sort_by_key(|artifact| artifact.created_time);
fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 {
- let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms);
+ let artifact_completed = artifact_completed.unwrap_or_else(ci_lib_core::now_ms);
let run_completed = std::cmp::max(run_completed, artifact_completed);
run_completed - artifact_completed
}
@@ -592,7 +589,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
for artifact in old_artifacts.iter() {
let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822();
artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name));
- let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time);
+ let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(ci_lib_core::now_ms) - artifact.created_time);
let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string();
artifacts_fragment.push_str(&format!("<pre> {}kb in {} </pre>\n", size_str, duration_str));
}
@@ -605,7 +602,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap());
artifacts_fragment.push_str("</pre>\n");
} else {
- let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time);
+ let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(ci_lib_core::now_ms) - artifact.created_time);
let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| {
(md.len() / 1024).to_string()
}).unwrap_or_else(|e| format!("[{}]", e));
@@ -756,7 +753,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta
.await
.expect("artifact file exists?");
while artifact.completed_time.is_none() {
- match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await {
+ match ci_lib_native::io::forward_data(&mut artifact_file, &mut tx_sender).await {
Ok(()) => {
// reached the current EOF, wait and then commit an unspeakable sin
tokio::time::sleep(std::time::Duration::from_millis(250)).await;