diff options
| author | iximeow <me@iximeow.net> | 2023-10-29 13:08:37 -0700 | 
|---|---|---|
| committer | iximeow <me@iximeow.net> | 2023-10-29 13:14:32 -0700 | 
| commit | db4c638182d1c6b15bdc8d94f7baa691197e9a56 (patch) | |
| tree | 8fb0ae2a00a0f1f2f3c9a2ce08f770da14acf15e /ci-runner | |
| parent | 6c5f395978b34e7b4626ea8605f0eca20ab730d8 (diff) | |
split runner into local/remote logic
ideally `trait Runner` describes whatever should have behavioral
differences when a task is run locally vs in conjunction with a remote
server. then everything consuming a `dyn Runner` will not need to
change. if i've done it right.
Diffstat (limited to 'ci-runner')
| -rw-r--r-- | ci-runner/Cargo.toml | 1 | ||||
| -rw-r--r-- | ci-runner/src/lua/mod.rs | 25 | ||||
| -rw-r--r-- | ci-runner/src/main.rs | 122 | 
3 files changed, 96 insertions, 52 deletions
| diff --git a/ci-runner/Cargo.toml b/ci-runner/Cargo.toml index 038ed14..c956fb5 100644 --- a/ci-runner/Cargo.toml +++ b/ci-runner/Cargo.toml @@ -13,6 +13,7 @@ path = "src/main.rs"  ci-lib-core = { path = "../ci-lib-core" }  ci-lib-native = { path = "../ci-lib-native" } +async-trait = "*"  libc = "*"  serde = "*"  serde_derive = "*" diff --git a/ci-runner/src/lua/mod.rs b/ci-runner/src/lua/mod.rs index 62ac68b..92be5ce 100644 --- a/ci-runner/src/lua/mod.rs +++ b/ci-runner/src/lua/mod.rs @@ -1,3 +1,4 @@ +use crate::Runner;  use crate::RunningJob;  use rlua::prelude::*; @@ -9,7 +10,7 @@ pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config  pub struct BuildEnv {      lua: Lua, -    job: Arc<Mutex<RunningJob>>, +    job: Arc<Mutex<Box<RunningJob>>>,  }  #[derive(Debug)] @@ -26,6 +27,7 @@ pub struct CommandOutput {  }  mod lua_exports { +    use crate::Runner;      use crate::RunningJob;      use crate::lua::{CommandOutput, RunParams}; @@ -115,7 +117,7 @@ mod lua_exports {          Ok((args, params))      } -    pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { +    pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<Box<RunningJob>>>) -> Result<(), rlua::Error> {          let (args, params) = collect_build_args(command, params)?;          eprintln!("args: {:?}", args);          eprintln!("  params: {:?}", params); @@ -129,7 +131,7 @@ mod lua_exports {          })      } -    pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<rlua::Table<'lua>, rlua::Error> { +    pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<Box<RunningJob>>>) -> Result<rlua::Table<'lua>, rlua::Error> {          let (args, params) = collect_build_args(command, params)?;          eprintln!("args: {:?}", args);          eprintln!("  params: {:?}", params); @@ -167,7 +169,7 @@ mod lua_exports {          Ok(())      } -    pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { +    pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<Box<RunningJob>>>) -> Result<(), rlua::Error> {          let rt = tokio::runtime::Builder::new_current_thread()              .enable_all()              .build() @@ -178,7 +180,7 @@ mod lua_exports {          })      } -    pub fn artifact(path: String, name: Option<String>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> { +    pub fn artifact(path: String, name: Option<String>, job_ctx: Arc<Mutex<Box<RunningJob>>>) -> Result<(), rlua::Error> {          let path: PathBuf = path.into();          let default_name: String = match (path.file_name(), path.parent()) { @@ -230,23 +232,24 @@ mod lua_exports {      }      pub mod step { +        use crate::Runner;          use crate::RunningJob;          use std::sync::{Arc, Mutex}; -        pub fn start(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { +        pub fn start(job_ref: Arc<Mutex<Box<RunningJob>>>, name: String) -> Result<(), rlua::Error> {              let mut job = job_ref.lock().unwrap();              job.current_step.clear();              job.current_step.push(name);              Ok(())          } -        pub fn push(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { +        pub fn push(job_ref: Arc<Mutex<Box<RunningJob>>>, name: String) -> Result<(), rlua::Error> {              let mut job = job_ref.lock().unwrap();              job.current_step.push(name);              Ok(())          } -        pub fn advance(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> { +        pub fn advance(job_ref: Arc<Mutex<Box<RunningJob>>>, name: String) -> Result<(), rlua::Error> {              let mut job = job_ref.lock().unwrap();              job.current_step.pop();              job.current_step.push(name); @@ -257,14 +260,14 @@ mod lua_exports {  struct DeclEnv<'lua, 'env> {      lua_ctx: &'env rlua::Context<'lua>, -    job_ref: &'env Arc<Mutex<RunningJob>>, +    job_ref: &'env Arc<Mutex<Box<RunningJob>>>,  }  impl<'lua, 'env> DeclEnv<'lua, 'env> {      fn create_function<A, R, F>(&self, name: &str, f: F) ->  Result<rlua::Function<'lua>, String>          where          A: FromLuaMulti<'lua>,          R: ToLuaMulti<'lua>, -        F: 'static + Send + Fn(rlua::Context<'lua>, Arc<Mutex<RunningJob>>, A) -> Result<R, rlua::Error> { +        F: 'static + Send + Fn(rlua::Context<'lua>, Arc<Mutex<Box<RunningJob>>>, A) -> Result<R, rlua::Error> {          let job_ref = Arc::clone(self.job_ref);          self.lua_ctx.create_function(move |ctx, args| { @@ -276,7 +279,7 @@ impl<'lua, 'env> DeclEnv<'lua, 'env> {  }  impl BuildEnv { -    pub fn new(job: &Arc<Mutex<RunningJob>>) -> Self { +    pub fn new(job: &Arc<Mutex<Box<RunningJob>>>) -> Self {          let env = BuildEnv {              lua: Lua::new(),              job: Arc::clone(job), diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs index 41f5594..fad852b 100644 --- a/ci-runner/src/main.rs +++ b/ci-runner/src/main.rs @@ -12,6 +12,8 @@ use serde::{Deserialize, de::DeserializeOwned, Serialize};  use std::task::{Context, Poll};  use std::pin::Pin;  use std::marker::Unpin; +use std::future::Future; +use std::path::Path;  use ci_lib_native::io;  use ci_lib_native::io::{ArtifactStream, VecSink}; @@ -28,7 +30,22 @@ enum WorkAcquireError {      Protocol(String),  } -struct RunnerClient { +/// `Runner` describes the logic bridging a local task runner with whatever causes the task runner +/// to execute. most concretely, `Runner` is the implementation that varies between "run this +/// goodfile locally right here" and "run a remotely-requested goodfile and report results back to +/// a server" +#[async_trait::async_trait] +trait Runner: Send + Sync + 'static { +    async fn report_start(&mut self) -> Result<(), String>; +    async fn report_task_status(&mut self, status: TaskInfo) -> Result<(), String>; +    async fn report_command_info(&mut self, info: CommandInfo) -> Result<(), String>; +    async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String>; +    async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String>; +} + +/// `RmoteServerRunner` is the implementation of `Runner` supporting "a remote server has given me +/// a task", including reporting metrics and statuses back to the CI server. +struct RemoteServerRunner {      http: reqwest::Client,      host: String,      tx: hyper::body::Sender, @@ -36,11 +53,51 @@ struct RunnerClient {      current_job: Option<RequestedJob>,  } +#[async_trait::async_trait] +impl Runner for RemoteServerRunner { +    async fn report_start(&mut self) -> Result<(), String> { +        self.send_typed(&ClientProto::Started).await +    } +    async fn report_task_status(&mut self, status: TaskInfo) -> Result<(), String> { +        self.send_typed(&ClientProto::task_status(status)) +            .await +    } +    async fn report_command_info(&mut self, info: CommandInfo) -> Result<(), String> { +        self.send_typed(&ClientProto::command(info)) +            .await +            .map_err(|e| format!("failed to report command info: {:?})", e)) +    } +    async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { +        self.send_typed(&ClientProto::metric(name, value)) +            .await +            .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) +    } +    async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> { +        let (mut sender, body) = hyper::Body::channel(); +        let resp = self.http.post("https://ci.butactuallyin.space:9876/api/artifact") +            .header("user-agent", "ci-butactuallyin-space-runner") +            .header("x-task-token", build_token) +            .header("x-artifact-name", name) +            .header("x-artifact-desc", desc) +            .body(body) +            .send() +            .await +            .map_err(|e| format!("unable to send request: {:?}", e))?; + +        if resp.status() == StatusCode::OK { +            eprintln!("[+] artifact '{}' started", name); +            Ok(ArtifactStream::new(sender)) +        } else { +            Err(format!("[-] unable to create artifact: {:?}", resp)) +        } +    } +} +  impl RunningJob { -    fn from_job(job: RequestedJob, client: RunnerClient) -> Self { +    fn from_job(job: RequestedJob, client: RemoteServerRunner) -> Self {          Self {              job, -            client, +            runner_ctx: Box::new(client) as Box<dyn Runner>,              current_step: StepTracker::new(),          }      } @@ -48,11 +105,11 @@ impl RunningJob {  struct JobEnv {      lua: lua::BuildEnv, -    job: Arc<Mutex<RunningJob>>, +    job: Arc<Mutex<Box<RunningJob>>>,  }  impl JobEnv { -    fn new(job: &Arc<Mutex<RunningJob>>) -> Self { +    fn new(job: &Arc<Mutex<Box<RunningJob>>>) -> Self {          let lua = lua::BuildEnv::new(job);          JobEnv {              lua, @@ -72,7 +129,7 @@ impl JobEnv {  pub struct RunningJob {      job: RequestedJob, -    client: RunnerClient, +    runner_ctx: Box<dyn Runner>,      current_step: StepTracker,  } @@ -112,30 +169,12 @@ impl StepTracker {  impl RunningJob {      async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { -        self.client.send_typed(&ClientProto::metric(name, value)) -            .await -            .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) +        self.runner_ctx.send_metric(name, value).await      }      // TODO: panics if hyper finds the channel is closed. hum      async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> { -        let (mut sender, body) = hyper::Body::channel(); -        let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact") -            .header("user-agent", "ci-butactuallyin-space-runner") -            .header("x-task-token", &self.job.build_token) -            .header("x-artifact-name", name) -            .header("x-artifact-desc", desc) -            .body(body) -            .send() -            .await -            .map_err(|e| format!("unable to send request: {:?}", e))?; - -        if resp.status() == StatusCode::OK { -            eprintln!("[+] artifact '{}' started", name); -            Ok(ArtifactStream::new(sender)) -        } else { -            Err(format!("[-] unable to create artifact: {:?}", resp)) -        } +        self.runner_ctx.create_artifact(name, desc, &self.job.build_token).await      }      async fn clone_remote(&self) -> Result<(), RepoError> { @@ -237,21 +276,21 @@ impl RunningJob {      }      async fn run(mut self) { -        self.client.send_typed(&ClientProto::Started).await.unwrap(); +        self.runner_ctx.report_start();          std::fs::remove_dir_all("tmpdir").unwrap();          std::fs::create_dir("tmpdir").unwrap(); -        let ctx = Arc::new(Mutex::new(self)); +        let ctx = Arc::new(Mutex::new(Box::new(self) as Box<RunningJob>));          let checkout_res = ctx.lock().unwrap().clone_remote().await;          if let Err(e) = checkout_res {              let status = "bad_ref"; -            let status = ClientProto::task_status(TaskInfo::finished(status)); +            let status = TaskInfo::finished(status);              eprintln!("checkout failed, reporting status: {:?}", status); -            let res = ctx.lock().unwrap().client.send_typed(&status).await; +            let res = ctx.lock().unwrap().runner_ctx.report_task_status(status).await;              if let Err(e) = res {                  eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);              } @@ -292,19 +331,23 @@ impl RunningJob {          match res {              Ok(status) => {                  eprintln!("[+] job success!"); -                let status = ClientProto::task_status(TaskInfo::finished(status)); +                let status = TaskInfo::finished(status);                  eprintln!("reporting status: {:?}", status); -                let res = ctx.lock().unwrap().client.send_typed(&status).await; +                let res = ctx.lock().unwrap().runner_ctx.report_task_status(status).await;                  if let Err(e) = res {                      eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);                  }              }              Err((status, lua_err)) => {                  eprintln!("[-] job error: {}", status); +                let status = TaskInfo::interrupted(status, lua_err.to_string()); -                let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string())); -                let res = ctx.lock().unwrap().client.send_typed(&status).await; +                let res = ctx +                    .lock() +                    .unwrap() +                    .runner_ctx +                    .report_task_status(status.clone()).await;                  if let Err(e) = res {                      eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e);                  } @@ -342,16 +385,13 @@ impl RunningJob {      }      async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> { -        self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1))) -            .await.unwrap(); +        self.runner_ctx.report_command_info(CommandInfo::started(command, working_dir, 1)).await.unwrap();          let (cmd, human_name) = Self::prep_command(command, working_dir);          let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?; -        self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1))) -            .await.unwrap(); - +        self.runner_ctx.report_command_info(CommandInfo::finished(cmd_res.code(), 1)).await.unwrap();          if !cmd_res.success() {              return Err(format!("{} failed: {:?}", &human_name, cmd_res)); @@ -361,7 +401,7 @@ impl RunningJob {      }  } -impl RunnerClient { +impl RemoteServerRunner {      async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {          if res.status() != StatusCode::OK {              return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); @@ -483,7 +523,7 @@ async fn main() {          match poll {              Ok(mut res) => { -                let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await { +                let mut client = match RemoteServerRunner::new("ci.butactuallyin.space:9876", sender, res).await {                      Ok(client) => client,                      Err(e) => {                          eprintln!("failed to initialize client: {:?}", e); | 
