From 9e6906c00c49186189d211dc96e132d85e7ff641 Mon Sep 17 00:00:00 2001 From: iximeow Date: Thu, 13 Jul 2023 00:51:51 -0700 Subject: reorganize the whole thing into crates/ packages --- ci-lib-native/src/io.rs | 174 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 ci-lib-native/src/io.rs (limited to 'ci-lib-native/src/io.rs') diff --git a/ci-lib-native/src/io.rs b/ci-lib-native/src/io.rs new file mode 100644 index 0000000..d41349c --- /dev/null +++ b/ci-lib-native/src/io.rs @@ -0,0 +1,174 @@ +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures_util::StreamExt; +use tokio::fs::File; +use std::io::Write; +use tokio::fs::OpenOptions; +use std::task::{Poll, Context}; +use std::pin::Pin; +use std::time::{UNIX_EPOCH, SystemTime}; +use std::sync::{Arc, Mutex}; + +#[derive(Clone)] +pub struct VecSink { + body: Arc>>, +} + +impl VecSink { + pub fn new() -> Self { + Self { body: Arc::new(Mutex::new(Vec::new())) } + } + + pub fn take_buf(&self) -> Vec { + std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new()) + } +} + +impl tokio::io::AsyncWrite for VecSink { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + self.body.lock().unwrap().extend_from_slice(buf); + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + +pub struct ArtifactStream { + sender: hyper::body::Sender, +} + +impl ArtifactStream { + pub fn new(sender: hyper::body::Sender) -> Self { + Self { sender } + } +} + +impl tokio::io::AsyncWrite for ArtifactStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8] + ) -> Poll> { + match self.get_mut().sender.try_send_data(buf.to_vec().into()) { + Ok(()) => { + Poll::Ready(Ok(buf.len())) + }, + _ => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context + ) -> Poll> { + Poll::Ready(Ok(())) + } +} + + +pub struct ArtifactDescriptor { + job_id: u64, + pub artifact_id: u64, + file: File, +} + +impl ArtifactDescriptor { + pub async fn new(job_id: u64, artifact_id: u64) -> Result { + // TODO: jobs should be a configurable path + let path = format!("artifacts/{}/{}", job_id, artifact_id); + let file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(&path) + .await + .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + + Ok(ArtifactDescriptor { + job_id, + artifact_id, + file, + }) + } + + pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { + loop { + let chunk = data.next().await; + + let chunk = match chunk { + Some(Ok(chunk)) => chunk, + Some(Err(e)) => { + eprintln!("error: {:?}", e); + return Err(format!("error reading: {:?}", e)); + } + None => { + eprintln!("body done?"); + return Ok(()); + } + }; + + let chunk = chunk.as_ref(); + + self.file.write_all(chunk).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } + } +} + +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + + if n_read == 0 { + eprintln!("done reading!"); + return Ok(()); + } + + dest.write_all(&buf[..n_read]).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } +} +/* +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> { + let mut buf = vec![0; 1024 * 1024]; + loop { + let n_read = source.read(&mut buf).await + .map_err(|e| format!("failed to read: {:?}", e))?; + + if n_read == 0 { + eprintln!("done reading!"); + return Ok(()); + } + + dest.sender.send_data(buf[..n_read].to_vec().into()).await + .map_err(|e| format!("failed to write: {:?}", e))?; + } +} +*/ -- cgit v1.1