2021-08-28 23:15:00 +00:00
|
|
|
use crate::{
|
2021-08-22 06:04:24 +00:00
|
|
|
client::utils,
|
2021-09-19 01:06:57 +00:00
|
|
|
constants::CLIENT_MAILBOX_CAPACITY,
|
2021-08-18 07:34:04 +00:00
|
|
|
data::{Request, Response},
|
2021-09-19 01:06:57 +00:00
|
|
|
net::{Codec, DataStream, Transport, TransportError},
|
2021-08-18 07:34:04 +00:00
|
|
|
};
|
|
|
|
use log::*;
|
|
|
|
use std::{
|
|
|
|
convert,
|
2021-09-14 17:54:45 +00:00
|
|
|
net::SocketAddr,
|
2021-09-19 01:06:57 +00:00
|
|
|
ops::{Deref, DerefMut},
|
|
|
|
sync::{Arc, Weak},
|
2021-08-18 07:34:04 +00:00
|
|
|
};
|
|
|
|
use tokio::{
|
|
|
|
io,
|
|
|
|
net::TcpStream,
|
2021-09-19 01:06:57 +00:00
|
|
|
sync::{mpsc, Mutex},
|
2021-08-18 07:34:04 +00:00
|
|
|
task::{JoinError, JoinHandle},
|
|
|
|
time::Duration,
|
|
|
|
};
|
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
mod ext;
|
2021-09-28 16:42:23 +00:00
|
|
|
pub use ext::{Metadata, SessionChannelExt, SessionChannelExtError};
|
2021-09-19 01:06:57 +00:00
|
|
|
|
2021-08-22 06:04:24 +00:00
|
|
|
mod info;
|
|
|
|
pub use info::{SessionInfo, SessionInfoFile, SessionInfoParseError};
|
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
mod mailbox;
|
|
|
|
pub use mailbox::Mailbox;
|
|
|
|
use mailbox::PostOffice;
|
2021-08-18 07:34:04 +00:00
|
|
|
|
2021-08-22 06:04:24 +00:00
|
|
|
/// Represents a session with a remote server that can be used to send requests & receive responses
|
2021-09-19 01:06:57 +00:00
|
|
|
pub struct Session {
|
|
|
|
/// Used to send requests to a server
|
|
|
|
channel: SessionChannel,
|
2021-08-18 07:34:04 +00:00
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
/// Contains the task that is running to send requests to a server
|
|
|
|
request_task: JoinHandle<()>,
|
2021-08-18 07:34:04 +00:00
|
|
|
|
|
|
|
/// Contains the task that is running to receive responses from a server
|
|
|
|
response_task: JoinHandle<()>,
|
2021-08-28 07:13:30 +00:00
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
/// Contains the task that runs on a timer to prune closed mailboxes
|
|
|
|
prune_task: JoinHandle<()>,
|
2021-08-18 07:34:04 +00:00
|
|
|
}
|
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
impl Session {
|
2021-08-22 06:04:24 +00:00
|
|
|
/// Connect to a remote TCP server using the provided information
|
2021-09-19 01:06:57 +00:00
|
|
|
pub async fn tcp_connect<U>(addr: SocketAddr, codec: U) -> io::Result<Self>
|
|
|
|
where
|
|
|
|
U: Codec + Send + 'static,
|
|
|
|
{
|
2021-09-14 17:54:45 +00:00
|
|
|
let transport = Transport::<TcpStream, U>::connect(addr, codec).await?;
|
2021-08-18 07:34:04 +00:00
|
|
|
debug!(
|
2021-08-22 06:04:24 +00:00
|
|
|
"Session has been established with {}",
|
2021-08-18 07:34:04 +00:00
|
|
|
transport
|
|
|
|
.peer_addr()
|
|
|
|
.map(|x| x.to_string())
|
|
|
|
.unwrap_or_else(|_| String::from("???"))
|
|
|
|
);
|
2021-09-04 04:49:37 +00:00
|
|
|
Self::initialize(transport)
|
2021-08-18 07:34:04 +00:00
|
|
|
}
|
|
|
|
|
2021-08-22 06:04:24 +00:00
|
|
|
/// Connect to a remote TCP server, timing out after duration has passed
|
2021-09-19 01:06:57 +00:00
|
|
|
pub async fn tcp_connect_timeout<U>(
|
2021-09-14 17:54:45 +00:00
|
|
|
addr: SocketAddr,
|
|
|
|
codec: U,
|
|
|
|
duration: Duration,
|
2021-09-19 01:06:57 +00:00
|
|
|
) -> io::Result<Self>
|
|
|
|
where
|
|
|
|
U: Codec + Send + 'static,
|
|
|
|
{
|
2021-09-14 17:54:45 +00:00
|
|
|
utils::timeout(duration, Self::tcp_connect(addr, codec))
|
2021-08-18 07:34:04 +00:00
|
|
|
.await
|
|
|
|
.and_then(convert::identity)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(unix)]
|
2021-09-19 01:06:57 +00:00
|
|
|
impl Session {
|
2021-08-18 07:34:04 +00:00
|
|
|
/// Connect to a proxy unix socket
|
2021-09-19 01:06:57 +00:00
|
|
|
pub async fn unix_connect<U>(path: impl AsRef<std::path::Path>, codec: U) -> io::Result<Self>
|
|
|
|
where
|
|
|
|
U: Codec + Send + 'static,
|
|
|
|
{
|
2021-09-14 17:54:45 +00:00
|
|
|
let transport = Transport::<tokio::net::UnixStream, U>::connect(path, codec).await?;
|
2021-08-18 07:34:04 +00:00
|
|
|
debug!(
|
2021-08-22 06:04:24 +00:00
|
|
|
"Session has been established with {}",
|
2021-08-18 07:34:04 +00:00
|
|
|
transport
|
|
|
|
.peer_addr()
|
|
|
|
.map(|x| format!("{:?}", x))
|
|
|
|
.unwrap_or_else(|_| String::from("???"))
|
|
|
|
);
|
2021-09-04 04:49:37 +00:00
|
|
|
Self::initialize(transport)
|
2021-08-18 07:34:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Connect to a proxy unix socket, timing out after duration has passed
|
2021-09-19 01:06:57 +00:00
|
|
|
pub async fn unix_connect_timeout<U>(
|
2021-08-18 07:34:04 +00:00
|
|
|
path: impl AsRef<std::path::Path>,
|
2021-09-14 17:54:45 +00:00
|
|
|
codec: U,
|
2021-08-18 07:34:04 +00:00
|
|
|
duration: Duration,
|
2021-09-19 01:06:57 +00:00
|
|
|
) -> io::Result<Self>
|
|
|
|
where
|
|
|
|
U: Codec + Send + 'static,
|
|
|
|
{
|
2021-09-14 17:54:45 +00:00
|
|
|
utils::timeout(duration, Self::unix_connect(path, codec))
|
2021-08-18 07:34:04 +00:00
|
|
|
.await
|
|
|
|
.and_then(convert::identity)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
impl Session {
|
2021-08-22 23:58:55 +00:00
|
|
|
/// Initializes a session using the provided transport
|
2021-09-19 01:06:57 +00:00
|
|
|
pub fn initialize<T, U>(transport: Transport<T, U>) -> io::Result<Self>
|
|
|
|
where
|
|
|
|
T: DataStream,
|
|
|
|
U: Codec + Send + 'static,
|
|
|
|
{
|
|
|
|
let (mut t_read, mut t_write) = transport.into_split();
|
|
|
|
let post_office = Arc::new(Mutex::new(PostOffice::new()));
|
|
|
|
let weak_post_office = Arc::downgrade(&post_office);
|
2021-08-18 07:34:04 +00:00
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
// Start a task that continually checks for responses and delivers them using the
|
|
|
|
// post office
|
2021-08-18 07:34:04 +00:00
|
|
|
let response_task = tokio::spawn(async move {
|
|
|
|
loop {
|
|
|
|
match t_read.receive::<Response>().await {
|
|
|
|
Ok(Some(res)) => {
|
2021-08-22 06:04:24 +00:00
|
|
|
trace!("Incoming response: {:?}", res);
|
2021-09-19 01:06:57 +00:00
|
|
|
let res_id = res.id;
|
|
|
|
let res_origin_id = res.origin_id;
|
|
|
|
|
|
|
|
// Try to send response to appropriate mailbox
|
|
|
|
// NOTE: We don't log failures as errors as using fire(...) for a
|
|
|
|
// session is valid and would not have a mailbox
|
|
|
|
if !post_office.lock().await.deliver(res).await {
|
|
|
|
trace!(
|
|
|
|
"Response {} has no mailbox for origin {}",
|
|
|
|
res_id,
|
|
|
|
res_origin_id
|
|
|
|
);
|
2021-08-18 07:34:04 +00:00
|
|
|
}
|
|
|
|
}
|
2021-08-31 03:46:51 +00:00
|
|
|
Ok(None) => {
|
|
|
|
debug!("Session closing response task as transport read-half closed!");
|
|
|
|
break;
|
|
|
|
}
|
2021-08-18 07:34:04 +00:00
|
|
|
Err(x) => {
|
|
|
|
error!("{}", x);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-09-19 01:06:57 +00:00
|
|
|
|
|
|
|
// Clean up remaining mailbox senders
|
|
|
|
post_office.lock().await.clear_mailboxes();
|
|
|
|
});
|
|
|
|
|
|
|
|
let (tx, mut rx) = mpsc::channel::<Request>(1);
|
|
|
|
let request_task = tokio::spawn(async move {
|
|
|
|
while let Some(req) = rx.recv().await {
|
|
|
|
if let Err(x) = t_write.send(req).await {
|
|
|
|
error!("Failed to send request to server: {}", x);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2021-08-18 07:34:04 +00:00
|
|
|
});
|
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
// Create a task that runs once a minute and prunes mailboxes
|
|
|
|
let post_office = Weak::clone(&weak_post_office);
|
|
|
|
let prune_task = tokio::spawn(async move {
|
|
|
|
loop {
|
|
|
|
tokio::time::sleep(Duration::from_secs(60)).await;
|
|
|
|
if let Some(post_office) = Weak::upgrade(&post_office) {
|
|
|
|
post_office.lock().await.prune_mailboxes();
|
|
|
|
} else {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
let channel = SessionChannel {
|
|
|
|
tx,
|
|
|
|
post_office: weak_post_office,
|
|
|
|
};
|
|
|
|
|
2021-08-18 07:34:04 +00:00
|
|
|
Ok(Self {
|
2021-09-19 01:06:57 +00:00
|
|
|
channel,
|
|
|
|
request_task,
|
2021-08-18 07:34:04 +00:00
|
|
|
response_task,
|
2021-09-19 01:06:57 +00:00
|
|
|
prune_task,
|
2021-08-18 07:34:04 +00:00
|
|
|
})
|
|
|
|
}
|
2021-09-14 17:54:45 +00:00
|
|
|
}
|
2021-08-18 07:34:04 +00:00
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
impl Session {
|
2021-08-22 06:04:24 +00:00
|
|
|
/// Waits for the session to terminate, which results when the receiving end of the network
|
|
|
|
/// connection is closed (or the session is shutdown)
|
2021-08-18 07:34:04 +00:00
|
|
|
pub async fn wait(self) -> Result<(), JoinError> {
|
2021-09-19 01:06:57 +00:00
|
|
|
self.prune_task.abort();
|
|
|
|
tokio::try_join!(self.request_task, self.response_task).map(|_| ())
|
2021-08-18 07:34:04 +00:00
|
|
|
}
|
|
|
|
|
2021-08-22 06:04:24 +00:00
|
|
|
/// Abort the session's current connection by forcing its response task to shutdown
|
2021-08-18 07:34:04 +00:00
|
|
|
pub fn abort(&self) {
|
2021-09-19 01:06:57 +00:00
|
|
|
self.request_task.abort();
|
|
|
|
self.response_task.abort();
|
|
|
|
self.prune_task.abort();
|
2021-08-18 07:34:04 +00:00
|
|
|
}
|
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
/// Clones the underlying channel for requests and returns the cloned instance
|
|
|
|
pub fn clone_channel(&self) -> SessionChannel {
|
|
|
|
self.channel.clone()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Deref for Session {
|
|
|
|
type Target = SessionChannel;
|
|
|
|
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
|
|
&self.channel
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl DerefMut for Session {
|
|
|
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
|
|
&mut self.channel
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<Session> for SessionChannel {
|
|
|
|
fn from(session: Session) -> Self {
|
|
|
|
session.channel
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Represents a sender of requests tied to a session, holding onto a weak reference of
|
|
|
|
/// mailboxes to relay responses, meaning that once the [`Session`] is closed or dropped,
|
|
|
|
/// any sent request will no longer be able to receive responses
|
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct SessionChannel {
|
|
|
|
/// Used to send requests to a server
|
|
|
|
tx: mpsc::Sender<Request>,
|
|
|
|
|
|
|
|
/// Collection of mailboxes for receiving responses to requests
|
|
|
|
post_office: Weak<Mutex<PostOffice>>,
|
|
|
|
}
|
2021-08-30 06:01:08 +00:00
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
impl SessionChannel {
|
|
|
|
/// Returns true if no more requests can be transferred
|
|
|
|
pub fn is_closed(&self) -> bool {
|
|
|
|
self.tx.is_closed()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Sends a request and returns a mailbox that can receive one or more responses, failing if
|
|
|
|
/// unable to send a request or if the session's receiving line to the remote server has
|
|
|
|
/// already been severed
|
|
|
|
pub async fn mail(&mut self, req: Request) -> Result<Mailbox, TransportError> {
|
|
|
|
trace!("Mailing request: {:?}", req);
|
|
|
|
|
|
|
|
// First, create a mailbox using the request's id
|
|
|
|
let mailbox = Weak::upgrade(&self.post_office)
|
|
|
|
.ok_or_else(|| {
|
|
|
|
TransportError::IoError(io::Error::new(
|
|
|
|
io::ErrorKind::NotConnected,
|
|
|
|
"Session's post office is no longer available",
|
|
|
|
))
|
|
|
|
})?
|
|
|
|
.lock()
|
|
|
|
.await
|
|
|
|
.make_mailbox(req.id, CLIENT_MAILBOX_CAPACITY);
|
2021-08-18 07:34:04 +00:00
|
|
|
|
|
|
|
// Second, send the request
|
2021-09-19 01:06:57 +00:00
|
|
|
self.fire(req).await?;
|
2021-08-18 07:34:04 +00:00
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
// Third, return mailbox
|
|
|
|
Ok(mailbox)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Sends a request and waits for a response, failing if unable to send a request or if
|
|
|
|
/// the session's receiving line to the remote server has already been severed
|
|
|
|
pub async fn send(&mut self, req: Request) -> Result<Response, TransportError> {
|
|
|
|
trace!("Sending request: {:?}", req);
|
|
|
|
|
|
|
|
// Send mail and get back a mailbox
|
|
|
|
let mut mailbox = self.mail(req).await?;
|
|
|
|
|
|
|
|
// Wait for first response, and then drop the mailbox
|
|
|
|
mailbox.next().await.ok_or_else(|| {
|
|
|
|
TransportError::IoError(io::Error::from(io::ErrorKind::ConnectionAborted))
|
|
|
|
})
|
2021-08-18 07:34:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Sends a request and waits for a response, timing out after duration has passed
|
|
|
|
pub async fn send_timeout(
|
|
|
|
&mut self,
|
|
|
|
req: Request,
|
|
|
|
duration: Duration,
|
|
|
|
) -> Result<Response, TransportError> {
|
|
|
|
utils::timeout(duration, self.send(req))
|
|
|
|
.await
|
|
|
|
.map_err(TransportError::from)
|
|
|
|
.and_then(convert::identity)
|
|
|
|
}
|
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
/// Sends a request without waiting for a response; this method is able to be used even
|
|
|
|
/// if the session's receiving line to the remote server has been severed
|
2021-08-18 07:34:04 +00:00
|
|
|
pub async fn fire(&mut self, req: Request) -> Result<(), TransportError> {
|
2021-08-30 06:01:08 +00:00
|
|
|
trace!("Firing off request: {:?}", req);
|
2021-09-19 01:06:57 +00:00
|
|
|
self.tx
|
|
|
|
.send(req)
|
|
|
|
.await
|
|
|
|
.map_err(|x| TransportError::IoError(io::Error::new(io::ErrorKind::BrokenPipe, x)))
|
2021-08-18 07:34:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Sends a request without waiting for a response, timing out after duration has passed
|
|
|
|
pub async fn fire_timeout(
|
|
|
|
&mut self,
|
|
|
|
req: Request,
|
|
|
|
duration: Duration,
|
|
|
|
) -> Result<(), TransportError> {
|
|
|
|
utils::timeout(duration, self.fire(req))
|
|
|
|
.await
|
|
|
|
.map_err(TransportError::from)
|
|
|
|
.and_then(convert::identity)
|
|
|
|
}
|
|
|
|
}
|
2021-08-18 08:57:42 +00:00
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use super::*;
|
2021-08-28 23:15:00 +00:00
|
|
|
use crate::{
|
2021-08-18 22:09:00 +00:00
|
|
|
constants::test::TENANT,
|
2021-08-18 08:57:42 +00:00
|
|
|
data::{RequestData, ResponseData},
|
|
|
|
};
|
|
|
|
use std::time::Duration;
|
|
|
|
|
2021-09-19 01:06:57 +00:00
|
|
|
#[tokio::test]
|
|
|
|
async fn mail_should_return_mailbox_that_receives_responses_until_transport_closes() {
|
|
|
|
let (t1, mut t2) = Transport::make_pair();
|
|
|
|
let mut session = Session::initialize(t1).unwrap();
|
|
|
|
|
|
|
|
let req = Request::new(TENANT, vec![RequestData::ProcList {}]);
|
|
|
|
let res = Response::new(TENANT, req.id, vec![ResponseData::Ok]);
|
|
|
|
|
|
|
|
let mut mailbox = session.mail(req).await.unwrap();
|
|
|
|
|
|
|
|
// Get first response
|
|
|
|
match tokio::join!(mailbox.next(), t2.send(res.clone())) {
|
|
|
|
(Some(actual), _) => assert_eq!(actual, res),
|
|
|
|
x => panic!("Unexpected response: {:?}", x),
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get second response
|
|
|
|
match tokio::join!(mailbox.next(), t2.send(res.clone())) {
|
|
|
|
(Some(actual), _) => assert_eq!(actual, res),
|
|
|
|
x => panic!("Unexpected response: {:?}", x),
|
|
|
|
}
|
|
|
|
|
|
|
|
// Trigger the mailbox to wait BEFORE closing our transport to ensure that
|
|
|
|
// we don't get stuck if the mailbox was already waiting
|
|
|
|
let next_task = tokio::spawn(async move { mailbox.next().await });
|
|
|
|
tokio::task::yield_now().await;
|
|
|
|
|
|
|
|
drop(t2);
|
|
|
|
match next_task.await {
|
|
|
|
Ok(None) => {}
|
|
|
|
x => panic!("Unexpected response: {:?}", x),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-18 08:57:42 +00:00
|
|
|
#[tokio::test]
|
|
|
|
async fn send_should_wait_until_response_received() {
|
2021-08-22 23:58:55 +00:00
|
|
|
let (t1, mut t2) = Transport::make_pair();
|
2021-09-04 04:49:37 +00:00
|
|
|
let mut session = Session::initialize(t1).unwrap();
|
2021-08-18 08:57:42 +00:00
|
|
|
|
2021-08-18 22:09:00 +00:00
|
|
|
let req = Request::new(TENANT, vec![RequestData::ProcList {}]);
|
2021-08-18 08:57:42 +00:00
|
|
|
let res = Response::new(
|
2021-08-18 22:09:00 +00:00
|
|
|
TENANT,
|
2021-09-19 01:06:57 +00:00
|
|
|
req.id,
|
2021-08-18 08:57:42 +00:00
|
|
|
vec![ResponseData::ProcEntries {
|
|
|
|
entries: Vec::new(),
|
|
|
|
}],
|
|
|
|
);
|
|
|
|
|
2021-08-22 06:04:24 +00:00
|
|
|
let (actual, _) = tokio::join!(session.send(req), t2.send(res.clone()));
|
2021-08-18 08:57:42 +00:00
|
|
|
match actual {
|
|
|
|
Ok(actual) => assert_eq!(actual, res),
|
|
|
|
x => panic!("Unexpected response: {:?}", x),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn send_timeout_should_fail_if_response_not_received_in_time() {
|
2021-08-22 23:58:55 +00:00
|
|
|
let (t1, mut t2) = Transport::make_pair();
|
2021-09-04 04:49:37 +00:00
|
|
|
let mut session = Session::initialize(t1).unwrap();
|
2021-08-18 08:57:42 +00:00
|
|
|
|
2021-08-18 22:09:00 +00:00
|
|
|
let req = Request::new(TENANT, vec![RequestData::ProcList {}]);
|
2021-08-22 06:04:24 +00:00
|
|
|
match session.send_timeout(req, Duration::from_millis(30)).await {
|
2021-08-18 08:57:42 +00:00
|
|
|
Err(TransportError::IoError(x)) => assert_eq!(x.kind(), io::ErrorKind::TimedOut),
|
|
|
|
x => panic!("Unexpected response: {:?}", x),
|
|
|
|
}
|
|
|
|
|
|
|
|
let req = t2.receive::<Request>().await.unwrap().unwrap();
|
2021-08-18 22:09:00 +00:00
|
|
|
assert_eq!(req.tenant, TENANT);
|
2021-08-18 08:57:42 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn fire_should_send_request_and_not_wait_for_response() {
|
2021-08-22 23:58:55 +00:00
|
|
|
let (t1, mut t2) = Transport::make_pair();
|
2021-09-04 04:49:37 +00:00
|
|
|
let mut session = Session::initialize(t1).unwrap();
|
2021-08-18 08:57:42 +00:00
|
|
|
|
2021-08-18 22:09:00 +00:00
|
|
|
let req = Request::new(TENANT, vec![RequestData::ProcList {}]);
|
2021-08-22 06:04:24 +00:00
|
|
|
match session.fire(req).await {
|
2021-08-18 08:57:42 +00:00
|
|
|
Ok(_) => {}
|
|
|
|
x => panic!("Unexpected response: {:?}", x),
|
|
|
|
}
|
|
|
|
|
|
|
|
let req = t2.receive::<Request>().await.unwrap().unwrap();
|
2021-08-18 22:09:00 +00:00
|
|
|
assert_eq!(req.tenant, TENANT);
|
2021-08-18 08:57:42 +00:00
|
|
|
}
|
|
|
|
}
|