diff options
-rw-r--r-- | Cargo.lock | 118 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/ci_driver.rs | 77 | ||||
-rw-r--r-- | src/ci_runner.rs | 193 | ||||
-rw-r--r-- | src/dbctx.rs | 114 | ||||
-rw-r--r-- | src/notifier.rs | 5 | ||||
-rw-r--r-- | src/sql.rs | 9 |
7 files changed, 465 insertions, 53 deletions
@@ -87,6 +87,18 @@ dependencies = [ ] [[package]] +name = "axum-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4df0fc33ada14a338b799002f7e8657711422b25d4e16afb032708d6b185621" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "axum-server" version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -244,6 +256,22 @@ dependencies = [ ] [[package]] +name = "email-encoding" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34dd14c63662e0206599796cd5e1ad0268ab2b9d19b868d6050d688eba2bbf98" +dependencies = [ + "base64", + "memchr", +] + +[[package]] +name = "email_address" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2153bd83ebc09db15bcbdc3e2194d901804952e3dc96967e1cd3b0c5c32d112" + +[[package]] name = "encoding_rs" version = "0.8.31" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -340,6 +368,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" [[package]] +name = "futures-io" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" + +[[package]] name = "futures-macro" version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -369,8 +403,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-core", + "futures-io", "futures-macro", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -488,6 +524,17 @@ dependencies = [ ] [[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + +[[package]] name = "http" version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -579,6 +626,17 @@ dependencies = [ [[package]] name = "idna" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "idna" version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" @@ -645,6 +703,7 @@ name = "ixi-ci-server" version = "0.0.1" dependencies = [ "axum", + "axum-macros", "axum-server", "clap", "futures-util", @@ -654,6 +713,7 @@ dependencies = [ "http", "http-body", "hyper", + "lettre", "libc", "rand", "reqwest", @@ -685,6 +745,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] +name = "lettre" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eabca5e0b4d0e98e7f2243fb5b7520b6af2b65d8f87bcc86f2c75185a6ff243" +dependencies = [ + "base64", + "email-encoding", + "email_address", + "fastrand", + "futures-util", + "hostname", + "httpdate", + "idna 0.2.3", + "mime", + "native-tls", + "nom", + "once_cell", + "quoted_printable", + "socket2", +] + +[[package]] name = "libc" version = "0.2.137" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -726,6 +808,18 @@ dependencies = [ ] [[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + +[[package]] +name = "matches" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" + +[[package]] name = "matchit" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -744,6 +838,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] name = "mio" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -774,6 +874,16 @@ dependencies = [ ] [[package]] +name = "nom" +version = "7.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36" +dependencies = [ + "memchr", + "minimal-lexical", +] + +[[package]] name = "nu-ansi-term" version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1025,6 +1135,12 @@ dependencies = [ ] [[package]] +name = "quoted_printable" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20f14e071918cbeefc5edc986a7aa92c425dae244e003a35e1cdddb5ca39b5cb" + +[[package]] name = "rand" version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1726,7 +1842,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" dependencies = [ "form_urlencoded", - "idna", + "idna 0.3.0", "percent-encoding", ] @@ -19,6 +19,8 @@ tokio = { version = "*", features = ["full"] } tokio-stream = "*" hyper = "*" futures-util = "*" +axum-macros = "*" +lettre = "*" tracing = "*" tracing-subscriber = "*" http-body = "*" diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 96e0d44..9ec0dd8 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -9,6 +9,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use axum_server::tls_rustls::RustlsConfig; use axum::body::StreamBody; use axum::http::{StatusCode}; +use hyper::HeaderMap; use axum::Router; use axum::routing::*; use axum::extract::State; @@ -101,8 +102,8 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R let run_host = client_job.client.name.clone(); connection.execute( - "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4", - (now as u64, run_host, format!("{}", artifacts.display()), job.id) + "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5", + (now as u64, run_host, format!("{}", artifacts.display()), &client_job.client.build_token, job.id) ) .expect("can update"); @@ -139,7 +140,15 @@ struct ClientJob { impl ClientJob { pub async fn run(&mut self) { loop { - let msg = self.client.recv().await.expect("recv works").expect("client sent an object"); + eprintln!("waiting on response.."); + let msg = match self.client.recv().await.expect("recv works") { + Some(msg) => msg, + None => { + eprintln!("client hung up. job's done, i hope?"); + return; + } + }; + eprintln!("got {:?}", msg); let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap(); match msg_kind { "job_status" => { @@ -228,7 +237,6 @@ impl RunnerClient { Err(format!("no client job: {:?}", e)) }, None => { - eprintln!("no more body chunks? client hung up?"); Ok(None) } } @@ -258,7 +266,6 @@ impl RunnerClient { } } Ok(None) => { - eprintln!("no more body chunks? client hung up?"); Ok(None) } Err(e) => { @@ -275,6 +282,65 @@ impl fmt::Debug for RunnerClient { } } +#[axum_macros::debug_handler] +async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse { + let job_token = match headers.get("x-job-token") { + Some(job_token) => job_token.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() { + Some(result) => result, + None => { + eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + if token_validity != dbctx::TokenValidity::Valid { + eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + + let artifact_path = if let Some(artifact_path) = artifact_path { + artifact_path + } else { + eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + }; + + let artifact_name = match headers.get("x-artifact-name") { + Some(artifact_name) => artifact_name.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let artifact_desc = match headers.get("x-artifact-desc") { + Some(artifact_desc) => artifact_desc.to_str().expect("valid string"), + None => { + eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers); + return (StatusCode::BAD_REQUEST, "").into_response(); + } + }; + + let mut artifact = match ctx.0.reserve_artifact(job, artifact_name, artifact_desc).await { + Ok(artifact) => artifact, + Err(err) => { + eprintln!("failure to reserve artifact: {:?}", err); + return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response(); + } + }; + + spawn(async move { artifact.store_all(artifact_content).await }); + + (StatusCode::OK, "").into_response() +} + async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse { let (tx_sender, tx_receiver) = mpsc::channel(8); let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver)); @@ -308,6 +374,7 @@ async fn make_api_server(dbctx: Arc<DbCtx>) -> (Router, mpsc::Receiver<RunnerCli let router = Router::new() .route("/api/next_job", post(handle_next_job)) + .route("/api/artifact", post(handle_artifact)) .with_state((dbctx, pending_client_sender)); (router, pending_client_receiver) } diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 6554f05..9224614 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -1,8 +1,14 @@ use std::time::Duration; use reqwest::{StatusCode, Response}; -use std::process::Command; +use tokio::process::Command; +use std::process::Stdio; +use std::process::ExitStatus; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use serde_derive::{Deserialize, Serialize}; use serde::{Deserialize, de::DeserializeOwned, Serialize}; +use std::task::{Context, Poll}; +use std::pin::Pin; +use std::marker::Unpin; #[derive(Debug)] enum WorkAcquireError { @@ -12,6 +18,7 @@ enum WorkAcquireError { } struct RunnerClient { + http: reqwest::Client, host: String, tx: hyper::body::Sender, rx: Response, @@ -28,75 +35,127 @@ struct RequestedJob { impl RequestedJob { // TODO: panics if hyper finds the channel is closed. hum async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result<ArtifactStream, String> { - let create_artifact_message = serde_json::json!({ - "kind": "artifact_create", - "name": name, - "description": desc, - "job_token": &self.build_token, - }); - client.send(create_artifact_message).await - .map_err(|e| format!("create artifact send error: {:?}", e))?; - let resp = client.recv().await - .map_err(|e| format!("create artifact recv error: {:?}", e))?; - eprintln!("resp: {:?}", resp); - let object_id = resp.unwrap() - .as_object().expect("is an object") - .get("object_id").unwrap().as_str().expect("is str") - .to_owned(); - // POST to this object id... - Ok(ArtifactStream { - object_id, - }) + eprintln!("[?] creating artifact..."); + let (mut sender, body) = hyper::Body::channel(); + let resp = client.http.post("https://ci.butactuallyin.space:9876/api/artifact") + .header("user-agent", "ci-butactuallyin-space-runner") + .header("x-job-token", &self.build_token) + .header("x-artifact-name", name) + .header("x-artifact-desc", desc) + .body(body) + .send() + .await + .map_err(|e| format!("unable to send request: {:?}", e))?; + + if resp.status() == StatusCode::OK { + eprintln!("[+] artifact '{}' started", name); + Ok(ArtifactStream { + sender, + }) + } else { + Err(format!("[-] unable to create artifact: {:?}", resp)) + } } - async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> { - let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works"); + async fn execute_command(&self, client: &mut RunnerClient, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> { + eprintln!("[.] running {}", name); + async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; - let clone_res = Command::new("git") + if n_read == 0 { + return Ok(()); + } + + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } + } + + let stdout_artifact = self.create_artifact( + client, + &format!("{} (stdout)", name), + &format!("{} (stdout)", desc) + ).await.expect("works"); + let stderr_artifact = self.create_artifact( + client, + &format!("{} (stderr)", name), + &format!("{} (stderr)", desc) + ).await.expect("works"); + + let mut child = command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; + + let child_stdout = child.stdout.take().unwrap(); + let child_stderr = child.stderr.take().unwrap(); + + eprintln!("[.] '{}': forwarding stdout", name); + tokio::spawn(forward_data(child_stdout, stdout_artifact)); + eprintln!("[.] '{}': forwarding stderr", name); + tokio::spawn(forward_data(child_stderr, stderr_artifact)); + + let res = child.wait().await + .map_err(|e| format!("failed to wait? {:?}", e))?; + + if res.success() { + eprintln!("[+] '{}' success", name); + } else { + eprintln!("[-] '{}' fail: {:?}", name, res); + } + + Ok(res) + } + + async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> { + let mut git_clone = Command::new("git"); + git_clone .arg("clone") .arg(&self.remote_url) - .arg("tmpdir") - .status() - .map_err(|e| format!("failed to run git clone? {:?}", e))?; + .arg("tmpdir"); + + let clone_res = self.execute_command(client, git_clone, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await?; if !clone_res.success() { return Err(format!("git clone failed: {:?}", clone_res)); } - let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works"); - - let checkout_res = Command::new("git") + let mut git_checkout = Command::new("git"); + git_checkout .current_dir("tmpdir") .arg("checkout") - .arg(&self.commit) - .status() - .map_err(|e| format!("failed to run git checkout? {:?}", e))?; + .arg(&self.commit); + + let checkout_res = self.execute_command(client, git_checkout, "git checkout log", &format!("git checkout {}", &self.commit)).await?; if !checkout_res.success() { return Err(format!("git checkout failed: {:?}", checkout_res)); } - let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works"); - - let build_res = Command::new("cargo") + let mut build = Command::new("cargo"); + build .current_dir("tmpdir") - .arg("build") - .status() - .map_err(|e| format!("failed to run cargo build? {:?}", e))?; + .arg("build"); + + let build_res = self.execute_command(client, build, "cargo build log", "cargo build").await?; if !build_res.success() { return Err(format!("cargo build failed: {:?}", build_res)); } - let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works"); - - let test_result = Command::new("cargo") + let mut test = Command::new("cargo"); + test .current_dir("tmpdir") - .arg("test") - .status() - .map_err(|e| format!("failed to run cargo test? {:?}", e))?; + .arg("test"); + + let test_res = self.execute_command(client, test, "cargo test log", "cargo test").await?; - match test_result.code() { + match test_res.code() { Some(0) => Ok("pass".to_string()), Some(n) => Ok(format!("error: {}", n)), None => Ok(format!("abnormal exit")), @@ -105,7 +164,38 @@ impl RequestedJob { } struct ArtifactStream { - object_id: String, + sender: hyper::body::Sender, +} + +impl tokio::io::AsyncWrite for ArtifactStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll<Result<usize, std::io::Error>> { + match self.get_mut().sender.try_send_data(buf.to_vec().into()) { + Ok(()) => { + Poll::Ready(Ok(buf.len())) + }, + _ => { + Poll::Pending + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll<Result<(), std::io::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll<Result<(), std::io::Error>> { + Poll::Ready(Ok(())) + } } impl RunnerClient { @@ -120,6 +210,11 @@ impl RunnerClient { } Ok(Self { + http: reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_millis(1000)) + .timeout(Duration::from_millis(600000)) + .build() + .expect("can build client"), host: host.to_string(), tx: sender, rx: res, @@ -187,6 +282,8 @@ impl RunnerClient { match res { Ok(status) => { + eprintln!("[+] job success!"); + self.send(serde_json::json!({ "kind": "job_status", "state": "finished", @@ -194,6 +291,8 @@ impl RunnerClient { })).await.unwrap(); } Err(status) => { + eprintln!("[-] job error: {}", status); + self.send(serde_json::json!({ "kind": "job_status", "state": "interrupted", @@ -213,6 +312,8 @@ async fn main() { .build() .expect("can build client"); + let allowed_pushers = None; + loop { let (mut sender, body) = hyper::Body::channel(); let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job") diff --git a/src/dbctx.rs b/src/dbctx.rs index 3af2d56..d025b8d 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -1,10 +1,15 @@ use std::sync::Mutex; +use futures_util::StreamExt; use rusqlite::{Connection, OptionalExtension}; use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::notifier::{RemoteNotifier, NotifierConfig}; use crate::sql; +const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30; + pub struct DbCtx { pub config_path: String, // don't love this but.. for now... @@ -22,6 +27,61 @@ pub struct PendingJob { pub created_time: u64, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TokenValidity { + Expired, + Invalid, + Valid, +} + +pub struct ArtifactDescriptor { + job_id: u64, + artifact_id: u64, + file: File, +} + +impl ArtifactDescriptor { + async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> { + // TODO: jobs should be a configurable path + let path = format!("jobs/{}/{}", job_id, artifact_id); + let file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path) + .await + .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + + Ok(ArtifactDescriptor { + job_id, + artifact_id, + file, + }) + } + + pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { + loop { + let chunk = data.next().await; + + let chunk = match chunk { + Some(Ok(chunk)) => chunk, + Some(Err(e)) => { + return Err(format!("error reading: {:?}", e)); + } + None => { + eprintln!("body done?"); + return Ok(()); + } + }; + + let chunk = chunk.as_ref(); + + self.file.write_all(chunk).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } + } +} + impl DbCtx { pub fn new(config_path: &str, db_path: &str) -> Self { DbCtx { @@ -32,6 +92,7 @@ impl DbCtx { pub fn create_tables(&self) -> Result<(), ()> { let conn = self.conn.lock().unwrap(); + conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap(); conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); @@ -68,6 +129,56 @@ impl DbCtx { Ok(conn.last_insert_rowid() as u64) } + pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { + let artifact_id = { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into artifacts (job_id, name, desc) values (?1, ?2, ?3)", + (job_id, name, desc) + ) + .map_err(|e| { + format!("{:?}", e) + })?; + + conn.last_insert_rowid() as u64 + }; + + ArtifactDescriptor::new(job_id, artifact_id).await + } + + pub fn job_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> { + self.conn.lock() + .unwrap() + .query_row( + "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1", + [token], + |row| { + let timeout: Option<u64> = row.get(3).unwrap(); + let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis(); + + let time: Option<u64> = row.get(2).unwrap(); + let validity = if let Some(time) = time { + if now > time as u128 + timeout as u128 { + TokenValidity::Expired + } else { + TokenValidity::Valid + } + } else { + TokenValidity::Invalid + }; + Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity)) + } + ) + .optional() + .map_err(|e| e.to_string()) + } + pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> { self.conn.lock() .unwrap() @@ -89,6 +200,9 @@ impl DbCtx { "github" => { (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote)) }, + "github-email" => { + (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote)) + }, other => { panic!("unsupported remote kind: {}", other); } diff --git a/src/notifier.rs b/src/notifier.rs index 4ddc91b..3d9964a 100644 --- a/src/notifier.rs +++ b/src/notifier.rs @@ -1,6 +1,11 @@ use serde_derive::{Deserialize, Serialize}; use std::sync::Arc; use axum::http::StatusCode; +use lettre::transport::smtp::authentication::{Credentials, Mechanism}; +use lettre::{Message, Transport}; +use lettre::transport::smtp::extension::ClientId; +use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder}; +use std::time::Duration; use crate::DbCtx; @@ -38,7 +38,8 @@ pub const CREATE_JOBS_TABLE: &'static str = "\ commit_id INTEGER, created_time INTEGER, started_time INTEGER, - complete_time INTEGER);"; + complete_time INTEGER, + job_timeout INTEGER);"; pub const CREATE_COMMITS_TABLE: &'static str = "\ CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);"; @@ -62,6 +63,12 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\ remote_git_url TEXT, notifier_config_path TEXT);"; +pub const CREATE_ARTIFACTS_TABLE: &'static str = "\ + CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER, + name TEXT, + desc TEXT);"; + pub const CREATE_REMOTES_INDEX: &'static str = "\ CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);"; |