distant-net is broken now, but progress towards client/server properly using backup

pull/146/head
Chip Senkbeil 2 years ago
parent 03aa73e194
commit ccb2cc72d6
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -35,6 +35,9 @@ pub struct Client<T, U> {
/// Contains the task that is running to send requests and receive responses from a server
task: JoinHandle<()>,
// TODO: We need to change the task type above to return the transport when finished so we
// can reuse it via `reconnect`. We also want some form of retry strategy that determines
// how often we attempt to reconnect, handshake, reauthenticate, and synchronize.
}
impl<T, U> Client<T, U>
@ -58,6 +61,9 @@ where
let (reconnect_tx, mut reconnect_rx) = mpsc::channel::<oneshot::Sender<io::Result<()>>>(1);
let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
// Ensure that our transport starts off clean (nothing in buffers or backup)
transport.clear();
// Start a task that continually checks for responses and delivers them using the
// post office
let task = tokio::spawn(async move {
@ -65,7 +71,7 @@ where
let ready = tokio::select! {
_ = shutdown_rx.recv() => {
debug!("Client got shutdown signal, so exiting event loop");
break;
break transport;
}
cb = reconnect_rx.recv() => {
debug!("Client got reconnect signal, so attempting to reconnect");
@ -80,11 +86,17 @@ where
continue;
} else {
error!("Client callback for reconnect missing! Corrupt state!");
break;
break transport;
}
}
result = transport.ready(Interest::READABLE | Interest::WRITABLE) => {
result.expect("Failed to examine ready state")
match result {
Ok(result) => result,
Err(x) => {
error!("Failed to examine ready state: {x}");
break transport;
}
}
}
};
@ -122,7 +134,7 @@ where
}
Ok(None) => {
debug!("Connection closed");
break;
break transport;
}
Err(x) if x.kind() == io::ErrorKind::WouldBlock => read_blocked = true,
Err(x) => {

@ -191,6 +191,7 @@ where
// Remove the connection from our state if it has closed
if let Some(state) = Weak::upgrade(&state) {
state.connections.write().await.remove(&self.id);
// If we have no more connections, start the timer
if let Some(timer) = Weak::upgrade(&shutdown_timer) {
if state.connections.read().await.is_empty() {
@ -252,6 +253,14 @@ where
let local_data = Arc::new(local_data);
macro_rules! store_backup {
() => {{
// TODO: We want to store the transport's backup in our state with a mapping
// like id -> backup so when a new connection happens, we can
// repopulate the backup if it exists.
}};
}
loop {
let ready = match transport
.ready(Interest::READABLE | Interest::WRITABLE)

@ -93,10 +93,11 @@ impl<T> FramedTransport<T> {
self.codec.as_mut()
}
/// Clears the internal buffers used by the transport.
/// Clears the internal buffers and backup used by the transport.
pub fn clear(&mut self) {
self.incoming.clear();
self.outgoing.clear();
self.backup.clear();
}
}

Loading…
Cancel
Save