diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ci_driver.rs | 67 | ||||
-rw-r--r-- | src/dbctx.rs | 11 | ||||
-rw-r--r-- | src/sql.rs | 2 |
3 files changed, 73 insertions, 7 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 439b4d4..d41cb75 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -1,5 +1,6 @@ use std::process::Command; -use std::sync::RwLock; +use std::collections::HashMap; +use std::sync::{Mutex, RwLock}; use lazy_static::lazy_static; use std::io::Read; use serde_derive::{Deserialize, Serialize}; @@ -8,7 +9,7 @@ use std::fmt; use std::path::{Path, PathBuf}; use tokio::spawn; use tokio_stream::wrappers::ReceiverStream; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::{SystemTime, UNIX_EPOCH}; use axum_server::tls_rustls::RustlsConfig; use axum::body::StreamBody; @@ -28,12 +29,13 @@ mod sql; mod notifier; mod io; -use crate::dbctx::{DbCtx, PendingRun, Job}; +use crate::dbctx::{DbCtx, PendingRun, Job, Run}; use crate::sql::JobResult; use crate::sql::RunState; lazy_static! { static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); + static ref ACTIVE_TASKS: Mutex<HashMap<u64, Weak<()>>> = Mutex::new(HashMap::new()); } fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> { @@ -140,7 +142,9 @@ struct ClientJob { remote_git_url: String, sha: String, task: PendingRun, - client: RunnerClient + client: RunnerClient, + // exists only as confirmation this `ClientJob` is somewhere, still alive and being processed. + task_witness: Arc<()>, } impl ClientJob { @@ -304,13 +308,16 @@ impl RunnerClient { if resp == serde_json::json!({ "status": "started" }) { + let task_witness = Arc::new(()); + ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness)); eprintln!("resp: {:?}", resp); Ok(Some(ClientJob { task: job.clone(), dbctx: Arc::clone(dbctx), sha: sha.to_string(), remote_git_url: remote_git_url.to_string(), - client: self + client: self, + task_witness, })) } else { Err("client rejected job".to_string()) @@ -498,11 +505,59 @@ async fn main() { let dbctx = Arc::new(DbCtx::new(&driver_config.config_path, &driver_config.db_path)); + dbctx.create_tables().unwrap(); + let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await; spawn(axum_server::bind_rustls(driver_config.server_addr.parse().unwrap(), config) .serve(api_server.into_make_service())); - dbctx.create_tables().unwrap(); +async fn old_task_reaper(dbctx: Arc<DbCtx>) { + let mut potentially_stale_tasks = dbctx.get_active_runs().unwrap(); + + let active_tasks = ACTIVE_TASKS.lock().unwrap(); + + for (id, witness) in active_tasks.iter() { + if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) { + potentially_stale_tasks.swap_remove(idx); + } + } + + std::mem::drop(active_tasks); + + // ok, so we have tasks that are not active, now if the task is started we know someone should + // be running it and they are not. retain only those tasks, as they are ones we may want to + // mark dead. + // + // further, filter out any tasks created in the last 60 seconds. this is a VERY generous grace + // period for clients that have accepted a job but for some reason we have not recorded them as + // active (perhaps they are slow to ack somehow). + + let stale_threshold = crate::io::now_ms() - 60_000; + + let stale_tasks: Vec<Run> = potentially_stale_tasks.into_iter().filter(|task| { + match (task.state, task.start_time) { + // `run` is atomically set to `Started` and adorned with a `start_time`. disagreement + // between the two means this run is corrupt and should be reaped. + (RunState::Started, None) => { + true + }, + (RunState::Started, Some(start_time)) => { + start_time < stale_threshold + } + // and if it's not `started`, it's either pending (not assigned yet, so not stale), or + // one of the complete statuses. + _ => { + false + } + } + }).collect(); + + for task in stale_tasks.iter() { + dbctx.reap_task(task.id).expect("works"); + } +} + + spawn(old_task_reaper(Arc::clone(&dbctx))); loop { let runs = dbctx.get_pending_runs().unwrap(); diff --git a/src/dbctx.rs b/src/dbctx.rs index 0e999f3..3eb68bd 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -384,6 +384,17 @@ impl DbCtx { Ok(run_id) } + pub fn reap_task(&self, task_id: u64) -> Result<(), String> { + let conn = self.conn.lock().unwrap(); + + conn.execute( + "update runs set final_status=\"lost signal\", state=4 where id=?1;", + [task_id] + ).unwrap(); + + Ok(()) + } + pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> { let conn = self.conn.lock().unwrap(); @@ -11,7 +11,7 @@ pub enum JobResult { Fail = 1, } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq)] pub enum RunState { Pending = 0, Started = 1, |