summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ci_ctl.rs184
-rw-r--r--src/ci_driver.rs310
-rw-r--r--src/ci_runner.rs269
-rw-r--r--src/dbctx.rs204
-rw-r--r--src/main.rs341
-rw-r--r--src/notifier.rs126
-rw-r--r--src/sql.rs23
7 files changed, 1198 insertions, 259 deletions
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs
new file mode 100644
index 0000000..3f48907
--- /dev/null
+++ b/src/ci_ctl.rs
@@ -0,0 +1,184 @@
+use clap::{Parser, Subcommand};
+
+mod sql;
+mod dbctx;
+mod notifier;
+
+use sql::JobState;
+use dbctx::DbCtx;
+use notifier::NotifierConfig;
+
+#[derive(Parser)]
+#[command(version, about, long_about = None)]
+struct Args {
+ /// path to a database to manage (defaults to "./state.db")
+ db_path: Option<String>,
+
+ /// path to where configs should be found (defaults to "./config")
+ config_path: Option<String>,
+
+ #[command(subcommand)]
+ command: Command,
+}
+
+#[derive(Subcommand)]
+enum Command {
+ /// add _something_ to the db
+ Add {
+ #[command(subcommand)]
+ what: AddItem,
+ },
+
+ /// make sure that the state looks reasonable.
+ ///
+ /// currently, ensure that all notifiers have a config, that config references an existing
+ /// file, and that the referenced file is valid.
+ Validate,
+
+ /// do something with jobs
+ Job {
+ #[command(subcommand)]
+ what: JobAction,
+ },
+}
+
+#[derive(Subcommand)]
+enum JobAction {
+ List,
+ Rerun {
+ which: u32
+ }
+}
+
+#[derive(Subcommand)]
+enum AddItem {
+ Repo {
+ name: String,
+ remote: Option<String>,
+ remote_kind: Option<String>,
+ config: Option<String>,
+ },
+ Remote {
+ repo_name: String,
+ remote: String,
+ remote_kind: String,
+ config: String,
+ },
+}
+
+fn main() {
+ let args = Args::parse();
+
+ let db_path = args.db_path.unwrap_or_else(|| "./state.db".to_owned());
+ let config_path = args.config_path.unwrap_or_else(|| "./config".to_owned());
+
+ match args.command {
+ Command::Job { what } => {
+ match what {
+ JobAction::List => {
+ let db = DbCtx::new(&config_path, &db_path);
+ let mut conn = db.conn.lock().unwrap();
+ let mut query = conn.prepare("select id, artifacts_path, state, commit_id, created_time from jobs;").unwrap();
+ let mut jobs = query.query([]).unwrap();
+ while let Some(row) = jobs.next().unwrap() {
+ let (id, artifacts, state, commit_id, created_time): (u64, Option<String>, u64, u64, u64) = row.try_into().unwrap();
+
+ eprintln!("[+] {:04} | {: >8?} | {}", id, state, created_time);
+ }
+ eprintln!("jobs");
+ },
+ JobAction::Rerun { which } => {
+ let db = DbCtx::new(&config_path, &db_path);
+ db.conn.lock().unwrap().execute("update jobs set state=0 where id=?1", [which])
+ .expect("works");
+ eprintln!("[+] job {} set to pending", which);
+ }
+ }
+ },
+ Command::Add { what } => {
+ match what {
+ AddItem::Repo { name, remote, remote_kind, config } => {
+ let remote_config = match (remote, remote_kind, config) {
+ (Some(remote), Some(remote_kind), Some(config_path)) => {
+ // do something
+ if remote_kind != "github" {
+ eprintln!("unknown remote kind: {}", remote);
+ return;
+ }
+ Some((remote, remote_kind, config_path))
+ },
+ (None, None, None) => {
+ None
+ },
+ _ => {
+ eprintln!("when specifying a remote, `remote`, `remote_kind`, and `config_path` must either all be provided together or not at all");
+ return;
+ }
+ };
+
+ let db = DbCtx::new(&config_path, &db_path);
+ let repo_id = match db.new_repo(&name) {
+ Ok(repo_id) => repo_id,
+ Err(e) => {
+ if e.contains("UNIQUE constraint failed") {
+ eprintln!("[!] repo '{}' already exists", name);
+ return;
+ } else {
+ eprintln!("[!] failed to create repo entry: {}", e);
+ return;
+ }
+ }
+ };
+ println!("[+] new repo created: '{}' id {}", &name, repo_id);
+ if let Some((remote, remote_kind, config_path)) = remote_config {
+ let full_config_file_path = format!("{}/{}", &db.config_path, config_path);
+ let config = match remote_kind.as_ref() {
+ "github" => {
+ assert!(NotifierConfig::github_from_file(&full_config_file_path).is_ok());
+ }
+ "email" => {
+ assert!(NotifierConfig::email_from_file(&full_config_file_path).is_ok());
+ }
+ other => {
+ panic!("[-] notifiers for '{}' remotes are not supported", other);
+ }
+ };
+ db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config_path.as_str()).unwrap();
+ println!("[+] new remote created: repo '{}', {} remote at {}", &name, remote_kind, remote);
+ }
+ },
+ AddItem::Remote { repo_name, remote, remote_kind, config } => {
+ let db = DbCtx::new(&config_path, &db_path);
+ let repo_id = match db.repo_id_by_name(&repo_name) {
+ Ok(Some(id)) => id,
+ Ok(None) => {
+ eprintln!("[-] repo '{}' does not exist", repo_name);
+ return;
+ },
+ Err(e) => {
+ eprintln!("[!] couldn't look up repo '{}': {:?}", repo_name, e);
+ return;
+ }
+ };
+ let config_file = format!("{}/{}", config_path, config);
+ match remote_kind.as_ref() {
+ "github" => {
+ NotifierConfig::github_from_file(&config_file).unwrap();
+ }
+ "email" => {
+ NotifierConfig::email_from_file(&config_file).unwrap();
+ }
+ other => {
+ panic!("notifiers for '{}' remotes are not supported", other);
+ }
+ };
+ db.new_remote(repo_id, remote.as_str(), remote_kind.as_str(), config.as_str()).unwrap();
+ println!("[+] new remote created: repo '{}', {} remote at {}", &repo_name, remote_kind, remote);
+ },
+ }
+ },
+ Command::Validate => {
+ println!("ok");
+ }
+ }
+}
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index fd6f813..96e0d44 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -1,10 +1,29 @@
-use rusqlite::Connection;
use std::process::Command;
+use futures_util::StreamExt;
+use std::fmt;
use std::path::{Path, PathBuf};
+use tokio::spawn;
+use tokio_stream::wrappers::ReceiverStream;
+use std::sync::Arc;
+use std::time::{SystemTime, UNIX_EPOCH};
+use axum_server::tls_rustls::RustlsConfig;
+use axum::body::StreamBody;
+use axum::http::{StatusCode};
+use axum::Router;
+use axum::routing::*;
+use axum::extract::State;
+use axum::extract::BodyStream;
+use axum::response::IntoResponse;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::error::TrySendError;
+use serde_json::json;
+mod dbctx;
mod sql;
+mod notifier;
-use std::time::{SystemTime, UNIX_EPOCH};
+use crate::dbctx::{DbCtx, PendingJob};
+use crate::sql::JobState;
fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> {
let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into();
@@ -13,33 +32,43 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> {
Ok(path)
}
-fn activate_job(connection: &mut Connection, job: u64, artifacts: Option<String>, state: u8, run_host: Option<String>, commit_id: u64, repo_url: String, repo_name: String) {
+async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> {
+ let connection = dbctx.conn.lock().unwrap();
+ let (repo_id, remote_git_url): (u64, String) = connection
+ .query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
+ .expect("query succeeds");
+ let repo_name: String = connection
+ .query_row("select repo_name from repos where id=?1", [repo_id], |row| row.get(0))
+ .expect("query succeeds");
+
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("now is before epoch")
.as_millis();
let commit_sha: String = connection
- .query_row("select sha from commits where id=?1", [commit_id], |row| row.get(0))
+ .query_row("select sha from commits where id=?1", [job.commit_id], |row| row.get(0))
.expect("can run query");
- let artifacts: PathBuf = match artifacts {
+ let artifacts = PathBuf::from("/tmp");
+ /*
+ let artifacts: PathBuf = match &job.artifacts {
Some(artifacts) => PathBuf::from(artifacts),
- None => reserve_artifacts_dir(job).expect("can reserve a directory for artifacts")
+ None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts")
};
- if run_host == None {
+ if job.run_host.as_ref() == None {
eprintln!("need to find a host to run the job");
}
- eprintln!("cloning {}", repo_url);
+ eprintln!("cloning {}", remote_git_url);
let mut repo_dir = artifacts.clone();
repo_dir.push("repo");
eprintln!(" ... into {}", repo_dir.display());
Command::new("git")
.arg("clone")
- .arg(repo_url)
+ .arg(&remote_git_url)
.arg(&format!("{}", repo_dir.display()))
.status()
.expect("can clone the repo");
@@ -48,44 +77,265 @@ fn activate_job(connection: &mut Connection, job: u64, artifacts: Option<String>
Command::new("git")
.current_dir(&repo_dir)
.arg("checkout")
- .arg(commit_sha)
+ .arg(&commit_sha)
.status()
.expect("can checkout hash");
+ */
eprintln!("running {}", repo_name);
/*
* find the CI script, figure out how to run it
*/
+ let mut client_job = loop {
+ let mut candidate = clients.recv().await
+ .ok_or_else(|| "client channel disconnected".to_string())?;
+
+ if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await {
+ break client_job;
+ } else {
+ // failed to submit job, move on for now
+ }
+ };
+
+ let run_host = client_job.client.name.clone();
+
connection.execute(
"update jobs set started_time=?1, run_host=?2, state=1, artifacts_path=?3 where id=?4",
- (now as u64, "test host".to_string(), format!("{}", artifacts.display()), job)
+ (now as u64, run_host, format!("{}", artifacts.display()), job.id)
)
.expect("can update");
+
+ spawn(async move {
+ client_job.run().await
+ });
+
+ Ok(())
}
-fn main() {
- let mut connection = Connection::open("/root/ixi_ci_server/state.db").unwrap();
- connection.execute(sql::CREATE_JOBS_TABLE, ()).unwrap();
- connection.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap();
- connection.execute(sql::CREATE_REPOS_TABLE, ()).unwrap();
- connection.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap();
+struct RunnerClient {
+ tx: mpsc::Sender<Result<String, String>>,
+ rx: BodyStream,
+ name: String,
+ build_token: String,
+}
- loop {
- let mut pending_query = connection.prepare(sql::PENDING_JOBS).unwrap();
- let mut jobs = pending_query.query([]).unwrap();
- let mut to_start = Vec::new();
- while let Some(row) = jobs.next().unwrap() {
- let (id, artifacts, state, run_host, commit_id, repo_url, repo_name): (u64, Option<String>, u8, Option<String>, u64, String, String)= TryInto::try_into(row).unwrap();
- to_start.push((id, artifacts, state, run_host, commit_id, repo_url, repo_name));
+fn random_name() -> String {
+ "random name".to_string()
+}
+
+fn token_for_job() -> String {
+ "very secret token do not share".to_string()
+}
+
+struct ClientJob {
+ dbctx: Arc<DbCtx>,
+ remote_git_url: String,
+ sha: String,
+ job: PendingJob,
+ client: RunnerClient
+}
+
+impl ClientJob {
+ pub async fn run(&mut self) {
+ loop {
+ let msg = self.client.recv().await.expect("recv works").expect("client sent an object");
+ let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();
+ match msg_kind {
+ "job_status" => {
+ let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap();
+ let (result, state): (Result<String, String>, JobState) = if state == "finished" {
+ let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
+ eprintln!("job update: state is {} and result is {}", state, result);
+ match result {
+ "pass" => {
+ (Ok("success".to_string()), JobState::Complete)
+ },
+ other => {
+ (Err(other.to_string()), JobState::Error)
+ }
+ }
+ } else if state == "interrupted" {
+ let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
+ eprintln!("job update: state is {} and result is {}", state, result);
+ (Err(result.to_string()), JobState::Error)
+ } else {
+ eprintln!("job update: state is {}", state);
+ (Err(format!("atypical completion status: {}", state)), JobState::Invalid)
+ };
+
+ let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists");
+
+ for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") {
+ notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await.expect("can notify");
+ }
+
+ let now = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is before epoch")
+ .as_millis();
+
+ self.dbctx.conn.lock().unwrap().execute(
+ "update jobs set complete_time=?1, state=?2 where id=?3",
+ (now as u64, state as u64, self.job.id)
+ )
+ .expect("can update");
+ }
+ "artifact_create" => {
+ eprintln!("creating artifact {:?}", msg);
+ self.client.send(serde_json::json!({
+ "status": "ok",
+ "object_id": "10",
+ })).await.unwrap();
+ },
+ other => {
+ eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg);
+ return;
+ }
+ }
+ }
+ }
+}
+
+impl RunnerClient {
+ async fn new(sender: mpsc::Sender<Result<String, String>>, resp: BodyStream) -> Result<Self, String> {
+ let name = random_name();
+ let token = token_for_job();
+ let client = RunnerClient {
+ tx: sender,
+ rx: resp,
+ name,
+ build_token: token,
+ };
+ Ok(client)
+ }
+
+ async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> {
+ self.tx.send(Ok(serde_json::to_string(&msg).unwrap()))
+ .await
+ .map_err(|e| e.to_string())
+ }
+
+ async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> {
+ match self.rx.next().await {
+ Some(Ok(bytes)) => {
+ serde_json::from_slice(&bytes)
+ .map(Option::Some)
+ .map_err(|e| e.to_string())
+ },
+ Some(Err(e)) => {
+ eprintln!("e: {:?}", e);
+ Err(format!("no client job: {:?}", e))
+ },
+ None => {
+ eprintln!("no more body chunks? client hung up?");
+ Ok(None)
+ }
+ }
+ }
+
+ async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
+ self.send(serde_json::json!({
+ "commit": sha,
+ "remote_url": remote_git_url,
+ "build_token": &self.build_token,
+ })).await?;
+ match self.recv().await {
+ Ok(Some(resp)) => {
+ if resp == serde_json::json!({
+ "status": "started"
+ }) {
+ eprintln!("resp: {:?}", resp);
+ Ok(Some(ClientJob {
+ job: job.clone(),
+ dbctx: Arc::clone(dbctx),
+ sha: sha.to_string(),
+ remote_git_url: remote_git_url.to_string(),
+ client: self
+ }))
+ } else {
+ Err("client rejected job".to_string())
+ }
+ }
+ Ok(None) => {
+ eprintln!("no more body chunks? client hung up?");
+ Ok(None)
+ }
+ Err(e) => {
+ eprintln!("e: {:?}", e);
+ Err(e)
+ }
+ }
+ }
+}
+
+impl fmt::Debug for RunnerClient {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ f.write_str("RunnerClient { .. }")
+ }
+}
+
+async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, job_resp: BodyStream) -> impl IntoResponse {
+ let (tx_sender, tx_receiver) = mpsc::channel(8);
+ let resp_body = StreamBody::new(ReceiverStream::new(tx_receiver));
+ tx_sender.send(Ok("hello".to_string())).await.expect("works");
+ let client = RunnerClient::new(tx_sender, job_resp).await;
+ match client {
+ Ok(client) => {
+ eprintln!("registering client");
+ match ctx.1.try_send(client) {
+ Ok(()) => {
+ eprintln!("response established...");
+ return (StatusCode::OK, resp_body);
+ }
+ Err(TrySendError::Full(client)) => {
+ return (StatusCode::IM_A_TEAPOT, resp_body);
+ }
+ Err(TrySendError::Closed(client)) => {
+ panic!("client holder is gone?");
+ }
+ }
}
- std::mem::drop(jobs);
- std::mem::drop(pending_query);
- if to_start.len() > 0 {
- println!("{} new jobs", to_start.len());
+ Err(e) => {
+ eprintln!("unable to register client");
+ return (StatusCode::MISDIRECTED_REQUEST, resp_body);
+ }
+ }
+}
+
+async fn make_api_server(dbctx: Arc<DbCtx>) -> (Router, mpsc::Receiver<RunnerClient>) {
+ let (pending_client_sender, pending_client_receiver) = mpsc::channel(8);
+
+ let router = Router::new()
+ .route("/api/next_job", post(handle_next_job))
+ .with_state((dbctx, pending_client_sender));
+ (router, pending_client_receiver)
+}
+
+#[tokio::main]
+async fn main() {
+ tracing_subscriber::fmt::init();
+ let config = RustlsConfig::from_pem_file(
+ PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/fullchain.pem"),
+ PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"),
+ ).await.unwrap();
+
+ let dbctx = Arc::new(DbCtx::new("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db"));
+
+ let (api_server, mut channel) = make_api_server(Arc::clone(&dbctx)).await;
+ spawn(axum_server::bind_rustls("0.0.0.0:9876".parse().unwrap(), config)
+ .serve(api_server.into_make_service()));
+
+ dbctx.create_tables().unwrap();
+
+ loop {
+ let jobs = dbctx.get_pending_jobs().unwrap();
+
+ if jobs.len() > 0 {
+ println!("{} new jobs", jobs.len());
- for job in to_start.into_iter() {
- activate_job(&mut connection, job.0, job.1, job.2, job.3, job.4, job.5, job.6);
+ for job in jobs.into_iter() {
+ activate_job(Arc::clone(&dbctx), &job, &mut channel).await;
}
}
std::thread::sleep(std::time::Duration::from_millis(100));
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
new file mode 100644
index 0000000..6554f05
--- /dev/null
+++ b/src/ci_runner.rs
@@ -0,0 +1,269 @@
+use std::time::Duration;
+use reqwest::{StatusCode, Response};
+use std::process::Command;
+use serde_derive::{Deserialize, Serialize};
+use serde::{Deserialize, de::DeserializeOwned, Serialize};
+
+#[derive(Debug)]
+enum WorkAcquireError {
+ Reqwest(reqwest::Error),
+ EarlyEof,
+ Protocol(String),
+}
+
+struct RunnerClient {
+ host: String,
+ tx: hyper::body::Sender,
+ rx: Response,
+ current_job: Option<RequestedJob>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct RequestedJob {
+ commit: String,
+ remote_url: String,
+ build_token: String,
+}
+
+impl RequestedJob {
+ // TODO: panics if hyper finds the channel is closed. hum
+ async fn create_artifact(&self, client: &mut RunnerClient, name: &str, desc: &str) -> Result<ArtifactStream, String> {
+ let create_artifact_message = serde_json::json!({
+ "kind": "artifact_create",
+ "name": name,
+ "description": desc,
+ "job_token": &self.build_token,
+ });
+ client.send(create_artifact_message).await
+ .map_err(|e| format!("create artifact send error: {:?}", e))?;
+ let resp = client.recv().await
+ .map_err(|e| format!("create artifact recv error: {:?}", e))?;
+ eprintln!("resp: {:?}", resp);
+ let object_id = resp.unwrap()
+ .as_object().expect("is an object")
+ .get("object_id").unwrap().as_str().expect("is str")
+ .to_owned();
+ // POST to this object id...
+ Ok(ArtifactStream {
+ object_id,
+ })
+ }
+
+ async fn execute_goodfile(&self, client: &mut RunnerClient) -> Result<String, String> {
+ let clone_log = self.create_artifact(client, "git clone log", &format!("git clone {} tmpdir", &self.remote_url)).await.expect("works");
+
+ let clone_res = Command::new("git")
+ .arg("clone")
+ .arg(&self.remote_url)
+ .arg("tmpdir")
+ .status()
+ .map_err(|e| format!("failed to run git clone? {:?}", e))?;
+
+ if !clone_res.success() {
+ return Err(format!("git clone failed: {:?}", clone_res));
+ }
+
+ let checkout_log = self.create_artifact(client, "git checkout log", &format!("git checkout {}", &self.commit)).await.expect("works");
+
+ let checkout_res = Command::new("git")
+ .current_dir("tmpdir")
+ .arg("checkout")
+ .arg(&self.commit)
+ .status()
+ .map_err(|e| format!("failed to run git checkout? {:?}", e))?;
+
+ if !checkout_res.success() {
+ return Err(format!("git checkout failed: {:?}", checkout_res));
+ }
+
+ let build_log = self.create_artifact(client, "cargo build log", "cargo build").await.expect("works");
+
+ let build_res = Command::new("cargo")
+ .current_dir("tmpdir")
+ .arg("build")
+ .status()
+ .map_err(|e| format!("failed to run cargo build? {:?}", e))?;
+
+ if !build_res.success() {
+ return Err(format!("cargo build failed: {:?}", build_res));
+ }
+
+ let test_log = self.create_artifact(client, "cargo test log", "cargo test").await.expect("works");
+
+ let test_result = Command::new("cargo")
+ .current_dir("tmpdir")
+ .arg("test")
+ .status()
+ .map_err(|e| format!("failed to run cargo test? {:?}", e))?;
+
+ match test_result.code() {
+ Some(0) => Ok("pass".to_string()),
+ Some(n) => Ok(format!("error: {}", n)),
+ None => Ok(format!("abnormal exit")),
+ }
+ }
+}
+
+struct ArtifactStream {
+ object_id: String,
+}
+
+impl RunnerClient {
+ async fn new(host: &str, sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
+ if res.status() != StatusCode::OK {
+ return Err(format!("server returned a bad response: {:?}, response itself: {:?}", res.status(), res));
+ }
+
+ let hello = res.chunk().await.expect("chunk");
+ if hello.as_ref().map(|x| &x[..]) != Some(b"hello") {
+ return Err(format!("bad hello: {:?}", hello));
+ }
+
+ Ok(Self {
+ host: host.to_string(),
+ tx: sender,
+ rx: res,
+ current_job: None,
+ })
+ }
+
+ async fn wait_for_work(&mut self) -> Result<Option<RequestedJob>, WorkAcquireError> {
+ match self.rx.chunk().await {
+ Ok(Some(chunk)) => {
+ eprintln!("got chunk: {:?}", &chunk);
+ serde_json::from_slice(&chunk)
+ .map(Option::Some)
+ .map_err(|e| {
+ WorkAcquireError::Protocol(format!("not json: {:?}", e))
+ })
+ }
+ Ok(None) => {
+ Ok(None)
+ },
+ Err(e) => {
+ Err(WorkAcquireError::Reqwest(e))
+ }
+ }
+ }
+
+ async fn recv(&mut self) -> Result<Option<serde_json::Value>, String> {
+ self.recv_typed().await
+ }
+
+ async fn recv_typed<T: DeserializeOwned>(&mut self) -> Result<Option<T>, String> {
+ match self.rx.chunk().await {
+ Ok(Some(chunk)) => {
+ serde_json::from_slice(&chunk)
+ .map(Option::Some)
+ .map_err(|e| {
+ format!("not json: {:?}", e)
+ })
+ },
+ Ok(None) => Ok(None),
+ Err(e) => {
+ Err(format!("error in recv: {:?}", e))
+ }
+ }
+ }
+
+ async fn send(&mut self, value: serde_json::Value) -> Result<(), String> {
+ self.tx.send_data(
+ serde_json::to_vec(&value)
+ .map_err(|e| format!("json error: {:?}", e))?
+ .into()
+ ).await
+ .map_err(|e| format!("send error: {:?}", e))
+ }
+
+ async fn run_job(&mut self, job: RequestedJob) {
+ self.send(serde_json::json!({
+ "status": "started"
+ })).await.unwrap();
+
+ std::fs::remove_dir_all("tmpdir").unwrap();
+ std::fs::create_dir("tmpdir").unwrap();
+
+ let res = job.execute_goodfile(self).await;
+
+ match res {
+ Ok(status) => {
+ self.send(serde_json::json!({
+ "kind": "job_status",
+ "state": "finished",
+ "result": status
+ })).await.unwrap();
+ }
+ Err(status) => {
+ self.send(serde_json::json!({
+ "kind": "job_status",
+ "state": "interrupted",
+ "result": status
+ })).await.unwrap();
+ }
+ }
+ }
+}
+
+#[tokio::main]
+async fn main() {
+ let secret = std::fs::read_to_string("./auth_secret").unwrap();
+ let client = reqwest::ClientBuilder::new()
+ .connect_timeout(Duration::from_millis(1000))
+ .timeout(Duration::from_millis(600000))
+ .build()
+ .expect("can build client");
+
+ loop {
+ let (mut sender, body) = hyper::Body::channel();
+ let poll = client.post("https://ci.butactuallyin.space:9876/api/next_job")
+ .header("user-agent", "ci-butactuallyin-space-runner")
+ .header("authorization", &secret)
+ .body(body)
+ .send()
+ .await;
+
+ match poll {
+ Ok(mut res) => {
+ let mut client = match RunnerClient::new("ci.butactuallyin.space:9876", sender, res).await {
+ Ok(client) => client,
+ Err(e) => {
+ eprintln!("failed to initialize client: {:?}", e);
+ std::thread::sleep(Duration::from_millis(10000));
+ continue;
+ }
+ };
+ let job = match client.wait_for_work().await {
+ Ok(Some(request)) => request,
+ Ok(None) => {
+ eprintln!("no work to do (yet)");
+ std::thread::sleep(Duration::from_millis(2000));
+ continue;
+ }
+ Err(e) => {
+ eprintln!("failed to get work: {:?}", e);
+ std::thread::sleep(Duration::from_millis(10000));
+ continue;
+ }
+ };
+ eprintln!("requested work: {:?}", job);
+
+ eprintln!("doing {:?}", job);
+ client.run_job(job).await;
+ std::thread::sleep(Duration::from_millis(10000));
+ },
+ Err(e) => {
+ let message = format!("{}", e);
+
+ if message.contains("tcp connect error") {
+ eprintln!("could not reach server. sleeping a bit and retrying.");
+ std::thread::sleep(Duration::from_millis(5000));
+ continue;
+ }
+
+ eprintln!("unhandled error: {}", message);
+
+ std::thread::sleep(Duration::from_millis(1000));
+ }
+ }
+ }
+}
diff --git a/src/dbctx.rs b/src/dbctx.rs
new file mode 100644
index 0000000..3af2d56
--- /dev/null
+++ b/src/dbctx.rs
@@ -0,0 +1,204 @@
+use std::sync::Mutex;
+use rusqlite::{Connection, OptionalExtension};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use crate::notifier::{RemoteNotifier, NotifierConfig};
+use crate::sql;
+
+pub struct DbCtx {
+ pub config_path: String,
+ // don't love this but.. for now...
+ pub conn: Mutex<Connection>,
+}
+
+#[derive(Debug, Clone)]
+pub struct PendingJob {
+ pub id: u64,
+ pub artifacts: Option<String>,
+ pub state: sql::JobState,
+ pub run_host: Option<String>,
+ pub remote_id: u64,
+ pub commit_id: u64,
+ pub created_time: u64,
+}
+
+impl DbCtx {
+ pub fn new(config_path: &str, db_path: &str) -> Self {
+ DbCtx {
+ config_path: config_path.to_owned(),
+ conn: Mutex::new(Connection::open(db_path).unwrap())
+ }
+ }
+
+ pub fn create_tables(&self) -> Result<(), ()> {
+ let conn = self.conn.lock().unwrap();
+ conn.execute(sql::CREATE_JOBS_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_COMMITS_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_REPOS_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_REPO_NAME_INDEX, ()).unwrap();
+ conn.execute(sql::CREATE_REMOTES_TABLE, ()).unwrap();
+ conn.execute(sql::CREATE_REMOTES_INDEX, ()).unwrap();
+
+ Ok(())
+ }
+
+ pub fn new_commit(&self, sha: &str) -> Result<u64, String> {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .execute(
+ "insert into commits (sha) values (?1)",
+ [sha.clone()]
+ )
+ .expect("can insert");
+
+ Ok(conn.last_insert_rowid() as u64)
+ }
+
+ pub fn new_repo(&self, name: &str) -> Result<u64, String> {
+ let conn = self.conn.lock().unwrap();
+ conn
+ .execute(
+ "insert into repos (repo_name) values (?1)",
+ [name.clone()]
+ )
+ .map_err(|e| {
+ format!("{:?}", e)
+ })?;
+
+ Ok(conn.last_insert_rowid() as u64)
+ }
+
+ pub fn repo_id_by_remote(&self, remote_id: u64) -> Result<Option<u64>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row("select repo_id from remotes where id=?1", [remote_id], |row| row.get(0))
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn repo_id_by_name(&self, repo_name: &str) -> Result<Option<u64>, String> {
+ self.conn.lock()
+ .unwrap()
+ .query_row("select id from repos where repo_name=?1", [repo_name], |row| row.get(0))
+ .optional()
+ .map_err(|e| e.to_string())
+ }
+
+ pub fn new_remote(&self, repo_id: u64, remote: &str, remote_kind: &str, config_path: &str) -> Result<u64, String> {
+ let (remote_path, remote_api, remote_url, remote_git_url) = match remote_kind {
+ "github" => {
+ (remote.to_owned(), remote_kind.to_owned(), format!("https://www.github.com/{}", remote), format!("https://www.github.com/{}.git", remote))
+ },
+ other => {
+ panic!("unsupported remote kind: {}", other);
+ }
+ };
+
+ let conn = self.conn.lock().unwrap();
+ conn
+ .execute(
+ "insert into remotes (repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) values (?1, ?2, ?3, ?4, ?5, ?6);",
+ (repo_id, remote_path, remote_api, remote_url, remote_git_url, config_path)
+ )
+ .expect("can insert");
+
+ Ok(conn.last_insert_rowid() as u64)
+ }
+
+ pub fn new_job(&self, remote_id: u64, sha: &str) -> Result<u64, String> {
+ // TODO: potential race: if two remotes learn about a commit at the same time and we decide
+ // to create two jobs at the same time, this might return an incorrect id if the insert
+ // didn't actually insert a new row.
+ let commit_id = self.new_commit(sha).expect("can create commit record");
+
+ let created_time = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("now is before epoch")
+ .as_millis() as u64;
+
+ let conn = self.conn.lock().unwrap();
+
+ let rows_modified = conn.execute(
+ "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);",
+ (crate::sql::JobState::Pending as u64, remote_id, commit_id, created_time)
+ ).unwrap();
+
+ assert_eq!(1, rows_modified);
+
+ Ok(conn.last_insert_rowid() as u64)
+ }
+
+ pub fn get_pending_jobs(&self) -> Result<Vec<PendingJob>, String> {
+ let conn = self.conn.lock().unwrap();
+
+ let mut pending_query = conn.prepare(sql::PENDING_JOBS).unwrap();
+ let mut jobs = pending_query.query([]).unwrap();
+ let mut pending = Vec::new();
+
+ while let Some(row) = jobs.next().unwrap() {
+ let (id, artifacts, state, run_host, remote_id, commit_id, created_time) = row.try_into().unwrap();
+ let state: u8 = state;
+ pending.push(PendingJob {
+ id, artifacts,
+ state: state.try_into().unwrap(),
+ run_host, remote_id, commit_id, created_time
+ });
+ }
+
+ Ok(pending)
+ }
+
+ pub fn notifiers_by_repo(&self, repo_id: u64) -> Result<Vec<RemoteNotifier>, String> {
+ #[derive(Debug)]
+ #[allow(dead_code)]
+ struct Remote {
+ id: u64,
+ repo_id: u64,
+ remote_path: String,
+ remote_api: String,
+ notifier_config_path: String,
+ }
+
+ let mut remotes: Vec<Remote> = Vec::new();
+
+ let conn = self.conn.lock().unwrap();
+ let mut remotes_query = conn.prepare(crate::sql::REMOTES_FOR_REPO).unwrap();
+ let mut remote_results = remotes_query.query([repo_id]).unwrap();
+
+ while let Some(row) = remote_results.next().unwrap() {
+ let (id, repo_id, remote_path, remote_api, remote_url, remote_git_url, notifier_config_path) = row.try_into().unwrap();
+ let _: String = remote_url;
+ let _: String = remote_git_url;
+ remotes.push(Remote { id, repo_id, remote_path, remote_api, notifier_config_path });
+ }
+
+ let mut notifiers: Vec<RemoteNotifier> = Vec::new();
+
+ for remote in remotes.into_iter() {
+ match remote.remote_api.as_str() {
+ "github" => {
+ let notifier = RemoteNotifier {
+ remote_path: remote.remote_path,
+ notifier: NotifierConfig::github_from_file(&format!("{}/{}", self.config_path, remote.notifier_config_path))
+ .expect("can load notifier config")
+ };
+ notifiers.push(notifier);
+ },
+ "email" => {
+ let notifier = RemoteNotifier {
+ remote_path: remote.remote_path,
+ notifier: NotifierConfig::email_from_file(&format!("{}/{}", self.config_path, remote.notifier_config_path))
+ .expect("can load notifier config")
+ };
+ notifiers.push(notifier);
+ }
+ other => {
+ eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote)
+ }
+ }
+ }
+
+ Ok(notifiers)
+ }
+}
+
diff --git a/src/main.rs b/src/main.rs
index 1b6d9e8..56aca01 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,11 +3,10 @@
use tokio::spawn;
use std::path::PathBuf;
-use serde_derive::{Deserialize, Serialize};
use axum_server::tls_rustls::RustlsConfig;
use axum::routing::*;
use axum::Router;
-use axum::response::{IntoResponse, Response};
+use axum::response::{IntoResponse, Response, Html};
use std::net::SocketAddr;
use axum::extract::{Path, State};
use http_body::combinators::UnsyncBoxBody;
@@ -18,15 +17,22 @@ use axum::http::{StatusCode, Uri};
use http::header::HeaderMap;
use std::sync::Arc;
-use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};
use hmac::{Hmac, Mac};
use sha2::Sha256;
mod sql;
+mod notifier;
+mod dbctx;
-use rusqlite::{Connection, OptionalExtension};
+use sql::JobState;
+
+use dbctx::DbCtx;
+
+use rusqlite::OptionalExtension;
+
+const PSKS: &'static [&'static [u8]] = &[];
#[derive(Copy, Clone, Debug)]
enum GithubHookError {
@@ -96,14 +102,22 @@ async fn process_push_event(ctx: Arc<DbCtx>, owner: String, repo: String, event:
return (StatusCode::OK, String::new());
}
- let remote_url = format!("https://github.com/{}.git", repo);
- let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap()
+ let remote_url = format!("https://www.github.com/{}.git", repo);
+ eprintln!("looking for remote url: {}", remote_url);
+ let (remote_id, repo_id): (u64, u64) = match ctx.conn.lock().unwrap()
.query_row("select id, repo_id from remotes where remote_git_url=?1;", [&remote_url], |row| Ok((row.get(0).unwrap(), row.get(1).unwrap())))
- .unwrap();
+ .optional()
+ .unwrap() {
+ Some(elems) => elems,
+ None => {
+ eprintln!("no remote registered for url {} (repo {})", remote_url, repo);
+ return (StatusCode::NOT_FOUND, String::new());
+ }
+ };
let job_id = ctx.new_job(remote_id, &sha).unwrap();
- let notifiers = ctx.notifiers_by_name(&repo).expect("can get notifiers");
+ let notifiers = ctx.notifiers_by_repo(repo_id).expect("can get notifiers");
for notifier in notifiers {
notifier.tell_pending_job(&ctx, repo_id, &sha, job_id).await.expect("can notify");
@@ -134,11 +148,78 @@ async fn handle_github_event(ctx: Arc<DbCtx>, owner: String, repo: String, event
async fn handle_commit_status(Path(path): Path<(String, String, String)>, State(ctx): State<Arc<DbCtx>>) -> impl IntoResponse {
eprintln!("path: {}/{}, sha {}", path.0, path.1, path.2);
+ let remote_path = format!("{}/{}", path.0, path.1);
+ let sha = path.2;
+
+ let commit_id: Option<u64> = ctx.conn.lock().unwrap()
+ .query_row("select id from commits where sha=?1;", [&sha], |row| row.get(0))
+ .optional()
+ .expect("can query");
+
+ let commit_id: u64 = match commit_id {
+ Some(commit_id) => {
+ commit_id
+ },
+ None => {
+ return (StatusCode::NOT_FOUND, Html("<html><body>no such commit</body></html>".to_string()));
+ }
+ };
+
+ let (remote_id, repo_id): (u64, u64) = ctx.conn.lock().unwrap()
+ .query_row("select id, repo_id from remotes where remote_path=?1;", [&remote_path], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
+ .expect("can query");
+
+ let (job_id, state): (u64, u8) = ctx.conn.lock().unwrap()
+ .query_row("select id, state from jobs where commit_id=?1;", [commit_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
+ .expect("can query");
+
+ let state: sql::JobState = unsafe { std::mem::transmute(state) };
+
+ let repo_name: String = ctx.conn.lock().unwrap()
+ .query_row("select repo_name from repos where id=?1;", [repo_id], |row| row.get(0))
+ .expect("can query");
+
+ let deployed = false;
+
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("now is before epoch");
- format!("requested: {:?}", path)
+ let resp = format!("\
+ <html>\n\
+ <head>\n\
+ <title>ci.butactuallyin.space - {}</title>\n\
+ </head>\n\
+ <body>\n\
+ <pre>\n\
+ repo: {}\n\
+ commit: <a href='https://www.github.com/{}/commit/{}'>{}</a>\n \
+ status: {}\n \
+ deployed: {}\n\
+ </pre>\n\
+ </body>\n\
+ </html>\n",
+ repo_name,
+ repo_name,
+ &remote_path, &sha, &sha,
+ match state {
+ JobState::Pending | JobState::Started => {
+ "<span style='color:#660;'>pending</span>"
+ },
+ JobState::Complete => {
+ "<span style='color:green;'>pass</span>"
+ },
+ JobState::Error => {
+ "<span style='color:red;'>pass</span>"
+ }
+ JobState::Invalid => {
+ "<span style='color:red;'>(server error)</span>"
+ }
+ },
+ deployed,
+ );
+
+ (StatusCode::OK, Html(resp))
}
async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMap, State(ctx): State<Arc<DbCtx>>, body: Bytes) -> impl IntoResponse {
@@ -161,17 +242,24 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa
}
};
- let mut mac = Hmac::<Sha256>::new_from_slice(GITHUB_PSK)
- .expect("hmac can be constructed");
- mac.update(&body);
- let result = mac.finalize().into_bytes().to_vec();
-
- // hack: skip sha256=
- let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex");
- if decoded != result {
- eprintln!("bad hmac:\n\
- got: {:?}\n\
- expected: {:?}", decoded, result);
+ let mut hmac_ok = false;
+
+ for psk in PSKS.iter() {
+ let mut mac = Hmac::<Sha256>::new_from_slice(psk)
+ .expect("hmac can be constructed");
+ mac.update(&body);
+ let result = mac.finalize().into_bytes().to_vec();
+
+ // hack: skip sha256=
+ let decoded = hex::decode(&sent_hmac[7..]).expect("provided hmac is valid hex");
+ if decoded == result {
+ hmac_ok = true;
+ break;
+ }
+ }
+
+ if !hmac_ok {
+ eprintln!("bad hmac by all psks");
return (StatusCode::BAD_REQUEST, "").into_response();
}
@@ -186,213 +274,8 @@ async fn handle_repo_event(Path(path): Path<(String, String)>, headers: HeaderMa
handle_github_event(ctx, path.0, path.1, kind, payload).await
}
-struct DbCtx {
- conn: Mutex<Connection>,
-}
-
-struct RemoteNotifier {
- remote_path: String,
- notifier: NotifierConfig,
-}
-
-#[derive(Serialize, Deserialize)]
-#[serde(untagged)]
-enum NotifierConfig {
- GitHub {
- token: String,
- },
- Email {
- username: String,
- password: String,
- mailserver: String,
- from: String,
- to: String,
- }
-}
-
-impl NotifierConfig {
- fn github_from_file(path: &str) -> Result<Self, String> {
- let bytes = std::fs::read(path)
- .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?;
- let config = serde_json::from_slice(&bytes)
- .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?;
-
- if matches!(config, NotifierConfig::GitHub { .. }) {
- Ok(config)
- } else {
- Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path))
- }
- }
-
- fn email_from_file(path: &str) -> Result<Self, String> {
- let bytes = std::fs::read(path)
- .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?;
- let config = serde_json::from_slice(&bytes)
- .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?;
-
- if matches!(config, NotifierConfig::Email { .. }) {
- Ok(config)
- } else {
- Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path))
- }
- }
-}
-
-impl RemoteNotifier {
- async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> {
- match &self.notifier {
- NotifierConfig::GitHub { token } => {
- let status_info = serde_json::json!({
- "state": "pending",
- "target_url": format!(
- "https://{}/{}/{}",
- "ci.butactuallyin.space",
- &self.remote_path,
- sha,
- ),
- "description": "build is queued",
- "context": "actuallyinspace runner",
- });
-
- // TODO: should pool (probably in ctx?) to have an upper bound in concurrent
- // connections.
- let client = reqwest::Client::new();
- let res = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha))
- .body(serde_json::to_string(&status_info).expect("can stringify json"))
- .header("authorization", format!("Bearer {}", token))
- .header("accept", "application/vnd.github+json")
- .send()
- .await;
-
- match res {
- Ok(res) => {
- if res.status() == StatusCode::OK {
- Ok(())
- } else {
- Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res))
- }
- }
- Err(e) => {
- Err(format!("failure sending request: {:?}", e))
- }
- }
- }
- NotifierConfig::Email { username, password, mailserver, from, to } => {
- panic!("should send an email saying that a job is now pending for `sha`")
- }
- }
- }
-}
-
-impl DbCtx {
- fn new(db_path: &'static str) -> Self {
- DbCtx {
- conn: Mutex::new(Connection::open(db_path).unwrap())
- }
- }
-
- fn new_commit(&self, sha: &str) -> Result<u64, String> {
- let conn = self.conn.lock().unwrap();
- conn
- .execute(
- "insert into commits (sha) values (?1)",
- [sha.clone()]
- )
- .expect("can insert");
-
- Ok(conn.last_insert_rowid() as u64)
- }
-
- fn new_job(&self, remote_id: u64, sha: &str) -> Result<u64, String> {
- // TODO: potential race: if two remotes learn about a commit at the same time and we decide
- // to create two jobs at the same time, this might return an incorrect id if the insert
- // didn't actually insert a new row.
- let commit_id = self.new_commit(sha).expect("can create commit record");
-
- let created_time = SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is before epoch")
- .as_millis() as u64;
-
- let conn = self.conn.lock().unwrap();
-
- let rows_modified = conn.execute(
- "insert into jobs (state, remote_id, commit_id, created_time) values (?1, ?2, ?3, ?4);",
- (sql::JobState::Pending as u64, remote_id, commit_id, created_time)
- ).unwrap();
-
- assert_eq!(1, rows_modified);
-
- Ok(conn.last_insert_rowid() as u64)
- }
-
- fn notifiers_by_name(&self, repo: &str) -> Result<Vec<RemoteNotifier>, String> {
- let maybe_repo_id: Option<u64> = self.conn.lock()
- .unwrap()
- .query_row("select * from repos where repo_name=?1", [repo], |row| row.get(0))
- .optional()
- .expect("query succeeds");
- match maybe_repo_id {
- Some(repo_id) => {
- // get remotes
-
- #[derive(Debug)]
- #[allow(dead_code)]
- struct Remote {
- id: u64,
- repo_id: u64,
- remote_path: String,
- remote_api: String,
- notifier_config_path: String,
- }
-
- let mut remotes: Vec<Remote> = Vec::new();
-
- let conn = self.conn.lock().unwrap();
- let mut remotes_query = conn.prepare(sql::REMOTES_FOR_REPO).unwrap();
- let mut remote_results = remotes_query.query([repo_id]).unwrap();
-
- while let Some(row) = remote_results.next().unwrap() {
- let (id, repo_id, remote_path, remote_api, notifier_config_path) = row.try_into().unwrap();
- remotes.push(Remote { id, repo_id, remote_path, remote_api, notifier_config_path });
- }
-
- let mut notifiers: Vec<RemoteNotifier> = Vec::new();
-
- for remote in remotes.into_iter() {
- match remote.remote_api.as_str() {
- "github" => {
- let notifier = RemoteNotifier {
- remote_path: remote.remote_path,
- notifier: NotifierConfig::github_from_file(&remote.notifier_config_path)
- .expect("can load notifier config")
- };
- notifiers.push(notifier);
- },
- "email" => {
- let notifier = RemoteNotifier {
- remote_path: remote.remote_path,
- notifier: NotifierConfig::email_from_file(&remote.notifier_config_path)
- .expect("can load notifier config")
- };
- notifiers.push(notifier);
- }
- other => {
- eprintln!("unknown remote api kind: {:?}, remote is {:?}", other, &remote)
- }
- }
- }
-
- Ok(notifiers)
- }
- None => {
- return Err(format!("repo '{}' is not known", repo));
- }
- }
- }
-}
-async fn make_app_server(db_path: &'static str) -> Router {
+async fn make_app_server(cfg_path: &'static str, db_path: &'static str) -> Router {
/*
// GET /hello/warp => 200 OK with body "Hello, warp!"
@@ -457,7 +340,7 @@ async fn make_app_server(db_path: &'static str) -> Router {
.route("/:owner/:repo/:sha", get(handle_commit_status))
.route("/:owner/:repo", post(handle_repo_event))
.fallback(fallback_get)
- .with_state(Arc::new(DbCtx::new(db_path)))
+ .with_state(Arc::new(DbCtx::new(cfg_path, db_path)))
}
#[tokio::main]
@@ -468,9 +351,9 @@ async fn main() {
PathBuf::from("/etc/letsencrypt/live/ci.butactuallyin.space/privkey.pem"),
).await.unwrap();
spawn(axum_server::bind_rustls("127.0.0.1:8080".parse().unwrap(), config.clone())
- .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service()));
+ .serve(make_app_server("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db").await.into_make_service()));
axum_server::bind_rustls("0.0.0.0:443".parse().unwrap(), config)
- .serve(make_app_server("/root/ixi_ci_server/state.db").await.into_make_service())
+ .serve(make_app_server("/root/ixi_ci_server/config", "/root/ixi_ci_server/state.db").await.into_make_service())
.await
.unwrap();
}
diff --git a/src/notifier.rs b/src/notifier.rs
new file mode 100644
index 0000000..4ddc91b
--- /dev/null
+++ b/src/notifier.rs
@@ -0,0 +1,126 @@
+use serde_derive::{Deserialize, Serialize};
+use std::sync::Arc;
+use axum::http::StatusCode;
+
+use crate::DbCtx;
+
+pub struct RemoteNotifier {
+ pub remote_path: String,
+ pub notifier: NotifierConfig,
+}
+
+#[derive(Serialize, Deserialize)]
+#[serde(untagged)]
+pub enum NotifierConfig {
+ GitHub {
+ token: String,
+ },
+ Email {
+ username: String,
+ password: String,
+ mailserver: String,
+ from: String,
+ to: String,
+ }
+}
+
+impl NotifierConfig {
+ pub fn github_from_file(path: &str) -> Result<Self, String> {
+ let bytes = std::fs::read(path)
+ .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?;
+ let config = serde_json::from_slice(&bytes)
+ .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?;
+
+ if matches!(config, NotifierConfig::GitHub { .. }) {
+ Ok(config)
+ } else {
+ Err(format!("config at {} doesn't look like a github config (but was otherwise valid?)", path))
+ }
+ }
+
+ pub fn email_from_file(path: &str) -> Result<Self, String> {
+ let bytes = std::fs::read(path)
+ .map_err(|e| format!("can't read notifier config at {}: {:?}", path, e))?;
+ let config = serde_json::from_slice(&bytes)
+ .map_err(|e| format!("can't deserialize notifier config at {}: {:?}", path, e))?;
+
+ if matches!(config, NotifierConfig::Email { .. }) {
+ Ok(config)
+ } else {
+ Err(format!("config at {} doesn't look like an email config (but was otherwise valid?)", path))
+ }
+ }
+}
+
+impl RemoteNotifier {
+ pub async fn tell_pending_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64) -> Result<(), String> {
+ self.tell_job_status(
+ ctx,
+ repo_id, sha, job_id,
+ "pending", "build is queued", &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
+ ).await
+ }
+
+ pub async fn tell_complete_job(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, desc: Result<String, String>) -> Result<(), String> {
+ match desc {
+ Ok(status) => {
+ self.tell_job_status(
+ ctx,
+ repo_id, sha, job_id,
+ "success", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
+ ).await
+ },
+ Err(status) => {
+ self.tell_job_status(
+ ctx,
+ repo_id, sha, job_id,
+ "failure", &status, &format!("https://{}/{}/{}", "ci.butactuallyin.space", &self.remote_path, sha)
+ ).await
+ }
+ }
+ }
+
+ pub async fn tell_job_status(&self, ctx: &Arc<DbCtx>, repo_id: u64, sha: &str, job_id: u64, state: &str, desc: &str, target_url: &str) -> Result<(), String> {
+ match &self.notifier {
+ NotifierConfig::GitHub { token } => {
+ let status_info = serde_json::json!({
+ "state": state,
+ "description": desc,
+ "target_url": target_url,
+ "context": "actuallyinspace runner",
+ });
+
+ // TODO: should pool (probably in ctx?) to have an upper bound in concurrent
+ // connections.
+ let client = reqwest::Client::new();
+ let req = client.post(&format!("https://api.github.com/repos/{}/statuses/{}", &self.remote_path, sha))
+ .body(serde_json::to_string(&status_info).expect("can stringify json"))
+ .header("content-type", "application/json")
+ .header("user-agent", "iximeow")
+ .header("authorization", format!("Bearer {}", token))
+ .header("accept", "application/vnd.github+json");
+ eprintln!("sending {:?}", req);
+ eprintln!(" body: {}", serde_json::to_string(&status_info).expect("can stringify json"));
+ let res = req
+ .send()
+ .await;
+
+ match res {
+ Ok(res) => {
+ if res.status() == StatusCode::OK || res.status() == StatusCode::CREATED{
+ Ok(())
+ } else {
+ Err(format!("bad response: {}, response data: {:?}", res.status().as_u16(), res))
+ }
+ }
+ Err(e) => {
+ Err(format!("failure sending request: {:?}", e))
+ }
+ }
+ }
+ NotifierConfig::Email { username, password, mailserver, from, to } => {
+ panic!("should send an email saying that a job is now pending for `sha`")
+ }
+ }
+ }
+}
diff --git a/src/sql.rs b/src/sql.rs
index ee334c1..80eb18a 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -1,10 +1,29 @@
#![allow(dead_code)]
+use std::convert::TryFrom;
+
+#[derive(Debug, Clone)]
pub enum JobState {
Pending = 0,
Started = 1,
Complete = 2,
Error = 3,
+ Invalid = 4,
+}
+
+impl TryFrom<u8> for JobState {
+ type Error = String;
+
+ fn try_from(value: u8) -> Result<Self, String> {
+ match value {
+ 0 => Ok(JobState::Pending),
+ 1 => Ok(JobState::Started),
+ 2 => Ok(JobState::Complete),
+ 3 => Ok(JobState::Error),
+ 4 => Ok(JobState::Invalid),
+ other => Err(format!("invalid job state: {}", other)),
+ }
+ }
}
// remote_id is the remote from which we were notified. this is necessary so we know which remote
@@ -14,6 +33,7 @@ pub const CREATE_JOBS_TABLE: &'static str = "\
artifacts_path TEXT,
state INTEGER NOT NULL,
run_host TEXT,
+ build_token TEXT,
remote_id INTEGER,
commit_id INTEGER,
created_time INTEGER,
@@ -45,6 +65,9 @@ pub const CREATE_REMOTES_TABLE: &'static str = "\
pub const CREATE_REMOTES_INDEX: &'static str = "\
CREATE INDEX IF NOT EXISTS 'repo_to_remote' ON remotes(repo_id);";
+pub const CREATE_REPO_NAME_INDEX: &'static str = "\
+ CREATE UNIQUE INDEX IF NOT EXISTS 'repo_names' ON repos(repo_name);";
+
pub const PENDING_JOBS: &'static str = "\
select * from jobs where state=0;";