aboutsummaryrefslogtreecommitdiff
path: root/src/linestream.rs
diff options
context:
space:
mode:
authorAndy Wortman <ixineeringeverywhere@gmail.com>2017-10-02 01:27:08 -0700
committerAndy Wortman <ixineeringeverywhere@gmail.com>2017-10-02 01:27:18 -0700
commitea4e93f01d9e4ef17effae1e9a807bb1977865fe (patch)
tree08cfbcc32b9fbea5f13fd3447026090f51402274 /src/linestream.rs
parentafd61ae0822690f30d37859c806a8d8d843b8c1a (diff)
move everything to src/
Diffstat (limited to 'src/linestream.rs')
-rw-r--r--src/linestream.rs42
1 files changed, 42 insertions, 0 deletions
diff --git a/src/linestream.rs b/src/linestream.rs
new file mode 100644
index 0000000..5106af3
--- /dev/null
+++ b/src/linestream.rs
@@ -0,0 +1,42 @@
+use std;
+use futures::stream::Stream;
+use futures::{Poll, Async};
+
+pub struct LineStream<S, E> where S: Stream<Item=u8, Error=E> {
+ stream: S,
+ progress: Vec<u8>
+}
+
+impl<S,E> LineStream<S, E> where S: Stream<Item=u8, Error=E> + Sized {
+ pub fn new(stream: S) -> LineStream<S, E> {
+ LineStream {
+ stream: stream,
+ progress: vec![]
+ }
+ }
+}
+
+impl<S, E> Stream for LineStream<S, E> where S: Stream<Item=u8, Error=E> {
+ type Item = Vec<u8>;
+ type Error = E;
+
+ fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
+ loop {
+ match self.stream.poll() {
+ Ok(Async::Ready(Some(byte))) => {
+ if byte == 0x0a {
+ let mut new_vec = vec![];
+ std::mem::swap(&mut self.progress, &mut new_vec);
+ return Ok(Async::Ready(Some(new_vec)))
+ } else {
+ self.progress.push(byte)
+ }
+ },
+ Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
+ Ok(Async::NotReady) => return Ok(Async::NotReady),
+ Err(e) => return Err(e)
+ }
+ }
+ }
+}
+