From 9e6906c00c49186189d211dc96e132d85e7ff641 Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 13 Jul 2023 00:51:51 -0700 Subject: reorganize the whole thing into crates/ packages --- ci-lib-native/src/dbctx_ext.rs | 62 ++++++++++++++ ci-lib-native/src/io.rs | 174 +++++++++++++++++++++++++++++++++++++++ ci-lib-native/src/lib.rs | 3 + ci-lib-native/src/notifier.rs | 181 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 420 insertions(+) create mode 100644 ci-lib-native/src/dbctx_ext.rs create mode 100644 ci-lib-native/src/io.rs create mode 100644 ci-lib-native/src/lib.rs create mode 100644 ci-lib-native/src/notifier.rs (limited to 'ci-lib-native/src') diff --git a/ci-lib-native/src/dbctx_ext.rs b/ci-lib-native/src/dbctx_ext.rs new file mode 100644 index 0000000..44436fc --- /dev/null +++ b/ci-lib-native/src/dbctx_ext.rs @@ -0,0 +1,62 @@ +use crate::io::ArtifactDescriptor; +use crate::notifier::{RemoteNotifier, NotifierConfig}; +use tokio::fs::{File, OpenOptions}; + +use ci_lib_core::dbctx::DbCtx; + +pub fn notifiers_by_repo(ctx: &DbCtx, repo_id: u64) -> Result, String> { + let remotes = ctx.remotes_by_repo(repo_id)?; + + let mut notifiers: Vec = Vec::new(); + + for remote in remotes.into_iter() { + match remote.remote_api.as_str() { + "github" => { + let mut notifier_path = ctx.config_path.clone(); + notifier_path.push(&remote.notifier_config_path); + + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::github_from_file(¬ifier_path) + .expect("can load notifier config") + }; + notifiers.push(notifier); + }, + "email" => { + let mut notifier_path = ctx.config_path.clone(); + notifier_path.push(&remote.notifier_config_path); + + let notifier = RemoteNotifier { + remote_path: remote.remote_path, + notifier: NotifierConfig::email_from_file(¬ifier_path) + .expect("can load notifier config") + }; + notifiers.push(notifier); + } + other => { + eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) + } + } + } + + Ok(notifiers) +} + +pub async fn reserve_artifact(ctx: &DbCtx, run_id: u64, name: &str, desc: &str) -> Result { + let artifact_id = { + let created_time = ci_lib_core::now_ms(); + let conn = ctx.conn.lock().unwrap(); + conn + .execute( + "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", + (run_id, name, desc, created_time) + ) + .map_err(|e| { + format!("{:?}", e) + })?; + + conn.last_insert_rowid() as u64 + }; + + ArtifactDescriptor::new(run_id, artifact_id).await +} diff --git a/ci-lib-native/src/io.rs b/ci-lib-native/src/io.rs new file mode 100644 index 0000000..d41349c --- /dev/null +++ b/ci-lib-native/src/io.rs @@ -0,0 +1,174 @@ +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures_util::StreamExt; +use tokio::fs::File; +use std::io::Write; +use tokio::fs::OpenOptions; +use std::task::{Poll, Context}; +use std::pin::Pin; +use std::time::{UNIX_EPOCH, SystemTime}; +use std::sync::{Arc, Mutex}; + +#[derive(Clone)] +pub struct VecSink { + body: Arc>>, +} + +impl VecSink { + pub fn new() -> Self { + Self { body: Arc::new(Mutex::new(Vec::new())) } + } + + pub fn take_buf(&self) -> Vec { + std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new()) + } +} + +impl tokio::io::AsyncWrite for VecSink { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + self.body.lock().unwrap().extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + +pub struct ArtifactStream { + sender: hyper::body::Sender, +} + +impl ArtifactStream { + pub fn new(sender: hyper::body::Sender) -> Self { + Self { sender } + } +} + +impl tokio::io::AsyncWrite for ArtifactStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + match self.get_mut().sender.try_send_data(buf.to_vec().into()) { + Ok(()) => { + Poll::Ready(Ok(buf.len())) + }, + _ => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + + +pub struct ArtifactDescriptor { + job_id: u64, + pub artifact_id: u64, + file: File, +} + +impl ArtifactDescriptor { + pub async fn new(job_id: u64, artifact_id: u64) -> Result { + // TODO: jobs should be a configurable path + let path = format!("artifacts/{}/{}", job_id, artifact_id); + let file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path) + .await + .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + + Ok(ArtifactDescriptor { + job_id, + artifact_id, + file, + }) + } + + pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { + loop { + let chunk = data.next().await; + + let chunk = match chunk { + Some(Ok(chunk)) => chunk, + Some(Err(e)) => { + eprintln!("error: {:?}", e); + return Err(format!("error reading: {:?}", e)); + } + None => { + eprintln!("body done?"); + return Ok(()); + } + }; + + let chunk = chunk.as_ref(); + + self.file.write_all(chunk).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } + } +} + +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + + if n_read == 0 { + eprintln!("done reading!"); + return Ok(()); + } + + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } +} +/* +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + + if n_read == 0 { + eprintln!("done reading!"); + return Ok(()); + } + + dest.sender.send_data(buf[..n_read].to_vec().into()).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } +} +*/ diff --git a/ci-lib-native/src/lib.rs b/ci-lib-native/src/lib.rs new file mode 100644 index 0000000..74cb710 --- /dev/null +++ b/ci-lib-native/src/lib.rs @@ -0,0 +1,3 @@ +pub mod io; +pub mod dbctx_ext; +pub mod notifier; diff --git a/ci-lib-native/src/notifier.rs b/ci-lib-native/src/notifier.rs new file mode 100644 index 0000000..dd4a35c --- /dev/null +++ b/ci-lib-native/src/notifier.rs @@ -0,0 +1,181 @@ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use axum::http::StatusCode; +use lettre::transport::smtp::authentication::{Credentials, Mechanism}; +use lettre::{Message, Transport}; +use lettre::transport::smtp::extension::ClientId; +use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder}; +use std::time::Duration; +use std::path::Path; + +use ci_lib_core::dbctx::DbCtx; + +pub struct RemoteNotifier { + pub remote_path: String, + pub notifier: NotifierConfig, +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +pub enum NotifierConfig { + GitHub { + token: String, + }, + Email { + username: String, + password: String, + mailserver: String, + from: String, + to: String, + } +} + +impl NotifierConfig { + pub fn github_from_file>(path: P) -> Result { + let path = path.as_ref(); + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; + + if matches!(config, NotifierConfig::GitHub { .. }) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path.display())) + } + } + + pub fn email_from_file>(path: P) -> Result { + let path = path.as_ref(); + let bytes = std::fs::read(path) + .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; + let config = serde_json::from_slice(&bytes) + .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; + + if matches!(config, NotifierConfig::Email { .. }) { + Ok(config) + } else { + Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path.display())) + } + } +} + +impl RemoteNotifier { + pub async fn tell_pending_job(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + } + + pub async fn tell_complete_job(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64, desc: Result) -> Result<(), String> { + match desc { + Ok(status) => { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + }, + Err(status) => { + self.tell_job_status( + ctx, + repo_id, sha, job_id, + "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) + ).await + } + } + } + + pub async fn tell_job_status(&self, ctx: &Arc, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { + match &self.notifier { + NotifierConfig::GitHub { token } => { + let status_info = serde_json::json!({ + "state": state, + "description": desc, + "target_url": target_url, + "context": "actuallyinspace runner", + }); + + // TODO: should pool (probably in ctx?) to have an upper bound in concurrent + // connections. + let client = reqwest::Client::new(); + let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) + .body(serde_json::to_string(&status_info).expect("can stringify json")) + .header("content-type", "application/json") + .header("user-agent", "iximeow") + .header("authorization", format!("Bearer {}", token)) + .header("accept", "application/vnd.github+json"); + eprintln!("sending {:?}", req); + eprintln!(" body: {}", serde_json::to_string(&status_info).expect("can stringify json")); + let res = req + .send() + .await; + + match res { + Ok(res) => { + if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{ + Ok(()) + } else { + Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) + } + } + Err(e) => { + Err(format!("failure sending request: {:?}", e)) + } + } + } + NotifierConfig::Email { username, password, mailserver, from, to } => { + eprintln!("[.] emailing {} for job {} via {}", state, &self.remote_path, mailserver); + + let subject = format!("{}: job for {}", state, &self.remote_path); + + let body = format!("{}", subject); + + // TODO: when ci.butactuallyin.space has valid certs again, ... fix this. + let tls = TlsParametersBuilder::new(mailserver.to_string()) + .dangerous_accept_invalid_certs(true) + .build() + .unwrap(); + + let mut mailer = SmtpConnection::connect( + mailserver, + Some(Duration::from_millis(5000)), + &ClientId::Domain("ci.butactuallyin.space".to_string()), + None, + None, + ).unwrap(); + + mailer.starttls( + &tls, + &ClientId::Domain("ci.butactuallyin.space".to_string()), + ).unwrap(); + + let resp = mailer.auth( + &[Mechanism::Plain, Mechanism::Login], + &Credentials::new(username.to_owned(), password.to_owned()) + ).unwrap(); + assert!(resp.is_positive()); + + let email = Message::builder() + .from(from.parse().unwrap()) + .to(to.parse().unwrap()) + .subject(&subject) + .body(body) + .unwrap(); + + match mailer.send(email.envelope(), &email.formatted()) { + Ok(_) => { + eprintln!("[+] notified {}@{}", username, mailserver); + Ok(()) + } + Err(e) => { + eprintln!("[-] could not send email: {:?}", e); + Err(e.to_string()) + } + } + } + } + } +} -- cgit v1.1