summaryrefslogtreecommitdiff
path: root/src/io.rs
diff options
context:
space:
mode:
authoriximeow <git@iximeow.net>2022-12-29 09:19:07 +0000
committeriximeow <git@iximeow.net>2022-12-29 09:19:07 +0000
commite28b277980763b88d2828812bff2c0b9546d3d25 (patch)
tree34fe85309adb4170662951b3fb179f7441631e22 /src/io.rs
parent7801ace71a60f9c16497d99f760c14529a2d63d3 (diff)
do not use buggy AsyncWrite impl for file transfer, spawn_blocking a blocking task
Diffstat (limited to 'src/io.rs')
-rw-r--r--src/io.rs24
1 files changed, 13 insertions, 11 deletions
diff --git a/src/io.rs b/src/io.rs
index 575cf65..67e4d11 100644
--- a/src/io.rs
+++ b/src/io.rs
@@ -4,6 +4,16 @@ use tokio::fs::File;
use std::io::Write;
use tokio::fs::OpenOptions;
+pub struct ArtifactStream {
+ sender: hyper::body::Sender,
+}
+
+impl ArtifactStream {
+ pub fn new(sender: hyper::body::Sender) -> Self {
+ Self { sender }
+ }
+}
+
pub struct ArtifactDescriptor {
job_id: u64,
artifact_id: u64,
@@ -49,33 +59,25 @@ impl ArtifactDescriptor {
};
let chunk = chunk.as_ref();
- eprintln!("buf: {}", chunk.len());
- eprint!("write...");
self.file.write_all(chunk).await
.map_err(|e| format!("failed to write: {:?}", e))?;
- eprintln!("!");
}
}
}
-pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> {
- let mut buf = vec![0; 1024 * 8];//1024];
+pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 1024];
loop {
- eprint!("read...");
let n_read = source.read(&mut buf).await
.map_err(|e| format!("failed to read: {:?}", e))?;
- eprintln!("!");
if n_read == 0 {
eprintln!("done reading!");
return Ok(());
}
- eprintln!("buf: {}", n_read);
- eprint!("write...");
- dest.write_all(&buf[..n_read]).await
+ dest.sender.send_data(buf[..n_read].to_vec().into()).await
.map_err(|e| format!("failed to write: {:?}", e))?;
- eprintln!("!");
}
}