Finish tests for common connection

pull/146/head
Chip Senkbeil 2 years ago
parent d81b5df330
commit 24518f9882
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -66,6 +66,22 @@ impl<T> Connection<T> {
Self::Server { .. } => None,
}
}
/// Returns a reference to the underlying transport.
pub fn transport(&self) -> &FramedTransport<T> {
match self {
Self::Client { transport, .. } => transport,
Self::Server { transport, .. } => transport,
}
}
/// Returns a mutable reference to the underlying transport.
pub fn mut_transport(&mut self) -> &mut FramedTransport<T> {
match self {
Self::Client { transport, .. } => transport,
Self::Server { transport, .. } => transport,
}
}
}
impl<T> Deref for Connection<T> {
@ -415,6 +431,7 @@ mod tests {
authentication::{msg::Challenge, Authenticator, DummyAuthHandler},
Frame,
};
use std::sync::Arc;
use test_log::test;
#[test(tokio::test)]
@ -986,43 +1003,283 @@ mod tests {
}
#[test(tokio::test)]
async fn reconnect_should_fail_if_client_side_connection_unable_to_reconnect_transport() {
todo!();
async fn client_server_new_connection_e2e_should_establish_connection() {
let (t1, t2) = InmemoryTransport::pair(100);
let verifier = Verifier::none();
let keychain = Keychain::new();
// Spawn a task to perform the server connection so we don't deadlock
let task = tokio::spawn(async move {
Connection::server(t2, &verifier, keychain)
.await
.expect("Failed to connect from server")
});
// Perform the client-side of the connection
let mut client = Connection::client(t1, DummyAuthHandler)
.await
.expect("Failed to connect from client");
let mut server = task.await.unwrap();
// Test out the connection
client.write_frame(Frame::new(b"hello")).await.unwrap();
assert_eq!(server.read_frame().await.unwrap().unwrap(), b"hello");
server.write_frame(Frame::new(b"goodbye")).await.unwrap();
assert_eq!(client.read_frame().await.unwrap().unwrap(), b"goodbye");
}
#[test(tokio::test)]
async fn reconnect_should_fail_if_client_side_connection_handshake_fails() {
todo!();
/// Helper utility to set up for a client reconnection
async fn setup_reconnect_scenario() -> (
Connection<InmemoryTransport>,
InmemoryTransport,
Arc<Verifier>,
Keychain<oneshot::Receiver<Backup>>,
) {
let (t1, t2) = InmemoryTransport::pair(100);
let verifier = Arc::new(Verifier::none());
let keychain = Keychain::new();
// Spawn a task to perform the server connection so we don't deadlock
let task = {
let verifier = Arc::clone(&verifier);
let keychain = keychain.clone();
tokio::spawn(async move {
Connection::server(t2, &verifier, keychain)
.await
.expect("Failed to connect from server")
})
};
// Perform the client-side of the connection
let mut client = Connection::client(t1, DummyAuthHandler)
.await
.expect("Failed to connect from client");
// Ensure the server is established and then drop it
let server = task.await.unwrap();
drop(server);
// Create a new inmemory transport and link it to the client
let mut t2 = InmemoryTransport::pair(100).0;
t2.link(client.mut_transport().as_mut_inner(), 100);
(client, t2, verifier, keychain)
}
#[test(tokio::test)]
async fn reconnect_should_fail_if_client_side_connection_unable_to_send_connect_type() {
todo!();
async fn reconnect_should_fail_if_client_side_connection_handshake_fails() {
let (mut client, transport, _verifier, _keychain) = setup_reconnect_scenario().await;
let mut transport = FramedTransport::plain(transport);
// Spawn a task to perform the client reconnection so we don't deadlock
let task = tokio::spawn(async move { client.reconnect().await.unwrap() });
// Send garbage to fail handshake from server-side
transport.write_frame(b"hello").await.unwrap();
// Client should fail
task.await.unwrap_err();
}
#[test(tokio::test)]
async fn reconnect_should_fail_if_client_side_connection_unable_to_receive_new_connection_id() {
todo!();
let (mut client, transport, _verifier, _keychain) = setup_reconnect_scenario().await;
let mut transport = FramedTransport::plain(transport);
// Spawn a task to perform the client reconnection so we don't deadlock
let task = tokio::spawn(async move { client.reconnect().await.unwrap() });
// Perform first step of completing server-side of handshake
transport.server_handshake().await.unwrap();
// Drop transport to cause client to fail in not receiving connection id
drop(transport);
// Client should fail
task.await.unwrap_err();
}
#[test(tokio::test)]
async fn reconnect_should_fail_if_client_side_connection_unable_to_exchange_otp_with_server() {
todo!();
let (mut client, transport, _verifier, keychain) = setup_reconnect_scenario().await;
let mut transport = FramedTransport::plain(transport);
// Spawn a task to perform the client reconnection so we don't deadlock
let task = tokio::spawn(async move { client.reconnect().await.unwrap() });
// Perform first step of completing server-side of handshake
transport.server_handshake().await.unwrap();
// Receive reconnect data from client-side
let (id, otp) = match transport.read_frame_as::<ConnectType>().await {
Ok(Some(ConnectType::Reconnect { id, otp })) => (id, HeapSecretKey::from(otp)),
x => panic!("Unexpected result: {x:?}"),
};
// Verify the id and OTP matches the one stored into our keychain from the setup
assert!(
keychain.has_key(id.to_string(), otp).await,
"Wrong id or OTP"
);
// Send a new id back to the client connection
transport
.write_frame_for(&rand::random::<ConnectionId>())
.await
.unwrap();
// Send garbage to fail the key exchange for new OTP
transport.write_frame(Frame::new(b"hello")).await.unwrap();
// Client should fail
task.await.unwrap_err();
}
#[test(tokio::test)]
async fn reconnect_should_fail_if_client_side_connection_unable_to_synchronize_with_server() {
todo!();
let (mut client, transport, _verifier, keychain) = setup_reconnect_scenario().await;
let mut transport = FramedTransport::plain(transport);
// Spawn a task to perform the client reconnection so we don't deadlock
let task = tokio::spawn(async move { client.reconnect().await.unwrap() });
// Perform first step of completing server-side of handshake
transport.server_handshake().await.unwrap();
// Receive reconnect data from client-side
let (id, otp) = match transport.read_frame_as::<ConnectType>().await {
Ok(Some(ConnectType::Reconnect { id, otp })) => (id, HeapSecretKey::from(otp)),
x => panic!("Unexpected result: {x:?}"),
};
// Verify the id and OTP matches the one stored into our keychain from the setup
assert!(
keychain.has_key(id.to_string(), otp).await,
"Wrong id or OTP"
);
// Send a new id back to the client connection
transport
.write_frame_for(&rand::random::<ConnectionId>())
.await
.unwrap();
// Send garbage to fail the key exchange for new OTP
transport.write_frame(Frame::new(b"hello")).await.unwrap();
// Client should fail
task.await.unwrap_err();
}
#[test(tokio::test)]
async fn reconnect_should_succeed_if_client_side_connection_fully_connects_and_synchronizes_with_server(
) {
todo!();
let (mut client, transport, _verifier, keychain) = setup_reconnect_scenario().await;
let mut transport = FramedTransport::plain(transport);
// Copy client backup for verification later
let client_backup = client.transport().backup.clone();
// Spawn a task to perform the client reconnection so we don't deadlock
let task = tokio::spawn(async move {
client.reconnect().await.unwrap();
client
});
// Perform first step of completing server-side of handshake
transport.server_handshake().await.unwrap();
// Receive reconnect data from client-side
let (id, otp) = match transport.read_frame_as::<ConnectType>().await {
Ok(Some(ConnectType::Reconnect { id, otp })) => (id, HeapSecretKey::from(otp)),
x => panic!("Unexpected result: {x:?}"),
};
// Retrieve server backup
let backup = keychain
.remove_if_has_key(id.to_string(), otp)
.await
.into_ok()
.expect("Invalid id or OTP")
.await
.expect("Failed to retrieve backup");
// Send a new id back to the client connection
transport
.write_frame_for(&rand::random::<ConnectionId>())
.await
.unwrap();
// Perform key exchange
let otp = transport.exchange_keys().await.unwrap();
// Perform synchronization after restoring backup
transport.backup = backup;
transport.synchronize().await.unwrap();
// Client should succeed
let mut client = task.await.unwrap();
assert_eq!(client.otp(), Some(&otp.into_heap_secret_key()));
// Verify client backup sent/received count was not modified (stored frames may be
// truncated, though)
assert_eq!(
client.transport().backup.sent_cnt(),
client_backup.sent_cnt(),
"Client backup sent cnt altered"
);
assert_eq!(
client.transport().backup.received_cnt(),
client_backup.received_cnt(),
"Client backup received cnt altered"
);
// Verify that client can send a frame and receive a frame, and that there is
// nothing unexpected in the buffers on either side
client.write_frame(Frame::new(b"hello")).await.unwrap();
assert_eq!(transport.read_frame().await.unwrap().unwrap(), b"hello");
transport.write_frame(Frame::new(b"goodbye")).await.unwrap();
assert_eq!(client.read_frame().await.unwrap().unwrap(), b"goodbye");
}
#[test(tokio::test)]
async fn reconnect_should_fail_if_connection_is_server_side() {
todo!();
let mut connection = Connection::Server {
id: rand::random(),
tx: oneshot::channel().0,
transport: FramedTransport::pair(100).0,
};
assert_eq!(
connection.reconnect().await.unwrap_err().kind(),
io::ErrorKind::Unsupported
);
}
#[test(tokio::test)]
async fn client_server_returning_connection_e2e_should_reestablish_connection() {
let (mut client, transport, verifier, keychain) = setup_reconnect_scenario().await;
// Spawn a task to perform the server reconnection so we don't deadlock
let task = tokio::spawn(async move {
Connection::server(transport, &verifier, keychain)
.await
.expect("Failed to connect from server")
});
// Reconnect and verify that the connection still works
client
.reconnect()
.await
.expect("Failed to reconnect from client");
// Ensure the server is established and then drop it
let mut server = task.await.unwrap();
// Test out the connection
client.write_frame(Frame::new(b"hello")).await.unwrap();
assert_eq!(server.read_frame().await.unwrap().unwrap(), b"hello");
server.write_frame(Frame::new(b"goodbye")).await.unwrap();
assert_eq!(client.read_frame().await.unwrap().unwrap(), b"goodbye");
}
}

@ -99,6 +99,16 @@ impl<T> FramedTransport<T> {
self.outgoing.clear();
}
/// Returns a reference to the inner value this transport wraps.
pub fn as_inner(&self) -> &T {
&self.inner
}
/// Returns a mutable reference to the inner value this transport wraps.
pub fn as_mut_inner(&mut self) -> &mut T {
&mut self.inner
}
/// Consumes this transport, returning the inner value that it wraps.
pub fn into_inner(self) -> T {
self.inner

@ -5,7 +5,7 @@ use std::collections::VecDeque;
const MAX_BACKUP_SIZE: usize = 256 * 1024 * 1024;
/// Stores [`Frame`]s for reuse later.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Backup {
/// Maximum size (in bytes) to save frames in case we need to backup them
///
@ -115,7 +115,7 @@ impl Backup {
}
/// Returns how many frames have been sent.
pub(super) fn sent_cnt(&self) -> u64 {
pub(crate) fn sent_cnt(&self) -> u64 {
self.sent_cnt
}
@ -131,7 +131,7 @@ impl Backup {
}
/// Returns how many frames have been received.
pub(super) fn received_cnt(&self) -> u64 {
pub(crate) fn received_cnt(&self) -> u64 {
self.received_cnt
}
@ -191,7 +191,10 @@ impl Backup {
pub(super) fn truncate_front(&mut self, size: usize) {
if !self.frozen {
while self.frames.len() > size {
self.frames.pop_front();
if let Some(frame) = self.frames.pop_front() {
self.current_backup_size -=
std::cmp::min(frame.len(), self.current_backup_size);
}
}
}
}

Loading…
Cancel
Save