diff options
| -rw-r--r-- | src/ci_ctl.rs | 17 | ||||
| -rw-r--r-- | src/ci_driver.rs | 124 | ||||
| -rw-r--r-- | src/ci_runner.rs | 2 | ||||
| -rw-r--r-- | src/dbctx.rs | 216 | ||||
| -rw-r--r-- | src/main.rs | 140 | ||||
| -rw-r--r-- | src/sql.rs | 127 | 
6 files changed, 340 insertions, 286 deletions
| diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs index 36eada5..15feca9 100644 --- a/src/ci_ctl.rs +++ b/src/ci_ctl.rs @@ -5,7 +5,6 @@ mod dbctx;  mod notifier;  mod io; -use sql::JobState;  use dbctx::DbCtx;  use notifier::NotifierConfig; @@ -82,28 +81,26 @@ fn main() {                  JobAction::List => {                      let db = DbCtx::new(&config_path, &db_path);                      let mut conn = db.conn.lock().unwrap(); -                    let mut query = conn.prepare("select id, artifacts_path, state, commit_id, created_time from jobs;").unwrap(); +                    let mut query = conn.prepare(crate::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap();                      let mut jobs = query.query([]).unwrap();                      while let Some(row) = jobs.next().unwrap() { -                        let (id, artifacts, state, commit_id, created_time): (u64, Option<String>, u64, u64, u64) = row.try_into().unwrap(); +                        let (job_id, run_id, state, created_time, commit_id): (u64, u64, u64, u64, u64) = row.try_into().unwrap(); -                        eprintln!("[+] {:04} | {: >8?} | {}", id, state, created_time); +                        eprintln!("[+] {:04} ({:04}) | {: >8?} | {} | {}", run_id, job_id, state, created_time, commit_id);                      }                      eprintln!("jobs");                  },                  JobAction::Rerun { which } => {                      let db = DbCtx::new(&config_path, &db_path); -                    db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [which]) -                        .expect("works"); -                    eprintln!("[+] job {} set to pending", which); +                    let task_id = db.new_run(which as u64).expect("db can be queried"); +                    eprintln!("[+] rerunning job {} as task {}", which, task_id);                  }                  JobAction::RerunCommit { commit } => {                      let db = DbCtx::new(&config_path, &db_path);                      let job_id = db.job_for_commit(&commit).unwrap();                      if let Some(job_id) = job_id { -                        db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [job_id]) -                            .expect("works"); -                        eprintln!("[+] job {} (commit {}) set to pending", job_id, commit); +                        let task_id = db.new_run(job_id).expect("db can be queried"); +                        eprintln!("[+] rerunning job {} (commit {}) as task {}", job_id, commit, task_id);                      } else {                          eprintln!("[-] no job for commit {}", commit);                      } diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 0f629e7..25928ae 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -28,17 +28,17 @@ mod sql;  mod notifier;  mod io; -use crate::dbctx::{DbCtx, PendingJob}; +use crate::dbctx::{DbCtx, PendingRun, Job};  use crate::sql::JobResult; -use crate::sql::JobState; +use crate::sql::RunState;  lazy_static! {      static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);  } -fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> { -    let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); -    path.push(job.to_string()); +fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> { +    let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into(); +    path.push(run.to_string());      match std::fs::create_dir(&path) {          Ok(()) => {              Ok(path) @@ -53,9 +53,12 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> {      }  } -async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> { -    eprintln!("activating job {:?}", job); +async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> { +    eprintln!("activating task {:?}", run); +    let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); +      let connection = dbctx.conn.lock().unwrap(); +      let (repo_id, remote_git_url): (u64, String) = connection          .query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))          .expect("query succeeds"); @@ -73,52 +76,20 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R          .expect("can run query");      std::mem::drop(connection); -    let artifacts: PathBuf = match &job.artifacts { -        Some(artifacts) => PathBuf::from(artifacts), -        None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts") -    }; -    /* - -    if job.run_host.as_ref() == None { -        eprintln!("need to find a host to run the job"); -    } - -    eprintln!("cloning {}", remote_git_url); -    let mut repo_dir = artifacts.clone(); -    repo_dir.push("repo"); -    eprintln!(" ... into {}", repo_dir.display()); - -    Command::new("git") -        .arg("clone") -        .arg(&remote_git_url) -        .arg(&format!("{}", repo_dir.display())) -        .status() -        .expect("can clone the repo"); - -    eprintln!("checking out {}", commit_sha); -    Command::new("git") -        .current_dir(&repo_dir) -        .arg("checkout") -        .arg(&commit_sha) -        .status() -        .expect("can checkout hash"); -    */ +    let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts");      eprintln!("running {}", repo_name); -    /* -     * find the CI script, figure out how to run it -     */      let mut client_job = loop {          let mut candidate = clients.recv().await              .ok_or_else(|| "client channel disconnected".to_string())?; -        if !candidate.will_accept(job) { +        if !candidate.will_accept(&job) {              eprintln!("client {:?} would not accept job {:?}", candidate, job);              continue;          } -        if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { +        if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await {              break client_job;          } else {              // failed to submit job, move on for now @@ -168,8 +139,8 @@ struct ClientJob {      dbctx: Arc<DbCtx>,      remote_git_url: String,      sha: String, -    job: PendingJob, -    client: RunnerClient     +    task: PendingRun, +    client: RunnerClient  }  impl ClientJob { @@ -179,49 +150,50 @@ impl ClientJob {              let msg = match self.client.recv().await.expect("recv works") {                  Some(msg) => msg,                  None => { -                    eprintln!("client hung up. job's done, i hope?"); +                    eprintln!("client hung up. task's done, i hope?");                      return;                  }              };              eprintln!("got {:?}", msg);              let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();              match msg_kind { -                "new_job_please" => { -                    eprintln!("misdirected job request (after handshake?)"); +                "new_task_please" => { +                    eprintln!("misdirected task request (after handshake?)");                      return;                  }, -                "job_status" => { +                "task_status" => {                      let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); -                    let (result, state): (Result<String, String>, JobState) = if state == "finished" { +                    let (result, state): (Result<String, String>, RunState) = if state == "finished" {                          let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); -                        eprintln!("job update: state is {} and result is {}", state, result); +                        eprintln!("task update: state is {} and result is {}", state, result);                          match result {                              "pass" => { -                                (Ok("success".to_string()), JobState::Finished) +                                (Ok("success".to_string()), RunState::Finished)                              },                              other => {                                  let desc = msg.as_object().unwrap().get("desc")                                      .map(|x| x.as_str().unwrap().to_string())                                      .unwrap_or_else(|| other.to_string()); -                                (Err(desc), JobState::Error) +                                (Err(desc), RunState::Error)                              }                          }                      } else if state == "interrupted" {                          let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); -                        eprintln!("job update: state is {} and result is {}", state, result); +                        eprintln!("task update: state is {} and result is {}", state, result);                          let desc = msg.as_object().unwrap().get("desc")                              .map(|x| x.as_str().unwrap().to_string())                              .unwrap_or_else(|| result.to_string()); -                        (Err(desc), JobState::Error) +                        (Err(desc), RunState::Error)                      } else { -                        eprintln!("job update: state is {}", state); -                        (Err(format!("atypical completion status: {}", state)), JobState::Invalid) +                        eprintln!("task update: state is {}", state); +                        (Err(format!("atypical completion status: {}", state)), RunState::Invalid)                      }; -                    let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists"); +                    let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); +                    let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists");                      for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { -                        if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await { +                        if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await {                              eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e);                          }                      } @@ -242,8 +214,8 @@ impl ClientJob {                      };                      self.dbctx.conn.lock().unwrap().execute( -                        "update jobs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", -                        (now as u64, state as u64, build_result as u8, result_desc, self.job.id) +                        "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", +                        (now as u64, state as u64, build_result as u8, result_desc, self.task.id)                      )                          .expect("can update");                  } @@ -258,7 +230,7 @@ impl ClientJob {                      let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap();                      let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap(); -                    self.dbctx.insert_metric(self.job.id, name, value) +                    self.dbctx.insert_metric(self.task.id, name, value)                          .expect("TODO handle metric insert error?");                  }                  "command" => { @@ -311,7 +283,7 @@ impl RunnerClient {      }      // is this client willing to run the job based on what it has told us so far? -    fn will_accept(&self, job: &PendingJob) -> bool { +    fn will_accept(&self, job: &Job) -> bool {          match (job.source.as_ref(), self.accepted_sources.as_ref()) {              (_, None) => true,              (None, Some(_)) => false, @@ -321,7 +293,7 @@ impl RunnerClient {          }      } -    async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> { +    async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {          self.send(serde_json::json!({              "commit": sha,              "remote_url": remote_git_url, @@ -334,7 +306,7 @@ impl RunnerClient {                  }) {                      eprintln!("resp: {:?}", resp);                      Ok(Some(ClientJob { -                        job: job.clone(), +                        task: job.clone(),                          dbctx: Arc::clone(dbctx),                          sha: sha.to_string(),                          remote_git_url: remote_git_url.to_string(), @@ -364,18 +336,18 @@ impl fmt::Debug for RunnerClient {  #[axum_macros::debug_handler]  async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse {      eprintln!("artifact request"); -    let job_token = match headers.get("x-job-token") { -        Some(job_token) => job_token.to_str().expect("valid string"), +    let run_token = match headers.get("x-task-token") { +        Some(run_token) => run_token.to_str().expect("valid string"),          None => { -            eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); +            eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers);              return (StatusCode::BAD_REQUEST, "").into_response();          }      }; -    let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() { +    let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() {          Some(result) => result,          None => { -            eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers); +            eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers);              return (StatusCode::BAD_REQUEST, "").into_response();          }      }; @@ -408,7 +380,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien          }      }; -    let mut artifact = match ctx.0.reserve_artifact(job, artifact_name, artifact_desc).await { +    let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await {          Ok(artifact) => artifact,          Err(err) => {              eprintln!("failure to reserve artifact: {:?}", err); @@ -442,7 +414,7 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien              }          }          None => { -            eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); +            eprintln!("bad artifact post: headers: {:?}\nno authorization", headers);              return (StatusCode::BAD_REQUEST, "").into_response();          }      }; @@ -533,13 +505,13 @@ async fn main() {      dbctx.create_tables().unwrap();      loop { -        let jobs = dbctx.get_pending_jobs().unwrap(); +        let runs = dbctx.get_pending_runs().unwrap(); -        if jobs.len() > 0 { -            println!("{} new jobs", jobs.len()); +        if runs.len() > 0 { +            println!("{} new runs", runs.len()); -            for job in jobs.into_iter() { -                activate_job(Arc::clone(&dbctx), &job, &mut channel).await; +            for run in runs.into_iter() { +                activate_run(Arc::clone(&dbctx), &run, &mut channel).await;              }          }          tokio::time::sleep(std::time::Duration::from_millis(100)).await; diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 4052ba5..330490c 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -129,7 +129,7 @@ impl RunningJob {          let (mut sender, body) = hyper::Body::channel();          let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact")              .header("user-agent", "ci-butactuallyin-space-runner") -            .header("x-job-token", &self.job.build_token) +            .header("x-task-token", &self.job.build_token)              .header("x-artifact-name", name)              .header("x-artifact-desc", desc)              .body(body) diff --git a/src/dbctx.rs b/src/dbctx.rs index dcca362..331e55f 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -36,34 +36,51 @@ pub struct Remote {      pub notifier_config_path: String,  } +// a job tracks when we became aware of a commit from remote. typically a job will have a 1-1 +// relationship with commits, and potentially many *runs* of that job.  #[derive(Debug, Clone)]  pub struct Job {      pub id: u64, -    pub artifacts_path: Option<String>, -    pub state: sql::JobState, -    pub run_host: Option<String>,      pub remote_id: u64,      pub commit_id: u64,      pub created_time: u64, +    pub source: Option<String>, +} + +// a run tracks the intent or obligation to have some runner somewhere run a goodfile and report +// results. a job may have many runs from many different hosts rebuliding history, or reruns of the +// same job on the same hardware to collect more datapoints on the operation. +#[derive(Debug, Clone)] +pub struct Run { +    pub id: u64, +    pub job_id: u64, +    pub artifacts_path: Option<String>, +    pub state: sql::RunState, +    pub run_host: Option<String>, +    pub create_time: u64,      pub start_time: Option<u64>,      pub complete_time: Option<u64>,      pub build_token: Option<String>, -    pub job_timeout: Option<u64>, -    pub source: Option<String>, +    pub run_timeout: Option<u64>,      pub build_result: Option<u8>,      pub final_text: Option<String>,  } +impl Run { +    fn into_pending_run(self) -> PendingRun { +        PendingRun { +            id: self.id, +            job_id: self.job_id, +            create_time: self.create_time, +        } +    } +} +  #[derive(Debug, Clone)] -pub struct PendingJob { +pub struct PendingRun {      pub id: u64, -    pub artifacts: Option<String>, -    pub state: sql::JobState, -    pub run_host: Option<String>, -    pub remote_id: u64, -    pub commit_id: u64, -    pub created_time: u64, -    pub source: Option<String>, +    pub job_id: u64, +    pub create_time: u64,  }  #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -76,7 +93,7 @@ pub enum TokenValidity {  #[derive(Debug, Clone, PartialEq, Eq)]  pub struct MetricRecord {      pub id: u64, -    pub job_id: u64, +    pub run_id: u64,      pub name: String,      pub value: String  } @@ -84,7 +101,7 @@ pub struct MetricRecord {  #[derive(Debug, Clone, PartialEq, Eq)]  pub struct ArtifactRecord {      pub id: u64, -    pub job_id: u64, +    pub run_id: u64,      pub name: String,      pub desc: String,      pub created_time: u64, @@ -113,12 +130,12 @@ impl DbCtx {          Ok(())      } -    pub fn insert_metric(&self, job_id: u64, name: &str, value: &str) -> Result<(), String> { +    pub fn insert_metric(&self, run_id: u64, name: &str, value: &str) -> Result<(), String> {          let conn = self.conn.lock().unwrap();          conn              .execute( -                "insert into metrics (job_id, name, value) values (?1, ?2, ?3) on conflict (job_id, name) do update set value=excluded.value", -                (job_id, name, value) +                "insert into metrics (run_id, name, value) values (?1, ?2, ?3) on conflict (run_id, name) do update set value=excluded.value", +                (run_id, name, value)              )              .expect("can upsert");          Ok(()) @@ -163,7 +180,7 @@ impl DbCtx {              })      } -    pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { +    pub async fn reserve_artifact(&self, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {          let artifact_id = {              let created_time = SystemTime::now()                  .duration_since(UNIX_EPOCH) @@ -172,8 +189,8 @@ impl DbCtx {              let conn = self.conn.lock().unwrap();              conn                  .execute( -                    "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", -                    (job_id, name, desc, created_time) +                    "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", +                    (run_id, name, desc, created_time)                  )                  .map_err(|e| {                      format!("{:?}", e) @@ -182,17 +199,17 @@ impl DbCtx {              conn.last_insert_rowid() as u64          }; -        ArtifactDescriptor::new(job_id, artifact_id).await +        ArtifactDescriptor::new(run_id, artifact_id).await      } -    pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> { +    pub fn lookup_artifact(&self, run_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {          let conn = self.conn.lock().unwrap();          conn -            .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| { -                let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap(); +            .query_row(sql::ARTIFACT_BY_ID, [artifact_id, run_id], |row| { +                let (id, run_id, name, desc, created_time, completed_time) = row.try_into().unwrap();                  Ok(ArtifactRecord { -                    id, job_id, name, desc, created_time, completed_time +                    id, run_id, name, desc, created_time, completed_time                  })              })              .optional() @@ -222,11 +239,11 @@ impl DbCtx {              .map_err(|e| e.to_string())      } -    pub fn job_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> { +    pub fn run_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {          self.conn.lock()              .unwrap()              .query_row( -                "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1", +                "select id, artifacts_path, started_time, run_timeout from runs where build_token=?1",                  [token],                  |row| {                      let timeout: Option<u64> = row.get(3).unwrap(); @@ -254,6 +271,20 @@ impl DbCtx {              .map_err(|e| e.to_string())      } +    pub fn job_by_id(&self, id: u64) -> Result<Option<Job>, String> { +        self.conn.lock() +            .unwrap() +            .query_row(crate::sql::JOB_BY_ID, [id], |row| { +                let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap(); + +                Ok(Job { +                    id, source, created_time, remote_id, commit_id +                }) +            }) +            .optional() +            .map_err(|e| e.to_string()) +    } +      pub fn remote_by_id(&self, id: u64) -> Result<Option<Remote>, String> {          self.conn.lock()              .unwrap() @@ -322,40 +353,62 @@ impl DbCtx {          let conn = self.conn.lock().unwrap();          let rows_modified = conn.execute( -            "insert into jobs (state, remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", -            (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time, pusher) +            "insert into jobs (remote_id, commit_id, created_time, source) values (?1, ?2, ?3, ?4, ?5);", +            (remote_id, commit_id, created_time, pusher)          ).unwrap();          assert_eq!(1, rows_modified); -        Ok(conn.last_insert_rowid() as u64) +        let job_id = conn.last_insert_rowid() as u64; + +        Ok(job_id)      } -    pub fn metrics_for_job(&self, job: u64) -> Result<Vec<MetricRecord>, String> { +    pub fn new_run(&self, job_id: u64) -> Result<u64, String> { +        let created_time = SystemTime::now() +            .duration_since(UNIX_EPOCH) +            .expect("now is before epoch") +            .as_millis() as u64; +          let conn = self.conn.lock().unwrap(); -        let mut metrics_query = conn.prepare(sql::METRICS_FOR_JOB).unwrap(); -        let mut result = metrics_query.query([job]).unwrap(); +        let rows_modified = conn.execute( +            "insert into runs (job_id, state, created_time) values (?1, ?2, ?3);", +            (job_id, crate::sql::RunState::Pending as u64, created_time) +        ).unwrap(); + +        assert_eq!(1, rows_modified); + +        let run_id = conn.last_insert_rowid() as u64; + +        Ok(run_id) +    } + +    pub fn metrics_for_run(&self, run: u64) -> Result<Vec<MetricRecord>, String> { +        let conn = self.conn.lock().unwrap(); + +        let mut metrics_query = conn.prepare(sql::METRICS_FOR_RUN).unwrap(); +        let mut result = metrics_query.query([run]).unwrap();          let mut metrics = Vec::new();          while let Some(row) = result.next().unwrap() { -            let (id, job_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); -            metrics.push(MetricRecord { id, job_id, name, value }); +            let (id, run_id, name, value): (u64, u64, String, String) = row.try_into().unwrap(); +            metrics.push(MetricRecord { id, run_id, name, value });          }          Ok(metrics)      } -    pub fn artifacts_for_job(&self, job: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> { +    pub fn artifacts_for_run(&self, run: u64, limit: Option<u64>) -> Result<Vec<ArtifactRecord>, String> {          let conn = self.conn.lock().unwrap(); -        let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_JOB).unwrap(); -        let mut result = artifacts_query.query([job, limit.unwrap_or(65535)]).unwrap(); +        let mut artifacts_query = conn.prepare(sql::LAST_ARTIFACTS_FOR_RUN).unwrap(); +        let mut result = artifacts_query.query([run, limit.unwrap_or(65535)]).unwrap();          let mut artifacts = Vec::new();          while let Some(row) = result.next().unwrap() { -            let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap(); -            artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time }); +            let (id, run_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap(); +            artifacts.push(ArtifactRecord { id, run_id, name, desc, created_time, completed_time });          }          Ok(artifacts) @@ -403,23 +456,13 @@ impl DbCtx {          conn              .query_row(sql::JOB_BY_COMMIT_ID, [commit_id], |row| { -                let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap(); -                let state: u8 = state; +                let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();                  Ok(Job {                      id, -                    artifacts_path, -                    state: state.try_into().unwrap(), -                    run_host,                      remote_id,                      commit_id,                      created_time, -                    start_time, -                    complete_time, -                    build_token, -                    job_timeout,                      source, -                    build_result, -                    final_text,                  })              })              .optional() @@ -435,77 +478,43 @@ impl DbCtx {          let mut jobs = Vec::new();          while let Some(row) = result.next().unwrap() { -            let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text)= row.try_into().unwrap(); -            let state: u8 = state; +            let (id, source, created_time, remote_id, commit_id) = row.try_into().unwrap();              jobs.push(Job {                  id, -                artifacts_path, -                state: state.try_into().unwrap(), -                run_host,                  remote_id,                  commit_id,                  created_time, -                start_time, -                complete_time, -                build_token, -                job_timeout,                  source, -                build_result, -                final_text,              });          }          Ok(jobs)      } -    pub fn get_active_jobs(&self) -> Result<Vec<Job>, String> { +    pub fn get_active_runs(&self) -> Result<Vec<Run>, String> {          let conn = self.conn.lock().unwrap(); -        let mut started_query = conn.prepare(sql::ACTIVE_JOBS).unwrap(); -        let mut jobs = started_query.query([]).unwrap(); +        let mut started_query = conn.prepare(sql::ACTIVE_RUNS).unwrap(); +        let mut runs = started_query.query([]).unwrap();          let mut started = Vec::new(); -        while let Some(row) = jobs.next().unwrap() { -            let (id, artifacts_path, state, run_host, remote_id, commit_id, created_time, start_time, complete_time, build_token, job_timeout, source, build_result, final_text) = row.try_into().unwrap(); -            let state: u8 = state; - -            started.push(Job { -                id, -                artifacts_path, -                state: state.try_into().unwrap(), -                run_host, -                remote_id, -                commit_id, -                created_time, -                start_time, -                complete_time, -                build_token, -                job_timeout, -                source, -                build_result, -                final_text, -            }); +        while let Some(row) = runs.next().unwrap() { +            started.push(crate::sql::row2run(row));          }          Ok(started)      } -    pub fn get_pending_jobs(&self) -> Result<Vec<PendingJob>, String> { +    pub fn get_pending_runs(&self) -> Result<Vec<PendingRun>, String> {          let conn = self.conn.lock().unwrap(); -        let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap(); -        let mut jobs = pending_query.query([]).unwrap(); +        let mut pending_query = conn.prepare(sql::PENDING_RUNS).unwrap(); +        let mut runs = pending_query.query([]).unwrap();          let mut pending = Vec::new(); -        while let Some(row) = jobs.next().unwrap() { -            let (id, artifacts, state, run_host, remote_id, commit_id, created_time, source) = row.try_into().unwrap(); -            let state: u8 = state; -            pending.push(PendingJob { -                id, artifacts, -                state: state.try_into().unwrap(), -                run_host, remote_id, commit_id, created_time, -                source, -            }); +        while let Some(row) = runs.next().unwrap() { +            let run = crate::sql::row2run(row).into_pending_run(); +            pending.push(run);          }          Ok(pending) @@ -526,6 +535,17 @@ impl DbCtx {          Ok(remotes)      } +    pub fn last_run_for_job(&self, job_id: u64) -> Result<Option<Run>, String> { +        let conn = self.conn.lock().unwrap(); + +        conn +            .query_row(sql::LAST_RUN_FOR_JOB, [job_id], |row| { +                Ok(crate::sql::row2run(row)) +            }) +            .optional() +            .map_err(|e| e.to_string()) +    } +      pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> {          let remotes = self.remotes_by_repo(repo_id)?; diff --git a/src/main.rs b/src/main.rs index 5f719a0..f008a56 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,9 +35,9 @@ mod sql;  mod notifier;  mod dbctx; -use sql::JobState; +use sql::RunState; -use dbctx::{DbCtx, Job, ArtifactRecord}; +use dbctx::{DbCtx, Job, Run, ArtifactRecord};  use rusqlite::OptionalExtension; @@ -148,14 +148,14 @@ fn job_url(job: &Job, commit_sha: &str, ctx: &Arc<DbCtx>) -> String {      format!("{}/{}", &remote.remote_path, commit_sha)  } -/// render how long a job took, or is taking, in a human-friendly way -fn display_job_time(job: &Job) -> String { -    if let Some(start_time) = job.start_time { -        if let Some(complete_time) = job.complete_time { +/// render how long a run took, or is taking, in a human-friendly way +fn display_run_time(run: &Run) -> String { +    if let Some(start_time) = run.start_time { +        if let Some(complete_time) = run.complete_time {              if complete_time < start_time { -                if job.state == JobState::Started { -                    // this job has been restarted. the completed time is stale. -                    // further, this is a currently active job. +                if run.state == RunState::Started { +                    // this run has been restarted. the completed time is stale. +                    // further, this is a currently active run.                      let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).expect("now is after then").as_millis() as u64;                      let mut duration = duration_as_human_string(now_ms - start_time);                      duration.push_str(" (ongoing)"); @@ -332,21 +332,23 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse      let mut row_num = 0;      for repo in repos { -        let mut most_recent_job: Option<Job> = None; +        let mut most_recent_run: Option<(Job, Run)> = None;          for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") {              let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works");              if let Some(last_job) = last_job { -                if most_recent_job.as_ref().map(|job| job.created_time < last_job.created_time).unwrap_or(true) { -                    most_recent_job = Some(last_job); +                if let Some(last_run) = ctx.dbctx.last_run_for_job(last_job.id).expect("can query") { +                    if most_recent_run.as_ref().map(|run| run.1.create_time < last_run.create_time).unwrap_or(true) { +                        most_recent_run = Some((last_job, last_run)); +                    }                  }              }          }          let repo_html = format!("<a href=\"/{}\">{}</a>", &repo.name, &repo.name); -        let row_html: String = match most_recent_job { -            Some(job) => { +        let row_html: String = match most_recent_run { +            Some((job, run)) => {                  let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");                  let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {                      Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), @@ -355,17 +357,17 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse                  let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); -                let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); -                let duration = display_job_time(&job); +                let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); +                let duration = display_run_time(&run); -                let status = format!("{:?}", job.state).to_lowercase(); +                let status = format!("{:?}", run.state).to_lowercase(); -                let result = match job.build_result { +                let result = match run.build_result {                      Some(0) => "<span style='color:green;'>pass</span>",                      Some(_) => "<span style='color:red;'>fail</span>", -                    None => match job.state { -                        JobState::Pending => { "unstarted" }, -                        JobState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, +                    None => match run.state { +                        RunState::Pending => { "unstarted" }, +                        RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },                          _ => { "<span style='color:red;'>unreported</span>" }                      }                  }; @@ -401,10 +403,10 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse      }      response.push_str("</table>"); -    response.push_str("<h4>active jobs</h4>\n"); +    response.push_str("<h4>active tasks</h4>\n"); -    let jobs = ctx.dbctx.get_active_jobs().expect("can query"); -    if jobs.len() == 0 { +    let runs = ctx.dbctx.get_active_runs().expect("can query"); +    if runs.len() == 0 {          response.push_str("<p>(none)</p>\n");      } else {          response.push_str("<table class='build-table'>"); @@ -417,9 +419,10 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse          let mut row_num = 0; -        for job in jobs.iter() { +        for run in runs.iter() {              let row_index = row_num % 2; +            let job = ctx.dbctx.job_by_id(run.job_id).expect("query succeeds").expect("job id is valid");              let remote = ctx.dbctx.remote_by_id(job.remote_id).expect("query succeeds").expect("remote id is valid");              let repo = ctx.dbctx.repo_by_id(remote.repo_id).expect("query succeeds").expect("repo id is valid"); @@ -433,17 +436,17 @@ async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse              let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); -            let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); -            let duration = display_job_time(&job); +            let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); +            let duration = display_run_time(&run); -            let status = format!("{:?}", job.state).to_lowercase(); +            let status = format!("{:?}", run.state).to_lowercase(); -            let result = match job.build_result { +            let result = match run.build_result {                  Some(0) => "<span style='color:green;'>pass</span>",                  Some(_) => "<span style='color:red;'>fail</span>", -                None => match job.state { -                    JobState::Pending => { "unstarted" }, -                    JobState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, +                None => match run.state { +                    RunState::Pending => { "unstarted" }, +                    RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },                      _ => { "<span style='color:red;'>unreported</span>" }                  }              }; @@ -498,9 +501,11 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(      let job = ctx.dbctx.job_by_commit_id(commit_id).expect("can query").expect("job exists"); -    let complete_time = job.complete_time.unwrap_or_else(crate::io::now_ms); +    let run = ctx.dbctx.last_run_for_job(job.id).expect("can query").expect("run exists"); + +    let complete_time = run.complete_time.unwrap_or_else(crate::io::now_ms); -    let debug_info = job.state == JobState::Finished && job.build_result == Some(1) || job.state == JobState::Error; +    let debug_info = run.state == RunState::Finished && run.build_result == Some(1) || run.state == RunState::Error;      let repo_name: String = ctx.dbctx.conn.lock().unwrap()          .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0)) @@ -511,42 +516,42 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(      let head = format!("<head><title>ci.butactuallyin.space - {}</title></head>", repo_name);      let repo_html = format!("<a href=\"/{}\">{}</a>", &repo_name, &repo_name);      let remote_commit_elem = format!("<a href=\"https://www.github.com/{}/commit/{}\">{}</a>", &remote_path, &sha, &sha); -    let status_elem = match job.state { -        JobState::Pending | JobState::Started => { +    let status_elem = match run.state { +        RunState::Pending | RunState::Started => {              "<span style='color:#660;'>pending</span>"          }, -        JobState::Finished => { -            if let Some(build_result) = job.build_result { +        RunState::Finished => { +            if let Some(build_result) = run.build_result {                  if build_result == 0 {                      "<span style='color:green;'>pass</span>"                  } else {                      "<span style='color:red;'>failed</span>"                  }              } else { -                eprintln!("job {} for commit {} is missing a build result but is reportedly finished (old data)?", job.id, commit_id); +                eprintln!("run {} for commit {} is missing a build result but is reportedly finished (old data)?", run.id, commit_id);                  "<span style='color:red;'>unreported</span>"              }          }, -        JobState::Error => { +        RunState::Error => {              "<span style='color:red;'>error</span>"          } -        JobState::Invalid => { +        RunState::Invalid => {              "<span style='color:red;'>(server error)</span>"          }      };      let mut artifacts_fragment = String::new(); -    let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_job(job.id, None).unwrap() -        .into_iter() // HACK: filter out artifacts for previous runs of a job. artifacts should be attached to a run, runs should be distinct from jobs. but i'm sleepy. -        .filter(|artifact| artifact.created_time >= job.start_time.unwrap_or_else(crate::io::now_ms)) +    let mut artifacts: Vec<ArtifactRecord> = ctx.dbctx.artifacts_for_run(run.id, None).unwrap() +        .into_iter() // HACK: filter out artifacts for previous runs of a run. artifacts should be attached to a run, runs should be distinct from run. but i'm sleepy. +        .filter(|artifact| artifact.created_time >= run.start_time.unwrap_or_else(crate::io::now_ms))          .collect();      artifacts.sort_by_key(|artifact| artifact.created_time); -    fn diff_times(job_completed: u64, artifact_completed: Option<u64>) -> u64 { +    fn diff_times(run_completed: u64, artifact_completed: Option<u64>) -> u64 {          let artifact_completed = artifact_completed.unwrap_or_else(crate::io::now_ms); -        let job_completed = std::cmp::max(job_completed, artifact_completed); -        job_completed - artifact_completed +        let run_completed = std::cmp::max(run_completed, artifact_completed); +        run_completed - artifact_completed      }      let recent_artifacts: Vec<ArtifactRecord> = artifacts.iter().filter(|artifact| diff_times(complete_time, artifact.completed_time) <= 60_000).cloned().collect(); @@ -556,7 +561,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(          let created_time_str = Utc.timestamp_millis_opt(artifact.created_time as i64).unwrap().to_rfc2822();          artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name));          let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); -        let size_str = (std::fs::metadata(&format!("./jobs/{}/{}", artifact.job_id, artifact.id)).expect("metadata exists").len() / 1024).to_string(); +        let size_str = (std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).expect("metadata exists").len() / 1024).to_string();          artifacts_fragment.push_str(&format!("<pre>  {}kb in {} </pre>\n", size_str, duration_str));      } @@ -565,18 +570,18 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(          artifacts_fragment.push_str(&format!("<div><pre style='display:inline;'>{}</pre> step: <pre style='display:inline;'>{}</pre></div>\n", created_time_str, &artifact.name));          if debug_info {              artifacts_fragment.push_str("<pre>"); -            artifacts_fragment.push_str(&std::fs::read_to_string(format!("./jobs/{}/{}", artifact.job_id, artifact.id)).unwrap()); +            artifacts_fragment.push_str(&std::fs::read_to_string(format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).unwrap());              artifacts_fragment.push_str("</pre>\n");          } else {              let duration_str = duration_as_human_string(artifact.completed_time.unwrap_or_else(crate::io::now_ms) - artifact.created_time); -            let size_str = std::fs::metadata(&format!("./jobs/{}/{}", artifact.job_id, artifact.id)).map(|md| { +            let size_str = std::fs::metadata(&format!("./artifacts/{}/{}", artifact.run_id, artifact.id)).map(|md| {                  (md.len() / 1024).to_string()              }).unwrap_or_else(|e| format!("[{}]", e));              artifacts_fragment.push_str(&format!("<pre>  {}kb in {} </pre>\n", size_str, duration_str));          }      } -    let metrics = ctx.dbctx.metrics_for_job(job.id).unwrap(); +    let metrics = ctx.dbctx.metrics_for_run(run.id).unwrap();      let metrics_section = if metrics.len() > 0 {          let mut section = String::new();          section.push_str("<div>"); @@ -599,9 +604,9 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(      html.push_str("  <body>\n");      html.push_str("    <pre>\n");      html.push_str(&format!("repo: {}\n", repo_html)); -    html.push_str(&format!("commit: {}, job: {}\n", remote_commit_elem, job.id)); -    html.push_str(&format!("status: {} in {}\n", status_elem, display_job_time(&job))); -    if let Some(desc) = job.final_text.as_ref() { +    html.push_str(&format!("commit: {}, run: {}\n", remote_commit_elem, run.id)); +    html.push_str(&format!("status: {} in {}\n", status_elem, display_run_time(&run))); +    if let Some(desc) = run.final_text.as_ref() {          html.push_str(&format!("  description: {}\n  ", desc));      }      html.push_str(&format!("deployed: {}\n", deployed)); @@ -620,11 +625,11 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(  }  async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse { -    eprintln!("get artifact, job={}, artifact={}", path.0, path.1); -    let job_id: u64 = path.0.parse().unwrap(); +    eprintln!("get artifact, run={}, artifact={}", path.0, path.1); +    let run: u64 = path.0.parse().unwrap();      let artifact_id: u64 = path.1.parse().unwrap(); -    let artifact_descriptor = match ctx.dbctx.lookup_artifact(job_id, artifact_id).unwrap() { +    let artifact_descriptor = match ctx.dbctx.lookup_artifact(run, artifact_id).unwrap() {          Some(artifact) => artifact,          None => {              return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response(); @@ -645,7 +650,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta          let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536);          let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver);          let mut artifact_path = ctx.jobs_path.clone(); -        artifact_path.push(artifact_descriptor.job_id.to_string()); +        artifact_path.push(artifact_descriptor.run_id.to_string());          artifact_path.push(artifact_descriptor.id.to_string());          spawn(async move {              let mut artifact = artifact_descriptor; @@ -661,7 +666,7 @@ async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): Sta                          // this would be much implemented as yielding on a condvar woken when an                          // inotify event on the file indicates a write has occurred. but i am                          // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao. -                        artifact = ctx.dbctx.lookup_artifact(artifact.job_id, artifact.id) +                        artifact = ctx.dbctx.lookup_artifact(artifact.run_id, artifact.id)                              .expect("can query db")                              .expect("artifact still exists");                      } @@ -724,6 +729,7 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv      let mut row_num = 0;      for job in last_builds.iter().take(10) { +        let run = ctx.dbctx.last_run_for_job(job.id).expect("query succeeds").expect("TODO: run exists if job exists (small race if querying while creating job ....");          let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");          let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {              Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit), @@ -732,17 +738,17 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Webserv          let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id); -        let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822(); -        let duration = display_job_time(&job); +        let last_build_time = Utc.timestamp_millis_opt(run.create_time as i64).unwrap().to_rfc2822(); +        let duration = display_run_time(&run); -        let status = format!("{:?}", job.state).to_lowercase(); +        let status = format!("{:?}", run.state).to_lowercase(); -        let result = match job.build_result { +        let result = match run.build_result {              Some(0) => "<span style='color:green;'>pass</span>",              Some(_) => "<span style='color:red;'>fail</span>", -            None => match job.state { -                JobState::Pending => { "unstarted" }, -                JobState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" }, +            None => match run.state { +                RunState::Pending => { "unstarted" }, +                RunState::Started => { "<span style='color:darkgoldenrod;'>in progress</span>" },                  _ => { "<span style='color:red;'>unreported</span>" }              }          }; @@ -885,7 +891,7 @@ async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathB          .route("/:owner/:repo/:sha", get(handle_commit_status))          .route("/:owner", get(handle_repo_summary))          .route("/:owner/:repo", post(handle_repo_event)) -        .route("/artifact/:job/:artifact_id", get(handle_get_artifact)) +        .route("/artifact/:b/:artifact_id", get(handle_get_artifact))          .route("/", get(handle_ci_index))          .fallback(fallback_get)          .with_state(WebserverState { @@ -2,6 +2,9 @@  use std::convert::TryFrom; +use crate::dbctx::Run; +use crate::dbctx::Job; +  #[derive(Debug, Clone)]  pub enum JobResult {      Pass = 0, @@ -9,7 +12,7 @@ pub enum JobResult {  }  #[derive(Debug, Clone, PartialEq)] -pub enum JobState { +pub enum RunState {      Pending = 0,      Started = 1,      Finished = 2, @@ -17,38 +20,48 @@ pub enum JobState {      Invalid = 4,  } -impl TryFrom<u8> for JobState { +impl TryFrom<u8> for RunState {      type Error = String;      fn try_from(value: u8) -> Result<Self, String> {          match value { -            0 => Ok(JobState::Pending), -            1 => Ok(JobState::Started), -            2 => Ok(JobState::Finished), -            3 => Ok(JobState::Error), -            4 => Ok(JobState::Invalid), +            0 => Ok(RunState::Pending), +            1 => Ok(RunState::Started), +            2 => Ok(RunState::Finished), +            3 => Ok(RunState::Error), +            4 => Ok(RunState::Invalid),              other => Err(format!("invalid job state: {}", other)),          }      }  } +pub(crate) fn row2run(row: &rusqlite::Row) -> Run { +    let (id, job_id, artifacts_path, state, run_host, build_token, create_time, start_time, complete_time, run_timeout, build_result, final_text) = row.try_into().unwrap(); +    let state: u8 = state; +    Run { +        id, +        job_id, +        artifacts_path, +        state: state.try_into().unwrap(), +        run_host, +        create_time, +        start_time, +        complete_time, +        build_token, +        run_timeout, +        build_result, +        final_text, +    } +} +  // remote_id is the remote from which we were notified. this is necessary so we know which remote  // to pull from to actually run the job.  pub const CREATE_JOBS_TABLE: &'static str = "\      CREATE TABLE IF NOT EXISTS jobs (id INTEGER PRIMARY KEY AUTOINCREMENT, -        artifacts_path TEXT, -        state INTEGER NOT NULL, -        run_host TEXT, -        build_token TEXT, -        remote_id INTEGER, -        commit_id INTEGER, -        created_time INTEGER, -        started_time INTEGER, -        complete_time INTEGER, -        job_timeout INTEGER,          source TEXT, -        build_result INTEGER, -        final_status TEXT);"; +        created_time INTEGER, +        remote_id INTEGER, +        commit_id INTEGER);";  pub const CREATE_METRICS_TABLE: &'static str = "\      CREATE TABLE IF NOT EXISTS metrics (id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -82,35 +95,63 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\  pub const CREATE_ARTIFACTS_TABLE: &'static str = "\      CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT, -        job_id INTEGER, +        run_id INTEGER,          name TEXT,          desc TEXT,          created_time INTEGER,          completed_time INTEGER);"; +pub const CREATE_RUN_TABLE: &'static str = "\ +    CREATE TABLE IF NOT EXISTS runs (id INTEGER PRIMARY KEY AUTOINCREMENT, +        job_id INTEGER, +        artifacts_path TEXT, +        state INTEGER NOT NULL, +        run_host TEXT, +        build_token TEXT, +        created_time INTEGER, +        started_time INTEGER, +        complete_time INTEGER, +        run_timeout INTEGER, +        build_result INTEGER, +        final_status TEXT);"; +  pub const CREATE_REMOTES_INDEX: &'static str = "\      CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";  pub const CREATE_REPO_NAME_INDEX: &'static str = "\      CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);"; -pub const PENDING_JOBS: &'static str = "\ -    select id, artifacts_path, state, run_host, remote_id, commit_id, created_time, source from jobs where state=0;"; - -pub const ACTIVE_JOBS: &'static str = "\ -    select * from jobs where state=1 or state=0;"; - -pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\ -    select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;"; +pub const PENDING_RUNS: &'static str = "\ +    select id, job_id, artifacts_path, state, run_host, created_time from runs where state=0;"; + +pub const ACTIVE_RUNS: &'static str = "\ +    select id, +        job_id, +        artifacts_path, +        state, +        run_host, +        build_token, +        created_time, +        started_time, +        complete_time, +        run_timeout, +        build_result, +        final_status from runs where state=1 or state=0;"; + +pub const LAST_ARTIFACTS_FOR_RUN: &'static str = "\ +    select * from artifacts where run_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit ?2;";  pub const JOB_BY_COMMIT_ID: &'static str = "\ -    select * from jobs where commit_id=?1;"; +    select id, source, created_time, remote_id, commit_id from jobs where commit_id=?1;";  pub const ARTIFACT_BY_ID: &'static str = "\ -    select * from artifacts where id=?1 and job_id=?2;"; +    select * from artifacts where id=?1 and run_id=?2;"; -pub const METRICS_FOR_JOB: &'static str = "\ -    select * from metrics where job_id=?1 order by id asc;"; +pub const JOB_BY_ID: &'static str = "\ +    select id, source, created_time, remote_id, commit_id from jobs where id=?1"; + +pub const METRICS_FOR_RUN: &'static str = "\ +    select * from metrics where run_id=?1 order by id asc;";  pub const COMMIT_TO_ID: &'static str = "\      select id from commits where sha=?1;"; @@ -122,5 +163,23 @@ pub const ALL_REPOS: &'static str = "\      select * from repos;";  pub const LAST_JOBS_FROM_REMOTE: &'static str = "\ -    select * from jobs where remote_id=?1 order by created_time desc limit ?2;"; - +    select id, source, created_time, remote_id, commit_id from jobs where remote_id=?1 order by created_time desc limit ?2;"; + +pub const LAST_RUN_FOR_JOB: &'static str = "\ +    select id, +        job_id, +        artifacts_path, +        state, +        run_host, +        build_token, +        created_time, +        started_time, +        complete_time, +        run_timeout, +        build_result, +        final_status from runs where job_id=?1;"; + +pub const SELECT_ALL_RUNS_WITH_JOB_INFO: &'static str = "\ +    select jobs.id as job_id, runs.id as run_id, runs.state, runs.created_time, jobs.commit_id +    from jobs join runs on jobs.id=runs.job_id +    oder by runs.created_time asc;"; | 
