summaryrefslogtreecommitdiff
path: root/src/dbctx.rs
diff options
context:
space:
mode:
authoriximeow <git@iximeow.net>2022-12-25 08:17:28 +0000
committeriximeow <git@iximeow.net>2022-12-25 08:17:28 +0000
commitcb418168d9375d4ea6c9d21ee6b97e02a575fb4b (patch)
tree8d96ef3354840b25401817611159ad68ef554f7c /src/dbctx.rs
parent4a213e872395f9b0562c113bb7303815a1d26a57 (diff)
support uploading artifacts during builds
Diffstat (limited to 'src/dbctx.rs')
-rw-r--r--src/dbctx.rs114
1 files changed, 114 insertions, 0 deletions
diff --git a/src/dbctx.rs b/src/dbctx.rs
index 3af2d56..d025b8d 100644
--- a/src/dbctx.rs
+++ b/src/dbctx.rs
@@ -1,10 +1,15 @@
use std::sync::Mutex;
+use futures_util::StreamExt;
use rusqlite::{Connection, OptionalExtension};
use std::time::{SystemTime, UNIX_EPOCH};
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::notifier::{RemoteNotifier, NotifierConfig};
use crate::sql;
+const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30;
+
pub struct DbCtx {
pub config_path: String,
// don't love this but.. for now...
@@ -22,6 +27,61 @@ pub struct PendingJob {
pub created_time: u64,
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TokenValidity {
+ Expired,
+ Invalid,
+ Valid,
+}
+
+pub struct ArtifactDescriptor {
+ job_id: u64,
+ artifact_id: u64,
+ file: File,
+}
+
+impl ArtifactDescriptor {
+ async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> {
+ // TODO: jobs should be a configurable path
+ let path = format!("jobs/{}/{}", job_id, artifact_id);
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create_new(true)
+ .open(&path)
+ .await
+ .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?;
+
+ Ok(ArtifactDescriptor {
+ job_id,
+ artifact_id,
+ file,
+ })
+ }
+
+ pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> {
+ loop {
+ let chunk = data.next().await;
+
+ let chunk = match chunk {
+ Some(Ok(chunk)) => chunk,
+ Some(Err(e)) => {
+ return Err(format!("error reading: {:?}", e));
+ }
+ None => {
+ eprintln!("body done?");
+ return Ok(());
+ }
+ };
+
+ let chunk = chunk.as_ref();
+
+ self.file.write_all(chunk).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ }
+ }
+}
+
impl DbCtx {
pub fn new(config_path: &str, db_path: &str) -> Self {
DbCtx {
@@ -32,6 +92,7 @@ impl DbCtx {
pub fn create_tables(&self) -> Result<(), ()> {
let conn = self.conn.lock().unwrap();
+ conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap();
conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap();
conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap();
conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap();
@@ -68,6 +129,56 @@ impl DbCtx {
Ok(conn.last_insert_rowid() as u64)
}
+ pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
+ let artifact_id = {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .execute(
+ "insert into artifacts (job_id, name, desc) values (?1, ?2, ?3)",
+ (job_id, name, desc)
+ )
+ .map_err(|e| {
+ format!("{:?}", e)
+ })?;
+
+ conn.last_insert_rowid() as u64
+ };
+
+ ArtifactDescriptor::new(job_id, artifact_id).await
+ }
+
+ pub fn job_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row(
+ "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1",
+ [token],
+ |row| {
+ let timeout: Option<u64> = row.get(3).unwrap();
+ let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS);
+
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is before epoch")
+ .as_millis();
+
+ let time: Option<u64> = row.get(2).unwrap();
+ let validity = if let Some(time) = time {
+ if now > time as u128 + timeout as u128 {
+ TokenValidity::Expired
+ } else {
+ TokenValidity::Valid
+ }
+ } else {
+ TokenValidity::Invalid
+ };
+ Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity))
+ }
+ )
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> {
self.conn.lock()
.unwrap()
@@ -89,6 +200,9 @@ impl DbCtx {
"github" => {
(remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote))
},
+ "github-email" => {
+ (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote))
+ },
other => {
panic!("unsupported remote kind: {}", other);
}