summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/io.rs52
-rw-r--r--src/lua/mod.rs1
2 files changed, 53 insertions, 0 deletions
diff --git a/src/io.rs b/src/io.rs
index 245c1ef..219edbf 100644
--- a/src/io.rs
+++ b/src/io.rs
@@ -3,6 +3,8 @@ use futures_util::StreamExt;
use tokio::fs::File;
use std::io::Write;
use tokio::fs::OpenOptions;
+use std::task::{Poll, Context};
+use std::pin::Pin;
pub struct ArtifactStream {
sender: hyper::body::Sender,
@@ -14,6 +16,39 @@ impl ArtifactStream {
}
}
+impl tokio::io::AsyncWrite for ArtifactStream {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context,
+ buf: &[u8]
+ ) -> Poll<Result<usize, std::io::Error>> {
+ match self.get_mut().sender.try_send_data(buf.to_vec().into()) {
+ Ok(()) => {
+ Poll::Ready(Ok(buf.len()))
+ },
+ _ => {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ }
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ _cx: &mut Context
+ ) -> Poll<Result<(), std::io::Error>> {
+ Poll::Ready(Ok(()))
+ }
+}
+
+
pub struct ArtifactDescriptor {
job_id: u64,
artifact_id: u64,
@@ -63,6 +98,22 @@ impl ArtifactDescriptor {
}
}
+pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> {
+ let mut buf = vec![0; 1024 * 1024];
+ loop {
+ let n_read = source.read(&mut buf).await
+ .map_err(|e| format!("failed to read: {:?}", e))?;
+
+ if n_read == 0 {
+ eprintln!("done reading!");
+ return Ok(());
+ }
+
+ dest.write_all(&buf[..n_read]).await
+ .map_err(|e| format!("failed to write: {:?}", e))?;
+ }
+}
+/*
pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> {
let mut buf = vec![0; 1024 * 1024];
loop {
@@ -78,3 +129,4 @@ pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut Arti
.map_err(|e| format!("failed to write: {:?}", e))?;
}
}
+*/
diff --git a/src/lua/mod.rs b/src/lua/mod.rs
index 8f14b9a..cacba00 100644
--- a/src/lua/mod.rs
+++ b/src/lua/mod.rs
@@ -118,6 +118,7 @@ impl BuildEnv {
}
};
eprintln!("args: {:?}", args);
+ eprintln!(" params: {:?}", params);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()