diff options
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | main.rs | 512 |
2 files changed, 357 insertions, 157 deletions
@@ -27,3 +27,5 @@ hyper-tls = "*" "serde_json" = "*" "chan" = "*" chrono = "0.4" +serde = "*" +serde_derive = "*" @@ -7,6 +7,7 @@ use std::str; extern crate url; #[macro_use] extern crate hyper; +#[macro_use] extern crate serde_derive; extern crate oauthcli; extern crate tokio_core; extern crate futures; @@ -22,15 +23,15 @@ use hyper_tls::HttpsConnector; //use json_streamer::JsonObjectStreamer; //Change these values to your real Twitter API credentials -static consumer_key: &str = "0af9c1AoEi5X7IjtOKAtP60Za"; -static consumer_secret: &str = "1fxEzRhQtQSWKus4oqDwdg5DALIjGpINg0PGjkYVwKT8EEMFCh"; -static token: &str = "629126745-VePBD9ciKwpuVuIeEcNnxwxQFNWDXEy8KL3dGRRg"; -static token_secret: &str = "uAAruZzJu03NvMlH6cTeGku7NqVPro1ddKN4BxORy5hWG"; +static consumer_key: &str = "T879tHWDzd6LvKWdYVfbJL4Su"; +static consumer_secret: &str = "OAXXYYIozAZ4vWSmDziI1EMJCKXPmWPFgLbJpB896iIAMIAdpb"; +static token: &str = "629126745-Qt6LPq2kR7w58s7WHzSqcs4CIdiue64kkfYYB7RI"; +static token_secret: &str = "3BI3YC4WVbKW5icpHORWpsTYqYIj5oAZFkrgyIAoaoKnK"; static lol_auth_token: &str = "641cdf3a4bbddb72c118b5821e8696aee6300a9a"; static streamurl: &str = "https://userstream.twitter.com/1.1/user.json?tweet_mode=extended"; static tweet_lookup_url: &str = "https://api.twitter.com/1.1/statuses/show.json?tweet_mode=extended"; -static user_lookup_url: &str = "https://api.twitter.com/1.1/users/lookup.json"; +static user_lookup_url: &str = "https://api.twitter.com/1.1/users/show.json"; header! { (Authorization, "Authorization") => [String] } header! { (Accept, "Accept") => [String] } @@ -38,6 +39,9 @@ header! { (ContentType, "Content-Type") => [String] } header! { (Cookie, "cookie") => [String] } mod tw { + use std::path::Path; + use std::fs::File; + use std::io::{BufRead, BufReader, Read}; extern crate chrono; use self::chrono::prelude::*; @@ -48,7 +52,13 @@ mod tw { use tokio_core::reactor::Core; use hyper_tls::HttpsConnector; extern crate serde_json; + extern crate serde; + extern crate serde_derive; + use std::io::Write; + use std::fs::OpenOptions; + + #[derive(Debug, Serialize, Deserialize)] pub struct User { pub id: String, pub name: String, @@ -237,12 +247,17 @@ mod tw { } } + #[derive(Debug, Serialize, Deserialize)] pub struct Tweet { pub id: String, pub author_id: String, pub text: String, pub created_at: String, // lol + #[serde(skip_serializing_if="Option::is_none")] + #[serde(default = "Option::default")] pub quoted_tweet_id: Option<String>, + #[serde(skip_serializing_if="Option::is_none")] + #[serde(default = "Option::default")] pub rt_tweet: Option<String> } @@ -324,18 +339,29 @@ mod tw { twete_text } + #[derive(Serialize, Deserialize)] pub struct TwitterCache { - users: HashMap<String, User>, - tweets: HashMap<String, Tweet>, + #[serde(skip)] + pub users: HashMap<String, User>, + #[serde(skip)] + pub tweets: HashMap<String, Tweet>, following: HashSet<String>, following_history: HashMap<String, (String, i64)>, // userid:date?? - followers: HashSet<String>, + pub followers: HashSet<String>, lost_followers: HashSet<String>, follower_history: HashMap<String, (String, i64)>, // userid:date?? - queryer: Option<(hyper::client::Client<HttpsConnector<hyper::client::HttpConnector>>, Core)> + #[serde(skip)] + pub needs_save: bool, + #[serde(skip)] + pub caching_permitted: bool } impl TwitterCache { + const PROFILE_DIR: &'static str = "cache/"; + const TWEET_CACHE: &'static str = "cache/tweets.json"; + const USERS_CACHE: &'static str = "cache/users.json"; + const PROFILE_CACHE: &'static str = "cache/profile.json"; // this should involve MY user id.. + fn new() -> TwitterCache { TwitterCache { users: HashMap::new(), @@ -345,28 +371,92 @@ mod tw { followers: HashSet::new(), lost_followers: HashSet::new(), follower_history: HashMap::new(), - queryer: None + needs_save: false, + caching_permitted: true } } - pub fn with_client( - &mut self, - client: hyper::client::Client<HttpsConnector<hyper::client::HttpConnector>>, - core: Core - ) { - self.queryer = Some((client, core)); + fn new_without_caching() -> TwitterCache { + let mut cache = TwitterCache::new(); + cache.caching_permitted = false; + cache } fn cache_user(&mut self, user: User) { - self.users.insert(user.id.to_owned(), user); + if !self.users.contains_key(&user.id) { + let mut file = + OpenOptions::new() + .create(true) + .append(true) + .open(TwitterCache::USERS_CACHE) + .unwrap(); + writeln!(file, "{}", serde_json::to_string(&user).unwrap()); + self.users.insert(user.id.to_owned(), user); + } } fn cache_tweet(&mut self, tweet: Tweet) { - self.tweets.insert(tweet.id.to_owned(), tweet); + if !self.tweets.contains_key(&tweet.id) { + let mut file = + OpenOptions::new() + .create(true) + .append(true) + .open(TwitterCache::TWEET_CACHE) + .unwrap(); + writeln!(file, "{}", serde_json::to_string(&tweet).unwrap()); + self.tweets.insert(tweet.id.to_owned(), tweet); + } } pub fn store_cache(&self) { + if Path::new(TwitterCache::PROFILE_DIR).is_dir() { + let mut profile = OpenOptions::new() + .write(true) + .create(true) + .append(false) + .open(TwitterCache::PROFILE_CACHE) + .unwrap(); + serde_json::to_writer(profile, self).unwrap(); + } else { + println!("No cache dir exists..."); + } // store cache } pub fn load_cache() -> TwitterCache { - TwitterCache::new() + if Path::new(TwitterCache::PROFILE_CACHE).is_file() { + let mut buf = vec![]; + let mut profile = File::open(TwitterCache::PROFILE_CACHE).unwrap(); + match profile.read_to_end(&mut buf) { + Ok(sz) => { + match serde_json::from_slice(&buf) { + Ok(result) => { + let mut cache: TwitterCache = result; + cache.tweets = HashMap::new(); + for line in BufReader::new(File::open(TwitterCache::TWEET_CACHE).unwrap()).lines() { + let t: Tweet = serde_json::from_str(&line.unwrap()).unwrap(); + cache.tweets.insert(t.id.to_owned(), t); + } + for line in BufReader::new(File::open(TwitterCache::USERS_CACHE).unwrap()).lines() { + let u: User = serde_json::from_str(&line.unwrap()).unwrap(); + cache.users.insert(u.id.to_owned(), u); + } + cache.caching_permitted = true; + cache.needs_save = false; + cache + } + Err(e) => { + // TODO! should be able to un-frick profile after startup. + println!("Error reading profile, profile caching disabled... {}", e); + TwitterCache::new_without_caching() + } + } + } + Err(e) => { + println!("Error reading cached profile: {}. Profile caching disabled.", e); + TwitterCache::new_without_caching() + } + } + } else { + println!("Hello! First time setup?"); + TwitterCache::new() + } } pub fn cache_api_tweet(&mut self, json: serde_json::Value) { if let Some((rt, rt_user)) = json.get("retweeted_status").and_then(|x| Tweet::from_api_json(x.to_owned())) { @@ -389,7 +479,7 @@ mod tw { self.cache_user(user); } } - pub fn cache_api_event(&mut self, json: serde_json::Map<String, serde_json::Value>) { + pub fn cache_api_event(&mut self, json: serde_json::Map<String, serde_json::Value>, mut queryer: &mut ::Queryer) { /* don't really care to hold on to who fav, unfav, ... when, just pick targets out. */ match json.get("event").and_then(|x| x.as_str()) { Some("favorite") => { @@ -406,7 +496,25 @@ mod tw { Some("favorited_retweet") => ()/* cache rt */, Some("delete") => { let user_id = json["delete"]["status"]["user_id_str"].as_str().unwrap().to_string(); - self.fetch_user(&user_id); + self.fetch_user(&user_id, &mut queryer); + }, + Some("follow") => { + let follower = json["source"]["id_str"].as_str().unwrap().to_string(); + let followed = json["target"]["id_str"].as_str().unwrap().to_string(); + if follower == "iximeow" { + // self.add_follow( + } else { + self.add_follower(&follower); + } + }, + Some("unfollow") => { + let follower = json["source"]["id_str"].as_str().unwrap().to_string(); + let followed = json["target"]["id_str"].as_str().unwrap().to_string(); + if follower == "iximeow" { + // self.add_follow( + } else { + self.remove_follower(&follower); + } }, Some(_) => () /* an uninteresting event */, None => () // not really an event? should we log something? @@ -419,18 +527,18 @@ mod tw { pub fn retrieve_user(&self, user_id: &String) -> Option<&User> { self.users.get(user_id) } - pub fn fetch_tweet(&mut self, tweet_id: &String) -> Option<&Tweet> { + pub fn fetch_tweet(&mut self, tweet_id: &String, mut queryer: &mut ::Queryer) -> Option<&Tweet> { if !self.tweets.contains_key(tweet_id) { - match self.look_up_tweet(tweet_id) { + match self.look_up_tweet(tweet_id, &mut queryer) { Some(json) => self.cache_api_tweet(json), None => println!("Unable to retrieve tweet {}", tweet_id) }; } self.tweets.get(tweet_id) } - pub fn fetch_user(&mut self, user_id: &String) -> Option<&User> { + pub fn fetch_user(&mut self, user_id: &String, mut queryer: &mut ::Queryer) -> Option<&User> { if !self.users.contains_key(user_id) { - let maybe_parsed = self.look_up_user(user_id).and_then(|x| User::from_json(x)); + let maybe_parsed = self.look_up_user(user_id, &mut queryer).and_then(|x| User::from_json(x)); match maybe_parsed { Some(tw) => self.cache_user(tw), None => println!("Unable to retrieve user {}", user_id) @@ -438,6 +546,21 @@ mod tw { } self.users.get(user_id) } + pub fn set_following(&mut self, user_ids: Vec<String>) { + let uid_set = user_ids.into_iter().collect::<HashSet<String>>(); + + let new_uids = &uid_set - &self.following; + for user in &new_uids { + println!("New following! {}", user); + self.add_following(user); + } + + let lost_uids = &self.following - &uid_set; + for user in &lost_uids { + println!("Bye, friend! {}", user); + self.remove_following(user); + } + } pub fn set_followers(&mut self, user_ids: Vec<String>) { let uid_set = user_ids.into_iter().collect::<HashSet<String>>(); @@ -453,36 +576,80 @@ mod tw { self.remove_follower(user); } } + pub fn add_following(&mut self, user_id: &String) { + self.needs_save = true; + self.following.insert(user_id.to_owned()); + self.following_history.insert(user_id.to_owned(), ("following".to_string(), Utc::now().timestamp())); + } + pub fn remove_following(&mut self, user_id: &String) { + self.needs_save = true; + self.following.remove(user_id); + self.following_history.insert(user_id.to_owned(), ("unfollowing".to_string(), Utc::now().timestamp())); + } pub fn add_follower(&mut self, user_id: &String) { + self.needs_save = true; self.followers.insert(user_id.to_owned()); self.lost_followers.remove(user_id); self.follower_history.insert(user_id.to_owned(), ("follow".to_string(), Utc::now().timestamp())); } pub fn remove_follower(&mut self, user_id: &String) { + self.needs_save = true; self.followers.remove(user_id); self.lost_followers.insert(user_id.to_owned()); self.follower_history.insert(user_id.to_owned(), ("unfollow".to_string(), Utc::now().timestamp())); } - fn look_up_user(&mut self, id: &str) -> Option<serde_json::Value> { - if let Some((ref client, ref mut core)) = self.queryer { - ::do_web_req(&format!("{}?id={}", ::user_lookup_url, id), client, core) - } else { - None - } + fn look_up_user(&mut self, id: &str, queryer: &mut ::Queryer) -> Option<serde_json::Value> { + let url = &format!("{}?user_id={}", ::user_lookup_url, id); + queryer.do_api_req(url) } - fn look_up_tweet(&mut self, id: &str) -> Option<serde_json::Value> { - if let Some((ref client, ref mut core)) = self.queryer { - ::do_web_req(&format!("{}?id={}", ::tweet_lookup_url, id), client, core) - } else { - None - } + fn look_up_tweet(&mut self, id: &str, queryer: &mut ::Queryer) -> Option<serde_json::Value> { + let url = &format!("{}?id={}", ::tweet_lookup_url, id); + queryer.do_api_req(url) } } +} +pub struct Queryer { + client: hyper::client::Client<HttpsConnector<hyper::client::HttpConnector>>, + core: Core } +impl Queryer { + fn do_api_req(&mut self, url: &str) -> Option<serde_json::Value> { + self.issue_request(signed_api_get(url)) + } + fn do_web_req(&mut self, url: &str) -> Option<serde_json::Value> { + self.issue_request(signed_web_get(url)) + } + fn issue_request(&mut self, req: hyper::client::Request) -> Option<serde_json::Value> { + let lookup = self.client.request(req); + + let resp: hyper::Response = self.core.run(lookup).unwrap(); + let status = resp.status().clone(); + + let chunks: Vec<hyper::Chunk> = self.core.run(resp.body().collect()).unwrap(); + + let resp_body: Vec<u8> = chunks.into_iter().flat_map(|chunk| chunk.into_iter()).collect(); + + match serde_json::from_slice(&resp_body) { + Ok(value) => { + if status != hyper::StatusCode::Ok { + println!("!! Requests returned status: {}", status); + println!("{}", value); + None + } else { + Some(value) + } + } + Err(e) => { + println!("error deserializing json: {}", e); + None + } + } + } +} fn render_twete(twete: &tw::Tweet, tweeter: &tw::TwitterCache) { @@ -518,17 +685,24 @@ fn render_twete(twete: &tw::Tweet, tweeter: &tw::TwitterCache) { fn render_twitter_event( structure: serde_json::Map<String, serde_json::Value>, - tweeter: &mut tw::TwitterCache) { + tweeter: &mut tw::TwitterCache, + mut queryer: &mut Queryer) { if structure.contains_key("event") { - tweeter.cache_api_event(structure.clone()); + tweeter.cache_api_event(structure.clone(), &mut queryer); if let Some(event) = tw::events::Event::from_json(structure) { event.render(&tweeter); }; + } else if structure.contains_key("friends") { + let user_id_nums = structure["friends"].as_array().unwrap(); + let user_id_strs = user_id_nums.into_iter().map(|x| x.as_u64().unwrap().to_string()); + tweeter.set_following(user_id_strs.collect()); } else if structure.contains_key("delete") { println!("delete..."); let deleted_user_id = structure["delete"]["status"]["user_id_str"].as_str().unwrap().to_string(); if let Some(handle) = tweeter.retrieve_user(&deleted_user_id).map(|x| &x.handle) { println!("who? {} - {}", deleted_user_id, handle); + } else { + println!("dunno who..."); } } else if structure.contains_key("user") && structure.contains_key("id") { let twete_id = structure["id_str"].as_str().unwrap().to_string(); @@ -599,6 +773,7 @@ fn signed_api_get(url: &str) -> hyper::client::Request { req.set_body(param_string); { + println!("{}", header.to_owned()); let mut headers = req.headers_mut(); headers.set(Authorization(header.to_owned())); headers.set(Accept("*/*".to_owned())); @@ -608,41 +783,13 @@ fn signed_api_get(url: &str) -> hyper::client::Request { req } -fn do_web_req(url: &str, client: &hyper::client::Client<HttpsConnector<hyper::client::HttpConnector>>, core: &mut Core) -> Option<serde_json::Value> { - let lookup = client.request(signed_web_get(url)); - - let resp: hyper::Response = core.run(lookup).unwrap(); - - let chunks: Vec<hyper::Chunk> = core.run(resp.body().collect()).unwrap(); - - let resp_body: Vec<u8> = chunks.into_iter().flat_map(|chunk| chunk.into_iter()).collect(); - - match serde_json::from_slice(&resp_body) { - Ok(value) => Some(value), - Err(e) => { - println!("error deserializing json: {}", e); - None - } - } -} - fn display_event( twete: serde_json::Value, - tweeter: &mut tw::TwitterCache + tweeter: &mut tw::TwitterCache, + queryer: &mut Queryer ) { - /* match twete { - serde_json::Value::Object(objmap) => { - if objmap.contains_key("id_str") { - let tweet_id = objmap["id_str"].as_str().unwrap(); - twetemap.insert(tweet_id, objmap); - } - render_twitter_event(&twetemap, objmap, client, c2); - }, - f => println!("Unexpected object: {}", f) - };*/ - match twete { - serde_json::Value::Object(objmap) => render_twitter_event(objmap, tweeter), + serde_json::Value::Object(objmap) => render_twitter_event(objmap, tweeter, queryer), _ => () }; } @@ -653,116 +800,167 @@ fn main() { // let url = "https://stream.twitter.com/1.1/statuses/filter.json"; // let url = "https://stream.twitter.com/1.1/statuses/sample.json"; - let mut core = Core::new().unwrap(); - - let connector = HttpsConnector::new(1, &core.handle()).unwrap(); - - let client = Client::configure() - .keep_alive(true) - .connector(connector) - .build(&core.handle()); - -// println!("{}", do_web_req("https://caps.twitter.com/v2/capi/passthrough/1?twitter:string:card_uri=card://887655800482787328&twitter:long:original_tweet_id=887655800981925888&twitter:string:response_card_name=poll3choice_text_only&twitter:string:cards_platform=Web-12", &client, &mut core).unwrap()); -// println!("{}", look_up_tweet("887655800981925888", &client, &mut core).unwrap()); + println!("starting!"); - println!("Loading cache..."); + let (ui_tx, ui_rx) = chan::sync::<Vec<u8>>(0); - let req = signed_api_get(streamurl); + let twete_rx = connect_twitter_stream(); - println!("starting!"); -// println!("lookup'd"); - -// println!("requesting..."); - /* - let work = client.request(req).and_then(|res| { - res.body().for_each(move |body: hyper::Chunk| { - println!("hmmm"); - println!("{}", std::str::from_utf8(&body).unwrap()); - Ok(()) - }) + std::thread::spawn(move || { + use std::io::Read; + loop { + let mut line = String::new(); + std::io::stdin().read_line(&mut line); + ui_tx.send(line.into_bytes()); + } }); - */ - let (twete_tx, twete_rx) = chan::sync::<Vec<u8>>(0); - let (ui_tx, ui_rx) = chan::sync::<Vec<u8>>(0); + // I *would* want to load this before spawning the thread, but.. + // tokio_core::reactor::Inner can't be moved between threads safely + // and beacuse it's an Option-al field, it might be present + // and rustc says nooooo + // + // even though it's not ever present before here + println!("Loading cache..."); - let remote = core.remote(); + let mut tweeter = tw::TwitterCache::load_cache(); - std::thread::spawn(move || { - // I *would* want to load this before spawning the thread, but.. - // tokio_core::reactor::Inner can't be moved between threads safely - // and beacuse it's an Option-al field, it might be present - // and rustc says nooooo - // - // even though it's not ever present before here - let mut tweeter = tw::TwitterCache::load_cache(); + println!("Loaded cache!"); - let c2 = Core::new().unwrap(); // i swear this is not where the botnet lives - let handle = &c2.handle(); - let secondaryConnector = HttpsConnector::new(4, handle).unwrap(); + let c2 = Core::new().unwrap(); // i swear this is not where the botnet lives + let handle = &c2.handle(); + let secondaryConnector = HttpsConnector::new(4, handle).unwrap(); - let secondaryClient = Client::configure() - .connector(secondaryConnector) - .build(handle); + let secondaryClient = Client::configure() + .connector(secondaryConnector) + .build(handle); - tweeter.with_client(secondaryClient, c2); + let mut queryer = Queryer { + client: secondaryClient, + core: c2 + }; - loop { - chan_select! { - twete_rx.recv() -> twete => { - match twete { - Some(line) => { - let jsonstr = std::str::from_utf8(&line).unwrap().trim(); - println!("{}", jsonstr); - /* TODO: replace from_str with from_slice */ - let json: serde_json::Value = serde_json::from_str(&jsonstr).unwrap(); - display_event(json, &mut tweeter); + loop { + let ui_rx_b = &ui_rx; + chan_select! { + twete_rx.recv() -> twete => match twete { + Some(line) => { + let jsonstr = std::str::from_utf8(&line).unwrap().trim(); +// println!("{}", jsonstr); + /* TODO: replace from_str with from_slice */ + let json: serde_json::Value = serde_json::from_str(&jsonstr).unwrap(); + display_event(json, &mut tweeter, &mut queryer); + if tweeter.needs_save && tweeter.caching_permitted { + tweeter.store_cache(); + } + } + None => { + println!("Twitter stream hung up..."); + chan_select! { + ui_rx_b.recv() -> input => match input { + Some(line) => handle_user_input(line, &mut tweeter, &mut queryer), + None => std::process::exit(0) } - None => { println!("???"); } } + } + }, + ui_rx.recv() -> user_input => match user_input { + Some(line) => { + handle_user_input(line, &mut tweeter, &mut queryer); }, - ui_rx.recv() -> user_input => { - println!("ui_rx recv"); - match user_input { - Some(line) => println!("You typed {}", std::str::from_utf8(&line).unwrap()), - None => println!("??? 2") - } + None => println!("UI thread hung up...") + } + } + } + + println!("Bye bye"); +} + +fn handle_user_input(line: Vec<u8>, tweeter: &mut tw::TwitterCache, mut queryer: &mut Queryer) { + if line == String::from("show_cache\n").into_bytes() { + println!("----* USERS *----"); + for (uid, user) in &tweeter.users { + println!("User: {} -> {:?}", uid, user); + } + println!("----* TWEETS *----"); + for (tid, tweet) in &tweeter.tweets { + println!("Tweet: {} -> {:?}", tid, tweet); + } + println!("----* FOLLOWERS *----"); + for uid in &tweeter.followers.clone() { + let user_res = tweeter.fetch_user(uid, &mut queryer); + match user_res { + Some(user) => { + println!("Follower: {} - {:?}", uid, user); } + None => { println!(" ..."); } } } - }); + } else if line == String::from("q\n").into_bytes() { + println!("Bye bye!"); + tweeter.store_cache(); + std::process::exit(0); + } else if line.starts_with("look_up_".as_bytes()) { + let linestr = std::str::from_utf8(&line).unwrap().trim(); + if linestr.starts_with("look_up_tweet") { + let tweetid = &linestr.split(" ").collect::<Vec<&str>>()[1].to_string(); + if let Some(tweet) = tweeter.fetch_tweet(tweetid, &mut queryer) { + println!("{:?}", tweet); + } else { +// println!("Couldn't retrieve {}", tweetid); + } + } else if linestr.starts_with("look_up_user") { + let userid = &linestr.split(" ").collect::<Vec<&str>>()[1].to_string(); + if let Some(user) = tweeter.fetch_user(userid, &mut queryer) { + println!("{:?}", user); + } else { +// println!("Couldn't retrieve {}", userid); + } + } + } +} + +fn connect_twitter_stream() -> chan::Receiver<Vec<u8>> { + let (twete_tx, twete_rx) = chan::sync::<Vec<u8>>(0); std::thread::spawn(move || { - use std::io::Read; - loop { - let mut line = String::new(); - std::io::stdin().read_line(&mut line); - ui_tx.send(line.into_bytes()); - } - }); + let mut core = Core::new().unwrap(); - println!("Before?"); - let work = client.request(req).and_then(|res| { - LineStream::new(res.body() - .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| Ok(b)))) - .flatten()) - .for_each(|s| { - if s.len() != 1 { - //println!("Send!: {}", std::str::from_utf8(&s).unwrap()); - twete_tx.send(s); - }; - Ok(()) - }) - }); + let connector = HttpsConnector::new(1, &core.handle()).unwrap(); + + let client = Client::configure() + .keep_alive(true) + .connector(connector) + .build(&core.handle()); - let resp = core.run(work).unwrap(); - println!("After?"); + // println!("{}", do_web_req("https://caps.twitter.com/v2/capi/passthrough/1?twitter:string:card_uri=card://887655800482787328&twitter:long:original_tweet_id=887655800981925888&twitter:string:response_card_name=poll3choice_text_only&twitter:string:cards_platform=Web-12", &client, &mut core).unwrap()); + // println!("{}", look_up_tweet("887655800981925888", &client, &mut core).unwrap()); - /* - for obj in BufReader::new(res).json_objects() { - println!("{:?}", obj.as_object().unwrap().get("text").unwrap().as_string().unwrap()); - }*/ + let req = signed_api_get(streamurl); + let work = client.request(req).and_then(|res| { + let status = res.status(); + if status != hyper::StatusCode::Ok { + println!("Twitter stream connect was abnormal: {}", status); + } + LineStream::new(res.body() + .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| Ok(b)))) + .flatten()) + .for_each(|s| { + if s.len() != 1 { + twete_tx.send(s); + }; + Ok(()) + }) + }); + + let resp = core.run(work); + match resp { + Ok(good) => (), + Err(e) => println!("Error in setting up: {}", e) + } + }); + twete_rx } //extern crate futures; |