From 4a213e872395f9b0562c113bb7303815a1d26a57 Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 22 Dec 2022 18:29:26 +0000 Subject: draw almost all of the owl --- src/ci_driver.rs | 310 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 280 insertions(+), 30 deletions(-) (limited to 'src/ci_driver.rs') diff --git a/src/ci_driver.rs b/src/ci_driver.rs index fd6f813..96e0d44 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -1,10 +1,29 @@ -use rusqlite::Connection; use std::process::Command; +use futures_util::StreamExt; +use std::fmt; use std::path::{Path, PathBuf}; +use tokio::spawn; +use tokio_stream::wrappers::ReceiverStream; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use axum_server::tls_rustls::RustlsConfig; +use axum::body::StreamBody; +use axum::http::{StatusCode}; +use axum::Router; +use axum::routing::*; +use axum::extract::State; +use axum::extract::BodyStream; +use axum::response::IntoResponse; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; +use serde_json::json; +mod dbctx; mod sql; +mod notifier; -use std::time::{SystemTime, UNIX_EPOCH}; +use crate::dbctx::{DbCtx, PendingJob}; +use crate::sql::JobState; fn reserve_artifacts_dir(job: u64) -> std::io::Result { let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); @@ -13,33 +32,43 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result { Ok(path) } -fn activate_job(connection: &mut Connection, job: u64, artifacts: Option, state: u8, run_host: Option, commit_id: u64, repo_url: String, repo_name: String) { +async fn activate_job(dbctx: Arc, job: &PendingJob, clients: &mut mpsc::Receiver) -> Result<(), String> { + let connection = dbctx.conn.lock().unwrap(); + let (repo_id, remote_git_url): (u64, String) = connection + .query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))) + .expect("query succeeds"); + let repo_name: String = connection + .query_row("select repo_name from repos where id=?1", [repo_id], |row| row.get(0)) + .expect("query succeeds"); + let now = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("now is before epoch") .as_millis(); let commit_sha: String = connection - .query_row("select sha from commits where id=?1", [commit_id], |row| row.get(0)) + .query_row("select sha from commits where id=?1", [job.commit_id], |row| row.get(0)) .expect("can run query"); - let artifacts: PathBuf = match artifacts { + let artifacts = PathBuf::from("/tmp"); + /* + let artifacts: PathBuf = match &job.artifacts { Some(artifacts) => PathBuf::from(artifacts), - None => reserve_artifacts_dir(job).expect("can reserve a directory for artifacts") + None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts") }; - if run_host == None { + if job.run_host.as_ref() == None { eprintln!("need to find a host to run the job"); } - eprintln!("cloning {}", repo_url); + eprintln!("cloning {}", remote_git_url); let mut repo_dir = artifacts.clone(); repo_dir.push("repo"); eprintln!(" ... into {}", repo_dir.display()); Command::new("git") .arg("clone") - .arg(repo_url) + .arg(&remote_git_url) .arg(&format!("{}", repo_dir.display())) .status() .expect("can clone the repo"); @@ -48,44 +77,265 @@ fn activate_job(connection: &mut Connection, job: u64, artifacts: Option Command::new("git") .current_dir(&repo_dir) .arg("checkout") - .arg(commit_sha) + .arg(&commit_sha) .status() .expect("can checkout hash"); + */ eprintln!("running {}", repo_name); /* * find the CI script, figure out how to run it */ + let mut client_job = loop { + let mut candidate = clients.recv().await + .ok_or_else(|| "client channel disconnected".to_string())?; + + if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await { + break client_job; + } else { + // failed to submit job, move on for now + } + }; + + let run_host = client_job.client.name.clone(); + connection.execute( "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4", - (now as u64, "test host".to_string(), format!("{}", artifacts.display()), job) + (now as u64, run_host, format!("{}", artifacts.display()), job.id) ) .expect("can update"); + + spawn(async move { + client_job.run().await + }); + + Ok(()) } -fn main() { - let mut connection = Connection::open("/root/ixi_ci_server/state.db").unwrap(); - connection.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); - connection.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); - connection.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); - connection.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap(); +struct RunnerClient { + tx: mpsc::Sender>, + rx: BodyStream, + name: String, + build_token: String, +} - loop { - let mut pending_query = connection.prepare(sql::PENDING_JOBS).unwrap(); - let mut jobs = pending_query.query([]).unwrap(); - let mut to_start = Vec::new(); - while let Some(row) = jobs.next().unwrap() { - let (id, artifacts, state, run_host, commit_id, repo_url, repo_name): (u64, Option, u8, Option, u64, String, String)= TryInto::try_into(row).unwrap(); - to_start.push((id, artifacts, state, run_host, commit_id, repo_url, repo_name)); +fn random_name() -> String { + "random name".to_string() +} + +fn token_for_job() -> String { + "very secret token do not share".to_string() +} + +struct ClientJob { + dbctx: Arc, + remote_git_url: String, + sha: String, + job: PendingJob, + client: RunnerClient +} + +impl ClientJob { + pub async fn run(&mut self) { + loop { + let msg = self.client.recv().await.expect("recv works").expect("client sent an object"); + let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); + match msg_kind { + "job_status" => { + let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap(); + let (result, state): (Result, JobState) = if state == "finished" { + let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); + eprintln!("job update: state is {} and result is {}", state, result); + match result { + "pass" => { + (Ok("success".to_string()), JobState::Complete) + }, + other => { + (Err(other.to_string()), JobState::Error) + } + } + } else if state == "interrupted" { + let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap(); + eprintln!("job update: state is {} and result is {}", state, result); + (Err(result.to_string()), JobState::Error) + } else { + eprintln!("job update: state is {}", state); + (Err(format!("atypical completion status: {}", state)), JobState::Invalid) + }; + + let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists"); + + for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") { + notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await.expect("can notify"); + } + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis(); + + self.dbctx.conn.lock().unwrap().execute( + "update jobs set complete_time=?1, state=?2 where id=?3", + (now as u64, state as u64, self.job.id) + ) + .expect("can update"); + } + "artifact_create" => { + eprintln!("creating artifact {:?}", msg); + self.client.send(serde_json::json!({ + "status": "ok", + "object_id": "10", + })).await.unwrap(); + }, + other => { + eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg); + return; + } + } + } + } +} + +impl RunnerClient { + async fn new(sender: mpsc::Sender>, resp: BodyStream) -> Result { + let name = random_name(); + let token = token_for_job(); + let client = RunnerClient { + tx: sender, + rx: resp, + name, + build_token: token, + }; + Ok(client) + } + + async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> { + self.tx.send(Ok(serde_json::to_string(&msg).unwrap())) + .await + .map_err(|e| e.to_string()) + } + + async fn recv(&mut self) -> Result, String> { + match self.rx.next().await { + Some(Ok(bytes)) => { + serde_json::from_slice(&bytes) + .map(Option::Some) + .map_err(|e| e.to_string()) + }, + Some(Err(e)) => { + eprintln!("e: {:?}", e); + Err(format!("no client job: {:?}", e)) + }, + None => { + eprintln!("no more body chunks? client hung up?"); + Ok(None) + } + } + } + + async fn submit(mut self, dbctx: &Arc, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result, String> { + self.send(serde_json::json!({ + "commit": sha, + "remote_url": remote_git_url, + "build_token": &self.build_token, + })).await?; + match self.recv().await { + Ok(Some(resp)) => { + if resp == serde_json::json!({ + "status": "started" + }) { + eprintln!("resp: {:?}", resp); + Ok(Some(ClientJob { + job: job.clone(), + dbctx: Arc::clone(dbctx), + sha: sha.to_string(), + remote_git_url: remote_git_url.to_string(), + client: self + })) + } else { + Err("client rejected job".to_string()) + } + } + Ok(None) => { + eprintln!("no more body chunks? client hung up?"); + Ok(None) + } + Err(e) => { + eprintln!("e: {:?}", e); + Err(e) + } + } + } +} + +impl fmt::Debug for RunnerClient { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("RunnerClient { .. }") + } +} + +async fn handle_next_job(State(ctx): State<(Arc, mpsc::Sender)>, job_resp: BodyStream) -> impl IntoResponse { + let (tx_sender, tx_receiver) = mpsc::channel(8); + let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); + tx_sender.send(Ok("hello".to_string())).await.expect("works"); + let client = RunnerClient::new(tx_sender, job_resp).await; + match client { + Ok(client) => { + eprintln!("registering client"); + match ctx.1.try_send(client) { + Ok(()) => { + eprintln!("response established..."); + return (StatusCode::OK, resp_body); + } + Err(TrySendError::Full(client)) => { + return (StatusCode::IM_A_TEAPOT, resp_body); + } + Err(TrySendError::Closed(client)) => { + panic!("client holder is gone?"); + } + } } - std::mem::drop(jobs); - std::mem::drop(pending_query); - if to_start.len() > 0 { - println!("{} new jobs", to_start.len()); + Err(e) => { + eprintln!("unable to register client"); + return (StatusCode::MISDIRECTED_REQUEST, resp_body); + } + } +} + +async fn make_api_server(dbctx: Arc) -> (Router, mpsc::Receiver) { + let (pending_client_sender, pending_client_receiver) = mpsc::channel(8); + + let router = Router::new() + .route("/api/next_job", post(handle_next_job)) + .with_state((dbctx, pending_client_sender)); + (router, pending_client_receiver) +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let config = RustlsConfig::from_pem_file( + PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/fullchain.pem"), + PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"), + ).await.unwrap(); + + let dbctx = Arc::new(DbCtx::new("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db")); + + let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await; + spawn(axum_server::bind_rustls("0.0.0.0:9876".parse().unwrap(), config) + .serve(api_server.into_make_service())); + + dbctx.create_tables().unwrap(); + + loop { + let jobs = dbctx.get_pending_jobs().unwrap(); + + if jobs.len() > 0 { + println!("{} new jobs", jobs.len()); - for job in to_start.into_iter() { - activate_job(&mut connection, job.0, job.1, job.2, job.3, job.4, job.5, job.6); + for job in jobs.into_iter() { + activate_job(Arc::clone(&dbctx), &job, &mut channel).await; } } std::thread::sleep(std::time::Duration::from_millis(100)); -- cgit v1.1