summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ci_driver.rs67
-rw-r--r--src/dbctx.rs11
-rw-r--r--src/sql.rs2
3 files changed, 73 insertions, 7 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index 439b4d4..d41cb75 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -1,5 +1,6 @@
use std::process::Command;
-use std::sync::RwLock;
+use std::collections::HashMap;
+use std::sync::{Mutex, RwLock};
use lazy_static::lazy_static;
use std::io::Read;
use serde_derive::{Deserialize, Serialize};
@@ -8,7 +9,7 @@ use std::fmt;
use std::path::{Path, PathBuf};
use tokio::spawn;
use tokio_stream::wrappers::ReceiverStream;
-use std::sync::Arc;
+use std::sync::{Arc, Weak};
use std::time::{SystemTime, UNIX_EPOCH};
use axum_server::tls_rustls::RustlsConfig;
use axum::body::StreamBody;
@@ -28,12 +29,13 @@ mod sql;
mod notifier;
mod io;
-use crate::dbctx::{DbCtx, PendingRun, Job};
+use crate::dbctx::{DbCtx, PendingRun, Job, Run};
use crate::sql::JobResult;
use crate::sql::RunState;
lazy_static! {
static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
+ static ref ACTIVE_TASKS: Mutex<HashMap<u64, Weak<()>>> = Mutex::new(HashMap::new());
}
fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> {
@@ -140,7 +142,9 @@ struct ClientJob {
remote_git_url: String,
sha: String,
task: PendingRun,
- client: RunnerClient
+ client: RunnerClient,
+ // exists only as confirmation this `ClientJob` is somewhere, still alive and being processed.
+ task_witness: Arc<()>,
}
impl ClientJob {
@@ -304,13 +308,16 @@ impl RunnerClient {
if resp == serde_json::json!({
"status": "started"
}) {
+ let task_witness = Arc::new(());
+ ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness));
eprintln!("resp: {:?}", resp);
Ok(Some(ClientJob {
task: job.clone(),
dbctx: Arc::clone(dbctx),
sha: sha.to_string(),
remote_git_url: remote_git_url.to_string(),
- client: self
+ client: self,
+ task_witness,
}))
} else {
Err("client rejected job".to_string())
@@ -498,11 +505,59 @@ async fn main() {
let dbctx = Arc::new(DbCtx::new(&driver_config.config_path, &driver_config.db_path));
+ dbctx.create_tables().unwrap();
+
let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await;
spawn(axum_server::bind_rustls(driver_config.server_addr.parse().unwrap(), config)
.serve(api_server.into_make_service()));
- dbctx.create_tables().unwrap();
+async fn old_task_reaper(dbctx: Arc<DbCtx>) {
+ let mut potentially_stale_tasks = dbctx.get_active_runs().unwrap();
+
+ let active_tasks = ACTIVE_TASKS.lock().unwrap();
+
+ for (id, witness) in active_tasks.iter() {
+ if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) {
+ potentially_stale_tasks.swap_remove(idx);
+ }
+ }
+
+ std::mem::drop(active_tasks);
+
+ // ok, so we have tasks that are not active, now if the task is started we know someone should
+ // be running it and they are not. retain only those tasks, as they are ones we may want to
+ // mark dead.
+ //
+ // further, filter out any tasks created in the last 60 seconds. this is a VERY generous grace
+ // period for clients that have accepted a job but for some reason we have not recorded them as
+ // active (perhaps they are slow to ack somehow).
+
+ let stale_threshold = crate::io::now_ms() - 60_000;
+
+ let stale_tasks: Vec<Run> = potentially_stale_tasks.into_iter().filter(|task| {
+ match (task.state, task.start_time) {
+ // `run` is atomically set to `Started` and adorned with a `start_time`. disagreement
+ // between the two means this run is corrupt and should be reaped.
+ (RunState::Started, None) => {
+ true
+ },
+ (RunState::Started, Some(start_time)) => {
+ start_time < stale_threshold
+ }
+ // and if it's not `started`, it's either pending (not assigned yet, so not stale), or
+ // one of the complete statuses.
+ _ => {
+ false
+ }
+ }
+ }).collect();
+
+ for task in stale_tasks.iter() {
+ dbctx.reap_task(task.id).expect("works");
+ }
+}
+
+ spawn(old_task_reaper(Arc::clone(&dbctx)));
loop {
let runs = dbctx.get_pending_runs().unwrap();
diff --git a/src/dbctx.rs b/src/dbctx.rs
index 0e999f3..3eb68bd 100644
--- a/src/dbctx.rs
+++ b/src/dbctx.rs
@@ -384,6 +384,17 @@ impl DbCtx {
Ok(run_id)
}
+ pub fn reap_task(&self, task_id: u64) -> Result<(), String> {
+ let conn = self.conn.lock().unwrap();
+
+ conn.execute(
+ "update runs set final_status=\"lost signal\", state=4 where id=?1;",
+ [task_id]
+ ).unwrap();
+
+ Ok(())
+ }
+
pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> {
let conn = self.conn.lock().unwrap();
diff --git a/src/sql.rs b/src/sql.rs
index e911c96..a09e4bf 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -11,7 +11,7 @@ pub enum JobResult {
Fail = 1,
}
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Copy, Clone, PartialEq)]
pub enum RunState {
Pending = 0,
Started = 1,