From e28b277980763b88d2828812bff2c0b9546d3d25 Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 29 Dec 2022 09:19:07 +0000 Subject: do not use buggy AsyncWrite impl for file transfer, spawn_blocking a blocking task --- src/ci_runner.rs | 41 +++-------------------------------------- src/io.rs | 24 +++++++++++++----------- src/lua/mod.rs | 16 +++++++++------- 3 files changed, 25 insertions(+), 56 deletions(-) (limited to 'src') diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 3c0104f..a47d298 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -16,6 +16,8 @@ use std::marker::Unpin; mod lua; mod io; +use crate::io::ArtifactStream; + #[derive(Debug)] enum WorkAcquireError { Reqwest(reqwest::Error), @@ -100,9 +102,7 @@ impl RunningJob { if resp.status() == StatusCode::OK { eprintln!("[+] artifact '{}' started", name); - Ok(ArtifactStream { - sender, - }) + Ok(ArtifactStream::new(sender)) } else { Err(format!("[-] unable to create artifact: {:?}", resp)) } @@ -319,41 +319,6 @@ impl RunningJob { } } -struct ArtifactStream { - sender: hyper::body::Sender, -} - -impl tokio::io::AsyncWrite for ArtifactStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8] - ) -> Poll> { - match self.get_mut().sender.try_send_data(buf.to_vec().into()) { - Ok(()) => { - Poll::Ready(Ok(buf.len())) - }, - _ => { - Poll::Pending - } - } - } - - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut Context - ) -> Poll> { - Poll::Ready(Ok(())) - } -} - impl RunnerClient { async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result { if res.status() != StatusCode::OK { diff --git a/src/io.rs b/src/io.rs index 575cf65..67e4d11 100644 --- a/src/io.rs +++ b/src/io.rs @@ -4,6 +4,16 @@ use tokio::fs::File; use std::io::Write; use tokio::fs::OpenOptions; +pub struct ArtifactStream { + sender: hyper::body::Sender, +} + +impl ArtifactStream { + pub fn new(sender: hyper::body::Sender) -> Self { + Self { sender } + } +} + pub struct ArtifactDescriptor { job_id: u64, artifact_id: u64, @@ -49,33 +59,25 @@ impl ArtifactDescriptor { }; let chunk = chunk.as_ref(); - eprintln!("buf: {}", chunk.len()); - eprint!("write..."); self.file.write_all(chunk).await .map_err(|e| format!("failed to write: {:?}", e))?; - eprintln!("!"); } } } -pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { - let mut buf = vec![0; 1024 * 8];//1024]; +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; loop { - eprint!("read..."); let n_read = source.read(&mut buf).await .map_err(|e| format!("failed to read: {:?}", e))?; - eprintln!("!"); if n_read == 0 { eprintln!("done reading!"); return Ok(()); } - eprintln!("buf: {}", n_read); - eprint!("write..."); - dest.write_all(&buf[..n_read]).await + dest.sender.send_data(buf[..n_read].to_vec().into()).await .map_err(|e| format!("failed to write: {:?}", e))?; - eprintln!("!"); } } diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 75b9cec..f186e6b 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -224,13 +224,15 @@ impl BuildEnv { pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> { let script = script.to_vec(); - let res: Result<(), LuaError> = std::thread::spawn(move || { - self.lua.context(|lua_ctx| { - lua_ctx.load(&script) - .set_name("goodfile")? - .exec() - }) - }).join().unwrap(); + let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| { + std::thread::spawn(move || { + self.lua.context(|lua_ctx| { + lua_ctx.load(&script) + .set_name("goodfile")? + .exec() + }) + }).join().unwrap() + }).await.unwrap(); eprintln!("lua res: {:?}", res); res } -- cgit v1.1