aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Wortman <me@iximeow.net>2017-07-17 00:42:41 -0700
committerAndy Wortman <me@iximeow.net>2017-07-17 00:42:47 -0700
commit93cb77eb29c3e8b76694b5a11b6c1c3d469ae8db (patch)
tree8ac9aa14b565c60cfc6aa84c6f3f06d2e8862be8
parent322ed4cd914b92a18bfee33f33e23649f3035f03 (diff)
gettin better - thread for stream, thread for render
render slightly intelligently, too
-rw-r--r--Cargo.toml2
-rw-r--r--main.rs240
2 files changed, 175 insertions, 67 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 1784ff9..77a8704 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,4 +25,4 @@ hyper-tls = "*"
"url" = "*"
"tokio-core" = "*"
"futures" = "*"
-
+"serde_json" = "*"
diff --git a/main.rs b/main.rs
index d13f90c..6efd9a5 100644
--- a/main.rs
+++ b/main.rs
@@ -1,51 +1,9 @@
+extern crate serde_json;
+
use rustc_serialize::json::Json;
use std::str;
use std::io::BufRead;
-pub trait JsonObjectStreamer: Sized {
- fn json_objects(&mut self) -> JsonObjects<Self>;
-}
-
-impl<T: BufRead> JsonObjectStreamer for T {
- fn json_objects(&mut self) -> JsonObjects<T> {
- JsonObjects { reader: self }
- }
-}
-
-pub struct JsonObjects<'a, B> where B: 'a {
- reader: &'a mut B
-}
-
-impl<'a, B> Iterator for JsonObjects<'a, B> where B: BufRead + 'a {
-
- type Item = Json;
-
- fn next(&mut self) -> Option<Json> {
-
- let mut buf: Vec<u8> = Vec::new();
-
- let _ = self.reader.read_until(b'\r', &mut buf);
-
- if buf.last() == Some(&b'\r') {
- buf.pop();
- let mut b: String = String::new();
- match self.reader.read_line(&mut b) {
- Ok(_) => (),
- Err(_) => return None,
- }
- }
-
- let line = match str::from_utf8(&buf) {
- Ok(line) => line,
- Err(_) => return None
- };
-
- Json::from_str(line).ok()
-
- }
-
-}
-
extern crate url;
#[macro_use] extern crate hyper;
extern crate rustc_serialize;
@@ -64,24 +22,84 @@ use hyper::client::FutureResponse;
use hyper_tls::HttpsConnector;
//use json_streamer::JsonObjectStreamer;
+//Change these values to your real Twitter API credentials
+static consumer_key: &str = "0af9c1AoEi5X7IjtOKAtP60Za";
+static consumer_secret: &str = "1fxEzRhQtQSWKus4oqDwdg5DALIjGpINg0PGjkYVwKT8EEMFCh";
+static token: &str = "629126745-VePBD9ciKwpuVuIeEcNnxwxQFNWDXEy8KL3dGRRg";
+static token_secret: &str = "uAAruZzJu03NvMlH6cTeGku7NqVPro1ddKN4BxORy5hWG";
+
+static streamurl: &str = "https://userstream.twitter.com/1.1/user.json";
+static tweet_lookup_url: &str = "https://api.twitter.com/1.1/statuses/show.json";
+static user_lookup_url: &str = "https://api.twitter.com/1.1/users/lookup.json";
+
header! { (Authorization, "Authorization") => [String] }
header! { (Accept, "Accept") => [String] }
header! { (ContentType, "Content-Type") => [String] }
-fn main() {
+fn render_tweet(structure: serde_json::Map<String, serde_json::Value>) {
+ if structure.contains_key("event") {
+ match &structure["event"].as_str().unwrap() {
+ &"follow" => println!("followed! by {} (@{})", structure["source"]["name"], structure["source"]["screen_name"]),
+ &"favorite" => println!("fav"),
+ e => println!("unrecognized event: {}", e)
+ }
+ } else if structure.contains_key("delete") {
+ println!("delete...");
+ let deleted_user_id = structure["delete"]["status"]["user_id_str"].as_str().unwrap();
+ let userjson = look_up_user(deleted_user_id);
+ let screen_name = match userjson {
+ Some(ref json) => {
+ json[0]["screen_name"].as_str().clone().unwrap()
+ },
+ None => { "idk lol" }
+ };
+ println!("who? {} - {}", structure["delete"]["status"]["user_id_str"], screen_name);
+ } else if structure.contains_key("user") && structure.contains_key("id") {
+ // probably a tweet
+ let mut twete: &serde_json::Map<String, serde_json::Value> = &structure; // type isn't actually necessary here, but that lead me to the right rvalue
+ let source_name = twete["user"]["name"].as_str().unwrap();
+ let source_screen_name = twete["user"]["screen_name"].as_str().unwrap();
+ if twete.contains_key("retweeted_status") {
+ // render RT, actually
+ match &twete["retweeted_status"] {
+ // v--- why is it permissible to write "ref" here? does
+ // this take a ref of `value`?
+ &serde_json::Value::Object(ref value) => twete = value,
+ f => panic!(" o no, wrong type of thing! {}", f)
+ }
- //Change these values to your real Twitter API credentials
- let consumer_key = "0af9c1AoEi5X7IjtOKAtP60Za";
- let consumer_secret = "1fxEzRhQtQSWKus4oqDwdg5DALIjGpINg0PGjkYVwKT8EEMFCh";
- let token = "629126745-VePBD9ciKwpuVuIeEcNnxwxQFNWDXEy8KL3dGRRg";
- let token_secret = "uAAruZzJu03NvMlH6cTeGku7NqVPro1ddKN4BxORy5hWG";
+ let author_name = twete["user"]["name"].as_str().unwrap();
+ let author_screen_name = twete["user"]["screen_name"].as_str().unwrap();
- //Track words
+ println!("{} (@{}) via {} (@{}) RT:", author_name, author_screen_name, source_name, source_screen_name);
+ } else {
+ println!("{} (@{})", source_name, source_screen_name);
+ }
+ let mut twete_text = (if twete["truncated"].as_bool().unwrap() {
+ // get full text here!
+ println!(" ... :/");
+ "asdf"
+ } else {
+ twete["text"].as_str().unwrap()
+ })
+ .replace("&amp;", "&")
+ .replace("&gt;", ">")
+ .replace("&lt;", "<");
+ for url in twete["entries"]["urls"].as_array().unwrap() {
+ twete_text.replace(url["url"].as_str().unwrap(), url["expanded_url"].as_str().unwrap());
+ }
+ println!("{}", twete_text);
+ if twete.contains_key("quoted_status") {
+ println!(" and it's a quote ");
+ }
+ }
+ println!("");
+}
+
+fn signed_get(url: &str) -> hyper::client::Request {
// let params: Vec<(String, String)> = vec![("track".to_string(), "london".to_string())];
let params: Vec<(String, String)> = vec![];
-// let url = "https://stream.twitter.com/1.1/statuses/filter.json";
- let url = "https://userstream.twitter.com/1.1/user.json";
-// let url = "https://stream.twitter.com/1.1/statuses/sample.json";
+ let param_string: String = params.iter().map(|p| p.0.clone() + &"=".to_string() + &p.1).collect::<Vec<String>>().join("&");
let header = oauthcli::authorization_header(
"GET",
@@ -99,28 +117,93 @@ fn main() {
params.clone().into_iter()
);
+ let mut req = Request::new(Method::Get, url.parse().unwrap());
+
+ req.set_body(param_string);
+
+ {
+ let mut headers = req.headers_mut();
+ headers.set(Authorization(header.to_owned()));
+ headers.set(Accept("*/*".to_owned()));
+ headers.set(ContentType("application/x-www-form-urlencoded".to_owned()));
+ };
+
+ req
+}
+
+fn look_up_user(id: &str) -> Option<serde_json::Value> {
let mut core = Core::new().unwrap();
+ let connector = HttpsConnector::new(4, &core.handle()).unwrap();
+
+ let client = Client::configure()
+ .connector(connector)
+ .build(&core.handle());
+
+ let lookup = client.request(signed_get(&format!("{}?user_id={}", user_lookup_url, id)));
+ let resp: hyper::Response = core.run(lookup).unwrap();
+// println!("user lookup request out..");
+ let w = resp.body()
+ .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result<u8, hyper::Error> { Ok(b) })))
+ .flatten()
+ .wait();
+ let resp_body = w.map(|r| { r.unwrap() }).collect::<Vec<u8>>();
+ match serde_json::from_slice(&resp_body) {
+ Ok(value) => Some(value),
+ Err(e) => {
+ println!("error deserializing json: {}", e);
+ None
+ }
+ }
+}
+fn look_up_tweet(id: &str) -> Option<serde_json::Value> {
+ let mut core = Core::new().unwrap();
let connector = HttpsConnector::new(4, &core.handle()).unwrap();
let client = Client::configure()
- .keep_alive(true)
.connector(connector)
.build(&core.handle());
- let param_string: String = params.iter().map(|p| p.0.clone() + &"=".to_string() + &p.1).collect::<Vec<String>>().join("&");
+ let lookup = client.request(signed_get(&format!("{}?id={}", tweet_lookup_url, id)));
+ let resp: hyper::Response = core.run(lookup).unwrap();
+ let w = resp.body()
+ .map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result<u8, hyper::Error> { Ok(b) })))
+ .flatten()
+ .wait();
+ let resp_body = w.map(|r| { r.unwrap() }).collect::<Vec<u8>>();
+ match serde_json::from_slice(&resp_body) {
+ Ok(value) => Some(value),
+ Err(e) => {
+ println!("error deserializing json: {}", e);
+ None
+ }
+ }
+}
- let mut req = Request::new(Method::Get, url.parse().unwrap());
- req.set_body(param_string);
- {
- let mut headers = req.headers_mut();
- headers.set(Authorization(header.to_owned()));
- headers.set(Accept("*/*".to_owned()));
- headers.set(ContentType("application/x-www-form-urlencoded".to_owned()));
- };
+fn main() {
+
+
+ //Track words
+// let url = "https://stream.twitter.com/1.1/statuses/filter.json";
+// let url = "https://stream.twitter.com/1.1/statuses/sample.json";
+
+ let mut core = Core::new().unwrap();
+
+ let connector = HttpsConnector::new(4, &core.handle()).unwrap();
+
+ let client = Client::configure()
+ .keep_alive(true)
+ .connector(connector)
+ .build(&core.handle());
- println!("requesting...");
+ let req = signed_get(streamurl);
+
+ println!("starting!");
+// println!("{}", look_up_user("12503922").unwrap()[0]["screen_name"].as_str().unwrap());
+// println!("lookup'd");
+
+// println!("requesting...");
/*
let work = client.request(req).and_then(|res| {
res.body().for_each(move |body: hyper::Chunk| {
@@ -131,11 +214,36 @@ fn main() {
});
*/
+ let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
+
+ std::thread::spawn(move || {
+ loop {
+ match rx.recv() {
+ Ok(line) => {
+ let jsonstr = std::str::from_utf8(&line).unwrap().trim();
+ //println!("{}", jsonstr);
+ let json: serde_json::Value = serde_json::from_str(&jsonstr).unwrap();
+ match json {
+ serde_json::Value::Object(objmap) => render_tweet(objmap),
+ f => println!("Unexpected object: {}", f)
+ }
+ }
+ Err(e) => { println!("{}", e); }
+ }
+ }
+ });
+
let work = client.request(req).and_then(|res| {
LineStream::new(res.body()
.map(|chunk| futures::stream::iter(chunk.into_iter().map(|b| -> Result<u8, hyper::Error> { Ok(b) })))
.flatten())
- .for_each(|s| Ok(print!("{}", str::from_utf8(&s).unwrap())))
+ .for_each(|s| {
+ if s.len() != 1 {
+ println!("Send!: {}", std::str::from_utf8(&s).unwrap());
+ tx.send(s);
+ };
+ Ok(())
+ })
});
println!("Before?");