summaryrefslogtreecommitdiff
path: root/ci-driver
diff options
context:
space:
mode:
Diffstat (limited to 'ci-driver')
-rw-r--r--ci-driver/src/main.rs25
1 files changed, 12 insertions, 13 deletions
diff --git a/ci-driver/src/main.rs b/ci-driver/src/main.rs
index f8ff34c..08256f0 100644
--- a/ci-driver/src/main.rs
+++ b/ci-driver/src/main.rs
@@ -1,11 +1,10 @@
-use std::process::Command;
use std::collections::HashMap;
use std::sync::{Mutex, RwLock};
use lazy_static::lazy_static;
use std::io::Read;
use futures_util::StreamExt;
use std::fmt;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use tokio::spawn;
use tokio_stream::wrappers::ReceiverStream;
use std::sync::{Arc, Weak};
@@ -21,7 +20,6 @@ use axum::extract::BodyStream;
use axum::response::IntoResponse;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
-use serde_json::json;
use serde::{Deserialize, Serialize};
use ci_lib_core::dbctx::DbCtx;
@@ -29,7 +27,7 @@ use ci_lib_core::sql;
use ci_lib_core::sql::{PendingRun, Job, Run};
use ci_lib_core::sql::JobResult;
use ci_lib_core::sql::RunState;
-use ci_lib_core::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
+use ci_lib_core::protocol::{ClientProto, TaskInfo, RequestedJob};
lazy_static! {
static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
@@ -73,7 +71,7 @@ async fn activate_run(dbctx: Arc<DbCtx>, candidate: RunnerClient, job: &Job, run
let res = candidate.submit(&dbctx, &run, &remote.remote_git_url, &commit_sha).await;
let mut client_job = match res {
- Ok(Some(mut client_job)) => { client_job }
+ Ok(Some(client_job)) => { client_job }
Ok(None) => {
return Err("client hung up instead of acking task".to_string());
}
@@ -118,6 +116,7 @@ fn token_for_job() -> String {
base64::encode(data)
}
+#[allow(dead_code)]
struct ClientJob {
dbctx: Arc<DbCtx>,
remote_git_url: String,
@@ -141,7 +140,7 @@ impl ClientJob {
};
eprintln!("got {:?}", msg);
match msg {
- ClientProto::NewTaskPlease { allowed_pushers, host_info } => {
+ ClientProto::NewTaskPlease { allowed_pushers: _, host_info: _ } => {
eprintln!("misdirected task request (after handshake?)");
return;
}
@@ -355,7 +354,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
return (StatusCode::BAD_REQUEST, "").into_response();
}
- let artifact_path = if let Some(artifact_path) = artifact_path {
+ let _artifact_path = if let Some(artifact_path) = artifact_path {
artifact_path
} else {
eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers);
@@ -404,7 +403,7 @@ struct WorkRequest {
}
async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse {
- let auth_token = match headers.get("authorization") {
+ let _auth_token = match headers.get("authorization") {
Some(token) => {
if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) {
eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token);
@@ -445,7 +444,7 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
let client = match RunnerClient::new(tx_sender, job_resp, accepted_pushers, host_info_id).await {
Ok(v) => v,
Err(e) => {
- eprintln!("unable to register client");
+ eprintln!("unable to register client: {}", e);
return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response();
}
};
@@ -455,10 +454,10 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
eprintln!("client requested work...");
return (StatusCode::OK, resp_body).into_response();
}
- Err(TrySendError::Full(client)) => {
+ Err(TrySendError::Full(_client)) => {
return (StatusCode::IM_A_TEAPOT, resp_body).into_response();
}
- Err(TrySendError::Closed(client)) => {
+ Err(TrySendError::Closed(_client)) => {
panic!("client holder is gone?");
}
}
@@ -512,7 +511,7 @@ async fn main() {
spawn(old_task_reaper(Arc::clone(&dbctx)));
loop {
- let mut candidate = match channel.recv().await
+ let candidate = match channel.recv().await
.ok_or_else(|| "client channel disconnected".to_string()) {
Ok(candidate) => { candidate },
@@ -583,7 +582,7 @@ async fn old_task_reaper(dbctx: Arc<DbCtx>) {
let active_tasks = ACTIVE_TASKS.lock().unwrap();
- for (id, witness) in active_tasks.iter() {
+ for (id, _witness) in active_tasks.iter() {
if let Some(idx) = potentially_stale_tasks.iter().position(|task| task.id == *id) {
potentially_stale_tasks.swap_remove(idx);
}