diff options
Diffstat (limited to 'ci-driver')
-rw-r--r-- | ci-driver/src/main.rs | 25 |
1 files changed, 12 insertions, 13 deletions
diff --git a/ci-driver/src/main.rs b/ci-driver/src/main.rs index f8ff34c..08256f0 100644 --- a/ci-driver/src/main.rs +++ b/ci-driver/src/main.rs @@ -1,11 +1,10 @@ -use std::process::Command; use std::collections::HashMap; use std::sync::{Mutex, RwLock}; use lazy_static::lazy_static; use std::io::Read; use futures_util::StreamExt; use std::fmt; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use tokio::spawn; use tokio_stream::wrappers::ReceiverStream; use std::sync::{Arc, Weak}; @@ -21,7 +20,6 @@ use axum::extract::BodyStream; use axum::response::IntoResponse; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; -use serde_json::json; use serde::{Deserialize, Serialize}; use ci_lib_core::dbctx::DbCtx; @@ -29,7 +27,7 @@ use ci_lib_core::sql; use ci_lib_core::sql::{PendingRun, Job, Run}; use ci_lib_core::sql::JobResult; use ci_lib_core::sql::RunState; -use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob}; +use ci_lib_core::protocol::{ClientProto, TaskInfo, RequestedJob}; lazy_static! { static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); @@ -73,7 +71,7 @@ async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run let res = candidate.submit(&dbctx, &run, &remote.remote_git_url, &commit_sha).await; let mut client_job = match res { - Ok(Some(mut client_job)) => { client_job } + Ok(Some(client_job)) => { client_job } Ok(None) => { return Err("client hung up instead of acking task".to_string()); } @@ -118,6 +116,7 @@ fn token_for_job() -> String { base64::encode(data) } +#[allow(dead_code)] struct ClientJob { dbctx: Arc<DbCtx>, remote_git_url: String, @@ -141,7 +140,7 @@ impl ClientJob { }; eprintln!("got {:?}", msg); match msg { - ClientProto::NewTaskPlease { allowed_pushers, host_info } => { + ClientProto::NewTaskPlease { allowed_pushers: _, host_info: _ } => { eprintln!("misdirected task request (after handshake?)"); return; } @@ -355,7 +354,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien return (StatusCode::BAD_REQUEST, "").into_response(); } - let artifact_path = if let Some(artifact_path) = artifact_path { + let _artifact_path = if let Some(artifact_path) = artifact_path { artifact_path } else { eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers); @@ -404,7 +403,7 @@ struct WorkRequest { } async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse { - let auth_token = match headers.get("authorization") { + let _auth_token = match headers.get("authorization") { Some(token) => { if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) { eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token); @@ -445,7 +444,7 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien let client = match RunnerClient::new(tx_sender, job_resp, accepted_pushers, host_info_id).await { Ok(v) => v, Err(e) => { - eprintln!("unable to register client"); + eprintln!("unable to register client: {}", e); return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); } }; @@ -455,10 +454,10 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien eprintln!("client requested work..."); return (StatusCode::OK, resp_body).into_response(); } - Err(TrySendError::Full(client)) => { + Err(TrySendError::Full(_client)) => { return (StatusCode::IM_A_TEAPOT, resp_body).into_response(); } - Err(TrySendError::Closed(client)) => { + Err(TrySendError::Closed(_client)) => { panic!("client holder is gone?"); } } @@ -512,7 +511,7 @@ async fn main() { spawn(old_task_reaper(Arc::clone(&dbctx))); loop { - let mut candidate = match channel.recv().await + let candidate = match channel.recv().await .ok_or_else(|| "client channel disconnected".to_string()) { Ok(candidate) => { candidate }, @@ -583,7 +582,7 @@ async fn old_task_reaper(dbctx: Arc<DbCtx>) { let active_tasks = ACTIVE_TASKS.lock().unwrap(); - for (id, witness) in active_tasks.iter() { + for (id, _witness) in active_tasks.iter() { if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) { potentially_stale_tasks.swap_remove(idx); } |