From d902097acd971718514e178ff8cad5cd3e7ec350 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Mon, 3 Jun 2024 09:25:25 +0300 Subject: [PATCH] melib/jmap: add event source support WIP Reading resources: RFC8620 - The JSON Meta Application Protocol (JMAP), 7.3. Event Source https://datatracker.ietf.org/doc/html/rfc8620#section-7.3 https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events Signed-off-by: Manos Pitsidianakis --- melib/src/jmap/eventsource.rs | 290 ++++++++++++++++++++++++++++++++++ melib/src/jmap/mod.rs | 43 ++--- 2 files changed, 315 insertions(+), 18 deletions(-) create mode 100644 melib/src/jmap/eventsource.rs diff --git a/melib/src/jmap/eventsource.rs b/melib/src/jmap/eventsource.rs new file mode 100644 index 00000000..80242fb0 --- /dev/null +++ b/melib/src/jmap/eventsource.rs @@ -0,0 +1,290 @@ +// +// meli +// +// Copyright 2021 Emmanouil Pitsidianakis +// +// This file is part of meli. +// +// meli is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// meli is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with meli. If not, see . +// +// SPDX-License-Identifier: EUPL-1.2 OR GPL-3.0-or-later + +use futures::io::{AsyncReadExt, AsyncWriteExt}; +use isahc::{ + config::{Configurable, RedirectPolicy}, + AsyncReadResponseExt, HttpClient, +}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use crate::error::Result; +use crate::jmap::{methods::RequestUrlTemplate, JmapConnection, JmapServerConf, Store}; + +const DEFAULT_RETRY: u64 = 5000; + +/// A single Server-Sent Event. +#[derive(Debug, Default)] +pub struct Event { + /// Corresponds to the `id` field. + pub id: Option, + /// Corresponds to the `event` field. + pub event_type: Option, + /// All `data` fields concatenated by newlines. + pub data: String, +} + +/// Possible results from parsing a single event-stream line. +#[derive(Debug, PartialEq)] +pub enum ParseResult { + /// Line parsed successfully, but the event is not complete yet. + Next, + /// The event is complete now. Pass a new (empty) event for the next call. + Dispatch, + /// Set retry time. + SetRetry(Duration), +} + +pub fn parse_event_line(line: &str, event: &mut Event) -> ParseResult { + let line = line.trim_end_matches(|c| c == '\r' || c == '\n'); + if line == "" { + ParseResult::Dispatch + } else { + let (field, value) = if let Some(pos) = line.find(':') { + let (f, v) = line.split_at(pos); + // Strip : and an optional space. + let v = &v[1..]; + let v = if v.starts_with(' ') { &v[1..] } else { v }; + (f, v) + } else { + (line, "") + }; + match field { + "event" => { + event.event_type = Some(value.to_string()); + } + "data" => { + event.data.push_str(value); + event.data.push('\n'); + } + "id" => { + event.id = Some(value.to_string()); + } + "retry" => { + if let Ok(retry) = value.parse::() { + return ParseResult::SetRetry(Duration::from_millis(retry)); + } + } + _ => (), // ignored + } + + ParseResult::Next + } +} + +impl Event { + /// Creates an empty event. + pub fn new() -> Event { + Event::default() + } + + /// Returns `true` if the event is empty. + /// + /// An event is empty if it has no id or event type and its data field is empty. + pub fn is_empty(&self) -> bool { + self.id.is_none() && self.event_type.is_none() && self.data.is_empty() + } + + /// Makes the event empty. + pub fn clear(&mut self) { + self.id = None; + self.event_type = None; + self.data.clear(); + } +} + +/// A client for a Server-Sent Events endpoint. +/// +/// Read events by iterating over the client. +pub struct EventSourceConnection { + pub client: Arc, + pub store: Arc, + pub server_conf: JmapServerConf, + response: Option, + url: Arc, + last_event_id: Option, + last_try: Option, + pub retry: Duration, +} + +impl EventSourceConnection { + pub async fn new(conn: &JmapConnection) -> Result { + let url = { + let g = conn.session_guard().await?; + g.event_source_url.clone() + }; + debug!("event_source {:?}", &url); + Ok(Self { + client: conn.client.clone(), + server_conf: conn.server_conf.clone(), + store: conn.store.clone(), + response: None, + url, + last_event_id: None, + last_try: None, + retry: Duration::from_millis(DEFAULT_RETRY), + }) + } + + pub async fn next_request(&mut self) -> Result<()> { + use isahc::{http, http::request::Request, prelude::*}; + + let mut request = Request::get(self.url.url.as_str()) + .timeout(std::time::Duration::from_secs(10)) + .header(http::header::ACCEPT, "text/event-stream") + .ssl_options(if self.server_conf.danger_accept_invalid_certs { + isahc::config::SslOption::DANGER_ACCEPT_INVALID_CERTS + | isahc::config::SslOption::DANGER_ACCEPT_INVALID_HOSTS + | isahc::config::SslOption::DANGER_ACCEPT_REVOKED_CERTS + } else { + isahc::config::SslOption::NONE + }) + .redirect_policy(RedirectPolicy::Limit(10)); + request = if self.server_conf.use_token { + request + .authentication(isahc::auth::Authentication::none()) + .header( + http::header::AUTHORIZATION, + format!("Bearer {}", &self.server_conf.server_password), + ) + } else { + request + .authentication(isahc::auth::Authentication::basic()) + .credentials(isahc::auth::Credentials::new( + &self.server_conf.server_username, + &self.server_conf.server_password, + )) + }; + if let Some(ref id) = self.last_event_id { + request = request.header("Last-Event-ID", id.as_str()); + } + let request = request.body(()).map_err(|err| err.to_string())?; + + debug!(&request); + let mut response = self.client.send_async(request).await?; + debug!(&response); + //debug_assert!(response.status().is_success()); + /* + let mut headers = HeaderMap::with_capacity(2); + headers.insert(ACCEPT, HeaderValue::from_str("text/event-stream").unwrap()); + if let Some(ref id) = self.last_event_id { + headers.insert("Last-Event-ID", HeaderValue::from_str(id).unwrap()); + } + + let res = self.client.get(self.url.clone()).headers(headers).send()?; + */ + + // Check status code and Content-Type. + { + let status = response.status(); + if !status.is_success() { + let res_text = response.text().await?; + return Err(debug!(format!("{} {}", status.as_str(), res_text)).into()); + } + + if let Some(content_type_hv) = response.headers().get(isahc::http::header::CONTENT_TYPE) + { + if content_type_hv.to_str().unwrap() != "text/event-stream" { + log::error!("got content-type: {}", content_type_hv.to_str().unwrap()); + } + /* + let content_type = content_type_hv + .to_str() + .unwrap() + .to_string() + .parse::() + .unwrap(); + // Compare type and subtype only, MIME parameters are ignored. + if (content_type.type_(), content_type.subtype()) + != (mime::TEXT, mime::EVENT_STREAM) + { + return Err(ErrorKind::InvalidContentType(content_type.clone()).into()); + } + */ + } + } + + self.response = Some(response.into_body()); + Ok(()) + } + + pub async fn next_event(&mut self) -> Result { + let mut line = Vec::new(); + 'main_loop: loop { + match self.response.as_mut() { + None => { + // wait for the next request. + if let Some(last_try) = self.last_try { + let elapsed = last_try.elapsed(); + if elapsed < self.retry { + crate::utils::futures::sleep(self.retry - elapsed).await; + } + } + // Set here in case the request fails. + self.last_try = Some(Instant::now()); + + self.next_request().await?; + } + Some(reader) => { + let mut event = Event::new(); + loop { + match reader.read(&mut line).await { + Ok(0) => { + // EOF or a stream error, retry after timeout + self.last_try = Some(Instant::now()); + self.response = None; + continue 'main_loop; + } + // Got new bytes from stream + Ok(_n) => { + log::trace!("read line {}", String::from_utf8_lossy(&line)); + if let Ok(s) = std::str::from_utf8(&line) { + match parse_event_line(s, &mut event) { + ParseResult::Next => {} // okay, just continue + ParseResult::Dispatch => { + if let Some(ref id) = event.id { + self.last_event_id = Some(id.clone()); + } + return Ok(event); + } + ParseResult::SetRetry(ref retry) => { + self.retry = *retry; + } + } + } + line.clear(); + } + Err(err) => { + // EOF or a stream error, retry after timeout + self.last_try = Some(Instant::now()); + self.response = None; + debug!(&err); + continue 'main_loop; + } + } + } + } + } + } + } +} diff --git a/melib/src/jmap/mod.rs b/melib/src/jmap/mod.rs index 47e79f29..b5c9d1f1 100644 --- a/melib/src/jmap/mod.rs +++ b/melib/src/jmap/mod.rs @@ -104,6 +104,7 @@ pub mod argument; pub mod capabilities; pub mod comparator; pub mod email; +pub mod eventsource; pub mod filters; pub mod identity; pub mod mailbox; @@ -448,30 +449,36 @@ impl MailBackend for JmapType { fn watch(&self) -> ResultFuture<()> { let connection = self.connection.clone(); - let store = self.store.clone(); + //let store = self.store.clone(); Ok(Box::pin(async move { - { + let mut event_source = { let mut conn = connection.lock().await; conn.connect().await?; - } + eventsource::EventSourceConnection::new(&conn).await? + }; loop { - { - let mailbox_hashes = { - store - .mailboxes - .read() - .unwrap() - .keys() - .cloned() - .collect::>() - }; - let conn = connection.lock().await; - for mailbox_hash in mailbox_hashes { - conn.email_changes(mailbox_hash).await?; - } + while let Ok(event) = event_source.next_event().await { + log::debug!("watch: got event_source event {:?}", event); } - sleep(Duration::from_secs(60)).await; } + //loop { + // { + // let mailbox_hashes = { + // store + // .mailboxes + // .read() + // .unwrap() + // .keys() + // .cloned() + // .collect::>() + // }; + // let conn = connection.lock().await; + // for mailbox_hash in mailbox_hashes { + // conn.email_changes(mailbox_hash).await?; + // } + // } + // sleep(Duration::from_secs(60)).await; + //} })) }