author    iximeow <me@iximeow.net>  2023-07-13 00:51:51 -0700
committer iximeow <me@iximeow.net>  2023-07-13 00:51:51 -0700
commit    9e6906c00c49186189d211dc96e132d85e7ff641 (patch)
tree      05c20145ebc306313e3a12dc73c34b5dea40bbdc /ci-lib-core
parent    543150f1666690351d4698421cc6ceb115c1e251 (diff)
reorganize the whole thing into crates/ packages
Diffstat (limited to 'ci-lib-core')
-rw-r--r--  ci-lib-core/Cargo.toml       13
-rw-r--r--  ci-lib-core/src/dbctx.rs    647
-rw-r--r--  ci-lib-core/src/lib.rs       13
-rw-r--r--  ci-lib-core/src/protocol.rs 114
-rw-r--r--  ci-lib-core/src/sql.rs      319
5 files changed, 1106 insertions, 0 deletions
diff --git a/ci-lib-core/Cargo.toml b/ci-lib-core/Cargo.toml
new file mode 100644
index 0000000..5ec649a
--- /dev/null
+++ b/ci-lib-core/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "ci-lib-core"
+version = "0.0.1"
+authors = [ "iximeow <me@iximeow.net>" ]
+license = "0BSD"
+edition = "2021"
+description = "shared code across the ci project that is applicable for all targets"
+
+[lib]
+
+[dependencies]
+serde = { version = "*", features = ["derive"] }
+rusqlite = { version = "*", features = ["bundled"] }
diff --git a/ci-lib-core/src/dbctx.rs b/ci-lib-core/src/dbctx.rs
new file mode 100644
index 0000000..7493030
--- /dev/null
+++ b/ci-lib-core/src/dbctx.rs
@@ -0,0 +1,647 @@
+use std::sync::Mutex;
+// use futures_util::StreamExt;
+use rusqlite::{Connection, OptionalExtension};
+use std::time::{SystemTime, UNIX_EPOCH};
+// use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use std::path::Path;
+use std::path::PathBuf;
+use std::ops::Deref;
+
+use crate::sql;
+
+use crate::sql::ArtifactRecord;
+use crate::sql::Run;
+use crate::sql::TokenValidity;
+use crate::sql::MetricRecord;
+use crate::sql::PendingRun;
+use crate::sql::Job;
+use crate::sql::Remote;
+use crate::sql::Repo;
+
+const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30;
+
+pub struct DbCtx {
+ pub config_path: PathBuf,
+ // don't love this but.. for now...
+ pub conn: Mutex<Connection>,
+}
+
+impl DbCtx {
+ pub fn new<P: AsRef<Path>>(config_path: P, db_path: P) -> Self {
+ DbCtx {
+ config_path: config_path.as_ref().to_owned(),
+ conn: Mutex::new(Connection::open(db_path).unwrap())
+ }
+ }
+
+ fn conn<'a>(&'a self) -> impl Deref<Target = Connection> + 'a {
+ self.conn.lock().unwrap()
+ }
+
+ pub fn create_tables(&self) -> Result<(), ()> {
+ let conn = self.conn.lock().unwrap();
+ conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_METRICS_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap();
+ conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap();
+ conn.execute(sql::CREATE_RUNS_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_HOSTS_TABLE, ()).unwrap();
+
+ Ok(())
+ }
+
+ pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .execute(
+ "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value",
+ (run_id, name, value)
+ )
+ .expect("can upsert");
+ Ok(())
+ }
+
+ pub fn new_commit(&self, sha: &str) -> Result<u64, String> {
+ let conn = self.conn.lock().unwrap();
+        conn
+            .execute(
+                "insert into commits (sha) values (?1)",
+                [sha]
+            )
+            .expect("can insert");
+
+ Ok(conn.last_insert_rowid() as u64)
+ }
+
+ pub fn new_repo(&self, name: &str) -> Result<u64, String> {
+ let conn = self.conn.lock().unwrap();
+        conn
+            .execute(
+                "insert into repos (repo_name) values (?1)",
+                [name]
+            )
+ .map_err(|e| {
+ format!("{:?}", e)
+ })?;
+
+ Ok(conn.last_insert_rowid() as u64)
+ }
+
+ pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .execute(
+ "update artifacts set completed_time=?1 where id=?2",
+ (crate::now_ms(), artifact_id)
+ )
+ .map(|_| ())
+ .map_err(|e| {
+ format!("{:?}", e)
+ })
+ }
+
+ pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| {
+ let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap();
+
+ Ok(ArtifactRecord {
+ id, run_id, name, desc, created_time, completed_time
+ })
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn commit_sha(&self, commit_id: u64) -> Result<String, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row(
+ "select sha from commits where id=?1",
+ [commit_id],
+ |row| { row.get(0) }
+ )
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn job_for_commit(&self, sha: &str) -> Result<Option<u64>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row(
+ "select id from commits where sha=?1",
+ [sha],
+ |row| { row.get(0) }
+ )
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn run_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row(
+ "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1",
+ [token],
+ |row| {
+ let timeout: Option<u64> = row.get(3).unwrap();
+ let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS);
+
+ let now = crate::now_ms();
+
+ let time: Option<u64> = row.get(2).unwrap();
+ let validity = if let Some(time) = time {
+ if now > time + timeout {
+ TokenValidity::Expired
+ } else {
+ TokenValidity::Valid
+ }
+ } else {
+ TokenValidity::Invalid
+ };
+ Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity))
+ }
+ )
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn job_by_id(&self, id: u64) -> Result<Option<Job>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row(crate::sql::JOB_BY_ID, [id], |row| {
+ let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
+
+ Ok(Job {
+ id, source, created_time, remote_id, commit_id, run_preferences
+ })
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn remote_by_path_and_api(&self, api: &str, path: &str) -> Result<Option<Remote>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where remote_api=?1 and remote_path=?2", [api, path], |row| {
+ let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
+
+ Ok(Remote {
+ id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path
+ })
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn remote_by_id(&self, id: u64) -> Result<Option<Remote>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row("select id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path from remotes where id=?1", [id], |row| {
+ let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
+
+ Ok(Remote {
+ id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path
+ })
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0))
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn repo_id_by_name(&self, repo_name: &str) -> Result<Option<u64>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0))
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result<u64, String> {
+        let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind {
+            "github" => {
+                (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote))
+            },
+            "github-email" => {
+                (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote))
+            },
+            other => {
+                panic!("unsupported remote kind: {}", other);
+            }
+        };
+
+ let conn = self.conn.lock().unwrap();
+ conn
+ .execute(
+ "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);",
+ (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path)
+ )
+ .expect("can insert");
+
+ Ok(conn.last_insert_rowid() as u64)
+ }
+
+ pub fn new_job(&self, remote_id: u64, sha: &str, pusher: Option<&str>, repo_default_run_pref: Option<String>) -> Result<u64, String> {
+ // TODO: potential race: if two remotes learn about a commit at the same time and we decide
+ // to create two jobs at the same time, this might return an incorrect id if the insert
+ // didn't actually insert a new row.
+ let commit_id = self.new_commit(sha).expect("can create commit record");
+
+ let created_time = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is before epoch")
+ .as_millis() as u64;
+
+ let conn = self.conn.lock().unwrap();
+
+ let rows_modified = conn.execute(
+ "insert into jobs (remote_id, commit_id, created_time, source, run_preferences) values (?1, ?2, ?3, ?4, ?5);",
+ (remote_id, commit_id, created_time, pusher, repo_default_run_pref)
+ ).unwrap();
+
+ assert_eq!(1, rows_modified);
+
+ let job_id = conn.last_insert_rowid() as u64;
+
+ Ok(job_id)
+ }
+
+ pub fn new_run(&self, job_id: u64, host_preference: Option<u32>) -> Result<PendingRun, String> {
+ let created_time = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is before epoch")
+ .as_millis() as u64;
+
+ let conn = self.conn.lock().unwrap();
+
+ let rows_modified = conn.execute(
+ "insert into runs (job_id, state, created_time, host_preference) values (?1, ?2, ?3, ?4);",
+ (job_id, crate::sql::RunState::Pending as u64, created_time, host_preference)
+ ).unwrap();
+
+ assert_eq!(1, rows_modified);
+
+ let run_id = conn.last_insert_rowid() as u64;
+
+ Ok(PendingRun {
+ id: run_id,
+ job_id,
+ create_time: created_time,
+ })
+ }
+
+ pub fn reap_task(&self, task_id: u64) -> Result<(), String> {
+ let conn = self.conn.lock().unwrap();
+
+        conn.execute(
+            "update runs set final_status='lost signal', state=4 where id=?1;",
+            [task_id]
+        ).unwrap();
+
+ Ok(())
+ }
+
+ pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap();
+ let mut result = metrics_query.query([run]).unwrap();
+ let mut metrics = Vec::new();
+
+ while let Some(row) = result.next().unwrap() {
+ let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap();
+ metrics.push(MetricRecord { id, run_id, name, value });
+ }
+
+ Ok(metrics)
+ }
+
+ pub fn artifacts_for_run(&self, run: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap();
+ let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap();
+ let mut artifacts = Vec::new();
+
+ while let Some(row) = result.next().unwrap() {
+ let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap();
+ artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time });
+ }
+
+ Ok(artifacts)
+ }
+
+ pub fn repo_by_id(&self, id: u64) -> Result<Option<Repo>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row("select id, repo_name, default_run_preference from repos where id=?1", [id], |row| {
+ let (id, repo_name, default_run_preference) = row.try_into().unwrap();
+ Ok(Repo {
+ id,
+ name: repo_name,
+ default_run_preference,
+ })
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn get_repos(&self) -> Result<Vec<Repo>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut repos_query = conn.prepare(sql::ALL_REPOS).unwrap();
+ let mut repos = repos_query.query([]).unwrap();
+ let mut result = Vec::new();
+
+ while let Some(row) = repos.next().unwrap() {
+ let (id, repo_name, default_run_preference) = row.try_into().unwrap();
+ result.push(Repo {
+ id,
+ name: repo_name,
+ default_run_preference,
+ });
+ }
+
+ Ok(result)
+ }
+
+ pub fn last_job_from_remote(&self, id: u64) -> Result<Option<Job>, String> {
+ self.recent_jobs_from_remote(id, 1)
+ .map(|mut jobs| jobs.pop())
+ }
+
+ pub fn job_by_commit_id(&self, commit_id: u64) -> Result<Option<Job>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ conn
+ .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| {
+ let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
+ Ok(Job {
+ id,
+ remote_id,
+ commit_id,
+ created_time,
+ source,
+ run_preferences,
+ })
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn recent_jobs_from_remote(&self, id: u64, limit: u64) -> Result<Vec<Job>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut job_query = conn.prepare(sql::LAST_JOBS_FROM_REMOTE).unwrap();
+ let mut result = job_query.query([id, limit]).unwrap();
+
+ let mut jobs = Vec::new();
+
+ while let Some(row) = result.next().unwrap() {
+ let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
+ jobs.push(Job {
+ id,
+ remote_id,
+ commit_id,
+ created_time,
+ source,
+ run_preferences,
+ });
+ }
+
+ Ok(jobs)
+ }
+
+ pub fn get_active_runs(&self) -> Result<Vec<Run>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap();
+ let mut runs = started_query.query([]).unwrap();
+ let mut started = Vec::new();
+
+ while let Some(row) = runs.next().unwrap() {
+ started.push(Self::row2run(row));
+ }
+
+ Ok(started)
+ }
+
+ pub fn get_pending_runs(&self, host_id: Option<u32>) -> Result<Vec<PendingRun>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap();
+ let mut runs = pending_query.query([host_id]).unwrap();
+ let mut pending = Vec::new();
+
+ while let Some(row) = runs.next().unwrap() {
+ let (id, job_id, create_time) = row.try_into().unwrap();
+ let run = PendingRun {
+ id,
+ job_id,
+ create_time,
+ };
+ pending.push(run);
+ }
+
+ Ok(pending)
+ }
+
+ pub fn jobs_needing_task_runs_for_host(&self, host_id: u64) -> Result<Vec<Job>, String> {
+ // for jobs that this host has not run, we'll arbitrarily say that we won't generate new
+ // runs for jobs more than a day old.
+ //
+ // we don't want to rebuild the entire history every time we see a new host by default; if
+ // you really want to rebuild all of history on a new host, use `ci_ctl` to prepare the
+ // runs.
+ let cutoff = crate::now_ms() - 24 * 3600 * 1000;
+
+ let conn = self.conn.lock().unwrap();
+
+ let mut jobs_needing_task_runs = conn.prepare(sql::JOBS_NEEDING_HOST_RUN).unwrap();
+ let mut job_rows = jobs_needing_task_runs.query([cutoff, host_id]).unwrap();
+ let mut jobs = Vec::new();
+
+ while let Some(row) = job_rows.next().unwrap() {
+ let (id, source, created_time, remote_id, commit_id, run_preferences) = row.try_into().unwrap();
+
+ jobs.push(Job {
+ id, source, created_time, remote_id, commit_id, run_preferences,
+ });
+ }
+
+ Ok(jobs)
+ }
+
+
+ pub fn remotes_by_repo(&self, repo_id: u64) -> Result<Vec<Remote>, String> {
+ let mut remotes: Vec<Remote> = Vec::new();
+
+ let conn = self.conn.lock().unwrap();
+ let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap();
+ let mut remote_results = remotes_query.query([repo_id]).unwrap();
+
+ while let Some(row) = remote_results.next().unwrap() {
+ let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
+ remotes.push(Remote { id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path });
+ }
+
+ Ok(remotes)
+ }
+
+ /// try to find a host close to `host_info`, but maybe not an exact match.
+ ///
+ /// specifically, we'll ignore microcode and family/os - enough that measurements ought to be
+ /// comparable but maybe not perfectly so.
+ pub fn find_id_like_host(&self, host_info: &crate::protocol::HostInfo) -> Result<Option<u32>, String> {
+        self.conn.lock()
+            .unwrap()
+            .query_row(
+                "select id from hosts where \
+                hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \
+                cpu_model=?5 and cpu_max_freq_khz=?6 and cpu_cores=?7 and mem_total=?8 and \
+                arch=?9;",
+            (
+                &host_info.hostname,
+                &host_info.cpu_info.vendor_id,
+                &host_info.cpu_info.model_name,
+                &host_info.cpu_info.family,
+                &host_info.cpu_info.model,
+                &host_info.cpu_info.max_freq,
+                &host_info.cpu_info.cores,
+                &host_info.memory_info.total,
+                &host_info.env_info.arch,
+            ),
+            |row| { row.get(0) }
+            )
+            .optional()
+            .map_err(|e| e.to_string())
+
+ /// get an id for the host described by `host_info`. this may create a new record if no such
+ /// host exists.
+ pub fn id_for_host(&self, host_info: &crate::protocol::HostInfo) -> Result<u32, String> {
+ let conn = self.conn.lock().unwrap();
+
+ conn
+ .execute(
+ "insert or ignore into hosts \
+ (\
+ hostname, cpu_vendor_id, cpu_model_name, cpu_family, \
+ cpu_model, cpu_microcode, cpu_max_freq_khz, cpu_cores, \
+ mem_total, arch, family, os\
+ ) values (\
+ ?1, ?2, ?3, ?4, \
+ ?5, ?6, ?7, ?8, \
+ ?9, ?10, ?11, ?12 \
+ );",
+ (
+ &host_info.hostname,
+ &host_info.cpu_info.vendor_id,
+ &host_info.cpu_info.model_name,
+ &host_info.cpu_info.family,
+ &host_info.cpu_info.model,
+ &host_info.cpu_info.microcode,
+ &host_info.cpu_info.max_freq,
+ &host_info.cpu_info.cores,
+ &host_info.memory_info.total,
+ &host_info.env_info.arch,
+ &host_info.env_info.family,
+ &host_info.env_info.os,
+ )
+ )
+ .expect("can insert");
+
+ conn
+ .query_row(
+ "select id from hosts where \
+ hostname=?1 and cpu_vendor_id=?2 and cpu_model_name=?3 and cpu_family=?4 and \
+ cpu_model=?5 and cpu_microcode=?6 and cpu_max_freq_khz=?7 and \
+ cpu_cores=?8 and mem_total=?9 and arch=?10 and family=?11 and os=?12;",
+ (
+ &host_info.hostname,
+ &host_info.cpu_info.vendor_id,
+ &host_info.cpu_info.model_name,
+ &host_info.cpu_info.family,
+ &host_info.cpu_info.model,
+ &host_info.cpu_info.microcode,
+ &host_info.cpu_info.max_freq,
+ &host_info.cpu_info.cores,
+ &host_info.memory_info.total,
+ &host_info.env_info.arch,
+ &host_info.env_info.family,
+ &host_info.env_info.os,
+ ),
+ |row| { row.get(0) }
+ )
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn host_model_info(&self, host_id: u64) -> Result<(String, String, String, String, u64), String> {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .query_row("select hostname, cpu_vendor_id, cpu_family, cpu_model, cpu_max_freq_khz from hosts where id=?1;", [host_id], |row| {
+ Ok((
+ row.get(0).unwrap(),
+ row.get(1).unwrap(),
+ row.get(2).unwrap(),
+ row.get(3).unwrap(),
+ row.get(4).unwrap(),
+ ))
+ })
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn runs_for_job_one_per_host(&self, job_id: u64) -> Result<Vec<Run>, String> {
+ let conn = self.conn.lock().unwrap();
+ let mut runs_query = conn.prepare(crate::sql::RUNS_FOR_JOB).unwrap();
+ let mut runs_results = runs_query.query([job_id]).unwrap();
+
+ let mut results = Vec::new();
+
+ while let Some(row) = runs_results.next().unwrap() {
+ results.push(Self::row2run(row));
+ }
+
+ Ok(results)
+ }
+
+ pub fn last_run_for_job(&self, job_id: u64) -> Result<Option<Run>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ conn
+ .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| {
+ Ok(Self::row2run(row))
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
+ let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
+ let state: u8 = state;
+ Run {
+ id,
+ job_id,
+ artifacts_path,
+ state: state.try_into().unwrap(),
+ host_id,
+ create_time,
+ start_time,
+ complete_time,
+ build_token,
+ run_timeout,
+ build_result,
+ final_text,
+ }
+ }
+}
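
Taken together, dbctx.rs is a thin synchronous wrapper over a single rusqlite Connection behind a Mutex. Below is a minimal sketch of the intended call sequence, assuming this crate is importable as `ci_lib_core`; the repo name, notifier config path, and database path are hypothetical:

    use ci_lib_core::dbctx::DbCtx;

    fn main() -> Result<(), String> {
        // both parameters share one `AsRef<Path>` type, so two &str work fine
        let db = DbCtx::new("./config", "./state.db");
        db.create_tables().map_err(|_| "table creation failed".to_string())?;

        // register a repo, attach a github remote, then record a job for a commit
        let repo_id = db.new_repo("example-repo")?;
        let remote_id = db.new_remote(repo_id, "example/example-repo", "github", "notify.json")?;
        let job_id = db.new_job(remote_id, "9e6906c00c49186189d211dc96e132d85e7ff641", None, None)?;

        // queue a run with no host preference; a runner picks it up via get_pending_runs
        let run = db.new_run(job_id, None)?;
        println!("queued run {} for job {}", run.id, run.job_id);
        Ok(())
    }
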
diff --git a/ci-lib-core/src/lib.rs b/ci-lib-core/src/lib.rs
new file mode 100644
index 0000000..c20ce8e
--- /dev/null
+++ b/ci-lib-core/src/lib.rs
@@ -0,0 +1,13 @@
+use std::time::{SystemTime, UNIX_EPOCH};
+
+
+pub mod protocol;
+pub mod sql;
+pub mod dbctx;
+
+pub fn now_ms() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is later than epoch")
+ .as_millis() as u64
+}
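
now_ms is the single time source used across the crate; timestamps throughout the schema are u64 milliseconds since the Unix epoch. A small sketch of the expiry arithmetic it supports, mirroring run_for_token above (the function name here is hypothetical):

    use ci_lib_core::now_ms;

    // a token is valid while now <= started_time + timeout, matching run_for_token;
    // TOKEN_EXPIRY_MS (30 minutes) is the fallback when a run carries no timeout
    fn token_still_valid(started_time_ms: u64, timeout_ms: u64) -> bool {
        now_ms() <= started_time_ms + timeout_ms
    }
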
diff --git a/ci-lib-core/src/protocol.rs b/ci-lib-core/src/protocol.rs
new file mode 100644
index 0000000..c7a9318
--- /dev/null
+++ b/ci-lib-core/src/protocol.rs
@@ -0,0 +1,114 @@
+use serde::{Serialize, Deserialize};
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "kind")]
+#[serde(rename_all = "snake_case")]
+pub enum ClientProto {
+ Started,
+ ArtifactCreate,
+ NewTask(RequestedJob),
+ NewTaskPlease { allowed_pushers: Option<Vec<String>>, host_info: HostInfo },
+ Metric { name: String, value: String },
+ Command(CommandInfo),
+ TaskStatus(TaskInfo),
+ Ping,
+ Pong,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "command_info")]
+#[serde(rename_all = "snake_case")]
+pub enum CommandInfo {
+ Started { command: Vec<String>, cwd: Option<String>, id: u32 },
+ Finished { exit_code: Option<i32>, id: u32 },
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(tag = "task_info")]
+#[serde(rename_all = "snake_case")]
+pub enum TaskInfo {
+ Finished { status: String },
+ Interrupted { status: String, description: Option<String> },
+}
+
+impl ClientProto {
+ pub fn metric(name: impl Into<String>, value: impl Into<String>) -> Self {
+ ClientProto::Metric { name: name.into(), value: value.into() }
+ }
+
+ pub fn command(state: CommandInfo) -> Self {
+ ClientProto::Command(state)
+ }
+
+ pub fn new_task_please(allowed_pushers: Option<Vec<String>>, host_info: HostInfo) -> Self {
+ ClientProto::NewTaskPlease { allowed_pushers, host_info }
+ }
+
+ pub fn task_status(state: TaskInfo) -> Self {
+ ClientProto::TaskStatus(state)
+ }
+
+ pub fn new_task(task: RequestedJob) -> Self {
+ ClientProto::NewTask(task)
+ }
+}
+
+impl CommandInfo {
+ pub fn started(command: impl Into<Vec<String>>, cwd: Option<&str>, id: u32) -> Self {
+ CommandInfo::Started { command: command.into(), cwd: cwd.map(ToOwned::to_owned), id }
+ }
+
+ pub fn finished(exit_code: Option<i32>, id: u32) -> Self {
+ CommandInfo::Finished { exit_code, id }
+ }
+}
+
+impl TaskInfo {
+ pub fn finished(status: impl Into<String>) -> Self {
+ TaskInfo::Finished { status: status.into() }
+ }
+
+ pub fn interrupted(status: impl Into<String>, description: impl Into<Option<String>>) -> Self {
+ TaskInfo::Interrupted { status: status.into(), description: description.into() }
+ }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct HostInfo {
+ pub hostname: String,
+ pub cpu_info: CpuInfo,
+ pub memory_info: MemoryInfo,
+ pub env_info: EnvInfo,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct CpuInfo {
+ pub model_name: String,
+ pub microcode: String,
+ pub cores: u32,
+ pub vendor_id: String,
+ pub family: String,
+ pub model: String,
+ // clock speed in khz
+ pub max_freq: u64,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct MemoryInfo {
+ pub total: String,
+ pub available: String,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+pub struct EnvInfo {
+ pub arch: String,
+ pub family: String,
+ pub os: String,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RequestedJob {
+ pub commit: String,
+ pub remote_url: String,
+ pub build_token: String,
+}
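
Because ClientProto is internally tagged (`#[serde(tag = "kind")]`) with snake_case variant names, each message serializes to a flat JSON object whose `kind` field names the variant. A short sketch of the wire shapes, assuming serde_json is available on the consumer side (it is not among this crate's own dependencies):

    use ci_lib_core::protocol::ClientProto;

    fn main() {
        // unit variants carry only the tag
        let ping = serde_json::to_string(&ClientProto::Ping).unwrap();
        assert_eq!(ping, r#"{"kind":"ping"}"#);

        // struct variants inline their fields beside the tag
        let metric = serde_json::to_string(&ClientProto::metric("cpu_cores", "8")).unwrap();
        assert_eq!(metric, r#"{"kind":"metric","name":"cpu_cores","value":"8"}"#);
    }
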
diff --git a/ci-lib-core/src/sql.rs b/ci-lib-core/src/sql.rs
new file mode 100644
index 0000000..2aeb52b
--- /dev/null
+++ b/ci-lib-core/src/sql.rs
@@ -0,0 +1,319 @@
+#![allow(dead_code)]
+
+use std::convert::TryFrom;
+
+#[derive(Debug, Clone)]
+pub struct PendingRun {
+ pub id: u64,
+ pub job_id: u64,
+ pub create_time: u64,
+}
+
+impl Run {
+ fn into_pending_run(self) -> PendingRun {
+ PendingRun {
+ id: self.id,
+ job_id: self.job_id,
+ create_time: self.create_time,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TokenValidity {
+ Expired,
+ Invalid,
+ Valid,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct MetricRecord {
+ pub id: u64,
+ pub run_id: u64,
+ pub name: String,
+ pub value: String
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ArtifactRecord {
+ pub id: u64,
+ pub run_id: u64,
+ pub name: String,
+ pub desc: String,
+ pub created_time: u64,
+ pub completed_time: Option<u64>,
+}
+
+#[derive(Debug, Clone)]
+pub struct Repo {
+ pub id: u64,
+ pub name: String,
+ pub default_run_preference: Option<String>,
+}
+
+#[derive(Debug)]
+pub struct Remote {
+ pub id: u64,
+ pub repo_id: u64,
+ pub remote_path: String,
+ pub remote_api: String,
+ pub remote_url: String,
+ pub remote_git_url: String,
+ pub notifier_config_path: String,
+}
+
+// a job tracks when we became aware of a commit from a remote. typically a job will have a 1-1
+// relationship with commits, and potentially many *runs* of that job.
+#[derive(Debug, Clone)]
+pub struct Job {
+ pub id: u64,
+ pub remote_id: u64,
+ pub commit_id: u64,
+ pub created_time: u64,
+ pub source: Option<String>,
+ pub run_preferences: Option<String>,
+}
+
+// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report
+// results. a job may have many runs from many different hosts rebuilding history, or reruns of the
+// same job on the same hardware to collect more datapoints on the operation.
+#[derive(Debug, Clone)]
+pub struct Run {
+ pub id: u64,
+ pub job_id: u64,
+ pub artifacts_path: Option<String>,
+ pub state: RunState,
+ pub host_id: Option<u64>,
+ pub create_time: u64,
+ pub start_time: Option<u64>,
+ pub complete_time: Option<u64>,
+ pub build_token: Option<String>,
+ pub run_timeout: Option<u64>,
+ pub build_result: Option<u8>,
+ pub final_text: Option<String>,
+}
+
+#[derive(Debug, Clone)]
+pub enum JobResult {
+ Pass = 0,
+ Fail = 1,
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub enum RunState {
+ Pending = 0,
+ Started = 1,
+ Finished = 2,
+ Error = 3,
+ Invalid = 4,
+}
+
+impl TryFrom<u8> for RunState {
+ type Error = String;
+
+ fn try_from(value: u8) -> Result<Self, String> {
+ match value {
+ 0 => Ok(RunState::Pending),
+ 1 => Ok(RunState::Started),
+ 2 => Ok(RunState::Finished),
+ 3 => Ok(RunState::Error),
+ 4 => Ok(RunState::Invalid),
+ other => Err(format!("invalid job state: {}", other)),
+ }
+ }
+}
+
+/*
+pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
+ let (id, job_id, artifacts_path, state, host_id, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap();
+ let state: u8 = state;
+ Run {
+ id,
+ job_id,
+ artifacts_path,
+ state: state.try_into().unwrap(),
+ host_id,
+ create_time,
+ start_time,
+ complete_time,
+ build_token,
+ run_timeout,
+ build_result,
+ final_text,
+ }
+}
+*/
+
+// remote_id is the remote from which we were notified. this is necessary so we know which remote
+// to pull from to actually run the job.
+pub const CREATE_JOBS_TABLE: &'static str = "\
+ CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT,
+ source TEXT,
+ created_time INTEGER,
+ remote_id INTEGER,
+ commit_id INTEGER,
+ run_preferences TEXT);";
+
+pub const CREATE_METRICS_TABLE: &'static str = "\
+    CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT,
+        run_id INTEGER,
+        name TEXT,
+        value TEXT,
+        UNIQUE(run_id, name)
+    );";
+
+pub const CREATE_COMMITS_TABLE: &'static str = "\
+ CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);";
+
+pub const CREATE_REPOS_TABLE: &'static str = "\
+ CREATE TABLE IF NOT EXISTS repos (id INTEGER PRIMARY KEY AUTOINCREMENT,
+ repo_name TEXT,
+ default_run_preference TEXT);";
+
+// remote_api is `github` or NULL for now. hopefully a future cgit-style notifier one day.
+// remote_path is some unique identifier for the relevant remote.
+// * for `github` remotes, this will be `owner/repo`.
+// * for others.. who knows.
+// remote_url is a url for human interaction with the remote (think https://git.iximeow.net/zvm)
+// remote_git_url is a url that can be `git clone`'d to fetch sources
+pub const CREATE_REMOTES_TABLE: &'static str = "\
+ CREATE TABLE IF NOT EXISTS remotes (id INTEGER PRIMARY KEY AUTOINCREMENT,
+ repo_id INTEGER,
+ remote_path TEXT,
+ remote_api TEXT,
+ remote_url TEXT,
+ remote_git_url TEXT,
+ notifier_config_path TEXT);";
+
+pub const CREATE_ARTIFACTS_TABLE: &'static str = "\
+ CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT,
+ run_id INTEGER,
+ name TEXT,
+ desc TEXT,
+ created_time INTEGER,
+ completed_time INTEGER);";
+
+pub const CREATE_RUNS_TABLE: &'static str = "\
+    CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT,
+        job_id INTEGER,
+        artifacts_path TEXT,
+        state INTEGER NOT NULL,
+        host_id INTEGER,
+        host_preference INTEGER,
+        build_token TEXT,
+        created_time INTEGER,
+        started_time INTEGER,
+        complete_time INTEGER,
+        run_timeout INTEGER,
+        build_result INTEGER,
+        final_status TEXT);";
+
+pub const CREATE_HOSTS_TABLE: &'static str = "\
+ CREATE TABLE IF NOT EXISTS hosts (id INTEGER PRIMARY KEY AUTOINCREMENT,
+ hostname TEXT,
+ cpu_vendor_id TEXT,
+ cpu_model_name TEXT,
+ cpu_family TEXT,
+ cpu_model TEXT,
+ cpu_microcode TEXT,
+ cpu_max_freq_khz INTEGER,
+ cpu_cores INTEGER,
+ mem_total TEXT,
+ arch TEXT,
+ family TEXT,
+ os TEXT,
+ UNIQUE(hostname, cpu_vendor_id, cpu_model_name, cpu_family, cpu_model, cpu_microcode, cpu_cores, mem_total, arch, family, os));";
+
+pub const CREATE_REMOTES_INDEX: &'static str = "\
+ CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";
+
+pub const CREATE_REPO_NAME_INDEX: &'static str = "\
+ CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);";
+
+pub const PENDING_RUNS: &'static str = "\
+ select id, job_id, created_time, host_preference from runs where state=0 and (host_preference=?1 or host_preference is null) order by created_time desc;";
+
+pub const JOBS_NEEDING_HOST_RUN: &'static str = "\
+    select jobs.id, jobs.source, jobs.created_time, jobs.remote_id, jobs.commit_id, jobs.run_preferences from jobs \
+    where jobs.run_preferences='all' and jobs.created_time > ?1 \
+    and not exists \
+    (select 1 from runs r2 where r2.job_id = jobs.id and r2.host_id = ?2);";
+
+pub const ACTIVE_RUNS: &'static str = "\
+ select id,
+ job_id,
+ artifacts_path,
+ state,
+ host_id,
+ build_token,
+ created_time,
+ started_time,
+ complete_time,
+ run_timeout,
+ build_result,
+ final_status from runs where state=1 or state=0;";
+
+pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\
+    select * from artifacts where run_id=?1 and (name like '%(stderr)%' or name like '%(stdout)%') order by id desc limit ?2;";
+
+pub const JOB_BY_COMMIT_ID: &'static str = "\
+ select id, source, created_time, remote_id, commit_id, run_preferences from jobs where commit_id=?1;";
+
+pub const ARTIFACT_BY_ID: &'static str = "\
+ select * from artifacts where id=?1 and run_id=?2;";
+
+pub const JOB_BY_ID: &'static str = "\
+ select id, source, created_time, remote_id, commit_id, run_preferences from jobs where id=?1";
+
+pub const METRICS_FOR_RUN: &'static str = "\
+ select * from metrics where run_id=?1 order by id asc;";
+
+pub const METRICS_FOR_JOB: &'static str = "\
+ select metrics.id, metrics.run_id, metrics.name, metrics.value from metrics \
+ join runs on runs.id=metrics.run_id \
+ where runs.job_id=?1 \
+ order by metrics.run_id desc, metrics.id desc;";
+
+pub const COMMIT_TO_ID: &'static str = "\
+ select id from commits where sha=?1;";
+
+pub const REMOTES_FOR_REPO: &'static str = "\
+ select * from remotes where repo_id=?1;";
+
+pub const ALL_REPOS: &'static str = "\
+ select id, repo_name, default_run_preference from repos;";
+
+pub const LAST_JOBS_FROM_REMOTE: &'static str = "\
+ select id, source, created_time, remote_id, commit_id, run_preferences from jobs where remote_id=?1 order by created_time desc limit ?2;";
+
+pub const LAST_RUN_FOR_JOB: &'static str = "\
+ select id,
+ job_id,
+ artifacts_path,
+ state,
+ host_id,
+ build_token,
+ created_time,
+ started_time,
+ complete_time,
+ run_timeout,
+ build_result,
+ final_status from runs where job_id=?1 order by started_time desc limit 1;";
+
+pub const RUNS_FOR_JOB: &'static str = "\
+ select id,
+ job_id,
+ artifacts_path,
+ state,
+ host_id,
+ build_token,
+ created_time,
+ started_time,
+ complete_time,
+ run_timeout,
+ build_result,
+ final_status from runs where job_id=?1 group by host_id order by started_time desc, state asc;";
+
+pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\
+ select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id, jobs.run_preferences
+ from jobs join runs on jobs.id=runs.job_id
+    order by runs.created_time asc;";
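
The state column round-trips through RunState's discriminants: writes store the enum as an integer (e.g. `RunState::Pending as u64` in new_run) and reads recover it via the TryFrom<u8> impl, as row2run does. A small sketch:

    use ci_lib_core::sql::RunState;

    fn main() {
        // stored as an integer column...
        let stored = RunState::Started as u64;
        // ...and recovered via TryFrom, with out-of-range values rejected
        assert_eq!(RunState::try_from(stored as u8), Ok(RunState::Started));
        assert!(RunState::try_from(9u8).is_err());
    }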