diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ci_runner.rs | 41 | ||||
| -rw-r--r-- | src/io.rs | 24 | ||||
| -rw-r--r-- | src/lua/mod.rs | 16 | 
3 files changed, 25 insertions, 56 deletions
| diff --git a/src/ci_runner.rs b/src/ci_runner.rs index 3c0104f..a47d298 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -16,6 +16,8 @@ use std::marker::Unpin;  mod lua;  mod io; +use crate::io::ArtifactStream; +  #[derive(Debug)]  enum WorkAcquireError {      Reqwest(reqwest::Error), @@ -100,9 +102,7 @@ impl RunningJob {          if resp.status() == StatusCode::OK {              eprintln!("[+] artifact '{}' started", name); -            Ok(ArtifactStream { -                sender, -            }) +            Ok(ArtifactStream::new(sender))          } else {              Err(format!("[-] unable to create artifact: {:?}", resp))          } @@ -319,41 +319,6 @@ impl RunningJob {      }  } -struct ArtifactStream { -    sender: hyper::body::Sender, -} - -impl tokio::io::AsyncWrite for ArtifactStream { -    fn poll_write( -        self: Pin<&mut Self>, -        cx: &mut Context, -        buf: &[u8] -    ) -> Poll<Result<usize, std::io::Error>> { -        match self.get_mut().sender.try_send_data(buf.to_vec().into()) { -            Ok(()) => { -                Poll::Ready(Ok(buf.len())) -            }, -            _ => { -                Poll::Pending -            } -        } -    } - -    fn poll_flush( -        self: Pin<&mut Self>, -        _cx: &mut Context -    ) -> Poll<Result<(), std::io::Error>> { -        Poll::Ready(Ok(())) -    } - -    fn poll_shutdown( -        self: Pin<&mut Self>, -        _cx: &mut Context -    ) -> Poll<Result<(), std::io::Error>> { -        Poll::Ready(Ok(())) -    } -} -  impl RunnerClient {      async fn new(host: &str, mut sender: hyper::body::Sender, mut res: Response) -> Result<Self, String> {          if res.status() != StatusCode::OK { @@ -4,6 +4,16 @@ use tokio::fs::File;  use std::io::Write;  use tokio::fs::OpenOptions; +pub struct ArtifactStream { +    sender: hyper::body::Sender, +} + +impl ArtifactStream { +    pub fn new(sender: hyper::body::Sender) -> Self { +        Self { sender } +    } +} +  pub struct ArtifactDescriptor {      job_id: u64,      artifact_id: u64, @@ -49,33 +59,25 @@ impl ArtifactDescriptor {              };              let chunk = chunk.as_ref(); -            eprintln!("buf: {}", chunk.len()); -            eprint!("write...");              self.file.write_all(chunk).await                  .map_err(|e| format!("failed to write: {:?}", e))?; -            eprintln!("!");          }      }  } -pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { -    let mut buf = vec![0; 1024 * 8];//1024]; +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { +    let mut buf = vec![0; 1024 * 1024];      loop { -        eprint!("read...");          let n_read = source.read(&mut buf).await              .map_err(|e| format!("failed to read: {:?}", e))?; -        eprintln!("!");          if n_read == 0 {              eprintln!("done reading!");              return Ok(());          } -        eprintln!("buf: {}", n_read); -        eprint!("write..."); -        dest.write_all(&buf[..n_read]).await +        dest.sender.send_data(buf[..n_read].to_vec().into()).await              .map_err(|e| format!("failed to write: {:?}", e))?; -        eprintln!("!");      }  } diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 75b9cec..f186e6b 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -224,13 +224,15 @@ impl BuildEnv {      pub async fn run_build(self, script: &[u8]) -> Result<(), LuaError> {          let script = script.to_vec(); -        let res: Result<(), LuaError> = std::thread::spawn(move || { -            self.lua.context(|lua_ctx| { -                lua_ctx.load(&script) -                    .set_name("goodfile")? -                    .exec() -            }) -        }).join().unwrap(); +        let res: Result<(), LuaError> = tokio::task::spawn_blocking(|| { +            std::thread::spawn(move || { +                self.lua.context(|lua_ctx| { +                    lua_ctx.load(&script) +                        .set_name("goodfile")? +                        .exec() +                }) +            }).join().unwrap() +        }).await.unwrap();          eprintln!("lua res: {:?}", res);          res      } | 
