diff options
-rw-r--r-- | ci-ctl/src/main.rs | 4 | ||||
-rw-r--r-- | ci-driver/src/main.rs | 25 | ||||
-rw-r--r-- | ci-lib-core/src/dbctx.rs | 7 | ||||
-rw-r--r-- | ci-lib-native/src/dbctx_ext.rs | 1 | ||||
-rw-r--r-- | ci-lib-native/src/io.rs | 5 | ||||
-rw-r--r-- | ci-lib-native/src/notifier.rs | 4 | ||||
-rw-r--r-- | ci-runner/src/lua/mod.rs | 35 | ||||
-rw-r--r-- | ci-runner/src/main.rs | 55 | ||||
-rw-r--r-- | ci-web-server/Cargo.toml | 2 |
9 files changed, 63 insertions, 75 deletions
diff --git a/ci-ctl/src/main.rs b/ci-ctl/src/main.rs index bd2f733..4f09f27 100644 --- a/ci-ctl/src/main.rs +++ b/ci-ctl/src/main.rs @@ -80,7 +80,7 @@ fn main() { match what { JobAction::List => { let db = DbCtx::new(&config_path, &db_path); - let mut conn = db.conn.lock().unwrap(); + let conn = db.conn.lock().unwrap(); let mut query = conn.prepare(ci_lib_core::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap(); let mut jobs = query.query([]).unwrap(); while let Some(row) = jobs.next().unwrap() { @@ -170,7 +170,7 @@ fn main() { println!("[+] new repo created: '{}' id {}", &name, repo_id); if let Some((remote, remote_kind, config_path)) = remote_config { let full_config_file_path = format!("{}/{}", &db.config_path.display(), config_path); - let config = match remote_kind.as_ref() { + let _config = match remote_kind.as_ref() { "github" => { assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok()); } diff --git a/ci-driver/src/main.rs b/ci-driver/src/main.rs index f8ff34c..08256f0 100644 --- a/ci-driver/src/main.rs +++ b/ci-driver/src/main.rs @@ -1,11 +1,10 @@ -use std::process::Command; use std::collections::HashMap; use std::sync::{Mutex, RwLock}; use lazy_static::lazy_static; use std::io::Read; use futures_util::StreamExt; use std::fmt; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use tokio::spawn; use tokio_stream::wrappers::ReceiverStream; use std::sync::{Arc, Weak}; @@ -21,7 +20,6 @@ use axum::extract::BodyStream; use axum::response::IntoResponse; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use serde_json::json; use serde::{Deserialize, Serialize}; use ci_lib_core::dbctx::DbCtx; @@ -29,7 +27,7 @@ use ci_lib_core::sql; use ci_lib_core::sql::{PendingRun, Job, Run}; use ci_lib_core::sql::JobResult; use ci_lib_core::sql::RunState; -use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; +use ci_lib_core::protocol::{ClientProto, TaskInfo, RequestedJob}; lazy_static! { static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); @@ -73,7 +71,7 @@ async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run let res = candidate.submit(&dbctx, &run, &remote.remote_git_url, &commit_sha).await; let mut client_job = match res { - Ok(Some(mut client_job)) => { client_job } + Ok(Some(client_job)) => { client_job } Ok(None) => { return Err("client hung up instead of acking task".to_string()); } @@ -118,6 +116,7 @@ fn token_for_job() -> String { base64::encode(data) } +#[allow(dead_code)] struct ClientJob { dbctx: Arc<DbCtx>, remote_git_url: String, @@ -141,7 +140,7 @@ impl ClientJob { }; eprintln!("got {:?}", msg); match msg { - ClientProto::NewTaskPlease { allowed_pushers, host_info } => { + ClientProto::NewTaskPlease { allowed_pushers: _, host_info: _ } => { eprintln!("misdirected task request (after handshake?)"); return; } @@ -355,7 +354,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien return (StatusCode::BAD_REQUEST, "").into_response(); } - let artifact_path = if let Some(artifact_path) = artifact_path { + let _artifact_path = if let Some(artifact_path) = artifact_path { artifact_path } else { eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers); @@ -404,7 +403,7 @@ struct WorkRequest { } async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse { - let auth_token = match headers.get("authorization") { + let _auth_token = match headers.get("authorization") { Some(token) => { if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) { eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token); @@ -445,7 +444,7 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien let client = match RunnerClient::new(tx_sender, job_resp, accepted_pushers, host_info_id).await { Ok(v) => v, Err(e) => { - eprintln!("unable to register client"); + eprintln!("unable to register client: {}", e); return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); } }; @@ -455,10 +454,10 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien eprintln!("client requested work..."); return (StatusCode::OK, resp_body).into_response(); } - Err(TrySendError::Full(client)) => { + Err(TrySendError::Full(_client)) => { return (StatusCode::IM_A_TEAPOT, resp_body).into_response(); } - Err(TrySendError::Closed(client)) => { + Err(TrySendError::Closed(_client)) => { panic!("client holder is gone?"); } } @@ -512,7 +511,7 @@ async fn main() { spawn(old_task_reaper(Arc::clone(&dbctx))); loop { - let mut candidate = match channel.recv().await + let candidate = match channel.recv().await .ok_or_else(|| "client channel disconnected".to_string()) { Ok(candidate) => { candidate }, @@ -583,7 +582,7 @@ async fn old_task_reaper(dbctx: Arc<DbCtx>) { let active_tasks = ACTIVE_TASKS.lock().unwrap(); - for (id, witness) in active_tasks.iter() { + for (id, _witness) in active_tasks.iter() { if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) { potentially_stale_tasks.swap_remove(idx); } diff --git a/ci-lib-core/src/dbctx.rs b/ci-lib-core/src/dbctx.rs index eec5e72..399f67f 100644 --- a/ci-lib-core/src/dbctx.rs +++ b/ci-lib-core/src/dbctx.rs @@ -1,11 +1,8 @@ use std::sync::Mutex; -// use futures_util::StreamExt; use rusqlite::{params, Connection, OptionalExtension}; use std::time::{SystemTime, UNIX_EPOCH}; -// use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::path::Path; use std::path::PathBuf; -use std::ops::Deref; use crate::sql; @@ -34,10 +31,6 @@ impl DbCtx { } } - fn conn<'a>(&'a self) -> impl Deref<Target = Connection> + 'a { - self.conn.lock().unwrap() - } - pub fn create_tables(&self) -> Result<(), ()> { let conn = self.conn.lock().unwrap(); conn.execute(sql::CREATE_ARTIFACTS_TABLE, params![]).unwrap(); diff --git a/ci-lib-native/src/dbctx_ext.rs b/ci-lib-native/src/dbctx_ext.rs index 44436fc..65ee851 100644 --- a/ci-lib-native/src/dbctx_ext.rs +++ b/ci-lib-native/src/dbctx_ext.rs @@ -1,6 +1,5 @@ use crate::io::ArtifactDescriptor; use crate::notifier::{RemoteNotifier, NotifierConfig}; -use tokio::fs::{File, OpenOptions}; use ci_lib_core::dbctx::DbCtx; diff --git a/ci-lib-native/src/io.rs b/ci-lib-native/src/io.rs index d41349c..6071794 100644 --- a/ci-lib-native/src/io.rs +++ b/ci-lib-native/src/io.rs @@ -1,11 +1,9 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use futures_util::StreamExt; use tokio::fs::File; -use std::io::Write; use tokio::fs::OpenOptions; use std::task::{Poll, Context}; use std::pin::Pin; -use std::time::{UNIX_EPOCH, SystemTime}; use std::sync::{Arc, Mutex}; #[derive(Clone)] @@ -26,7 +24,7 @@ impl VecSink { impl tokio::io::AsyncWrite for VecSink { fn poll_write( self: Pin<&mut Self>, - cx: &mut Context, + _cx: &mut Context, buf: &[u8] ) -> Poll<Result<usize, std::io::Error>> { self.body.lock().unwrap().extend_from_slice(buf); @@ -92,6 +90,7 @@ impl tokio::io::AsyncWrite for ArtifactStream { pub struct ArtifactDescriptor { + #[allow(dead_code)] job_id: u64, pub artifact_id: u64, file: File, diff --git a/ci-lib-native/src/notifier.rs b/ci-lib-native/src/notifier.rs index dd4a35c..a6d7469 100644 --- a/ci-lib-native/src/notifier.rs +++ b/ci-lib-native/src/notifier.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use axum::http::StatusCode; use lettre::transport::smtp::authentication::{Credentials, Mechanism}; -use lettre::{Message, Transport}; +use lettre::Message; use lettre::transport::smtp::extension::ClientId; use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder}; use std::time::Duration; @@ -88,7 +88,7 @@ impl RemoteNotifier { } } - pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { + pub async fn tell_job_status(&self, _ctx: &Arc<DbCtx>, _repo_id: u64, sha: &str, _job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { match &self.notifier { NotifierConfig::GitHub { token } => { let status_info = serde_json::json!({ diff --git a/ci-runner/src/lua/mod.rs b/ci-runner/src/lua/mod.rs index d05ad7d..2c1423b 100644 --- a/ci-runner/src/lua/mod.rs +++ b/ci-runner/src/lua/mod.rs @@ -1,10 +1,8 @@ -use crate::Runner; use crate::RunningJob; use rlua::prelude::*; use std::sync::{Arc, Mutex}; -use std::path::PathBuf; pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config/goodfiles/rust.lua"); @@ -15,8 +13,8 @@ pub struct BuildEnv { #[derive(Debug)] pub struct RunParams { - step: Option<String>, - name: Option<String>, + _step: Option<String>, + _name: Option<String>, cwd: Option<String>, } @@ -27,9 +25,8 @@ pub struct CommandOutput { } mod lua_exports { - use crate::Runner; use crate::RunningJob; - use crate::lua::{CommandOutput, RunParams}; + use crate::lua::RunParams; use std::sync::{Arc, Mutex}; use std::path::PathBuf; @@ -69,7 +66,7 @@ mod lua_exports { LuaValue::Nil => { None }, - other => { + _other => { return Err(LuaError::RuntimeError(format!("params[\"step\"] must be a string"))); } }; @@ -80,7 +77,7 @@ mod lua_exports { LuaValue::Nil => { None }, - other => { + _other => { return Err(LuaError::RuntimeError(format!("params[\"name\"] must be a string"))); } }; @@ -91,21 +88,21 @@ mod lua_exports { LuaValue::Nil => { None }, - other => { + _other => { return Err(LuaError::RuntimeError(format!("params[\"cwd\"] must be a string"))); } }; RunParams { - step, - name, + _step: step, + _name: name, cwd, } }, LuaValue::Nil => { RunParams { - step: None, - name: None, + _step: None, + _name: None, cwd: None, } } @@ -227,7 +224,7 @@ mod lua_exports { pub fn file_size(path: &str) -> Result<u64, rlua::Error> { Ok(std::fs::metadata(&format!("tmpdir/{}", path)) - .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", path)))? + .map_err(|_e| LuaError::RuntimeError(format!("could not stat {:?}", path)))? .len()) } @@ -300,7 +297,7 @@ impl BuildEnv { Ok(()) })?; - let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec<String>| { + let check_dependencies = decl_env.create_function("dependencies", move |_, _job_ref, commands: Vec<String>| { lua_exports::check_dependencies(commands) })?; @@ -316,21 +313,21 @@ impl BuildEnv { lua_exports::metric(name, value, job_ref) })?; - let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(ci_lib_core::now_ms()))?; + let now_ms = decl_env.create_function("now_ms", move |_, _job_ref, ()| Ok(ci_lib_core::now_ms()))?; let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| { lua_exports::artifact(path, name, job_ref) })?; - let error = decl_env.create_function("error", move |_, job_ref, msg: String| { + let error = decl_env.create_function("error", move |_, _job_ref, msg: String| { Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg))) })?; - let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, job_ref, name: String| { + let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, _job_ref, name: String| { lua_exports::has_cmd(&name) })?; - let size_of_file = decl_env.create_function("size_of_file", move |_, job_ref, name: String| { + let size_of_file = decl_env.create_function("size_of_file", move |_, _job_ref, name: String| { lua_exports::file_size(&name) })?; diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs index f5941fa..6ed6538 100644 --- a/ci-runner/src/main.rs +++ b/ci-runner/src/main.rs @@ -6,14 +6,10 @@ use reqwest::{StatusCode, Response}; use tokio::process::Command; use std::process::Stdio; use std::process::ExitStatus; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use serde_json::json; +use tokio::io::AsyncWrite; use serde::{Deserialize, de::DeserializeOwned, Serialize}; -use std::task::{Context, Poll}; -use std::pin::Pin; use std::marker::Unpin; -use std::future::Future; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use ci_lib_native::io; use ci_lib_native::io::{ArtifactStream, VecSink}; @@ -23,6 +19,7 @@ mod lua; use crate::lua::CommandOutput; +#[allow(dead_code)] #[derive(Debug)] enum WorkAcquireError { Reqwest(reqwest::Error), @@ -43,6 +40,7 @@ trait Runner: Send + Sync + 'static { async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String>; } +#[allow(dead_code)] struct LocalRunner { working_dir: PathBuf, current_job: Option<RequestedJob>, @@ -68,7 +66,7 @@ impl Runner for LocalRunner { println!("metric reported: {} = {}", name, value); Ok(()) } - async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> { + async fn create_artifact(&self, _name: &str, _desc: &str, _build_token: &str) -> Result<ArtifactStream, String> { Err("can't create artifacts yet".to_string()) } } @@ -77,9 +75,11 @@ impl Runner for LocalRunner { /// a task", including reporting metrics and statuses back to the CI server. struct RemoteServerRunner { http: reqwest::Client, + #[allow(dead_code)] host: String, tx: hyper::body::Sender, rx: Response, + #[allow(dead_code)] current_job: Option<RequestedJob>, } @@ -103,7 +103,7 @@ impl Runner for RemoteServerRunner { .map_err(|e| format!("failed to send metric {}: {:?})", name, e)) } async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> { - let (mut sender, body) = hyper::Body::channel(); + let (sender, body) = hyper::Body::channel(); let resp = self.http.post("https://ci.butactuallyin.space:9876/api/artifact") .header("user-agent", "ci-butactuallyin-space-runner") .header("x-task-token", build_token) @@ -147,6 +147,7 @@ impl RunningJob { struct JobEnv { lua: lua::BuildEnv, + #[allow(dead_code)] job: Arc<Mutex<Box<RunningJob>>>, } @@ -175,6 +176,7 @@ pub struct RunningJob { current_step: StepTracker, } +#[allow(dead_code)] enum RepoError { CloneFailedIdk { exit_code: ExitStatus }, CheckoutFailedIdk { exit_code: ExitStatus }, @@ -259,7 +261,7 @@ impl RunningJob { Ok(()) } - async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { + async fn execute_command_and_report(&self, command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { let stdout_artifact = self.create_artifact( &format!("{} (stdout)", name), &format!("{} (stdout)", desc) @@ -274,7 +276,7 @@ impl RunningJob { Ok(exit_status) } - async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> { + async fn execute_command_capture_output(&self, command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> { let stdout_collector = VecSink::new(); let stderr_collector = VecSink::new(); @@ -287,7 +289,7 @@ impl RunningJob { }) } - async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> { + async fn execute_command(&self, mut command: Command, name: &str, _desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> { eprintln!("[.] running {}", name); let mut child = command @@ -327,7 +329,7 @@ impl RunningJob { let checkout_res = ctx.lock().unwrap().clone_remote().await; - if let Err(e) = checkout_res { + if let Err(_e) = checkout_res { let status = "bad_ref"; let status = TaskInfo::finished(status); eprintln!("checkout failed, reporting status: {:?}", status); @@ -444,7 +446,7 @@ impl RunningJob { } impl RemoteServerRunner { - async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { + async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> { if res.status() != StatusCode::OK { return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res)); } @@ -467,11 +469,18 @@ impl RemoteServerRunner { }) } - async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { + async fn wait_for_work(&mut self, _accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> { loop { let message = self.recv_typed::<ClientProto>().await; match message { Ok(Some(ClientProto::NewTask(new_task))) => { + // TODO: verify that `new_task` is for a commit authored by someone we're + // willing to run work for. + // + // we're also trusting the server to only tell us about work we would be + // interested in running, so if this rejects a task it's a server bug that we + // got the task in the first place or a client bug that the list of accepted + // pushers varied. return Ok(Some(new_task)); }, Ok(Some(ClientProto::Ping)) => { @@ -491,10 +500,6 @@ impl RemoteServerRunner { } } - async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> { - self.recv_typed().await - } - async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> { match self.rx.chunk().await { Ok(Some(chunk)) => { @@ -511,10 +516,6 @@ impl RemoteServerRunner { } } - async fn send(&mut self, value: serde_json::Value) -> Result<(), String> { - self.send_typed(&value).await - } - async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> { self.tx.send_data( serde_json::to_vec(t) @@ -546,13 +547,13 @@ async fn main() { } } -async fn run_local(config_path: String) { +async fn run_local(_config_path: String) { let job = RequestedJob { commit: "current commit?".to_string(), remote_url: "cwd?".to_string(), build_token: "n/a".to_string(), }; - let mut job = RunningJob::local_from_job(job); + let job = RunningJob::local_from_job(job); job.run().await; } @@ -583,7 +584,7 @@ async fn run_remote(config_path: String) { .await; match poll { - Ok(mut res) => { + Ok(res) => { let mut client = match RemoteServerRunner::new("ci.butactuallyin.space:9876", sender, res).await { Ok(client) => client, Err(e) => { @@ -609,7 +610,7 @@ async fn run_remote(config_path: String) { eprintln!("doing {:?}", job); - let mut job = RunningJob::remote_from_job(job, client); + let job = RunningJob::remote_from_job(job, client); job.run().await; std::thread::sleep(Duration::from_millis(10000)); }, @@ -657,7 +658,7 @@ mod host_info { let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect(); match cpu_mhzes.get(cpu as usize) { Some(mhz) => { - let mut line_parts = cpu_mhzes[cpu as usize].split(":"); + let mut line_parts = mhz.split(":"); let _ = line_parts.next(); let mhz = line_parts.next().unwrap().trim(); let mhz: f64 = mhz.parse().unwrap(); diff --git a/ci-web-server/Cargo.toml b/ci-web-server/Cargo.toml index bb0b57c..164ec56 100644 --- a/ci-web-server/Cargo.toml +++ b/ci-web-server/Cargo.toml @@ -14,7 +14,7 @@ ci-lib-core = { path = "../ci-lib-core" } ci-lib-native = { path = "../ci-lib-native" } ci-lib-web = { path = "../ci-lib-web" } -tokio = { features = ["full"] } +tokio = { version = "*", features = ["full"] } tokio-stream = "*" serde_json = "*" serde = { version = "*", features = ["derive"] } |