diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ci_ctl.rs | 1 | ||||
| -rw-r--r-- | src/ci_driver.rs | 1 | ||||
| -rw-r--r-- | src/ci_runner.rs | 28 | ||||
| -rw-r--r-- | src/dbctx.rs | 49 | ||||
| -rw-r--r-- | src/io.rs | 81 | ||||
| -rw-r--r-- | src/lua/mod.rs | 10 | 
6 files changed, 98 insertions, 72 deletions
| diff --git a/src/ci_ctl.rs b/src/ci_ctl.rs index 687aa46..f6c55be 100644 --- a/src/ci_ctl.rs +++ b/src/ci_ctl.rs @@ -3,6 +3,7 @@ use clap::{Parser, Subcommand};  mod sql;  mod dbctx;  mod notifier; +mod io;  use sql::JobState;  use dbctx::DbCtx; diff --git a/src/ci_driver.rs b/src/ci_driver.rs index 3be49f4..da44fda 100644 --- a/src/ci_driver.rs +++ b/src/ci_driver.rs @@ -24,6 +24,7 @@ use serde_json::json;  mod dbctx;  mod sql;  mod notifier; +mod io;  use crate::dbctx::{DbCtx, PendingJob};  use crate::sql::JobResult; diff --git a/src/ci_runner.rs b/src/ci_runner.rs index a88b135..3c0104f 100644 --- a/src/ci_runner.rs +++ b/src/ci_runner.rs @@ -14,6 +14,7 @@ use std::pin::Pin;  use std::marker::Unpin;  mod lua; +mod io;  #[derive(Debug)]  enum WorkAcquireError { @@ -75,21 +76,6 @@ pub struct RunningJob {      client: RunnerClient,  } -async fn forward_data(mut source: impl AsyncRead + Unpin, mut dest: impl AsyncWrite + Unpin) -> Result<(), String> { -    let mut buf = vec![0; 1024 * 1024]; -    loop { -        let n_read = source.read(&mut buf).await -            .map_err(|e| format!("failed to read: {:?}", e))?; - -        if n_read == 0 { -            return Ok(()); -        } - -        dest.write_all(&buf[..n_read]).await -            .map_err(|e| format!("failed to write: {:?}", e))?; -    } -} -  impl RunningJob {      async fn send_metric(&mut self, name: &str, value: String) -> Result<(), String> {          self.client.send(serde_json::json!({ @@ -182,11 +168,11 @@ impl RunningJob {      async fn execute_command(&self, mut command: Command, name: &str, desc: &str) -> Result<ExitStatus, String> {          eprintln!("[.] running {}", name); -        let stdout_artifact = self.create_artifact( +        let mut stdout_artifact = self.create_artifact(              &format!("{} (stdout)", name),              &format!("{} (stdout)", desc)          ).await.expect("works"); -        let stderr_artifact = self.create_artifact( +        let mut stderr_artifact = self.create_artifact(              &format!("{} (stderr)", name),              &format!("{} (stderr)", desc)          ).await.expect("works"); @@ -198,13 +184,13 @@ impl RunningJob {              .spawn()              .map_err(|e| format!("failed to spawn '{}', {:?}", name, e))?; -        let child_stdout = child.stdout.take().unwrap(); -        let child_stderr = child.stderr.take().unwrap(); +        let mut child_stdout = child.stdout.take().unwrap(); +        let mut child_stderr = child.stderr.take().unwrap();          eprintln!("[.] '{}': forwarding stdout", name); -        tokio::spawn(forward_data(child_stdout, stdout_artifact)); +        tokio::spawn(async move { crate::io::forward_data(&mut child_stdout, &mut stdout_artifact).await });          eprintln!("[.] '{}': forwarding stderr", name); -        tokio::spawn(forward_data(child_stderr, stderr_artifact)); +        tokio::spawn(async move { crate::io::forward_data(&mut child_stderr, &mut stderr_artifact).await });          let res = child.wait().await              .map_err(|e| format!("failed to wait? {:?}", e))?; diff --git a/src/dbctx.rs b/src/dbctx.rs index cb74010..804f083 100644 --- a/src/dbctx.rs +++ b/src/dbctx.rs @@ -7,6 +7,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};  use std::path::Path;  use std::path::PathBuf; +use crate::io::ArtifactDescriptor;  use crate::notifier::{RemoteNotifier, NotifierConfig};  use crate::sql; @@ -45,54 +46,6 @@ pub struct ArtifactRecord {      pub desc: String  } -pub struct ArtifactDescriptor { -    job_id: u64, -    artifact_id: u64, -    file: File, -} - -impl ArtifactDescriptor { -    async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> { -        // TODO: jobs should be a configurable path -        let path = format!("jobs/{}/{}", job_id, artifact_id); -        let file = OpenOptions::new() -            .read(true) -            .write(true) -            .create_new(true) -            .open(&path) -            .await -            .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; - -        Ok(ArtifactDescriptor { -            job_id, -            artifact_id, -            file, -        }) -    } - -    pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { -        loop { -            let chunk = data.next().await; - -            let chunk = match chunk { -                Some(Ok(chunk)) => chunk, -                Some(Err(e)) => { -                    return Err(format!("error reading: {:?}", e)); -                } -                None => { -                    eprintln!("body done?"); -                    return Ok(()); -                } -            }; - -            let chunk = chunk.as_ref(); - -            self.file.write_all(chunk).await -                .map_err(|e| format!("failed to write: {:?}", e))?; -        } -    } -} -  impl DbCtx {      pub fn new<P: AsRef<Path>>(config_path: P, db_path: P) -> Self {          DbCtx { diff --git a/src/io.rs b/src/io.rs new file mode 100644 index 0000000..575cf65 --- /dev/null +++ b/src/io.rs @@ -0,0 +1,81 @@ +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use futures_util::StreamExt; +use tokio::fs::File; +use std::io::Write; +use tokio::fs::OpenOptions; + +pub struct ArtifactDescriptor { +    job_id: u64, +    artifact_id: u64, +    file: File, +} + +impl ArtifactDescriptor { +    pub async fn new(job_id: u64, artifact_id: u64) -> Result<Self, String> { +        // TODO: jobs should be a configurable path +        let path = format!("jobs/{}/{}", job_id, artifact_id); +        let file = OpenOptions::new() +            .read(true) +            .write(true) +            .create_new(true) +            .open(&path) +            .await +            .map_err(|e| format!("couldn't open artifact file {}: {}", path, e))?; + +        Ok(ArtifactDescriptor { +            job_id, +            artifact_id, +            file, +        }) +    } + +    pub async fn store_all(&mut self, mut data: axum::extract::BodyStream) -> Result<(), String> { +        eprintln!("new store..."); +        loop { +            eprint!("read..."); +            let chunk = data.next().await; +            eprintln!("!"); + +            let chunk = match chunk { +                Some(Ok(chunk)) => chunk, +                Some(Err(e)) => { +                    eprintln!("error: {:?}", e); +                    return Err(format!("error reading: {:?}", e)); +                } +                None => { +                    eprintln!("body done?"); +                    return Ok(()); +                } +            }; + +            let chunk = chunk.as_ref(); +            eprintln!("buf: {}", chunk.len()); + +            eprint!("write..."); +            self.file.write_all(chunk).await +                .map_err(|e| format!("failed to write: {:?}", e))?; +            eprintln!("!"); +        } +    } +} + +pub async fn forward_data(source: &mut (impl AsyncRead + Unpin), dest: &mut (impl AsyncWrite + Unpin)) -> Result<(), String> { +    let mut buf = vec![0; 1024 * 8];//1024]; +    loop { +        eprint!("read..."); +        let n_read = source.read(&mut buf).await +            .map_err(|e| format!("failed to read: {:?}", e))?; +        eprintln!("!"); + +        if n_read == 0 { +            eprintln!("done reading!"); +            return Ok(()); +        } +        eprintln!("buf: {}", n_read); + +        eprint!("write..."); +        dest.write_all(&buf[..n_read]).await +            .map_err(|e| format!("failed to write: {:?}", e))?; +        eprintln!("!"); +    } +} diff --git a/src/lua/mod.rs b/src/lua/mod.rs index 936c94a..26d2d5b 100644 --- a/src/lua/mod.rs +++ b/src/lua/mod.rs @@ -162,11 +162,15 @@ impl BuildEnv {                  .build()                  .unwrap();              rt.block_on(async move { -                let artifact = job_ref.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await +                let mut artifact = job_ref.lock().unwrap().create_artifact(&name, &format!("{} (from {})", name, path.display())).await                      .map_err(|e| LuaError::RuntimeError(format!("create_artifact error: {:?}", e)))                      .unwrap(); -                crate::forward_data(tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(), artifact).await -                    .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e))) +                let mut file = tokio::fs::File::open(&format!("tmpdir/{}", path.display())).await.unwrap(); +                eprintln!("uploading..."); +                crate::io::forward_data(&mut file, &mut artifact).await +                    .map_err(|e| LuaError::RuntimeError(format!("failed uploading data for {}: {:?}", name, e)))?; +                std::mem::drop(artifact); +                Ok(())              })          })              .map_err(|e| format!("problem defining metric function: {:?}", e))?; | 
