From 4a213e872395f9b0562c113bb7303815a1d26a57 Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 22 Dec 2022 18:29:26 +0000 Subject: draw almost all of the owl --- src/ci_runner.rs | 269 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 269 insertions(+) create mode 100644 src/ci_runner.rs (limited to 'src/ci_runner.rs') diff --git a/src/ci_runner.rs b/src/ci_runner.rs new file mode 100644 index 0000000..6554f05 --- /dev/null +++ b/src/ci_runner.rs @@ -0,0 +1,269 @@ +use std::time::Duration; +use reqwest::{StatusCode, Response}; +use std::process::Command; +use serde_derive::{Deserialize, Serialize}; +use serde::{Deserialize, de::DeserializeOwned, Serialize}; + +#[derive(Debug)] +enum WorkAcquireError { + Reqwest(reqwest::Error), + EarlyEof, + Protocol(String), +} + +struct RunnerClient { + host: String, + tx: hyper::body::Sender, + rx: Response, + current_job: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct RequestedJob { + commit: String, + remote_url: String, + build_token: String, +} + +impl RequestedJob { + // TODO: panics if hyper finds the channel is closed. hum + async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result { + let create_artifact_message = serde_json::json!({ + "kind": "artifact_create", + "name": name, + "description": desc, + "job_token": &self.build_token, + }); + client.send(create_artifact_message).await + .map_err(|e| format!("create artifact send error: {:?}", e))?; + let resp = client.recv().await + .map_err(|e| format!("create artifact recv error: {:?}", e))?; + eprintln!("resp: {:?}", resp); + let object_id = resp.unwrap() + .as_object().expect("is an object") + .get("object_id").unwrap().as_str().expect("is str") + .to_owned(); + // POST to this object id... + Ok(ArtifactStream { + object_id, + }) + } + + async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result { + let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works"); + + let clone_res = Command::new("git") + .arg("clone") + .arg(&self.remote_url) + .arg("tmpdir") + .status() + .map_err(|e| format!("failed to run git clone? {:?}", e))?; + + if !clone_res.success() { + return Err(format!("git clone failed: {:?}", clone_res)); + } + + let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works"); + + let checkout_res = Command::new("git") + .current_dir("tmpdir") + .arg("checkout") + .arg(&self.commit) + .status() + .map_err(|e| format!("failed to run git checkout? {:?}", e))?; + + if !checkout_res.success() { + return Err(format!("git checkout failed: {:?}", checkout_res)); + } + + let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works"); + + let build_res = Command::new("cargo") + .current_dir("tmpdir") + .arg("build") + .status() + .map_err(|e| format!("failed to run cargo build? {:?}", e))?; + + if !build_res.success() { + return Err(format!("cargo build failed: {:?}", build_res)); + } + + let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works"); + + let test_result = Command::new("cargo") + .current_dir("tmpdir") + .arg("test") + .status() + .map_err(|e| format!("failed to run cargo test? {:?}", e))?; + + match test_result.code() { + Some(0) => Ok("pass".to_string()), + Some(n) => Ok(format!("error: {}", n)), + None => Ok(format!("abnormal exit")), + } + } +} + +struct ArtifactStream { + object_id: String, +} + +impl RunnerClient { + async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result { + if res.status() != StatusCode::OK { + return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); + } + + let hello = res.chunk().await.expect("chunk"); + if hello.as_ref().map(|x| &x[..]) != Some(b"hello") { + return Err(format!("bad hello: {:?}", hello)); + } + + Ok(Self { + host: host.to_string(), + tx: sender, + rx: res, + current_job: None, + }) + } + + async fn wait_for_work(&mut self) -> Result, WorkAcquireError> { + match self.rx.chunk().await { + Ok(Some(chunk)) => { + eprintln!("got chunk: {:?}", &chunk); + serde_json::from_slice(&chunk) + .map(Option::Some) + .map_err(|e| { + WorkAcquireError::Protocol(format!("not json: {:?}", e)) + }) + } + Ok(None) => { + Ok(None) + }, + Err(e) => { + Err(WorkAcquireError::Reqwest(e)) + } + } + } + + async fn recv(&mut self) -> Result, String> { + self.recv_typed().await + } + + async fn recv_typed(&mut self) -> Result, String> { + match self.rx.chunk().await { + Ok(Some(chunk)) => { + serde_json::from_slice(&chunk) + .map(Option::Some) + .map_err(|e| { + format!("not json: {:?}", e) + }) + }, + Ok(None) => Ok(None), + Err(e) => { + Err(format!("error in recv: {:?}", e)) + } + } + } + + async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { + self.tx.send_data( + serde_json::to_vec(&value) + .map_err(|e| format!("json error: {:?}", e))? + .into() + ).await + .map_err(|e| format!("send error: {:?}", e)) + } + + async fn run_job(&mut self, job: RequestedJob) { + self.send(serde_json::json!({ + "status": "started" + })).await.unwrap(); + + std::fs::remove_dir_all("tmpdir").unwrap(); + std::fs::create_dir("tmpdir").unwrap(); + + let res = job.execute_goodfile(self).await; + + match res { + Ok(status) => { + self.send(serde_json::json!({ + "kind": "job_status", + "state": "finished", + "result": status + })).await.unwrap(); + } + Err(status) => { + self.send(serde_json::json!({ + "kind": "job_status", + "state": "interrupted", + "result": status + })).await.unwrap(); + } + } + } +} + +#[tokio::main] +async fn main() { + let secret = std::fs::read_to_string("./auth_secret").unwrap(); + let client = reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"); + + loop { + let (mut sender, body) = hyper::Body::channel(); + let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("authorization", &secret) + .body(body) + .send() + .await; + + match poll { + Ok(mut res) => { + let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { + Ok(client) => client, + Err(e) => { + eprintln!("failed to initialize client: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + let job = match client.wait_for_work().await { + Ok(Some(request)) => request, + Ok(None) => { + eprintln!("no work to do (yet)"); + std::thread::sleep(Duration::from_millis(2000)); + continue; + } + Err(e) => { + eprintln!("failed to get work: {:?}", e); + std::thread::sleep(Duration::from_millis(10000)); + continue; + } + }; + eprintln!("requested work: {:?}", job); + + eprintln!("doing {:?}", job); + client.run_job(job).await; + std::thread::sleep(Duration::from_millis(10000)); + }, + Err(e) => { + let message = format!("{}", e); + + if message.contains("tcp connect error") { + eprintln!("could not reach server. sleeping a bit and retrying."); + std::thread::sleep(Duration::from_millis(5000)); + continue; + } + + eprintln!("unhandled error: {}", message); + + std::thread::sleep(Duration::from_millis(1000)); + } + } + } +} -- cgit v1.1