From 4415a995272b97dd7774a0b071218878f6c450f4 Mon Sep 17 00:00:00 2001 From: iximeow Date: Sun, 10 Sep 2017 13:04:49 -0700 Subject: support only serializing non-None fields (i hope) also move application to main thread, twitter connection to.. not main thread. now when twitter fails to connect or the connection drops, the application continues ticking along. --- Cargo.toml | 2 + main.rs | 512 ++++++++++++++++++++++++++++++++++++++++++------------------- 2 files changed, 357 insertions(+), 157 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5d67061..6a0653c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,3 +27,5 @@ hyper-tls = "*" "serde_json" = "*" "chan" = "*" chrono = "0.4" +serde = "*" +serde_derive = "*" diff --git a/main.rs b/main.rs index d4f108b..f47f2f3 100644 --- a/main.rs +++ b/main.rs @@ -7,6 +7,7 @@ use std::str; extern crate url; #[macro_use] extern crate hyper; +#[macro_use] extern crate serde_derive; extern crate oauthcli; extern crate tokio_core; extern crate futures; @@ -22,15 +23,15 @@ use hyper_tls::HttpsConnector; //use json_streamer::JsonObjectStreamer; //Change these values to your real Twitter API credentials -static consumer_key: &str = "0af9c1AoEi5X7IjtOKAtP60Za"; -static consumer_secret: &str = "1fxEzRhQtQSWKus4oqDwdg5DALIjGpINg0PGjkYVwKT8EEMFCh"; -static token: &str = "629126745-VePBD9ciKwpuVuIeEcNnxwxQFNWDXEy8KL3dGRRg"; -static token_secret: &str = "uAAruZzJu03NvMlH6cTeGku7NqVPro1ddKN4BxORy5hWG"; +static consumer_key: &str = "T879tHWDzd6LvKWdYVfbJL4Su"; +static consumer_secret: &str = "OAXXYYIozAZ4vWSmDziI1EMJCKXPmWPFgLbJpB896iIAMIAdpb"; +static token: &str = "629126745-Qt6LPq2kR7w58s7WHzSqcs4CIdiue64kkfYYB7RI"; +static token_secret: &str = "3BI3YC4WVbKW5icpHORWpsTYqYIj5oAZFkrgyIAoaoKnK"; static lol_auth_token: &str = "641cdf3a4bbddb72c118b5821e8696aee6300a9a"; static streamurl: &str = "https://userstream.twitter.com/1.1/user.json?tweet_mode=extended"; static tweet_lookup_url: &str = "https://api.twitter.com/1.1/statuses/show.json?tweet_mode=extended"; -static user_lookup_url: &str = "https://api.twitter.com/1.1/users/lookup.json"; +static user_lookup_url: &str = "https://api.twitter.com/1.1/users/show.json"; header! { (Authorization, "Authorization") => [String] } header! { (Accept, "Accept") => [String] } @@ -38,6 +39,9 @@ header! { (ContentType, "Content-Type") => [String] } header! { (Cookie, "cookie") => [String] } mod tw { + use std::path::Path; + use std::fs::File; + use std::io::{BufRead, BufReader, Read}; extern crate chrono; use self::chrono::prelude::*; @@ -48,7 +52,13 @@ mod tw { use tokio_core::reactor::Core; use hyper_tls::HttpsConnector; extern crate serde_json; + extern crate serde; + extern crate serde_derive; + use std::io::Write; + use std::fs::OpenOptions; + + #[derive(Debug, Serialize, Deserialize)] pub struct User { pub id: String, pub name: String, @@ -237,12 +247,17 @@ mod tw { } } + #[derive(Debug, Serialize, Deserialize)] pub struct Tweet { pub id: String, pub author_id: String, pub text: String, pub created_at: String, // lol + #[serde(skip_serializing_if="Option::is_none")] + #[serde(default = "Option::default")] pub quoted_tweet_id: Option, + #[serde(skip_serializing_if="Option::is_none")] + #[serde(default = "Option::default")] pub rt_tweet: Option } @@ -324,18 +339,29 @@ mod tw { twete_text } + #[derive(Serialize, Deserialize)] pub struct TwitterCache { - users: HashMap, - tweets: HashMap, + #[serde(skip)] + pub users: HashMap, + #[serde(skip)] + pub tweets: HashMap, following: HashSet, following_history: HashMap, // userid:date?? - followers: HashSet, + pub followers: HashSet, lost_followers: HashSet, follower_history: HashMap, // userid:date?? - queryer: Option<(hyper::client::Client>, Core)> + #[serde(skip)] + pub needs_save: bool, + #[serde(skip)] + pub caching_permitted: bool } impl TwitterCache { + const PROFILE_DIR: &'static str = "cache/"; + const TWEET_CACHE: &'static str = "cache/tweets.json"; + const USERS_CACHE: &'static str = "cache/users.json"; + const PROFILE_CACHE: &'static str = "cache/profile.json"; // this should involve MY user id.. + fn new() -> TwitterCache { TwitterCache { users: HashMap::new(), @@ -345,28 +371,92 @@ mod tw { followers: HashSet::new(), lost_followers: HashSet::new(), follower_history: HashMap::new(), - queryer: None + needs_save: false, + caching_permitted: true } } - pub fn with_client( - &mut self, - client: hyper::client::Client>, - core: Core - ) { - self.queryer = Some((client, core)); + fn new_without_caching() -> TwitterCache { + let mut cache = TwitterCache::new(); + cache.caching_permitted = false; + cache } fn cache_user(&mut self, user: User) { - self.users.insert(user.id.to_owned(), user); + if !self.users.contains_key(&user.id) { + let mut file = + OpenOptions::new() + .create(true) + .append(true) + .open(TwitterCache::USERS_CACHE) + .unwrap(); + writeln!(file, "{}", serde_json::to_string(&user).unwrap()); + self.users.insert(user.id.to_owned(), user); + } } fn cache_tweet(&mut self, tweet: Tweet) { - self.tweets.insert(tweet.id.to_owned(), tweet); + if !self.tweets.contains_key(&tweet.id) { + let mut file = + OpenOptions::new() + .create(true) + .append(true) + .open(TwitterCache::TWEET_CACHE) + .unwrap(); + writeln!(file, "{}", serde_json::to_string(&tweet).unwrap()); + self.tweets.insert(tweet.id.to_owned(), tweet); + } } pub fn store_cache(&self) { + if Path::new(TwitterCache::PROFILE_DIR).is_dir() { + let mut profile = OpenOptions::new() + .write(true) + .create(true) + .append(false) + .open(TwitterCache::PROFILE_CACHE) + .unwrap(); + serde_json::to_writer(profile, self).unwrap(); + } else { + println!("No cache dir exists..."); + } // store cache } pub fn load_cache() -> TwitterCache { - TwitterCache::new() + if Path::new(TwitterCache::PROFILE_CACHE).is_file() { + let mut buf = vec![]; + let mut profile = File::open(TwitterCache::PROFILE_CACHE).unwrap(); + match profile.read_to_end(&mut buf) { + Ok(sz) => { + match serde_json::from_slice(&buf) { + Ok(result) => { + let mut cache: TwitterCache = result; + cache.tweets = HashMap::new(); + for line in BufReader::new(File::open(TwitterCache::TWEET_CACHE).unwrap()).lines() { + let t: Tweet = serde_json::from_str(&line.unwrap()).unwrap(); + cache.tweets.insert(t.id.to_owned(), t); + } + for line in BufReader::new(File::open(TwitterCache::USERS_CACHE).unwrap()).lines() { + let u: User = serde_json::from_str(&line.unwrap()).unwrap(); + cache.users.insert(u.id.to_owned(), u); + } + cache.caching_permitted = true; + cache.needs_save = false; + cache + } + Err(e) => { + // TODO! should be able to un-frick profile after startup. + println!("Error reading profile, profile caching disabled... {}", e); + TwitterCache::new_without_caching() + } + } + } + Err(e) => { + println!("Error reading cached profile: {}. Profile caching disabled.", e); + TwitterCache::new_without_caching() + } + } + } else { + println!("Hello! First time setup?"); + TwitterCache::new() + } } pub fn cache_api_tweet(&mut self, json: serde_json::Value) { if let Some((rt, rt_user)) = json.get("retweeted_status").and_then(|x| Tweet::from_api_json(x.to_owned())) { @@ -389,7 +479,7 @@ mod tw { self.cache_user(user); } } - pub fn cache_api_event(&mut self, json: serde_json::Map) { + pub fn cache_api_event(&mut self, json: serde_json::Map, mut queryer: &mut ::Queryer) { /* don't really care to hold on to who fav, unfav, ... when, just pick targets out. */ match json.get("event").and_then(|x| x.as_str()) { Some("favorite") => { @@ -406,7 +496,25 @@ mod tw { Some("favorited_retweet") => ()/* cache rt */, Some("delete") => { let user_id = json["delete"]["status"]["user_id_str"].as_str().unwrap().to_string(); - self.fetch_user(&user_id); + self.fetch_user(&user_id, &mut queryer); + }, + Some("follow") => { + let follower = json["source"]["id_str"].as_str().unwrap().to_string(); + let followed = json["target"]["id_str"].as_str().unwrap().to_string(); + if follower == "iximeow" { + // self.add_follow( + } else { + self.add_follower(&follower); + } + }, + Some("unfollow") => { + let follower = json["source"]["id_str"].as_str().unwrap().to_string(); + let followed = json["target"]["id_str"].as_str().unwrap().to_string(); + if follower == "iximeow" { + // self.add_follow( + } else { + self.remove_follower(&follower); + } }, Some(_) => () /* an uninteresting event */, None => () // not really an event? should we log something? @@ -419,18 +527,18 @@ mod tw { pub fn retrieve_user(&self, user_id: &String) -> Option<&User> { self.users.get(user_id) } - pub fn fetch_tweet(&mut self, tweet_id: &String) -> Option<&Tweet> { + pub fn fetch_tweet(&mut self, tweet_id: &String, mut queryer: &mut ::Queryer) -> Option<&Tweet> { if !self.tweets.contains_key(tweet_id) { - match self.look_up_tweet(tweet_id) { + match self.look_up_tweet(tweet_id, &mut queryer) { Some(json) => self.cache_api_tweet(json), None => println!("Unable to retrieve tweet {}", tweet_id) }; } self.tweets.get(tweet_id) } - pub fn fetch_user(&mut self, user_id: &String) -> Option<&User> { + pub fn fetch_user(&mut self, user_id: &String, mut queryer: &mut ::Queryer) -> Option<&User> { if !self.users.contains_key(user_id) { - let maybe_parsed = self.look_up_user(user_id).and_then(|x| User::from_json(x)); + let maybe_parsed = self.look_up_user(user_id, &mut queryer).and_then(|x| User::from_json(x)); match maybe_parsed { Some(tw) => self.cache_user(tw), None => println!("Unable to retrieve user {}", user_id) @@ -438,6 +546,21 @@ mod tw { } self.users.get(user_id) } + pub fn set_following(&mut self, user_ids: Vec) { + let uid_set = user_ids.into_iter().collect::>(); + + let new_uids = &uid_set - &self.following; + for user in &new_uids { + println!("New following! {}", user); + self.add_following(user); + } + + let lost_uids = &self.following - &uid_set; + for user in &lost_uids { + println!("Bye, friend! {}", user); + self.remove_following(user); + } + } pub fn set_followers(&mut self, user_ids: Vec) { let uid_set = user_ids.into_iter().collect::>(); @@ -453,36 +576,80 @@ mod tw { self.remove_follower(user); } } + pub fn add_following(&mut self, user_id: &String) { + self.needs_save = true; + self.following.insert(user_id.to_owned()); + self.following_history.insert(user_id.to_owned(), ("following".to_string(), Utc::now().timestamp())); + } + pub fn remove_following(&mut self, user_id: &String) { + self.needs_save = true; + self.following.remove(user_id); + self.following_history.insert(user_id.to_owned(), ("unfollowing".to_string(), Utc::now().timestamp())); + } pub fn add_follower(&mut self, user_id: &String) { + self.needs_save = true; self.followers.insert(user_id.to_owned()); self.lost_followers.remove(user_id); self.follower_history.insert(user_id.to_owned(), ("follow".to_string(), Utc::now().timestamp())); } pub fn remove_follower(&mut self, user_id: &String) { + self.needs_save = true; self.followers.remove(user_id); self.lost_followers.insert(user_id.to_owned()); self.follower_history.insert(user_id.to_owned(), ("unfollow".to_string(), Utc::now().timestamp())); } - fn look_up_user(&mut self, id: &str) -> Option { - if let Some((ref client, ref mut core)) = self.queryer { - ::do_web_req(&format!("{}?id={}", ::user_lookup_url, id), client, core) - } else { - None - } + fn look_up_user(&mut self, id: &str, queryer: &mut ::Queryer) -> Option { + let url = &format!("{}?user_id={}", ::user_lookup_url, id); + queryer.do_api_req(url) } - fn look_up_tweet(&mut self, id: &str) -> Option { - if let Some((ref client, ref mut core)) = self.queryer { - ::do_web_req(&format!("{}?id={}", ::tweet_lookup_url, id), client, core) - } else { - None - } + fn look_up_tweet(&mut self, id: &str, queryer: &mut ::Queryer) -> Option { + let url = &format!("{}?id={}", ::tweet_lookup_url, id); + queryer.do_api_req(url) } } +} +pub struct Queryer { + client: hyper::client::Client>, + core: Core } +impl Queryer { + fn do_api_req(&mut self, url: &str) -> Option { + self.issue_request(signed_api_get(url)) + } + fn do_web_req(&mut self, url: &str) -> Option { + self.issue_request(signed_web_get(url)) + } + fn issue_request(&mut self, req: hyper::client::Request) -> Option { + let lookup = self.client.request(req); + + let resp: hyper::Response = self.core.run(lookup).unwrap(); + let status = resp.status().clone(); + + let chunks: Vec = self.core.run(resp.body().collect()).unwrap(); + + let resp_body: Vec = chunks.into_iter().flat_map(|chunk| chunk.into_iter()).collect(); + + match serde_json::from_slice(&resp_body) { + Ok(value) => { + if status != hyper::StatusCode::Ok { + println!("!! Requests returned status: {}", status); + println!("{}", value); + None + } else { + Some(value) + } + } + Err(e) => { + println!("error deserializing json: {}", e); + None + } + } + } +} fn render_twete(twete: &tw::Tweet, tweeter: &tw::TwitterCache) { @@ -518,17 +685,24 @@ fn render_twete(twete: &tw::Tweet, tweeter: &tw::TwitterCache) { fn render_twitter_event( structure: serde_json::Map, - tweeter: &mut tw::TwitterCache) { + tweeter: &mut tw::TwitterCache, + mut queryer: &mut Queryer) { if structure.contains_key("event") { - tweeter.cache_api_event(structure.clone()); + tweeter.cache_api_event(structure.clone(), &mut queryer); if let Some(event) = tw::events::Event::from_json(structure) { event.render(&tweeter); }; + } else if structure.contains_key("friends") { + let user_id_nums = structure["friends"].as_array().unwrap(); + let user_id_strs = user_id_nums.into_iter().map(|x| x.as_u64().unwrap().to_string()); + tweeter.set_following(user_id_strs.collect()); } else if structure.contains_key("delete") { println!("delete..."); let deleted_user_id = structure["delete"]["status"]["user_id_str"].as_str().unwrap().to_string(); if let Some(handle) = tweeter.retrieve_user(&deleted_user_id).map(|x| &x.handle) { println!("who? {} - {}", deleted_user_id, handle); + } else { + println!("dunno who..."); } } else if structure.contains_key("user") && structure.contains_key("id") { let twete_id = structure["id_str"].as_str().unwrap().to_string(); @@ -599,6 +773,7 @@ fn signed_api_get(url: &str) -> hyper::client::Request { req.set_body(param_string); { + println!("{}", header.to_owned()); let mut headers = req.headers_mut(); headers.set(Authorization(header.to_owned())); headers.set(Accept("*/*".to_owned())); @@ -608,41 +783,13 @@ fn signed_api_get(url: &str) -> hyper::client::Request { req } -fn do_web_req(url: &str, client: &hyper::client::Client>, core: &mut Core) -> Option { - let lookup = client.request(signed_web_get(url)); - - let resp: hyper::Response = core.run(lookup).unwrap(); - - let chunks: Vec = core.run(resp.body().collect()).unwrap(); - - let resp_body: Vec = chunks.into_iter().flat_map(|chunk| chunk.into_iter()).collect(); - - match serde_json::from_slice(&resp_body) { - Ok(value) => Some(value), - Err(e) => { - println!("error deserializing json: {}", e); - None - } - } -} - fn display_event( twete: serde_json::Value, - tweeter: &mut tw::TwitterCache + tweeter: &mut tw::TwitterCache, + queryer: &mut Queryer ) { - /* match twete { - serde_json::Value::Object(objmap) => { - if objmap.contains_key("id_str") { - let tweet_id = objmap["id_str"].as_str().unwrap(); - twetemap.insert(tweet_id, objmap); - } - render_twitter_event(&twetemap, objmap, client, c2); - }, - f => println!("Unexpected object: {}", f) - };*/ - match twete { - serde_json::Value::Object(objmap) => render_twitter_event(objmap, tweeter), + serde_json::Value::Object(objmap) => render_twitter_event(objmap, tweeter, queryer), _ => () }; } @@ -653,116 +800,167 @@ fn main() { // let url = "https://stream.twitter.com/1.1/statuses/filter.json"; // let url = "https://stream.twitter.com/1.1/statuses/sample.json"; - let mut core = Core::new().unwrap(); - - let connector = HttpsConnector::new(1, &core.handle()).unwrap(); - - let client = Client::configure() - .keep_alive(true) - .connector(connector) - .build(&core.handle()); - -// println!("{}", do_web_req("https://caps.twitter.com/v2/capi/passthrough/1?twitter:string:card_uri=card://887655800482787328&twitter:long:original_tweet_id=887655800981925888&twitter:string:response_card_name=poll3choice_text_only&twitter:string:cards_platform=Web-12", &client, &mut core).unwrap()); -// println!("{}", look_up_tweet("887655800981925888", &client, &mut core).unwrap()); + println!("starting!"); - println!("Loading cache..."); + let (ui_tx, ui_rx) = chan::sync::>(0); - let req = signed_api_get(streamurl); + let twete_rx = connect_twitter_stream(); - println!("starting!"); -// println!("lookup'd"); - -// println!("requesting..."); - /* - let work = client.request(req).and_then(|res| { - res.body().for_each(move |body: hyper::Chunk| { - println!("hmmm"); - println!("{}", std::str::from_utf8(&body).unwrap()); - Ok(()) - }) + std::thread::spawn(move || { + use std::io::Read; + loop { + let mut line = String::new(); + std::io::stdin().read_line(&mut line); + ui_tx.send(line.into_bytes()); + } }); - */ - let (twete_tx, twete_rx) = chan::sync::>(0); - let (ui_tx, ui_rx) = chan::sync::>(0); + // I *would* want to load this before spawning the thread, but.. + // tokio_core::reactor::Inner can't be moved between threads safely + // and beacuse it's an Option-al field, it might be present + // and rustc says nooooo + // + // even though it's not ever present before here + println!("Loading cache..."); - let remote = core.remote(); + let mut tweeter = tw::TwitterCache::load_cache(); - std::thread::spawn(move || { - // I *would* want to load this before spawning the thread, but.. - // tokio_core::reactor::Inner can't be moved between threads safely - // and beacuse it's an Option-al field, it might be present - // and rustc says nooooo - // - // even though it's not ever present before here - let mut tweeter = tw::TwitterCache::load_cache(); + println!("Loaded cache!"); - let c2 = Core::new().unwrap(); // i swear this is not where the botnet lives - let handle = &c2.handle(); - let secondaryConnector = HttpsConnector::new(4, handle).unwrap(); + let c2 = Core::new().unwrap(); // i swear this is not where the botnet lives + let handle = &c2.handle(); + let secondaryConnector = HttpsConnector::new(4, handle).unwrap(); - let secondaryClient = Client::configure() - .connector(secondaryConnector) - .build(handle); + let secondaryClient = Client::configure() + .connector(secondaryConnector) + .build(handle); - tweeter.with_client(secondaryClient, c2); + let mut queryer = Queryer { + client: secondaryClient, + core: c2 + }; - loop { - chan_select! { - twete_rx.recv() -> twete => { - match twete { - Some(line) => { - let jsonstr = std::str::from_utf8(&line).unwrap().trim(); - println!("{}", jsonstr); - /* TODO: replace from_str with from_slice */ - let json: serde_json::Value = serde_json::from_str(&jsonstr).unwrap(); - display_event(json, &mut tweeter); + loop { + let ui_rx_b = &ui_rx; + chan_select! { + twete_rx.recv() -> twete => match twete { + Some(line) => { + let jsonstr = std::str::from_utf8(&line).unwrap().trim(); +// println!("{}", jsonstr); + /* TODO: replace from_str with from_slice */ + let json: serde_json::Value = serde_json::from_str(&jsonstr).unwrap(); + display_event(json, &mut tweeter, &mut queryer); + if tweeter.needs_save && tweeter.caching_permitted { + tweeter.store_cache(); + } + } + None => { + println!("Twitter stream hung up..."); + chan_select! { + ui_rx_b.recv() -> input => match input { + Some(line) => handle_user_input(line, &mut tweeter, &mut queryer), + None => std::process::exit(0) } - None => { println!("???"); } } + } + }, + ui_rx.recv() -> user_input => match user_input { + Some(line) => { + handle_user_input(line, &mut tweeter, &mut queryer); }, - ui_rx.recv() -> user_input => { - println!("ui_rx recv"); - match user_input { - Some(line) => println!("You typed {}", std::str::from_utf8(&line).unwrap()), - None => println!("??? 2") - } + None => println!("UI thread hung up...") + } + } + } + + println!("Bye bye"); +} + +fn handle_user_input(line: Vec, tweeter: &mut tw::TwitterCache, mut queryer: &mut Queryer) { + if line == String::from("show_cache\n").into_bytes() { + println!("----* USERS *----"); + for (uid, user) in &tweeter.users { + println!("User: {} -> {:?}", uid, user); + } + println!("----* TWEETS *----"); + for (tid, tweet) in &tweeter.tweets { + println!("Tweet: {} -> {:?}", tid, tweet); + } + println!("----* FOLLOWERS *----"); + for uid in &tweeter.followers.clone() { + let user_res = tweeter.fetch_user(uid, &mut queryer); + match user_res { + Some(user) => { + println!("Follower: {} - {:?}", uid, user); } + None => { println!(" ..."); } } } - }); + } else if line == String::from("q\n").into_bytes() { + println!("Bye bye!"); + tweeter.store_cache(); + std::process::exit(0); + } else if line.starts_with("look_up_".as_bytes()) { + let linestr = std::str::from_utf8(&line).unwrap().trim(); + if linestr.starts_with("look_up_tweet") { + let tweetid = &linestr.split(" ").collect::>()[1].to_string(); + if let Some(tweet) = tweeter.fetch_tweet(tweetid, &mut queryer) { + println!("{:?}", tweet); + } else { +// println!("Couldn't retrieve {}", tweetid); + } + } else if linestr.starts_with("look_up_user") { + let userid = &linestr.split(" ").collect::>()[1].to_string(); + if let Some(user) = tweeter.fetch_user(userid, &mut queryer) { + println!("{:?}", user); + } else { +// println!("Couldn't retrieve {}", userid); + } + } + } +} + +fn connect_twitter_stream() -> chan::Receiver> { + let (twete_tx, twete_rx) = chan::sync::>(0); std::thread::spawn(move || { - use std::io::Read; - loop { - let mut line = String::new(); - std::io::stdin().read_line(&mut line); - ui_tx.send(line.into_bytes()); - } - }); + let mut core = Core::new().unwrap(); - println!("Before?"); - let work = client.request(req).and_then(|res| { - LineStream::new(res.body() - .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| Ok(b)))) - .flatten()) - .for_each(|s| { - if s.len() != 1 { - //println!("Send!: {}", std::str::from_utf8(&s).unwrap()); - twete_tx.send(s); - }; - Ok(()) - }) - }); + let connector = HttpsConnector::new(1, &core.handle()).unwrap(); + + let client = Client::configure() + .keep_alive(true) + .connector(connector) + .build(&core.handle()); - let resp = core.run(work).unwrap(); - println!("After?"); + // println!("{}", do_web_req("https://caps.twitter.com/v2/capi/passthrough/1?twitter:string:card_uri=card://887655800482787328&twitter:long:original_tweet_id=887655800981925888&twitter:string:response_card_name=poll3choice_text_only&twitter:string:cards_platform=Web-12", &client, &mut core).unwrap()); + // println!("{}", look_up_tweet("887655800981925888", &client, &mut core).unwrap()); - /* - for obj in BufReader::new(res).json_objects() { - println!("{:?}", obj.as_object().unwrap().get("text").unwrap().as_string().unwrap()); - }*/ + let req = signed_api_get(streamurl); + let work = client.request(req).and_then(|res| { + let status = res.status(); + if status != hyper::StatusCode::Ok { + println!("Twitter stream connect was abnormal: {}", status); + } + LineStream::new(res.body() + .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| Ok(b)))) + .flatten()) + .for_each(|s| { + if s.len() != 1 { + twete_tx.send(s); + }; + Ok(()) + }) + }); + + let resp = core.run(work); + match resp { + Ok(good) => (), + Err(e) => println!("Error in setting up: {}", e) + } + }); + twete_rx } //extern crate futures; -- cgit v1.1