summaryrefslogtreecommitdiff
path: root/src/io.rs
diff options
context:
space:
mode:
authoriximeow <me@iximeow.net>2023-07-13 00:51:51 -0700
committeriximeow <me@iximeow.net>2023-07-13 00:51:51 -0700
commit9e6906c00c49186189d211dc96e132d85e7ff641 (patch)
tree05c20145ebc306313e3a12dc73c34b5dea40bbdc /src/io.rs
parent543150f1666690351d4698421cc6ceb115c1e251 (diff)
reorganize the whole thing into crates/ packages
Diffstat (limited to 'src/io.rs')
-rw-r--r--src/io.rs181
1 files changed, 0 insertions, 181 deletions
diff --git a/src/io.rs b/src/io.rs
deleted file mode 100644
index f9f407f..0000000
--- a/src/io.rs
+++ /dev/null
@@ -1,181 +0,0 @@
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
-use futures_util::StreamExt;
-use tokio::fs::File;
-use std::io::Write;
-use tokio::fs::OpenOptions;
-use std::task::{Poll, Context};
-use std::pin::Pin;
-use std::time::{UNIX_EPOCH, SystemTime};
-use std::sync::{Arc, Mutex};
-
-pub fn now_ms() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("now is later than epoch")
- .as_millis() as u64
-}
-
-#[derive(Clone)]
-pub struct VecSink {
- body: Arc<Mutex<Vec<u8>>>,
-}
-
-impl VecSink {
- pub fn new() -> Self {
- Self { body: Arc::new(Mutex::new(Vec::new())) }
- }
-
- pub fn take_buf(&self) -> Vec<u8> {
- std::mem::replace(&mut *self.body.lock().unwrap(), Vec::new())
- }
-}
-
-impl tokio::io::AsyncWrite for VecSink {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &[u8]
- ) -> Poll<Result<usize, std::io::Error>> {
- self.body.lock().unwrap().extend_from_slice(buf);
- Poll::Ready(Ok(buf.len()))
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-pub struct ArtifactStream {
- sender: hyper::body::Sender,
-}
-
-impl ArtifactStream {
- pub fn new(sender: hyper::body::Sender) -> Self {
- Self { sender }
- }
-}
-
-impl tokio::io::AsyncWrite for ArtifactStream {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context,
- buf: &[u8]
- ) -> Poll<Result<usize, std::io::Error>> {
- match self.get_mut().sender.try_send_data(buf.to_vec().into()) {
- Ok(()) => {
- Poll::Ready(Ok(buf.len()))
- },
- _ => {
- cx.waker().wake_by_ref();
- Poll::Pending
- }
- }
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(
- self: Pin<&mut Self>,
- _cx: &mut Context
- ) -> Poll<Result<(), std::io::Error>> {
- Poll::Ready(Ok(()))
- }
-}
-
-
-pub struct ArtifactDescriptor {
- job_id: u64,
- pub artifact_id: u64,
- file: File,
-}
-
-impl ArtifactDescriptor {
- pub async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> {
- // TODO: jobs should be a configurable path
- let path = format!("artifacts/{}/{}", job_id, artifact_id);
- let file = OpenOptions::new()
- .read(true)
- .write(true)
- .create_new(true)
- .open(&path)
- .await
- .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?;
-
- Ok(ArtifactDescriptor {
- job_id,
- artifact_id,
- file,
- })
- }
-
- pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> {
- loop {
- let chunk = data.next().await;
-
- let chunk = match chunk {
- Some(Ok(chunk)) => chunk,
- Some(Err(e)) => {
- eprintln!("error: {:?}", e);
- return Err(format!("error reading: {:?}", e));
- }
- None => {
- eprintln!("body done?");
- return Ok(());
- }
- };
-
- let chunk = chunk.as_ref();
-
- self.file.write_all(chunk).await
- .map_err(|e| format!("failed to write: {:?}", e))?;
- }
- }
-}
-
-pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> {
- let mut buf = vec![0; 1024 * 1024];
- loop {
- let n_read = source.read(&mut buf).await
- .map_err(|e| format!("failed to read: {:?}", e))?;
-
- if n_read == 0 {
- eprintln!("done reading!");
- return Ok(());
- }
-
- dest.write_all(&buf[..n_read]).await
- .map_err(|e| format!("failed to write: {:?}", e))?;
- }
-}
-/*
-pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut ArtifactStream) -> Result<(), String> {
- let mut buf = vec![0; 1024 * 1024];
- loop {
- let n_read = source.read(&mut buf).await
- .map_err(|e| format!("failed to read: {:?}", e))?;
-
- if n_read == 0 {
- eprintln!("done reading!");
- return Ok(());
- }
-
- dest.sender.send_data(buf[..n_read].to_vec().into()).await
- .map_err(|e| format!("failed to write: {:?}", e))?;
- }
-}
-*/