From 322ed4cd914b92a18bfee33f33e23649f3035f03 Mon Sep 17 00:00:00 2001 From: Andy Wortman Date: Sat, 15 Jul 2017 20:46:30 -0700 Subject: more help from rschifflin to adapt the Stream into a Stream> delimited by byte-of-choice (here 0x0a) --- main.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) (limited to 'main.rs') diff --git a/main.rs b/main.rs index 828cc22..d13f90c 100644 --- a/main.rs +++ b/main.rs @@ -132,9 +132,10 @@ fn main() { */ let work = client.request(req).and_then(|res| { - res.body() + LineStream::new(res.body() .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result { Ok(b) }))) - .flatten().for_each(|byte| Ok(print!("{}", byte as char))) + .flatten()) + .for_each(|s| Ok(print!("{}", str::from_utf8(&s).unwrap()))) }); println!("Before?"); @@ -147,3 +148,60 @@ fn main() { }*/ } + +//extern crate futures; +//use futures::stream::Stream; +//use futures::{Future, Poll, Async}; +use futures::{Poll, Async}; +/* +fn main() { + let lines = "line 1.\nline 2...\n LINE 3 \n".as_bytes(); + let bytestream = futures::stream::iter(lines.iter().map(|byte| -> Result<_, ()> { Ok(*byte) })); + let linestream = LineStream::new(bytestream); + + linestream.for_each(|line| { + println!("Bytes: {:?}", line); + println!("Line: {}", String::from_utf8(line).unwrap()); + Ok(()) + }).wait().unwrap() +} +*/ + +struct LineStream where S: Stream { + stream: S, + progress: Vec +} + +impl LineStream where S: Stream + Sized { + pub fn new(stream: S) -> LineStream { + LineStream { + stream: stream, + progress: vec![] + } + } +} + +impl Stream for LineStream where S: Stream { + type Item = Vec; + type Error = E; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + match self.stream.poll() { + Ok(Async::Ready(Some(byte))) => { + if byte == 0x0a { + let mut new_vec = vec![]; + std::mem::swap(&mut self.progress, &mut new_vec); + return Ok(Async::Ready(Some(new_vec))) + } else { + self.progress.push(byte) + } + }, + Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => return Err(e) + } + } + } +} + -- cgit v1.1