summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock21
-rw-r--r--Cargo.toml1
-rw-r--r--src/ci_driver.rs6
-rw-r--r--src/dbctx.rs43
-rw-r--r--src/io.rs10
-rw-r--r--src/lua/mod.rs11
-rw-r--r--src/main.rs129
-rw-r--r--src/sql.rs7
8 files changed, 183 insertions, 45 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 7eaa568..fdb63dc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -164,6 +164,26 @@ dependencies = [
]
[[package]]
+name = "axum-extra"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "51227033e4d3acad15c879092ac8a228532707b5db5ff2628f638334f63e1b7a"
+dependencies = [
+ "axum",
+ "bytes",
+ "futures-util",
+ "http",
+ "mime",
+ "pin-project-lite",
+ "tokio",
+ "tokio-util",
+ "tower",
+ "tower-http",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
name = "axum-macros"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1193,6 +1213,7 @@ name = "ixi-ci-server"
version = "0.0.1"
dependencies = [
"axum",
+ "axum-extra",
"axum-macros",
"axum-server",
"base64",
diff --git a/Cargo.toml b/Cargo.toml
index 03e5f77..7d1a602 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
lazy_static = "*"
axum = { version = "*" }
+axum-extra = { version = "*", features = ["async-read-body"] }
axum-server = { version = "*", features = ["tls-rustls"] }
handlebars = "*"
libc = "*"
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index 29a699c..c4a791d 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -414,7 +414,11 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
};
eprintln!("spawning task...");
- spawn(async move { artifact.store_all(artifact_content).await.unwrap() });
+ let dbctx_ref = Arc::clone(&ctx.0);
+ spawn(async move {
+ artifact.store_all(artifact_content).await.unwrap();
+ dbctx_ref.finalize_artifact(artifact.artifact_id).await.unwrap();
+ });
eprintln!("done?");
(StatusCode::OK, "").into_response()
diff --git a/src/dbctx.rs b/src/dbctx.rs
index f545158..926f717 100644
--- a/src/dbctx.rs
+++ b/src/dbctx.rs
@@ -86,7 +86,9 @@ pub struct ArtifactRecord {
pub id: u64,
pub job_id: u64,
pub name: String,
- pub desc: String
+ pub desc: String,
+ pub created_time: u64,
+ pub completed_time: Option<u64>,
}
impl DbCtx {
@@ -148,13 +150,30 @@ impl DbCtx {
Ok(conn.last_insert_rowid() as u64)
}
+ pub async fn finalize_artifact(&self, artifact_id: u64) -> Result<(), String> {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .execute(
+ "update artifacts set completed_time=?1 where id=?2",
+ (crate::io::now_ms(), artifact_id)
+ )
+ .map(|_| ())
+ .map_err(|e| {
+ format!("{:?}", e)
+ })
+ }
+
pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
let artifact_id = {
+ let created_time = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is before epoch")
+ .as_millis() as u64;
let conn = self.conn.lock().unwrap();
conn
.execute(
- "insert into artifacts (job_id, name, desc) values (?1, ?2, ?3)",
- (job_id, name, desc)
+ "insert into artifacts (job_id, name, desc, created_time) values (?1, ?2, ?3, ?4)",
+ (job_id, name, desc, created_time)
)
.map_err(|e| {
format!("{:?}", e)
@@ -166,6 +185,20 @@ impl DbCtx {
ArtifactDescriptor::new(job_id, artifact_id).await
}
+ pub fn lookup_artifact(&self, job_id: u64, artifact_id: u64) -> Result<Option<ArtifactRecord>, String> {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .query_row(sql::ARTIFACT_BY_ID, [artifact_id, job_id], |row| {
+ let (id, job_id, name, desc, created_time, completed_time) = row.try_into().unwrap();
+
+ Ok(ArtifactRecord {
+ id, job_id, name, desc, created_time, completed_time
+ })
+ })
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
pub fn commit_sha(&self, commit_id: u64) -> Result<String, String> {
self.conn.lock()
.unwrap()
@@ -321,8 +354,8 @@ impl DbCtx {
let mut artifacts = Vec::new();
while let Some(row) = result.next().unwrap() {
- let (id, job_id, name, desc): (u64, u64, String, String) = row.try_into().unwrap();
- artifacts.push(ArtifactRecord { id, job_id, name, desc });
+ let (id, job_id, name, desc, created_time, completed_time): (u64, u64, String, String, u64, Option<u64>) = row.try_into().unwrap();
+ artifacts.push(ArtifactRecord { id, job_id, name, desc, created_time, completed_time });
}
Ok(artifacts)
diff --git a/src/io.rs b/src/io.rs
index 219edbf..6e71282 100644
--- a/src/io.rs
+++ b/src/io.rs
@@ -5,6 +5,14 @@ use std::io::Write;
use tokio::fs::OpenOptions;
use std::task::{Poll, Context};
use std::pin::Pin;
+use std::time::{UNIX_EPOCH, SystemTime};
+
+pub fn now_ms() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is later than epoch")
+ .as_millis() as u64
+}
pub struct ArtifactStream {
sender: hyper::body::Sender,
@@ -51,7 +59,7 @@ impl tokio::io::AsyncWrite for ArtifactStream {
pub struct ArtifactDescriptor {
job_id: u64,
- artifact_id: u64,
+ pub artifact_id: u64,
file: File,
}
diff --git a/src/lua/mod.rs b/src/lua/mod.rs
index 662b34c..53473cf 100644
--- a/src/lua/mod.rs
+++ b/src/lua/mod.rs
@@ -4,7 +4,6 @@ use rlua::prelude::*;
use std::sync::{Arc, Mutex};
use std::path::PathBuf;
-use std::time::{UNIX_EPOCH, SystemTime};
pub const DEFAULT_RUST_GOODFILE: &'static [u8] = include_bytes!("../../config/goodfiles/rust.lua");
@@ -18,7 +17,6 @@ mod lua_exports {
use std::sync::{Arc, Mutex};
use std::path::PathBuf;
- use std::time::{UNIX_EPOCH, SystemTime};
use rlua::prelude::*;
@@ -166,13 +164,6 @@ mod lua_exports {
})
}
- pub fn now_ms() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is later than epoch")
- .as_millis() as u64
- }
-
pub fn has_cmd(name: &str) -> Result<bool, rlua::Error> {
Ok(std::process::Command::new("which")
.arg(name)
@@ -239,7 +230,7 @@ impl BuildEnv {
lua_exports::metric(name, value, job_ref)
})?;
- let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(lua_exports::now_ms()))?;
+ let now_ms = decl_env.create_function("now_ms", move |_, job_ref, ()| Ok(crate::io::now_ms()))?;
let artifact = decl_env.create_function("artifact", move |_, job_ref, (path, name): (String, Option<String>)| {
lua_exports::artifact(path, name, job_ref)
diff --git a/src/main.rs b/src/main.rs
index ab6b62c..c1d9664 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -20,6 +20,9 @@ use axum::extract::rejection::JsonRejection;
use axum::body::Bytes;
use axum::http::{StatusCode, Uri};
use http::header::HeaderMap;
+use tokio::sync::mpsc;
+use tokio_stream::wrappers::ReceiverStream;
+use axum::body::StreamBody;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
@@ -43,12 +46,19 @@ struct WebserverConfig {
psks: Vec<GithubPsk>,
cert_path: PathBuf,
key_path: PathBuf,
+ jobs_path: PathBuf,
config_path: PathBuf,
db_path: PathBuf,
debug_addr: Option<String>,
server_addr: Option<String>,
}
+#[derive(Clone)]
+struct WebserverState {
+ jobs_path: PathBuf,
+ dbctx: Arc<DbCtx>,
+}
+
#[derive(Clone, Serialize, Deserialize)]
struct GithubPsk {
key: String,
@@ -253,9 +263,9 @@ async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event
}
}
-async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {
+async fn handle_ci_index(State(ctx): State<WebserverState>) -> impl IntoResponse {
eprintln!("root index");
- let repos = match ctx.get_repos() {
+ let repos = match ctx.dbctx.get_repos() {
Ok(repos) => repos,
Err(e) => {
eprintln!("failed to get repos: {:?}", e);
@@ -293,8 +303,8 @@ async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {
for repo in repos {
let mut most_recent_job: Option<Job> = None;
- for remote in ctx.remotes_by_repo(repo.id).expect("remotes by repo works") {
- let last_job = ctx.last_job_from_remote(remote.id).expect("job by remote works");
+ for remote in ctx.dbctx.remotes_by_repo(repo.id).expect("remotes by repo works") {
+ let last_job = ctx.dbctx.last_job_from_remote(remote.id).expect("job by remote works");
if let Some(last_job) = last_job {
if most_recent_job.as_ref().map(|job| job.created_time < last_job.created_time).unwrap_or(true) {
most_recent_job = Some(last_job);
@@ -306,13 +316,13 @@ async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {
let row_html: String = match most_recent_job {
Some(job) => {
- let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit");
- let commit_html = match commit_url(&job, &job_commit, &ctx) {
+ let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
+ let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
None => job_commit.clone()
};
- let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx), job.id);
+ let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822();
let duration = if let Some(start_time) = job.start_time {
@@ -391,13 +401,13 @@ async fn handle_ci_index(State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {
(StatusCode::OK, Html(response))
}
-async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {
+async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse {
eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2);
let remote_path = format!("{}/{}", path.0, path.1);
let sha = path.2;
let (commit_id, sha): (u64, String) = if sha.len() >= 7 {
- match ctx.conn.lock().unwrap()
+ match ctx.dbctx.conn.lock().unwrap()
.query_row("select id, sha from commits where sha like ?1;", [&format!("{}%", sha)], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.optional()
.expect("can query") {
@@ -410,17 +420,17 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string()));
};
- let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap()
+ let (remote_id, repo_id): (u64, u64) = ctx.dbctx.conn.lock().unwrap()
.query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.expect("can query");
- let (job_id, state, build_result, result_desc): (u64, u8, Option<u8>, Option<String>) = ctx.conn.lock().unwrap()
+ let (job_id, state, build_result, result_desc): (u64, u8, Option<u8>, Option<String>) = ctx.dbctx.conn.lock().unwrap()
.query_row("select id, state, build_result, final_status from jobs where commit_id=?1;", [commit_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1), row.get_unwrap(2), row.get_unwrap(3))))
.expect("can query");
let state: sql::JobState = unsafe { std::mem::transmute(state) };
- let repo_name: String = ctx.conn.lock().unwrap()
+ let repo_name: String = ctx.dbctx.conn.lock().unwrap()
.query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0))
.expect("can query");
@@ -455,7 +465,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
let output = if state == JobState::Finished && build_result == Some(1) || state == JobState::Error {
// collect stderr/stdout from the last artifacts, then the last 10kb of each, insert it in
// the page...
- let artifacts = ctx.artifacts_for_job(job_id).unwrap();
+ let artifacts = ctx.dbctx.artifacts_for_job(job_id).unwrap();
if artifacts.len() > 0 {
let mut streams = String::new();
for artifact in artifacts.iter() {
@@ -473,7 +483,7 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
None
};
- let metrics = ctx.metrics_for_job(job_id).unwrap();
+ let metrics = ctx.dbctx.metrics_for_job(job_id).unwrap();
let metrics_section = if metrics.len() > 0 {
let mut section = String::new();
section.push_str("<div>");
@@ -516,12 +526,72 @@ async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(
(StatusCode::OK, Html(html))
}
-async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {
+async fn handle_get_artifact(Path(path): Path<(String, String)>, State(ctx): State<WebserverState>) -> impl IntoResponse {
+ eprintln!("get artifact, job={}, artifact={}", path.0, path.1);
+ let job_id: u64 = path.0.parse().unwrap();
+ let artifact_id: u64 = path.1.parse().unwrap();
+
+ let artifact_descriptor = match ctx.dbctx.lookup_artifact(job_id, artifact_id).unwrap() {
+ Some(artifact) => artifact,
+ None => {
+ return (StatusCode::NOT_FOUND, Html("no such artifact")).into_response();
+ }
+ };
+
+ let mut live_artifact = false;
+
+ if let Some(completed_time) = artifact_descriptor.completed_time {
+ if completed_time < artifact_descriptor.created_time {
+ live_artifact = true;
+ }
+ } else {
+ live_artifact = true;
+ }
+
+ if live_artifact {
+ let (mut tx_sender, tx_receiver) = tokio::io::duplex(65536);
+ let resp_body = axum_extra::body::AsyncReadBody::new(tx_receiver);
+ let mut artifact_path = ctx.jobs_path.clone();
+ artifact_path.push(artifact_descriptor.job_id.to_string());
+ artifact_path.push(artifact_descriptor.id.to_string());
+ spawn(async move {
+ let mut artifact = artifact_descriptor;
+
+ let mut artifact_file = tokio::fs::File::open(&artifact_path)
+ .await
+ .expect("artifact file exists?");
+ while artifact.completed_time.is_none() {
+ match crate::io::forward_data(&mut artifact_file, &mut tx_sender).await {
+ Ok(()) => {
+ // reached the current EOF, wait and then commit an unspeakable sin
+ tokio::time::sleep(std::time::Duration::from_millis(250)).await;
+ // this would be much implemented as yielding on a condvar woken when an
+ // inotify event on the file indicates a write has occurred. but i am
+ // dreadfully lazy, so we'll just uhh. busy-poll on the file? lmao.
+ artifact = ctx.dbctx.lookup_artifact(artifact.job_id, artifact.id)
+ .expect("can query db")
+ .expect("artifact still exists");
+ }
+ Err(e) => {
+ eprintln!("artifact file streaming failed: {}", e);
+ }
+ }
+ }
+
+ eprintln!("[+] artifact {} is done being written, and we've sent the whole thing. bye!", artifact.id);
+ });
+ (StatusCode::OK, resp_body).into_response()
+ } else {
+ (StatusCode::OK, Html("all done")).into_response()
+ }
+}
+
+async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<WebserverState>) -> impl IntoResponse {
eprintln!("get repo summary: {:?}", path);
let mut last_builds = Vec::new();
- let (repo_id, repo_name): (u64, String) = match ctx.conn.lock().unwrap()
+ let (repo_id, repo_name): (u64, String) = match ctx.dbctx.conn.lock().unwrap()
.query_row("select id, repo_name from repos where repo_name=?1;", [&path], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap())))
.optional()
.unwrap() {
@@ -532,8 +602,8 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbC
}
};
- for remote in ctx.remotes_by_repo(repo_id).expect("can get repo from a path") {
- let mut last_ten_jobs = ctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo");
+ for remote in ctx.dbctx.remotes_by_repo(repo_id).expect("can get repo from a path") {
+ let mut last_ten_jobs = ctx.dbctx.recent_jobs_from_remote(remote.id, 10).expect("can look up jobs for a repo");
last_builds.extend(last_ten_jobs.drain(..));
}
last_builds.sort_by_key(|job| -(job.created_time as i64));
@@ -559,13 +629,13 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbC
response.push_str("</tr>\n");
for job in last_builds.iter().take(10) {
- let job_commit = ctx.commit_sha(job.commit_id).expect("job has a commit");
- let commit_html = match commit_url(&job, &job_commit, &ctx) {
+ let job_commit = ctx.dbctx.commit_sha(job.commit_id).expect("job has a commit");
+ let commit_html = match commit_url(&job, &job_commit, &ctx.dbctx) {
Some(url) => format!("<a href=\"{}\">{}</a>", url, &job_commit),
None => job_commit.clone()
};
- let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx), job.id);
+ let job_html = format!("<a href=\"{}\">{}</a>", job_url(&job, &job_commit, &ctx.dbctx), job.id);
let last_build_time = Utc.timestamp_millis_opt(job.created_time as i64).unwrap().to_rfc2822();
let duration = if let Some(start_time) = job.start_time {
@@ -625,7 +695,7 @@ async fn handle_repo_summary(Path(path): Path<String>, State(ctx): State<Arc<DbC
(StatusCode::OK, Html(response))
}
-async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<Arc<DbCtx>>, body: Bytes) -> impl IntoResponse {
+async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<WebserverState>, body: Bytes) -> impl IntoResponse {
let json: Result<serde_json::Value, _> = serde_json::from_slice(&body);
eprintln!("repo event: {:?} {:?} {:?}", path.0, path.1, headers);
@@ -674,11 +744,11 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa
}
};
- handle_github_event(ctx, path.0, path.1, kind, payload).await
+ handle_github_event(Arc::clone(&ctx.dbctx), path.0, path.1, kind, payload).await
}
-async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router {
+async fn make_app_server(jobs_path: PathBuf, cfg_path: &PathBuf, db_path: &PathBuf) -> Router {
/*
// GET /hello/warp => 200 OK with body "Hello, warp!"
@@ -743,9 +813,13 @@ async fn make_app_server(cfg_path: &PathBuf, db_path: &PathBuf) -> Router {
.route("/:owner/:repo/:sha", get(handle_commit_status))
.route("/:owner", get(handle_repo_summary))
.route("/:owner/:repo", post(handle_repo_event))
+ .route("/artifact/:job/:artifact_id", get(handle_get_artifact))
.route("/", get(handle_ci_index))
.fallback(fallback_get)
- .with_state(Arc::new(DbCtx::new(cfg_path, db_path)))
+ .with_state(WebserverState {
+ jobs_path,
+ dbctx: Arc::new(DbCtx::new(cfg_path, db_path))
+ })
}
#[tokio::main]
@@ -766,15 +840,16 @@ async fn main() {
web_config.key_path.clone(),
).await.unwrap();
+ let jobs_path = web_config.jobs_path.clone();
let config_path = web_config.config_path.clone();
let db_path = web_config.db_path.clone();
if let Some(addr) = web_config.debug_addr.as_ref() {
spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone())
- .serve(make_app_server(&config_path, &db_path).await.into_make_service()));
+ .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service()));
}
if let Some(addr) = web_config.server_addr.as_ref() {
spawn(axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config)
- .serve(make_app_server(&config_path, &db_path).await.into_make_service()));
+ .serve(make_app_server(jobs_path.clone(), &config_path, &db_path).await.into_make_service()));
}
loop {
tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
diff --git a/src/sql.rs b/src/sql.rs
index 6e0141a..f5ae731 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -84,7 +84,9 @@ pub const CREATE_ARTIFACTS_TABLE: &'static str = "\
CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER,
name TEXT,
- desc TEXT);";
+ desc TEXT,
+ created_time INTEGER,
+ completed_time INTEGER);";
pub const CREATE_REMOTES_INDEX: &'static str = "\
CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";
@@ -98,6 +100,9 @@ pub const PENDING_JOBS: &'static str = "\
pub const LAST_ARTIFACTS_FOR_JOB: &'static str = "\
select * from artifacts where job_id=?1 and (name like \"%(stderr)%\" or name like \"%(stdout)%\") order by id desc limit 2;";
+pub const ARTIFACT_BY_ID: &'static str = "\
+ select * from artifacts where id=?1 and job_id=?2;";
+
pub const METRICS_FOR_JOB: &'static str = "\
select * from metrics where job_id=?1 order by id asc;";