summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ci-ctl/src/main.rs4
-rw-r--r--ci-driver/src/main.rs25
-rw-r--r--ci-lib-core/src/dbctx.rs7
-rw-r--r--ci-lib-native/src/dbctx_ext.rs1
-rw-r--r--ci-lib-native/src/io.rs5
-rw-r--r--ci-lib-native/src/notifier.rs4
-rw-r--r--ci-runner/src/lua/mod.rs35
-rw-r--r--ci-runner/src/main.rs55
-rw-r--r--ci-web-server/Cargo.toml2
9 files changed, 63 insertions, 75 deletions
diff --git a/ci-ctl/src/main.rs b/ci-ctl/src/main.rs
index bd2f733..4f09f27 100644
--- a/ci-ctl/src/main.rs
+++ b/ci-ctl/src/main.rs
@@ -80,7 +80,7 @@ fn main() {
match what {
JobAction::List => {
let db = DbCtx::new(&config_path, &db_path);
- let mut conn = db.conn.lock().unwrap();
+ let conn = db.conn.lock().unwrap();
let mut query = conn.prepare(ci_lib_core::sql::SELECT_ALL_RUNS_WITH_JOB_INFO).unwrap();
let mut jobs = query.query([]).unwrap();
while let Some(row) = jobs.next().unwrap() {
@@ -170,7 +170,7 @@ fn main() {
println!("[+] new repo created: '{}' id {}", &name, repo_id);
if let Some((remote, remote_kind, config_path)) = remote_config {
let full_config_file_path = format!("{}/{}", &db.config_path.display(), config_path);
- let config = match remote_kind.as_ref() {
+ let _config = match remote_kind.as_ref() {
"github" => {
assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok());
}
diff --git a/ci-driver/src/main.rs b/ci-driver/src/main.rs
index f8ff34c..08256f0 100644
--- a/ci-driver/src/main.rs
+++ b/ci-driver/src/main.rs
@@ -1,11 +1,10 @@
-use std::process::Command;
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};
use lazy_static::lazy_static;
use std::io::Read;
use futures_util::StreamExt;
use std::fmt;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use tokio::spawn;
use tokio_stream::wrappers::ReceiverStream;
use std::sync::{Arc, Weak};
@@ -21,7 +20,6 @@ use axum::extract::BodyStream;
use axum::response::IntoResponse;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
-use serde_json::json;
use serde::{Deserialize, Serialize};
use ci_lib_core::dbctx::DbCtx;
@@ -29,7 +27,7 @@ use ci_lib_core::sql;
use ci_lib_core::sql::{PendingRun, Job, Run};
use ci_lib_core::sql::JobResult;
use ci_lib_core::sql::RunState;
-use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
+use ci_lib_core::protocol::{ClientProto, TaskInfo, RequestedJob};
lazy_static! {
static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
@@ -73,7 +71,7 @@ async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run
let res = candidate.submit(&dbctx, &run, &remote.remote_git_url, &commit_sha).await;
let mut client_job = match res {
- Ok(Some(mut client_job)) => { client_job }
+ Ok(Some(client_job)) => { client_job }
Ok(None) => {
return Err("client hung up instead of acking task".to_string());
}
@@ -118,6 +116,7 @@ fn token_for_job() -> String {
base64::encode(data)
}
+#[allow(dead_code)]
struct ClientJob {
dbctx: Arc<DbCtx>,
remote_git_url: String,
@@ -141,7 +140,7 @@ impl ClientJob {
};
eprintln!("got {:?}", msg);
match msg {
- ClientProto::NewTaskPlease { allowed_pushers, host_info } => {
+ ClientProto::NewTaskPlease { allowed_pushers: _, host_info: _ } => {
eprintln!("misdirected task request (after handshake?)");
return;
}
@@ -355,7 +354,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
return (StatusCode::BAD_REQUEST, "").into_response();
}
- let artifact_path = if let Some(artifact_path) = artifact_path {
+ let _artifact_path = if let Some(artifact_path) = artifact_path {
artifact_path
} else {
eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers);
@@ -404,7 +403,7 @@ struct WorkRequest {
}
async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse {
- let auth_token = match headers.get("authorization") {
+ let _auth_token = match headers.get("authorization") {
Some(token) => {
if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) {
eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token);
@@ -445,7 +444,7 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
let client = match RunnerClient::new(tx_sender, job_resp, accepted_pushers, host_info_id).await {
Ok(v) => v,
Err(e) => {
- eprintln!("unable to register client");
+ eprintln!("unable to register client: {}", e);
return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response();
}
};
@@ -455,10 +454,10 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
eprintln!("client requested work...");
return (StatusCode::OK, resp_body).into_response();
}
- Err(TrySendError::Full(client)) => {
+ Err(TrySendError::Full(_client)) => {
return (StatusCode::IM_A_TEAPOT, resp_body).into_response();
}
- Err(TrySendError::Closed(client)) => {
+ Err(TrySendError::Closed(_client)) => {
panic!("client holder is gone?");
}
}
@@ -512,7 +511,7 @@ async fn main() {
spawn(old_task_reaper(Arc::clone(&dbctx)));
loop {
- let mut candidate = match channel.recv().await
+ let candidate = match channel.recv().await
.ok_or_else(|| "client channel disconnected".to_string()) {
Ok(candidate) => { candidate },
@@ -583,7 +582,7 @@ async fn old_task_reaper(dbctx: Arc<DbCtx>) {
let active_tasks = ACTIVE_TASKS.lock().unwrap();
- for (id, witness) in active_tasks.iter() {
+ for (id, _witness) in active_tasks.iter() {
if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) {
potentially_stale_tasks.swap_remove(idx);
}
diff --git a/ci-lib-core/src/dbctx.rs b/ci-lib-core/src/dbctx.rs
index eec5e72..399f67f 100644
--- a/ci-lib-core/src/dbctx.rs
+++ b/ci-lib-core/src/dbctx.rs
@@ -1,11 +1,8 @@
use std::sync::Mutex;
-// use futures_util::StreamExt;
use rusqlite::{params, Connection, OptionalExtension};
use std::time::{SystemTime, UNIX_EPOCH};
-// use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::path::Path;
use std::path::PathBuf;
-use std::ops::Deref;
use crate::sql;
@@ -34,10 +31,6 @@ impl DbCtx {
}
}
- fn conn<'a>(&'a self) -> impl Deref<Target = Connection> + 'a {
- self.conn.lock().unwrap()
- }
-
pub fn create_tables(&self) -> Result<(), ()> {
let conn = self.conn.lock().unwrap();
conn.execute(sql::CREATE_ARTIFACTS_TABLE, params![]).unwrap();
diff --git a/ci-lib-native/src/dbctx_ext.rs b/ci-lib-native/src/dbctx_ext.rs
index 44436fc..65ee851 100644
--- a/ci-lib-native/src/dbctx_ext.rs
+++ b/ci-lib-native/src/dbctx_ext.rs
@@ -1,6 +1,5 @@
use crate::io::ArtifactDescriptor;
use crate::notifier::{RemoteNotifier, NotifierConfig};
-use tokio::fs::{File, OpenOptions};
use ci_lib_core::dbctx::DbCtx;
diff --git a/ci-lib-native/src/io.rs b/ci-lib-native/src/io.rs
index d41349c..6071794 100644
--- a/ci-lib-native/src/io.rs
+++ b/ci-lib-native/src/io.rs
@@ -1,11 +1,9 @@
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use futures_util::StreamExt;
use tokio::fs::File;
-use std::io::Write;
use tokio::fs::OpenOptions;
use std::task::{Poll, Context};
use std::pin::Pin;
-use std::time::{UNIX_EPOCH, SystemTime};
use std::sync::{Arc, Mutex};
#[derive(Clone)]
@@ -26,7 +24,7 @@ impl VecSink {
impl tokio::io::AsyncWrite for VecSink {
fn poll_write(
self: Pin<&mut Self>,
- cx: &mut Context,
+ _cx: &mut Context,
buf: &[u8]
) -> Poll<Result<usize, std::io::Error>> {
self.body.lock().unwrap().extend_from_slice(buf);
@@ -92,6 +90,7 @@ impl tokio::io::AsyncWrite for ArtifactStream {
pub struct ArtifactDescriptor {
+ #[allow(dead_code)]
job_id: u64,
pub artifact_id: u64,
file: File,
diff --git a/ci-lib-native/src/notifier.rs b/ci-lib-native/src/notifier.rs
index dd4a35c..a6d7469 100644
--- a/ci-lib-native/src/notifier.rs
+++ b/ci-lib-native/src/notifier.rs
@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use std::sync::Arc;
use axum::http::StatusCode;
use lettre::transport::smtp::authentication::{Credentials, Mechanism};
-use lettre::{Message, Transport};
+use lettre::Message;
use lettre::transport::smtp::extension::ClientId;
use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder};
use std::time::Duration;
@@ -88,7 +88,7 @@ impl RemoteNotifier {
}
}
- pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> {
+ pub async fn tell_job_status(&self, _ctx: &Arc<DbCtx>, _repo_id: u64, sha: &str, _job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> {
match &self.notifier {
NotifierConfig::GitHub { token } => {
let status_info = serde_json::json!({
diff --git a/ci-runner/src/lua/mod.rs b/ci-runner/src/lua/mod.rs
index d05ad7d..2c1423b 100644
--- a/ci-runner/src/lua/mod.rs
+++ b/ci-runner/src/lua/mod.rs
@@ -1,10 +1,8 @@
-use crate::Runner;
use crate::RunningJob;
use rlua::prelude::*;
use std::sync::{Arc, Mutex};
-use std::path::PathBuf;
pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../../config/goodfiles/rust.lua");
@@ -15,8 +13,8 @@ pub struct BuildEnv {
#[derive(Debug)]
pub struct RunParams {
- step: Option<String>,
- name: Option<String>,
+ _step: Option<String>,
+ _name: Option<String>,
cwd: Option<String>,
}
@@ -27,9 +25,8 @@ pub struct CommandOutput {
}
mod lua_exports {
- use crate::Runner;
use crate::RunningJob;
- use crate::lua::{CommandOutput, RunParams};
+ use crate::lua::RunParams;
use std::sync::{Arc, Mutex};
use std::path::PathBuf;
@@ -69,7 +66,7 @@ mod lua_exports {
LuaValue::Nil => {
None
},
- other => {
+ _other => {
return Err(LuaError::RuntimeError(format!("params[\"step\"] must be a string")));
}
};
@@ -80,7 +77,7 @@ mod lua_exports {
LuaValue::Nil => {
None
},
- other => {
+ _other => {
return Err(LuaError::RuntimeError(format!("params[\"name\"] must be a string")));
}
};
@@ -91,21 +88,21 @@ mod lua_exports {
LuaValue::Nil => {
None
},
- other => {
+ _other => {
return Err(LuaError::RuntimeError(format!("params[\"cwd\"] must be a string")));
}
};
RunParams {
- step,
- name,
+ _step: step,
+ _name: name,
cwd,
}
},
LuaValue::Nil => {
RunParams {
- step: None,
- name: None,
+ _step: None,
+ _name: None,
cwd: None,
}
}
@@ -227,7 +224,7 @@ mod lua_exports {
pub fn file_size(path: &str) -> Result<u64, rlua::Error> {
Ok(std::fs::metadata(&format!("tmpdir/{}", path))
- .map_err(|e| LuaError::RuntimeError(format!("could not stat {:?}", path)))?
+ .map_err(|_e| LuaError::RuntimeError(format!("could not stat {:?}", path)))?
.len())
}
@@ -300,7 +297,7 @@ impl BuildEnv {
Ok(())
})?;
- let check_dependencies = decl_env.create_function("dependencies", move |_, job_ref, commands: Vec<String>| {
+ let check_dependencies = decl_env.create_function("dependencies", move |_, _job_ref, commands: Vec<String>| {
lua_exports::check_dependencies(commands)
})?;
@@ -316,21 +313,21 @@ impl BuildEnv {
lua_exports::metric(name, value, job_ref)
})?;
- let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(ci_lib_core::now_ms()))?;
+ let now_ms = decl_env.create_function("now_ms", move |_, _job_ref, ()| Ok(ci_lib_core::now_ms()))?;
let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| {
lua_exports::artifact(path, name, job_ref)
})?;
- let error = decl_env.create_function("error", move |_, job_ref, msg: String| {
+ let error = decl_env.create_function("error", move |_, _job_ref, msg: String| {
Err::<(), LuaError>(LuaError::RuntimeError(format!("explicit error: {}", msg)))
})?;
- let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, job_ref, name: String| {
+ let path_has_cmd = decl_env.create_function("path_has_cmd", move |_, _job_ref, name: String| {
lua_exports::has_cmd(&name)
})?;
- let size_of_file = decl_env.create_function("size_of_file", move |_, job_ref, name: String| {
+ let size_of_file = decl_env.create_function("size_of_file", move |_, _job_ref, name: String| {
lua_exports::file_size(&name)
})?;
diff --git a/ci-runner/src/main.rs b/ci-runner/src/main.rs
index f5941fa..6ed6538 100644
--- a/ci-runner/src/main.rs
+++ b/ci-runner/src/main.rs
@@ -6,14 +6,10 @@ use reqwest::{StatusCode, Response};
use tokio::process::Command;
use std::process::Stdio;
use std::process::ExitStatus;
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-use serde_json::json;
+use tokio::io::AsyncWrite;
use serde::{Deserialize, de::DeserializeOwned, Serialize};
-use std::task::{Context, Poll};
-use std::pin::Pin;
use std::marker::Unpin;
-use std::future::Future;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use ci_lib_native::io;
use ci_lib_native::io::{ArtifactStream, VecSink};
@@ -23,6 +19,7 @@ mod lua;
use crate::lua::CommandOutput;
+#[allow(dead_code)]
#[derive(Debug)]
enum WorkAcquireError {
Reqwest(reqwest::Error),
@@ -43,6 +40,7 @@ trait Runner: Send + Sync + 'static {
async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String>;
}
+#[allow(dead_code)]
struct LocalRunner {
working_dir: PathBuf,
current_job: Option<RequestedJob>,
@@ -68,7 +66,7 @@ impl Runner for LocalRunner {
println!("metric reported: {} = {}", name, value);
Ok(())
}
- async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> {
+ async fn create_artifact(&self, _name: &str, _desc: &str, _build_token: &str) -> Result<ArtifactStream, String> {
Err("can't create artifacts yet".to_string())
}
}
@@ -77,9 +75,11 @@ impl Runner for LocalRunner {
/// a task", including reporting metrics and statuses back to the CI server.
struct RemoteServerRunner {
http: reqwest::Client,
+ #[allow(dead_code)]
host: String,
tx: hyper::body::Sender,
rx: Response,
+ #[allow(dead_code)]
current_job: Option<RequestedJob>,
}
@@ -103,7 +103,7 @@ impl Runner for RemoteServerRunner {
.map_err(|e| format!("failed to send metric {}: {:?})", name, e))
}
async fn create_artifact(&self, name: &str, desc: &str, build_token: &str) -> Result<ArtifactStream, String> {
- let (mut sender, body) = hyper::Body::channel();
+ let (sender, body) = hyper::Body::channel();
let resp = self.http.post("https://ci.butactuallyin.space:9876/api/artifact")
.header("user-agent", "ci-butactuallyin-space-runner")
.header("x-task-token", build_token)
@@ -147,6 +147,7 @@ impl RunningJob {
struct JobEnv {
lua: lua::BuildEnv,
+ #[allow(dead_code)]
job: Arc<Mutex<Box<RunningJob>>>,
}
@@ -175,6 +176,7 @@ pub struct RunningJob {
current_step: StepTracker,
}
+#[allow(dead_code)]
enum RepoError {
CloneFailedIdk { exit_code: ExitStatus },
CheckoutFailedIdk { exit_code: ExitStatus },
@@ -259,7 +261,7 @@ impl RunningJob {
Ok(())
}
- async fn execute_command_and_report(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
+ async fn execute_command_and_report(&self, command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
let stdout_artifact = self.create_artifact(
&format!("{} (stdout)", name),
&format!("{} (stdout)", desc)
@@ -274,7 +276,7 @@ impl RunningJob {
Ok(exit_status)
}
- async fn execute_command_capture_output(&self, mut command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> {
+ async fn execute_command_capture_output(&self, command: Command, name: &str, desc: &str) -> Result<crate::lua::CommandOutput, String> {
let stdout_collector = VecSink::new();
let stderr_collector = VecSink::new();
@@ -287,7 +289,7 @@ impl RunningJob {
})
}
- async fn execute_command(&self, mut command: Command, name: &str, desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> {
+ async fn execute_command(&self, mut command: Command, name: &str, _desc: &str, mut stdout_reporter: impl AsyncWrite + Unpin + Send + 'static, mut stderr_reporter: impl AsyncWrite + Unpin + Send + 'static) -> Result<ExitStatus, String> {
eprintln!("[.] running {}", name);
let mut child = command
@@ -327,7 +329,7 @@ impl RunningJob {
let checkout_res = ctx.lock().unwrap().clone_remote().await;
- if let Err(e) = checkout_res {
+ if let Err(_e) = checkout_res {
let status = "bad_ref";
let status = TaskInfo::finished(status);
eprintln!("checkout failed, reporting status: {:?}", status);
@@ -444,7 +446,7 @@ impl RunningJob {
}
impl RemoteServerRunner {
- async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
+ async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
if res.status() != StatusCode::OK {
return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res));
}
@@ -467,11 +469,18 @@ impl RemoteServerRunner {
})
}
- async fn wait_for_work(&mut self, accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> {
+ async fn wait_for_work(&mut self, _accepted_pushers: Option<&[String]>) -> Result<Option<RequestedJob>, WorkAcquireError> {
loop {
let message = self.recv_typed::<ClientProto>().await;
match message {
Ok(Some(ClientProto::NewTask(new_task))) => {
+ // TODO: verify that `new_task` is for a commit authored by someone we're
+ // willing to run work for.
+ //
+ // we're also trusting the server to only tell us about work we would be
+ // interested in running, so if this rejects a task it's a server bug that we
+ // got the task in the first place or a client bug that the list of accepted
+ // pushers varied.
return Ok(Some(new_task));
},
Ok(Some(ClientProto::Ping)) => {
@@ -491,10 +500,6 @@ impl RemoteServerRunner {
}
}
- async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> {
- self.recv_typed().await
- }
-
async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> {
match self.rx.chunk().await {
Ok(Some(chunk)) => {
@@ -511,10 +516,6 @@ impl RemoteServerRunner {
}
}
- async fn send(&mut self, value: serde_json::Value) -> Result<(), String> {
- self.send_typed(&value).await
- }
-
async fn send_typed<T: Serialize>(&mut self, t: &T) -> Result<(), String> {
self.tx.send_data(
serde_json::to_vec(t)
@@ -546,13 +547,13 @@ async fn main() {
}
}
-async fn run_local(config_path: String) {
+async fn run_local(_config_path: String) {
let job = RequestedJob {
commit: "current commit?".to_string(),
remote_url: "cwd?".to_string(),
build_token: "n/a".to_string(),
};
- let mut job = RunningJob::local_from_job(job);
+ let job = RunningJob::local_from_job(job);
job.run().await;
}
@@ -583,7 +584,7 @@ async fn run_remote(config_path: String) {
.await;
match poll {
- Ok(mut res) => {
+ Ok(res) => {
let mut client = match RemoteServerRunner::new("ci.butactuallyin.space:9876", sender, res).await {
Ok(client) => client,
Err(e) => {
@@ -609,7 +610,7 @@ async fn run_remote(config_path: String) {
eprintln!("doing {:?}", job);
- let mut job = RunningJob::remote_from_job(job, client);
+ let job = RunningJob::remote_from_job(job, client);
job.run().await;
std::thread::sleep(Duration::from_millis(10000));
},
@@ -657,7 +658,7 @@ mod host_info {
let cpu_mhzes: Vec<&String> = cpu_lines.iter().filter(|line| line.starts_with("cpu MHz")).collect();
match cpu_mhzes.get(cpu as usize) {
Some(mhz) => {
- let mut line_parts = cpu_mhzes[cpu as usize].split(":");
+ let mut line_parts = mhz.split(":");
let _ = line_parts.next();
let mhz = line_parts.next().unwrap().trim();
let mhz: f64 = mhz.parse().unwrap();
diff --git a/ci-web-server/Cargo.toml b/ci-web-server/Cargo.toml
index bb0b57c..164ec56 100644
--- a/ci-web-server/Cargo.toml
+++ b/ci-web-server/Cargo.toml
@@ -14,7 +14,7 @@ ci-lib-core = { path = "../ci-lib-core" }
ci-lib-native = { path = "../ci-lib-native" }
ci-lib-web = { path = "../ci-lib-web" }
-tokio = { features = ["full"] }
+tokio = { version = "*", features = ["full"] }
tokio-stream = "*"
serde_json = "*"
serde = { version = "*", features = ["derive"] }