|
|
|
@ -1,4 +1,5 @@
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::fmt;
|
|
|
|
|
use std::io;
|
|
|
|
|
|
|
|
|
|
use log::*;
|
|
|
|
@ -135,6 +136,17 @@ enum Action {
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl fmt::Debug for Action {
|
|
|
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
|
|
|
match self {
|
|
|
|
|
Self::Register { id, .. } => write!(f, "Action::Register {{ id: {id}, .. }}"),
|
|
|
|
|
Self::Unregister { id } => write!(f, "Action::Unregister {{ id: {id} }}"),
|
|
|
|
|
Self::Read { .. } => write!(f, "Action::Read {{ .. }}"),
|
|
|
|
|
Self::Write { id, .. } => write!(f, "Action::Write {{ id: {id}, .. }}"),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Internal task to process outgoing [`UntypedRequest`]s.
|
|
|
|
|
async fn request_task(
|
|
|
|
|
id: ConnectionId,
|
|
|
|
@ -142,10 +154,13 @@ async fn request_task(
|
|
|
|
|
mut rx: mpsc::UnboundedReceiver<UntypedRequest<'static>>,
|
|
|
|
|
) {
|
|
|
|
|
while let Some(req) = rx.recv().await {
|
|
|
|
|
trace!("[Conn {id}] Firing off request {}", req.id);
|
|
|
|
|
if let Err(x) = client.fire(req).await {
|
|
|
|
|
error!("[Conn {id}] Failed to send request: {x}");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trace!("[Conn {id}] Manager request task closed");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Internal task to process incoming [`UntypedResponse`]s.
|
|
|
|
@ -155,10 +170,17 @@ async fn response_task(
|
|
|
|
|
tx: mpsc::UnboundedSender<Action>,
|
|
|
|
|
) {
|
|
|
|
|
while let Some(res) = mailbox.next().await {
|
|
|
|
|
trace!(
|
|
|
|
|
"[Conn {id}] Receiving response {} to request {}",
|
|
|
|
|
res.id,
|
|
|
|
|
res.origin_id
|
|
|
|
|
);
|
|
|
|
|
if let Err(x) = tx.send(Action::Read { res }) {
|
|
|
|
|
error!("[Conn {id}] Failed to forward received response: {x}");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trace!("[Conn {id}] Manager response task closed");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Internal task to process [`Action`] items.
|
|
|
|
@ -174,6 +196,8 @@ async fn action_task(
|
|
|
|
|
let mut registered = HashMap::new();
|
|
|
|
|
|
|
|
|
|
while let Some(action) = rx.recv().await {
|
|
|
|
|
trace!("[Conn {id}] {action:?}");
|
|
|
|
|
|
|
|
|
|
match action {
|
|
|
|
|
Action::Register { id, reply } => {
|
|
|
|
|
registered.insert(id, reply);
|
|
|
|
@ -201,9 +225,20 @@ async fn action_task(
|
|
|
|
|
id: channel_id,
|
|
|
|
|
response: res,
|
|
|
|
|
};
|
|
|
|
|
if let Err(x) = reply.send(response).await {
|
|
|
|
|
error!("[Conn {id}] {x}");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: This seems to get stuck at times with some change recently,
|
|
|
|
|
// so we kick this off in a new task instead. The better solution
|
|
|
|
|
// is to switch most of our mpsc usage to be unbounded so we
|
|
|
|
|
// don't need an async call. The only bounded ones should be those
|
|
|
|
|
// externally facing to the API user, if even that.
|
|
|
|
|
//
|
|
|
|
|
// https://github.com/chipsenkbeil/distant/issues/205
|
|
|
|
|
let reply = reply.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
if let Err(x) = reply.send(response).await {
|
|
|
|
|
error!("[Conn {id}] {x}");
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Action::Write { id, mut req } => {
|
|
|
|
@ -217,4 +252,6 @@ async fn action_task(
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trace!("[Conn {id}] Manager action task closed");
|
|
|
|
|
}
|
|
|
|
|