summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoriximeow <git@iximeow.net>2022-12-29 03:23:49 +0000
committeriximeow <git@iximeow.net>2022-12-29 03:23:49 +0000
commita7d2af2370ee186cdbf7f237a08754b9ed6991fd (patch)
treeaef84fec24d1f5f61264d278de9c409ded3c4681 /src
parentf6da9d6b9ffcdb8a4a30d7d9f28dd37b4afb143c (diff)
factor out io stuff
Diffstat (limited to 'src')
-rw-r--r--src/ci_ctl.rs1
-rw-r--r--src/ci_driver.rs1
-rw-r--r--src/ci_runner.rs28
-rw-r--r--src/dbctx.rs49
-rw-r--r--src/io.rs81
-rw-r--r--src/lua/mod.rs10
6 files changed, 98 insertions, 72 deletions
diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs
index 687aa46..f6c55be 100644
--- a/src/ci_ctl.rs
+++ b/src/ci_ctl.rs
@@ -3,6 +3,7 @@ use clap::{Parser, Subcommand};
mod sql;
mod dbctx;
mod notifier;
+mod io;
use sql::JobState;
use dbctx::DbCtx;
diff --git a/src/ci_driver.rs b/src/ci_driver.rs
index 3be49f4..da44fda 100644
--- a/src/ci_driver.rs
+++ b/src/ci_driver.rs
@@ -24,6 +24,7 @@ use serde_json::json;
mod dbctx;
mod sql;
mod notifier;
+mod io;
use crate::dbctx::{DbCtx, PendingJob};
use crate::sql::JobResult;
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index a88b135..3c0104f 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -14,6 +14,7 @@ use std::pin::Pin;
use std::marker::Unpin;
mod lua;
+mod io;
#[derive(Debug)]
enum WorkAcquireError {
@@ -75,21 +76,6 @@ pub struct RunningJob {
client: RunnerClient,
}
-async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> {
- let mut buf = vec![0; 1024 * 1024];
- loop {
- let n_read = source.read(&mut buf).await
- .map_err(|e| format!("failed to read: {:?}", e))?;
-
- if n_read == 0 {
- return Ok(());
- }
-
- dest.write_all(&buf[..n_read]).await
- .map_err(|e| format!("failed to write: {:?}", e))?;
- }
-}
-
impl RunningJob {
async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> {
self.client.send(serde_json::json!({
@@ -182,11 +168,11 @@ impl RunningJob {
async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {
eprintln!("[.] running {}", name);
- let stdout_artifact = self.create_artifact(
+ let mut stdout_artifact = self.create_artifact(
&format!("{} (stdout)", name),
&format!("{} (stdout)", desc)
).await.expect("works");
- let stderr_artifact = self.create_artifact(
+ let mut stderr_artifact = self.create_artifact(
&format!("{} (stderr)", name),
&format!("{} (stderr)", desc)
).await.expect("works");
@@ -198,13 +184,13 @@ impl RunningJob {
.spawn()
.map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?;
- let child_stdout = child.stdout.take().unwrap();
- let child_stderr = child.stderr.take().unwrap();
+ let mut child_stdout = child.stdout.take().unwrap();
+ let mut child_stderr = child.stderr.take().unwrap();
eprintln!("[.] '{}': forwarding stdout", name);
- tokio::spawn(forward_data(child_stdout, stdout_artifact));
+ tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_artifact).await });
eprintln!("[.] '{}': forwarding stderr", name);
- tokio::spawn(forward_data(child_stderr, stderr_artifact));
+ tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_artifact).await });
let res = child.wait().await
.map_err(|e| format!("failed to wait? {:?}", e))?;
diff --git a/src/dbctx.rs b/src/dbctx.rs
index cb74010..804f083 100644
--- a/src/dbctx.rs
+++ b/src/dbctx.rs
@@ -7,6 +7,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::path::Path;
use std::path::PathBuf;
+use crate::io::ArtifactDescriptor;
use crate::notifier::{RemoteNotifier, NotifierConfig};
use crate::sql;
@@ -45,54 +46,6 @@ pub struct ArtifactRecord {
pub desc: String
}
-pub struct ArtifactDescriptor {
- job_id: u64,
- artifact_id: u64,
- file: File,
-}
-
-impl ArtifactDescriptor {
- async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> {
- // TODO: jobs should be a configurable path
- let path = format!("jobs/{}/{}", job_id, artifact_id);
- let file = OpenOptions::new()
- .read(true)
- .write(true)
- .create_new(true)
- .open(&path)
- .await
- .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?;
-
- Ok(ArtifactDescriptor {
- job_id,
- artifact_id,
- file,
- })
- }
-
- pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> {
- loop {
- let chunk = data.next().await;
-
- let chunk = match chunk {
- Some(Ok(chunk)) => chunk,
- Some(Err(e)) => {
- return Err(format!("error reading: {:?}", e));
- }
- None => {
- eprintln!("body done?");
- return Ok(());
- }
- };
-
- let chunk = chunk.as_ref();
-
- self.file.write_all(chunk).await
- .map_err(|e| format!("failed to write: {:?}", e))?;
- }
- }
-}
-
impl DbCtx {
pub fn new<P: AsRef<Path>>(config_path: P, db_path: P) -> Self {
DbCtx {
diff --git a/src/io.rs b/src/io.rs
new file mode 100644
index 0000000..575cf65
--- /dev/null
+++ b/src/io.rs
@@ -0,0 +1,81 @@
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use futures_util::StreamExt;
+use tokio::fs::File;
+use std::io::Write;
+use tokio::fs::OpenOptions;
+
+pub struct ArtifactDescriptor {
+ job_id: u64,
+ artifact_id: u64,
+ file: File,
+}
+
+impl ArtifactDescriptor {
+ pub async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> {
+ // TODO: jobs should be a configurable path
+ let path = format!("jobs/{}/{}", job_id, artifact_id);
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create_new(true)
+ .open(&path)
+ .await
+ .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?;
+
+ Ok(ArtifactDescriptor {
+ job_id,
+ artifact_id,
+ file,
+ })
+ }
+
+ pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> {
+ eprintln!("new store...");
+ loop {
+ eprint!("read...");
+ let chunk = data.next().await;
+ eprintln!("!");
+
+ let chunk = match chunk {
+ Some(Ok(chunk)) => chunk,
+ Some(Err(e)) => {
+ eprintln!("error: {:?}", e);
+ return Err(format!("error reading: {:?}", e));
+ }
+ None => {
+ eprintln!("body done?");
+ return Ok(());
+ }
+ };
+
+ let chunk = chunk.as_ref();
+ eprintln!("buf: {}", chunk.len());
+
+ eprint!("write...");
+ self.file.write_all(chunk).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ eprintln!("!");
+ }
+ }
+}
+
+pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 8];//1024];
+ loop {
+ eprint!("read...");
+ let n_read = source.read(&mut buf).await
+ .map_err(|e| format!("failed to read: {:?}", e))?;
+ eprintln!("!");
+
+ if n_read == 0 {
+ eprintln!("done reading!");
+ return Ok(());
+ }
+ eprintln!("buf: {}", n_read);
+
+ eprint!("write...");
+ dest.write_all(&buf[..n_read]).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ eprintln!("!");
+ }
+}
diff --git a/src/lua/mod.rs b/src/lua/mod.rs
index 936c94a..26d2d5b 100644
--- a/src/lua/mod.rs
+++ b/src/lua/mod.rs
@@ -162,11 +162,15 @@ impl BuildEnv {
.build()
.unwrap();
rt.block_on(async move {
- let artifact = job_ref.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await
+ let mut artifact = job_ref.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await
.map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e)))
.unwrap();
- crate::forward_data(tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(), artifact).await
- .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))
+ let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap();
+ eprintln!("uploading...");
+ crate::io::forward_data(&mut file, &mut artifact).await
+ .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?;
+ std::mem::drop(artifact);
+ Ok(())
})
})
.map_err(|e| format!("problem defining metric function: {:?}", e))?;