diff options
| author | iximeow <me@iximeow.net> | 2023-07-02 01:01:39 -0700 | 
|---|---|---|
| committer | iximeow <me@iximeow.net> | 2023-07-02 01:01:39 -0700 | 
| commit | 5da80c7631f07913f5d5568046089c91a0deb7d2 (patch) | |
| tree | 8dfe5a48a411ac9610ceb6cd60277a33e661c44f /src | |
| parent | f2366f4f95b0011aab517264e0ae84419abef660 (diff) | |
clean up old stale jobs for clients that have gone away
Diffstat (limited to 'src')
| -rw-r--r-- | src/ci_driver.rs | 67 | ||||
| -rw-r--r-- | src/dbctx.rs | 11 | ||||
| -rw-r--r-- | src/sql.rs | 2 | 
3 files changed, 73 insertions, 7 deletions
| diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 439b4d4..d41cb75 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -1,5 +1,6 @@  use std::process::Command; -use std::sync::RwLock; +use std::collections::HashMap; +use std::sync::{Mutex, RwLock};  use lazy_static::lazy_static;  use std::io::Read;  use serde_derive::{Deserialize, Serialize}; @@ -8,7 +9,7 @@ use std::fmt;  use std::path::{Path, PathBuf};  use tokio::spawn;  use tokio_stream::wrappers::ReceiverStream; -use std::sync::Arc; +use std::sync::{Arc, Weak};  use std::time::{SystemTime, UNIX_EPOCH};  use axum_server::tls_rustls::RustlsConfig;  use axum::body::StreamBody; @@ -28,12 +29,13 @@ mod sql;  mod notifier;  mod io; -use crate::dbctx::{DbCtx, PendingRun, Job}; +use crate::dbctx::{DbCtx, PendingRun, Job, Run};  use crate::sql::JobResult;  use crate::sql::RunState;  lazy_static! {      static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); +    static ref ACTIVE_TASKS: Mutex<HashMap<u64, Weak<()>>> = Mutex::new(HashMap::new());  }  fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> { @@ -140,7 +142,9 @@ struct ClientJob {      remote_git_url: String,      sha: String,      task: PendingRun, -    client: RunnerClient +    client: RunnerClient, +    // exists only as confirmation this `ClientJob` is somewhere, still alive and being processed. +    task_witness: Arc<()>,  }  impl ClientJob { @@ -304,13 +308,16 @@ impl RunnerClient {                  if resp == serde_json::json!({                      "status": "started"                  }) { +                    let task_witness = Arc::new(()); +                    ACTIVE_TASKS.lock().unwrap().insert(job.id, Arc::downgrade(&task_witness));                      eprintln!("resp: {:?}", resp);                      Ok(Some(ClientJob {                          task: job.clone(),                          dbctx: Arc::clone(dbctx),                          sha: sha.to_string(),                          remote_git_url: remote_git_url.to_string(), -                        client: self +                        client: self, +                        task_witness,                      }))                  } else {                      Err("client rejected job".to_string()) @@ -498,11 +505,59 @@ async fn main() {      let dbctx = Arc::new(DbCtx::new(&driver_config.config_path, &driver_config.db_path)); +    dbctx.create_tables().unwrap(); +      let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await;      spawn(axum_server::bind_rustls(driver_config.server_addr.parse().unwrap(), config)            .serve(api_server.into_make_service())); -    dbctx.create_tables().unwrap(); +async fn old_task_reaper(dbctx: Arc<DbCtx>) { +    let mut potentially_stale_tasks = dbctx.get_active_runs().unwrap(); + +    let active_tasks = ACTIVE_TASKS.lock().unwrap(); + +    for (id, witness) in active_tasks.iter() { +        if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) { +            potentially_stale_tasks.swap_remove(idx); +        } +    } + +    std::mem::drop(active_tasks); + +    // ok, so we have tasks that are not active, now if the task is started we know someone should +    // be running it and they are not. retain only those tasks, as they are ones we may want to +    // mark dead. +    // +    // further, filter out any tasks created in the last 60 seconds. this is a VERY generous grace +    // period for clients that have accepted a job but for some reason we have not recorded them as +    // active (perhaps they are slow to ack somehow). + +    let stale_threshold = crate::io::now_ms() - 60_000; + +    let stale_tasks: Vec<Run> = potentially_stale_tasks.into_iter().filter(|task| { +        match (task.state, task.start_time) { +            // `run` is atomically set to `Started` and adorned with a `start_time`. disagreement +            // between the two means this run is corrupt and should be reaped. +            (RunState::Started, None) => { +                true +            }, +            (RunState::Started, Some(start_time)) => { +                start_time < stale_threshold +            } +            // and if it's not `started`, it's either pending (not assigned yet, so not stale), or +            // one of the complete statuses. +            _ => { +                false +            } +        } +    }).collect(); + +    for task in stale_tasks.iter() { +        dbctx.reap_task(task.id).expect("works"); +    } +} + +    spawn(old_task_reaper(Arc::clone(&dbctx)));      loop {          let runs = dbctx.get_pending_runs().unwrap(); diff --git a/src/dbctx.rs b/src/dbctx.rs index 0e999f3..3eb68bd 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -384,6 +384,17 @@ impl DbCtx {          Ok(run_id)      } +    pub fn reap_task(&self, task_id: u64) -> Result<(), String> { +        let conn = self.conn.lock().unwrap(); + +        conn.execute( +            "update runs set final_status=\"lost signal\", state=4 where id=?1;", +            [task_id] +        ).unwrap(); + +        Ok(()) +    } +      pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> {          let conn = self.conn.lock().unwrap(); @@ -11,7 +11,7 @@ pub enum JobResult {      Fail = 1,  } -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq)]  pub enum RunState {      Pending = 0,      Started = 1, | 
