From 72cc998595374f9ca8f7f62f79626397e92ce74b Mon Sep 17 00:00:00 2001 From: Chip Senkbeil Date: Tue, 6 Jun 2023 13:38:10 -0700 Subject: [PATCH] Update change to include timestamp and details fields --- CHANGELOG.md | 6 + distant-core/src/client/watcher.rs | 46 +++++-- distant-local/src/api/state/watcher.rs | 32 ++++- distant-local/src/api/state/watcher/path.rs | 21 ++-- distant-protocol/src/common/change.rs | 37 ++++++ distant-protocol/src/response.rs | 129 +++++++++++++++++++- 6 files changed, 241 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82dabc8..4adbd41 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 change notification (will aggregate and merge changes) - `debounce_tick_rate = ` to specify how long to wait between event aggregation loops +- `distant-protocol` response for a change now supports these additional + fields: + - `timestamp` (serialized as `ts`) to communicate the seconds since unix + epoch when the event was received + - `details` containing `attributes` (clarify changes on attribute kind) and + `extra` (to convey arbitrary platform-specific extra information) ### Changed diff --git a/distant-core/src/client/watcher.rs b/distant-core/src/client/watcher.rs index ec66455..57c3d01 100644 --- a/distant-core/src/client/watcher.rs +++ b/distant-core/src/client/watcher.rs @@ -263,12 +263,16 @@ mod tests { req.id, vec![ protocol::Response::Changed(Change { + timestamp: 0, kind: ChangeKind::Access, paths: vec![test_path.to_path_buf()], + details: Default::default(), }), protocol::Response::Changed(Change { + timestamp: 1, kind: ChangeKind::Modify, paths: vec![test_path.to_path_buf()], + details: Default::default(), }), ], )) @@ -280,8 +284,10 @@ mod tests { assert_eq!( change, Change { + timestamp: 0, kind: ChangeKind::Access, - paths: vec![test_path.to_path_buf()] + paths: vec![test_path.to_path_buf()], + details: Default::default(), } ); @@ -289,8 +295,10 @@ mod tests { assert_eq!( change, Change { + timestamp: 1, kind: ChangeKind::Modify, - paths: vec![test_path.to_path_buf()] + paths: vec![test_path.to_path_buf()], + details: Default::default(), } ); } @@ -330,8 +338,10 @@ mod tests { .write_frame_for(&Response::new( req.id.clone(), protocol::Response::Changed(Change { + timestamp: 0, kind: ChangeKind::Access, paths: vec![test_path.to_path_buf()], + details: Default::default(), }), )) .await @@ -342,8 +352,10 @@ mod tests { .write_frame_for(&Response::new( req.id.clone() + "1", protocol::Response::Changed(Change { + timestamp: 1, kind: ChangeKind::Modify, paths: vec![test_path.to_path_buf()], + details: Default::default(), }), )) .await @@ -354,8 +366,10 @@ mod tests { .write_frame_for(&Response::new( req.id, protocol::Response::Changed(Change { + timestamp: 2, kind: ChangeKind::Delete, paths: vec![test_path.to_path_buf()], + details: Default::default(), }), )) .await @@ -366,8 +380,10 @@ mod tests { assert_eq!( change, Change { + timestamp: 0, kind: ChangeKind::Access, - paths: vec![test_path.to_path_buf()] + paths: vec![test_path.to_path_buf()], + details: Default::default(), } ); @@ -375,8 +391,10 @@ mod tests { assert_eq!( change, Change { + timestamp: 2, kind: ChangeKind::Delete, - paths: vec![test_path.to_path_buf()] + paths: vec![test_path.to_path_buf()], + details: Default::default(), } ); } @@ -414,16 +432,22 @@ mod tests { req.id, vec![ protocol::Response::Changed(Change { + timestamp: 0, kind: ChangeKind::Access, paths: vec![test_path.to_path_buf()], + details: Default::default(), }), protocol::Response::Changed(Change { + timestamp: 1, kind: ChangeKind::Modify, paths: vec![test_path.to_path_buf()], + details: Default::default(), }), protocol::Response::Changed(Change { + timestamp: 2, kind: ChangeKind::Delete, paths: vec![test_path.to_path_buf()], + details: Default::default(), }), ], )) @@ -447,8 +471,10 @@ mod tests { assert_eq!( change, Change { + timestamp: 0, kind: ChangeKind::Access, - paths: vec![test_path.to_path_buf()] + paths: vec![test_path.to_path_buf()], + details: Default::default(), } ); @@ -470,8 +496,10 @@ mod tests { .write_frame_for(&Response::new( req.id, protocol::Response::Changed(Change { + timestamp: 3, kind: ChangeKind::Unknown, paths: vec![test_path.to_path_buf()], + details: Default::default(), }), )) .await @@ -482,15 +510,19 @@ mod tests { assert_eq!( watcher.lock().await.next().await, Some(Change { + timestamp: 1, kind: ChangeKind::Modify, - paths: vec![test_path.to_path_buf()] + paths: vec![test_path.to_path_buf()], + details: Default::default(), }) ); assert_eq!( watcher.lock().await.next().await, Some(Change { + timestamp: 2, kind: ChangeKind::Delete, - paths: vec![test_path.to_path_buf()] + paths: vec![test_path.to_path_buf()], + details: Default::default(), }) ); assert_eq!(watcher.lock().await.next().await, None); diff --git a/distant-local/src/api/state/watcher.rs b/distant-local/src/api/state/watcher.rs index 1d4c222..7bfcee8 100644 --- a/distant-local/src/api/state/watcher.rs +++ b/distant-local/src/api/state/watcher.rs @@ -2,12 +2,12 @@ use std::collections::HashMap; use std::io; use std::ops::Deref; use std::path::{Path, PathBuf}; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use distant_core::net::common::ConnectionId; -use distant_core::protocol::ChangeKind; +use distant_core::protocol::{Change, ChangeDetails, ChangeDetailsAttributes, ChangeKind}; use log::*; -use notify::event::{AccessKind, AccessMode, ModifyKind}; +use notify::event::{AccessKind, AccessMode, MetadataKind, ModifyKind}; use notify::{ Config as WatcherConfig, Error as WatcherError, ErrorKind as WatcherErrorKind, Event as WatcherEvent, EventKind, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher, @@ -317,6 +317,11 @@ async fn watcher_task( } } InnerWatcherMsg::Event { ev } => { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("System time before unix epoch") + .as_secs(); + let kind = match ev.kind { EventKind::Access(AccessKind::Read) => ChangeKind::Access, EventKind::Modify(ModifyKind::Metadata(_)) => ChangeKind::Attribute, @@ -332,8 +337,27 @@ async fn watcher_task( _ => ChangeKind::Unknown, }; + let attributes = match ev.kind { + EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime)) => { + vec![ChangeDetailsAttributes::Timestamp] + } + EventKind::Modify(ModifyKind::Metadata( + MetadataKind::Ownership | MetadataKind::Permissions, + )) => vec![ChangeDetailsAttributes::Permissions], + _ => Vec::new(), + }; + for registered_path in registered_paths.iter() { - match registered_path.filter_and_send(kind, &ev.paths).await { + let change = Change { + timestamp, + kind, + paths: ev.paths.clone(), + details: ChangeDetails { + attributes: attributes.clone(), + extra: ev.info().map(ToString::to_string), + }, + }; + match registered_path.filter_and_send(change).await { Ok(_) => (), Err(x) => error!( "[Conn {}] Failed to forward changes to paths: {}", diff --git a/distant-local/src/api/state/watcher/path.rs b/distant-local/src/api/state/watcher/path.rs index c3cb98b..ef52c4b 100644 --- a/distant-local/src/api/state/watcher/path.rs +++ b/distant-local/src/api/state/watcher/path.rs @@ -4,7 +4,7 @@ use std::{fmt, io}; use distant_core::net::common::ConnectionId; use distant_core::net::server::Reply; -use distant_core::protocol::{Change, ChangeKind, ChangeKindSet, Error, Response}; +use distant_core::protocol::{Change, ChangeKindSet, Error, Response}; /// Represents a path registered with a watcher that includes relevant state including /// the ability to reply with @@ -122,24 +122,17 @@ impl RegisteredPath { /// out any paths that are not applicable /// /// Returns true if message was sent, and false if not - pub async fn filter_and_send(&self, kind: ChangeKind, paths: T) -> io::Result - where - T: IntoIterator, - T::Item: AsRef, - { - if !self.allowed().contains(&kind) { + pub async fn filter_and_send(&self, mut change: Change) -> io::Result { + if !self.allowed().contains(&change.kind) { return Ok(false); } - let paths: Vec = paths - .into_iter() - .filter(|p| self.applies_to_path(p.as_ref())) - .map(|p| p.as_ref().to_path_buf()) - .collect(); + // filter the paths that are not applicable + change.paths.retain(|p| self.applies_to_path(p.as_path())); - if !paths.is_empty() { + if !change.paths.is_empty() { self.reply - .send(Response::Changed(Change { kind, paths })) + .send(Response::Changed(change)) .await .map(|_| true) } else { diff --git a/distant-protocol/src/common/change.rs b/distant-protocol/src/common/change.rs index d2a70fd..13c38dc 100644 --- a/distant-protocol/src/common/change.rs +++ b/distant-protocol/src/common/change.rs @@ -14,11 +14,48 @@ use strum::{EnumString, EnumVariantNames, VariantNames}; #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case", deny_unknown_fields)] pub struct Change { + /// Unix timestamp (in seconds) when the server was notified of this change (not when the + /// change occurred) + #[serde(rename = "ts")] + pub timestamp: u64, + /// Label describing the kind of change pub kind: ChangeKind, /// Paths that were changed pub paths: Vec, + + /// Additional details associated with the change + #[serde(default, skip_serializing_if = "ChangeDetails::is_empty")] + pub details: ChangeDetails, +} + +/// Details about a change +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default, rename_all = "snake_case", deny_unknown_fields)] +pub struct ChangeDetails { + /// Clarity on type of attribute changes that have occurred (for kind == attribute) + #[serde(skip_serializing_if = "Vec::is_empty")] + pub attributes: Vec, + + /// Optional information about the change that is typically platform-specific + #[serde(skip_serializing_if = "Option::is_none")] + pub extra: Option, +} + +impl ChangeDetails { + /// Returns true if no details are contained within. + pub fn is_empty(&self) -> bool { + self.attributes.is_empty() && self.extra.is_none() + } +} + +/// Specific details about modification +#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case", deny_unknown_fields)] +pub enum ChangeDetailsAttributes { + Permissions, + Timestamp, } /// Represents a label attached to a [`Change`] that describes the kind of change. diff --git a/distant-protocol/src/response.rs b/distant-protocol/src/response.rs index 30c204b..1d98afb 100644 --- a/distant-protocol/src/response.rs +++ b/distant-protocol/src/response.rs @@ -615,13 +615,15 @@ mod tests { use std::path::PathBuf; use super::*; - use crate::common::ChangeKind; + use crate::common::{ChangeDetails, ChangeDetailsAttributes, ChangeKind}; #[test] - fn should_be_able_to_serialize_to_json() { + fn should_be_able_to_serialize_minimal_payload_to_json() { let payload = Response::Changed(Change { + timestamp: u64::MAX, kind: ChangeKind::Access, paths: vec![PathBuf::from("path")], + details: ChangeDetails::default(), }); let value = serde_json::to_value(payload).unwrap(); @@ -629,6 +631,7 @@ mod tests { value, serde_json::json!({ "type": "changed", + "ts": u64::MAX, "kind": "access", "paths": ["path"], }) @@ -636,9 +639,38 @@ mod tests { } #[test] - fn should_be_able_to_deserialize_from_json() { + fn should_be_able_to_serialize_full_payload_to_json() { + let payload = Response::Changed(Change { + timestamp: u64::MAX, + kind: ChangeKind::Access, + paths: vec![PathBuf::from("path")], + details: ChangeDetails { + attributes: vec![ChangeDetailsAttributes::Permissions], + extra: Some(String::from("info")), + }, + }); + + let value = serde_json::to_value(payload).unwrap(); + assert_eq!( + value, + serde_json::json!({ + "type": "changed", + "ts": u64::MAX, + "kind": "access", + "paths": ["path"], + "details": { + "attributes": ["permissions"], + "extra": "info", + }, + }) + ); + } + + #[test] + fn should_be_able_to_deserialize_minimal_payload_from_json() { let value = serde_json::json!({ "type": "changed", + "ts": u64::MAX, "kind": "access", "paths": ["path"], }); @@ -647,17 +679,49 @@ mod tests { assert_eq!( payload, Response::Changed(Change { + timestamp: u64::MAX, kind: ChangeKind::Access, paths: vec![PathBuf::from("path")], + details: ChangeDetails::default(), }) ); } #[test] - fn should_be_able_to_serialize_to_msgpack() { + fn should_be_able_to_deserialize_full_payload_from_json() { + let value = serde_json::json!({ + "type": "changed", + "ts": u64::MAX, + "kind": "access", + "paths": ["path"], + "details": { + "attributes": ["permissions"], + "extra": "info", + }, + }); + + let payload: Response = serde_json::from_value(value).unwrap(); + assert_eq!( + payload, + Response::Changed(Change { + timestamp: u64::MAX, + kind: ChangeKind::Access, + paths: vec![PathBuf::from("path")], + details: ChangeDetails { + attributes: vec![ChangeDetailsAttributes::Permissions], + extra: Some(String::from("info")), + }, + }) + ); + } + + #[test] + fn should_be_able_to_serialize_minimal_payload_to_msgpack() { let payload = Response::Changed(Change { + timestamp: u64::MAX, kind: ChangeKind::Access, paths: vec![PathBuf::from("path")], + details: ChangeDetails::default(), }); // NOTE: We don't actually check the output here because it's an implementation detail @@ -668,14 +732,35 @@ mod tests { } #[test] - fn should_be_able_to_deserialize_from_msgpack() { + fn should_be_able_to_serialize_full_payload_to_msgpack() { + let payload = Response::Changed(Change { + timestamp: u64::MAX, + kind: ChangeKind::Access, + paths: vec![PathBuf::from("path")], + details: ChangeDetails { + attributes: vec![ChangeDetailsAttributes::Permissions], + extra: Some(String::from("info")), + }, + }); + + // NOTE: We don't actually check the output here because it's an implementation detail + // and could change as we change how serialization is done. This is merely to verify + // that we can serialize since there are times when serde fails to serialize at + // runtime. + let _ = rmp_serde::encode::to_vec_named(&payload).unwrap(); + } + + #[test] + fn should_be_able_to_deserialize_minimal_payload_from_msgpack() { // NOTE: It may seem odd that we are serializing just to deserialize, but this is to // verify that we are not corrupting or causing issues when serializing on a // client/server and then trying to deserialize on the other side. This has happened // enough times with minor changes that we need tests to verify. let buf = rmp_serde::encode::to_vec_named(&Response::Changed(Change { + timestamp: u64::MAX, kind: ChangeKind::Access, paths: vec![PathBuf::from("path")], + details: ChangeDetails::default(), })) .unwrap(); @@ -683,8 +768,42 @@ mod tests { assert_eq!( payload, Response::Changed(Change { + timestamp: u64::MAX, kind: ChangeKind::Access, paths: vec![PathBuf::from("path")], + details: ChangeDetails::default(), + }) + ); + } + + #[test] + fn should_be_able_to_deserialize_full_payload_from_msgpack() { + // NOTE: It may seem odd that we are serializing just to deserialize, but this is to + // verify that we are not corrupting or causing issues when serializing on a + // client/server and then trying to deserialize on the other side. This has happened + // enough times with minor changes that we need tests to verify. + let buf = rmp_serde::encode::to_vec_named(&Response::Changed(Change { + timestamp: u64::MAX, + kind: ChangeKind::Access, + paths: vec![PathBuf::from("path")], + details: ChangeDetails { + attributes: vec![ChangeDetailsAttributes::Permissions], + extra: Some(String::from("info")), + }, + })) + .unwrap(); + + let payload: Response = rmp_serde::decode::from_slice(&buf).unwrap(); + assert_eq!( + payload, + Response::Changed(Change { + timestamp: u64::MAX, + kind: ChangeKind::Access, + paths: vec![PathBuf::from("path")], + details: ChangeDetails { + attributes: vec![ChangeDetailsAttributes::Permissions], + extra: Some(String::from("info")), + }, }) ); }