pull/146/head
Chip Senkbeil 2 years ago
parent 535e4478b0
commit 9a328bfb18
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

@ -31,26 +31,88 @@ impl ManagerClient {
&mut self,
destination: impl Into<Destination>,
options: impl Into<Map>,
handler: impl AuthHandler,
handler: impl AuthHandler + Send,
) -> io::Result<Destination> {
let destination = Box::new(destination.into());
let options = options.into();
trace!("launch({}, {})", destination, options);
let res = self
.send(ManagerRequest::Launch {
let mailbox = self
.mail(ManagerRequest::Launch {
destination,
options,
})
.await?;
match res.payload {
ManagerResponse::Launched { destination } => Ok(destination),
ManagerResponse::Error(x) => Err(x.into()),
x => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Got unexpected response: {:?}", x),
)),
// Continue to process authentication challenges and other details until we are either
// launched or fail
while let Some(res) = mailbox.next().await {
match res.payload {
ManagerResponse::Authenticate { id, msg } => match msg {
Authentication::Initialization(x) => {
if log::log_enabled!(Level::Debug) {
debug!(
"Initializing authentication, supporting {}",
x.methods.into_iter().collect::<Vec<_>>().join(",")
);
}
let msg = AuthenticationResponse::Initialization(
handler.on_initialization(x).await?,
);
self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
.await?;
}
Authentication::StartMethod(x) => {
debug!("Starting authentication method {}", x.method);
}
Authentication::Challenge(x) => {
if log::log_enabled!(Level::Debug) {
for question in x.questions.iter() {
debug!(
"Received challenge question [{}]: {}",
question.label, question.text
);
}
}
let msg = AuthenticationResponse::Challenge(handler.on_challenge(x).await?);
self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
.await?;
}
Authentication::Verification(x) => {
debug!("Received verification request {}: {}", x.kind, x.text);
let msg =
AuthenticationResponse::Verification(handler.on_verification(x).await?);
self.fire(Request::new(ManagerRequest::Authenticate { id, msg }))
.await?;
}
Authentication::Info(x) => {
info!("{}", x.text);
}
Authentication::Error(x) => {
error!("{}", x.text);
if x.is_fatal() {
return Err(x.into_io_permission_denied());
}
}
Authentication::Finished => {
debug!("Finished authentication for {destination}");
}
},
ManagerResponse::Launched { destination } => return Ok(destination),
ManagerResponse::Error(x) => return Err(x.into()),
x => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Got unexpected response: {:?}", x),
))
}
}
}
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Missing connection confirmation",
))
}
/// Request that the manager establishes a new connection at the given `destination`

@ -24,7 +24,7 @@ pub struct ManagerConnection {
#[derive(Clone)]
pub struct ManagerChannel {
channel_id: ManagerChannelId,
tx: mpsc::Sender<Action>,
tx: mpsc::UnboundedSender<Action>,
}
impl ManagerChannel {
@ -32,14 +32,13 @@ impl ManagerChannel {
self.channel_id
}
pub async fn send<T: Serialize>(&self, request: Request<T>) -> io::Result<()> {
pub fn send<T: Serialize>(&self, request: Request<T>) -> io::Result<()> {
let channel_id = self.channel_id;
self.tx
.send(Action::Write {
id: channel_id,
data: request.to_vec()?,
})
.await
.map_err(|x| {
io::Error::new(
io::ErrorKind::BrokenPipe,
@ -48,11 +47,10 @@ impl ManagerChannel {
})
}
pub async fn close(&self) -> io::Result<()> {
pub fn close(&self) -> io::Result<()> {
let channel_id = self.channel_id;
self.tx
.send(Action::Unregister { id: channel_id })
.await
.map_err(|x| {
io::Error::new(
io::ErrorKind::BrokenPipe,
@ -91,10 +89,7 @@ impl ManagerConnection {
}
}
pub async fn open_channel(
&self,
reply: ServerReply<ManagerResponse>,
) -> io::Result<ManagerChannel> {
pub fn open_channel(&self, reply: ServerReply<ManagerResponse>) -> io::Result<ManagerChannel> {
let channel_id = rand::random();
self.tx
.send(Action::Register {

Loading…
Cancel
Save