summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoriximeow <me@iximeow.net>2023-10-29 13:08:37 -0700
committeriximeow <me@iximeow.net>2023-10-29 13:14:32 -0700
commitdb4c638182d1c6b15bdc8d94f7baa691197e9a56 (patch)
tree8fb0ae2a00a0f1f2f3c9a2ce08f770da14acf15e
parent6c5f395978b34e7b4626ea8605f0eca20ab730d8 (diff)
split runner into local/remote logic
ideally `trait Runner` describes whatever should have behavioral differences when a task is run locally vs in conjunction with a remote server. then everything consuming a `dyn Runner` will not need to change. if i've done it right.
-rw-r--r--Cargo.lock1
-rw-r--r--ci-lib-core/src/protocol.rs4
-rw-r--r--ci-runner/Cargo.toml1
-rw-r--r--ci-runner/src/lua/mod.rs25
-rw-r--r--ci-runner/src/main.rs122
5 files changed, 99 insertions, 54 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 1ea5f9a..10919fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -380,6 +380,7 @@ dependencies = [
name = "ci-runner"
version = "0.0.1"
dependencies = [
+ "async-trait",
"ci-lib-core",
"ci-lib-native",
"hyper",
diff --git a/ci-lib-core/src/protocol.rs b/ci-lib-core/src/protocol.rs
index c7a9318..c987c89 100644
--- a/ci-lib-core/src/protocol.rs
+++ b/ci-lib-core/src/protocol.rs
@@ -15,7 +15,7 @@ pub enum ClientProto {
Pong,
}
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "command_info")]
#[serde(rename_all = "snake_case")]
pub enum CommandInfo {
@@ -23,7 +23,7 @@ pub enum CommandInfo {
Finished { exit_code: Option<i32>, id: u32 },
}
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "task_info")]
#[serde(rename_all = "snake_case")]
pub enum TaskInfo {
diff --git a/ci-runner/Cargo.toml b/ci-runner/Cargo.toml
index 038ed14..c956fb5 100644
--- a/ci-runner/Cargo.toml
+++ b/ci-runner/Cargo.toml
@@ -13,6 +13,7 @@ path = "src/main.rs"
ci-lib-core = { path = "../ci-lib-core" }
ci-lib-native = { path = "../ci-lib-native" }
+async-trait = "*"
libc = "*"
serde = "*"
serde_derive = "*"
diff --git a/ci-runner/src/lua/mod.rs b/ci-runner/src/lua/mod.rs
index 62ac68b..92be5ce 100644
--- a/ci-runner/src/lua/mod.rs
+++ b/ci-runner/src/lua/mod.rs
@@ -1,3 +1,4 @@
+use crate::Runner;
use crate::RunningJob;
use rlua::prelude::*;
@@ -9,7 +10,7 @@ pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config
pub struct BuildEnv {
lua: Lua,
- job: Arc<Mutex<RunningJob>>,
+ job: Arc<Mutex<Box<RunningJob>>>,
}
#[derive(Debug)]
@@ -26,6 +27,7 @@ pub struct CommandOutput {
}
mod lua_exports {
+ use crate::Runner;
use crate::RunningJob;
use crate::lua::{CommandOutput, RunParams};
@@ -115,7 +117,7 @@ mod lua_exports {
Ok((args, params))
}
- pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
+ pub fn build_command_impl(command: LuaValue, params: LuaValue, job_ctx: Arc<Mutex<Box<RunningJob>>>) -> Result<(), rlua::Error> {
let (args, params) = collect_build_args(command, params)?;
eprintln!("args: {:?}", args);
eprintln!(" params: {:?}", params);
@@ -129,7 +131,7 @@ mod lua_exports {
})
}
- pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<rlua::Table<'lua>, rlua::Error> {
+ pub fn check_output_impl<'lua>(ctx: rlua::Context<'lua>, command: LuaValue<'lua>, params: LuaValue<'lua>, job_ctx: Arc<Mutex<Box<RunningJob>>>) -> Result<rlua::Table<'lua>, rlua::Error> {
let (args, params) = collect_build_args(command, params)?;
eprintln!("args: {:?}", args);
eprintln!(" params: {:?}", params);
@@ -167,7 +169,7 @@ mod lua_exports {
Ok(())
}
- pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
+ pub fn metric(name: String, value: String, job_ctx: Arc<Mutex<Box<RunningJob>>>) -> Result<(), rlua::Error> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
@@ -178,7 +180,7 @@ mod lua_exports {
})
}
- pub fn artifact(path: String, name: Option<String>, job_ctx: Arc<Mutex<RunningJob>>) -> Result<(), rlua::Error> {
+ pub fn artifact(path: String, name: Option<String>, job_ctx: Arc<Mutex<Box<RunningJob>>>) -> Result<(), rlua::Error> {
let path: PathBuf = path.into();
let default_name: String = match (path.file_name(), path.parent()) {
@@ -230,23 +232,24 @@ mod lua_exports {
}
pub mod step {
+ use crate::Runner;
use crate::RunningJob;
use std::sync::{Arc, Mutex};
- pub fn start(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> {
+ pub fn start(job_ref: Arc<Mutex<Box<RunningJob>>>, name: String) -> Result<(), rlua::Error> {
let mut job = job_ref.lock().unwrap();
job.current_step.clear();
job.current_step.push(name);
Ok(())
}
- pub fn push(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> {
+ pub fn push(job_ref: Arc<Mutex<Box<RunningJob>>>, name: String) -> Result<(), rlua::Error> {
let mut job = job_ref.lock().unwrap();
job.current_step.push(name);
Ok(())
}
- pub fn advance(job_ref: Arc<Mutex<RunningJob>>, name: String) -> Result<(), rlua::Error> {
+ pub fn advance(job_ref: Arc<Mutex<Box<RunningJob>>>, name: String) -> Result<(), rlua::Error> {
let mut job = job_ref.lock().unwrap();
job.current_step.pop();
job.current_step.push(name);
@@ -257,14 +260,14 @@ mod lua_exports {
struct DeclEnv<'lua, 'env> {
lua_ctx: &'env rlua::Context<'lua>,
- job_ref: &'env Arc<Mutex<RunningJob>>,
+ job_ref: &'env Arc<Mutex<Box<RunningJob>>>,
}
impl<'lua, 'env> DeclEnv<'lua, 'env> {
fn create_function<A, R, F>(&self, name: &str, f: F) -> Result<rlua::Function<'lua>, String>
where
A: FromLuaMulti<'lua>,
R: ToLuaMulti<'lua>,
- F: 'static + Send + Fn(rlua::Context<'lua>, Arc<Mutex<RunningJob>>, A) -> Result<R, rlua::Error> {
+ F: 'static + Send + Fn(rlua::Context<'lua>, Arc<Mutex<Box<RunningJob>>>, A) -> Result<R, rlua::Error> {
let job_ref = Arc::clone(self.job_ref);
self.lua_ctx.create_function(move |ctx, args| {
@@ -276,7 +279,7 @@ impl<'lua, 'env> DeclEnv<'lua, 'env> {
}
impl BuildEnv {
- pub fn new(job: &Arc<Mutex<RunningJob>>) -> Self {
+ pub fn new(job: &Arc<Mutex<Box<RunningJob>>>) -> Self {
let env = BuildEnv {
lua: Lua::new(),
job: Arc::clone(job),
diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs
index 41f5594..fad852b 100644
--- a/ci-runner/src/main.rs
+++ b/ci-runner/src/main.rs
@@ -12,6 +12,8 @@ use serde::{Deserialize, de::DeserializeOwned, Serialize};
use std::task::{Context, Poll};
use std::pin::Pin;
use std::marker::Unpin;
+use std::future::Future;
+use std::path::Path;
use ci_lib_native::io;
use ci_lib_native::io::{ArtifactStream, VecSink};
@@ -28,7 +30,22 @@ enum WorkAcquireError {
Protocol(String),
}
-struct RunnerClient {
+/// `Runner` describes the logic bridging a local task runner with whatever causes the task runner
+/// to execute. most concretely, `Runner` is the implementation that varies between "run this
+/// goodfile locally right here" and "run a remotely-requested goodfile and report results back to
+/// a server"
+#[async_trait::async_trait]
+trait Runner: Send + Sync + 'static {
+ async fn report_start(&mut self) -> Result<(), String>;
+ async fn report_task_status(&mut self, status: TaskInfo) -> Result<(), String>;
+ async fn report_command_info(&mut self, info: CommandInfo) -> Result<(), String>;
+ async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String>;
+ async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String>;
+}
+
+/// `RmoteServerRunner` is the implementation of `Runner` supporting "a remote server has given me
+/// a task", including reporting metrics and statuses back to the CI server.
+struct RemoteServerRunner {
http: reqwest::Client,
host: String,
tx: hyper::body::Sender,
@@ -36,11 +53,51 @@ struct RunnerClient {
current_job: Option<RequestedJob>,
}
+#[async_trait::async_trait]
+impl Runner for RemoteServerRunner {
+ async fn report_start(&mut self) -> Result<(), String> {
+ self.send_typed(&ClientProto::Started).await
+ }
+ async fn report_task_status(&mut self, status: TaskInfo) -> Result<(), String> {
+ self.send_typed(&ClientProto::task_status(status))
+ .await
+ }
+ async fn report_command_info(&mut self, info: CommandInfo) -> Result<(), String> {
+ self.send_typed(&ClientProto::command(info))
+ .await
+ .map_err(|e| format!("failed to report command info: {:?})", e))
+ }
+ async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> {
+ self.send_typed(&ClientProto::metric(name, value))
+ .await
+ .map_err(|e| format!("failed to send metric {}: {:?})", name, e))
+ }
+ async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> {
+ let (mut sender, body) = hyper::Body::channel();
+ let resp = self.http.post("https://ci.butactuallyin.space:9876/api/artifact")
+ .header("user-agent", "ci-butactuallyin-space-runner")
+ .header("x-task-token", build_token)
+ .header("x-artifact-name", name)
+ .header("x-artifact-desc", desc)
+ .body(body)
+ .send()
+ .await
+ .map_err(|e| format!("unable to send request: {:?}", e))?;
+
+ if resp.status() == StatusCode::OK {
+ eprintln!("[+] artifact '{}' started", name);
+ Ok(ArtifactStream::new(sender))
+ } else {
+ Err(format!("[-] unable to create artifact: {:?}", resp))
+ }
+ }
+}
+
impl RunningJob {
- fn from_job(job: RequestedJob, client: RunnerClient) -> Self {
+ fn from_job(job: RequestedJob, client: RemoteServerRunner) -> Self {
Self {
job,
- client,
+ runner_ctx: Box::new(client) as Box<dyn Runner>,
current_step: StepTracker::new(),
}
}
@@ -48,11 +105,11 @@ impl RunningJob {
struct JobEnv {
lua: lua::BuildEnv,
- job: Arc<Mutex<RunningJob>>,
+ job: Arc<Mutex<Box<RunningJob>>>,
}
impl JobEnv {
- fn new(job: &Arc<Mutex<RunningJob>>) -> Self {
+ fn new(job: &Arc<Mutex<Box<RunningJob>>>) -> Self {
let lua = lua::BuildEnv::new(job);
JobEnv {
lua,
@@ -72,7 +129,7 @@ impl JobEnv {
pub struct RunningJob {
job: RequestedJob,
- client: RunnerClient,
+ runner_ctx: Box<dyn Runner>,
current_step: StepTracker,
}
@@ -112,30 +169,12 @@ impl StepTracker {
impl RunningJob {
async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> {
- self.client.send_typed(&ClientProto::metric(name, value))
- .await
- .map_err(|e| format!("failed to send metric {}: {:?})", name, e))
+ self.runner_ctx.send_metric(name, value).await
}
// TODO: panics if hyper finds the channel is closed. hum
async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> {
- let (mut sender, body) = hyper::Body::channel();
- let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact")
- .header("user-agent", "ci-butactuallyin-space-runner")
- .header("x-task-token", &self.job.build_token)
- .header("x-artifact-name", name)
- .header("x-artifact-desc", desc)
- .body(body)
- .send()
- .await
- .map_err(|e| format!("unable to send request: {:?}", e))?;
-
- if resp.status() == StatusCode::OK {
- eprintln!("[+] artifact '{}' started", name);
- Ok(ArtifactStream::new(sender))
- } else {
- Err(format!("[-] unable to create artifact: {:?}", resp))
- }
+ self.runner_ctx.create_artifact(name, desc, &self.job.build_token).await
}
async fn clone_remote(&self) -> Result<(), RepoError> {
@@ -237,21 +276,21 @@ impl RunningJob {
}
async fn run(mut self) {
- self.client.send_typed(&ClientProto::Started).await.unwrap();
+ self.runner_ctx.report_start();
std::fs::remove_dir_all("tmpdir").unwrap();
std::fs::create_dir("tmpdir").unwrap();
- let ctx = Arc::new(Mutex::new(self));
+ let ctx = Arc::new(Mutex::new(Box::new(self) as Box<RunningJob>));
let checkout_res = ctx.lock().unwrap().clone_remote().await;
if let Err(e) = checkout_res {
let status = "bad_ref";
- let status = ClientProto::task_status(TaskInfo::finished(status));
+ let status = TaskInfo::finished(status);
eprintln!("checkout failed, reporting status: {:?}", status);
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
+ let res = ctx.lock().unwrap().runner_ctx.report_task_status(status).await;
if let Err(e) = res {
eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
}
@@ -292,19 +331,23 @@ impl RunningJob {
match res {
Ok(status) => {
eprintln!("[+] job success!");
- let status = ClientProto::task_status(TaskInfo::finished(status));
+ let status = TaskInfo::finished(status);
eprintln!("reporting status: {:?}", status);
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
+ let res = ctx.lock().unwrap().runner_ctx.report_task_status(status).await;
if let Err(e) = res {
eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
}
}
Err((status, lua_err)) => {
eprintln!("[-] job error: {}", status);
+ let status = TaskInfo::interrupted(status, lua_err.to_string());
- let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string()));
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
+ let res = ctx
+ .lock()
+ .unwrap()
+ .runner_ctx
+ .report_task_status(status.clone()).await;
if let Err(e) = res {
eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e);
}
@@ -342,16 +385,13 @@ impl RunningJob {
}
async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> {
- self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1)))
- .await.unwrap();
+ self.runner_ctx.report_command_info(CommandInfo::started(command, working_dir, 1)).await.unwrap();
let (cmd, human_name) = Self::prep_command(command, working_dir);
let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?;
- self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1)))
- .await.unwrap();
-
+ self.runner_ctx.report_command_info(CommandInfo::finished(cmd_res.code(), 1)).await.unwrap();
if !cmd_res.success() {
return Err(format!("{} failed: {:?}", &human_name, cmd_res));
@@ -361,7 +401,7 @@ impl RunningJob {
}
}
-impl RunnerClient {
+impl RemoteServerRunner {
async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
if res.status() != StatusCode::OK {
return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res));
@@ -483,7 +523,7 @@ async fn main() {
match poll {
Ok(mut res) => {
- let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await {
+ let mut client = match RemoteServerRunner::new("ci.butactuallyin.space:9876", sender, res).await {
Ok(client) => client,
Err(e) => {
eprintln!("failed to initialize client: {:?}", e);