From db4c638182d1c6b15bdc8d94f7baa691197e9a56 Mon Sep 17 00:00:00 2001 From: iximeow Date: Sun, 29 Oct 2023 13:08:37 -0700 Subject: split runner into local/remote logic ideally `trait Runner` describes whatever should have behavioral differences when a task is run locally vs in conjunction with a remote server. then everything consuming a `dyn Runner` will not need to change. if i've done it right. --- Cargo.lock | 1 + ci-lib-core/src/protocol.rs | 4 +- ci-runner/Cargo.toml | 1 + ci-runner/src/lua/mod.rs | 25 +++++---- ci-runner/src/main.rs | 122 +++++++++++++++++++++++++++++--------------- 5 files changed, 99 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ea5f9a..10919fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -380,6 +380,7 @@ dependencies = [ name = "ci-runner" version = "0.0.1" dependencies = [ + "async-trait", "ci-lib-core", "ci-lib-native", "hyper", diff --git a/ci-lib-core/src/protocol.rs b/ci-lib-core/src/protocol.rs index c7a9318..c987c89 100644 --- a/ci-lib-core/src/protocol.rs +++ b/ci-lib-core/src/protocol.rs @@ -15,7 +15,7 @@ pub enum ClientProto { Pong, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(tag = "command_info")] #[serde(rename_all = "snake_case")] pub enum CommandInfo { @@ -23,7 +23,7 @@ pub enum CommandInfo { Finished { exit_code: Option, id: u32 }, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(tag = "task_info")] #[serde(rename_all = "snake_case")] pub enum TaskInfo { diff --git a/ci-runner/Cargo.toml b/ci-runner/Cargo.toml index 038ed14..c956fb5 100644 --- a/ci-runner/Cargo.toml +++ b/ci-runner/Cargo.toml @@ -13,6 +13,7 @@ path = "src/main.rs" ci-lib-core = { path = "../ci-lib-core" } ci-lib-native = { path = "../ci-lib-native" } +async-trait = "*" libc = "*" serde = "*" serde_derive = "*" diff --git a/ci-runner/src/lua/mod.rs b/ci-runner/src/lua/mod.rs index 62ac68b..92be5ce 100644 --- a/ci-runner/src/lua/mod.rs +++ b/ci-runner/src/lua/mod.rs @@ -1,3 +1,4 @@ +use crate::Runner; use crate::RunningJob; use rlua::prelude::*; @@ -9,7 +10,7 @@ pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config pub struct BuildEnv { lua: Lua, - job: Arc>, + job: Arc>>, } #[derive(Debug)] @@ -26,6 +27,7 @@ pub struct CommandOutput { } mod lua_exports { + use crate::Runner; use crate::RunningJob; use crate::lua::{CommandOutput, RunParams}; @@ -115,7 +117,7 @@ mod lua_exports { Ok((args, params)) } - pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc>) -> Result<(), rlua::Error> { + pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc>>) -> Result<(), rlua::Error> { let (args, params) = collect_build_args(command, params)?; eprintln!("args: {:?}", args); eprintln!(" params: {:?}", params); @@ -129,7 +131,7 @@ mod lua_exports { }) } - pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc>) -> Result, rlua::Error> { + pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc>>) -> Result, rlua::Error> { let (args, params) = collect_build_args(command, params)?; eprintln!("args: {:?}", args); eprintln!(" params: {:?}", params); @@ -167,7 +169,7 @@ mod lua_exports { Ok(()) } - pub fn metric(name: String, value: String, job_ctx: Arc>) -> Result<(), rlua::Error> { + pub fn metric(name: String, value: String, job_ctx: Arc>>) -> Result<(), rlua::Error> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() @@ -178,7 +180,7 @@ mod lua_exports { }) } - pub fn artifact(path: String, name: Option, job_ctx: Arc>) -> Result<(), rlua::Error> { + pub fn artifact(path: String, name: Option, job_ctx: Arc>>) -> Result<(), rlua::Error> { let path: PathBuf = path.into(); let default_name: String = match (path.file_name(), path.parent()) { @@ -230,23 +232,24 @@ mod lua_exports { } pub mod step { + use crate::Runner; use crate::RunningJob; use std::sync::{Arc, Mutex}; - pub fn start(job_ref: Arc>, name: String) -> Result<(), rlua::Error> { + pub fn start(job_ref: Arc>>, name: String) -> Result<(), rlua::Error> { let mut job = job_ref.lock().unwrap(); job.current_step.clear(); job.current_step.push(name); Ok(()) } - pub fn push(job_ref: Arc>, name: String) -> Result<(), rlua::Error> { + pub fn push(job_ref: Arc>>, name: String) -> Result<(), rlua::Error> { let mut job = job_ref.lock().unwrap(); job.current_step.push(name); Ok(()) } - pub fn advance(job_ref: Arc>, name: String) -> Result<(), rlua::Error> { + pub fn advance(job_ref: Arc>>, name: String) -> Result<(), rlua::Error> { let mut job = job_ref.lock().unwrap(); job.current_step.pop(); job.current_step.push(name); @@ -257,14 +260,14 @@ mod lua_exports { struct DeclEnv<'lua, 'env> { lua_ctx: &'env rlua::Context<'lua>, - job_ref: &'env Arc>, + job_ref: &'env Arc>>, } impl<'lua, 'env> DeclEnv<'lua, 'env> { fn create_function(&self, name: &str, f: F) -> Result, String> where A: FromLuaMulti<'lua>, R: ToLuaMulti<'lua>, - F: 'static + Send + Fn(rlua::Context<'lua>, Arc>, A) -> Result { + F: 'static + Send + Fn(rlua::Context<'lua>, Arc>>, A) -> Result { let job_ref = Arc::clone(self.job_ref); self.lua_ctx.create_function(move |ctx, args| { @@ -276,7 +279,7 @@ impl<'lua, 'env> DeclEnv<'lua, 'env> { } impl BuildEnv { - pub fn new(job: &Arc>) -> Self { + pub fn new(job: &Arc>>) -> Self { let env = BuildEnv { lua: Lua::new(), job: Arc::clone(job), diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs index 41f5594..fad852b 100644 --- a/ci-runner/src/main.rs +++ b/ci-runner/src/main.rs @@ -12,6 +12,8 @@ use serde::{Deserialize, de::DeserializeOwned, Serialize}; use std::task::{Context, Poll}; use std::pin::Pin; use std::marker::Unpin; +use std::future::Future; +use std::path::Path; use ci_lib_native::io; use ci_lib_native::io::{ArtifactStream, VecSink}; @@ -28,7 +30,22 @@ enum WorkAcquireError { Protocol(String), } -struct RunnerClient { +/// `Runner` describes the logic bridging a local task runner with whatever causes the task runner +/// to execute. most concretely, `Runner` is the implementation that varies between "run this +/// goodfile locally right here" and "run a remotely-requested goodfile and report results back to +/// a server" +#[async_trait::async_trait] +trait Runner: Send + Sync + 'static { + async fn report_start(&mut self) -> Result<(), String>; + async fn report_task_status(&mut self, status: TaskInfo) -> Result<(), String>; + async fn report_command_info(&mut self, info: CommandInfo) -> Result<(), String>; + async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String>; + async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result; +} + +/// `RmoteServerRunner` is the implementation of `Runner` supporting "a remote server has given me +/// a task", including reporting metrics and statuses back to the CI server. +struct RemoteServerRunner { http: reqwest::Client, host: String, tx: hyper::body::Sender, @@ -36,11 +53,51 @@ struct RunnerClient { current_job: Option, } +#[async_trait::async_trait] +impl Runner for RemoteServerRunner { + async fn report_start(&mut self) -> Result<(), String> { + self.send_typed(&ClientProto::Started).await + } + async fn report_task_status(&mut self, status: TaskInfo) -> Result<(), String> { + self.send_typed(&ClientProto::task_status(status)) + .await + } + async fn report_command_info(&mut self, info: CommandInfo) -> Result<(), String> { + self.send_typed(&ClientProto::command(info)) + .await + .map_err(|e| format!("failed to report command info: {:?})", e)) + } + async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { + self.send_typed(&ClientProto::metric(name, value)) + .await + .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) + } + async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result { + let (mut sender, body) = hyper::Body::channel(); + let resp = self.http.post("https://ci.butactuallyin.space:9876/api/artifact") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("x-task-token", build_token) + .header("x-artifact-name", name) + .header("x-artifact-desc", desc) + .body(body) + .send() + .await + .map_err(|e| format!("unable to send request: {:?}", e))?; + + if resp.status() == StatusCode::OK { + eprintln!("[+] artifact '{}' started", name); + Ok(ArtifactStream::new(sender)) + } else { + Err(format!("[-] unable to create artifact: {:?}", resp)) + } + } +} + impl RunningJob { - fn from_job(job: RequestedJob, client: RunnerClient) -> Self { + fn from_job(job: RequestedJob, client: RemoteServerRunner) -> Self { Self { job, - client, + runner_ctx: Box::new(client) as Box, current_step: StepTracker::new(), } } @@ -48,11 +105,11 @@ impl RunningJob { struct JobEnv { lua: lua::BuildEnv, - job: Arc>, + job: Arc>>, } impl JobEnv { - fn new(job: &Arc>) -> Self { + fn new(job: &Arc>>) -> Self { let lua = lua::BuildEnv::new(job); JobEnv { lua, @@ -72,7 +129,7 @@ impl JobEnv { pub struct RunningJob { job: RequestedJob, - client: RunnerClient, + runner_ctx: Box, current_step: StepTracker, } @@ -112,30 +169,12 @@ impl StepTracker { impl RunningJob { async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { - self.client.send_typed(&ClientProto::metric(name, value)) - .await - .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) + self.runner_ctx.send_metric(name, value).await } // TODO: panics if hyper finds the channel is closed. hum async fn create_artifact(&self, name: &str, desc: &str) -> Result { - let (mut sender, body) = hyper::Body::channel(); - let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") - .header("user-agent", "ci-butactuallyin-space-runner") - .header("x-task-token", &self.job.build_token) - .header("x-artifact-name", name) - .header("x-artifact-desc", desc) - .body(body) - .send() - .await - .map_err(|e| format!("unable to send request: {:?}", e))?; - - if resp.status() == StatusCode::OK { - eprintln!("[+] artifact '{}' started", name); - Ok(ArtifactStream::new(sender)) - } else { - Err(format!("[-] unable to create artifact: {:?}", resp)) - } + self.runner_ctx.create_artifact(name, desc, &self.job.build_token).await } async fn clone_remote(&self) -> Result<(), RepoError> { @@ -237,21 +276,21 @@ impl RunningJob { } async fn run(mut self) { - self.client.send_typed(&ClientProto::Started).await.unwrap(); + self.runner_ctx.report_start(); std::fs::remove_dir_all("tmpdir").unwrap(); std::fs::create_dir("tmpdir").unwrap(); - let ctx = Arc::new(Mutex::new(self)); + let ctx = Arc::new(Mutex::new(Box::new(self) as Box)); let checkout_res = ctx.lock().unwrap().clone_remote().await; if let Err(e) = checkout_res { let status = "bad_ref"; - let status = ClientProto::task_status(TaskInfo::finished(status)); + let status = TaskInfo::finished(status); eprintln!("checkout failed, reporting status: {:?}", status); - let res = ctx.lock().unwrap().client.send_typed(&status).await; + let res = ctx.lock().unwrap().runner_ctx.report_task_status(status).await; if let Err(e) = res { eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); } @@ -292,19 +331,23 @@ impl RunningJob { match res { Ok(status) => { eprintln!("[+] job success!"); - let status = ClientProto::task_status(TaskInfo::finished(status)); + let status = TaskInfo::finished(status); eprintln!("reporting status: {:?}", status); - let res = ctx.lock().unwrap().client.send_typed(&status).await; + let res = ctx.lock().unwrap().runner_ctx.report_task_status(status).await; if let Err(e) = res { eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e); } } Err((status, lua_err)) => { eprintln!("[-] job error: {}", status); + let status = TaskInfo::interrupted(status, lua_err.to_string()); - let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string())); - let res = ctx.lock().unwrap().client.send_typed(&status).await; + let res = ctx + .lock() + .unwrap() + .runner_ctx + .report_task_status(status.clone()).await; if let Err(e) = res { eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e); } @@ -342,16 +385,13 @@ impl RunningJob { } async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { - self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) - .await.unwrap(); + self.runner_ctx.report_command_info(CommandInfo::started(command, working_dir, 1)).await.unwrap(); let (cmd, human_name) = Self::prep_command(command, working_dir); let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?; - self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) - .await.unwrap(); - + self.runner_ctx.report_command_info(CommandInfo::finished(cmd_res.code(), 1)).await.unwrap(); if !cmd_res.success() { return Err(format!("{} failed: {:?}", &human_name, cmd_res)); @@ -361,7 +401,7 @@ impl RunningJob { } } -impl RunnerClient { +impl RemoteServerRunner { async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result { if res.status() != StatusCode::OK { return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); @@ -483,7 +523,7 @@ async fn main() { match poll { Ok(mut res) => { - let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { + let mut client = match RemoteServerRunner::new("ci.butactuallyin.space:9876", sender, res).await { Ok(client) => client, Err(e) => { eprintln!("failed to initialize client: {:?}", e); -- cgit v1.1