thread safety stuff

pull/1748/head
Jeff Becker 3 years ago committed by Jeff
parent ba57ab04aa
commit b225ec1043

@ -229,7 +229,7 @@ struct lokinet_context
/// make a udp handler and hold onto it
/// return its id
[[nodiscard]] int
[[nodiscard]] std::optional<int>
make_udp_handler(
const std::shared_ptr<llarp::service::Endpoint>& ep,
llarp::huint16_t exposePort,
@ -250,21 +250,38 @@ struct lokinet_context
});
}
auto udp = std::make_unique<UDPHandler>(
auto udp = std::make_shared<UDPHandler>(
next_socket_id(), llarp::ToNet(exposePort), filter, recv, timeout, user, std::weak_ptr{ep});
auto id = udp->m_SocketID;
auto pkt = ep->EgresPacketRouter();
pkt->AddUDPHandler(exposePort, [udp = udp.get(), this](auto from, auto pkt) {
udp->HandlePacketFrom(std::move(from), std::move(pkt));
std::promise<bool> result;
impl->router->loop()->call([ep, &result, udp]() {
if (auto pkt = ep->GetEgresPacketRouter())
{
pkt->AddUDPHandler(exposePort, [udp = std::weak_ptr{udp}](auto from, auto pkt) {
if (auto ptr = udp.lock())
{
ptr->HandlePacketFrom(std::move(from), std::move(pkt));
}
});
result.set_value(true);
}
else
result.set_value(false);
});
udp_sockets[udp->m_SocketID] = std::move(udp);
return id;
if (result.get_future().get())
{
udp_sockets[udp->m_SocketID] = std::move(udp);
return id;
}
return std::nullopt;
}
void
remove_udp_handler(int socket_id)
{
std::unique_ptr<UDPHandler> udp;
std::shared_ptr<UDPHandler> udp;
{
std::unique_lock lock{m_access};
if (auto itr = udp_sockets.find(socket_id); itr != udp_sockets.end())
@ -274,7 +291,14 @@ struct lokinet_context
}
}
if (udp)
{
udp->KillAllFlows();
// remove packet handler
impl->router->loop()->call([ep = udp->m_Endpoint.lock(), locaport = udp->m_LocalPort]() {
if (auto pkt = ep->EgresPacketRouter())
pkt->RemoveUDPHandler(localport);
});
}
}
/// acquire mutex for accessing this context
@ -291,7 +315,7 @@ struct lokinet_context
}
std::unordered_map<int, bool> streams;
std::unordered_map<int, std::unique_ptr<UDPHandler>> udp_sockets;
std::unordered_map<int, std::shared_ptr<UDPHandler>> udp_sockets;
void
inbound_stream(int id)
@ -868,19 +892,23 @@ extern "C"
auto lock = ctx->acquire();
if (auto ep = ctx->endpoint())
{
result->socket_id =
ctx->make_udp_handler(ep, llarp::huint16_t{exposedPort}, filter, recv, timeout, user);
return 0;
if (auto maybe =
ctx->make_udp_handler(ep, llarp::huint16_t{exposedPort}, filter, recv, timeout, user))
{
result->socket_id = *maybe;
return 0;
}
}
else
return EINVAL;
return EINVAL;
}
void EXPORT
lokinet_udp_close(int socket_id, struct lokinet_context* ctx)
{
if (ctx)
{
ctx->remove_udp_handler(socket_id);
}
}
int EXPORT
@ -918,7 +946,7 @@ extern "C"
if (pkt.sz == 0)
return EINVAL;
std::promise<int> ret;
ctx->impl->router->loop()->call_soon([addr = *maybe, pkt = std::move(pkt), ep, &ret]() {
ctx->impl->router->loop()->call([addr = *maybe, pkt = std::move(pkt), ep, &ret]() {
if (auto tag = ep->GetBestConvoTagFor(addr))
{
if (ep->SendToOrQueue(*tag, pkt.ConstBuffer(), llarp::service::ProtocolType::TrafficV4))
@ -946,6 +974,11 @@ extern "C"
std::shared_ptr<llarp::EndpointBase> ep;
{
auto lock = ctx->acquire();
if (ctx->impl->router->loop()->inEventLoop())
{
LogError("cannot call udp_establish from internal event loop");
return EINVAL;
}
if (auto itr = ctx->udp_sockets.find(remote->socket_id); itr != ctx->udp_sockets.end())
{
ep = itr->second->m_Endpoint.lock();
@ -969,7 +1002,7 @@ extern "C"
}
}
std::promise<bool> gotten;
ctx->impl->router->loop()->call_soon([addr = *maybe, ep, &gotten]() {
ctx->impl->router->loop()->call([addr = *maybe, ep, &gotten]() {
ep->EnsurePathTo(
addr, [&gotten](auto result) { gotten.set_value(result.has_value()); }, 5s);
});

Loading…
Cancel
Save