|
|
|
@ -171,12 +171,42 @@ where
|
|
|
|
|
verifier,
|
|
|
|
|
} = self;
|
|
|
|
|
|
|
|
|
|
// Will check if no more connections and restart timer if that's the case
|
|
|
|
|
macro_rules! terminate_connection {
|
|
|
|
|
// Prints an error message before terminating the connection
|
|
|
|
|
(@error $($msg:tt)+) => {
|
|
|
|
|
error!($($msg)+);
|
|
|
|
|
terminate_connection!();
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Prints a debug message before terminating the connection
|
|
|
|
|
(@debug $($msg:tt)+) => {
|
|
|
|
|
debug!($($msg)+);
|
|
|
|
|
terminate_connection!();
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Performs the connection termination by removing it from server state and
|
|
|
|
|
// restarting the shutdown timer if it was the last connection
|
|
|
|
|
() => {
|
|
|
|
|
// Remove the connection from our state if it has closed
|
|
|
|
|
if let Some(state) = Weak::upgrade(&state) {
|
|
|
|
|
state.connections.write().await.remove(&self.id);
|
|
|
|
|
// If we have no more connections, start the timer
|
|
|
|
|
if let Some(timer) = Weak::upgrade(&shutdown_timer) {
|
|
|
|
|
if state.connections.read().await.is_empty() {
|
|
|
|
|
timer.write().await.restart();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Attempt to upgrade our handler for use with the connection going forward
|
|
|
|
|
let handler = match Weak::upgrade(&handler) {
|
|
|
|
|
Some(handler) => handler,
|
|
|
|
|
None => {
|
|
|
|
|
error!("[Conn {id}] Handler has been dropped");
|
|
|
|
|
return;
|
|
|
|
|
terminate_connection!(@error "[Conn {id}] Handler has been dropped");
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -186,21 +216,18 @@ where
|
|
|
|
|
// Perform a handshake to ensure that the connection is properly established
|
|
|
|
|
let mut transport: FramedTransport<T> = FramedTransport::plain(transport);
|
|
|
|
|
if let Err(x) = transport.server_handshake().await {
|
|
|
|
|
error!("[Conn {id}] Handshake failed: {x}");
|
|
|
|
|
return;
|
|
|
|
|
terminate_connection!(@error "[Conn {id}] Handshake failed: {x}");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Perform authentication to ensure the connection is valid
|
|
|
|
|
match Weak::upgrade(&verifier) {
|
|
|
|
|
Some(verifier) => {
|
|
|
|
|
if let Err(x) = verifier.verify(&mut transport).await {
|
|
|
|
|
error!("[Conn {id}] Verification failed: {x}");
|
|
|
|
|
return;
|
|
|
|
|
terminate_connection!(@error "[Conn {id}] Verification failed: {x}");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
None => {
|
|
|
|
|
error!("[Conn {id}] Verifier has been dropped");
|
|
|
|
|
return;
|
|
|
|
|
terminate_connection!(@error "[Conn {id}] Verifier has been dropped");
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -215,17 +242,21 @@ where
|
|
|
|
|
})
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
error!("[Conn {id}] Accepting connection failed: {x}");
|
|
|
|
|
return;
|
|
|
|
|
terminate_connection!(@error "[Conn {id}] Accepting connection failed: {x}");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let local_data = Arc::new(local_data);
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
let ready = transport
|
|
|
|
|
let ready = match transport
|
|
|
|
|
.ready(Interest::READABLE | Interest::WRITABLE)
|
|
|
|
|
.await
|
|
|
|
|
.expect("[Conn {connection_id}] Failed to examine ready state");
|
|
|
|
|
{
|
|
|
|
|
Ok(ready) => ready,
|
|
|
|
|
Err(x) => {
|
|
|
|
|
terminate_connection!(@error "[Conn {id}] Failed to examine ready state: {x}");
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Keep track of whether we read or wrote anything
|
|
|
|
|
let mut read_blocked = false;
|
|
|
|
@ -266,20 +297,7 @@ where
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
Ok(None) => {
|
|
|
|
|
debug!("[Conn {id}] Connection closed");
|
|
|
|
|
|
|
|
|
|
// Remove the connection from our state if it has closed
|
|
|
|
|
if let Some(state) = Weak::upgrade(&state) {
|
|
|
|
|
state.connections.write().await.remove(&self.id);
|
|
|
|
|
|
|
|
|
|
// If we have no more connections, start the timer
|
|
|
|
|
if let Some(timer) = Weak::upgrade(&shutdown_timer) {
|
|
|
|
|
if state.connections.read().await.is_empty() {
|
|
|
|
|
timer.write().await.restart();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
terminate_connection!(@debug "[Conn {id}] Connection closed");
|
|
|
|
|
}
|
|
|
|
|
Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
|
|
|
|
|
Err(x) => {
|
|
|
|
|