Update change to include timestamp and details fields

pull/196/head
Chip Senkbeil 11 months ago
parent 4eaae55d53
commit 72cc998595
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -24,6 +24,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
change notification (will aggregate and merge changes)
- `debounce_tick_rate = <secs>` to specify how long to wait between event
aggregation loops
- `distant-protocol` response for a change now supports these additional
fields:
- `timestamp` (serialized as `ts`) to communicate the seconds since unix
epoch when the event was received
- `details` containing `attributes` (clarify changes on attribute kind) and
`extra` (to convey arbitrary platform-specific extra information)
### Changed

@ -263,12 +263,16 @@ mod tests {
req.id,
vec![
protocol::Response::Changed(Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
protocol::Response::Changed(Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
],
))
@ -280,8 +284,10 @@ mod tests {
assert_eq!(
change,
Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);
@ -289,8 +295,10 @@ mod tests {
assert_eq!(
change,
Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);
}
@ -330,8 +338,10 @@ mod tests {
.write_frame_for(&Response::new(
req.id.clone(),
protocol::Response::Changed(Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
))
.await
@ -342,8 +352,10 @@ mod tests {
.write_frame_for(&Response::new(
req.id.clone() + "1",
protocol::Response::Changed(Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
))
.await
@ -354,8 +366,10 @@ mod tests {
.write_frame_for(&Response::new(
req.id,
protocol::Response::Changed(Change {
timestamp: 2,
kind: ChangeKind::Delete,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
))
.await
@ -366,8 +380,10 @@ mod tests {
assert_eq!(
change,
Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);
@ -375,8 +391,10 @@ mod tests {
assert_eq!(
change,
Change {
timestamp: 2,
kind: ChangeKind::Delete,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);
}
@ -414,16 +432,22 @@ mod tests {
req.id,
vec![
protocol::Response::Changed(Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
protocol::Response::Changed(Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
protocol::Response::Changed(Change {
timestamp: 2,
kind: ChangeKind::Delete,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
],
))
@ -447,8 +471,10 @@ mod tests {
assert_eq!(
change,
Change {
timestamp: 0,
kind: ChangeKind::Access,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}
);
@ -470,8 +496,10 @@ mod tests {
.write_frame_for(&Response::new(
req.id,
protocol::Response::Changed(Change {
timestamp: 3,
kind: ChangeKind::Unknown,
paths: vec![test_path.to_path_buf()],
details: Default::default(),
}),
))
.await
@ -482,15 +510,19 @@ mod tests {
assert_eq!(
watcher.lock().await.next().await,
Some(Change {
timestamp: 1,
kind: ChangeKind::Modify,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
})
);
assert_eq!(
watcher.lock().await.next().await,
Some(Change {
timestamp: 2,
kind: ChangeKind::Delete,
paths: vec![test_path.to_path_buf()]
paths: vec![test_path.to_path_buf()],
details: Default::default(),
})
);
assert_eq!(watcher.lock().await.next().await, None);

@ -2,12 +2,12 @@ use std::collections::HashMap;
use std::io;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use distant_core::net::common::ConnectionId;
use distant_core::protocol::ChangeKind;
use distant_core::protocol::{Change, ChangeDetails, ChangeDetailsAttributes, ChangeKind};
use log::*;
use notify::event::{AccessKind, AccessMode, ModifyKind};
use notify::event::{AccessKind, AccessMode, MetadataKind, ModifyKind};
use notify::{
Config as WatcherConfig, Error as WatcherError, ErrorKind as WatcherErrorKind,
Event as WatcherEvent, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
@ -317,6 +317,11 @@ async fn watcher_task<W>(
}
}
InnerWatcherMsg::Event { ev } => {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("System time before unix epoch")
.as_secs();
let kind = match ev.kind {
EventKind::Access(AccessKind::Read) => ChangeKind::Access,
EventKind::Modify(ModifyKind::Metadata(_)) => ChangeKind::Attribute,
@ -332,8 +337,27 @@ async fn watcher_task<W>(
_ => ChangeKind::Unknown,
};
let attributes = match ev.kind {
EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)) => {
vec![ChangeDetailsAttributes::Timestamp]
}
EventKind::Modify(ModifyKind::Metadata(
MetadataKind::Ownership | MetadataKind::Permissions,
)) => vec![ChangeDetailsAttributes::Permissions],
_ => Vec::new(),
};
for registered_path in registered_paths.iter() {
match registered_path.filter_and_send(kind, &ev.paths).await {
let change = Change {
timestamp,
kind,
paths: ev.paths.clone(),
details: ChangeDetails {
attributes: attributes.clone(),
extra: ev.info().map(ToString::to_string),
},
};
match registered_path.filter_and_send(change).await {
Ok(_) => (),
Err(x) => error!(
"[Conn {}] Failed to forward changes to paths: {}",

@ -4,7 +4,7 @@ use std::{fmt, io};
use distant_core::net::common::ConnectionId;
use distant_core::net::server::Reply;
use distant_core::protocol::{Change, ChangeKind, ChangeKindSet, Error, Response};
use distant_core::protocol::{Change, ChangeKindSet, Error, Response};
/// Represents a path registered with a watcher that includes relevant state including
/// the ability to reply with
@ -122,24 +122,17 @@ impl RegisteredPath {
/// out any paths that are not applicable
///
/// Returns true if message was sent, and false if not
pub async fn filter_and_send<T>(&self, kind: ChangeKind, paths: T) -> io::Result<bool>
where
T: IntoIterator,
T::Item: AsRef<Path>,
{
if !self.allowed().contains(&kind) {
pub async fn filter_and_send(&self, mut change: Change) -> io::Result<bool> {
if !self.allowed().contains(&change.kind) {
return Ok(false);
}
let paths: Vec<PathBuf> = paths
.into_iter()
.filter(|p| self.applies_to_path(p.as_ref()))
.map(|p| p.as_ref().to_path_buf())
.collect();
// filter the paths that are not applicable
change.paths.retain(|p| self.applies_to_path(p.as_path()));
if !paths.is_empty() {
if !change.paths.is_empty() {
self.reply
.send(Response::Changed(Change { kind, paths }))
.send(Response::Changed(change))
.await
.map(|_| true)
} else {

@ -14,11 +14,48 @@ use strum::{EnumString, EnumVariantNames, VariantNames};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Change {
/// Unix timestamp (in seconds) when the server was notified of this change (not when the
/// change occurred)
#[serde(rename = "ts")]
pub timestamp: u64,
/// Label describing the kind of change
pub kind: ChangeKind,
/// Paths that were changed
pub paths: Vec<PathBuf>,
/// Additional details associated with the change
#[serde(default, skip_serializing_if = "ChangeDetails::is_empty")]
pub details: ChangeDetails,
}
/// Details about a change
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default, rename_all = "snake_case", deny_unknown_fields)]
pub struct ChangeDetails {
/// Clarity on type of attribute changes that have occurred (for kind == attribute)
#[serde(skip_serializing_if = "Vec::is_empty")]
pub attributes: Vec<ChangeDetailsAttributes>,
/// Optional information about the change that is typically platform-specific
#[serde(skip_serializing_if = "Option::is_none")]
pub extra: Option<String>,
}
impl ChangeDetails {
/// Returns true if no details are contained within.
pub fn is_empty(&self) -> bool {
self.attributes.is_empty() && self.extra.is_none()
}
}
/// Specific details about modification
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum ChangeDetailsAttributes {
Permissions,
Timestamp,
}
/// Represents a label attached to a [`Change`] that describes the kind of change.

@ -615,13 +615,15 @@ mod tests {
use std::path::PathBuf;
use super::*;
use crate::common::ChangeKind;
use crate::common::{ChangeDetails, ChangeDetailsAttributes, ChangeKind};
#[test]
fn should_be_able_to_serialize_to_json() {
fn should_be_able_to_serialize_minimal_payload_to_json() {
let payload = Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails::default(),
});
let value = serde_json::to_value(payload).unwrap();
@ -629,6 +631,7 @@ mod tests {
value,
serde_json::json!({
"type": "changed",
"ts": u64::MAX,
"kind": "access",
"paths": ["path"],
})
@ -636,9 +639,38 @@ mod tests {
}
#[test]
fn should_be_able_to_deserialize_from_json() {
fn should_be_able_to_serialize_full_payload_to_json() {
let payload = Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails {
attributes: vec![ChangeDetailsAttributes::Permissions],
extra: Some(String::from("info")),
},
});
let value = serde_json::to_value(payload).unwrap();
assert_eq!(
value,
serde_json::json!({
"type": "changed",
"ts": u64::MAX,
"kind": "access",
"paths": ["path"],
"details": {
"attributes": ["permissions"],
"extra": "info",
},
})
);
}
#[test]
fn should_be_able_to_deserialize_minimal_payload_from_json() {
let value = serde_json::json!({
"type": "changed",
"ts": u64::MAX,
"kind": "access",
"paths": ["path"],
});
@ -647,17 +679,49 @@ mod tests {
assert_eq!(
payload,
Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails::default(),
})
);
}
#[test]
fn should_be_able_to_serialize_to_msgpack() {
fn should_be_able_to_deserialize_full_payload_from_json() {
let value = serde_json::json!({
"type": "changed",
"ts": u64::MAX,
"kind": "access",
"paths": ["path"],
"details": {
"attributes": ["permissions"],
"extra": "info",
},
});
let payload: Response = serde_json::from_value(value).unwrap();
assert_eq!(
payload,
Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails {
attributes: vec![ChangeDetailsAttributes::Permissions],
extra: Some(String::from("info")),
},
})
);
}
#[test]
fn should_be_able_to_serialize_minimal_payload_to_msgpack() {
let payload = Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails::default(),
});
// NOTE: We don't actually check the output here because it's an implementation detail
@ -668,14 +732,35 @@ mod tests {
}
#[test]
fn should_be_able_to_deserialize_from_msgpack() {
fn should_be_able_to_serialize_full_payload_to_msgpack() {
let payload = Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails {
attributes: vec![ChangeDetailsAttributes::Permissions],
extra: Some(String::from("info")),
},
});
// NOTE: We don't actually check the output here because it's an implementation detail
// and could change as we change how serialization is done. This is merely to verify
// that we can serialize since there are times when serde fails to serialize at
// runtime.
let _ = rmp_serde::encode::to_vec_named(&payload).unwrap();
}
#[test]
fn should_be_able_to_deserialize_minimal_payload_from_msgpack() {
// NOTE: It may seem odd that we are serializing just to deserialize, but this is to
// verify that we are not corrupting or causing issues when serializing on a
// client/server and then trying to deserialize on the other side. This has happened
// enough times with minor changes that we need tests to verify.
let buf = rmp_serde::encode::to_vec_named(&Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails::default(),
}))
.unwrap();
@ -683,8 +768,42 @@ mod tests {
assert_eq!(
payload,
Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails::default(),
})
);
}
#[test]
fn should_be_able_to_deserialize_full_payload_from_msgpack() {
// NOTE: It may seem odd that we are serializing just to deserialize, but this is to
// verify that we are not corrupting or causing issues when serializing on a
// client/server and then trying to deserialize on the other side. This has happened
// enough times with minor changes that we need tests to verify.
let buf = rmp_serde::encode::to_vec_named(&Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails {
attributes: vec![ChangeDetailsAttributes::Permissions],
extra: Some(String::from("info")),
},
}))
.unwrap();
let payload: Response = rmp_serde::decode::from_slice(&buf).unwrap();
assert_eq!(
payload,
Response::Changed(Change {
timestamp: u64::MAX,
kind: ChangeKind::Access,
paths: vec![PathBuf::from("path")],
details: ChangeDetails {
attributes: vec![ChangeDetailsAttributes::Permissions],
extra: Some(String::from("info")),
},
})
);
}

Loading…
Cancel
Save