author     iximeow <me@iximeow.net>  2023-07-01 14:08:00 -0700
committer  iximeow <me@iximeow.net>  2023-07-01 14:08:00 -0700
commit     ec5a274436bc8dda0b55d2c4da1411ff3c52434d (patch)
tree       34bc1890b2a0cacd4d9e941e7b2221afe39ecfe9 /src/dbctx.rs
parent     4657e736b6067c4dd0e25ad14253fdb8febffd89 (diff)
add a notion of runs distinct from jobs, let's see how well this goes over
Diffstat (limited to 'src/dbctx.rs')
-rw-r--r--  src/dbctx.rs  216
1 file changed, 118 insertions(+), 98 deletions(-)
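
The diff below splits the old all-in-one jobs row into a Job (one record per observed commit) plus any number of Runs of that job, and repoints metrics and artifacts at runs. As a rough illustration of the new flow, here is a minimal sketch of how a scheduler might use the run-centric methods this diff introduces; it assumes an already-initialized DbCtx reachable as crate::dbctx::DbCtx and an existing job row, and the job_id value is hypothetical:

    // Sketch only: exercises the run-centric DbCtx methods added in this commit.
    // Assumes `db` is an initialized DbCtx and `job_id` names an existing job.
    use crate::dbctx::DbCtx;

    fn rerun_and_record(db: &DbCtx, job_id: u64) -> Result<(), String> {
        // a job can now accumulate any number of runs; enqueue one more
        let run_id = db.new_run(job_id)?;

        // pending runs are what a runner would pick up next
        let pending = db.get_pending_runs()?;
        assert!(pending.iter().any(|r| r.id == run_id));

        // metrics (and artifacts) now hang off the run rather than the job
        db.insert_metric(run_id, "build_time_ms", "1234")?;
        let metrics = db.metrics_for_run(run_id)?;
        assert_eq!(metrics.len(), 1);

        Ok(())
    }
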
diff --git a/src/dbctx.rs b/src/dbctx.rs
index dcca362..331e55f 100644
--- a/src/dbctx.rs
+++ b/src/dbctx.rs
@@ -36,34 +36,51 @@ pub struct Remote {
pub notifier_config_path: String,
}
+// a job tracks when we became aware of a commit from a remote. typically a job will have a 1-1
+// relationship with its commit, and potentially many *runs* of that job.
#[derive(Debug, Clone)]
pub struct Job {
pub id: u64,
- pub artifacts_path: Option<String>,
- pub state: sql::JobState,
- pub run_host: Option<String>,
pub remote_id: u64,
pub commit_id: u64,
pub created_time: u64,
+ pub source: Option<String>,
+}
+
+// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report
+// results. a job may have many runs from many different hosts rebuilding history, or reruns of the
+// same job on the same hardware to collect more datapoints on the operation.
+#[derive(Debug, Clone)]
+pub struct Run {
+ pub id: u64,
+ pub job_id: u64,
+ pub artifacts_path: Option<String>,
+ pub state: sql::RunState,
+ pub run_host: Option<String>,
+ pub create_time: u64,
pub start_time: Option<u64>,
pub complete_time: Option<u64>,
pub build_token: Option<String>,
- pub job_timeout: Option<u64>,
- pub source: Option<String>,
+ pub run_timeout: Option<u64>,
pub build_result: Option<u8>,
pub final_text: Option<String>,
}
+impl Run {
+ fn into_pending_run(self) -> PendingRun {
+ PendingRun {
+ id: self.id,
+ job_id: self.job_id,
+ create_time: self.create_time,
+ }
+ }
+}
+
#[derive(Debug, Clone)]
-pub struct PendingJob {
+pub struct PendingRun {
pub id: u64,
- pub artifacts: Option<String>,
- pub state: sql::JobState,
- pub run_host: Option<String>,
- pub remote_id: u64,
- pub commit_id: u64,
- pub created_time: u64,
- pub source: Option<String>,
+ pub job_id: u64,
+ pub create_time: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -76,7 +93,7 @@ pub enum TokenValidity {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricRecord {
pub id: u64,
- pub job_id: u64,
+ pub run_id: u64,
pub name: String,
pub value: String
}
@@ -84,7 +101,7 @@ pub struct MetricRecord {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ArtifactRecord {
pub id: u64,
- pub job_id: u64,
+ pub run_id: u64,
pub name: String,
pub desc: String,
pub created_time: u64,
@@ -113,12 +130,12 @@ impl DbCtx {
Ok(())
}
- pub fn insert_metric(&self, job_id: u64, name: &str, value: &str) -> Result<(), String> {
+ pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> {
let conn = self.conn.lock().unwrap();
conn
.execute(
- "insert into metrics (job_id, name, value) values (?1, ?2, ?3) on conflict (job_id, name) do update set value=excluded.value",
- (job_id, name, value)
+ "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value",
+ (run_id, name, value)
)
.expect("can upsert");
Ok(())
@@ -163,7 +180,7 @@ impl DbCtx {
})
}
- pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
+ pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
let artifact_id = {
let created_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -172,8 +189,8 @@ impl DbCtx {
let conn = self.conn.lock().unwrap();
conn
.execute(
- "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)",
- (job_id, name, desc, created_time)
+ "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)",
+ (run_id, name, desc, created_time)
)
.map_err(|e| {
format!("{:?}", e)
@@ -182,17 +199,17 @@ impl DbCtx {
conn.last_insert_rowid() as u64
};
- ArtifactDescriptor::new(job_id, artifact_id).await
+ ArtifactDescriptor::new(run_id, artifact_id).await
}
- pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {
+ pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {
let conn = self.conn.lock().unwrap();
conn
- .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| {
- let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap();
+ .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| {
+ let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap();
Ok(ArtifactRecord {
- id, job_id, name, desc, created_time, completed_time
+ id, run_id, name, desc, created_time, completed_time
})
})
.optional()
@@ -222,11 +239,11 @@ impl DbCtx {
.map_err(|e| e.to_string())
}
- pub fn job_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {
+ pub fn run_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {
self.conn.lock()
.unwrap()
.query_row(
- "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1",
+ "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1",
[token],
|row| {
let timeout: Option<u64> = row.get(3).unwrap();
@@ -254,6 +271,20 @@ impl DbCtx {
.map_err(|e| e.to_string())
}
+ pub fn job_by_id(&self, id: u64) -> Result<Option<Job>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row(crate::sql::JOB_BY_ID, [id], |row| {
+ let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();
+
+ Ok(Job {
+ id, source, created_time, remote_id, commit_id
+ })
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
pub fn remote_by_id(&self, id: u64) -> Result<Option<Remote>, String> {
self.conn.lock()
.unwrap()
@@ -322,40 +353,62 @@ impl DbCtx {
let conn = self.conn.lock().unwrap();
let rows_modified = conn.execute(
- "insert into jobs (state, remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);",
- (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time, pusher)
+        "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4);",
+ (remote_id, commit_id, created_time, pusher)
).unwrap();
assert_eq!(1, rows_modified);
- Ok(conn.last_insert_rowid() as u64)
+ let job_id = conn.last_insert_rowid() as u64;
+
+ Ok(job_id)
}
- pub fn metrics_for_job(&self, job: u64) -> Result<Vec<MetricRecord>, String> {
+ pub fn new_run(&self, job_id: u64) -> Result<u64, String> {
+ let created_time = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is before epoch")
+ .as_millis() as u64;
+
let conn = self.conn.lock().unwrap();
- let mut metrics_query = conn.prepare(sql::METRICS_FOR_JOB).unwrap();
- let mut result = metrics_query.query([job]).unwrap();
+ let rows_modified = conn.execute(
+ "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);",
+ (job_id, crate::sql::RunState::Pending as u64, created_time)
+ ).unwrap();
+
+ assert_eq!(1, rows_modified);
+
+ let run_id = conn.last_insert_rowid() as u64;
+
+ Ok(run_id)
+ }
+
+ pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap();
+ let mut result = metrics_query.query([run]).unwrap();
let mut metrics = Vec::new();
while let Some(row) = result.next().unwrap() {
- let (id, job_id, name, value): (u64, u64, String, String) = row.try_into().unwrap();
- metrics.push(MetricRecord { id, job_id, name, value });
+ let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap();
+ metrics.push(MetricRecord { id, run_id, name, value });
}
Ok(metrics)
}
- pub fn artifacts_for_job(&self, job: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> {
+ pub fn artifacts_for_run(&self, run: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> {
let conn = self.conn.lock().unwrap();
- let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_JOB).unwrap();
- let mut result = artifacts_query.query([job, limit.unwrap_or(65535)]).unwrap();
+ let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap();
+ let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap();
let mut artifacts = Vec::new();
while let Some(row) = result.next().unwrap() {
- let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap();
- artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time });
+ let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap();
+ artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time });
}
Ok(artifacts)
@@ -403,23 +456,13 @@ impl DbCtx {
conn
.query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| {
- let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap();
- let state: u8 = state;
+ let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();
Ok(Job {
id,
- artifacts_path,
- state: state.try_into().unwrap(),
- run_host,
remote_id,
commit_id,
created_time,
- start_time,
- complete_time,
- build_token,
- job_timeout,
source,
- build_result,
- final_text,
})
})
.optional()
@@ -435,77 +478,43 @@ impl DbCtx {
let mut jobs = Vec::new();
while let Some(row) = result.next().unwrap() {
- let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text)= row.try_into().unwrap();
- let state: u8 = state;
+ let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();
jobs.push(Job {
id,
- artifacts_path,
- state: state.try_into().unwrap(),
- run_host,
remote_id,
commit_id,
created_time,
- start_time,
- complete_time,
- build_token,
- job_timeout,
source,
- build_result,
- final_text,
});
}
Ok(jobs)
}
- pub fn get_active_jobs(&self) -> Result<Vec<Job>, String> {
+ pub fn get_active_runs(&self) -> Result<Vec<Run>, String> {
let conn = self.conn.lock().unwrap();
- let mut started_query = conn.prepare(sql::ACTIVE_JOBS).unwrap();
- let mut jobs = started_query.query([]).unwrap();
+ let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap();
+ let mut runs = started_query.query([]).unwrap();
let mut started = Vec::new();
- while let Some(row) = jobs.next().unwrap() {
- let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap();
- let state: u8 = state;
-
- started.push(Job {
- id,
- artifacts_path,
- state: state.try_into().unwrap(),
- run_host,
- remote_id,
- commit_id,
- created_time,
- start_time,
- complete_time,
- build_token,
- job_timeout,
- source,
- build_result,
- final_text,
- });
+ while let Some(row) = runs.next().unwrap() {
+ started.push(crate::sql::row2run(row));
}
Ok(started)
}
- pub fn get_pending_jobs(&self) -> Result<Vec<PendingJob>, String> {
+ pub fn get_pending_runs(&self) -> Result<Vec<PendingRun>, String> {
let conn = self.conn.lock().unwrap();
- let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap();
- let mut jobs = pending_query.query([]).unwrap();
+ let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap();
+ let mut runs = pending_query.query([]).unwrap();
let mut pending = Vec::new();
- while let Some(row) = jobs.next().unwrap() {
- let (id, artifacts, state, run_host, remote_id, commit_id, created_time, source) = row.try_into().unwrap();
- let state: u8 = state;
- pending.push(PendingJob {
- id, artifacts,
- state: state.try_into().unwrap(),
- run_host, remote_id, commit_id, created_time,
- source,
- });
+ while let Some(row) = runs.next().unwrap() {
+ let run = crate::sql::row2run(row).into_pending_run();
+ pending.push(run);
}
Ok(pending)
@@ -526,6 +535,17 @@ impl DbCtx {
Ok(remotes)
}
+ pub fn last_run_for_job(&self, job_id: u64) -> Result<Option<Run>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ conn
+ .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| {
+ Ok(crate::sql::row2run(row))
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> {
let remotes = self.remotes_by_repo(repo_id)?;
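
The crate::sql::row2run helper called in several hunks above is not part of this diff; it presumably maps a rusqlite row onto the new Run struct. A hypothetical sketch, assuming the query column order matches the Run field order shown above and that sql::RunState converts from a u8 the same way the old JobState did:

    // Hypothetical sketch of crate::sql::row2run; the real column order and
    // error handling live in sql.rs and are assumptions here.
    pub(crate) fn row2run(row: &rusqlite::Row) -> Run {
        let (id, job_id, artifacts_path, state, run_host, build_token, create_time,
             start_time, complete_time, run_timeout, build_result, final_text) =
            row.try_into().unwrap();
        let state: u8 = state;
        Run {
            id, job_id, artifacts_path,
            state: state.try_into().unwrap(),
            run_host, create_time, start_time, complete_time,
            build_token, run_timeout, build_result, final_text,
        }
    }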