aboutsummaryrefslogtreecommitdiff
path: root/src/linestream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/linestream.rs')
-rw-r--r--src/linestream.rs42
1 files changed, 42 insertions, 0 deletions
diff --git a/src/linestream.rs b/src/linestream.rs
new file mode 100644
index 0000000..5106af3
--- /dev/null
+++ b/src/linestream.rs
@@ -0,0 +1,42 @@
+use std;
+use futures::stream::Stream;
+use futures::{Poll, Async};
+
+pub struct LineStream<S, E> where S: Stream<Item=u8, Error=E> {
+ stream: S,
+ progress: Vec<u8>
+}
+
+impl<S,E> LineStream<S, E> where S: Stream<Item=u8, Error=E> + Sized {
+ pub fn new(stream: S) -> LineStream<S, E> {
+ LineStream {
+ stream: stream,
+ progress: vec![]
+ }
+ }
+}
+
+impl<S, E> Stream for LineStream<S, E> where S: Stream<Item=u8, Error=E> {
+ type Item = Vec<u8>;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ match self.stream.poll() {
+ Ok(Async::Ready(Some(byte))) => {
+ if byte == 0x0a {
+ let mut new_vec = vec![];
+ std::mem::swap(&mut self.progress, &mut new_vec);
+ return Ok(Async::Ready(Some(new_vec)))
+ } else {
+ self.progress.push(byte)
+ }
+ },
+ Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => return Err(e)
+ }
+ }
+ }
+}
+