aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs110
1 files changed, 70 insertions, 40 deletions
diff --git a/src/main.rs b/src/main.rs
index bfb60f9..cd15ea9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -180,6 +180,16 @@ fn inner_signed_api_req(url: &str, method: Method, app_cred: &tw::Credential, ma
req
}
+static mut connection_id: u8 = 0;
+
+fn get_id() -> u8 {
+ unsafe {
+ let curr_id = connection_id;
+ connection_id += 1;
+ curr_id
+ }
+}
+
fn main() {
//Track words
@@ -200,11 +210,13 @@ fn main() {
tweeter.display_info.status("Cache loaded".to_owned());
- let mut maybe_twete_rx = tweeter.profile.clone()
- .map(|user_creds| {
- let rx = connect_twitter_stream(tweeter.app_key.clone(), user_creds);
- tweeter.display_info.status("Twitter stream open".to_owned());
- rx
+ let (tweet_tx, mut twete_rx) = chan::sync::<Vec<u8>>(0);
+ let (coordination_tx, mut coordination_rx) = chan::sync::<TwitterConnectionState>(0);
+
+ tweeter.current_profile()
+ .map(|user_profile| user_profile.to_owned())
+ .map(|user_profile| {
+ connect_twitter_stream(tweeter.app_key.clone(), user_profile.creds, tweet_tx.clone(), coordination_tx.clone(), get_id());
});
std::thread::spawn(move || {
@@ -240,10 +252,11 @@ fn main() {
};
loop {
- match do_ui(ui_rx, maybe_twete_rx, &mut tweeter, &mut queryer) {
- Some((new_ui_rx, new_maybe_twete_rx)) => {
+ match do_ui(ui_rx, twete_rx, coordination_rx, &mut tweeter, &mut queryer) {
+ Some((new_ui_rx, new_twete_rx, new_coordination_rx)) => {
ui_rx = new_ui_rx;
- maybe_twete_rx = new_maybe_twete_rx;
+ twete_rx = new_twete_rx;
+ coordination_rx = new_coordination_rx;
},
None => {
break;
@@ -314,8 +327,15 @@ fn handle_input(event: termion::event::Event, tweeter: &mut tw::TwitterCache, qu
}
Some(display::DisplayMode::Reply(twid, msg)) => {
if x == '\n' {
- // TODO: move this somewhere better.
- ::commands::twete::send_reply(msg, twid, tweeter, queryer);
+ match tweeter.current_profile().map(|profile| profile.to_owned()) {
+ Some(profile) => {
+ // TODO: move this somewhere better.
+ ::commands::twete::send_reply(msg, twid, tweeter, queryer, profile.creds);
+ },
+ None => {
+ tweeter.display_info.status("Cannot reply when not logged in".to_owned());
+ }
+ }
tweeter.display_info.mode = None;
} else {
tweeter.display_info.mode = Some(display::DisplayMode::Reply(twid, format!("{}{}", msg, x)));
@@ -356,35 +376,32 @@ fn handle_twitter_line(line: Vec<u8>, mut tweeter: &mut tw::TwitterCache, mut qu
}
}
-fn do_ui(ui_rx_orig: chan::Receiver<Result<termion::event::Event, std::io::Error>>, maybe_twete_rx: Option<chan::Receiver<Vec<u8>>>, mut tweeter: &mut tw::TwitterCache, mut queryer: &mut ::Queryer) -> Option<(chan::Receiver<Result<termion::event::Event, std::io::Error>>, Option<chan::Receiver<Vec<u8>>>)> {
+fn do_ui(
+ ui_rx_orig: chan::Receiver<Result<termion::event::Event, std::io::Error>>,
+ twete_rx: chan::Receiver<Vec<u8>>,
+ coordination_rx: chan::Receiver<TwitterConnectionState>,
+ mut tweeter: &mut tw::TwitterCache,
+ mut queryer: &mut ::Queryer
+) -> Option<(chan::Receiver<Result<termion::event::Event, std::io::Error>>, chan::Receiver<Vec<u8>>, chan::Receiver<TwitterConnectionState>)> {
loop {
let ui_rx_a = &ui_rx_orig;
let ui_rx_b = &ui_rx_orig;
- match &maybe_twete_rx {
- &Some(ref twete_rx) => {
- chan_select! {
- twete_rx.recv() -> twete => match twete {
- Some(line) => handle_twitter_line(line, tweeter, queryer),
- None => {
- tweeter.display_info.status("Twitter stream hung up...".to_owned());
- return Some((ui_rx_orig.clone(), None))
- }
- },
- ui_rx_a.recv() -> user_input => match user_input {
- Some(Ok(event)) => handle_input(event, tweeter, queryer),
- Some(Err(_)) => (), /* stdin closed? */
- None => return None // UI ded
- }
- }
+ chan_select! {
+ coordination_rx.recv() -> coordination => {
+ tweeter.display_info.status(format!("{:?}", coordination));
},
- &None => {
- chan_select! {
- ui_rx_a.recv() -> user_input => match user_input {
- Some(Ok(event)) => handle_input(event, tweeter, queryer),
- Some(Err(_)) => (), /* stdin closed? */
- None => return None // UI ded
- }
+ twete_rx.recv() -> twete => match twete {
+ Some(line) => handle_twitter_line(line, tweeter, queryer),
+ None => {
+ tweeter.display_info.status("Twitter stream hung up...".to_owned());
+ display::paint(tweeter).unwrap();
+ return None // if the twitter channel died, something real bad happeneed?
}
+ },
+ ui_rx_a.recv() -> user_input => match user_input {
+ Some(Ok(event)) => handle_input(event, tweeter, queryer),
+ Some(Err(_)) => (), /* stdin closed? */
+ None => return None // UI ded
}
}
@@ -424,7 +441,8 @@ fn do_ui(ui_rx_orig: chan::Receiver<Result<termion::event::Event, std::io::Error
}
tw::AppState::Reconnect => {
tweeter.state = tw::AppState::View;
- return Some((ui_rx_orig.clone(), tweeter.profile.clone().map(|creds| connect_twitter_stream(tweeter.app_key.clone(), creds))));
+ // TODO: reconnect *which*?
+ return None // Some((ui_rx_orig.clone(), tweeter.profile.clone().map(|creds| connect_twitter_stream(tweeter.app_key.clone(), creds))));
},
tw::AppState::Shutdown => {
tweeter.display_info.status("Saving cache...".to_owned());
@@ -471,10 +489,22 @@ fn url_encode(s: &str) -> String {
.replace("]", "%5d")
}
-fn connect_twitter_stream(app_cred: tw::Credential, user_cred: tw::Credential) -> chan::Receiver<Vec<u8>> {
- let (twete_tx, twete_rx) = chan::sync::<Vec<u8>>(0);
-
+// let (twete_tx, twete_rx) = chan::sync::<Vec<u8>>(0);
+#[derive(Debug)]
+enum TwitterConnectionState {
+ Connecting(u8),
+ Connected(u8),
+ Closed(u8)
+}
+fn connect_twitter_stream(
+ app_cred: tw::Credential,
+ user_cred: tw::Credential,
+ twete_tx: chan::Sender<Vec<u8>>,
+ coordination_tx: chan::Sender<TwitterConnectionState>,
+ conn_id: u8
+) {
std::thread::spawn(move || {
+ coordination_tx.send(TwitterConnectionState::Connecting(conn_id));
let mut core = Core::new().unwrap();
let connector = HttpsConnector::new(1, &core.handle()).unwrap();
@@ -491,6 +521,7 @@ fn connect_twitter_stream(app_cred: tw::Credential, user_cred: tw::Credential) -
println!("Twitter stream connect was abnormal: {}", status);
println!("result: {:?}", res);
}
+ coordination_tx.send(TwitterConnectionState::Connected(conn_id));
LineStream::new(res.body()
.map(|chunk| futures::stream::iter_ok(chunk.into_iter()))
.flatten())
@@ -507,7 +538,6 @@ fn connect_twitter_stream(app_cred: tw::Credential, user_cred: tw::Credential) -
Ok(_good) => (),
Err(e) => println!("Error in setting up: {}", e)
}
+ coordination_tx.send(TwitterConnectionState::Closed(conn_id));
});
-
- twete_rx
}