summaryrefslogtreecommitdiff
path: root/ci-runner/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'ci-runner/src/main.rs')
-rw-r--r--ci-runner/src/main.rs122
1 files changed, 81 insertions, 41 deletions
diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs
index 41f5594..fad852b 100644
--- a/ci-runner/src/main.rs
+++ b/ci-runner/src/main.rs
@@ -12,6 +12,8 @@ use serde::{Deserialize, de::DeserializeOwned, Serialize};
use std::task::{Context, Poll};
use std::pin::Pin;
use std::marker::Unpin;
+use std::future::Future;
+use std::path::Path;
use ci_lib_native::io;
use ci_lib_native::io::{ArtifactStream, VecSink};
@@ -28,7 +30,22 @@ enum WorkAcquireError {
Protocol(String),
}
-struct RunnerClient {
+/// `Runner` describes the logic bridging a local task runner with whatever causes the task runner
+/// to execute. most concretely, `Runner` is the implementation that varies between "run this
+/// goodfile locally right here" and "run a remotely-requested goodfile and report results back to
+/// a server"
+#[async_trait::async_trait]
+trait Runner: Send + Sync + 'static {
+ async fn report_start(&mut self) -> Result<(), String>;
+ async fn report_task_status(&mut self, status: TaskInfo) -> Result<(), String>;
+ async fn report_command_info(&mut self, info: CommandInfo) -> Result<(), String>;
+ async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String>;
+ async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String>;
+}
+
+/// `RmoteServerRunner` is the implementation of `Runner` supporting "a remote server has given me
+/// a task", including reporting metrics and statuses back to the CI server.
+struct RemoteServerRunner {
http: reqwest::Client,
host: String,
tx: hyper::body::Sender,
@@ -36,11 +53,51 @@ struct RunnerClient {
current_job: Option<RequestedJob>,
}
+#[async_trait::async_trait]
+impl Runner for RemoteServerRunner {
+ async fn report_start(&mut self) -> Result<(), String> {
+ self.send_typed(&ClientProto::Started).await
+ }
+ async fn report_task_status(&mut self, status: TaskInfo) -> Result<(), String> {
+ self.send_typed(&ClientProto::task_status(status))
+ .await
+ }
+ async fn report_command_info(&mut self, info: CommandInfo) -> Result<(), String> {
+ self.send_typed(&ClientProto::command(info))
+ .await
+ .map_err(|e| format!("failed to report command info: {:?})", e))
+ }
+ async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> {
+ self.send_typed(&ClientProto::metric(name, value))
+ .await
+ .map_err(|e| format!("failed to send metric {}: {:?})", name, e))
+ }
+ async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> {
+ let (mut sender, body) = hyper::Body::channel();
+ let resp = self.http.post("https://ci.butactuallyin.space:9876/api/artifact")
+ .header("user-agent", "ci-butactuallyin-space-runner")
+ .header("x-task-token", build_token)
+ .header("x-artifact-name", name)
+ .header("x-artifact-desc", desc)
+ .body(body)
+ .send()
+ .await
+ .map_err(|e| format!("unable to send request: {:?}", e))?;
+
+ if resp.status() == StatusCode::OK {
+ eprintln!("[+] artifact '{}' started", name);
+ Ok(ArtifactStream::new(sender))
+ } else {
+ Err(format!("[-] unable to create artifact: {:?}", resp))
+ }
+ }
+}
+
impl RunningJob {
- fn from_job(job: RequestedJob, client: RunnerClient) -> Self {
+ fn from_job(job: RequestedJob, client: RemoteServerRunner) -> Self {
Self {
job,
- client,
+ runner_ctx: Box::new(client) as Box<dyn Runner>,
current_step: StepTracker::new(),
}
}
@@ -48,11 +105,11 @@ impl RunningJob {
struct JobEnv {
lua: lua::BuildEnv,
- job: Arc<Mutex<RunningJob>>,
+ job: Arc<Mutex<Box<RunningJob>>>,
}
impl JobEnv {
- fn new(job: &Arc<Mutex<RunningJob>>) -> Self {
+ fn new(job: &Arc<Mutex<Box<RunningJob>>>) -> Self {
let lua = lua::BuildEnv::new(job);
JobEnv {
lua,
@@ -72,7 +129,7 @@ impl JobEnv {
pub struct RunningJob {
job: RequestedJob,
- client: RunnerClient,
+ runner_ctx: Box<dyn Runner>,
current_step: StepTracker,
}
@@ -112,30 +169,12 @@ impl StepTracker {
impl RunningJob {
async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> {
- self.client.send_typed(&ClientProto::metric(name, value))
- .await
- .map_err(|e| format!("failed to send metric {}: {:?})", name, e))
+ self.runner_ctx.send_metric(name, value).await
}
// TODO: panics if hyper finds the channel is closed. hum
async fn create_artifact(&self, name: &str, desc: &str) -> Result<ArtifactStream, String> {
- let (mut sender, body) = hyper::Body::channel();
- let resp = self.client.http.post("https://ci.butactuallyin.space:9876/api/artifact")
- .header("user-agent", "ci-butactuallyin-space-runner")
- .header("x-task-token", &self.job.build_token)
- .header("x-artifact-name", name)
- .header("x-artifact-desc", desc)
- .body(body)
- .send()
- .await
- .map_err(|e| format!("unable to send request: {:?}", e))?;
-
- if resp.status() == StatusCode::OK {
- eprintln!("[+] artifact '{}' started", name);
- Ok(ArtifactStream::new(sender))
- } else {
- Err(format!("[-] unable to create artifact: {:?}", resp))
- }
+ self.runner_ctx.create_artifact(name, desc, &self.job.build_token).await
}
async fn clone_remote(&self) -> Result<(), RepoError> {
@@ -237,21 +276,21 @@ impl RunningJob {
}
async fn run(mut self) {
- self.client.send_typed(&ClientProto::Started).await.unwrap();
+ self.runner_ctx.report_start();
std::fs::remove_dir_all("tmpdir").unwrap();
std::fs::create_dir("tmpdir").unwrap();
- let ctx = Arc::new(Mutex::new(self));
+ let ctx = Arc::new(Mutex::new(Box::new(self) as Box<RunningJob>));
let checkout_res = ctx.lock().unwrap().clone_remote().await;
if let Err(e) = checkout_res {
let status = "bad_ref";
- let status = ClientProto::task_status(TaskInfo::finished(status));
+ let status = TaskInfo::finished(status);
eprintln!("checkout failed, reporting status: {:?}", status);
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
+ let res = ctx.lock().unwrap().runner_ctx.report_task_status(status).await;
if let Err(e) = res {
eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
}
@@ -292,19 +331,23 @@ impl RunningJob {
match res {
Ok(status) => {
eprintln!("[+] job success!");
- let status = ClientProto::task_status(TaskInfo::finished(status));
+ let status = TaskInfo::finished(status);
eprintln!("reporting status: {:?}", status);
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
+ let res = ctx.lock().unwrap().runner_ctx.report_task_status(status).await;
if let Err(e) = res {
eprintln!("[!] FAILED TO REPORT JOB STATUS ({}): {:?}", "success", e);
}
}
Err((status, lua_err)) => {
eprintln!("[-] job error: {}", status);
+ let status = TaskInfo::interrupted(status, lua_err.to_string());
- let status = ClientProto::task_status(TaskInfo::interrupted(status, lua_err.to_string()));
- let res = ctx.lock().unwrap().client.send_typed(&status).await;
+ let res = ctx
+ .lock()
+ .unwrap()
+ .runner_ctx
+ .report_task_status(status.clone()).await;
if let Err(e) = res {
eprintln!("[!] FAILED TO REPORT JOB STATUS ({:?}): {:?}", status, e);
}
@@ -342,16 +385,13 @@ impl RunningJob {
}
async fn run_command(&mut self, command: &[String], working_dir: Option<&str>) -> Result<(), String> {
- self.client.send_typed(&ClientProto::command(CommandInfo::started(command, working_dir, 1)))
- .await.unwrap();
+ self.runner_ctx.report_command_info(CommandInfo::started(command, working_dir, 1)).await.unwrap();
let (cmd, human_name) = Self::prep_command(command, working_dir);
let cmd_res = self.execute_command_and_report(cmd, &format!("{} log", human_name), &human_name).await?;
- self.client.send_typed(&ClientProto::command(CommandInfo::finished(cmd_res.code(), 1)))
- .await.unwrap();
-
+ self.runner_ctx.report_command_info(CommandInfo::finished(cmd_res.code(), 1)).await.unwrap();
if !cmd_res.success() {
return Err(format!("{} failed: {:?}", &human_name, cmd_res));
@@ -361,7 +401,7 @@ impl RunningJob {
}
}
-impl RunnerClient {
+impl RemoteServerRunner {
async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
if res.status() != StatusCode::OK {
return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res));
@@ -483,7 +523,7 @@ async fn main() {
match poll {
Ok(mut res) => {
- let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await {
+ let mut client = match RemoteServerRunner::new("ci.butactuallyin.space:9876", sender, res).await {
Ok(client) => client,
Err(e) => {
eprintln!("failed to initialize client: {:?}", e);