Implement the reidentify command

v5-api
Dominik Nakamura 2 years ago
parent 5c1b03029b
commit a536ee7f34
No known key found for this signature in database
GPG Key ID: E4C6A749B2491910

@ -3,7 +3,7 @@
#[cfg(feature = "events")]
use std::sync::Weak;
use std::{
collections::HashMap,
collections::{HashMap, VecDeque},
future::Future,
sync::{
atomic::{AtomicU64, Ordering},
@ -36,7 +36,7 @@ pub use self::{
#[cfg(feature = "events")]
use crate::events::Event;
use crate::{
requests::{ClientRequest, EventSubscription, Identify, Request, RequestType},
requests::{ClientRequest, EventSubscription, Identify, Reidentify, Request, RequestType},
responses::{Hello, Identified, RequestResponse, ServerMessage, Status},
Error, Result,
};
@ -80,6 +80,10 @@ pub struct Client {
/// of a request ID and the value is a oneshot sender that allows to send the response back to
/// the other end that waits for the response.
receivers: Arc<Mutex<ReceiverList>>,
/// A list of awaiting [`reidentify`](Self::reidentify) requests, waiting for confirmation. As
/// these requests don't carry any kind of ID, they're handled sequentially and must be tracked
/// separate from normal requests.
reidentify_receivers: Arc<ReidentifyReceiverList>,
/// Broadcast sender that distributes received events to all current listeners. Events are
/// dropped if nobody listens.
#[cfg(feature = "events")]
@ -96,6 +100,26 @@ type MessageWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Messa
/// Shorthand for the list of ongoing requests that wait for a response.
type ReceiverList = HashMap<u64, oneshot::Sender<(Status, serde_json::Value)>>;
/// Wrapper around a thread-safe queue to park and notify re-identify listener.
#[derive(Default)]
struct ReidentifyReceiverList(Mutex<VecDeque<oneshot::Sender<Identified>>>);
impl ReidentifyReceiverList {
/// Add a new receiver to the wait list, returning a channel to await the result on.
async fn add(&self) -> oneshot::Receiver<Identified> {
let (tx, rx) = oneshot::channel();
self.0.lock().await.push_back(tx);
rx
}
/// Notify the next listener in the queue, transfering it the response.
async fn notify(&self, identified: Identified) {
if let Some(tx) = self.0.lock().await.pop_front() {
tx.send(identified).ok();
}
}
}
/// Default broadcast capacity used when not overwritten by the user.
#[cfg(feature = "events")]
const DEFAULT_CAPACITY: usize = 100;
@ -193,8 +217,13 @@ impl Client {
.map_err(Error::Connect)?;
let (mut write, mut read) = socket.split();
let receivers = Arc::new(Mutex::new(HashMap::<_, oneshot::Sender<_>>::new()));
let receivers2 = Arc::clone(&receivers);
let reidentify_receivers = Arc::new(ReidentifyReceiverList::default());
let reidentify_receivers2 = Arc::clone(&reidentify_receivers);
#[cfg(feature = "events")]
let (event_sender, _) =
broadcast::channel(config.broadcast_capacity.unwrap_or(DEFAULT_CAPACITY));
@ -247,6 +276,9 @@ impl Client {
ServerMessage::Event(event) => {
events_tx.send(event).ok();
}
ServerMessage::Identified(identified) => {
reidentify_receivers2.notify(identified).await;
}
_ => return Err(InnerError::UnexpectedMessage(message)),
}
@ -274,6 +306,7 @@ impl Client {
write,
id_counter,
receivers,
reidentify_receivers,
#[cfg(feature = "events")]
event_sender: Arc::downgrade(&event_sender),
handle: Some(handle),
@ -374,9 +407,28 @@ impl Client {
///
/// This currently allows to change the events to listen for, without the need of a full
/// disconnect and new connection.
#[doc(hidden)]
pub async fn reidentify(&self) -> Result<()> {
todo!("The `Reidentify` command is not yet implemented")
pub async fn reidentify(&self, event_subscriptions: EventSubscription) -> Result<()> {
let json = serde_json::to_string(&ClientRequest::Reidentify(Reidentify {
event_subscriptions: Some(event_subscriptions),
}))
.map_err(Error::SerializeMessage)?;
let rx = self.reidentify_receivers.add().await;
self.write
.lock()
.await
.send(Message::Text(json))
.await
.map_err(Error::Send)?;
let resp = rx.await.map_err(Error::ReceiveMessage)?;
debug!(
"re-identified with RPC version {}",
resp.negotiated_rpc_version
);
Ok(())
}
/// Get a stream of events. Each call to this function creates a new listener, therefore it's

Loading…
Cancel
Save