use std::shared_ptr for pending queries

pull/2045/head
Jeff Becker 2 years ago
parent c7a133ac9c
commit 9aa6b64c1e
No known key found for this signature in database
GPG Key ID: 025C02EE3A092F2D

@ -86,7 +86,7 @@ namespace llarp::dns
{
class Resolver;
class Query : public QueryJob_Base
class Query : public QueryJob_Base, public std::enable_shared_from_this<Query>
{
std::shared_ptr<PacketSource_Base> src;
SockAddr resolverAddr;
@ -126,7 +126,7 @@ namespace llarp::dns
#endif
std::optional<SockAddr> m_LocalAddr;
std::set<int> m_Pending;
std::unordered_map<int, std::shared_ptr<Query>> m_Pending;
struct ub_result_deleter
{
@ -148,9 +148,11 @@ namespace llarp::dns
{
// take ownership of ub_result
std::unique_ptr<ub_result, ub_result_deleter> result{_result};
// take ownership of our query
std::unique_ptr<Query> query{static_cast<Query*>(data)};
// borrow query
auto weak_query = static_cast<Query*>(data)->weak_from_this();
auto query = weak_query.lock();
if (not query)
return;
if (err)
{
// some kind of error from upstream
@ -167,9 +169,7 @@ namespace llarp::dns
hdr.id = query->Underlying().hdr_id;
buf.cur = buf.base;
hdr.Encode(&buf);
// remove pending query
if (auto ptr = query->parent.lock())
ptr->call([id = query->id, ptr]() { ptr->m_Pending.erase(id); });
// send reply
query->SendReply(std::move(pkt));
}
@ -343,6 +343,12 @@ namespace llarp::dns
return m_LocalAddr;
}
void
RemovePending(int id)
{
m_Pending.erase(id);
}
void
Up(const llarp::DnsConfig& conf)
{
@ -378,10 +384,14 @@ namespace llarp::dns
runner = std::thread{[this]() {
while (running)
{
ub_wait(m_ctx);
std::this_thread::sleep_for(10ms);
// poll and process callbacks it this thread
if (ub_poll(m_ctx))
{
ub_process(m_ctx);
}
else // nothing to do, sleep.
std::this_thread::sleep_for(10ms);
}
ub_process(m_ctx);
}};
#else
if (auto loop = m_Loop.lock())
@ -403,19 +413,23 @@ namespace llarp::dns
{
#ifdef _WIN32
if (running.exchange(false))
{
log::debug(logcat, "shutting down win32 dns thread");
runner.join();
}
#else
if (m_Poller)
m_Poller->close();
#endif
if (m_ctx)
{
log::debug(logcat, "cancelling {} pending queries", m_Pending.size());
// cancel pending queries
// make copy as ub_cancel modifies m_Pending
const auto pending = m_Pending;
for (auto id : pending)
::ub_cancel(m_ctx, id);
m_Pending.clear();
for (const auto& [id, query] : m_Pending)
query->Cancel();
if (auto err = ::ub_wait(m_ctx))
log::warning(logcat, "issue tearing down unbound: {}", ub_strerror(err));
::ub_ctx_delete(m_ctx);
m_ctx = nullptr;
@ -470,8 +484,8 @@ namespace llarp::dns
{
if (WouldLoop(to, from))
return false;
// we use this unique ptr to clean up on fail
auto tmp = std::make_unique<Query>(weak_from_this(), query, source, to, from);
auto tmp = std::make_shared<Query>(weak_from_this(), query, source, to, from);
// no questions, send fail
if (query.questions.empty())
{
@ -494,6 +508,15 @@ namespace llarp::dns
tmp->Cancel();
return true;
}
#ifdef _WIN32
if (not running)
{
// we are stopping the win32 thread
tmp->Cancel();
return true;
}
#endif
const auto& q = query.questions[0];
if (auto err = ub_resolve_async(
m_ctx,
@ -509,11 +532,8 @@ namespace llarp::dns
tmp->Cancel();
}
else
{
m_Pending.insert(tmp->id);
// Leak the bare pointer we gave to unbound; we'll recapture it in Callback
(void)tmp.release();
}
m_Pending.emplace(tmp->id, tmp);
return true;
}
};
@ -521,14 +541,22 @@ namespace llarp::dns
void
Query::SendReply(llarp::OwnedBuffer replyBuf) const
{
if (auto ptr = parent.lock())
auto parent_ptr = parent.lock();
if (parent_ptr)
{
ptr->call([src = src, from = resolverAddr, to = askerAddr, buf = replyBuf.copy()] {
parent_ptr->call([parent_ptr,
id = id,
src = src,
from = resolverAddr,
to = askerAddr,
buf = replyBuf.copy()] {
src->SendTo(to, from, OwnedBuffer::copy_from(buf));
// remove query
parent_ptr->RemovePending(id);
});
}
else
log::error(logcat, "no source or parent");
log::error(logcat, "no parent");
}
} // namespace libunbound

Loading…
Cancel
Save