aboutsummaryrefslogtreecommitdiff
path: root/linestream.rs
diff options
context:
space:
mode:
authorAndy Wortman <ixineeringeverywhere@gmail.com>2017-10-01 23:25:46 -0700
committerAndy Wortman <ixineeringeverywhere@gmail.com>2017-10-01 23:25:46 -0700
commit943824e02fa771fa8350e4da90f2c9591ec4647e (patch)
treed04f8509e442e74e89cb6e7204d7fb4455d15ccd /linestream.rs
parente6ebf2c99a70bd5ee4e8d07097e6b128c3630714 (diff)
yank out more parts, decouple events and display
Diffstat (limited to 'linestream.rs')
-rw-r--r--linestream.rs42
1 files changed, 42 insertions, 0 deletions
diff --git a/linestream.rs b/linestream.rs
new file mode 100644
index 0000000..5106af3
--- /dev/null
+++ b/linestream.rs
@@ -0,0 +1,42 @@
+use std;
+use futures::stream::Stream;
+use futures::{Poll, Async};
+
+pub struct LineStream<S, E> where S: Stream<Item=u8, Error=E> {
+ stream: S,
+ progress: Vec<u8>
+}
+
+impl<S,E> LineStream<S, E> where S: Stream<Item=u8, Error=E> + Sized {
+ pub fn new(stream: S) -> LineStream<S, E> {
+ LineStream {
+ stream: stream,
+ progress: vec![]
+ }
+ }
+}
+
+impl<S, E> Stream for LineStream<S, E> where S: Stream<Item=u8, Error=E> {
+ type Item = Vec<u8>;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ match self.stream.poll() {
+ Ok(Async::Ready(Some(byte))) => {
+ if byte == 0x0a {
+ let mut new_vec = vec![];
+ std::mem::swap(&mut self.progress, &mut new_vec);
+ return Ok(Async::Ready(Some(new_vec)))
+ } else {
+ self.progress.push(byte)
+ }
+ },
+ Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => return Err(e)
+ }
+ }
+ }
+}
+