From 93cb77eb29c3e8b76694b5a11b6c1c3d469ae8db Mon Sep 17 00:00:00 2001 From: Andy Wortman Date: Mon, 17 Jul 2017 00:42:41 -0700 Subject: gettin better - thread for stream, thread for render render slightly intelligently, too --- main.rs | 240 ++++++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 174 insertions(+), 66 deletions(-) (limited to 'main.rs') diff --git a/main.rs b/main.rs index d13f90c..6efd9a5 100644 --- a/main.rs +++ b/main.rs @@ -1,51 +1,9 @@ +extern crate serde_json; + use rustc_serialize::json::Json; use std::str; use std::io::BufRead; -pub trait JsonObjectStreamer: Sized { - fn json_objects(&mut self) -> JsonObjects; -} - -impl JsonObjectStreamer for T { - fn json_objects(&mut self) -> JsonObjects { - JsonObjects { reader: self } - } -} - -pub struct JsonObjects<'a, B> where B: 'a { - reader: &'a mut B -} - -impl<'a, B> Iterator for JsonObjects<'a, B> where B: BufRead + 'a { - - type Item = Json; - - fn next(&mut self) -> Option { - - let mut buf: Vec = Vec::new(); - - let _ = self.reader.read_until(b'\r', &mut buf); - - if buf.last() == Some(&b'\r') { - buf.pop(); - let mut b: String = String::new(); - match self.reader.read_line(&mut b) { - Ok(_) => (), - Err(_) => return None, - } - } - - let line = match str::from_utf8(&buf) { - Ok(line) => line, - Err(_) => return None - }; - - Json::from_str(line).ok() - - } - -} - extern crate url; #[macro_use] extern crate hyper; extern crate rustc_serialize; @@ -64,24 +22,84 @@ use hyper::client::FutureResponse; use hyper_tls::HttpsConnector; //use json_streamer::JsonObjectStreamer; +//Change these values to your real Twitter API credentials +static consumer_key: &str = "0af9c1AoEi5X7IjtOKAtP60Za"; +static consumer_secret: &str = "1fxEzRhQtQSWKus4oqDwdg5DALIjGpINg0PGjkYVwKT8EEMFCh"; +static token: &str = "629126745-VePBD9ciKwpuVuIeEcNnxwxQFNWDXEy8KL3dGRRg"; +static token_secret: &str = "uAAruZzJu03NvMlH6cTeGku7NqVPro1ddKN4BxORy5hWG"; + +static streamurl: &str = "https://userstream.twitter.com/1.1/user.json"; +static tweet_lookup_url: &str = "https://api.twitter.com/1.1/statuses/show.json"; +static user_lookup_url: &str = "https://api.twitter.com/1.1/users/lookup.json"; + header! { (Authorization, "Authorization") => [String] } header! { (Accept, "Accept") => [String] } header! { (ContentType, "Content-Type") => [String] } -fn main() { +fn render_tweet(structure: serde_json::Map) { + if structure.contains_key("event") { + match &structure["event"].as_str().unwrap() { + &"follow" => println!("followed! by {} (@{})", structure["source"]["name"], structure["source"]["screen_name"]), + &"favorite" => println!("fav"), + e => println!("unrecognized event: {}", e) + } + } else if structure.contains_key("delete") { + println!("delete..."); + let deleted_user_id = structure["delete"]["status"]["user_id_str"].as_str().unwrap(); + let userjson = look_up_user(deleted_user_id); + let screen_name = match userjson { + Some(ref json) => { + json[0]["screen_name"].as_str().clone().unwrap() + }, + None => { "idk lol" } + }; + println!("who? {} - {}", structure["delete"]["status"]["user_id_str"], screen_name); + } else if structure.contains_key("user") && structure.contains_key("id") { + // probably a tweet + let mut twete: &serde_json::Map = &structure; // type isn't actually necessary here, but that lead me to the right rvalue + let source_name = twete["user"]["name"].as_str().unwrap(); + let source_screen_name = twete["user"]["screen_name"].as_str().unwrap(); + if twete.contains_key("retweeted_status") { + // render RT, actually + match &twete["retweeted_status"] { + // v--- why is it permissible to write "ref" here? does + // this take a ref of `value`? + &serde_json::Value::Object(ref value) => twete = value, + f => panic!(" o no, wrong type of thing! {}", f) + } - //Change these values to your real Twitter API credentials - let consumer_key = "0af9c1AoEi5X7IjtOKAtP60Za"; - let consumer_secret = "1fxEzRhQtQSWKus4oqDwdg5DALIjGpINg0PGjkYVwKT8EEMFCh"; - let token = "629126745-VePBD9ciKwpuVuIeEcNnxwxQFNWDXEy8KL3dGRRg"; - let token_secret = "uAAruZzJu03NvMlH6cTeGku7NqVPro1ddKN4BxORy5hWG"; + let author_name = twete["user"]["name"].as_str().unwrap(); + let author_screen_name = twete["user"]["screen_name"].as_str().unwrap(); - //Track words + println!("{} (@{}) via {} (@{}) RT:", author_name, author_screen_name, source_name, source_screen_name); + } else { + println!("{} (@{})", source_name, source_screen_name); + } + let mut twete_text = (if twete["truncated"].as_bool().unwrap() { + // get full text here! + println!(" ... :/"); + "asdf" + } else { + twete["text"].as_str().unwrap() + }) + .replace("&", "&") + .replace(">", ">") + .replace("<", "<"); + for url in twete["entries"]["urls"].as_array().unwrap() { + twete_text.replace(url["url"].as_str().unwrap(), url["expanded_url"].as_str().unwrap()); + } + println!("{}", twete_text); + if twete.contains_key("quoted_status") { + println!(" and it's a quote "); + } + } + println!(""); +} + +fn signed_get(url: &str) -> hyper::client::Request { // let params: Vec<(String, String)> = vec![("track".to_string(), "london".to_string())]; let params: Vec<(String, String)> = vec![]; -// let url = "https://stream.twitter.com/1.1/statuses/filter.json"; - let url = "https://userstream.twitter.com/1.1/user.json"; -// let url = "https://stream.twitter.com/1.1/statuses/sample.json"; + let param_string: String = params.iter().map(|p| p.0.clone() + &"=".to_string() + &p.1).collect::>().join("&"); let header = oauthcli::authorization_header( "GET", @@ -99,28 +117,93 @@ fn main() { params.clone().into_iter() ); + let mut req = Request::new(Method::Get, url.parse().unwrap()); + + req.set_body(param_string); + + { + let mut headers = req.headers_mut(); + headers.set(Authorization(header.to_owned())); + headers.set(Accept("*/*".to_owned())); + headers.set(ContentType("application/x-www-form-urlencoded".to_owned())); + }; + + req +} + +fn look_up_user(id: &str) -> Option { let mut core = Core::new().unwrap(); + let connector = HttpsConnector::new(4, &core.handle()).unwrap(); + + let client = Client::configure() + .connector(connector) + .build(&core.handle()); + + let lookup = client.request(signed_get(&format!("{}?user_id={}", user_lookup_url, id))); + let resp: hyper::Response = core.run(lookup).unwrap(); +// println!("user lookup request out.."); + let w = resp.body() + .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result { Ok(b) }))) + .flatten() + .wait(); + let resp_body = w.map(|r| { r.unwrap() }).collect::>(); + match serde_json::from_slice(&resp_body) { + Ok(value) => Some(value), + Err(e) => { + println!("error deserializing json: {}", e); + None + } + } +} +fn look_up_tweet(id: &str) -> Option { + let mut core = Core::new().unwrap(); let connector = HttpsConnector::new(4, &core.handle()).unwrap(); let client = Client::configure() - .keep_alive(true) .connector(connector) .build(&core.handle()); - let param_string: String = params.iter().map(|p| p.0.clone() + &"=".to_string() + &p.1).collect::>().join("&"); + let lookup = client.request(signed_get(&format!("{}?id={}", tweet_lookup_url, id))); + let resp: hyper::Response = core.run(lookup).unwrap(); + let w = resp.body() + .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result { Ok(b) }))) + .flatten() + .wait(); + let resp_body = w.map(|r| { r.unwrap() }).collect::>(); + match serde_json::from_slice(&resp_body) { + Ok(value) => Some(value), + Err(e) => { + println!("error deserializing json: {}", e); + None + } + } +} - let mut req = Request::new(Method::Get, url.parse().unwrap()); - req.set_body(param_string); - { - let mut headers = req.headers_mut(); - headers.set(Authorization(header.to_owned())); - headers.set(Accept("*/*".to_owned())); - headers.set(ContentType("application/x-www-form-urlencoded".to_owned())); - }; +fn main() { + + + //Track words +// let url = "https://stream.twitter.com/1.1/statuses/filter.json"; +// let url = "https://stream.twitter.com/1.1/statuses/sample.json"; + + let mut core = Core::new().unwrap(); + + let connector = HttpsConnector::new(4, &core.handle()).unwrap(); + + let client = Client::configure() + .keep_alive(true) + .connector(connector) + .build(&core.handle()); - println!("requesting..."); + let req = signed_get(streamurl); + + println!("starting!"); +// println!("{}", look_up_user("12503922").unwrap()[0]["screen_name"].as_str().unwrap()); +// println!("lookup'd"); + +// println!("requesting..."); /* let work = client.request(req).and_then(|res| { res.body().for_each(move |body: hyper::Chunk| { @@ -131,11 +214,36 @@ fn main() { }); */ + let (tx, rx) = std::sync::mpsc::channel::>(); + + std::thread::spawn(move || { + loop { + match rx.recv() { + Ok(line) => { + let jsonstr = std::str::from_utf8(&line).unwrap().trim(); + //println!("{}", jsonstr); + let json: serde_json::Value = serde_json::from_str(&jsonstr).unwrap(); + match json { + serde_json::Value::Object(objmap) => render_tweet(objmap), + f => println!("Unexpected object: {}", f) + } + } + Err(e) => { println!("{}", e); } + } + } + }); + let work = client.request(req).and_then(|res| { LineStream::new(res.body() .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result { Ok(b) }))) .flatten()) - .for_each(|s| Ok(print!("{}", str::from_utf8(&s).unwrap()))) + .for_each(|s| { + if s.len() != 1 { + println!("Send!: {}", std::str::from_utf8(&s).unwrap()); + tx.send(s); + }; + Ok(()) + }) }); println!("Before?"); -- cgit v1.1