From 666ca5b38bbf4f5243519e79953038f947b268fc Mon Sep 17 00:00:00 2001 From: Andy Wortman Date: Mon, 20 Nov 2017 02:21:44 -0800 Subject: pass connection_id through to contextualize events welcome message now handled correctly for whichever stream made the connection additionally, moved profile-specific operations to TwitterProfile events, tweets, and DMs don't consider connection because they include a target indicator which means we don't *need* connection_id at that point --- src/main.rs | 90 ++++++++++++++++++++++++++++++++----------------------------- 1 file changed, 47 insertions(+), 43 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index b9bf27d..e45a799 100644 --- a/src/main.rs +++ b/src/main.rs @@ -210,11 +210,11 @@ fn main() { tweeter.display_info.status("Cache loaded".to_owned()); - let (twete_tx, mut twete_rx) = chan::sync::>(0); - let (coordination_tx, mut coordination_rx) = chan::sync::(0); + let (twete_tx, twete_rx) = chan::sync::<(u8, Vec)>(0); + let (coordination_tx, coordination_rx) = chan::sync::<(u8, TwitterConnectionState)>(0); for (ref profile_name, ref profile) in &tweeter.profiles { - connect_twitter_stream(tweeter.app_key.clone(), profile.creds.clone(), twete_tx.clone(), coordination_tx.clone(), get_id()); + connect_twitter_stream(tweeter.app_key.clone(), profile_name.to_string(), profile.creds.clone(), twete_tx.clone(), coordination_tx.clone(), get_id()); } std::thread::spawn(move || { @@ -249,18 +249,8 @@ fn main() { Err(e) => println!("{}", e) // TODO: we got here because writing to stdout failed. what to do now? }; - loop { - match do_ui(ui_rx, twete_rx, &twete_tx, coordination_rx, &coordination_tx, &mut tweeter, &mut queryer) { - Some((new_ui_rx, new_twete_rx, new_coordination_rx)) => { - ui_rx = new_ui_rx; - twete_rx = new_twete_rx; - coordination_rx = new_coordination_rx; - }, - None => { - break; - } - } - } + do_ui(ui_rx, twete_rx, &twete_tx, coordination_rx, &coordination_tx, &mut tweeter, &mut queryer); + tcsetattr(0, TCSANOW, &termios); } @@ -294,7 +284,7 @@ fn handle_input(event: termion::event::Event, tweeter: &mut tw::TwitterCache, qu tweeter.display_info.mode = Some(display::DisplayMode::Reply(twid, "".to_owned())); } } - } + } Event::Key(Key::Ctrl('n')) => { match tweeter.display_info.mode.clone() { Some(display::DisplayMode::Compose(msg)) => { @@ -359,12 +349,12 @@ fn handle_input(event: termion::event::Event, tweeter: &mut tw::TwitterCache, qu } } -fn handle_twitter_line(line: Vec, mut tweeter: &mut tw::TwitterCache, mut queryer: &mut ::Queryer) { +fn handle_twitter_line(conn_id: u8, line: Vec, mut tweeter: &mut tw::TwitterCache, mut queryer: &mut ::Queryer) { let jsonstr = std::str::from_utf8(&line).unwrap().trim(); /* TODO: replace from_str with from_slice? */ match serde_json::from_str(&jsonstr) { Ok(json) => { - tw::handle_message(json, &mut tweeter, &mut queryer); + tw::handle_message(conn_id, json, &mut tweeter, &mut queryer); if tweeter.needs_save && tweeter.caching_permitted { tweeter.store_cache(); } @@ -375,33 +365,46 @@ fn handle_twitter_line(line: Vec, mut tweeter: &mut tw::TwitterCache, mut qu } fn do_ui( - ui_rx_orig: chan::Receiver>, - twete_rx: chan::Receiver>, - twete_tx: &chan::Sender>, - coordination_rx: chan::Receiver, - coordination_tx: &chan::Sender, + ui_rx: chan::Receiver>, + twete_rx: chan::Receiver<(u8, Vec)>, + twete_tx: &chan::Sender<(u8, Vec)>, + coordination_rx: chan::Receiver<(u8, TwitterConnectionState)>, + coordination_tx: &chan::Sender<(u8, TwitterConnectionState)>, mut tweeter: &mut tw::TwitterCache, mut queryer: &mut ::Queryer -) -> Option<(chan::Receiver>, chan::Receiver>, chan::Receiver)> { +) { loop { - let ui_rx_a = &ui_rx_orig; - let ui_rx_b = &ui_rx_orig; chan_select! { coordination_rx.recv() -> coordination => { - tweeter.display_info.status(format!("{:?}", coordination)); + match coordination { + Some((conn_id, coordination)) => { + match coordination { + TwitterConnectionState::Connecting(profile_name) => { + tweeter.connection_map.insert(conn_id, profile_name); + }, + TwitterConnectionState::Connected => { + tweeter.display_info.status(format!("Stream connected for profile \"{}\"", tweeter.connection_map[&conn_id])); + }, + TwitterConnectionState::Closed => { + tweeter.connection_map.remove(&conn_id); + } + } + }, + None => { /* if this stream closes something is terribly wrong... */ panic!("Coordination tx/rx closed!"); } + } }, twete_rx.recv() -> twete => match twete { - Some(line) => handle_twitter_line(line, tweeter, queryer), + Some((conn_id, line)) => handle_twitter_line(conn_id, line, tweeter, queryer), None => { tweeter.display_info.status("Twitter stream hung up...".to_owned()); display::paint(tweeter).unwrap(); - return None // if the twitter channel died, something real bad happeneed? + return; // if the twitter channel died, something real bad happeneed? } }, - ui_rx_a.recv() -> user_input => match user_input { + ui_rx.recv() -> user_input => match user_input { Some(Ok(event)) => handle_input(event, tweeter, queryer), Some(Err(_)) => (), /* stdin closed? */ - None => return None // UI ded + None => return // UI ded } } @@ -443,7 +446,7 @@ fn do_ui( tweeter.state = tw::AppState::View; match tweeter.profiles.get(&profile_name).map(|profile| profile.creds.to_owned()) { Some(user_creds) => { - connect_twitter_stream(tweeter.app_key.clone(), user_creds, twete_tx.clone(), coordination_tx.clone(), get_id()) + connect_twitter_stream(tweeter.app_key.clone(), profile_name, user_creds, twete_tx.clone(), coordination_tx.clone(), get_id()) }, None => { tweeter.display_info.status(format!("No profile named {}", profile_name)); @@ -456,9 +459,9 @@ fn do_ui( tweeter.store_cache(); tweeter.display_info.status("Bye bye!".to_owned()); display::paint(tweeter).unwrap(); - return None + return; }, - _ => () + tw::AppState::View | tw::AppState::Compose => { /* nothing special to do */ } }; } } @@ -498,19 +501,20 @@ fn url_encode(s: &str) -> String { // let (twete_tx, twete_rx) = chan::sync::>(0); #[derive(Debug)] enum TwitterConnectionState { - Connecting(u8), - Connected(u8), - Closed(u8) + Connecting(String), + Connected, + Closed } fn connect_twitter_stream( app_cred: tw::Credential, + profile_name: String, user_cred: tw::Credential, - twete_tx: chan::Sender>, - coordination_tx: chan::Sender, + twete_tx: chan::Sender<(u8, Vec)>, + coordination_tx: chan::Sender<(u8, TwitterConnectionState)>, conn_id: u8 ) { std::thread::spawn(move || { - coordination_tx.send(TwitterConnectionState::Connecting(conn_id)); + coordination_tx.send((conn_id, TwitterConnectionState::Connecting(profile_name))); let mut core = Core::new().unwrap(); let connector = HttpsConnector::new(1, &core.handle()).unwrap(); @@ -527,13 +531,13 @@ fn connect_twitter_stream( println!("Twitter stream connect was abnormal: {}", status); println!("result: {:?}", res); } - coordination_tx.send(TwitterConnectionState::Connected(conn_id)); + coordination_tx.send((conn_id, TwitterConnectionState::Connected)); LineStream::new(res.body() .map(|chunk| futures::stream::iter_ok(chunk.into_iter())) .flatten()) .for_each(|s| { if s.len() != 1 { - twete_tx.send(s); + twete_tx.send((conn_id, s)); }; Ok(()) }) @@ -544,6 +548,6 @@ fn connect_twitter_stream( Ok(_good) => (), Err(e) => println!("Error in setting up: {}", e) } - coordination_tx.send(TwitterConnectionState::Closed(conn_id)); + coordination_tx.send((conn_id, TwitterConnectionState::Closed)); }); } -- cgit v1.1