diff options
Diffstat (limited to 'ci-lib-native')
| -rw-r--r-- | ci-lib-native/Cargo.toml | 20 | ||||
| -rw-r--r-- | ci-lib-native/src/dbctx_ext.rs | 62 | ||||
| -rw-r--r-- | ci-lib-native/src/io.rs | 174 | ||||
| -rw-r--r-- | ci-lib-native/src/lib.rs | 3 | ||||
| -rw-r--r-- | ci-lib-native/src/notifier.rs | 181 | 
5 files changed, 440 insertions, 0 deletions
| diff --git a/ci-lib-native/Cargo.toml b/ci-lib-native/Cargo.toml new file mode 100644 index 0000000..7a4e665 --- /dev/null +++ b/ci-lib-native/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "ci-lib-native" +version = "0.0.1" +authors = [ "iximeow <me@iximeow.net>" ] +license = "0BSD" +edition = "2021" +description = "shared code across the ci project that is applicable for native targets (uses tokio, etc)" + +[lib] + +[dependencies] +ci-lib-core = { path = "../ci-lib-core" } +tokio = { version = "*", features = ["full"] } +futures-util = "*" +axum = "*" +hyper = "*" +serde_json = "*" +serde = { version = "*", features = ["derive"] } +lettre = "*" +reqwest = "*" diff --git a/ci-lib-native/src/dbctx_ext.rs b/ci-lib-native/src/dbctx_ext.rs new file mode 100644 index 0000000..44436fc --- /dev/null +++ b/ci-lib-native/src/dbctx_ext.rs @@ -0,0 +1,62 @@ +use crate::io::ArtifactDescriptor; +use crate::notifier::{RemoteNotifier, NotifierConfig}; +use tokio::fs::{File, OpenOptions}; + +use ci_lib_core::dbctx::DbCtx; + +pub fn notifiers_by_repo(ctx: &DbCtx, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> { +    let remotes = ctx.remotes_by_repo(repo_id)?; + +    let mut notifiers: Vec<RemoteNotifier> = Vec::new(); + +    for remote in remotes.into_iter() { +        match remote.remote_api.as_str() { +            "github" => { +                let mut notifier_path = ctx.config_path.clone(); +                notifier_path.push(&remote.notifier_config_path); + +                let notifier = RemoteNotifier { +                    remote_path: remote.remote_path, +                    notifier: NotifierConfig::github_from_file(¬ifier_path) +                        .expect("can load notifier config") +                }; +                notifiers.push(notifier); +            }, +            "email" => { +                let mut notifier_path = ctx.config_path.clone(); +                notifier_path.push(&remote.notifier_config_path); + +                let notifier = RemoteNotifier { +                    remote_path: remote.remote_path, +                    notifier: NotifierConfig::email_from_file(¬ifier_path) +                        .expect("can load notifier config") +                }; +                notifiers.push(notifier); +            } +            other => { +                eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote) +            } +        } +    } + +    Ok(notifiers) +} + +pub async fn reserve_artifact(ctx: &DbCtx, run_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> { +    let artifact_id = { +        let created_time = ci_lib_core::now_ms(); +        let conn = ctx.conn.lock().unwrap(); +        conn +            .execute( +                "insert into artifacts (run_id, name, desc, created_time) values (?1, ?2, ?3, ?4)", +                (run_id, name, desc, created_time) +            ) +            .map_err(|e| { +                format!("{:?}", e) +            })?; + +        conn.last_insert_rowid() as u64 +    }; + +    ArtifactDescriptor::new(run_id, artifact_id).await +} diff --git a/ci-lib-native/src/io.rs b/ci-lib-native/src/io.rs new file mode 100644 index 0000000..d41349c --- /dev/null +++ b/ci-lib-native/src/io.rs @@ -0,0 +1,174 @@ +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures_util::StreamExt; +use tokio::fs::File; +use std::io::Write; +use tokio::fs::OpenOptions; +use std::task::{Poll, Context}; +use std::pin::Pin; +use std::time::{UNIX_EPOCH, SystemTime}; +use std::sync::{Arc, Mutex}; + +#[derive(Clone)] +pub struct VecSink { +    body: Arc<Mutex<Vec<u8>>>, +} + +impl VecSink { +    pub fn new() -> Self { +        Self { body: Arc::new(Mutex::new(Vec::new())) } +    } + +    pub fn take_buf(&self) -> Vec<u8> { +        std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new()) +    } +} + +impl tokio::io::AsyncWrite for VecSink { +    fn poll_write( +        self: Pin<&mut Self>, +        cx: &mut Context, +        buf: &[u8] +    ) -> Poll<Result<usize, std::io::Error>> { +        self.body.lock().unwrap().extend_from_slice(buf); +        Poll::Ready(Ok(buf.len())) +    } + +    fn poll_flush( +        self: Pin<&mut Self>, +        _cx: &mut Context +    ) -> Poll<Result<(), std::io::Error>> { +        Poll::Ready(Ok(())) +    } + +    fn poll_shutdown( +        self: Pin<&mut Self>, +        _cx: &mut Context +    ) -> Poll<Result<(), std::io::Error>> { +        Poll::Ready(Ok(())) +    } +} + +pub struct ArtifactStream { +    sender: hyper::body::Sender, +} + +impl ArtifactStream { +    pub fn new(sender: hyper::body::Sender) -> Self { +        Self { sender } +    } +} + +impl tokio::io::AsyncWrite for ArtifactStream { +    fn poll_write( +        self: Pin<&mut Self>, +        cx: &mut Context, +        buf: &[u8] +    ) -> Poll<Result<usize, std::io::Error>> { +        match self.get_mut().sender.try_send_data(buf.to_vec().into()) { +            Ok(()) => { +                Poll::Ready(Ok(buf.len())) +            }, +            _ => { +                cx.waker().wake_by_ref(); +                Poll::Pending +            } +        } +    } + +    fn poll_flush( +        self: Pin<&mut Self>, +        _cx: &mut Context +    ) -> Poll<Result<(), std::io::Error>> { +        Poll::Ready(Ok(())) +    } + +    fn poll_shutdown( +        self: Pin<&mut Self>, +        _cx: &mut Context +    ) -> Poll<Result<(), std::io::Error>> { +        Poll::Ready(Ok(())) +    } +} + + +pub struct ArtifactDescriptor { +    job_id: u64, +    pub artifact_id: u64, +    file: File, +} + +impl ArtifactDescriptor { +    pub async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> { +        // TODO: jobs should be a configurable path +        let path = format!("artifacts/{}/{}", job_id, artifact_id); +        let file = OpenOptions::new() +            .read(true) +            .write(true) +            .create_new(true) +            .open(&path) +            .await +            .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + +        Ok(ArtifactDescriptor { +            job_id, +            artifact_id, +            file, +        }) +    } + +    pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { +        loop { +            let chunk = data.next().await; + +            let chunk = match chunk { +                Some(Ok(chunk)) => chunk, +                Some(Err(e)) => { +                    eprintln!("error: {:?}", e); +                    return Err(format!("error reading: {:?}", e)); +                } +                None => { +                    eprintln!("body done?"); +                    return Ok(()); +                } +            }; + +            let chunk = chunk.as_ref(); + +            self.file.write_all(chunk).await +                .map_err(|e| format!("failed to write: {:?}", e))?; +        } +    } +} + +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { +    let mut buf = vec![0; 1024 * 1024]; +    loop { +        let n_read = source.read(&mut buf).await +            .map_err(|e| format!("failed to read: {:?}", e))?; + +        if n_read == 0 { +            eprintln!("done reading!"); +            return Ok(()); +        } + +        dest.write_all(&buf[..n_read]).await +            .map_err(|e| format!("failed to write: {:?}", e))?; +    } +} +/* +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { +    let mut buf = vec![0; 1024 * 1024]; +    loop { +        let n_read = source.read(&mut buf).await +            .map_err(|e| format!("failed to read: {:?}", e))?; + +        if n_read == 0 { +            eprintln!("done reading!"); +            return Ok(()); +        } + +        dest.sender.send_data(buf[..n_read].to_vec().into()).await +            .map_err(|e| format!("failed to write: {:?}", e))?; +    } +} +*/ diff --git a/ci-lib-native/src/lib.rs b/ci-lib-native/src/lib.rs new file mode 100644 index 0000000..74cb710 --- /dev/null +++ b/ci-lib-native/src/lib.rs @@ -0,0 +1,3 @@ +pub mod io; +pub mod dbctx_ext; +pub mod notifier; diff --git a/ci-lib-native/src/notifier.rs b/ci-lib-native/src/notifier.rs new file mode 100644 index 0000000..dd4a35c --- /dev/null +++ b/ci-lib-native/src/notifier.rs @@ -0,0 +1,181 @@ +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use axum::http::StatusCode; +use lettre::transport::smtp::authentication::{Credentials, Mechanism}; +use lettre::{Message, Transport}; +use lettre::transport::smtp::extension::ClientId; +use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder}; +use std::time::Duration; +use std::path::Path; + +use ci_lib_core::dbctx::DbCtx; + +pub struct RemoteNotifier { +    pub remote_path: String, +    pub notifier: NotifierConfig, +} + +#[derive(Serialize, Deserialize)] +#[serde(untagged)] +pub enum NotifierConfig { +    GitHub { +        token: String, +    }, +    Email { +        username: String, +        password: String, +        mailserver: String, +        from: String, +        to: String, +    } +} + +impl NotifierConfig { +    pub fn github_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> { +        let path = path.as_ref(); +        let bytes = std::fs::read(path) +            .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; +        let config = serde_json::from_slice(&bytes) +            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; + +        if matches!(config, NotifierConfig::GitHub { .. }) { +            Ok(config) +        } else { +            Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path.display())) +        } +    } + +    pub fn email_from_file<P: AsRef<Path>>(path: P) -> Result<Self, String> { +        let path = path.as_ref(); +        let bytes = std::fs::read(path) +            .map_err(|e| format!("can't read notifier config at {}: {:?}", path.display(), e))?; +        let config = serde_json::from_slice(&bytes) +            .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path.display(), e))?; + +        if matches!(config, NotifierConfig::Email { .. }) { +            Ok(config) +        } else { +            Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path.display())) +        } +    } +} + +impl RemoteNotifier { +    pub async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> { +        self.tell_job_status( +            ctx, +            repo_id, sha, job_id, +            "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) +        ).await +    } + +    pub async fn tell_complete_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, desc: Result<String, String>) -> Result<(), String> { +        match desc { +            Ok(status) => { +                self.tell_job_status( +                    ctx, +                    repo_id, sha, job_id, +                    "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) +                ).await +            }, +            Err(status) => { +                self.tell_job_status( +                    ctx, +                    repo_id, sha, job_id, +                    "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha) +                ).await +            } +        } +    } + +    pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> { +        match &self.notifier { +            NotifierConfig::GitHub { token } => { +                let status_info = serde_json::json!({ +                    "state": state, +                    "description": desc, +                    "target_url": target_url, +                    "context": "actuallyinspace runner", +                }); + +                // TODO: should pool (probably in ctx?) to have an upper bound in concurrent +                // connections. +                let client = reqwest::Client::new(); +                let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha)) +                    .body(serde_json::to_string(&status_info).expect("can stringify json")) +                    .header("content-type", "application/json") +                    .header("user-agent", "iximeow") +                    .header("authorization", format!("Bearer {}", token)) +                    .header("accept", "application/vnd.github+json"); +                eprintln!("sending {:?}", req); +                eprintln!("  body: {}", serde_json::to_string(&status_info).expect("can stringify json")); +                let res = req +                    .send() +                    .await; + +                match res { +                    Ok(res) => { +                        if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{ +                            Ok(()) +                        } else { +                            Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res)) +                        } +                    } +                    Err(e) => { +                        Err(format!("failure sending request: {:?}", e)) +                    } +                } +            } +            NotifierConfig::Email { username, password, mailserver, from, to } => { +                eprintln!("[.] emailing {} for job {} via {}", state, &self.remote_path, mailserver); + +                let subject = format!("{}: job for {}", state, &self.remote_path); + +                let body = format!("{}", subject); + +                // TODO: when ci.butactuallyin.space has valid certs again, ... fix this. +                let tls = TlsParametersBuilder::new(mailserver.to_string()) +                    .dangerous_accept_invalid_certs(true) +                    .build() +                    .unwrap(); + +                let mut mailer = SmtpConnection::connect( +                    mailserver, +                    Some(Duration::from_millis(5000)), +                    &ClientId::Domain("ci.butactuallyin.space".to_string()), +                    None, +                    None, +                ).unwrap(); + +                mailer.starttls( +                    &tls, +                    &ClientId::Domain("ci.butactuallyin.space".to_string()), +                ).unwrap(); + +                let resp = mailer.auth( +                    &[Mechanism::Plain, Mechanism::Login], +                    &Credentials::new(username.to_owned(), password.to_owned()) +                ).unwrap(); +                assert!(resp.is_positive()); + +                let email = Message::builder() +                    .from(from.parse().unwrap()) +                    .to(to.parse().unwrap()) +                    .subject(&subject) +                    .body(body) +                    .unwrap(); + +                match mailer.send(email.envelope(), &email.formatted()) { +                    Ok(_) => { +                        eprintln!("[+] notified {}@{}", username, mailserver); +                        Ok(()) +                    } +                    Err(e) => { +                        eprintln!("[-] could not send email: {:?}", e); +                        Err(e.to_string()) +                    } +                } +            } +        } +    } +} | 
