From e28b277980763b88d2828812bff2c0b9546d3d25 Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 29 Dec 2022 09:19:07 +0000 Subject: do not use buggy AsyncWrite impl for file transfer, spawn_blocking a blocking task --- src/io.rs | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) (limited to 'src/io.rs') diff --git a/src/io.rs b/src/io.rs index 575cf65..67e4d11 100644 --- a/src/io.rs +++ b/src/io.rs @@ -4,6 +4,16 @@ use tokio::fs::File; use std::io::Write; use tokio::fs::OpenOptions; +pub struct ArtifactStream { + sender: hyper::body::Sender, +} + +impl ArtifactStream { + pub fn new(sender: hyper::body::Sender) -> Self { + Self { sender } + } +} + pub struct ArtifactDescriptor { job_id: u64, artifact_id: u64, @@ -49,33 +59,25 @@ impl ArtifactDescriptor { }; let chunk = chunk.as_ref(); - eprintln!("buf: {}", chunk.len()); - eprint!("write..."); self.file.write_all(chunk).await .map_err(|e| format!("failed to write: {:?}", e))?; - eprintln!("!"); } } } -pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { - let mut buf = vec![0; 1024 * 8];//1024]; +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; loop { - eprint!("read..."); let n_read = source.read(&mut buf).await .map_err(|e| format!("failed to read: {:?}", e))?; - eprintln!("!"); if n_read == 0 { eprintln!("done reading!"); return Ok(()); } - eprintln!("buf: {}", n_read); - eprint!("write..."); - dest.write_all(&buf[..n_read]).await + dest.sender.send_data(buf[..n_read].to_vec().into()).await .map_err(|e| format!("failed to write: {:?}", e))?; - eprintln!("!"); } } -- cgit v1.1