summaryrefslogtreecommitdiff
path: root/ci-lib-native/src
diff options
context:
space:
mode:
authoriximeow <me@iximeow.net>2023-07-13 00:51:51 -0700
committeriximeow <me@iximeow.net>2023-07-13 00:51:51 -0700
commit9e6906c00c49186189d211dc96e132d85e7ff641 (patch)
tree05c20145ebc306313e3a12dc73c34b5dea40bbdc /ci-lib-native/src
parent543150f1666690351d4698421cc6ceb115c1e251 (diff)
reorganize the whole thing into crates/ packages
Diffstat (limited to 'ci-lib-native/src')
-rw-r--r--ci-lib-native/src/dbctx_ext.rs62
-rw-r--r--ci-lib-native/src/io.rs174
-rw-r--r--ci-lib-native/src/lib.rs3
-rw-r--r--ci-lib-native/src/notifier.rs181
4 files changed, 420 insertions, 0 deletions
diff --git a/ci-lib-native/src/dbctx_ext.rs b/ci-lib-native/src/dbctx_ext.rs
new file mode 100644
index 0000000..44436fc
--- /dev/null
+++ b/ci-lib-native/src/dbctx_ext.rs
@@ -0,0 +1,62 @@
+use crate::io::ArtifactDescriptor;
+use crate::notifier::{RemoteNotifier, NotifierConfig};
+use tokio::fs::{File, OpenOptions};
+
+use ci_lib_core::dbctx::DbCtx;
+
+pub fn notifiers_by_repo(ctx: &DbCtx, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> {
+ let remotes = ctx.remotes_by_repo(repo_id)?;
+
+ let mut notifiers: Vec<RemoteNotifier> = Vec::new();
+
+ for remote in remotes.into_iter() {
+ match remote.remote_api.as_str() {
+ "github" => {
+ let mut notifier_path = ctx.config_path.clone();
+ notifier_path.push(&remote.notifier_config_path);
+
+ let notifier = RemoteNotifier {
+ remote_path: remote.remote_path,
+ notifier: NotifierConfig::github_from_file(&notifier_path)
+ .expect("can load notifier config")
+ };
+ notifiers.push(notifier);
+ },
+ "email" => {
+ let mut notifier_path = ctx.config_path.clone();
+ notifier_path.push(&remote.notifier_config_path);
+
+ let notifier = RemoteNotifier {
+ remote_path: remote.remote_path,
+ notifier: NotifierConfig::email_from_file(&notifier_path)
+ .expect("can load notifier config")
+ };
+ notifiers.push(notifier);
+ }
+ other => {
+ eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote)
+ }
+ }
+ }
+
+ Ok(notifiers)
+}
+
+pub async fn reserve_artifact(ctx: &DbCtx, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
+ let artifact_id = {
+ let created_time = ci_lib_core::now_ms();
+ let conn = ctx.conn.lock().unwrap();
+ conn
+ .execute(
+ "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)",
+ (run_id, name, desc, created_time)
+ )
+ .map_err(|e| {
+ format!("{:?}", e)
+ })?;
+
+ conn.last_insert_rowid() as u64
+ };
+
+ ArtifactDescriptor::new(run_id, artifact_id).await
+}
diff --git a/ci-lib-native/src/io.rs b/ci-lib-native/src/io.rs
new file mode 100644
index 0000000..d41349c
--- /dev/null
+++ b/ci-lib-native/src/io.rs
@@ -0,0 +1,174 @@
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use futures_util::StreamExt;
+use tokio::fs::File;
+use std::io::Write;
+use tokio::fs::OpenOptions;
+use std::task::{Poll, Context};
+use std::pin::Pin;
+use std::time::{UNIX_EPOCH, SystemTime};
+use std::sync::{Arc, Mutex};
+
+#[derive(Clone)]
+pub struct VecSink {
+ body: Arc<Mutex<Vec<u8>>>,
+}
+
+impl VecSink {
+ pub fn new() -> Self {
+ Self { body: Arc::new(Mutex::new(Vec::new())) }
+ }
+
+ pub fn take_buf(&self) -> Vec<u8> {
+ std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new())
+ }
+}
+
+impl tokio::io::AsyncWrite for VecSink {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ self.body.lock().unwrap().extend_from_slice(buf);
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+pub struct ArtifactStream {
+ sender: hyper::body::Sender,
+}
+
+impl ArtifactStream {
+ pub fn new(sender: hyper::body::Sender) -> Self {
+ Self { sender }
+ }
+}
+
+impl tokio::io::AsyncWrite for ArtifactStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ match self.get_mut().sender.try_send_data(buf.to_vec().into()) {
+ Ok(()) => {
+ Poll::Ready(Ok(buf.len()))
+ },
+ _ => {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+
+pub struct ArtifactDescriptor {
+ job_id: u64,
+ pub artifact_id: u64,
+ file: File,
+}
+
+impl ArtifactDescriptor {
+ pub async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> {
+ // TODO: jobs should be a configurable path
+ let path = format!("artifacts/{}/{}", job_id, artifact_id);
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create_new(true)
+ .open(&path)
+ .await
+ .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?;
+
+ Ok(ArtifactDescriptor {
+ job_id,
+ artifact_id,
+ file,
+ })
+ }
+
+ pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> {
+ loop {
+ let chunk = data.next().await;
+
+ let chunk = match chunk {
+ Some(Ok(chunk)) => chunk,
+ Some(Err(e)) => {
+ eprintln!("error: {:?}", e);
+ return Err(format!("error reading: {:?}", e));
+ }
+ None => {
+ eprintln!("body done?");
+ return Ok(());
+ }
+ };
+
+ let chunk = chunk.as_ref();
+
+ self.file.write_all(chunk).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ }
+ }
+}
+
+pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 1024];
+ loop {
+ let n_read = source.read(&mut buf).await
+ .map_err(|e| format!("failed to read: {:?}", e))?;
+
+ if n_read == 0 {
+ eprintln!("done reading!");
+ return Ok(());
+ }
+
+ dest.write_all(&buf[..n_read]).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ }
+}
+/*
+pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 1024];
+ loop {
+ let n_read = source.read(&mut buf).await
+ .map_err(|e| format!("failed to read: {:?}", e))?;
+
+ if n_read == 0 {
+ eprintln!("done reading!");
+ return Ok(());
+ }
+
+ dest.sender.send_data(buf[..n_read].to_vec().into()).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ }
+}
+*/
diff --git a/ci-lib-native/src/lib.rs b/ci-lib-native/src/lib.rs
new file mode 100644
index 0000000..74cb710
--- /dev/null
+++ b/ci-lib-native/src/lib.rs
@@ -0,0 +1,3 @@
+pub mod io;
+pub mod dbctx_ext;
+pub mod notifier;
diff --git a/ci-lib-native/src/notifier.rs b/ci-lib-native/src/notifier.rs
new file mode 100644
index 0000000..dd4a35c
--- /dev/null
+++ b/ci-lib-native/src/notifier.rs
@@ -0,0 +1,181 @@
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+use axum::http::StatusCode;
+use lettre::transport::smtp::authentication::{Credentials, Mechanism};
+use lettre::{Message, Transport};
+use lettre::transport::smtp::extension::ClientId;
+use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder};
+use std::time::Duration;
+use std::path::Path;
+
+use ci_lib_core::dbctx::DbCtx;
+
+pub struct RemoteNotifier {
+ pub remote_path: String,
+ pub notifier: NotifierConfig,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum NotifierConfig {
+ GitHub {
+ token: String,
+ },
+ Email {
+ username: String,
+ password: String,
+ mailserver: String,
+ from: String,
+ to: String,
+ }
+}
+
+impl NotifierConfig {
+ pub fn github_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
+ let path = path.as_ref();
+ let bytes = std::fs::read(path)
+ .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?;
+ let config = serde_json::from_slice(&bytes)
+ .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?;
+
+ if matches!(config, NotifierConfig::GitHub { .. }) {
+ Ok(config)
+ } else {
+ Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path.display()))
+ }
+ }
+
+ pub fn email_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> {
+ let path = path.as_ref();
+ let bytes = std::fs::read(path)
+ .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?;
+ let config = serde_json::from_slice(&bytes)
+ .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?;
+
+ if matches!(config, NotifierConfig::Email { .. }) {
+ Ok(config)
+ } else {
+ Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path.display()))
+ }
+ }
+}
+
+impl RemoteNotifier {
+ pub async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> {
+ self.tell_job_status(
+ ctx,
+ repo_id, sha, job_id,
+ "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
+ ).await
+ }
+
+ pub async fn tell_complete_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, desc: Result<String, String>) -> Result<(), String> {
+ match desc {
+ Ok(status) => {
+ self.tell_job_status(
+ ctx,
+ repo_id, sha, job_id,
+ "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
+ ).await
+ },
+ Err(status) => {
+ self.tell_job_status(
+ ctx,
+ repo_id, sha, job_id,
+ "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
+ ).await
+ }
+ }
+ }
+
+ pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> {
+ match &self.notifier {
+ NotifierConfig::GitHub { token } => {
+ let status_info = serde_json::json!({
+ "state": state,
+ "description": desc,
+ "target_url": target_url,
+ "context": "actuallyinspace runner",
+ });
+
+ // TODO: should pool (probably in ctx?) to have an upper bound in concurrent
+ // connections.
+ let client = reqwest::Client::new();
+ let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha))
+ .body(serde_json::to_string(&status_info).expect("can stringify json"))
+ .header("content-type", "application/json")
+ .header("user-agent", "iximeow")
+ .header("authorization", format!("Bearer {}", token))
+ .header("accept", "application/vnd.github+json");
+ eprintln!("sending {:?}", req);
+ eprintln!(" body: {}", serde_json::to_string(&status_info).expect("can stringify json"));
+ let res = req
+ .send()
+ .await;
+
+ match res {
+ Ok(res) => {
+ if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{
+ Ok(())
+ } else {
+ Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res))
+ }
+ }
+ Err(e) => {
+ Err(format!("failure sending request: {:?}", e))
+ }
+ }
+ }
+ NotifierConfig::Email { username, password, mailserver, from, to } => {
+ eprintln!("[.] emailing {} for job {} via {}", state, &self.remote_path, mailserver);
+
+ let subject = format!("{}: job for {}", state, &self.remote_path);
+
+ let body = format!("{}", subject);
+
+ // TODO: when ci.butactuallyin.space has valid certs again, ... fix this.
+ let tls = TlsParametersBuilder::new(mailserver.to_string())
+ .dangerous_accept_invalid_certs(true)
+ .build()
+ .unwrap();
+
+ let mut mailer = SmtpConnection::connect(
+ mailserver,
+ Some(Duration::from_millis(5000)),
+ &ClientId::Domain("ci.butactuallyin.space".to_string()),
+ None,
+ None,
+ ).unwrap();
+
+ mailer.starttls(
+ &tls,
+ &ClientId::Domain("ci.butactuallyin.space".to_string()),
+ ).unwrap();
+
+ let resp = mailer.auth(
+ &[Mechanism::Plain, Mechanism::Login],
+ &Credentials::new(username.to_owned(), password.to_owned())
+ ).unwrap();
+ assert!(resp.is_positive());
+
+ let email = Message::builder()
+ .from(from.parse().unwrap())
+ .to(to.parse().unwrap())
+ .subject(&subject)
+ .body(body)
+ .unwrap();
+
+ match mailer.send(email.envelope(), &email.formatted()) {
+ Ok(_) => {
+ eprintln!("[+] notified {}@{}", username, mailserver);
+ Ok(())
+ }
+ Err(e) => {
+ eprintln!("[-] could not send email: {:?}", e);
+ Err(e.to_string())
+ }
+ }
+ }
+ }
+ }
+}