summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoriximeow <git@iximeow.net>2022-12-29 09:19:07 +0000
committeriximeow <git@iximeow.net>2022-12-29 09:19:07 +0000
commite28b277980763b88d2828812bff2c0b9546d3d25 (patch)
tree34fe85309adb4170662951b3fb179f7441631e22
parent7801ace71a60f9c16497d99f760c14529a2d63d3 (diff)
do not use buggy AsyncWrite impl for file transfer, spawn_blocking a blocking task
-rw-r--r--src/ci_runner.rs41
-rw-r--r--src/io.rs24
-rw-r--r--src/lua/mod.rs16
3 files changed, 25 insertions, 56 deletions
diff --git a/src/ci_runner.rs b/src/ci_runner.rs
index 3c0104f..a47d298 100644
--- a/src/ci_runner.rs
+++ b/src/ci_runner.rs
@@ -16,6 +16,8 @@ use std::marker::Unpin;
mod lua;
mod io;
+use crate::io::ArtifactStream;
+
#[derive(Debug)]
enum WorkAcquireError {
Reqwest(reqwest::Error),
@@ -100,9 +102,7 @@ impl RunningJob {
if resp.status() == StatusCode::OK {
eprintln!("[+] artifact '{}' started", name);
- Ok(ArtifactStream {
- sender,
- })
+ Ok(ArtifactStream::new(sender))
} else {
Err(format!("[-] unable to create artifact: {:?}", resp))
}
@@ -319,41 +319,6 @@ impl RunningJob {
}
}
-struct ArtifactStream {
- sender: hyper::body::Sender,
-}
-
-impl tokio::io::AsyncWrite for ArtifactStream {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &[u8]
- ) -> Poll<Result<usize, std::io::Error>> {
- match self.get_mut().sender.try_send_data(buf.to_vec().into()) {
- Ok(()) => {
- Poll::Ready(Ok(buf.len()))
- },
- _ => {
- Poll::Pending
- }
- }
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
impl RunnerClient {
async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {
if res.status() != StatusCode::OK {
diff --git a/src/io.rs b/src/io.rs
index 575cf65..67e4d11 100644
--- a/src/io.rs
+++ b/src/io.rs
@@ -4,6 +4,16 @@ use tokio::fs::File;
use std::io::Write;
use tokio::fs::OpenOptions;
+pub struct ArtifactStream {
+ sender: hyper::body::Sender,
+}
+
+impl ArtifactStream {
+ pub fn new(sender: hyper::body::Sender) -> Self {
+ Self { sender }
+ }
+}
+
pub struct ArtifactDescriptor {
job_id: u64,
artifact_id: u64,
@@ -49,33 +59,25 @@ impl ArtifactDescriptor {
};
let chunk = chunk.as_ref();
- eprintln!("buf: {}", chunk.len());
- eprint!("write...");
self.file.write_all(chunk).await
.map_err(|e| format!("failed to write: {:?}", e))?;
- eprintln!("!");
}
}
}
-pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> {
- let mut buf = vec![0; 1024 * 8];//1024];
+pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 1024];
loop {
- eprint!("read...");
let n_read = source.read(&mut buf).await
.map_err(|e| format!("failed to read: {:?}", e))?;
- eprintln!("!");
if n_read == 0 {
eprintln!("done reading!");
return Ok(());
}
- eprintln!("buf: {}", n_read);
- eprint!("write...");
- dest.write_all(&buf[..n_read]).await
+ dest.sender.send_data(buf[..n_read].to_vec().into()).await
.map_err(|e| format!("failed to write: {:?}", e))?;
- eprintln!("!");
}
}
diff --git a/src/lua/mod.rs b/src/lua/mod.rs
index 75b9cec..f186e6b 100644
--- a/src/lua/mod.rs
+++ b/src/lua/mod.rs
@@ -224,13 +224,15 @@ impl BuildEnv {
pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> {
let script = script.to_vec();
- let res: Result<(), LuaError> = std::thread::spawn(move || {
- self.lua.context(|lua_ctx| {
- lua_ctx.load(&script)
- .set_name("goodfile")?
- .exec()
- })
- }).join().unwrap();
+ let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| {
+ std::thread::spawn(move || {
+ self.lua.context(|lua_ctx| {
+ lua_ctx.load(&script)
+ .set_name("goodfile")?
+ .exec()
+ })
+ }).join().unwrap()
+ }).await.unwrap();
eprintln!("lua res: {:?}", res);
res
}