pull/146/head
Chip Senkbeil 2 years ago
parent a7568492d7
commit 535e4478b0
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -1,6 +1,12 @@
use crate::{
client::Client,
common::{authentication::AuthHandler, ConnectionId, Destination, Map},
common::{
authentication::{
msg::{Authentication, AuthenticationResponse},
AuthHandler,
},
ConnectionId, Destination, Map, Request,
},
manager::data::{
ConnectionInfo, ConnectionList, ManagerCapabilities, ManagerRequest, ManagerResponse,
},
@ -56,7 +62,7 @@ impl ManagerClient {
&mut self,
destination: impl Into<Destination>,
options: impl Into<Map>,
handler: impl AuthHandler,
handler: impl AuthHandler + Send,
) -> io::Result<ConnectionId> {
let destination = Box::new(destination.into());
let options = options.into();
@ -73,6 +79,56 @@ impl ManagerClient {
// connected or fail
while let Some(res) = mailbox.next().await {
match res.payload {
ManagerResponse::Authenticate { id, msg } => match msg {
Authentication::Initialization(x) => {
if log::log_enabled!(Level::Debug) {
debug!(
"Initializing authentication, supporting {}",
x.methods.into_iter().collect::<Vec<_>>().join(",")
);
}
let msg = AuthenticationResponse::Initialization(
handler.on_initialization(x).await?,
);
self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
.await?;
}
Authentication::StartMethod(x) => {
debug!("Starting authentication method {}", x.method);
}
Authentication::Challenge(x) => {
if log::log_enabled!(Level::Debug) {
for question in x.questions.iter() {
debug!(
"Received challenge question [{}]: {}",
question.label, question.text
);
}
}
let msg = AuthenticationResponse::Challenge(handler.on_challenge(x).await?);
self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
.await?;
}
Authentication::Verification(x) => {
debug!("Received verification request {}: {}", x.kind, x.text);
let msg =
AuthenticationResponse::Verification(handler.on_verification(x).await?);
self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
.await?;
}
Authentication::Info(x) => {
info!("{}", x.text);
}
Authentication::Error(x) => {
error!("{}", x.text);
if x.is_fatal() {
return Err(x.into_io_permission_denied());
}
}
Authentication::Finished => {
debug!("Finished authentication for {destination}");
}
},
ManagerResponse::Connected { id } => return Ok(id),
ManagerResponse::Error(x) => return Err(x.into()),
x => {
@ -83,6 +139,11 @@ impl ManagerClient {
}
}
}
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Missing connection confirmation",
))
}
/// Establishes a channel with the server represented by the `connection_id`,
@ -463,7 +524,10 @@ mod tests {
.unwrap();
transport
.write(Response::new(request.id, ManagerResponse::Shutdown))
.write(Response::new(
request.id,
ManagerResponse::Connected { id: 0 },
))
.await
.unwrap();
});

@ -41,16 +41,16 @@ impl DerefMut for ConnectionList {
}
}
impl Index<u64> for ConnectionList {
impl Index<ConnectionId> for ConnectionList {
type Output = Destination;
fn index(&self, connection_id: u64) -> &Self::Output {
fn index(&self, connection_id: ConnectionId) -> &Self::Output {
&self.0[&connection_id]
}
}
impl IndexMut<u64> for ConnectionList {
fn index_mut(&mut self, connection_id: u64) -> &mut Self::Output {
impl IndexMut<ConnectionId> for ConnectionList {
fn index_mut(&mut self, connection_id: ConnectionId) -> &mut Self::Output {
self.0
.get_mut(&connection_id)
.expect("No connection with id")

@ -8,9 +8,6 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields, tag = "type")]
pub enum ManagerResponse {
/// Acknowledgement of a response being processed
Ok,
/// Acknowledgement that a connection was killed
Killed,

@ -185,7 +185,7 @@ impl ManagerServer {
pub struct DistantManagerServerConnection {
/// Holds on to open channels feeding data back from a server to some connected client,
/// enabling us to cancel the tasks on demand
channels: RwLock<HashMap<ChannelId, ManagerChannel>>,
channels: RwLock<HashMap<ManagerChannelId, ManagerChannel>>,
}
#[async_trait]
@ -244,7 +244,7 @@ impl ServerHandler for ManagerServer {
ManagerRequest::Authenticate { id, msg } => {
match self.registry.write().await.remove(&id) {
Some(cb) => match cb.send(msg) {
Ok(_) => ManagerResponse::Ok,
Ok(_) => return,
Err(x) => ManagerResponse::Error(x.into()),
},
None => ManagerResponse::Error(

Loading…
Cancel
Save