summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ci_driver.rs77
-rw-r--r--src/ci_runner.rs193
-rw-r--r--src/dbctx.rs114
-rw-r--r--src/notifier.rs5
-rw-r--r--src/sql.rs9
5 files changed, 346 insertions, 52 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index 96e0d44..9ec0dd8 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -9,6 +9,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use axum_server::tls_rustls::RustlsConfig;
use axum::body::StreamBody;
use axum::http::{StatusCode};
+use hyper::HeaderMap;
use axum::Router;
use axum::routing::*;
use axum::extract::State;
@@ -101,8 +102,8 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R
let run_host = client_job.client.name.clone();
connection.execute(
- "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4",
- (now as u64, run_host, format!("{}", artifacts.display()), job.id)
+ "update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3, build_token=?4 where id=?5",
+ (now as u64, run_host, format!("{}", artifacts.display()), &client_job.client.build_token, job.id)
)
.expect("can update");
@@ -139,7 +140,15 @@ struct ClientJob {
impl ClientJob {
pub async fn run(&mut self) {
loop {
- let msg = self.client.recv().await.expect("recv works").expect("client sent an object");
+ eprintln!("waiting on response..");
+ let msg = match self.client.recv().await.expect("recv works") {
+ Some(msg) => msg,
+ None => {
+ eprintln!("client hung up. job's done, i hope?");
+ return;
+ }
+ };
+ eprintln!("got {:?}", msg);
let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();
match msg_kind {
"job_status" => {
@@ -228,7 +237,6 @@ impl RunnerClient {
Err(format!("no client job: {:?}", e))
},
None => {
- eprintln!("no more body chunks? client hung up?");
Ok(None)
}
}
@@ -258,7 +266,6 @@ impl RunnerClient {
}
}
Ok(None) => {
- eprintln!("no more body chunks? client hung up?");
Ok(None)
}
Err(e) => {
@@ -275,6 +282,65 @@ impl fmt::Debug for RunnerClient {
}
}
+#[axum_macros::debug_handler]
+async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse {
+ let job_token = match headers.get("x-job-token") {
+ Some(job_token) => job_token.to_str().expect("valid string"),
+ None => {
+ eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+ };
+
+ let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() {
+ Some(result) => result,
+ None => {
+ eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+ };
+
+ if token_validity != dbctx::TokenValidity::Valid {
+ eprintln!("bad artifact post: headers: {:?}. token is not valid: {:?}", headers, token_validity);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+
+ let artifact_path = if let Some(artifact_path) = artifact_path {
+ artifact_path
+ } else {
+ eprintln!("bad artifact post: headers: {:?}. no artifact path?", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ };
+
+ let artifact_name = match headers.get("x-artifact-name") {
+ Some(artifact_name) => artifact_name.to_str().expect("valid string"),
+ None => {
+ eprintln!("bad artifact post: headers: {:?}\nno x-artifact-name", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+ };
+
+ let artifact_desc = match headers.get("x-artifact-desc") {
+ Some(artifact_desc) => artifact_desc.to_str().expect("valid string"),
+ None => {
+ eprintln!("bad artifact post: headers: {:?}\nno x-artifact-desc", headers);
+ return (StatusCode::BAD_REQUEST, "").into_response();
+ }
+ };
+
+ let mut artifact = match ctx.0.reserve_artifact(job, artifact_name, artifact_desc).await {
+ Ok(artifact) => artifact,
+ Err(err) => {
+ eprintln!("failure to reserve artifact: {:?}", err);
+ return (StatusCode::INTERNAL_SERVER_ERROR, "").into_response();
+ }
+ };
+
+ spawn(async move { artifact.store_all(artifact_content).await });
+
+ (StatusCode::OK, "").into_response()
+}
+
async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse {
let (tx_sender, tx_receiver) = mpsc::channel(8);
let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver));
@@ -308,6 +374,7 @@ async fn make_api_server(dbctx: Arc<DbCtx>) -> (Router, mpsc::Receiver<RunnerCli
let router = Router::new()
.route("/api/next_job", post(handle_next_job))
+ .route("/api/artifact", post(handle_artifact))
.with_state((dbctx, pending_client_sender));
(router, pending_client_receiver)
}
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index 6554f05..9224614 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -1,8 +1,14 @@
use std::time::Duration;
use reqwest::{StatusCode, Response};
-use std::process::Command;
+use tokio::process::Command;
+use std::process::Stdio;
+use std::process::ExitStatus;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use serde_derive::{Deserialize, Serialize};
use serde::{Deserialize, de::DeserializeOwned, Serialize};
+use std::task::{Context, Poll};
+use std::pin::Pin;
+use std::marker::Unpin;
#[derive(Debug)]
enum WorkAcquireError {
@@ -12,6 +18,7 @@ enum WorkAcquireError {
}
struct RunnerClient {
+ http: reqwest::Client,
host: String,
tx: hyper::body::Sender,
rx: Response,
@@ -28,75 +35,127 @@ struct RequestedJob {
impl RequestedJob {
// TODO: panics if hyper finds the channel is closed. hum
async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result<ArtifactStream, String> {
- let create_artifact_message = serde_json::json!({
- "kind": "artifact_create",
- "name": name,
- "description": desc,
- "job_token": &self.build_token,
- });
- client.send(create_artifact_message).await
- .map_err(|e| format!("create artifact send error: {:?}", e))?;
- let resp = client.recv().await
- .map_err(|e| format!("create artifact recv error: {:?}", e))?;
- eprintln!("resp: {:?}", resp);
- let object_id = resp.unwrap()
- .as_object().expect("is an object")
- .get("object_id").unwrap().as_str().expect("is str")
- .to_owned();
- // POST to this object id...
- Ok(ArtifactStream {
- object_id,
- })
+ eprintln!("[?] creating artifact...");
+ let (mut sender, body) = hyper::Body::channel();
+ let resp = client.http.post("https://ci.butactuallyin.space:9876/api/artifact")
+ .header("user-agent", "ci-butactuallyin-space-runner")
+ .header("x-job-token", &self.build_token)
+ .header("x-artifact-name", name)
+ .header("x-artifact-desc", desc)
+ .body(body)
+ .send()
+ .await
+ .map_err(|e| format!("unable to send request: {:?}", e))?;
+
+ if resp.status() == StatusCode::OK {
+ eprintln!("[+] artifact '{}' started", name);
+ Ok(ArtifactStream {
+ sender,
+ })
+ } else {
+ Err(format!("[-] unable to create artifact: {:?}", resp))
+ }
}
- async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> {
- let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works");
+ async fn execute_command(&self, client: &mut RunnerClient, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
+ eprintln!("[.] running {}", name);
+ async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 1024];
+ loop {
+ let n_read = source.read(&mut buf).await
+ .map_err(|e| format!("failed to read: {:?}", e))?;
- let clone_res = Command::new("git")
+ if n_read == 0 {
+ return Ok(());
+ }
+
+ dest.write_all(&buf[..n_read]).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ }
+ }
+
+ let stdout_artifact = self.create_artifact(
+ client,
+ &format!("{} (stdout)", name),
+ &format!("{} (stdout)", desc)
+ ).await.expect("works");
+ let stderr_artifact = self.create_artifact(
+ client,
+ &format!("{} (stderr)", name),
+ &format!("{} (stderr)", desc)
+ ).await.expect("works");
+
+ let mut child = command
+ .stdin(Stdio::null())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn()
+ .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?;
+
+ let child_stdout = child.stdout.take().unwrap();
+ let child_stderr = child.stderr.take().unwrap();
+
+ eprintln!("[.] '{}': forwarding stdout", name);
+ tokio::spawn(forward_data(child_stdout, stdout_artifact));
+ eprintln!("[.] '{}': forwarding stderr", name);
+ tokio::spawn(forward_data(child_stderr, stderr_artifact));
+
+ let res = child.wait().await
+ .map_err(|e| format!("failed to wait? {:?}", e))?;
+
+ if res.success() {
+ eprintln!("[+] '{}' success", name);
+ } else {
+ eprintln!("[-] '{}' fail: {:?}", name, res);
+ }
+
+ Ok(res)
+ }
+
+ async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> {
+ let mut git_clone = Command::new("git");
+ git_clone
.arg("clone")
.arg(&self.remote_url)
- .arg("tmpdir")
- .status()
- .map_err(|e| format!("failed to run git clone? {:?}", e))?;
+ .arg("tmpdir");
+
+ let clone_res = self.execute_command(client, git_clone, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await?;
if !clone_res.success() {
return Err(format!("git clone failed: {:?}", clone_res));
}
- let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works");
-
- let checkout_res = Command::new("git")
+ let mut git_checkout = Command::new("git");
+ git_checkout
.current_dir("tmpdir")
.arg("checkout")
- .arg(&self.commit)
- .status()
- .map_err(|e| format!("failed to run git checkout? {:?}", e))?;
+ .arg(&self.commit);
+
+ let checkout_res = self.execute_command(client, git_checkout, "git checkout log", &format!("git checkout {}", &self.commit)).await?;
if !checkout_res.success() {
return Err(format!("git checkout failed: {:?}", checkout_res));
}
- let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works");
-
- let build_res = Command::new("cargo")
+ let mut build = Command::new("cargo");
+ build
.current_dir("tmpdir")
- .arg("build")
- .status()
- .map_err(|e| format!("failed to run cargo build? {:?}", e))?;
+ .arg("build");
+
+ let build_res = self.execute_command(client, build, "cargo build log", "cargo build").await?;
if !build_res.success() {
return Err(format!("cargo build failed: {:?}", build_res));
}
- let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works");
-
- let test_result = Command::new("cargo")
+ let mut test = Command::new("cargo");
+ test
.current_dir("tmpdir")
- .arg("test")
- .status()
- .map_err(|e| format!("failed to run cargo test? {:?}", e))?;
+ .arg("test");
+
+ let test_res = self.execute_command(client, test, "cargo test log", "cargo test").await?;
- match test_result.code() {
+ match test_res.code() {
Some(0) => Ok("pass".to_string()),
Some(n) => Ok(format!("error: {}", n)),
None => Ok(format!("abnormal exit")),
@@ -105,7 +164,38 @@ impl RequestedJob {
}
struct ArtifactStream {
- object_id: String,
+ sender: hyper::body::Sender,
+}
+
+impl tokio::io::AsyncWrite for ArtifactStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ match self.get_mut().sender.try_send_data(buf.to_vec().into()) {
+ Ok(()) => {
+ Poll::Ready(Ok(buf.len()))
+ },
+ _ => {
+ Poll::Pending
+ }
+ }
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
}
impl RunnerClient {
@@ -120,6 +210,11 @@ impl RunnerClient {
}
Ok(Self {
+ http: reqwest::ClientBuilder::new()
+ .connect_timeout(Duration::from_millis(1000))
+ .timeout(Duration::from_millis(600000))
+ .build()
+ .expect("can build client"),
host: host.to_string(),
tx: sender,
rx: res,
@@ -187,6 +282,8 @@ impl RunnerClient {
match res {
Ok(status) => {
+ eprintln!("[+] job success!");
+
self.send(serde_json::json!({
"kind": "job_status",
"state": "finished",
@@ -194,6 +291,8 @@ impl RunnerClient {
})).await.unwrap();
}
Err(status) => {
+ eprintln!("[-] job error: {}", status);
+
self.send(serde_json::json!({
"kind": "job_status",
"state": "interrupted",
@@ -213,6 +312,8 @@ async fn main() {
.build()
.expect("can build client");
+ let allowed_pushers = None;
+
loop {
let (mut sender, body) = hyper::Body::channel();
let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job")
diff --git a/src/dbctx.rs b/src/dbctx.rs
index 3af2d56..d025b8d 100644
--- a/src/dbctx.rs
+++ b/src/dbctx.rs
@@ -1,10 +1,15 @@
use std::sync::Mutex;
+use futures_util::StreamExt;
use rusqlite::{Connection, OptionalExtension};
use std::time::{SystemTime, UNIX_EPOCH};
+use tokio::fs::{File, OpenOptions};
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
use crate::notifier::{RemoteNotifier, NotifierConfig};
use crate::sql;
+const TOKEN_EXPIRY_MS: u64 = 1000 * 60 * 30;
+
pub struct DbCtx {
pub config_path: String,
// don't love this but.. for now...
@@ -22,6 +27,61 @@ pub struct PendingJob {
pub created_time: u64,
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TokenValidity {
+ Expired,
+ Invalid,
+ Valid,
+}
+
+pub struct ArtifactDescriptor {
+ job_id: u64,
+ artifact_id: u64,
+ file: File,
+}
+
+impl ArtifactDescriptor {
+ async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> {
+ // TODO: jobs should be a configurable path
+ let path = format!("jobs/{}/{}", job_id, artifact_id);
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create_new(true)
+ .open(&path)
+ .await
+ .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?;
+
+ Ok(ArtifactDescriptor {
+ job_id,
+ artifact_id,
+ file,
+ })
+ }
+
+ pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> {
+ loop {
+ let chunk = data.next().await;
+
+ let chunk = match chunk {
+ Some(Ok(chunk)) => chunk,
+ Some(Err(e)) => {
+ return Err(format!("error reading: {:?}", e));
+ }
+ None => {
+ eprintln!("body done?");
+ return Ok(());
+ }
+ };
+
+ let chunk = chunk.as_ref();
+
+ self.file.write_all(chunk).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ }
+ }
+}
+
impl DbCtx {
pub fn new(config_path: &str, db_path: &str) -> Self {
DbCtx {
@@ -32,6 +92,7 @@ impl DbCtx {
pub fn create_tables(&self) -> Result<(), ()> {
let conn = self.conn.lock().unwrap();
+ conn.execute(sql::CREATE_ARTIFACTS_TABLE, ()).unwrap();
conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap();
conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap();
conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap();
@@ -68,6 +129,56 @@ impl DbCtx {
Ok(conn.last_insert_rowid() as u64)
}
+ pub async fn reserve_artifact(&self, job_id: u64, name: &str, desc: &str) -> Result<ArtifactDescriptor, String> {
+ let artifact_id = {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .execute(
+ "insert into artifacts (job_id, name, desc) values (?1, ?2, ?3)",
+ (job_id, name, desc)
+ )
+ .map_err(|e| {
+ format!("{:?}", e)
+ })?;
+
+ conn.last_insert_rowid() as u64
+ };
+
+ ArtifactDescriptor::new(job_id, artifact_id).await
+ }
+
+ pub fn job_for_token(&self, token: &str) -> Result<Option<(u64, Option<String>, TokenValidity)>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row(
+ "select id, artifacts_path, started_time, job_timeout from jobs where build_token=?1",
+ [token],
+ |row| {
+ let timeout: Option<u64> = row.get(3).unwrap();
+ let timeout = timeout.unwrap_or(TOKEN_EXPIRY_MS);
+
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is before epoch")
+ .as_millis();
+
+ let time: Option<u64> = row.get(2).unwrap();
+ let validity = if let Some(time) = time {
+ if now > time as u128 + timeout as u128 {
+ TokenValidity::Expired
+ } else {
+ TokenValidity::Valid
+ }
+ } else {
+ TokenValidity::Invalid
+ };
+ Ok((row.get(0).unwrap(), row.get(1).unwrap(), validity))
+ }
+ )
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> {
self.conn.lock()
.unwrap()
@@ -89,6 +200,9 @@ impl DbCtx {
"github" => {
(remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote))
},
+ "github-email" => {
+ (remote.to_owned(), "email".to_owned(), format!("https://www.github.com/{}", remote), format!("http://www.github.com/{}.git", remote))
+ },
other => {
panic!("unsupported remote kind: {}", other);
}
diff --git a/src/notifier.rs b/src/notifier.rs
index 4ddc91b..3d9964a 100644
--- a/src/notifier.rs
+++ b/src/notifier.rs
@@ -1,6 +1,11 @@
use serde_derive::{Deserialize, Serialize};
use std::sync::Arc;
use axum::http::StatusCode;
+use lettre::transport::smtp::authentication::{Credentials, Mechanism};
+use lettre::{Message, Transport};
+use lettre::transport::smtp::extension::ClientId;
+use lettre::transport::smtp::client::{SmtpConnection, TlsParametersBuilder};
+use std::time::Duration;
use crate::DbCtx;
diff --git a/src/sql.rs b/src/sql.rs
index 80eb18a..81c58de 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -38,7 +38,8 @@ pub const CREATE_JOBS_TABLE: &'static str = "\
commit_id INTEGER,
created_time INTEGER,
started_time INTEGER,
- complete_time INTEGER);";
+ complete_time INTEGER,
+ job_timeout INTEGER);";
pub const CREATE_COMMITS_TABLE: &'static str = "\
CREATE TABLE IF NOT EXISTS commits (id INTEGER PRIMARY KEY AUTOINCREMENT, sha TEXT UNIQUE);";
@@ -62,6 +63,12 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\
remote_git_url TEXT,
notifier_config_path TEXT);";
+pub const CREATE_ARTIFACTS_TABLE: &'static str = "\
+ CREATE TABLE IF NOT EXISTS artifacts (id INTEGER PRIMARY KEY AUTOINCREMENT,
+ job_id INTEGER,
+ name TEXT,
+ desc TEXT);";
+
pub const CREATE_REMOTES_INDEX: &'static str = "\
CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";