aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml2
-rw-r--r--main.rs512
2 files changed, 357 insertions, 157 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 5d67061..6a0653c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,3 +27,5 @@ hyper-tls = "*"
"serde_json" = "*"
"chan" = "*"
chrono = "0.4"
+serde = "*"
+serde_derive = "*"
diff --git a/main.rs b/main.rs
index d4f108b..f47f2f3 100644
--- a/main.rs
+++ b/main.rs
@@ -7,6 +7,7 @@ use std::str;
extern crate url;
#[macro_use] extern crate hyper;
+#[macro_use] extern crate serde_derive;
extern crate oauthcli;
extern crate tokio_core;
extern crate futures;
@@ -22,15 +23,15 @@ use hyper_tls::HttpsConnector;
//use json_streamer::JsonObjectStreamer;
//Change these values to your real Twitter API credentials
-static consumer_key: &str = "0af9c1AoEi5X7IjtOKAtP60Za";
-static consumer_secret: &str = "1fxEzRhQtQSWKus4oqDwdg5DALIjGpINg0PGjkYVwKT8EEMFCh";
-static token: &str = "629126745-VePBD9ciKwpuVuIeEcNnxwxQFNWDXEy8KL3dGRRg";
-static token_secret: &str = "uAAruZzJu03NvMlH6cTeGku7NqVPro1ddKN4BxORy5hWG";
+static consumer_key: &str = "T879tHWDzd6LvKWdYVfbJL4Su";
+static consumer_secret: &str = "OAXXYYIozAZ4vWSmDziI1EMJCKXPmWPFgLbJpB896iIAMIAdpb";
+static token: &str = "629126745-Qt6LPq2kR7w58s7WHzSqcs4CIdiue64kkfYYB7RI";
+static token_secret: &str = "3BI3YC4WVbKW5icpHORWpsTYqYIj5oAZFkrgyIAoaoKnK";
static lol_auth_token: &str = "641cdf3a4bbddb72c118b5821e8696aee6300a9a";
static streamurl: &str = "https://userstream.twitter.com/1.1/user.json?tweet_mode=extended";
static tweet_lookup_url: &str = "https://api.twitter.com/1.1/statuses/show.json?tweet_mode=extended";
-static user_lookup_url: &str = "https://api.twitter.com/1.1/users/lookup.json";
+static user_lookup_url: &str = "https://api.twitter.com/1.1/users/show.json";
header! { (Authorization, "Authorization") => [String] }
header! { (Accept, "Accept") => [String] }
@@ -38,6 +39,9 @@ header! { (ContentType, "Content-Type") => [String] }
header! { (Cookie, "cookie") => [String] }
mod tw {
+ use std::path::Path;
+ use std::fs::File;
+ use std::io::{BufRead, BufReader, Read};
extern crate chrono;
use self::chrono::prelude::*;
@@ -48,7 +52,13 @@ mod tw {
use tokio_core::reactor::Core;
use hyper_tls::HttpsConnector;
extern crate serde_json;
+ extern crate serde;
+ extern crate serde_derive;
+ use std::io::Write;
+ use std::fs::OpenOptions;
+
+ #[derive(Debug, Serialize, Deserialize)]
pub struct User {
pub id: String,
pub name: String,
@@ -237,12 +247,17 @@ mod tw {
}
}
+ #[derive(Debug, Serialize, Deserialize)]
pub struct Tweet {
pub id: String,
pub author_id: String,
pub text: String,
pub created_at: String, // lol
+ #[serde(skip_serializing_if="Option::is_none")]
+ #[serde(default = "Option::default")]
pub quoted_tweet_id: Option<String>,
+ #[serde(skip_serializing_if="Option::is_none")]
+ #[serde(default = "Option::default")]
pub rt_tweet: Option<String>
}
@@ -324,18 +339,29 @@ mod tw {
twete_text
}
+ #[derive(Serialize, Deserialize)]
pub struct TwitterCache {
- users: HashMap<String, User>,
- tweets: HashMap<String, Tweet>,
+ #[serde(skip)]
+ pub users: HashMap<String, User>,
+ #[serde(skip)]
+ pub tweets: HashMap<String, Tweet>,
following: HashSet<String>,
following_history: HashMap<String, (String, i64)>, // userid:date??
- followers: HashSet<String>,
+ pub followers: HashSet<String>,
lost_followers: HashSet<String>,
follower_history: HashMap<String, (String, i64)>, // userid:date??
- queryer: Option<(hyper::client::Client<HttpsConnector<hyper::client::HttpConnector>>, Core)>
+ #[serde(skip)]
+ pub needs_save: bool,
+ #[serde(skip)]
+ pub caching_permitted: bool
}
impl TwitterCache {
+ const PROFILE_DIR: &'static str = "cache/";
+ const TWEET_CACHE: &'static str = "cache/tweets.json";
+ const USERS_CACHE: &'static str = "cache/users.json";
+ const PROFILE_CACHE: &'static str = "cache/profile.json"; // this should involve MY user id..
+
fn new() -> TwitterCache {
TwitterCache {
users: HashMap::new(),
@@ -345,28 +371,92 @@ mod tw {
followers: HashSet::new(),
lost_followers: HashSet::new(),
follower_history: HashMap::new(),
- queryer: None
+ needs_save: false,
+ caching_permitted: true
}
}
- pub fn with_client(
- &mut self,
- client: hyper::client::Client<HttpsConnector<hyper::client::HttpConnector>>,
- core: Core
- ) {
- self.queryer = Some((client, core));
+ fn new_without_caching() -> TwitterCache {
+ let mut cache = TwitterCache::new();
+ cache.caching_permitted = false;
+ cache
}
fn cache_user(&mut self, user: User) {
- self.users.insert(user.id.to_owned(), user);
+ if !self.users.contains_key(&user.id) {
+ let mut file =
+ OpenOptions::new()
+ .create(true)
+ .append(true)
+ .open(TwitterCache::USERS_CACHE)
+ .unwrap();
+ writeln!(file, "{}", serde_json::to_string(&user).unwrap());
+ self.users.insert(user.id.to_owned(), user);
+ }
}
fn cache_tweet(&mut self, tweet: Tweet) {
- self.tweets.insert(tweet.id.to_owned(), tweet);
+ if !self.tweets.contains_key(&tweet.id) {
+ let mut file =
+ OpenOptions::new()
+ .create(true)
+ .append(true)
+ .open(TwitterCache::TWEET_CACHE)
+ .unwrap();
+ writeln!(file, "{}", serde_json::to_string(&tweet).unwrap());
+ self.tweets.insert(tweet.id.to_owned(), tweet);
+ }
}
pub fn store_cache(&self) {
+ if Path::new(TwitterCache::PROFILE_DIR).is_dir() {
+ let mut profile = OpenOptions::new()
+ .write(true)
+ .create(true)
+ .append(false)
+ .open(TwitterCache::PROFILE_CACHE)
+ .unwrap();
+ serde_json::to_writer(profile, self).unwrap();
+ } else {
+ println!("No cache dir exists...");
+ }
// store cache
}
pub fn load_cache() -> TwitterCache {
- TwitterCache::new()
+ if Path::new(TwitterCache::PROFILE_CACHE).is_file() {
+ let mut buf = vec![];
+ let mut profile = File::open(TwitterCache::PROFILE_CACHE).unwrap();
+ match profile.read_to_end(&mut buf) {
+ Ok(sz) => {
+ match serde_json::from_slice(&buf) {
+ Ok(result) => {
+ let mut cache: TwitterCache = result;
+ cache.tweets = HashMap::new();
+ for line in BufReader::new(File::open(TwitterCache::TWEET_CACHE).unwrap()).lines() {
+ let t: Tweet = serde_json::from_str(&line.unwrap()).unwrap();
+ cache.tweets.insert(t.id.to_owned(), t);
+ }
+ for line in BufReader::new(File::open(TwitterCache::USERS_CACHE).unwrap()).lines() {
+ let u: User = serde_json::from_str(&line.unwrap()).unwrap();
+ cache.users.insert(u.id.to_owned(), u);
+ }
+ cache.caching_permitted = true;
+ cache.needs_save = false;
+ cache
+ }
+ Err(e) => {
+ // TODO! should be able to un-frick profile after startup.
+ println!("Error reading profile, profile caching disabled... {}", e);
+ TwitterCache::new_without_caching()
+ }
+ }
+ }
+ Err(e) => {
+ println!("Error reading cached profile: {}. Profile caching disabled.", e);
+ TwitterCache::new_without_caching()
+ }
+ }
+ } else {
+ println!("Hello! First time setup?");
+ TwitterCache::new()
+ }
}
pub fn cache_api_tweet(&mut self, json: serde_json::Value) {
if let Some((rt, rt_user)) = json.get("retweeted_status").and_then(|x| Tweet::from_api_json(x.to_owned())) {
@@ -389,7 +479,7 @@ mod tw {
self.cache_user(user);
}
}
- pub fn cache_api_event(&mut self, json: serde_json::Map<String, serde_json::Value>) {
+ pub fn cache_api_event(&mut self, json: serde_json::Map<String, serde_json::Value>, mut queryer: &mut ::Queryer) {
/* don't really care to hold on to who fav, unfav, ... when, just pick targets out. */
match json.get("event").and_then(|x| x.as_str()) {
Some("favorite") => {
@@ -406,7 +496,25 @@ mod tw {
Some("favorited_retweet") => ()/* cache rt */,
Some("delete") => {
let user_id = json["delete"]["status"]["user_id_str"].as_str().unwrap().to_string();
- self.fetch_user(&user_id);
+ self.fetch_user(&user_id, &mut queryer);
+ },
+ Some("follow") => {
+ let follower = json["source"]["id_str"].as_str().unwrap().to_string();
+ let followed = json["target"]["id_str"].as_str().unwrap().to_string();
+ if follower == "iximeow" {
+ // self.add_follow(
+ } else {
+ self.add_follower(&follower);
+ }
+ },
+ Some("unfollow") => {
+ let follower = json["source"]["id_str"].as_str().unwrap().to_string();
+ let followed = json["target"]["id_str"].as_str().unwrap().to_string();
+ if follower == "iximeow" {
+ // self.add_follow(
+ } else {
+ self.remove_follower(&follower);
+ }
},
Some(_) => () /* an uninteresting event */,
None => () // not really an event? should we log something?
@@ -419,18 +527,18 @@ mod tw {
pub fn retrieve_user(&self, user_id: &String) -> Option<&User> {
self.users.get(user_id)
}
- pub fn fetch_tweet(&mut self, tweet_id: &String) -> Option<&Tweet> {
+ pub fn fetch_tweet(&mut self, tweet_id: &String, mut queryer: &mut ::Queryer) -> Option<&Tweet> {
if !self.tweets.contains_key(tweet_id) {
- match self.look_up_tweet(tweet_id) {
+ match self.look_up_tweet(tweet_id, &mut queryer) {
Some(json) => self.cache_api_tweet(json),
None => println!("Unable to retrieve tweet {}", tweet_id)
};
}
self.tweets.get(tweet_id)
}
- pub fn fetch_user(&mut self, user_id: &String) -> Option<&User> {
+ pub fn fetch_user(&mut self, user_id: &String, mut queryer: &mut ::Queryer) -> Option<&User> {
if !self.users.contains_key(user_id) {
- let maybe_parsed = self.look_up_user(user_id).and_then(|x| User::from_json(x));
+ let maybe_parsed = self.look_up_user(user_id, &mut queryer).and_then(|x| User::from_json(x));
match maybe_parsed {
Some(tw) => self.cache_user(tw),
None => println!("Unable to retrieve user {}", user_id)
@@ -438,6 +546,21 @@ mod tw {
}
self.users.get(user_id)
}
+ pub fn set_following(&mut self, user_ids: Vec<String>) {
+ let uid_set = user_ids.into_iter().collect::<HashSet<String>>();
+
+ let new_uids = &uid_set - &self.following;
+ for user in &new_uids {
+ println!("New following! {}", user);
+ self.add_following(user);
+ }
+
+ let lost_uids = &self.following - &uid_set;
+ for user in &lost_uids {
+ println!("Bye, friend! {}", user);
+ self.remove_following(user);
+ }
+ }
pub fn set_followers(&mut self, user_ids: Vec<String>) {
let uid_set = user_ids.into_iter().collect::<HashSet<String>>();
@@ -453,36 +576,80 @@ mod tw {
self.remove_follower(user);
}
}
+ pub fn add_following(&mut self, user_id: &String) {
+ self.needs_save = true;
+ self.following.insert(user_id.to_owned());
+ self.following_history.insert(user_id.to_owned(), ("following".to_string(), Utc::now().timestamp()));
+ }
+ pub fn remove_following(&mut self, user_id: &String) {
+ self.needs_save = true;
+ self.following.remove(user_id);
+ self.following_history.insert(user_id.to_owned(), ("unfollowing".to_string(), Utc::now().timestamp()));
+ }
pub fn add_follower(&mut self, user_id: &String) {
+ self.needs_save = true;
self.followers.insert(user_id.to_owned());
self.lost_followers.remove(user_id);
self.follower_history.insert(user_id.to_owned(), ("follow".to_string(), Utc::now().timestamp()));
}
pub fn remove_follower(&mut self, user_id: &String) {
+ self.needs_save = true;
self.followers.remove(user_id);
self.lost_followers.insert(user_id.to_owned());
self.follower_history.insert(user_id.to_owned(), ("unfollow".to_string(), Utc::now().timestamp()));
}
- fn look_up_user(&mut self, id: &str) -> Option<serde_json::Value> {
- if let Some((ref client, ref mut core)) = self.queryer {
- ::do_web_req(&format!("{}?id={}", ::user_lookup_url, id), client, core)
- } else {
- None
- }
+ fn look_up_user(&mut self, id: &str, queryer: &mut ::Queryer) -> Option<serde_json::Value> {
+ let url = &format!("{}?user_id={}", ::user_lookup_url, id);
+ queryer.do_api_req(url)
}
- fn look_up_tweet(&mut self, id: &str) -> Option<serde_json::Value> {
- if let Some((ref client, ref mut core)) = self.queryer {
- ::do_web_req(&format!("{}?id={}", ::tweet_lookup_url, id), client, core)
- } else {
- None
- }
+ fn look_up_tweet(&mut self, id: &str, queryer: &mut ::Queryer) -> Option<serde_json::Value> {
+ let url = &format!("{}?id={}", ::tweet_lookup_url, id);
+ queryer.do_api_req(url)
}
}
+}
+pub struct Queryer {
+ client: hyper::client::Client<HttpsConnector<hyper::client::HttpConnector>>,
+ core: Core
}
+impl Queryer {
+ fn do_api_req(&mut self, url: &str) -> Option<serde_json::Value> {
+ self.issue_request(signed_api_get(url))
+ }
+ fn do_web_req(&mut self, url: &str) -> Option<serde_json::Value> {
+ self.issue_request(signed_web_get(url))
+ }
+ fn issue_request(&mut self, req: hyper::client::Request) -> Option<serde_json::Value> {
+ let lookup = self.client.request(req);
+
+ let resp: hyper::Response = self.core.run(lookup).unwrap();
+ let status = resp.status().clone();
+
+ let chunks: Vec<hyper::Chunk> = self.core.run(resp.body().collect()).unwrap();
+
+ let resp_body: Vec<u8> = chunks.into_iter().flat_map(|chunk| chunk.into_iter()).collect();
+
+ match serde_json::from_slice(&resp_body) {
+ Ok(value) => {
+ if status != hyper::StatusCode::Ok {
+ println!("!! Requests returned status: {}", status);
+ println!("{}", value);
+ None
+ } else {
+ Some(value)
+ }
+ }
+ Err(e) => {
+ println!("error deserializing json: {}", e);
+ None
+ }
+ }
+ }
+}
fn render_twete(twete: &tw::Tweet, tweeter: &tw::TwitterCache) {
@@ -518,17 +685,24 @@ fn render_twete(twete: &tw::Tweet, tweeter: &tw::TwitterCache) {
fn render_twitter_event(
structure: serde_json::Map<String, serde_json::Value>,
- tweeter: &mut tw::TwitterCache) {
+ tweeter: &mut tw::TwitterCache,
+ mut queryer: &mut Queryer) {
if structure.contains_key("event") {
- tweeter.cache_api_event(structure.clone());
+ tweeter.cache_api_event(structure.clone(), &mut queryer);
if let Some(event) = tw::events::Event::from_json(structure) {
event.render(&tweeter);
};
+ } else if structure.contains_key("friends") {
+ let user_id_nums = structure["friends"].as_array().unwrap();
+ let user_id_strs = user_id_nums.into_iter().map(|x| x.as_u64().unwrap().to_string());
+ tweeter.set_following(user_id_strs.collect());
} else if structure.contains_key("delete") {
println!("delete...");
let deleted_user_id = structure["delete"]["status"]["user_id_str"].as_str().unwrap().to_string();
if let Some(handle) = tweeter.retrieve_user(&deleted_user_id).map(|x| &x.handle) {
println!("who? {} - {}", deleted_user_id, handle);
+ } else {
+ println!("dunno who...");
}
} else if structure.contains_key("user") && structure.contains_key("id") {
let twete_id = structure["id_str"].as_str().unwrap().to_string();
@@ -599,6 +773,7 @@ fn signed_api_get(url: &str) -> hyper::client::Request {
req.set_body(param_string);
{
+ println!("{}", header.to_owned());
let mut headers = req.headers_mut();
headers.set(Authorization(header.to_owned()));
headers.set(Accept("*/*".to_owned()));
@@ -608,41 +783,13 @@ fn signed_api_get(url: &str) -> hyper::client::Request {
req
}
-fn do_web_req(url: &str, client: &hyper::client::Client<HttpsConnector<hyper::client::HttpConnector>>, core: &mut Core) -> Option<serde_json::Value> {
- let lookup = client.request(signed_web_get(url));
-
- let resp: hyper::Response = core.run(lookup).unwrap();
-
- let chunks: Vec<hyper::Chunk> = core.run(resp.body().collect()).unwrap();
-
- let resp_body: Vec<u8> = chunks.into_iter().flat_map(|chunk| chunk.into_iter()).collect();
-
- match serde_json::from_slice(&resp_body) {
- Ok(value) => Some(value),
- Err(e) => {
- println!("error deserializing json: {}", e);
- None
- }
- }
-}
-
fn display_event(
twete: serde_json::Value,
- tweeter: &mut tw::TwitterCache
+ tweeter: &mut tw::TwitterCache,
+ queryer: &mut Queryer
) {
- /*
match twete {
- serde_json::Value::Object(objmap) => {
- if objmap.contains_key("id_str") {
- let tweet_id = objmap["id_str"].as_str().unwrap();
- twetemap.insert(tweet_id, objmap);
- }
- render_twitter_event(&twetemap, objmap, client, c2);
- },
- f => println!("Unexpected object: {}", f)
- };*/
- match twete {
- serde_json::Value::Object(objmap) => render_twitter_event(objmap, tweeter),
+ serde_json::Value::Object(objmap) => render_twitter_event(objmap, tweeter, queryer),
_ => ()
};
}
@@ -653,116 +800,167 @@ fn main() {
// let url = "https://stream.twitter.com/1.1/statuses/filter.json";
// let url = "https://stream.twitter.com/1.1/statuses/sample.json";
- let mut core = Core::new().unwrap();
-
- let connector = HttpsConnector::new(1, &core.handle()).unwrap();
-
- let client = Client::configure()
- .keep_alive(true)
- .connector(connector)
- .build(&core.handle());
-
-// println!("{}", do_web_req("https://caps.twitter.com/v2/capi/passthrough/1?twitter:string:card_uri=card://887655800482787328&twitter:long:original_tweet_id=887655800981925888&twitter:string:response_card_name=poll3choice_text_only&twitter:string:cards_platform=Web-12", &client, &mut core).unwrap());
-// println!("{}", look_up_tweet("887655800981925888", &client, &mut core).unwrap());
+ println!("starting!");
- println!("Loading cache...");
+ let (ui_tx, ui_rx) = chan::sync::<Vec<u8>>(0);
- let req = signed_api_get(streamurl);
+ let twete_rx = connect_twitter_stream();
- println!("starting!");
-// println!("lookup'd");
-
-// println!("requesting...");
- /*
- let work = client.request(req).and_then(|res| {
- res.body().for_each(move |body: hyper::Chunk| {
- println!("hmmm");
- println!("{}", std::str::from_utf8(&body).unwrap());
- Ok(())
- })
+ std::thread::spawn(move || {
+ use std::io::Read;
+ loop {
+ let mut line = String::new();
+ std::io::stdin().read_line(&mut line);
+ ui_tx.send(line.into_bytes());
+ }
});
- */
- let (twete_tx, twete_rx) = chan::sync::<Vec<u8>>(0);
- let (ui_tx, ui_rx) = chan::sync::<Vec<u8>>(0);
+ // I *would* want to load this before spawning the thread, but..
+ // tokio_core::reactor::Inner can't be moved between threads safely
+ // and beacuse it's an Option-al field, it might be present
+ // and rustc says nooooo
+ //
+ // even though it's not ever present before here
+ println!("Loading cache...");
- let remote = core.remote();
+ let mut tweeter = tw::TwitterCache::load_cache();
- std::thread::spawn(move || {
- // I *would* want to load this before spawning the thread, but..
- // tokio_core::reactor::Inner can't be moved between threads safely
- // and beacuse it's an Option-al field, it might be present
- // and rustc says nooooo
- //
- // even though it's not ever present before here
- let mut tweeter = tw::TwitterCache::load_cache();
+ println!("Loaded cache!");
- let c2 = Core::new().unwrap(); // i swear this is not where the botnet lives
- let handle = &c2.handle();
- let secondaryConnector = HttpsConnector::new(4, handle).unwrap();
+ let c2 = Core::new().unwrap(); // i swear this is not where the botnet lives
+ let handle = &c2.handle();
+ let secondaryConnector = HttpsConnector::new(4, handle).unwrap();
- let secondaryClient = Client::configure()
- .connector(secondaryConnector)
- .build(handle);
+ let secondaryClient = Client::configure()
+ .connector(secondaryConnector)
+ .build(handle);
- tweeter.with_client(secondaryClient, c2);
+ let mut queryer = Queryer {
+ client: secondaryClient,
+ core: c2
+ };
- loop {
- chan_select! {
- twete_rx.recv() -> twete => {
- match twete {
- Some(line) => {
- let jsonstr = std::str::from_utf8(&line).unwrap().trim();
- println!("{}", jsonstr);
- /* TODO: replace from_str with from_slice */
- let json: serde_json::Value = serde_json::from_str(&jsonstr).unwrap();
- display_event(json, &mut tweeter);
+ loop {
+ let ui_rx_b = &ui_rx;
+ chan_select! {
+ twete_rx.recv() -> twete => match twete {
+ Some(line) => {
+ let jsonstr = std::str::from_utf8(&line).unwrap().trim();
+// println!("{}", jsonstr);
+ /* TODO: replace from_str with from_slice */
+ let json: serde_json::Value = serde_json::from_str(&jsonstr).unwrap();
+ display_event(json, &mut tweeter, &mut queryer);
+ if tweeter.needs_save && tweeter.caching_permitted {
+ tweeter.store_cache();
+ }
+ }
+ None => {
+ println!("Twitter stream hung up...");
+ chan_select! {
+ ui_rx_b.recv() -> input => match input {
+ Some(line) => handle_user_input(line, &mut tweeter, &mut queryer),
+ None => std::process::exit(0)
}
- None => { println!("???"); }
}
+ }
+ },
+ ui_rx.recv() -> user_input => match user_input {
+ Some(line) => {
+ handle_user_input(line, &mut tweeter, &mut queryer);
},
- ui_rx.recv() -> user_input => {
- println!("ui_rx recv");
- match user_input {
- Some(line) => println!("You typed {}", std::str::from_utf8(&line).unwrap()),
- None => println!("??? 2")
- }
+ None => println!("UI thread hung up...")
+ }
+ }
+ }
+
+ println!("Bye bye");
+}
+
+fn handle_user_input(line: Vec<u8>, tweeter: &mut tw::TwitterCache, mut queryer: &mut Queryer) {
+ if line == String::from("show_cache\n").into_bytes() {
+ println!("----* USERS *----");
+ for (uid, user) in &tweeter.users {
+ println!("User: {} -> {:?}", uid, user);
+ }
+ println!("----* TWEETS *----");
+ for (tid, tweet) in &tweeter.tweets {
+ println!("Tweet: {} -> {:?}", tid, tweet);
+ }
+ println!("----* FOLLOWERS *----");
+ for uid in &tweeter.followers.clone() {
+ let user_res = tweeter.fetch_user(uid, &mut queryer);
+ match user_res {
+ Some(user) => {
+ println!("Follower: {} - {:?}", uid, user);
}
+ None => { println!(" ..."); }
}
}
- });
+ } else if line == String::from("q\n").into_bytes() {
+ println!("Bye bye!");
+ tweeter.store_cache();
+ std::process::exit(0);
+ } else if line.starts_with("look_up_".as_bytes()) {
+ let linestr = std::str::from_utf8(&line).unwrap().trim();
+ if linestr.starts_with("look_up_tweet") {
+ let tweetid = &linestr.split(" ").collect::<Vec<&str>>()[1].to_string();
+ if let Some(tweet) = tweeter.fetch_tweet(tweetid, &mut queryer) {
+ println!("{:?}", tweet);
+ } else {
+// println!("Couldn't retrieve {}", tweetid);
+ }
+ } else if linestr.starts_with("look_up_user") {
+ let userid = &linestr.split(" ").collect::<Vec<&str>>()[1].to_string();
+ if let Some(user) = tweeter.fetch_user(userid, &mut queryer) {
+ println!("{:?}", user);
+ } else {
+// println!("Couldn't retrieve {}", userid);
+ }
+ }
+ }
+}
+
+fn connect_twitter_stream() -> chan::Receiver<Vec<u8>> {
+ let (twete_tx, twete_rx) = chan::sync::<Vec<u8>>(0);
std::thread::spawn(move || {
- use std::io::Read;
- loop {
- let mut line = String::new();
- std::io::stdin().read_line(&mut line);
- ui_tx.send(line.into_bytes());
- }
- });
+ let mut core = Core::new().unwrap();
- println!("Before?");
- let work = client.request(req).and_then(|res| {
- LineStream::new(res.body()
- .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| Ok(b))))
- .flatten())
- .for_each(|s| {
- if s.len() != 1 {
- //println!("Send!: {}", std::str::from_utf8(&s).unwrap());
- twete_tx.send(s);
- };
- Ok(())
- })
- });
+ let connector = HttpsConnector::new(1, &core.handle()).unwrap();
+
+ let client = Client::configure()
+ .keep_alive(true)
+ .connector(connector)
+ .build(&core.handle());
- let resp = core.run(work).unwrap();
- println!("After?");
+ // println!("{}", do_web_req("https://caps.twitter.com/v2/capi/passthrough/1?twitter:string:card_uri=card://887655800482787328&twitter:long:original_tweet_id=887655800981925888&twitter:string:response_card_name=poll3choice_text_only&twitter:string:cards_platform=Web-12", &client, &mut core).unwrap());
+ // println!("{}", look_up_tweet("887655800981925888", &client, &mut core).unwrap());
- /*
- for obj in BufReader::new(res).json_objects() {
- println!("{:?}", obj.as_object().unwrap().get("text").unwrap().as_string().unwrap());
- }*/
+ let req = signed_api_get(streamurl);
+ let work = client.request(req).and_then(|res| {
+ let status = res.status();
+ if status != hyper::StatusCode::Ok {
+ println!("Twitter stream connect was abnormal: {}", status);
+ }
+ LineStream::new(res.body()
+ .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| Ok(b))))
+ .flatten())
+ .for_each(|s| {
+ if s.len() != 1 {
+ twete_tx.send(s);
+ };
+ Ok(())
+ })
+ });
+
+ let resp = core.run(work);
+ match resp {
+ Ok(good) => (),
+ Err(e) => println!("Error in setting up: {}", e)
+ }
+ });
+ twete_rx
}
//extern crate futures;