Simplify watch event passing by dropping events rather than blocking the hot code path

pull/118/head
Chip Senkbeil 2 years ago
parent 6991543ff7
commit e51392fcde
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -10,9 +10,6 @@ pub const CLIENT_WATCHER_CAPACITY: usize = 100;
/// Capacity associated with the server's file watcher to pass events outbound /// Capacity associated with the server's file watcher to pass events outbound
pub const SERVER_WATCHER_CAPACITY: usize = 10000; pub const SERVER_WATCHER_CAPACITY: usize = 10000;
/// Duration in milliseconds to sleep when reaching maximum watcher events in queue
pub const SERVER_WATCHER_PAUSE_MILLIS: u64 = 100;
/// Represents the maximum size (in bytes) that data will be read from pipes /// Represents the maximum size (in bytes) that data will be read from pipes
/// per individual `read` call /// per individual `read` call
/// ///

@ -1,5 +1,5 @@
use crate::{ use crate::{
constants::{SERVER_WATCHER_CAPACITY, SERVER_WATCHER_PAUSE_MILLIS}, constants::SERVER_WATCHER_CAPACITY,
data::{ data::{
self, Change, ChangeKind, ChangeKindSet, DirEntry, FileType, Metadata, PtySize, Request, self, Change, ChangeKind, ChangeKindSet, DirEntry, FileType, Metadata, PtySize, Request,
RequestData, Response, ResponseData, RunningProcess, SystemInfo, RequestData, Response, ResponseData, RunningProcess, SystemInfo,
@ -19,7 +19,7 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
time::{Duration, SystemTime}, time::SystemTime,
}; };
use tokio::{ use tokio::{
io::{self, AsyncWriteExt}, io::{self, AsyncWriteExt},
@ -406,28 +406,16 @@ where
// with a large volume of watch requests // with a large volume of watch requests
let (tx, mut rx) = mpsc::channel(SERVER_WATCHER_CAPACITY); let (tx, mut rx) = mpsc::channel(SERVER_WATCHER_CAPACITY);
let mut watcher = notify::recommended_watcher(move |res| { let mut watcher = notify::recommended_watcher(move |res| match tx.try_send(res) {
let mut res = res; Ok(_) => {}
Err(TrySendError::Full(_)) => {
// Attempt to send our result, breaking out of the loop warn!(
// if we succeed or it is impossible, otherwise trying "Reached watcher capacity of {}! Dropping watcher event!",
// again after a brief sleep SERVER_WATCHER_CAPACITY,
loop { );
match tx.try_send(res) { }
Ok(_) => break, Err(TrySendError::Closed(_)) => {
Err(TrySendError::Full(x)) => { warn!("Skipping watch event because watcher channel closed");
warn!(
"Reached watcher capacity of {}! Trying again after {}ms",
SERVER_WATCHER_CAPACITY, SERVER_WATCHER_PAUSE_MILLIS
);
res = x;
std::thread::sleep(Duration::from_millis(SERVER_WATCHER_PAUSE_MILLIS));
}
Err(TrySendError::Closed(_)) => {
warn!("Skipping watch event because watcher channel closed");
break;
}
}
} }
})?; })?;

Loading…
Cancel
Save