diff options
author | iximeow <me@iximeow.net> | 2022-12-29 13:31:22 -0800 |
---|---|---|
committer | iximeow <me@iximeow.net> | 2022-12-29 13:31:22 -0800 |
commit | 54a00732752044fdecdd33ec04ec72e0d56b9db5 (patch) | |
tree | 412dd33d7da7c61357704afd6088daeaa90851fe | |
parent | 79a40354df06b79df03352fb9724c9f934b12e06 (diff) |
use fixed AsyncWrite impl
-rw-r--r-- | src/io.rs | 52 | ||||
-rw-r--r-- | src/lua/mod.rs | 1 |
2 files changed, 53 insertions, 0 deletions
@@ -3,6 +3,8 @@ use futures_util::StreamExt; use tokio::fs::File; use std::io::Write; use tokio::fs::OpenOptions; +use std::task::{Poll, Context}; +use std::pin::Pin; pub struct ArtifactStream { sender: hyper::body::Sender, @@ -14,6 +16,39 @@ impl ArtifactStream { } } +impl tokio::io::AsyncWrite for ArtifactStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll<Result<usize, std::io::Error>> { + match self.get_mut().sender.try_send_data(buf.to_vec().into()) { + Ok(()) => { + Poll::Ready(Ok(buf.len())) + }, + _ => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll<Result<(), std::io::Error>> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll<Result<(), std::io::Error>> { + Poll::Ready(Ok(())) + } +} + + pub struct ArtifactDescriptor { job_id: u64, artifact_id: u64, @@ -63,6 +98,22 @@ impl ArtifactDescriptor { } } +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + + if n_read == 0 { + eprintln!("done reading!"); + return Ok(()); + } + + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } +} +/* pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { let mut buf = vec![0; 1024 * 1024]; loop { @@ -78,3 +129,4 @@ pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut Arti .map_err(|e| format!("failed to write: {:?}", e))?; } } +*/ diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 8f14b9a..cacba00 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -118,6 +118,7 @@ impl BuildEnv { } }; eprintln!("args: {:?}", args); + eprintln!(" params: {:?}", params); let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() |