mirror of
https://git.meli.delivery/meli/meli
synced 2024-11-17 03:26:20 +00:00
melib/imap: timeout when establishing connection
This commit is contained in:
parent
0b00f5dfbc
commit
dede8d2a9e
@ -42,6 +42,7 @@ use crate::backends::{
|
||||
*,
|
||||
};
|
||||
use crate::conf::AccountSettings;
|
||||
use crate::connections::timeout;
|
||||
use crate::email::*;
|
||||
use crate::error::{MeliError, Result, ResultIntoMeliError};
|
||||
use futures::lock::Mutex as FutureMutex;
|
||||
@ -53,7 +54,7 @@ use std::hash::Hasher;
|
||||
use std::pin::Pin;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::time::Instant;
|
||||
use std::time::{Duration, Instant};
|
||||
pub type UID = usize;
|
||||
|
||||
pub static SUPPORTED_CAPABILITIES: &[&str] = &[
|
||||
@ -296,14 +297,12 @@ impl MailBackend for ImapType {
|
||||
*self.uid_store.sender.write().unwrap() = Some(sender);
|
||||
let uid_store = self.uid_store.clone();
|
||||
Ok(Box::pin(async move {
|
||||
let inbox = uid_store
|
||||
.mailboxes
|
||||
.lock()
|
||||
.await
|
||||
let inbox = timeout(Duration::from_secs(3), uid_store.mailboxes.lock())
|
||||
.await?
|
||||
.get(&mailbox_hash)
|
||||
.map(std::clone::Clone::clone)
|
||||
.unwrap();
|
||||
let mut conn = main_conn.lock().await;
|
||||
let mut conn = timeout(Duration::from_secs(3), main_conn.lock()).await?;
|
||||
watch::examine_updates(inbox, &mut conn, &uid_store).await?;
|
||||
Ok(())
|
||||
}))
|
||||
@ -1694,15 +1693,3 @@ async fn fetch_hlpr(
|
||||
};
|
||||
Ok(payload)
|
||||
}
|
||||
|
||||
use futures::future::{self, Either};
|
||||
|
||||
async fn timeout<O>(dur: std::time::Duration, f: impl Future<Output = O>) -> Result<O> {
|
||||
futures::pin_mut!(f);
|
||||
match future::select(f, smol::Timer::after(dur)).await {
|
||||
Either::Left((out, _)) => Ok(out),
|
||||
Either::Right(_) => {
|
||||
Err(MeliError::new("Timed out.").set_kind(crate::error::ErrorKind::Network))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,7 +21,7 @@
|
||||
|
||||
use super::protocol_parser::{ImapLineSplit, ImapResponse, RequiredResponses};
|
||||
use crate::backends::MailboxHash;
|
||||
use crate::connections::{lookup_ipv4, Connection};
|
||||
use crate::connections::{lookup_ipv4, timeout, Connection};
|
||||
use crate::email::parser::BytesExt;
|
||||
use crate::error::*;
|
||||
extern crate native_tls;
|
||||
@ -33,7 +33,7 @@ use std::future::Future;
|
||||
use std::iter::FromIterator;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use super::protocol_parser;
|
||||
use super::{Capabilities, ImapServerConf, UIDStore};
|
||||
@ -120,7 +120,7 @@ impl ImapStream {
|
||||
};
|
||||
|
||||
let mut socket = AsyncWrapper::new(Connection::Tcp(
|
||||
TcpStream::connect_timeout(&addr, std::time::Duration::new(4, 0))
|
||||
TcpStream::connect_timeout(&addr, Duration::new(4, 0))
|
||||
.chain_err_kind(crate::error::ErrorKind::Network)?,
|
||||
))
|
||||
.chain_err_kind(crate::error::ErrorKind::Network)?;
|
||||
@ -148,7 +148,7 @@ impl ImapStream {
|
||||
.chain_err_kind(crate::error::ErrorKind::Network)?;
|
||||
let mut response = String::with_capacity(1024);
|
||||
let mut broken = false;
|
||||
let now = std::time::Instant::now();
|
||||
let now = Instant::now();
|
||||
|
||||
while now.elapsed().as_secs() < 3 {
|
||||
let len = socket
|
||||
@ -225,7 +225,7 @@ impl ImapStream {
|
||||
)));
|
||||
};
|
||||
AsyncWrapper::new(Connection::Tcp(
|
||||
TcpStream::connect_timeout(&addr, std::time::Duration::new(4, 0))
|
||||
TcpStream::connect_timeout(&addr, Duration::new(4, 0))
|
||||
.chain_err_kind(crate::error::ErrorKind::Network)?,
|
||||
))
|
||||
.chain_err_kind(crate::error::ErrorKind::Network)?
|
||||
@ -370,7 +370,7 @@ impl ImapStream {
|
||||
ret.clear();
|
||||
let mut last_line_idx: usize = 0;
|
||||
loop {
|
||||
match self.stream.read(&mut buf).await {
|
||||
match timeout(Duration::from_secs(16), self.stream.read(&mut buf)).await? {
|
||||
Ok(0) => break,
|
||||
Ok(b) => {
|
||||
ret.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..b]) });
|
||||
@ -417,33 +417,36 @@ impl ImapStream {
|
||||
}
|
||||
|
||||
pub async fn send_command(&mut self, command: &[u8]) -> Result<()> {
|
||||
if let Err(err) = try_await(async move {
|
||||
let command = command.trim();
|
||||
match self.protocol {
|
||||
ImapProtocol::IMAP { .. } => {
|
||||
self.stream.write_all(b"M").await?;
|
||||
self.stream
|
||||
.write_all(self.cmd_id.to_string().as_bytes())
|
||||
.await?;
|
||||
self.stream.write_all(b" ").await?;
|
||||
self.cmd_id += 1;
|
||||
if let Err(err) = timeout(
|
||||
Duration::from_secs(16),
|
||||
try_await(async move {
|
||||
let command = command.trim();
|
||||
match self.protocol {
|
||||
ImapProtocol::IMAP { .. } => {
|
||||
self.stream.write_all(b"M").await?;
|
||||
self.stream
|
||||
.write_all(self.cmd_id.to_string().as_bytes())
|
||||
.await?;
|
||||
self.stream.write_all(b" ").await?;
|
||||
self.cmd_id += 1;
|
||||
}
|
||||
ImapProtocol::ManageSieve => {}
|
||||
}
|
||||
ImapProtocol::ManageSieve => {}
|
||||
}
|
||||
|
||||
self.stream.write_all(command).await?;
|
||||
self.stream.write_all(b"\r\n").await?;
|
||||
self.stream.flush().await?;
|
||||
match self.protocol {
|
||||
ImapProtocol::IMAP { .. } => {
|
||||
debug!("sent: M{} {}", self.cmd_id - 1, unsafe {
|
||||
std::str::from_utf8_unchecked(command)
|
||||
});
|
||||
self.stream.write_all(command).await?;
|
||||
self.stream.write_all(b"\r\n").await?;
|
||||
self.stream.flush().await?;
|
||||
match self.protocol {
|
||||
ImapProtocol::IMAP { .. } => {
|
||||
debug!("sent: M{} {}", self.cmd_id - 1, unsafe {
|
||||
std::str::from_utf8_unchecked(command)
|
||||
});
|
||||
}
|
||||
ImapProtocol::ManageSieve => {}
|
||||
}
|
||||
ImapProtocol::ManageSieve => {}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
Ok(())
|
||||
}),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(err.set_err_kind(crate::error::ErrorKind::Network))
|
||||
@ -498,7 +501,7 @@ impl ImapConnection {
|
||||
pub fn connect<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
if let (instant, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() {
|
||||
if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
|
||||
if Instant::now().duration_since(instant) >= Duration::new(60 * 30, 0) {
|
||||
*status = Err(MeliError::new("Connection timed out"));
|
||||
self.stream = Err(MeliError::new("Connection timed out"));
|
||||
}
|
||||
@ -845,7 +848,7 @@ impl ImapBlockingConnection {
|
||||
async fn read(
|
||||
conn: &mut ImapBlockingConnection,
|
||||
break_flag: &mut bool,
|
||||
prev_failure: &mut Option<std::time::Instant>,
|
||||
prev_failure: &mut Option<Instant>,
|
||||
) -> Option<Vec<u8>> {
|
||||
let ImapBlockingConnection {
|
||||
ref mut prev_res_length,
|
||||
|
@ -186,3 +186,16 @@ pub fn lookup_ipv4(host: &str, port: u16) -> crate::Result<std::net::SocketAddr>
|
||||
.set_kind(crate::error::ErrorKind::Network),
|
||||
)
|
||||
}
|
||||
|
||||
use futures::future::{self, Either, Future};
|
||||
|
||||
pub async fn timeout<O>(dur: std::time::Duration, f: impl Future<Output = O>) -> crate::Result<O> {
|
||||
futures::pin_mut!(f);
|
||||
match future::select(f, smol::Timer::after(dur)).await {
|
||||
Either::Left((out, _)) => Ok(out),
|
||||
Either::Right(_) => {
|
||||
Err(crate::error::MeliError::new("Timed out.")
|
||||
.set_kind(crate::error::ErrorKind::Network))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user