summaryrefslogtreecommitdiff
path: root/src/ci_driver.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ci_driver.rs')
-rw-r--r--src/ci_driver.rs124
1 files changed, 48 insertions, 76 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index 0f629e7..25928ae 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -28,17 +28,17 @@ mod sql;
mod notifier;
mod io;
-use crate::dbctx::{DbCtx, PendingJob};
+use crate::dbctx::{DbCtx, PendingRun, Job};
use crate::sql::JobResult;
-use crate::sql::JobState;
+use crate::sql::RunState;
lazy_static! {
static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
}
-fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> {
- let mut path: PathBuf = "/root/ixi_ci_server/jobs/".into();
- path.push(job.to_string());
+fn reserve_artifacts_dir(run: u64) -> std::io::Result<PathBuf> {
+ let mut path: PathBuf = "/root/ixi_ci_server/artifacts/".into();
+ path.push(run.to_string());
match std::fs::create_dir(&path) {
Ok(()) => {
Ok(path)
@@ -53,9 +53,12 @@ fn reserve_artifacts_dir(job: u64) -> std::io::Result<PathBuf> {
}
}
-async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> {
- eprintln!("activating job {:?}", job);
+async fn activate_run(dbctx: Arc<DbCtx>, run: &PendingRun, clients: &mut mpsc::Receiver<RunnerClient>) -> Result<(), String> {
+ eprintln!("activating task {:?}", run);
+ let job = dbctx.job_by_id(run.job_id).expect("can query").expect("job exists");
+
let connection = dbctx.conn.lock().unwrap();
+
let (repo_id, remote_git_url): (u64, String) = connection
.query_row("select repo_id, remote_git_url from remotes where id=?1", [job.remote_id], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.expect("query succeeds");
@@ -73,52 +76,20 @@ async fn activate_job(dbctx: Arc<DbCtx>, job: &PendingJob, clients: &mut mpsc::R
.expect("can run query");
std::mem::drop(connection);
- let artifacts: PathBuf = match &job.artifacts {
- Some(artifacts) => PathBuf::from(artifacts),
- None => reserve_artifacts_dir(job.id).expect("can reserve a directory for artifacts")
- };
- /*
-
- if job.run_host.as_ref() == None {
- eprintln!("need to find a host to run the job");
- }
-
- eprintln!("cloning {}", remote_git_url);
- let mut repo_dir = artifacts.clone();
- repo_dir.push("repo");
- eprintln!(" ... into {}", repo_dir.display());
-
- Command::new("git")
- .arg("clone")
- .arg(&remote_git_url)
- .arg(&format!("{}", repo_dir.display()))
- .status()
- .expect("can clone the repo");
-
- eprintln!("checking out {}", commit_sha);
- Command::new("git")
- .current_dir(&repo_dir)
- .arg("checkout")
- .arg(&commit_sha)
- .status()
- .expect("can checkout hash");
- */
+ let artifacts: PathBuf = reserve_artifacts_dir(run.id).expect("can reserve a directory for artifacts");
eprintln!("running {}", repo_name);
- /*
- * find the CI script, figure out how to run it
- */
let mut client_job = loop {
let mut candidate = clients.recv().await
.ok_or_else(|| "client channel disconnected".to_string())?;
- if !candidate.will_accept(job) {
+ if !candidate.will_accept(&job) {
eprintln!("client {:?} would not accept job {:?}", candidate, job);
continue;
}
- if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &job, &remote_git_url, &commit_sha).await {
+ if let Ok(Some(mut client_job)) = candidate.submit(&dbctx, &run, &remote_git_url, &commit_sha).await {
break client_job;
} else {
// failed to submit job, move on for now
@@ -168,8 +139,8 @@ struct ClientJob {
dbctx: Arc<DbCtx>,
remote_git_url: String,
sha: String,
- job: PendingJob,
- client: RunnerClient
+ task: PendingRun,
+ client: RunnerClient
}
impl ClientJob {
@@ -179,49 +150,50 @@ impl ClientJob {
let msg = match self.client.recv().await.expect("recv works") {
Some(msg) => msg,
None => {
- eprintln!("client hung up. job's done, i hope?");
+ eprintln!("client hung up. task's done, i hope?");
return;
}
};
eprintln!("got {:?}", msg);
let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();
match msg_kind {
- "new_job_please" => {
- eprintln!("misdirected job request (after handshake?)");
+ "new_task_please" => {
+ eprintln!("misdirected task request (after handshake?)");
return;
},
- "job_status" => {
+ "task_status" => {
let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap();
- let (result, state): (Result<String, String>, JobState) = if state == "finished" {
+ let (result, state): (Result<String, String>, RunState) = if state == "finished" {
let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
- eprintln!("job update: state is {} and result is {}", state, result);
+ eprintln!("task update: state is {} and result is {}", state, result);
match result {
"pass" => {
- (Ok("success".to_string()), JobState::Finished)
+ (Ok("success".to_string()), RunState::Finished)
},
other => {
let desc = msg.as_object().unwrap().get("desc")
.map(|x| x.as_str().unwrap().to_string())
.unwrap_or_else(|| other.to_string());
- (Err(desc), JobState::Error)
+ (Err(desc), RunState::Error)
}
}
} else if state == "interrupted" {
let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
- eprintln!("job update: state is {} and result is {}", state, result);
+ eprintln!("task update: state is {} and result is {}", state, result);
let desc = msg.as_object().unwrap().get("desc")
.map(|x| x.as_str().unwrap().to_string())
.unwrap_or_else(|| result.to_string());
- (Err(desc), JobState::Error)
+ (Err(desc), RunState::Error)
} else {
- eprintln!("job update: state is {}", state);
- (Err(format!("atypical completion status: {}", state)), JobState::Invalid)
+ eprintln!("task update: state is {}", state);
+ (Err(format!("atypical completion status: {}", state)), RunState::Invalid)
};
- let repo_id = self.dbctx.repo_id_by_remote(self.job.remote_id).unwrap().expect("remote exists");
+ let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists");
+ let repo_id = self.dbctx.repo_id_by_remote(job.remote_id).unwrap().expect("remote exists");
for notifier in self.dbctx.notifiers_by_repo(repo_id).expect("can get notifiers") {
- if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.job.id, result.clone()).await {
+ if let Err(e) = notifier.tell_complete_job(&self.dbctx, repo_id, &self.sha, self.task.id, result.clone()).await {
eprintln!("could not notify {:?}: {:?}", notifier.remote_path, e);
}
}
@@ -242,8 +214,8 @@ impl ClientJob {
};
self.dbctx.conn.lock().unwrap().execute(
- "update jobs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5",
- (now as u64, state as u64, build_result as u8, result_desc, self.job.id)
+ "update runs set complete_time=?1, state=?2, build_result=?3, final_status=?4 where id=?5",
+ (now as u64, state as u64, build_result as u8, result_desc, self.task.id)
)
.expect("can update");
}
@@ -258,7 +230,7 @@ impl ClientJob {
let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap();
let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap();
- self.dbctx.insert_metric(self.job.id, name, value)
+ self.dbctx.insert_metric(self.task.id, name, value)
.expect("TODO handle metric insert error?");
}
"command" => {
@@ -311,7 +283,7 @@ impl RunnerClient {
}
// is this client willing to run the job based on what it has told us so far?
- fn will_accept(&self, job: &PendingJob) -> bool {
+ fn will_accept(&self, job: &Job) -> bool {
match (job.source.as_ref(), self.accepted_sources.as_ref()) {
(_, None) => true,
(None, Some(_)) => false,
@@ -321,7 +293,7 @@ impl RunnerClient {
}
}
- async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingJob, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
+ async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
self.send(serde_json::json!({
"commit": sha,
"remote_url": remote_git_url,
@@ -334,7 +306,7 @@ impl RunnerClient {
}) {
eprintln!("resp: {:?}", resp);
Ok(Some(ClientJob {
- job: job.clone(),
+ task: job.clone(),
dbctx: Arc::clone(dbctx),
sha: sha.to_string(),
remote_git_url: remote_git_url.to_string(),
@@ -364,18 +336,18 @@ impl fmt::Debug for RunnerClient {
#[axum_macros::debug_handler]
async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClient>)>, headers: HeaderMap, artifact_content: BodyStream) -> impl IntoResponse {
eprintln!("artifact request");
- let job_token = match headers.get("x-job-token") {
- Some(job_token) => job_token.to_str().expect("valid string"),
+ let run_token = match headers.get("x-task-token") {
+ Some(run_token) => run_token.to_str().expect("valid string"),
None => {
- eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers);
+ eprintln!("bad artifact post: headers: {:?}\nno x-tasak-token", headers);
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
- let (job, artifact_path, token_validity) = match ctx.0.job_for_token(&job_token).unwrap() {
+ let (run, artifact_path, token_validity) = match ctx.0.run_for_token(&run_token).unwrap() {
Some(result) => result,
None => {
- eprintln!("bad artifact post: headers: {:?}\njob token is not known", headers);
+ eprintln!("bad artifact post: headers: {:?}\nrun token is not known", headers);
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
@@ -408,7 +380,7 @@ async fn handle_artifact(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
}
};
- let mut artifact = match ctx.0.reserve_artifact(job, artifact_name, artifact_desc).await {
+ let mut artifact = match ctx.0.reserve_artifact(run, artifact_name, artifact_desc).await {
Ok(artifact) => artifact,
Err(err) => {
eprintln!("failure to reserve artifact: {:?}", err);
@@ -442,7 +414,7 @@ async fn handle_next_job(State(ctx): State<(Arc<DbCtx>, mpsc::Sender<RunnerClien
}
}
None => {
- eprintln!("bad artifact post: headers: {:?}\nno x-job-token", headers);
+ eprintln!("bad artifact post: headers: {:?}\nno authorization", headers);
return (StatusCode::BAD_REQUEST, "").into_response();
}
};
@@ -533,13 +505,13 @@ async fn main() {
dbctx.create_tables().unwrap();
loop {
- let jobs = dbctx.get_pending_jobs().unwrap();
+ let runs = dbctx.get_pending_runs().unwrap();
- if jobs.len() > 0 {
- println!("{} new jobs", jobs.len());
+ if runs.len() > 0 {
+ println!("{} new runs", runs.len());
- for job in jobs.into_iter() {
- activate_job(Arc::clone(&dbctx), &job, &mut channel).await;
+ for run in runs.into_iter() {
+ activate_run(Arc::clone(&dbctx), &run, &mut channel).await;
}
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;