pull/146/head
Chip Senkbeil 2 years ago
parent 9fccea014f
commit fa4a69a931
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -91,6 +91,36 @@ mod tests {
SocketAddr::from((addr, port))
}
async fn start_and_run_server(tx: oneshot::Sender<SocketAddr>) -> io::Result<()> {
let addr = find_ephemeral_addr().await;
// Start listening at the distinct address
let listener = TcpListener::bind(addr).await?;
// Send the address back to our main test thread
tx.send(addr)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x.to_string()))?;
run_server(listener).await
}
async fn run_server(listener: TcpListener) -> io::Result<()> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Get the connection
let (mut conn, _) = listener.accept().await?;
// Send some data to the connection (10 bytes)
conn.write_all(b"hello conn").await?;
// Receive some data from the connection (12 bytes)
let mut buf: [u8; 12] = [0; 12];
let _ = conn.read_exact(&mut buf).await?;
assert_eq!(&buf, b"hello server");
Ok(())
}
#[tokio::test]
async fn should_fail_to_connect_if_nothing_listening() {
let addr = find_ephemeral_addr().await;
@ -108,41 +138,72 @@ mod tests {
// Spawn a task that will wait for a connection, send data,
// and receive data that it will return in the task
let task: JoinHandle<io::Result<()>> = tokio::spawn(async move {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let addr = find_ephemeral_addr().await;
let task: JoinHandle<io::Result<()>> = tokio::spawn(start_and_run_server(tx));
// Start listening at the distinct address
let listener = TcpListener::bind(addr).await?;
// Wait for the server to be ready
let addr = rx.await.expect("Failed to get server server address");
// Send the address back to our main test thread
tx.send(addr)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x.to_string()))?;
// Connect to the socket, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
// Get the connection
let (mut conn, _) = listener.accept().await?;
let conn = TcpTransport::connect(&addr)
.await
.expect("Conn failed to connect");
// Send some data to the connection (10 bytes)
conn.write_all(b"hello conn").await?;
// Continually read until we get all of the data
conn.read_exact(&mut buf)
.await
.expect("Conn failed to read");
assert_eq!(&buf, b"hello conn");
conn.write_all(b"hello server")
.await
.expect("Conn failed to write");
// Receive some data from the connection (12 bytes)
let mut buf: [u8; 12] = [0; 12];
let _ = conn.read_exact(&mut buf).await?;
assert_eq!(&buf, b"hello server");
// Verify that the task has completed by waiting on it
let _ = task.await.expect("Server task failed unexpectedly");
}
Ok(())
});
#[tokio::test]
async fn should_be_able_to_reconnect() {
let (tx, rx) = oneshot::channel();
// Spawn a task that will wait for a connection, send data,
// and receive data that it will return in the task
let task: JoinHandle<io::Result<()>> = tokio::spawn(start_and_run_server(tx));
// Wait for the server to be ready
let addr = rx.await.expect("Failed to get server server address");
// Connect to the socket, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
let conn = TcpTransport::connect(&addr)
// Connect to the server
let mut conn = TcpTransport::connect(&addr)
.await
.expect("Conn failed to connect");
// Kill the server to make the connection fail
task.abort();
// Verify the connection fails by trying to read from it (should get connection reset)
conn.readable()
.await
.expect("Failed to wait for conn to be readable");
let res = conn.read_exact(&mut [0; 10]).await;
assert!(
matches!(res, Ok(0) | Err(_)),
"Unexpected read result: {res:?}"
);
// Restart the server
let task: JoinHandle<io::Result<()>> = tokio::spawn(run_server(
TcpListener::bind(addr)
.await
.expect("Failed to rebind server"),
));
// Reconnect to the socket, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
conn.reconnect().await.expect("Conn failed to reconnect");
// Continually read until we get all of the data
conn.read_exact(&mut buf)
.await

@ -70,6 +70,38 @@ mod tests {
task::JoinHandle,
};
async fn start_and_run_server(tx: oneshot::Sender<PathBuf>) -> io::Result<()> {
// Generate a socket path and delete the file after so there is nothing there
let path = NamedTempFile::new()
.expect("Failed to create socket file")
.path()
.to_path_buf();
// Start listening at the socket path
let listener = UnixListener::bind(&path)?;
// Send the path back to our main test thread
tx.send(path)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x.display().to_string()))?;
run_server(listener).await
}
async fn run_server(listener: UnixListener) -> io::Result<()> {
// Get the connection
let (mut conn, _) = listener.accept().await?;
// Send some data to the connection (10 bytes)
conn.write_all(b"hello conn").await?;
// Receive some data from the connection (12 bytes)
let mut buf: [u8; 12] = [0; 12];
let _ = conn.read_exact(&mut buf).await?;
assert_eq!(&buf, b"hello server");
Ok(())
}
#[tokio::test]
async fn should_fail_to_connect_if_socket_does_not_exist() {
// Generate a socket path and delete the file after so there is nothing there
@ -103,43 +135,70 @@ mod tests {
// Spawn a task that will wait for a connection, send data,
// and receive data that it will return in the task
let task: JoinHandle<io::Result<()>> = tokio::spawn(async move {
// Generate a socket path and delete the file after so there is nothing there
let path = NamedTempFile::new()
.expect("Failed to create socket file")
.path()
.to_path_buf();
let task: JoinHandle<io::Result<()>> = tokio::spawn(start_and_run_server(tx));
// Start listening at the socket path
let socket = UnixListener::bind(&path)?;
// Wait for the server to be ready
let path = rx.await.expect("Failed to get server socket path");
// Send the path back to our main test thread
tx.send(path)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x.display().to_string()))?;
// Connect to the socket, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
// Get the connection
let (mut conn, _) = socket.accept().await?;
let conn = UnixSocketTransport::connect(&path)
.await
.expect("Conn failed to connect");
conn.read_exact(&mut buf)
.await
.expect("Conn failed to read");
assert_eq!(&buf, b"hello conn");
// Send some data to the connection (10 bytes)
conn.write_all(b"hello conn").await?;
conn.write_all(b"hello server")
.await
.expect("Conn failed to write");
// Receive some data from the connection (12 bytes)
let mut buf: [u8; 12] = [0; 12];
let _ = conn.read_exact(&mut buf).await?;
assert_eq!(&buf, b"hello server");
// Verify that the task has completed by waiting on it
let _ = task.await.expect("Server task failed unexpectedly");
}
Ok(())
});
#[tokio::test]
async fn should_be_able_to_reconnect() {
let (tx, rx) = oneshot::channel();
// Spawn a task that will wait for a connection, send data,
// and receive data that it will return in the task
let task: JoinHandle<io::Result<()>> = tokio::spawn(start_and_run_server(tx));
// Wait for the server to be ready
let path = rx.await.expect("Failed to get server socket path");
// Connect to the socket, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
let conn = UnixSocketTransport::connect(&path)
// Connect to the server
let mut conn = UnixSocketTransport::connect(&path)
.await
.expect("Conn failed to connect");
// Kill the server to make the connection fail
task.abort();
// Verify the connection fails by trying to read from it (should get connection reset)
conn.readable()
.await
.expect("Failed to wait for conn to be readable");
let res = conn.read_exact(&mut [0; 10]).await;
assert!(
matches!(res, Ok(0) | Err(_)),
"Unexpected read result: {res:?}"
);
// Restart the server (need to remove the socket file)
let _ = tokio::fs::remove_file(&path).await;
let task: JoinHandle<io::Result<()>> = tokio::spawn(run_server(
UnixListener::bind(&path).expect("Failed to rebind server"),
));
// Reconnect to the socket, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
conn.reconnect().await.expect("Conn failed to reconnect");
// Continually read until we get all of the data
conn.read_exact(&mut buf)
.await
.expect("Conn failed to read");

@ -88,7 +88,47 @@ impl RawTransport for WindowsPipeTransport {
#[cfg(test)]
mod tests {
use super::*;
use tokio::{net::windows::named_pipe::ServerOptions, sync::oneshot, task::JoinHandle};
use tokio::{
net::windows::named_pipe::{NamedPipeServer, ServerOptions},
sync::oneshot,
task::JoinHandle,
};
async fn start_and_run_server(tx: oneshot::Sender<String>) -> io::Result<()> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Generate a pipe address (not just a name)
let addr = format!(r"\\.\pipe\test_pipe_{}", rand::random::<usize>());
// Listen at the pipe
let pipe = ServerOptions::new()
.first_pipe_instance(true)
.create(&addr)?;
// Send the address back to our main test thread
tx.send(addr)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
run_server(pipe).await
}
async fn run_server(pipe: NamedPipeServer) -> io::Result<()> {
// Get the connection
let mut conn = {
pipe.connect().await?;
pipe
};
// Send some data to the connection (10 bytes)
conn.write_all(b"hello conn").await?;
// Receive some data from the connection (12 bytes)
let mut buf: [u8; 12] = [0; 12];
let _ = conn.read_exact(&mut buf).await?;
assert_eq!(&buf, b"hello server");
Ok(())
}
#[tokio::test]
async fn should_fail_to_connect_if_pipe_does_not_exist() {
@ -107,47 +147,70 @@ mod tests {
// Spawn a task that will wait for a connection, send data,
// and receive data that it will return in the task
let task: JoinHandle<io::Result<()>> = tokio::spawn(async move {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let task: JoinHandle<io::Result<()>> = tokio::spawn(start_and_run_server(tx));
// Generate a pipe address (not just a name)
let addr = format!(r"\\.\pipe\test_pipe_{}", rand::random::<usize>());
// Wait for the server to be ready
let address = rx.await.expect("Failed to get server address");
// Listen at the pipe
let pipe = ServerOptions::new()
.first_pipe_instance(true)
.create(&addr)?;
// Connect to the pipe, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
// Send the address back to our main test thread
tx.send(addr)
.map_err(|x| io::Error::new(io::ErrorKind::Other, x))?;
let conn = WindowsPipeTransport::connect(&address)
.await
.expect("Conn failed to connect");
conn.read_exact(&mut buf)
.await
.expect("Conn failed to read");
assert_eq!(&buf, b"hello conn");
// Get the connection
let mut conn = {
pipe.connect().await?;
pipe
};
conn.write_all(b"hello server")
.await
.expect("Conn failed to write");
// Send some data to the connection (10 bytes)
conn.write_all(b"hello conn").await?;
// Verify that the task has completed by waiting on it
let _ = task.await.expect("Server task failed unexpectedly");
}
// Receive some data from the connection (12 bytes)
let mut buf: [u8; 12] = [0; 12];
let _ = conn.read_exact(&mut buf).await?;
assert_eq!(&buf, b"hello server");
#[tokio::test]
async fn should_be_able_to_reconnect() {
let (tx, rx) = oneshot::channel();
Ok(())
});
// Spawn a task that will wait for a connection, send data,
// and receive data that it will return in the task
let task: JoinHandle<io::Result<()>> = tokio::spawn(start_and_run_server(tx));
// Wait for the server to be ready
let address = rx.await.expect("Failed to get server address");
// Connect to the pipe, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
let conn = WindowsPipeTransport::connect(&address)
// Connect to the server
let mut conn = WindowsPipeTransport::connect(&address)
.await
.expect("Conn failed to connect");
// Kill the server to make the connection fail
task.abort();
// Verify the connection fails by trying to read from it (should get connection reset)
conn.readable()
.await
.expect("Failed to wait for conn to be readable");
let res = conn.read_exact(&mut [0; 10]).await;
assert!(
matches!(res, Ok(0) | Err(_)),
"Unexpected read result: {res:?}"
);
// Restart the server
let task: JoinHandle<io::Result<()>> = tokio::spawn(run_server(
ServerOptions::new()
.first_pipe_instance(true)
.create(&address)?,
));
// Reconnect to the pipe, send some bytes, and get some bytes
let mut buf: [u8; 10] = [0; 10];
conn.reconnect().await.expect("Conn failed to reconnect");
conn.read_exact(&mut buf)
.await
.expect("Conn failed to read");

Loading…
Cancel
Save