diff options
-rw-r--r-- | src/ci_driver.rs | 35 |
1 files changed, 29 insertions, 6 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs index c236cb3..29a699c 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -1,4 +1,6 @@ use std::process::Command; +use std::sync::RwLock; +use lazy_static::lazy_static; use std::io::Read; use serde_derive::{Deserialize, Serialize}; use futures_util::StreamExt; @@ -30,6 +32,10 @@ use crate::dbctx::{DbCtx, PendingJob}; use crate::sql::JobResult; use crate::sql::JobState; +lazy_static! { + static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None); +} + fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> { let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into(); path.push(job.to_string()); @@ -420,7 +426,20 @@ struct WorkRequest { accepted_pushers: Option<Vec<String>> } -async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, mut job_resp: BodyStream) -> impl IntoResponse { +async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, mut job_resp: BodyStream) -> impl IntoResponse { + let auth_token = match headers.get("authorization") { + Some(token) => { + if Some(token.to_str().unwrap_or("")) != AUTH_SECRET.read().unwrap().as_ref().map(|x| &**x) { + eprintln!("BAD AUTH SECRET SUBMITTED: {:?}", token); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + } + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + let (tx_sender, tx_receiver) = mpsc::channel(8); let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); tx_sender.send(Ok("hello".to_string())).await.expect("works"); @@ -431,29 +450,29 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien Ok(v) => v, Err(e) => { eprintln!("couldn't parse work request: {:?}", e); - return (StatusCode::MISDIRECTED_REQUEST, resp_body); + return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); } }; if &request.kind != "new_job_please" { eprintln!("bad request kind: {:?}", &request.kind); - return (StatusCode::MISDIRECTED_REQUEST, resp_body); + return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); } let client = match RunnerClient::new(tx_sender, job_resp, request.accepted_pushers).await { Ok(v) => v, Err(e) => { eprintln!("unable to register client"); - return (StatusCode::MISDIRECTED_REQUEST, resp_body); + return (StatusCode::MISDIRECTED_REQUEST, resp_body).into_response(); } }; match ctx.1.try_send(client) { Ok(()) => { eprintln!("client requested work..."); - return (StatusCode::OK, resp_body); + return (StatusCode::OK, resp_body).into_response(); } Err(TrySendError::Full(client)) => { - return (StatusCode::IM_A_TEAPOT, resp_body); + return (StatusCode::IM_A_TEAPOT, resp_body).into_response(); } Err(TrySendError::Closed(client)) => { panic!("client holder is gone?"); @@ -478,6 +497,7 @@ struct DriverConfig { config_path: PathBuf, db_path: PathBuf, server_addr: String, + auth_secret: String, } #[tokio::main] @@ -488,6 +508,9 @@ async fn main() { args.next().expect("first arg exists"); let config_path = args.next().unwrap_or("./driver_config.json".to_string()); let driver_config: DriverConfig = serde_json::from_reader(std::fs::File::open(config_path).expect("file exists and is accessible")).expect("valid json for DriverConfig"); + let mut auth_secret = AUTH_SECRET.write().unwrap(); + *auth_secret = Some(driver_config.auth_secret.clone()); + std::mem::drop(auth_secret); let config = RustlsConfig::from_pem_file( driver_config.cert_path.clone(), |