diff options
author | iximeow <me@iximeow.net> | 2023-10-29 15:43:03 -0700 |
---|---|---|
committer | iximeow <me@iximeow.net> | 2023-10-29 15:43:03 -0700 |
commit | c0ed2168c6b39301511b84f52bd3f7b57a39024d (patch) | |
tree | 28ce2666e1d6ad052a31cbdb9a1c487f83a91e7a /ci-runner/src/main.rs | |
parent | 6f78ee3f15421b3450428e711659b28be0eccd7b (diff) |
no more warnings :)runner-local-mode
Diffstat (limited to 'ci-runner/src/main.rs')
-rw-r--r-- | ci-runner/src/main.rs | 55 |
1 files changed, 28 insertions, 27 deletions
diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs index f5941fa..6ed6538 100644 --- a/ci-runner/src/main.rs +++ b/ci-runner/src/main.rs @@ -6,14 +6,10 @@ use reqwest::{StatusCode, Response}; use tokio::process::Command; use std::process::Stdio; use std::process::ExitStatus; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use serde_json::json; +use tokio::io::AsyncWrite; use serde::{Deserialize, de::DeserializeOwned, Serialize}; -use std::task::{Context, Poll}; -use std::pin::Pin; use std::marker::Unpin; -use std::future::Future; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use ci_lib_native::io; use ci_lib_native::io::{ArtifactStream, VecSink}; @@ -23,6 +19,7 @@ mod lua; use crate::lua::CommandOutput; +#[allow(dead_code)] #[derive(Debug)] enum WorkAcquireError { Reqwest(reqwest::Error), @@ -43,6 +40,7 @@ trait Runner: Send + Sync + 'static { async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String>; } +#[allow(dead_code)] struct LocalRunner { working_dir: PathBuf, current_job: Option<RequestedJob>, @@ -68,7 +66,7 @@ impl Runner for LocalRunner { println!("metric reported: {} = {}", name, value); Ok(()) } - async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> { + async fn create_artifact(&self, _name: &str, _desc: &str, _build_token: &str) -> Result<ArtifactStream, String> { Err("can't create artifacts yet".to_string()) } } @@ -77,9 +75,11 @@ impl Runner for LocalRunner { /// a task", including reporting metrics and statuses back to the CI server. struct RemoteServerRunner { http: reqwest::Client, + #[allow(dead_code)] host: String, tx: hyper::body::Sender, rx: Response, + #[allow(dead_code)] current_job: Option<RequestedJob>, } @@ -103,7 +103,7 @@ impl Runner for RemoteServerRunner { .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) } async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> { - let (mut sender, body) = hyper::Body::channel(); + let (sender, body) = hyper::Body::channel(); let resp = self.http.post("https://ci.butactuallyin.space:9876/api/artifact") .header("user-agent", "ci-butactuallyin-space-runner") .header("x-task-token", build_token) @@ -147,6 +147,7 @@ impl RunningJob { struct JobEnv { lua: lua::BuildEnv, + #[allow(dead_code)] job: Arc<Mutex<Box<RunningJob>>>, } @@ -175,6 +176,7 @@ pub struct RunningJob { current_step: StepTracker, } +#[allow(dead_code)] enum RepoError { CloneFailedIdk { exit_code: ExitStatus }, CheckoutFailedIdk { exit_code: ExitStatus }, @@ -259,7 +261,7 @@ impl RunningJob { Ok(()) } - async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { + async fn execute_command_and_report(&self, command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { let stdout_artifact = self.create_artifact( &format!("{} (stdout)", name), &format!("{} (stdout)", desc) @@ -274,7 +276,7 @@ impl RunningJob { Ok(exit_status) } - async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> { + async fn execute_command_capture_output(&self, command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> { let stdout_collector = VecSink::new(); let stderr_collector = VecSink::new(); @@ -287,7 +289,7 @@ impl RunningJob { }) } - async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> { + async fn execute_command(&self, mut command: Command, name: &str, _desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> { eprintln!("[.] running {}", name); let mut child = command @@ -327,7 +329,7 @@ impl RunningJob { let checkout_res = ctx.lock().unwrap().clone_remote().await; - if let Err(e) = checkout_res { + if let Err(_e) = checkout_res { let status = "bad_ref"; let status = TaskInfo::finished(status); eprintln!("checkout failed, reporting status: {:?}", status); @@ -444,7 +446,7 @@ impl RunningJob { } impl RemoteServerRunner { - async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { + async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { if res.status() != StatusCode::OK { return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); } @@ -467,11 +469,18 @@ impl RemoteServerRunner { }) } - async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { + async fn wait_for_work(&mut self, _accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { loop { let message = self.recv_typed::<ClientProto>().await; match message { Ok(Some(ClientProto::NewTask(new_task))) => { + // TODO: verify that `new_task` is for a commit authored by someone we're + // willing to run work for. + // + // we're also trusting the server to only tell us about work we would be + // interested in running, so if this rejects a task it's a server bug that we + // got the task in the first place or a client bug that the list of accepted + // pushers varied. return Ok(Some(new_task)); }, Ok(Some(ClientProto::Ping)) => { @@ -491,10 +500,6 @@ impl RemoteServerRunner { } } - async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { - self.recv_typed().await - } - async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> { match self.rx.chunk().await { Ok(Some(chunk)) => { @@ -511,10 +516,6 @@ impl RemoteServerRunner { } } - async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { - self.send_typed(&value).await - } - async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> { self.tx.send_data( serde_json::to_vec(t) @@ -546,13 +547,13 @@ async fn main() { } } -async fn run_local(config_path: String) { +async fn run_local(_config_path: String) { let job = RequestedJob { commit: "current commit?".to_string(), remote_url: "cwd?".to_string(), build_token: "n/a".to_string(), }; - let mut job = RunningJob::local_from_job(job); + let job = RunningJob::local_from_job(job); job.run().await; } @@ -583,7 +584,7 @@ async fn run_remote(config_path: String) { .await; match poll { - Ok(mut res) => { + Ok(res) => { let mut client = match RemoteServerRunner::new("ci.butactuallyin.space:9876", sender, res).await { Ok(client) => client, Err(e) => { @@ -609,7 +610,7 @@ async fn run_remote(config_path: String) { eprintln!("doing {:?}", job); - let mut job = RunningJob::remote_from_job(job, client); + let job = RunningJob::remote_from_job(job, client); job.run().await; std::thread::sleep(Duration::from_millis(10000)); }, @@ -657,7 +658,7 @@ mod host_info { let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect(); match cpu_mhzes.get(cpu as usize) { Some(mhz) => { - let mut line_parts = cpu_mhzes[cpu as usize].split(":"); + let mut line_parts = mhz.split(":"); let _ = line_parts.next(); let mhz = line_parts.next().unwrap().trim(); let mhz: f64 = mhz.parse().unwrap(); |