From cb418168d9375d4ea6c9d21ee6b97e02a575fb4b Mon Sep 17 00:00:00 2001 From: iximeow Date: Sun, 25 Dec 2022 08:17:28 +0000 Subject: support uploading artifacts during builds --- src/dbctx.rs | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) (limited to 'src/dbctx.rs') diff --git a/src/dbctx.rs b/src/dbctx.rs index 3af2d56..d025b8d 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -1,10 +1,15 @@ use std::sync::Mutex; +use futures_util::StreamExt; use rusqlite::{Connection, OptionalExtension}; use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::notifier::{RemoteNotifier, NotifierConfig}; use crate::sql; +const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30; + pub struct DbCtx { pub config_path: String, // don't love this but.. for now... @@ -22,6 +27,61 @@ pub struct PendingJob { pub created_time: u64, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TokenValidity { + Expired, + Invalid, + Valid, +} + +pub struct ArtifactDescriptor { + job_id: u64, + artifact_id: u64, + file: File, +} + +impl ArtifactDescriptor { + async fn new(job_id: u64, artifact_id: u64) -> Result { + // TODO: jobs should be a configurable path + let path = format!("jobs/{}/{}", job_id, artifact_id); + let file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path) + .await + .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + + Ok(ArtifactDescriptor { + job_id, + artifact_id, + file, + }) + } + + pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { + loop { + let chunk = data.next().await; + + let chunk = match chunk { + Some(Ok(chunk)) => chunk, + Some(Err(e)) => { + return Err(format!("error reading: {:?}", e)); + } + None => { + eprintln!("body done?"); + return Ok(()); + } + }; + + let chunk = chunk.as_ref(); + + self.file.write_all(chunk).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } + } +} + impl DbCtx { pub fn new(config_path: &str, db_path: &str) -> Self { DbCtx { @@ -32,6 +92,7 @@ impl DbCtx { pub fn create_tables(&self) -> Result<(), ()> { let conn = self.conn.lock().unwrap(); + conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap(); conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap(); conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap(); conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap(); @@ -68,6 +129,56 @@ impl DbCtx { Ok(conn.last_insert_rowid() as u64) } + pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result { + let artifact_id = { + let conn = self.conn.lock().unwrap(); + conn + .execute( + "insert into artifacts (job_id, name, desc) values (?1, ?2, ?3)", + (job_id, name, desc) + ) + .map_err(|e| { + format!("{:?}", e) + })?; + + conn.last_insert_rowid() as u64 + }; + + ArtifactDescriptor::new(job_id, artifact_id).await + } + + pub fn job_for_token(&self, token: &str) -> Result, TokenValidity)>, String> { + self.conn.lock() + .unwrap() + .query_row( + "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1", + [token], + |row| { + let timeout: Option = row.get(3).unwrap(); + let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS); + + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("now is before epoch") + .as_millis(); + + let time: Option = row.get(2).unwrap(); + let validity = if let Some(time) = time { + if now > time as u128 + timeout as u128 { + TokenValidity::Expired + } else { + TokenValidity::Valid + } + } else { + TokenValidity::Invalid + }; + Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity)) + } + ) + .optional() + .map_err(|e| e.to_string()) + } + pub fn repo_id_by_remote(&self, remote_id: u64) -> Result, String> { self.conn.lock() .unwrap() @@ -89,6 +200,9 @@ impl DbCtx { "github" => { (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote)) }, + "github-email" => { + (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote)) + }, other => { panic!("unsupported remote kind: {}", other); } -- cgit v1.1