|
|
|
@ -1,6 +1,4 @@
|
|
|
|
|
use crate::common::{
|
|
|
|
|
FramedTransport, Interest, Reconnectable, Request, Transport, UntypedResponse,
|
|
|
|
|
};
|
|
|
|
|
use crate::common::{Connection, Interest, Reconnectable, Request, Transport, UntypedResponse};
|
|
|
|
|
use async_trait::async_trait;
|
|
|
|
|
use log::*;
|
|
|
|
|
use serde::{de::DeserializeOwned, Serialize};
|
|
|
|
@ -44,13 +42,8 @@ where
|
|
|
|
|
T: Send + Sync + Serialize + 'static,
|
|
|
|
|
U: Send + Sync + DeserializeOwned + 'static,
|
|
|
|
|
{
|
|
|
|
|
/// Creates a client using the provided [`FramedTransport`].
|
|
|
|
|
///
|
|
|
|
|
/// ### Note
|
|
|
|
|
///
|
|
|
|
|
/// It is assumed that the provided transport has performed any necessary handshake and is
|
|
|
|
|
/// fully authenticated.
|
|
|
|
|
pub fn new<V>(mut transport: FramedTransport<V>) -> Self
|
|
|
|
|
/// Spawns a client using the provided [`Connection`].
|
|
|
|
|
fn spawn<V>(mut connection: Connection<V>) -> Self
|
|
|
|
|
where
|
|
|
|
|
V: Transport + Send + Sync + 'static,
|
|
|
|
|
{
|
|
|
|
@ -61,7 +54,7 @@ where
|
|
|
|
|
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
|
|
|
|
|
|
|
|
|
|
// Ensure that our transport starts off clean (nothing in buffers or backup)
|
|
|
|
|
transport.clear();
|
|
|
|
|
connection.clear();
|
|
|
|
|
|
|
|
|
|
// Start a task that continually checks for responses and delivers them using the
|
|
|
|
|
// post office
|
|
|
|
@ -75,7 +68,7 @@ where
|
|
|
|
|
cb = reconnect_rx.recv() => {
|
|
|
|
|
debug!("Client got reconnect signal, so attempting to reconnect");
|
|
|
|
|
if let Some(cb) = cb {
|
|
|
|
|
let _ = match Reconnectable::reconnect(&mut transport).await {
|
|
|
|
|
let _ = match Reconnectable::reconnect(&mut connection).await {
|
|
|
|
|
Ok(()) => cb.send(Ok(())),
|
|
|
|
|
Err(x) => {
|
|
|
|
|
error!("Client reconnect failed: {x}");
|
|
|
|
@ -88,7 +81,7 @@ where
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
result = transport.ready(Interest::READABLE | Interest::WRITABLE) => {
|
|
|
|
|
result = connection.ready(Interest::READABLE | Interest::WRITABLE) => {
|
|
|
|
|
match result {
|
|
|
|
|
Ok(result) => result,
|
|
|
|
|
Err(x) => {
|
|
|
|
@ -103,7 +96,7 @@ where
|
|
|
|
|
let mut write_blocked = !ready.is_writable();
|
|
|
|
|
|
|
|
|
|
if ready.is_readable() {
|
|
|
|
|
match transport.try_read_frame() {
|
|
|
|
|
match connection.try_read_frame() {
|
|
|
|
|
Ok(Some(frame)) => {
|
|
|
|
|
match UntypedResponse::from_slice(frame.as_item()) {
|
|
|
|
|
Ok(response) => {
|
|
|
|
@ -148,7 +141,7 @@ where
|
|
|
|
|
// outgoing bytes that weren't sent earlier.
|
|
|
|
|
if let Ok(request) = rx.try_recv() {
|
|
|
|
|
match request.to_vec() {
|
|
|
|
|
Ok(data) => match transport.try_write_frame(data) {
|
|
|
|
|
Ok(data) => match connection.try_write_frame(data) {
|
|
|
|
|
Ok(()) => (),
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => {
|
|
|
|
|
write_blocked = true
|
|
|
|
@ -166,7 +159,7 @@ where
|
|
|
|
|
// 1. When flush did not write any bytes, which can happen when the buffer
|
|
|
|
|
// is empty
|
|
|
|
|
// 2. When the call to write bytes blocks
|
|
|
|
|
match transport.try_flush() {
|
|
|
|
|
match connection.try_flush() {
|
|
|
|
|
Ok(0) => write_blocked = true,
|
|
|
|
|
Ok(_) => (),
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => write_blocked = true,
|
|
|
|
@ -199,6 +192,11 @@ where
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl Client<(), ()> {
|
|
|
|
|
/// Creates a new [`ClientBuilder`]
|
|
|
|
|
pub fn build() -> ClientBuilder<(), ()> {
|
|
|
|
|
ClientBuilder::new()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Creates a new [`TcpClientBuilder`]
|
|
|
|
|
pub fn tcp() -> TcpClientBuilder<()> {
|
|
|
|
|
TcpClientBuilder::new()
|
|
|
|
|