aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wortman <me@iximeow.net>2017-07-15 20:46:30 -0700
committerAndy Wortman <me@iximeow.net>2017-07-15 20:46:30 -0700
commit322ed4cd914b92a18bfee33f33e23649f3035f03 (patch)
treee1de3c72678282084054459e4aa36f20532e1d0b
parentb888282e1dc56cd7a63d88e9426488ab2772c0ef (diff)
more help from rschifflin to adapt the Stream<Item=u8> into a Stream<Item=Vec<u8>> delimited by byte-of-choice (here 0x0a)
-rw-r--r--main.rs62
1 files changed, 60 insertions, 2 deletions
diff --git a/main.rs b/main.rs
index 828cc22..d13f90c 100644
--- a/main.rs
+++ b/main.rs
@@ -132,9 +132,10 @@ fn main() {
*/
let work = client.request(req).and_then(|res| {
- res.body()
+ LineStream::new(res.body()
.map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result<u8, hyper::Error> { Ok(b) })))
- .flatten().for_each(|byte| Ok(print!("{}", byte as char)))
+ .flatten())
+ .for_each(|s| Ok(print!("{}", str::from_utf8(&s).unwrap())))
});
println!("Before?");
@@ -147,3 +148,60 @@ fn main() {
}*/
}
+
+//extern crate futures;
+//use futures::stream::Stream;
+//use futures::{Future, Poll, Async};
+use futures::{Poll, Async};
+/*
+fn main() {
+ let lines = "line 1.\nline 2...\n LINE 3 \n".as_bytes();
+ let bytestream = futures::stream::iter(lines.iter().map(|byte| -> Result<_, ()> { Ok(*byte) }));
+ let linestream = LineStream::new(bytestream);
+
+ linestream.for_each(|line| {
+ println!("Bytes: {:?}", line);
+ println!("Line: {}", String::from_utf8(line).unwrap());
+ Ok(())
+ }).wait().unwrap()
+}
+*/
+
+struct LineStream<S, E> where S: Stream<Item=u8, Error=E> {
+ stream: S,
+ progress: Vec<u8>
+}
+
+impl<S,E> LineStream<S, E> where S: Stream<Item=u8, Error=E> + Sized {
+ pub fn new(stream: S) -> LineStream<S, E> {
+ LineStream {
+ stream: stream,
+ progress: vec![]
+ }
+ }
+}
+
+impl<S, E> Stream for LineStream<S, E> where S: Stream<Item=u8, Error=E> {
+ type Item = Vec<u8>;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ match self.stream.poll() {
+ Ok(Async::Ready(Some(byte))) => {
+ if byte == 0x0a {
+ let mut new_vec = vec![];
+ std::mem::swap(&mut self.progress, &mut new_vec);
+ return Ok(Async::Ready(Some(new_vec)))
+ } else {
+ self.progress.push(byte)
+ }
+ },
+ Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => return Err(e)
+ }
+ }
+ }
+}
+