diff options
| author | Andy Wortman <me@iximeow.net> | 2017-07-15 20:46:30 -0700 | 
|---|---|---|
| committer | Andy Wortman <me@iximeow.net> | 2017-07-15 20:46:30 -0700 | 
| commit | 322ed4cd914b92a18bfee33f33e23649f3035f03 (patch) | |
| tree | e1de3c72678282084054459e4aa36f20532e1d0b | |
| parent | b888282e1dc56cd7a63d88e9426488ab2772c0ef (diff) | |
more help from rschifflin to adapt the Stream<Item=u8> into a Stream<Item=Vec<u8>> delimited by byte-of-choice (here 0x0a)
| -rw-r--r-- | main.rs | 62 | 
1 files changed, 60 insertions, 2 deletions
@@ -132,9 +132,10 @@ fn main() {      */      let work = client.request(req).and_then(|res| { -        res.body() +        LineStream::new(res.body()              .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result<u8, hyper::Error> { Ok(b) }))) -            .flatten().for_each(|byte| Ok(print!("{}", byte as char))) +            .flatten()) +            .for_each(|s| Ok(print!("{}", str::from_utf8(&s).unwrap())))      });      println!("Before?"); @@ -147,3 +148,60 @@ fn main() {      }*/  } + +//extern crate futures; +//use futures::stream::Stream; +//use futures::{Future, Poll, Async}; +use futures::{Poll, Async}; +/* +fn main() { +  let lines = "line 1.\nline 2...\n   LINE 3  \n".as_bytes(); +  let bytestream = futures::stream::iter(lines.iter().map(|byte| -> Result<_, ()> { Ok(*byte) })); +  let linestream = LineStream::new(bytestream); +   +  linestream.for_each(|line| { +    println!("Bytes: {:?}", line); +    println!("Line: {}", String::from_utf8(line).unwrap()); +    Ok(()) +  }).wait().unwrap() +} +*/ + +struct LineStream<S, E> where S: Stream<Item=u8, Error=E> { +  stream: S, +  progress: Vec<u8> +} + +impl<S,E> LineStream<S, E> where S: Stream<Item=u8, Error=E> + Sized { +  pub fn new(stream: S) -> LineStream<S, E> { +    LineStream { +      stream: stream, +      progress: vec![] +    } +  } +} + +impl<S, E> Stream for LineStream<S, E> where S: Stream<Item=u8, Error=E> { +  type Item = Vec<u8>; +  type Error = E; +   +  fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { +    loop { +      match self.stream.poll() { +        Ok(Async::Ready(Some(byte))) => { +          if byte == 0x0a {  +            let mut new_vec = vec![]; +            std::mem::swap(&mut self.progress, &mut new_vec); +            return Ok(Async::Ready(Some(new_vec))) +          } else { +            self.progress.push(byte) +          } +        }, +        Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), +        Ok(Async::NotReady) => return Ok(Async::NotReady), +        Err(e) => return Err(e) +      } +    } +  } +} +  | 
