From a7d2af2370ee186cdbf7f237a08754b9ed6991fd Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 29 Dec 2022 03:23:49 +0000 Subject: factor out io stuff --- src/io.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 src/io.rs (limited to 'src/io.rs') diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..575cf65 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,81 @@ +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures_util::StreamExt; +use tokio::fs::File; +use std::io::Write; +use tokio::fs::OpenOptions; + +pub struct ArtifactDescriptor { + job_id: u64, + artifact_id: u64, + file: File, +} + +impl ArtifactDescriptor { + pub async fn new(job_id: u64, artifact_id: u64) -> Result { + // TODO: jobs should be a configurable path + let path = format!("jobs/{}/{}", job_id, artifact_id); + let file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path) + .await + .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + + Ok(ArtifactDescriptor { + job_id, + artifact_id, + file, + }) + } + + pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { + eprintln!("new store..."); + loop { + eprint!("read..."); + let chunk = data.next().await; + eprintln!("!"); + + let chunk = match chunk { + Some(Ok(chunk)) => chunk, + Some(Err(e)) => { + eprintln!("error: {:?}", e); + return Err(format!("error reading: {:?}", e)); + } + None => { + eprintln!("body done?"); + return Ok(()); + } + }; + + let chunk = chunk.as_ref(); + eprintln!("buf: {}", chunk.len()); + + eprint!("write..."); + self.file.write_all(chunk).await + .map_err(|e| format!("failed to write: {:?}", e))?; + eprintln!("!"); + } + } +} + +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { + let mut buf = vec![0; 1024 * 8];//1024]; + loop { + eprint!("read..."); + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + eprintln!("!"); + + if n_read == 0 { + eprintln!("done reading!"); + return Ok(()); + } + eprintln!("buf: {}", n_read); + + eprint!("write..."); + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + eprintln!("!"); + } +} -- cgit v1.1