From ec5a274436bc8dda0b55d2c4da1411ff3c52434d Mon Sep 17 00:00:00 2001 From: iximeow Date: Sat, 1 Jul 2023 14:08:00 -0700 Subject: add a notion of runs distinct from jobs, lets see how well this goes over --- src/ci_driver.rs | 124 +++++++++++++++++++++---------------------------------- 1 file changed, 48 insertions(+), 76 deletions(-) (limited to 'src/ci_driver.rs') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 0f629e7..25928ae 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -28,17 +28,17 @@ mod sql; mod notifier; mod io; -use crate::dbctx::{DbCtx, PendingJob}; +use crate::dbctx::{DbCtx, PendingRun, Job}; use crate::sql::JobResult; -use crate::sql::JobState; +use crate::sql::RunState; lazy_static! { static ref AUTH_SECRET: RwLock> = RwLock::new(None); } -fn reserve_artifacts_dir(job: u64) -> std::io::Result { - let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); - path.push(job.to_string()); +fn reserve_artifacts_dir(run: u64) -> std::io::Result { + let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into(); + path.push(run.to_string()); match std::fs::create_dir(&path) { Ok(()) => { Ok(path) @@ -53,9 +53,12 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result { } } -async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::Receiver) -> Result<(), String> { - eprintln!("activating job {:?}", job); +async fn activate_run(dbctx: Arc, run: &PendingRun, clients: &mut mpsc::Receiver) -> Result<(), String> { + eprintln!("activating task {:?}", run); + let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists"); + let connection = dbctx.conn.lock().unwrap(); + let (repo_id, remote_git_url): (u64, String) = connection .query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) .expect("query succeeds"); @@ -73,52 +76,20 @@ async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::R .expect("can run query"); std::mem::drop(connection); - let artifacts: PathBuf = match &job.artifacts { - Some(artifacts) => PathBuf::from(artifacts), - None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts") - }; - /* - - if job.run_host.as_ref() == None { - eprintln!("need to find a host to run the job"); - } - - eprintln!("cloning {}", remote_git_url); - let mut repo_dir = artifacts.clone(); - repo_dir.push("repo"); - eprintln!(" ... into {}", repo_dir.display()); - - Command::new("git") - .arg("clone") - .arg(&remote_git_url) - .arg(&format!("{}", repo_dir.display())) - .status() - .expect("can clone the repo"); - - eprintln!("checking out {}", commit_sha); - Command::new("git") - .current_dir(&repo_dir) - .arg("checkout") - .arg(&commit_sha) - .status() - .expect("can checkout hash"); - */ + let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts"); eprintln!("running {}", repo_name); - /* - * find the CI script, figure out how to run it - */ let mut client_job = loop { let mut candidate = clients.recv().await .ok_or_else(|| "client channel disconnected".to_string())?; - if !candidate.will_accept(job) { + if !candidate.will_accept(&job) { eprintln!("client {:?} would not accept job {:?}", candidate, job); continue; } - if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { + if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await { break client_job; } else { // failed to submit job, move on for now @@ -168,8 +139,8 @@ struct ClientJob { dbctx: Arc, remote_git_url: String, sha: String, - job: PendingJob, - client: RunnerClient + task: PendingRun, + client: RunnerClient } impl ClientJob { @@ -179,49 +150,50 @@ impl ClientJob { let msg = match self.client.recv().await.expect("recv works") { Some(msg) => msg, None => { - eprintln!("client hung up. job's done, i hope?"); + eprintln!("client hung up. task's done, i hope?"); return; } }; eprintln!("got {:?}", msg); let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); match msg_kind { - "new_job_please" => { - eprintln!("misdirected job request (after handshake?)"); + "new_task_please" => { + eprintln!("misdirected task request (after handshake?)"); return; }, - "job_status" => { + "task_status" => { let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); - let (result, state): (Result, JobState) = if state == "finished" { + let (result, state): (Result, RunState) = if state == "finished" { let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("job update: state is {} and result is {}", state, result); + eprintln!("task update: state is {} and result is {}", state, result); match result { "pass" => { - (Ok("success".to_string()), JobState::Finished) + (Ok("success".to_string()), RunState::Finished) }, other => { let desc = msg.as_object().unwrap().get("desc") .map(|x| x.as_str().unwrap().to_string()) .unwrap_or_else(|| other.to_string()); - (Err(desc), JobState::Error) + (Err(desc), RunState::Error) } } } else if state == "interrupted" { let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); - eprintln!("job update: state is {} and result is {}", state, result); + eprintln!("task update: state is {} and result is {}", state, result); let desc = msg.as_object().unwrap().get("desc") .map(|x| x.as_str().unwrap().to_string()) .unwrap_or_else(|| result.to_string()); - (Err(desc), JobState::Error) + (Err(desc), RunState::Error) } else { - eprintln!("job update: state is {}", state); - (Err(format!("atypical completion status: {}", state)), JobState::Invalid) + eprintln!("task update: state is {}", state); + (Err(format!("atypical completion status: {}", state)), RunState::Invalid) }; - let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists"); + let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists"); + let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists"); for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { - if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await { + if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await { eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e); } } @@ -242,8 +214,8 @@ impl ClientJob { }; self.dbctx.conn.lock().unwrap().execute( - "update jobs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", - (now as u64, state as u64, build_result as u8, result_desc, self.job.id) + "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5", + (now as u64, state as u64, build_result as u8, result_desc, self.task.id) ) .expect("can update"); } @@ -258,7 +230,7 @@ impl ClientJob { let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap(); let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap(); - self.dbctx.insert_metric(self.job.id, name, value) + self.dbctx.insert_metric(self.task.id, name, value) .expect("TODO handle metric insert error?"); } "command" => { @@ -311,7 +283,7 @@ impl RunnerClient { } // is this client willing to run the job based on what it has told us so far? - fn will_accept(&self, job: &PendingJob) -> bool { + fn will_accept(&self, job: &Job) -> bool { match (job.source.as_ref(), self.accepted_sources.as_ref()) { (_, None) => true, (None, Some(_)) => false, @@ -321,7 +293,7 @@ impl RunnerClient { } } - async fn submit(mut self, dbctx: &Arc, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result, String> { + async fn submit(mut self, dbctx: &Arc, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result, String> { self.send(serde_json::json!({ "commit": sha, "remote_url": remote_git_url, @@ -334,7 +306,7 @@ impl RunnerClient { }) { eprintln!("resp: {:?}", resp); Ok(Some(ClientJob { - job: job.clone(), + task: job.clone(), dbctx: Arc::clone(dbctx), sha: sha.to_string(), remote_git_url: remote_git_url.to_string(), @@ -364,18 +336,18 @@ impl fmt::Debug for RunnerClient { #[axum_macros::debug_handler] async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { eprintln!("artifact request"); - let job_token = match headers.get("x-job-token") { - Some(job_token) => job_token.to_str().expect("valid string"), + let run_token = match headers.get("x-task-token") { + Some(run_token) => run_token.to_str().expect("valid string"), None => { - eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers); return (StatusCode::BAD_REQUEST, "").into_response(); } }; - let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() { + let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() { Some(result) => result, None => { - eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers); + eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers); return (StatusCode::BAD_REQUEST, "").into_response(); } }; @@ -408,7 +380,7 @@ async fn handle_artifact(State(ctx): State<(Arc, mpsc::Sender artifact, Err(err) => { eprintln!("failure to reserve artifact: {:?}", err); @@ -442,7 +414,7 @@ async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender { - eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + eprintln!("bad artifact post: headers: {:?}\nno authorization", headers); return (StatusCode::BAD_REQUEST, "").into_response(); } }; @@ -533,13 +505,13 @@ async fn main() { dbctx.create_tables().unwrap(); loop { - let jobs = dbctx.get_pending_jobs().unwrap(); + let runs = dbctx.get_pending_runs().unwrap(); - if jobs.len() > 0 { - println!("{} new jobs", jobs.len()); + if runs.len() > 0 { + println!("{} new runs", runs.len()); - for job in jobs.into_iter() { - activate_job(Arc::clone(&dbctx), &job, &mut channel).await; + for run in runs.into_iter() { + activate_run(Arc::clone(&dbctx), &run, &mut channel).await; } } tokio::time::sleep(std::time::Duration::from_millis(100)).await; -- cgit v1.1