From a7d2af2370ee186cdbf7f237a08754b9ed6991fd Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 29 Dec 2022 03:23:49 +0000 Subject: factor out io stuff --- src/ci_ctl.rs | 1 + src/ci_driver.rs | 1 + src/ci_runner.rs | 28 +++++--------------- src/dbctx.rs | 49 +--------------------------------- src/io.rs | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lua/mod.rs | 10 ++++--- 6 files changed, 98 insertions(+), 72 deletions(-) create mode 100644 src/io.rs (limited to 'src') diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs index 687aa46..f6c55be 100644 --- a/src/ci_ctl.rs +++ b/src/ci_ctl.rs @@ -3,6 +3,7 @@ use clap::{Parser, Subcommand}; mod sql; mod dbctx; mod notifier; +mod io; use sql::JobState; use dbctx::DbCtx; diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 3be49f4..da44fda 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -24,6 +24,7 @@ use serde_json::json; mod dbctx; mod sql; mod notifier; +mod io; use crate::dbctx::{DbCtx, PendingJob}; use crate::sql::JobResult; diff --git a/src/ci_runner.rs b/src/ci_runner.rs index a88b135..3c0104f 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -14,6 +14,7 @@ use std::pin::Pin; use std::marker::Unpin; mod lua; +mod io; #[derive(Debug)] enum WorkAcquireError { @@ -75,21 +76,6 @@ pub struct RunningJob { client: RunnerClient, } -async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { - let mut buf = vec![0; 1024 * 1024]; - loop { - let n_read = source.read(&mut buf).await - .map_err(|e| format!("failed to read: {:?}", e))?; - - if n_read == 0 { - return Ok(()); - } - - dest.write_all(&buf[..n_read]).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } -} - impl RunningJob { async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> { self.client.send(serde_json::json!({ @@ -182,11 +168,11 @@ impl RunningJob { async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result { eprintln!("[.] running {}", name); - let stdout_artifact = self.create_artifact( + let mut stdout_artifact = self.create_artifact( &format!("{} (stdout)", name), &format!("{} (stdout)", desc) ).await.expect("works"); - let stderr_artifact = self.create_artifact( + let mut stderr_artifact = self.create_artifact( &format!("{} (stderr)", name), &format!("{} (stderr)", desc) ).await.expect("works"); @@ -198,13 +184,13 @@ impl RunningJob { .spawn() .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; - let child_stdout = child.stdout.take().unwrap(); - let child_stderr = child.stderr.take().unwrap(); + let mut child_stdout = child.stdout.take().unwrap(); + let mut child_stderr = child.stderr.take().unwrap(); eprintln!("[.] '{}': forwarding stdout", name); - tokio::spawn(forward_data(child_stdout, stdout_artifact)); + tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_artifact).await }); eprintln!("[.] '{}': forwarding stderr", name); - tokio::spawn(forward_data(child_stderr, stderr_artifact)); + tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_artifact).await }); let res = child.wait().await .map_err(|e| format!("failed to wait? {:?}", e))?; diff --git a/src/dbctx.rs b/src/dbctx.rs index cb74010..804f083 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -7,6 +7,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use std::path::Path; use std::path::PathBuf; +use crate::io::ArtifactDescriptor; use crate::notifier::{RemoteNotifier, NotifierConfig}; use crate::sql; @@ -45,54 +46,6 @@ pub struct ArtifactRecord { pub desc: String } -pub struct ArtifactDescriptor { - job_id: u64, - artifact_id: u64, - file: File, -} - -impl ArtifactDescriptor { - async fn new(job_id: u64, artifact_id: u64) -> Result { - // TODO: jobs should be a configurable path - let path = format!("jobs/{}/{}", job_id, artifact_id); - let file = OpenOptions::new() - .read(true) - .write(true) - .create_new(true) - .open(&path) - .await - .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; - - Ok(ArtifactDescriptor { - job_id, - artifact_id, - file, - }) - } - - pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { - loop { - let chunk = data.next().await; - - let chunk = match chunk { - Some(Ok(chunk)) => chunk, - Some(Err(e)) => { - return Err(format!("error reading: {:?}", e)); - } - None => { - eprintln!("body done?"); - return Ok(()); - } - }; - - let chunk = chunk.as_ref(); - - self.file.write_all(chunk).await - .map_err(|e| format!("failed to write: {:?}", e))?; - } - } -} - impl DbCtx { pub fn new>(config_path: P, db_path: P) -> Self { DbCtx { diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..575cf65 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,81 @@ +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures_util::StreamExt; +use tokio::fs::File; +use std::io::Write; +use tokio::fs::OpenOptions; + +pub struct ArtifactDescriptor { + job_id: u64, + artifact_id: u64, + file: File, +} + +impl ArtifactDescriptor { + pub async fn new(job_id: u64, artifact_id: u64) -> Result { + // TODO: jobs should be a configurable path + let path = format!("jobs/{}/{}", job_id, artifact_id); + let file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path) + .await + .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + + Ok(ArtifactDescriptor { + job_id, + artifact_id, + file, + }) + } + + pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { + eprintln!("new store..."); + loop { + eprint!("read..."); + let chunk = data.next().await; + eprintln!("!"); + + let chunk = match chunk { + Some(Ok(chunk)) => chunk, + Some(Err(e)) => { + eprintln!("error: {:?}", e); + return Err(format!("error reading: {:?}", e)); + } + None => { + eprintln!("body done?"); + return Ok(()); + } + }; + + let chunk = chunk.as_ref(); + eprintln!("buf: {}", chunk.len()); + + eprint!("write..."); + self.file.write_all(chunk).await + .map_err(|e| format!("failed to write: {:?}", e))?; + eprintln!("!"); + } + } +} + +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { + let mut buf = vec![0; 1024 * 8];//1024]; + loop { + eprint!("read..."); + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + eprintln!("!"); + + if n_read == 0 { + eprintln!("done reading!"); + return Ok(()); + } + eprintln!("buf: {}", n_read); + + eprint!("write..."); + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + eprintln!("!"); + } +} diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 936c94a..26d2d5b 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -162,11 +162,15 @@ impl BuildEnv { .build() .unwrap(); rt.block_on(async move { - let artifact = job_ref.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await + let mut artifact = job_ref.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e))) .unwrap(); - crate::forward_data(tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(), artifact).await - .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e))) + let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(); + eprintln!("uploading..."); + crate::io::forward_data(&mut file, &mut artifact).await + .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?; + std::mem::drop(artifact); + Ok(()) }) }) .map_err(|e| format!("problem defining metric function: {:?}", e))?; -- cgit v1.1