summaryrefslogtreecommitdiff
path: root/ci-runner/src/main.rs
diff options
context:
space:
mode:
authoriximeow <me@iximeow.net>2023-10-29 15:43:03 -0700
committeriximeow <me@iximeow.net>2023-10-29 15:43:03 -0700
commitc0ed2168c6b39301511b84f52bd3f7b57a39024d (patch)
tree28ce2666e1d6ad052a31cbdb9a1c487f83a91e7a /ci-runner/src/main.rs
parent6f78ee3f15421b3450428e711659b28be0eccd7b (diff)
no more warnings :)runner-local-mode
Diffstat (limited to 'ci-runner/src/main.rs')
-rw-r--r--ci-runner/src/main.rs55
1 files changed, 28 insertions, 27 deletions
diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs
index f5941fa..6ed6538 100644
--- a/ci-runner/src/main.rs
+++ b/ci-runner/src/main.rs
@@ -6,14 +6,10 @@ use reqwest::{StatusCode, Response};
use tokio::process::Command;
use std::process::Stdio;
use std::process::ExitStatus;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-use serde_json::json;
+use tokio::io::AsyncWrite;
use serde::{Deserialize, de::DeserializeOwned, Serialize};
-use std::task::{Context, Poll};
-use std::pin::Pin;
use std::marker::Unpin;
-use std::future::Future;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use ci_lib_native::io;
use ci_lib_native::io::{ArtifactStream, VecSink};
@@ -23,6 +19,7 @@ mod lua;
use crate::lua::CommandOutput;
+#[allow(dead_code)]
#[derive(Debug)]
enum WorkAcquireError {
Reqwest(reqwest::Error),
@@ -43,6 +40,7 @@ trait Runner: Send + Sync + 'static {
async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String>;
}
+#[allow(dead_code)]
struct LocalRunner {
working_dir: PathBuf,
current_job: Option<RequestedJob>,
@@ -68,7 +66,7 @@ impl Runner for LocalRunner {
println!("metric reported: {} = {}", name, value);
Ok(())
}
- async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> {
+ async fn create_artifact(&self, _name: &str, _desc: &str, _build_token: &str) -> Result<ArtifactStream, String> {
Err("can't create artifacts yet".to_string())
}
}
@@ -77,9 +75,11 @@ impl Runner for LocalRunner {
/// a task", including reporting metrics and statuses back to the CI server.
struct RemoteServerRunner {
http: reqwest::Client,
+ #[allow(dead_code)]
host: String,
tx: hyper::body::Sender,
rx: Response,
+ #[allow(dead_code)]
current_job: Option<RequestedJob>,
}
@@ -103,7 +103,7 @@ impl Runner for RemoteServerRunner {
.map_err(|e| format!("failed to send metric {}: {:?})", name, e))
}
async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> {
- let (mut sender, body) = hyper::Body::channel();
+ let (sender, body) = hyper::Body::channel();
let resp = self.http.post("https://ci.butactuallyin.space:9876/api/artifact")
.header("user-agent", "ci-butactuallyin-space-runner")
.header("x-task-token", build_token)
@@ -147,6 +147,7 @@ impl RunningJob {
struct JobEnv {
lua: lua::BuildEnv,
+ #[allow(dead_code)]
job: Arc<Mutex<Box<RunningJob>>>,
}
@@ -175,6 +176,7 @@ pub struct RunningJob {
current_step: StepTracker,
}
+#[allow(dead_code)]
enum RepoError {
CloneFailedIdk { exit_code: ExitStatus },
CheckoutFailedIdk { exit_code: ExitStatus },
@@ -259,7 +261,7 @@ impl RunningJob {
Ok(())
}
- async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
+ async fn execute_command_and_report(&self, command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
let stdout_artifact = self.create_artifact(
&format!("{} (stdout)", name),
&format!("{} (stdout)", desc)
@@ -274,7 +276,7 @@ impl RunningJob {
Ok(exit_status)
}
- async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> {
+ async fn execute_command_capture_output(&self, command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> {
let stdout_collector = VecSink::new();
let stderr_collector = VecSink::new();
@@ -287,7 +289,7 @@ impl RunningJob {
})
}
- async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> {
+ async fn execute_command(&self, mut command: Command, name: &str, _desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> {
eprintln!("[.] running {}", name);
let mut child = command
@@ -327,7 +329,7 @@ impl RunningJob {
let checkout_res = ctx.lock().unwrap().clone_remote().await;
- if let Err(e) = checkout_res {
+ if let Err(_e) = checkout_res {
let status = "bad_ref";
let status = TaskInfo::finished(status);
eprintln!("checkout failed, reporting status: {:?}", status);
@@ -444,7 +446,7 @@ impl RunningJob {
}
impl RemoteServerRunner {
- async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
+ async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
if res.status() != StatusCode::OK {
return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res));
}
@@ -467,11 +469,18 @@ impl RemoteServerRunner {
})
}
- async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> {
+ async fn wait_for_work(&mut self, _accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> {
loop {
let message = self.recv_typed::<ClientProto>().await;
match message {
Ok(Some(ClientProto::NewTask(new_task))) => {
+ // TODO: verify that `new_task` is for a commit authored by someone we're
+ // willing to run work for.
+ //
+ // we're also trusting the server to only tell us about work we would be
+ // interested in running, so if this rejects a task it's a server bug that we
+ // got the task in the first place or a client bug that the list of accepted
+ // pushers varied.
return Ok(Some(new_task));
},
Ok(Some(ClientProto::Ping)) => {
@@ -491,10 +500,6 @@ impl RemoteServerRunner {
}
}
- async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> {
- self.recv_typed().await
- }
-
async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> {
match self.rx.chunk().await {
Ok(Some(chunk)) => {
@@ -511,10 +516,6 @@ impl RemoteServerRunner {
}
}
- async fn send(&mut self, value: serde_json::Value) -> Result<(), String> {
- self.send_typed(&value).await
- }
-
async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> {
self.tx.send_data(
serde_json::to_vec(t)
@@ -546,13 +547,13 @@ async fn main() {
}
}
-async fn run_local(config_path: String) {
+async fn run_local(_config_path: String) {
let job = RequestedJob {
commit: "current commit?".to_string(),
remote_url: "cwd?".to_string(),
build_token: "n/a".to_string(),
};
- let mut job = RunningJob::local_from_job(job);
+ let job = RunningJob::local_from_job(job);
job.run().await;
}
@@ -583,7 +584,7 @@ async fn run_remote(config_path: String) {
.await;
match poll {
- Ok(mut res) => {
+ Ok(res) => {
let mut client = match RemoteServerRunner::new("ci.butactuallyin.space:9876", sender, res).await {
Ok(client) => client,
Err(e) => {
@@ -609,7 +610,7 @@ async fn run_remote(config_path: String) {
eprintln!("doing {:?}", job);
- let mut job = RunningJob::remote_from_job(job, client);
+ let job = RunningJob::remote_from_job(job, client);
job.run().await;
std::thread::sleep(Duration::from_millis(10000));
},
@@ -657,7 +658,7 @@ mod host_info {
let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect();
match cpu_mhzes.get(cpu as usize) {
Some(mhz) => {
- let mut line_parts = cpu_mhzes[cpu as usize].split(":");
+ let mut line_parts = mhz.split(":");
let _ = line_parts.next();
let mhz = line_parts.next().unwrap().trim();
let mhz: f64 = mhz.parse().unwrap();