summaryrefslogtreecommitdiff
path: root/src/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/io.rs')
-rw-r--r--src/io.rs81
1 files changed, 81 insertions, 0 deletions
diff --git a/src/io.rs b/src/io.rs
new file mode 100644
index 0000000..575cf65
--- /dev/null
+++ b/src/io.rs
@@ -0,0 +1,81 @@
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use futures_util::StreamExt;
+use tokio::fs::File;
+use std::io::Write;
+use tokio::fs::OpenOptions;
+
+pub struct ArtifactDescriptor {
+ job_id: u64,
+ artifact_id: u64,
+ file: File,
+}
+
+impl ArtifactDescriptor {
+ pub async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> {
+ // TODO: jobs should be a configurable path
+ let path = format!("jobs/{}/{}", job_id, artifact_id);
+ let file = OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create_new(true)
+ .open(&path)
+ .await
+ .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?;
+
+ Ok(ArtifactDescriptor {
+ job_id,
+ artifact_id,
+ file,
+ })
+ }
+
+ pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> {
+ eprintln!("new store...");
+ loop {
+ eprint!("read...");
+ let chunk = data.next().await;
+ eprintln!("!");
+
+ let chunk = match chunk {
+ Some(Ok(chunk)) => chunk,
+ Some(Err(e)) => {
+ eprintln!("error: {:?}", e);
+ return Err(format!("error reading: {:?}", e));
+ }
+ None => {
+ eprintln!("body done?");
+ return Ok(());
+ }
+ };
+
+ let chunk = chunk.as_ref();
+ eprintln!("buf: {}", chunk.len());
+
+ eprint!("write...");
+ self.file.write_all(chunk).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ eprintln!("!");
+ }
+ }
+}
+
+pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 8];//1024];
+ loop {
+ eprint!("read...");
+ let n_read = source.read(&mut buf).await
+ .map_err(|e| format!("failed to read: {:?}", e))?;
+ eprintln!("!");
+
+ if n_read == 0 {
+ eprintln!("done reading!");
+ return Ok(());
+ }
+ eprintln!("buf: {}", n_read);
+
+ eprint!("write...");
+ dest.write_all(&buf[..n_read]).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ eprintln!("!");
+ }
+}