summaryrefslogtreecommitdiff
path: root/src/ci_driver.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/ci_driver.rs')
-rw-r--r--src/ci_driver.rs86
1 files changed, 43 insertions, 43 deletions
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index fca1e6e..710272d 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -28,10 +28,12 @@ mod dbctx;
mod sql;
mod notifier;
mod io;
+mod protocol;
use crate::dbctx::{DbCtx, PendingRun, Job, Run};
use crate::sql::JobResult;
use crate::sql::RunState;
+use crate::protocol::{ClientProto, CommandInfo, TaskInfo, RequestedJob};
lazy_static! {
static ref AUTH_SECRET: RwLock<Option<String>> = RwLock::new(None);
@@ -151,7 +153,7 @@ impl ClientJob {
pub async fn run(&mut self) {
loop {
eprintln!("waiting on response..");
- let msg = match self.client.recv().await.expect("recv works") {
+ let msg = match self.client.recv_typed::<ClientProto>().await.expect("recv works") {
Some(msg) => msg,
None => {
eprintln!("client hung up. task's done, i hope?");
@@ -159,38 +161,30 @@ impl ClientJob {
}
};
eprintln!("got {:?}", msg);
- let msg_kind = msg.as_object().unwrap().get("kind").unwrap().as_str().unwrap();
- match msg_kind {
- "new_task_please" => {
+ match msg {
+ ClientProto::NewTaskPlease { allowed_pushers, host_info } => {
eprintln!("misdirected task request (after handshake?)");
return;
- },
- "task_status" => {
- let state = msg.as_object().unwrap().get("state").unwrap().as_str().unwrap();
- let (result, state): (Result<String, String>, RunState) = if state == "finished" {
- let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
- eprintln!("task update: state is {} and result is {}", state, result);
- match result {
- "pass" => {
- (Ok("success".to_string()), RunState::Finished)
- },
- other => {
- let desc = msg.as_object().unwrap().get("desc")
- .map(|x| x.as_str().unwrap().to_string())
- .unwrap_or_else(|| other.to_string());
- (Err(desc), RunState::Error)
+ }
+ ClientProto::TaskStatus(task_info) => {
+ let (result, state): (Result<String, String>, RunState) = match task_info {
+ TaskInfo::Finished { status } => {
+ eprintln!("task update: state is finished and result is {}", status);
+ match status.as_str() {
+ "pass" => {
+ (Ok("success".to_string()), RunState::Finished)
+ },
+ other => {
+ eprintln!("unhandled task completion status: {}", other);
+ (Err(other.to_string()), RunState::Error)
+ }
}
+ },
+ TaskInfo::Interrupted { status, description } => {
+ eprintln!("task update: state is interrupted and result is {}", status);
+ let desc = description.unwrap_or_else(|| status.clone());
+ (Err(desc), RunState::Error)
}
- } else if state == "interrupted" {
- let result = msg.as_object().unwrap().get("result").unwrap().as_str().unwrap();
- eprintln!("task update: state is {} and result is {}", state, result);
- let desc = msg.as_object().unwrap().get("desc")
- .map(|x| x.as_str().unwrap().to_string())
- .unwrap_or_else(|| result.to_string());
- (Err(desc), RunState::Error)
- } else {
- eprintln!("task update: state is {}", state);
- (Err(format!("atypical completion status: {}", state)), RunState::Invalid)
};
let job = self.dbctx.job_by_id(self.task.job_id).expect("can query").expect("job exists");
@@ -223,26 +217,23 @@ impl ClientJob {
)
.expect("can update");
}
- "artifact_create" => {
- eprintln!("creating artifact {:?}", msg);
+ ClientProto::ArtifactCreate => {
+ eprintln!("creating artifact");
self.client.send(serde_json::json!({
"status": "ok",
"object_id": "10",
})).await.unwrap();
},
- "metric" => {
- let name = msg.as_object().unwrap().get("name").unwrap().as_str().unwrap();
- let value = msg.as_object().unwrap().get("value").unwrap().as_str().unwrap();
-
- self.dbctx.insert_metric(self.task.id, name, value)
+ ClientProto::Metric { name, value } => {
+ self.dbctx.insert_metric(self.task.id, &name, &value)
.expect("TODO handle metric insert error?");
}
- "command" => {
+ ClientProto::Command(_command_info) => {
// record information about commands, start/stop, etc. probably also allow
// artifacts to be attached to commands and default to attaching stdout/stderr?
}
other => {
- eprintln!("unhandled message kind {:?} ({:?})", msg_kind, msg);
+ eprintln!("unhandled message {:?}", other);
}
}
}
@@ -264,7 +255,11 @@ impl RunnerClient {
}
async fn send(&mut self, msg: serde_json::Value) -> Result<(), String> {
- self.tx.send(Ok(serde_json::to_string(&msg).unwrap()))
+ self.send_typed(&msg).await
+ }
+
+ async fn send_typed<T: serde::Serialize>(&mut self, msg: &T) -> Result<(), String> {
+ self.tx.send(Ok(serde_json::to_string(msg).unwrap()))
.await
.map_err(|e| e.to_string())
}
@@ -286,6 +281,11 @@ impl RunnerClient {
}
}
+ async fn recv_typed<T: serde::de::DeserializeOwned>(&mut self) -> Result<Option<T>, String> {
+ let json = self.recv().await?;
+ Ok(json.map(|v| serde_json::from_value(v).unwrap()))
+ }
+
// is this client willing to run the job based on what it has told us so far?
fn will_accept(&self, job: &Job) -> bool {
match (job.source.as_ref(), self.accepted_sources.as_ref()) {
@@ -298,10 +298,10 @@ impl RunnerClient {
}
async fn submit(mut self, dbctx: &Arc<DbCtx>, job: &PendingRun, remote_git_url: &str, sha: &str) -> Result<Option<ClientJob>, String> {
- self.send(serde_json::json!({
- "commit": sha,
- "remote_url": remote_git_url,
- "build_token": &self.build_token,
+ self.send_typed(&ClientProto::new_task(RequestedJob {
+ commit: sha.to_string(),
+ remote_url: remote_git_url.to_string(),
+ build_token: self.build_token.to_string(),
})).await?;
match self.recv().await {
Ok(Some(resp)) => {