aboutsummaryrefslogtreecommitdiff
path: root/src/main.rs
diff options
context:
space:
mode:
authorAndy Wortman <ixineeringeverywhere@gmail.com>2017-11-18 16:49:23 -0800
committerAndy Wortman <ixineeringeverywhere@gmail.com>2017-11-18 16:49:23 -0800
commita3966446bab8bf849457c24a9a6d05216f950e11 (patch)
tree10ce6f47ba5ef8641aab53052dd69e958df3af3a /src/main.rs
parent799757386fa02339c20eff9f256de0f97b5fa042 (diff)
groundwork for multi-account use
add connection state tracked per-stream, add explicit TwitterProfile mapped to names that can be used for accounts. default names are the handle of the corresponding twitter account. partition out user Credential to be per TwitterProfile, so fav, reply, etc, all come from the selected account. Multiplex twitter connections and message streams across chan (instead of earlier plan, which was to have one chan per thread)
Diffstat (limited to 'src/main.rs')
-rw-r--r--src/main.rs110
1 files changed, 70 insertions, 40 deletions
diff --git a/src/main.rs b/src/main.rs
index bfb60f9..cd15ea9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -180,6 +180,16 @@ fn inner_signed_api_req(url: &str, method: Method, app_cred: &tw::Credential, ma
req
}
+static mut connection_id: u8 = 0;
+
+fn get_id() -> u8 {
+ unsafe {
+ let curr_id = connection_id;
+ connection_id += 1;
+ curr_id
+ }
+}
+
fn main() {
//Track words
@@ -200,11 +210,13 @@ fn main() {
tweeter.display_info.status("Cache loaded".to_owned());
- let mut maybe_twete_rx = tweeter.profile.clone()
- .map(|user_creds| {
- let rx = connect_twitter_stream(tweeter.app_key.clone(), user_creds);
- tweeter.display_info.status("Twitter stream open".to_owned());
- rx
+ let (tweet_tx, mut twete_rx) = chan::sync::<Vec<u8>>(0);
+ let (coordination_tx, mut coordination_rx) = chan::sync::<TwitterConnectionState>(0);
+
+ tweeter.current_profile()
+ .map(|user_profile| user_profile.to_owned())
+ .map(|user_profile| {
+ connect_twitter_stream(tweeter.app_key.clone(), user_profile.creds, tweet_tx.clone(), coordination_tx.clone(), get_id());
});
std::thread::spawn(move || {
@@ -240,10 +252,11 @@ fn main() {
};
loop {
- match do_ui(ui_rx, maybe_twete_rx, &mut tweeter, &mut queryer) {
- Some((new_ui_rx, new_maybe_twete_rx)) => {
+ match do_ui(ui_rx, twete_rx, coordination_rx, &mut tweeter, &mut queryer) {
+ Some((new_ui_rx, new_twete_rx, new_coordination_rx)) => {
ui_rx = new_ui_rx;
- maybe_twete_rx = new_maybe_twete_rx;
+ twete_rx = new_twete_rx;
+ coordination_rx = new_coordination_rx;
},
None => {
break;
@@ -314,8 +327,15 @@ fn handle_input(event: termion::event::Event, tweeter: &mut tw::TwitterCache, qu
}
Some(display::DisplayMode::Reply(twid, msg)) => {
if x == '\n' {
- // TODO: move this somewhere better.
- ::commands::twete::send_reply(msg, twid, tweeter, queryer);
+ match tweeter.current_profile().map(|profile| profile.to_owned()) {
+ Some(profile) => {
+ // TODO: move this somewhere better.
+ ::commands::twete::send_reply(msg, twid, tweeter, queryer, profile.creds);
+ },
+ None => {
+ tweeter.display_info.status("Cannot reply when not logged in".to_owned());
+ }
+ }
tweeter.display_info.mode = None;
} else {
tweeter.display_info.mode = Some(display::DisplayMode::Reply(twid, format!("{}{}", msg, x)));
@@ -356,35 +376,32 @@ fn handle_twitter_line(line: Vec<u8>, mut tweeter: &mut tw::TwitterCache, mut qu
}
}
-fn do_ui(ui_rx_orig: chan::Receiver<Result<termion::event::Event, std::io::Error>>, maybe_twete_rx: Option<chan::Receiver<Vec<u8>>>, mut tweeter: &mut tw::TwitterCache, mut queryer: &mut ::Queryer) -> Option<(chan::Receiver<Result<termion::event::Event, std::io::Error>>, Option<chan::Receiver<Vec<u8>>>)> {
+fn do_ui(
+ ui_rx_orig: chan::Receiver<Result<termion::event::Event, std::io::Error>>,
+ twete_rx: chan::Receiver<Vec<u8>>,
+ coordination_rx: chan::Receiver<TwitterConnectionState>,
+ mut tweeter: &mut tw::TwitterCache,
+ mut queryer: &mut ::Queryer
+) -> Option<(chan::Receiver<Result<termion::event::Event, std::io::Error>>, chan::Receiver<Vec<u8>>, chan::Receiver<TwitterConnectionState>)> {
loop {
let ui_rx_a = &ui_rx_orig;
let ui_rx_b = &ui_rx_orig;
- match &maybe_twete_rx {
- &Some(ref twete_rx) => {
- chan_select! {
- twete_rx.recv() -> twete => match twete {
- Some(line) => handle_twitter_line(line, tweeter, queryer),
- None => {
- tweeter.display_info.status("Twitter stream hung up...".to_owned());
- return Some((ui_rx_orig.clone(), None))
- }
- },
- ui_rx_a.recv() -> user_input => match user_input {
- Some(Ok(event)) => handle_input(event, tweeter, queryer),
- Some(Err(_)) => (), /* stdin closed? */
- None => return None // UI ded
- }
- }
+ chan_select! {
+ coordination_rx.recv() -> coordination => {
+ tweeter.display_info.status(format!("{:?}", coordination));
},
- &None => {
- chan_select! {
- ui_rx_a.recv() -> user_input => match user_input {
- Some(Ok(event)) => handle_input(event, tweeter, queryer),
- Some(Err(_)) => (), /* stdin closed? */
- None => return None // UI ded
- }
+ twete_rx.recv() -> twete => match twete {
+ Some(line) => handle_twitter_line(line, tweeter, queryer),
+ None => {
+ tweeter.display_info.status("Twitter stream hung up...".to_owned());
+ display::paint(tweeter).unwrap();
+ return None // if the twitter channel died, something real bad happeneed?
}
+ },
+ ui_rx_a.recv() -> user_input => match user_input {
+ Some(Ok(event)) => handle_input(event, tweeter, queryer),
+ Some(Err(_)) => (), /* stdin closed? */
+ None => return None // UI ded
}
}
@@ -424,7 +441,8 @@ fn do_ui(ui_rx_orig: chan::Receiver<Result<termion::event::Event, std::io::Error
}
tw::AppState::Reconnect => {
tweeter.state = tw::AppState::View;
- return Some((ui_rx_orig.clone(), tweeter.profile.clone().map(|creds| connect_twitter_stream(tweeter.app_key.clone(), creds))));
+ // TODO: reconnect *which*?
+ return None // Some((ui_rx_orig.clone(), tweeter.profile.clone().map(|creds| connect_twitter_stream(tweeter.app_key.clone(), creds))));
},
tw::AppState::Shutdown => {
tweeter.display_info.status("Saving cache...".to_owned());
@@ -471,10 +489,22 @@ fn url_encode(s: &str) -> String {
.replace("]", "%5d")
}
-fn connect_twitter_stream(app_cred: tw::Credential, user_cred: tw::Credential) -> chan::Receiver<Vec<u8>> {
- let (twete_tx, twete_rx) = chan::sync::<Vec<u8>>(0);
-
+// let (twete_tx, twete_rx) = chan::sync::<Vec<u8>>(0);
+#[derive(Debug)]
+enum TwitterConnectionState {
+ Connecting(u8),
+ Connected(u8),
+ Closed(u8)
+}
+fn connect_twitter_stream(
+ app_cred: tw::Credential,
+ user_cred: tw::Credential,
+ twete_tx: chan::Sender<Vec<u8>>,
+ coordination_tx: chan::Sender<TwitterConnectionState>,
+ conn_id: u8
+) {
std::thread::spawn(move || {
+ coordination_tx.send(TwitterConnectionState::Connecting(conn_id));
let mut core = Core::new().unwrap();
let connector = HttpsConnector::new(1, &core.handle()).unwrap();
@@ -491,6 +521,7 @@ fn connect_twitter_stream(app_cred: tw::Credential, user_cred: tw::Credential) -
println!("Twitter stream connect was abnormal: {}", status);
println!("result: {:?}", res);
}
+ coordination_tx.send(TwitterConnectionState::Connected(conn_id));
LineStream::new(res.body()
.map(|chunk| futures::stream::iter_ok(chunk.into_iter()))
.flatten())
@@ -507,7 +538,6 @@ fn connect_twitter_stream(app_cred: tw::Credential, user_cred: tw::Credential) -
Ok(_good) => (),
Err(e) => println!("Error in setting up: {}", e)
}
+ coordination_tx.send(TwitterConnectionState::Closed(conn_id));
});
-
- twete_rx
}