Merge pull request #81 from majestrate/dev

various fixes
pull/82/head
Jeff 6 years ago committed by GitHub
commit 1ed197e730
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -45,8 +45,12 @@ add_compile_options(-Wno-cast-function-type)
set(FS_LIB stdc++fs) set(FS_LIB stdc++fs)
endif(WIN32) endif(WIN32)
if(DEBIAN) if(DEBIAN)
add_definitions(-DDEBIAN) add_definitions(-DDEBIAN)
else()
set(CRYPTO_FLAGS -march=native)
endif() endif()
set(CMAKE_THREAD_PREFER_PTHREAD TRUE) set(CMAKE_THREAD_PREFER_PTHREAD TRUE)
@ -112,20 +116,6 @@ if(CMAKE_BUILD_TYPE MATCHES "[Dd][Ee][Bb][Uu][Gg]")
add_compile_options( ${DEBUG_FLAGS} ) add_compile_options( ${DEBUG_FLAGS} )
endif() endif()
if(NOT DEBIAN)
if(NOT ANDROID)
if(NOT RPI)
if (NOT USE_AVX2)
set(CRYPTO_FLAGS -march=core2 -mtune=native)
set(CMAKE_ASM_FLAGS "-march=core2")
else()
set(CRYPTO_FLAGS -march=haswell -mtune=native)
set(CMAKE_ASM_FLAGS "-march=haswell -mtune=native ${CMAKE_ASM_FLAGS} $ENV{ASFLAGS}")
endif()
endif()
endif()
endif()
if(RPI) if(RPI)
add_definitions(-DRPI) add_definitions(-DRPI)
set(WITH_STATIC ON) set(WITH_STATIC ON)

@ -51,11 +51,12 @@ RPI ?= OFF
STATIC_LINK ?= OFF STATIC_LINK ?= OFF
NETNS ?= OFF NETNS ?= OFF
CLANG ?= OFF CLANG ?= OFF
CROSS ?= OFF
CMAKE_GEN ?= Unix Makefiles CMAKE_GEN ?= Unix Makefiles
BUILD_ROOT = $(REPO)/build BUILD_ROOT = $(REPO)/build
CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DUSING_CLANG=$(CLANG) -DSTATIC_LINK=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DUSE_LIBABYSS=$(JSONRPC) -DRPI=$(RPI) '$(REPO)'") CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "cmake -G'$(CMAKE_GEN)' -DCMAKE_CROSSCOMPILING=$(CROSS) -DUSING_CLANG=$(CLANG) -DSTATIC_LINK=$(STATIC_LINK) -DUSE_NETNS=$(NETNS) -DUSE_AVX2=$(AVX2) -DUSE_LIBABYSS=$(JSONRPC) -DRPI=$(RPI) '$(REPO)'")
SCAN_BUILD ?= scan-build SCAN_BUILD ?= scan-build
ANALYZE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "$(SCAN_BUILD) cmake -DUSE_LIBABYSS=$(JSONRPC) '$(REPO)'") ANALYZE_CONFIG_CMD = $(shell /bin/echo -n "cd '$(BUILD_ROOT)' && " ; /bin/echo -n "$(SCAN_BUILD) cmake -DUSE_LIBABYSS=$(JSONRPC) '$(REPO)'")

@ -74,7 +74,7 @@ namespace llarp
Validate(const V& value) const = 0; Validate(const V& value) const = 0;
void void
OnFound(const Key_t& askedPeer, const V& value) OnFound(const Key_t askedPeer, const V& value)
{ {
peersAsked.insert(askedPeer); peersAsked.insert(askedPeer);
if(Validate(value)) if(Validate(value))
@ -98,18 +98,26 @@ namespace llarp
Key_t peer; Key_t peer;
if(next) if(next)
{ {
peer = *next.get(); // explicit next peer provided
peer = next->data();
} }
else else if(!GetNextPeer(peer, peersAsked))
{ {
if(!GetNextPeer(peer, peersAsked)) // no more peers
{ llarp::LogInfo("no more peers for request asking for", target);
// no more peers return false;
SendReply();
return false;
}
} }
const Key_t targetKey = target.data();
if((prevPeer ^ targetKey) < (peer ^ targetKey))
{
// next peer is not closer
llarp::LogInfo("next peer ", peer, " is not closer to ", target,
" than ", prevPeer);
return false;
}
else
peersAsked.insert(peer);
DoNextRequest(peer); DoNextRequest(peer);
return true; return true;
} }
@ -297,6 +305,7 @@ namespace llarp
{ {
(void)whoasked; (void)whoasked;
tx.emplace(askpeer, std::unique_ptr< TX< K, V > >(t)); tx.emplace(askpeer, std::unique_ptr< TX< K, V > >(t));
auto count = waiting.count(k);
waiting.insert(std::make_pair(k, askpeer)); waiting.insert(std::make_pair(k, askpeer));
auto itr = timeouts.find(k); auto itr = timeouts.find(k);
@ -305,7 +314,8 @@ namespace llarp
timeouts.insert( timeouts.insert(
std::make_pair(k, time_now_ms() + requestTimeoutMS)); std::make_pair(k, time_now_ms() + requestTimeoutMS));
} }
t->Start(askpeer); if(count == 0)
t->Start(askpeer);
} }
/// mark tx as not fond /// mark tx as not fond
@ -316,12 +326,11 @@ namespace llarp
auto txitr = tx.find(from); auto txitr = tx.find(from);
if(txitr == tx.end()) if(txitr == tx.end())
return; return;
if(next)
{ // ask for next peer
// ask for next peer if(txitr->second->AskNextPeer(from.node, next))
if(txitr->second->AskNextPeer(from.node, next)) sendReply = false;
sendReply = false;
}
llarp::LogWarn("Target key ", txitr->second->target); llarp::LogWarn("Target key ", txitr->second->target);
Inform(from, txitr->second->target, {}, sendReply, sendReply); Inform(from, txitr->second->target, {}, sendReply, sendReply);
} }

@ -64,7 +64,7 @@ namespace llarp
return out; return out;
out << std::string("/"); out << std::string("/");
#if defined(ANDROID) || defined(RPI) #if defined(ANDROID) || defined(RPI)
snprintf(tmp, sizeof(tmp), "%lu", snprintf(tmp, sizeof(tmp), "%zu",
llarp::bits::count_array_bits(xi.netmask.s6_addr)); llarp::bits::count_array_bits(xi.netmask.s6_addr));
return out << tmp; return out << tmp;
#else #else

@ -137,8 +137,13 @@ namespace llarp
HandleDataMessage(const PathID_t&, ProtocolMessage* msg); HandleDataMessage(const PathID_t&, ProtocolMessage* msg);
virtual bool virtual bool
ProcessDataMessage(__attribute__((unused)) ProtocolMessage* msg) ProcessDataMessage(ProtocolMessage* msg)
{ {
#ifdef TESTNET
llarp::LogInfo("Got message from ", msg->sender.Addr());
#else
(void)msg;
#endif
return true; return true;
} }

@ -3,7 +3,7 @@
#include <cstdint> #include <cstdint>
using llarp_proto_version_t = std::uint8_t; using llarp_proto_version_t = std::uint8_t;
using llarp_time_t = std::uint64_t; using llarp_time_t = std::uint64_t;
using llarp_seconds_t = std::uint64_t; using llarp_seconds_t = std::uint64_t;
#endif #endif

File diff suppressed because it is too large Load Diff

@ -339,36 +339,6 @@ namespace llarp
return true; return true;
} }
void
DoNextRequest(const Key_t &nextPeer)
{
// iterate to next peer
parent->LookupIntroSetIterative(
target, whoasked.node, whoasked.txid, nextPeer,
std::bind(&ServiceAddressLookup::HandleNextRequestResult, this,
std::placeholders::_1));
}
void
HandleNextRequestResult(const std::vector< service::IntroSet > &results)
{
// merge results
std::set< service::IntroSet > found;
for(const auto &introset : valuesFound)
found.insert(introset);
for(const auto &introset : results)
found.insert(introset);
valuesFound.clear();
for(const auto &introset : found)
valuesFound.push_back(introset);
// send reply
SendReply();
}
bool bool
GetNextPeer(Key_t &next, const std::set< Key_t > &exclude) GetNextPeer(Key_t &next, const std::set< Key_t > &exclude)
{ {
@ -383,6 +353,17 @@ namespace llarp
new FindIntroMessage(peer.txid, target, R)); new FindIntroMessage(peer.txid, target, R));
} }
void
DoNextRequest(const Key_t &ask) override
{
if(R)
parent->LookupIntroSetRecursive(target, whoasked.node, whoasked.txid,
ask, R - 1);
else
parent->LookupIntroSetIterative(target, whoasked.node, whoasked.txid,
ask);
}
virtual void virtual void
SendReply() SendReply()
{ {
@ -408,7 +389,7 @@ namespace llarp
} }
void void
SendReply() SendReply() override
{ {
auto path = auto path =
parent->router->paths.GetByUpstream(parent->OurKey(), localPath); parent->router->paths.GetByUpstream(parent->OurKey(), localPath);

@ -242,8 +242,8 @@ extern "C"
// llarp::LogDebug("Advancing to pos ", std::to_string(*pos)); // llarp::LogDebug("Advancing to pos ", std::to_string(*pos));
moveable += (*pos); // advance to position moveable += (*pos); // advance to position
//hexDump(moveable, 12); // hexDump(moveable, 12);
//hexDumpAt(buffer, *pos, 12); // hexDumpAt(buffer, *pos, 12);
if(*moveable == '\xc0') if(*moveable == '\xc0')
{ {
@ -267,10 +267,10 @@ extern "C"
/* /*
uint32_t readAt32 = *pos; uint32_t readAt32 = *pos;
answer->name = getDNSstring(buffer, &readAt32); answer->name = getDNSstring(buffer, &readAt32);
llarp::LogInfo("Parsed string ", answer->name, " read ", std::to_string(readAt32)); llarp::LogInfo("Parsed string ", answer->name, " read ",
moveable += readAt32; (*pos) += readAt32; std::to_string(readAt32)); moveable += readAt32; (*pos) += readAt32;
*/ */
//moveable++; (*pos)++; // moveable++; (*pos)++;
} }
/* /*
hexDump(moveable, 10); hexDump(moveable, 10);
@ -330,12 +330,12 @@ extern "C"
// FIXME: move this out of here, this shouldn't be responsible for decode // FIXME: move this out of here, this shouldn't be responsible for decode
switch(answer->type) switch(answer->type)
{ {
case 2: // NS case 2: // NS
// don't really need to do anything here // don't really need to do anything here
moveable += answer->rdLen; moveable += answer->rdLen;
(*pos) += answer->rdLen; // advance the length (*pos) += answer->rdLen; // advance the length
break; break;
case 5: case 5:
moveable += answer->rdLen; moveable += answer->rdLen;
(*pos) += answer->rdLen; // advance the length (*pos) += answer->rdLen; // advance the length
break; break;
@ -371,10 +371,10 @@ extern "C"
{ {
std::string revname = getDNSstring(buffer, pos); std::string revname = getDNSstring(buffer, pos);
llarp::LogInfo("revDNSname: ", revname); llarp::LogInfo("revDNSname: ", revname);
//answer->rData = new uint8_t[answer->rdLen + 1]; // answer->rData = new uint8_t[answer->rdLen + 1];
answer->rData.resize(answer->rdLen); answer->rData.resize(answer->rdLen);
memcpy(answer->rData.data(), revname.c_str(), answer->rdLen); memcpy(answer->rData.data(), revname.c_str(), answer->rdLen);
//answer->rData = (uint8_t *)strdup(revname.c_str()); // safer? nope // answer->rData = (uint8_t *)strdup(revname.c_str()); // safer? nope
moveable += answer->rdLen; moveable += answer->rdLen;
//(*pos) += answer->rdLen; // advance the length //(*pos) += answer->rdLen; // advance the length
} }

@ -229,9 +229,10 @@ ReverseHandlerIter(struct llarp::service::Context::endpoint_iter *endpointCfg)
+ tokensCheck[2] + "." + tokensCheck[3]; + tokensCheck[2] + "." + tokensCheck[3];
llarp::LogDebug(searchIp, " vs ", checkIp); llarp::LogDebug(searchIp, " vs ", checkIp);
llarp::IPRange range = llarp::iprange_ipv4( llarp::IPRange range =
std::stoi(tokensCheck[0]), std::stoi(tokensCheck[1]), std::stoi(tokensCheck[2]), llarp::iprange_ipv4(std::stoi(tokensCheck[0]), std::stoi(tokensCheck[1]),
std::stoi(tokensCheck[3]), tunEndpoint->tunif.netmask); // create range std::stoi(tokensCheck[2]), std::stoi(tokensCheck[3]),
tunEndpoint->tunif.netmask); // create range
// hack atm to work around limitations in ipaddr_ipv4_bits and llarp::IPRange // hack atm to work around limitations in ipaddr_ipv4_bits and llarp::IPRange
llarp::huint32_t searchIPv4_fixed = llarp::ipaddr_ipv4_bits( llarp::huint32_t searchIPv4_fixed = llarp::ipaddr_ipv4_bits(
std::stoi(tokensSearch[searchTokens - 6]), std::stoi(tokensSearch[searchTokens - 6]),

@ -224,13 +224,14 @@ generic_handle_dnsc_recvfrom(dnsc_answer_request *request,
for(uint32_t i = 0; i < hdr->qdCount; i++) for(uint32_t i = 0; i < hdr->qdCount; i++)
{ {
question = decode_question(castBufc, &pos); question = decode_question(castBufc, &pos);
//llarp::LogDebug("Read a question, now at ", std::to_string(pos)); // llarp::LogDebug("Read a question, now at ", std::to_string(pos));
// 1 dot: 1 byte for length + length // 1 dot: 1 byte for length + length
// 4 bytes for class/type // 4 bytes for class/type
// castBuf += question->name.length() + 1 + 4; // castBuf += question->name.length() + 1 + 4;
// castBuf += 2; // skip answer label // castBuf += 2; // skip answer label
} }
llarp::LogDebug("Question ", std::to_string(question->type), " ", question->name); llarp::LogDebug("Question ", std::to_string(question->type), " ",
question->name);
// FIXME: only handling one atm // FIXME: only handling one atm
std::vector< dns_msg_answer * > answers; std::vector< dns_msg_answer * > answers;
@ -296,15 +297,15 @@ generic_handle_dnsc_recvfrom(dnsc_answer_request *request,
// pos = 0; // reset pos // pos = 0; // reset pos
answer = decode_answer(castBufc, &pos); answer = decode_answer(castBufc, &pos);
// answers.push_back(answer); // answers.push_back(answer);
llarp::LogDebug("Read an authority for ", llarp::LogDebug("Read an authority for ", request->question.name, " at ",
request->question.name, " at ", std::to_string(pos)); std::to_string(pos));
// castBuf += answer->name.length() + 4 + 4 + 4 + answer->rdLen; // castBuf += answer->name.length() + 4 + 4 + 4 + answer->rdLen;
if((ssize_t)pos > sz) if((ssize_t)pos > sz)
{ {
llarp::LogWarn("Would read past end of dns packet. for ", llarp::LogWarn("Would read past end of dns packet. for ",
request->question.name); request->question.name);
break; break;
} }
} }
/* /*
@ -393,14 +394,14 @@ generic_handle_dnsc_recvfrom(dnsc_answer_request *request,
int ip = 0; int ip = 0;
// if no answer, just bail now // if no answer, just bail now
if (!answer) if(!answer)
{ {
request->found = false; request->found = false;
request->resolved(request); request->resolved(request);
return; return;
} }
/* search for and print IPv4 addresses */ /* search for and print IPv4 addresses */
// if(dnsQuery->reqType == 0x01) // if(dnsQuery->reqType == 0x01)
/* /*
@ -603,7 +604,7 @@ llarp_handle_dnsc_recvfrom(struct llarp_udp_io *const udp,
llarp::LogDebug("Header got client responses for id: ", hdr->id); llarp::LogDebug("Header got client responses for id: ", hdr->id);
// if we sent this out, then there's an id // if we sent this out, then there's an id
struct dns_tracker *tracker = (struct dns_tracker *)udp->user; struct dns_tracker *tracker = (struct dns_tracker *)udp->user;
struct dnsc_answer_request *request = tracker->client_request[hdr->id].get(); struct dnsc_answer_request *request = tracker->client_request[hdr->id].get();
// sometimes we'll get double responses // sometimes we'll get double responses
@ -694,7 +695,7 @@ void
llarp_host_resolved(dnsc_answer_request *const request) llarp_host_resolved(dnsc_answer_request *const request)
{ {
dns_tracker *tracker = (dns_tracker *)request->context->tracker; dns_tracker *tracker = (dns_tracker *)request->context->tracker;
auto val = std::find_if( auto val = std::find_if(
tracker->client_request.begin(), tracker->client_request.end(), tracker->client_request.begin(), tracker->client_request.end(),
[request]( [request](
std::pair< const uint32_t, std::unique_ptr< dnsc_answer_request > > std::pair< const uint32_t, std::unique_ptr< dnsc_answer_request > >
@ -735,7 +736,7 @@ llarp_dnsc_init(struct dnsc_context *const dnsc,
llarp::LogInfo("DNSc adding relay ", dnsc_sockaddr); llarp::LogInfo("DNSc adding relay ", dnsc_sockaddr);
dnsc->resolvers.push_back(dnsc_sockaddr); dnsc->resolvers.push_back(dnsc_sockaddr);
dnsc->tracker = &dns_udp_tracker; dnsc->tracker = &dns_udp_tracker;
dnsc->logic = logic; dnsc->logic = logic;
return true; return true;
} }

@ -152,12 +152,12 @@ _llarp_nt_getadaptersaddresses(struct llarp_nt_ifaddrs_t** ifap)
fprintf(stderr, "IP_ADAPTER_ADDRESSES buffer length %lu bytes.\n", dwSize); fprintf(stderr, "IP_ADAPTER_ADDRESSES buffer length %lu bytes.\n", dwSize);
#endif #endif
pAdapterAddresses = (IP_ADAPTER_ADDRESSES*)_llarp_nt_heap_alloc(dwSize); pAdapterAddresses = (IP_ADAPTER_ADDRESSES*)_llarp_nt_heap_alloc(dwSize);
dwRet = GetAdaptersAddresses( dwRet = GetAdaptersAddresses(AF_UNSPEC,
AF_UNSPEC, GAA_FLAG_INCLUDE_PREFIX | GAA_FLAG_SKIP_ANYCAST
GAA_FLAG_INCLUDE_PREFIX | GAA_FLAG_SKIP_ANYCAST | GAA_FLAG_SKIP_DNS_SERVER
| GAA_FLAG_SKIP_DNS_SERVER | GAA_FLAG_SKIP_FRIENDLY_NAME | GAA_FLAG_SKIP_FRIENDLY_NAME
| GAA_FLAG_SKIP_MULTICAST, | GAA_FLAG_SKIP_MULTICAST,
nullptr, pAdapterAddresses, &dwSize); nullptr, pAdapterAddresses, &dwSize);
if(ERROR_BUFFER_OVERFLOW == dwRet) if(ERROR_BUFFER_OVERFLOW == dwRet)
{ {
_llarp_nt_heap_free(pAdapterAddresses); _llarp_nt_heap_free(pAdapterAddresses);
@ -447,10 +447,10 @@ _llarp_nt_getadaptersaddresses(struct llarp_nt_ifaddrs_t** ifap)
ift->_ifa.ifa_next = (struct llarp_nt_ifaddrs_t*)(ift + 1); ift->_ifa.ifa_next = (struct llarp_nt_ifaddrs_t*)(ift + 1);
ift = (struct _llarp_nt_ifaddrs_t*)(ift->_ifa.ifa_next); ift = (struct _llarp_nt_ifaddrs_t*)(ift->_ifa.ifa_next);
} }
else else
{ {
ift->_ifa.ifa_next = nullptr; ift->_ifa.ifa_next = nullptr;
} }
} }
} }
@ -464,7 +464,7 @@ static unsigned
_llarp_nt_getadaptersaddresses_nametoindex(const char* ifname) _llarp_nt_getadaptersaddresses_nametoindex(const char* ifname)
{ {
ULONG ifIndex; ULONG ifIndex;
DWORD dwSize = 4096, dwRet; DWORD dwSize = 4096, dwRet;
IP_ADAPTER_ADDRESSES *pAdapterAddresses = nullptr, *adapter; IP_ADAPTER_ADDRESSES *pAdapterAddresses = nullptr, *adapter;
char szAdapterName[256]; char szAdapterName[256];
@ -487,7 +487,7 @@ _llarp_nt_getadaptersaddresses_nametoindex(const char* ifname)
for(unsigned i = 3; i; i--) for(unsigned i = 3; i; i--)
{ {
pAdapterAddresses = (IP_ADAPTER_ADDRESSES*)_llarp_nt_heap_alloc(dwSize); pAdapterAddresses = (IP_ADAPTER_ADDRESSES*)_llarp_nt_heap_alloc(dwSize);
dwRet = GetAdaptersAddresses( dwRet = GetAdaptersAddresses(
AF_UNSPEC, AF_UNSPEC,
GAA_FLAG_SKIP_ANYCAST | GAA_FLAG_SKIP_DNS_SERVER GAA_FLAG_SKIP_ANYCAST | GAA_FLAG_SKIP_DNS_SERVER
| GAA_FLAG_SKIP_FRIENDLY_NAME | GAA_FLAG_SKIP_MULTICAST, | GAA_FLAG_SKIP_FRIENDLY_NAME | GAA_FLAG_SKIP_MULTICAST,
@ -524,7 +524,8 @@ _llarp_nt_getadaptersaddresses_nametoindex(const char* ifname)
{ {
if(0 == strcmp(szAdapterName, adapter->AdapterName)) if(0 == strcmp(szAdapterName, adapter->AdapterName))
{ {
//ifIndex = AF_INET6 == iffamily ? adapter->Ipv6IfIndex : adapter->IfIndex; // ifIndex = AF_INET6 == iffamily ? adapter->Ipv6IfIndex :
// adapter->IfIndex;
_llarp_nt_heap_free(pAdapterAddresses); _llarp_nt_heap_free(pAdapterAddresses);
return ifIndex; return ifIndex;
} }

@ -70,7 +70,7 @@ namespace llarp
public: public:
static constexpr size_t Alignment = 64; static constexpr size_t Alignment = 64;
using AtomicIndex = std::atomic<std::uint32_t>; using AtomicIndex = std::atomic< std::uint32_t >;
private: private:
AtomicIndex m_pushIndex; // Index in the buffer that the next AtomicIndex m_pushIndex; // Index in the buffer that the next
@ -90,7 +90,7 @@ namespace llarp
const uint32_t m_maxCombinedIndex; // Maximum combined value of index and const uint32_t m_maxCombinedIndex; // Maximum combined value of index and
// generation for this object. // generation for this object.
std::atomic<std::uint32_t>* m_states; // Array of index states. std::atomic< std::uint32_t >* m_states; // Array of index states.
AtomicIndex& AtomicIndex&
pushIndex(); pushIndex();

@ -163,13 +163,7 @@ llarp_router::PersistSessionUntil(const llarp::RouterID &remote,
llarp_time_t until) llarp_time_t until)
{ {
llarp::LogDebug("persist session to ", remote, " until ", until); llarp::LogDebug("persist session to ", remote, " until ", until);
if(m_PersistingSessions.find(remote) == m_PersistingSessions.end()) m_PersistingSessions[remote] = std::max(until, m_PersistingSessions[remote]);
m_PersistingSessions[remote] = until;
else
{
if(m_PersistingSessions[remote] < until)
m_PersistingSessions[remote] = until;
}
} }
constexpr size_t MaxPendingSendQueueSize = 8; constexpr size_t MaxPendingSendQueueSize = 8;
@ -369,7 +363,7 @@ llarp_router::on_verify_client_rc(llarp_async_verify_rc *job)
llarp::PubKey pk(job->rc.pubkey); llarp::PubKey pk(job->rc.pubkey);
router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk)); router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk));
delete ctx; delete ctx;
delete job; router->pendingVerifyRC.erase(pk);
} }
void void
@ -387,8 +381,9 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
ctx->establish_job->Failed(); ctx->establish_job->Failed();
} }
delete ctx; delete ctx;
delete job;
router->DiscardOutboundFor(pk); router->DiscardOutboundFor(pk);
router->pendingVerifyRC.erase(pk);
return; return;
} }
// we're valid, which means it's already been committed to the nodedb // we're valid, which means it's already been committed to the nodedb
@ -418,7 +413,7 @@ llarp_router::on_verify_server_rc(llarp_async_verify_rc *job)
else else
router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk)); router->FlushOutboundFor(pk, router->GetLinkWithSessionByPubkey(pk));
delete ctx; delete ctx;
delete job; router->pendingVerifyRC.erase(pk);
} }
void void
@ -616,19 +611,21 @@ llarp_router::GetLinkWithSessionByPubkey(const llarp::RouterID &pubkey)
} }
void void
llarp_router::FlushOutboundFor(const llarp::RouterID remote, llarp_router::FlushOutboundFor(llarp::RouterID remote,
llarp::ILinkLayer *chosen) llarp::ILinkLayer *chosen)
{ {
llarp::LogDebug("Flush outbound for ", remote); llarp::LogDebug("Flush outbound for ", remote);
pendingEstablishJobs.erase(remote);
auto itr = outboundMessageQueue.find(remote); auto itr = outboundMessageQueue.find(remote);
if(itr == outboundMessageQueue.end()) if(itr == outboundMessageQueue.end())
{ {
pendingEstablishJobs.erase(remote);
return; return;
} }
if(!chosen) if(!chosen)
{ {
DiscardOutboundFor(remote); DiscardOutboundFor(remote);
pendingEstablishJobs.erase(remote);
return; return;
} }
while(itr->second.size()) while(itr->second.size())
@ -640,6 +637,7 @@ llarp_router::FlushOutboundFor(const llarp::RouterID remote,
itr->second.pop(); itr->second.pop();
} }
pendingEstablishJobs.erase(remote);
} }
void void
@ -667,7 +665,18 @@ void
llarp_router::async_verify_RC(const llarp::RouterContact &rc, llarp_router::async_verify_RC(const llarp::RouterContact &rc,
llarp::ILinkLayer *link) llarp::ILinkLayer *link)
{ {
llarp_async_verify_rc *job = new llarp_async_verify_rc(); if(pendingVerifyRC.count(rc.pubkey))
return;
if(rc.IsPublicRouter() && whitelistRouters)
{
if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end())
{
llarp::LogInfo(rc.pubkey, " is NOT a valid service node, rejecting");
link->CloseSessionTo(rc.pubkey);
return;
}
}
llarp_async_verify_rc *job = &pendingVerifyRC[rc.pubkey];
llarp::async_verify_context *ctx = new llarp::async_verify_context(); llarp::async_verify_context *ctx = new llarp::async_verify_context();
ctx->router = this; ctx->router = this;
ctx->establish_job = nullptr; ctx->establish_job = nullptr;
@ -690,17 +699,7 @@ llarp_router::async_verify_RC(const llarp::RouterContact &rc,
job->hook = &llarp_router::on_verify_server_rc; job->hook = &llarp_router::on_verify_server_rc;
else else
job->hook = &llarp_router::on_verify_client_rc; job->hook = &llarp_router::on_verify_client_rc;
if(rc.IsPublicRouter() && whitelistRouters)
{
if(lokinetRouters.find(rc.pubkey) == lokinetRouters.end())
{
llarp::LogInfo(rc.pubkey, " is NOT a valid service node, rejecting");
link->CloseSessionTo(rc.pubkey);
job->valid = false;
job->hook(job);
return;
}
}
llarp_nodedb_async_verify(job); llarp_nodedb_async_verify(job);
} }

@ -153,6 +153,11 @@ struct llarp_router
llarp::RouterID::Hash > llarp::RouterID::Hash >
pendingEstablishJobs; pendingEstablishJobs;
// pending RCs to be verified by pubkey
std::unordered_map< llarp::RouterID, llarp_async_verify_rc,
llarp::RouterID::Hash >
pendingVerifyRC;
// sessions to persist -> timestamp to end persist at // sessions to persist -> timestamp to end persist at
std::unordered_map< llarp::RouterID, llarp_time_t, llarp::RouterID::Hash > std::unordered_map< llarp::RouterID, llarp_time_t, llarp::RouterID::Hash >
m_PersistingSessions; m_PersistingSessions;
@ -254,8 +259,7 @@ struct llarp_router
/// manually flush outbound message queue for just 1 router /// manually flush outbound message queue for just 1 router
void void
FlushOutboundFor(const llarp::RouterID remote, FlushOutboundFor(llarp::RouterID remote, llarp::ILinkLayer *chosen = nullptr);
llarp::ILinkLayer *chosen = nullptr);
/// manually discard all pending messages to remote router /// manually discard all pending messages to remote router
void void

@ -94,7 +94,7 @@ namespace llarp
llarp::LogWarn("could not publish descriptors for endpoint ", Name(), llarp::LogWarn("could not publish descriptors for endpoint ", Name(),
" because we couldn't get enough valid introductions"); " because we couldn't get enough valid introductions");
if(ShouldBuildMore(now) || forceRebuild) if(ShouldBuildMore(now) || forceRebuild)
ManualRebuild(1, llarp::path::ePathRoleInboundHS); ManualRebuild(1);
return; return;
} }
m_IntroSet.I.clear(); m_IntroSet.I.clear();
@ -197,7 +197,7 @@ namespace llarp
continue; continue;
byte_t tmp[1024] = {0}; byte_t tmp[1024] = {0};
auto buf = StackBuffer< decltype(tmp) >(tmp); auto buf = StackBuffer< decltype(tmp) >(tmp);
if(!SendToOrQueue(introset.A.Addr(), buf, eProtocolText)) if(!SendToOrQueue(introset.A.Addr().data(), buf, eProtocolText))
{ {
llarp::LogWarn(Name(), " failed to send/queue data to ", llarp::LogWarn(Name(), " failed to send/queue data to ",
introset.A.Addr(), " for tag ", tag.ToString()); introset.A.Addr(), " for tag ", tag.ToString());
@ -1113,9 +1113,12 @@ namespace llarp
} }
} }
// no converstation // no converstation
EnsurePathToService(remote, [](Address, OutboundContext*) {}, 5000, return EnsurePathToService(remote,
false); [](Address, OutboundContext* c) {
return false; if(c)
c->UpdateIntroSet(true);
},
5000, false);
} }
bool bool
@ -1159,7 +1162,7 @@ namespace llarp
} }
return false; return false;
} }
Build(hops, llarp::path::ePathRoleOutboundHS); Build(hops);
return true; return true;
} }
@ -1529,8 +1532,7 @@ namespace llarp
} }
} }
(void)roles; (void)roles;
return path::Builder::SelectHop(db, prev, cur, hop, return path::Builder::SelectHop(db, prev, cur, hop, roles);
llarp::path::ePathRoleOutboundHS);
} }
uint64_t uint64_t
@ -1547,8 +1549,7 @@ namespace llarp
{ {
if(markedBad) if(markedBad)
return false; return false;
bool should = path::Builder::ShouldBuildMoreForRoles( bool should = path::Builder::ShouldBuildMore(now);
now, llarp::path::ePathRoleOutboundHS);
// determinte newest intro // determinte newest intro
Introduction intro; Introduction intro;
if(!GetNewestIntro(intro)) if(!GetNewestIntro(intro))

Loading…
Cancel
Save