diff options
author | Andy Wortman <me@iximeow.net> | 2017-07-15 20:46:30 -0700 |
---|---|---|
committer | Andy Wortman <me@iximeow.net> | 2017-07-15 20:46:30 -0700 |
commit | 322ed4cd914b92a18bfee33f33e23649f3035f03 (patch) | |
tree | e1de3c72678282084054459e4aa36f20532e1d0b | |
parent | b888282e1dc56cd7a63d88e9426488ab2772c0ef (diff) |
more help from rschifflin to adapt the Stream<Item=u8> into a Stream<Item=Vec<u8>> delimited by byte-of-choice (here 0x0a)
-rw-r--r-- | main.rs | 62 |
1 files changed, 60 insertions, 2 deletions
@@ -132,9 +132,10 @@ fn main() { */ let work = client.request(req).and_then(|res| { - res.body() + LineStream::new(res.body() .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result<u8, hyper::Error> { Ok(b) }))) - .flatten().for_each(|byte| Ok(print!("{}", byte as char))) + .flatten()) + .for_each(|s| Ok(print!("{}", str::from_utf8(&s).unwrap()))) }); println!("Before?"); @@ -147,3 +148,60 @@ fn main() { }*/ } + +//extern crate futures; +//use futures::stream::Stream; +//use futures::{Future, Poll, Async}; +use futures::{Poll, Async}; +/* +fn main() { + let lines = "line 1.\nline 2...\n LINE 3 \n".as_bytes(); + let bytestream = futures::stream::iter(lines.iter().map(|byte| -> Result<_, ()> { Ok(*byte) })); + let linestream = LineStream::new(bytestream); + + linestream.for_each(|line| { + println!("Bytes: {:?}", line); + println!("Line: {}", String::from_utf8(line).unwrap()); + Ok(()) + }).wait().unwrap() +} +*/ + +struct LineStream<S, E> where S: Stream<Item=u8, Error=E> { + stream: S, + progress: Vec<u8> +} + +impl<S,E> LineStream<S, E> where S: Stream<Item=u8, Error=E> + Sized { + pub fn new(stream: S) -> LineStream<S, E> { + LineStream { + stream: stream, + progress: vec![] + } + } +} + +impl<S, E> Stream for LineStream<S, E> where S: Stream<Item=u8, Error=E> { + type Item = Vec<u8>; + type Error = E; + + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { + loop { + match self.stream.poll() { + Ok(Async::Ready(Some(byte))) => { + if byte == 0x0a { + let mut new_vec = vec![]; + std::mem::swap(&mut self.progress, &mut new_vec); + return Ok(Async::Ready(Some(new_vec))) + } else { + self.progress.push(byte) + } + }, + Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => return Err(e) + } + } + } +} + |