|
|
|
@ -1,7 +1,7 @@
|
|
|
|
|
use crate::{
|
|
|
|
|
utils::Timer, ConnectionId, FramedTransport, GenericServerRef, Interest, Listener, Request,
|
|
|
|
|
Response, Server, ServerConnection, ServerCtx, ServerRef, ServerReply, ServerState, Shutdown,
|
|
|
|
|
Transport, UntypedRequest,
|
|
|
|
|
utils::Timer, ConnectionId, FramedTransport, GenericServerRef, Interest, Listener, Response,
|
|
|
|
|
Server, ServerConnection, ServerCtx, ServerRef, ServerReply, ServerState, Shutdown, Transport,
|
|
|
|
|
UntypedRequest,
|
|
|
|
|
};
|
|
|
|
|
use log::*;
|
|
|
|
|
use serde::{de::DeserializeOwned, Serialize};
|
|
|
|
@ -37,10 +37,10 @@ pub trait ServerExt {
|
|
|
|
|
type Response;
|
|
|
|
|
|
|
|
|
|
/// Start a new server using the provided listener
|
|
|
|
|
fn start<L, T>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
|
|
|
|
|
fn start<L>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
|
|
|
|
|
where
|
|
|
|
|
L: Listener<Output = T> + 'static,
|
|
|
|
|
T: Transport + Send + 'static;
|
|
|
|
|
L: Listener + 'static,
|
|
|
|
|
L::Output: Transport + Send + 'static;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl<S> ServerExt for S
|
|
|
|
@ -53,10 +53,10 @@ where
|
|
|
|
|
type Request = S::Request;
|
|
|
|
|
type Response = S::Response;
|
|
|
|
|
|
|
|
|
|
fn start<L, T>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
|
|
|
|
|
fn start<L>(self, listener: L) -> io::Result<Box<dyn ServerRef>>
|
|
|
|
|
where
|
|
|
|
|
L: Listener<Output = T> + 'static,
|
|
|
|
|
T: Transport + Send + 'static,
|
|
|
|
|
L: Listener + 'static,
|
|
|
|
|
L::Output: Transport + Send + 'static,
|
|
|
|
|
{
|
|
|
|
|
let server = Arc::new(self);
|
|
|
|
|
let state = Arc::new(ServerState::new());
|
|
|
|
@ -67,14 +67,14 @@ where
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn task<S, L, T>(server: Arc<S>, state: Arc<ServerState>, mut listener: L)
|
|
|
|
|
async fn task<S, L>(server: Arc<S>, state: Arc<ServerState>, mut listener: L)
|
|
|
|
|
where
|
|
|
|
|
S: Server + Sync + 'static,
|
|
|
|
|
S::Request: DeserializeOwned + Send + Sync + 'static,
|
|
|
|
|
S::Response: Serialize + Send + 'static,
|
|
|
|
|
S::LocalData: Default + Send + Sync + 'static,
|
|
|
|
|
L: Listener<Output = T> + 'static,
|
|
|
|
|
T: Transport + Send + 'static,
|
|
|
|
|
L: Listener + 'static,
|
|
|
|
|
L::Output: Transport + Send + 'static,
|
|
|
|
|
{
|
|
|
|
|
// Grab a copy of our server's configuration so we can leverage it below
|
|
|
|
|
let config = server.config();
|
|
|
|
@ -117,7 +117,7 @@ where
|
|
|
|
|
|
|
|
|
|
// Receive a new connection, exiting if no longer accepting connections or if the shutdown
|
|
|
|
|
// signal has been received
|
|
|
|
|
let mut transport = tokio::select! {
|
|
|
|
|
let transport = tokio::select! {
|
|
|
|
|
result = listener.accept() => {
|
|
|
|
|
match result {
|
|
|
|
|
Ok(x) => x,
|
|
|
|
@ -220,33 +220,35 @@ where
|
|
|
|
|
if ready.is_readable() {
|
|
|
|
|
match transport.try_read_frame() {
|
|
|
|
|
Ok(Some(frame)) => match UntypedRequest::from_slice(frame.as_item()) {
|
|
|
|
|
Ok(request) => match request.to_typed_request() {
|
|
|
|
|
Ok(request) => {
|
|
|
|
|
let reply = ServerReply {
|
|
|
|
|
origin_id: request.id.clone(),
|
|
|
|
|
tx: tx.clone(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let ctx = ServerCtx {
|
|
|
|
|
connection_id,
|
|
|
|
|
request,
|
|
|
|
|
reply: reply.clone(),
|
|
|
|
|
local_data: Arc::clone(&self.local_data),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
self.server.on_request(ctx).await;
|
|
|
|
|
Ok(request) => {
|
|
|
|
|
if log::log_enabled!(Level::Trace) {
|
|
|
|
|
trace!(
|
|
|
|
|
"[Conn {connection_id}] Receiving {}",
|
|
|
|
|
String::from_utf8_lossy(&request.payload),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
Err(x) => {
|
|
|
|
|
if log::log_enabled!(Level::Trace) {
|
|
|
|
|
trace!(
|
|
|
|
|
"[Conn {connection_id}] Request payload: {}",
|
|
|
|
|
String::from_utf8_lossy(&request.payload),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
error!("[Conn {connection_id}] Invalid request: {x}");
|
|
|
|
|
match request.to_typed_request() {
|
|
|
|
|
Ok(request) => {
|
|
|
|
|
let reply = ServerReply {
|
|
|
|
|
origin_id: request.id.clone(),
|
|
|
|
|
tx: tx.clone(),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let ctx = ServerCtx {
|
|
|
|
|
connection_id,
|
|
|
|
|
request,
|
|
|
|
|
reply: reply.clone(),
|
|
|
|
|
local_data: Arc::clone(&self.local_data),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
self.server.on_request(ctx).await;
|
|
|
|
|
}
|
|
|
|
|
Err(x) => {
|
|
|
|
|
error!("[Conn {connection_id}] Invalid request: {x}");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
Err(x) => {
|
|
|
|
|
error!("[Conn {connection_id}] Invalid request: {x}");
|
|
|
|
|
}
|
|
|
|
@ -282,22 +284,30 @@ where
|
|
|
|
|
// the queue and process it
|
|
|
|
|
if ready.is_writable() {
|
|
|
|
|
match rx.try_recv() {
|
|
|
|
|
Ok(data) => {
|
|
|
|
|
Ok(response) => {
|
|
|
|
|
// Log our message as a string, which can be expensive
|
|
|
|
|
if log_enabled!(Level::Trace) {
|
|
|
|
|
trace!(
|
|
|
|
|
"[Conn {connection_id}] Sending {}",
|
|
|
|
|
&data
|
|
|
|
|
&response
|
|
|
|
|
.to_vec()
|
|
|
|
|
.map(|x| String::from_utf8_lossy(&x).to_string())
|
|
|
|
|
.unwrap_or_else(|_| "<Cannot serialize>".to_string())
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match self.transport.try_write(data) {
|
|
|
|
|
Ok(()) => continue,
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
|
|
|
|
|
Err(x) => error!("[Conn {connection_id}] Send failed: {x}"),
|
|
|
|
|
match response.to_vec() {
|
|
|
|
|
Ok(data) => match transport.try_write_frame(data) {
|
|
|
|
|
Ok(()) => continue,
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => continue,
|
|
|
|
|
Err(x) => error!("[Conn {connection_id}] Send failed: {x}"),
|
|
|
|
|
},
|
|
|
|
|
Err(x) => {
|
|
|
|
|
error!(
|
|
|
|
|
"[Conn {connection_id}] Unable to serialize outgoing response: {x}"
|
|
|
|
|
);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|