diff --git a/CMakeLists.txt b/CMakeLists.txt index f4c5a7de4..ff51850cc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,6 +63,10 @@ else() set(WITH_STATIC ON) endif() +if(TESTNET) + add_definitions(-DTESTNET=1) +endif() + add_cflags("-Wall") add_cxxflags("-Wall") @@ -318,7 +322,6 @@ set(LIB_SRC ${UTP_SRC} ${NTRU_SRC} llarp/address_info.cpp - llarp/arpc.cpp llarp/bencode.cpp llarp/buffer.cpp llarp/config.cpp @@ -348,6 +351,7 @@ set(LIB_SRC llarp/relay_up_down.cpp llarp/router_contact.cpp llarp/router.cpp + llarp/rpc.cpp llarp/service.cpp llarp/transit_hop.cpp llarp/testnet.c @@ -439,27 +443,42 @@ include_directories(${sodium_INCLUDE_DIR}) set(RC_EXE rcutil) set(DNS_EXE dns) +set(ABYSS ${CMAKE_SOURCE_DIR}/libabyss) + +set(ABYSS_LIB abyss) + +include_directories(${ABYSS}/include) + +set(ABYSS_SRC + ${ABYSS}/src/http.cpp + ${ABYSS}/src/client.cpp + ${ABYSS}/src/server.cpp + ${ABYSS}/src/lib.cpp) + +add_library(${ABYSS_LIB} ${ABYSS_SRC}) if(SHADOW) -add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC}) +add_shadow_plugin(shadow-plugin-${SHARED_LIB} ${EXE_SRC} ${LIB_SRC} ${LIB_PLATFORM_SRC} ${CPP_BACKPORT_SRC} ${ABYSS_SRC}) target_link_libraries(shadow-plugin-${SHARED_LIB} ${LIBS}) install(TARGETS shadow-plugin-${SHARED_LIB} DESTINATION plugins) else() add_executable(${RC_EXE} ${RC_SRC}) add_executable(${EXE} ${EXE_SRC}) add_executable(${CLIENT_EXE} ${CLIENT_SRC}) + add_executable(${DNS_EXE} ${DNS_SRC}) add_subdirectory(${GTEST_DIR}) include_directories(${GTEST_DIR}/include ${GTEST_DIR}) add_executable(${TEST_EXE} ${TEST_SRC}) + if(WITH_STATIC) add_library(${STATIC_LIB} STATIC ${LIB_SRC}) if(NOT HAVE_CXX17_FILESYSTEM) add_library(${BACKPORT_LIB} STATIC ${CPP_BACKPORT_SRC}) endif(NOT HAVE_CXX17_FILESYSTEM) add_library(${PLATFORM_LIB} STATIC ${LIB_PLATFORM_SRC}) - target_link_libraries(${PLATFORM_LIB} ${THREAD_LIB}) + target_link_libraries(${PLATFORM_LIB} ${THREAD_LIB} ${ABYSS_LIB}) if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") target_link_libraries(${PLATFORM_LIB} -lcap) endif() diff --git a/Makefile b/Makefile index 3f22def8f..c47431766 100644 --- a/Makefile +++ b/Makefile @@ -91,7 +91,7 @@ testnet-clean: clean rm -rf $(TESTNET_ROOT) testnet-configure: testnet-clean - cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) + cmake -GNinja -DCMAKE_BUILD_TYPE=Debug -DCMAKE_C_COMPILER=$(CC) -DCMAKE_CXX_COMPILER=$(CXX) -DTESTNET=1 testnet-build: testnet-configure ninja @@ -105,7 +105,7 @@ shared: shared-configure testnet: cp $(EXE) $(TESTNET_EXE) mkdir -p $(TESTNET_ROOT) - python3 contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) --connect=3 + python3 contrib/testnet/genconf.py --bin=$(TESTNET_EXE) --svc=$(TESTNET_SERVERS) --clients=$(TESTNET_CLIENTS) --dir=$(TESTNET_ROOT) --out $(TESTNET_CONF) --connect=4 LLARP_DEBUG=$(TESTNET_DEBUG) supervisord -n -d $(TESTNET_ROOT) -l $(TESTNET_LOG) -c $(TESTNET_CONF) test: debug diff --git a/include/llarp/arpc.hpp b/include/llarp/arpc.hpp deleted file mode 100644 index 842a48a5c..000000000 --- a/include/llarp/arpc.hpp +++ /dev/null @@ -1,64 +0,0 @@ -#ifndef LLARP_ARPC_HPP -#define LLARP_ARPC_HPP - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#ifndef _WIN32 -#include -#endif - -#include - -// forward declare -struct llarp_router; - -namespace llarp -{ - namespace arpc - { - // forward declare - struct BaseMessage; - - struct Server - { - llarp_tcp_acceptor m_acceptor; - - llarp_router* router; - Server(llarp_router* r); - - static void - OnAccept(llarp_tcp_acceptor* a, llarp_tcp_conn* conn); - - bool - Start(const std::string& bindaddr); - - const llarp_crypto* - Crypto() const; - - const byte_t* - SigningPublicKey() const - { - return llarp::seckey_topublic(SigningPrivateKey()); - } - - const byte_t* - SigningPrivateKey() const; - - bool - Sign(BaseMessage* msg) const; - }; - - } // namespace arpc -} // namespace llarp - -#endif diff --git a/include/llarp/dht/messages/findintro.hpp b/include/llarp/dht/messages/findintro.hpp index a696ceb56..a0b7fb697 100644 --- a/include/llarp/dht/messages/findintro.hpp +++ b/include/llarp/dht/messages/findintro.hpp @@ -21,8 +21,9 @@ namespace llarp relayed = relay; } - FindIntroMessage(const llarp::service::Tag& tag, uint64_t txid) - : IMessage({}), N(tag), T(txid) + FindIntroMessage(const llarp::service::Tag& tag, uint64_t txid, + uint64_t r = 3) + : IMessage({}), R(r), N(tag), T(txid) { S.Zero(); } diff --git a/include/llarp/ev.h b/include/llarp/ev.h index 935ffee2a..5708e66c5 100644 --- a/include/llarp/ev.h +++ b/include/llarp/ev.h @@ -94,6 +94,8 @@ struct llarp_tcp_conn void (*read)(struct llarp_tcp_conn *, const void *, size_t); /// handle close event (free-ing is handled by event loop) void (*closed)(struct llarp_tcp_conn *); + /// handle event loop tick + void (*tick)(struct llarp_tcp_conn *); }; /// queue async write a buffer in full @@ -120,7 +122,8 @@ struct llarp_tcp_acceptor /// return false if failed to bind /// return true on successs bool -llarp_tcp_serve(struct llarp_tcp_acceptor *t, const sockaddr *bindaddr); +llarp_tcp_serve(struct llarp_ev_loop *loop, struct llarp_tcp_acceptor *t, + const sockaddr *bindaddr); /// close and stop accepting connections void diff --git a/include/llarp/path.hpp b/include/llarp/path.hpp index f8d7933a6..6c87d1cec 100644 --- a/include/llarp/path.hpp +++ b/include/llarp/path.hpp @@ -404,7 +404,7 @@ namespace llarp HandleRelayCommit(const LR_CommitMessage* msg); void - PutTransitHop(TransitHop* hop); + PutTransitHop(std::shared_ptr< TransitHop > hop); IHopHandler* GetByUpstream(const RouterID& id, const PathID_t& path); @@ -443,7 +443,8 @@ namespace llarp void RemovePathSet(PathSet* set); - typedef std::multimap< PathID_t, TransitHop* > TransitHopsMap_t; + typedef std::multimap< PathID_t, std::shared_ptr< TransitHop > > + TransitHopsMap_t; typedef std::pair< util::Mutex, TransitHopsMap_t > SyncTransitMap_t; diff --git a/include/llarp/rpc.hpp b/include/llarp/rpc.hpp new file mode 100644 index 000000000..8db884c8f --- /dev/null +++ b/include/llarp/rpc.hpp @@ -0,0 +1,31 @@ +#ifndef LLARP_RPC_HPP +#define LLARP_RPC_HPP +#include +#include +#include + +// forward declare +struct llarp_router; + +namespace llarp +{ + namespace rpc + { + struct ServerImpl; + + struct Server + { + Server(llarp_router* r); + ~Server(); + + bool + Start(const std::string& bindaddr); + + private: + ServerImpl* m_Impl; + }; + + } // namespace rpc +} // namespace llarp + +#endif diff --git a/include/llarp/service/tag.hpp b/include/llarp/service/tag.hpp index 559f543a6..9c5a6d13c 100644 --- a/include/llarp/service/tag.hpp +++ b/include/llarp/service/tag.hpp @@ -21,11 +21,7 @@ namespace llarp Tag(const std::string& str) : Tag() { -#ifndef MIN -#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y)) - memcpy(data(), str.c_str(), MIN(16UL, str.size())); -#undef MIN -#endif + memcpy(data(), str.c_str(), std::min(16UL, str.size())); } Tag& @@ -38,11 +34,7 @@ namespace llarp Tag& operator=(const std::string& str) { -#ifndef MIN -#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y)) - memcpy(data(), str.data(), MIN(16UL, str.size())); -#undef MIN -#endif + memcpy(data(), str.data(), std::min(16UL, str.size())); return *this; } @@ -67,4 +59,4 @@ namespace llarp } // namespace service } // namespace llarp -#endif \ No newline at end of file +#endif diff --git a/libabyss/CMakeLists.txt b/libabyss/CMakeLists.txt new file mode 100644 index 000000000..f1d63b9aa --- /dev/null +++ b/libabyss/CMakeLists.txt @@ -0,0 +1,15 @@ + +set(ABYSS src) + +set(ABYSS_LIB abyss) + +include_directories(include) + +set(ABYSS_SRC + ${ABYSS}/http.cpp + ${ABYSS}/client.cpp + ${ABYSS}/server.cpp + ${ABYSS}/lib.cpp + ) + +add_library(${ABYSS_LIB} ${ABYSS_SRC}) diff --git a/libabyss/include/libabyss.h b/libabyss/include/libabyss.h new file mode 100644 index 000000000..b42bce632 --- /dev/null +++ b/libabyss/include/libabyss.h @@ -0,0 +1,13 @@ +#ifdnef __LIB_ABYSS_H__ +#define __LIB_ABYSS_H__ + +#include + +#ifdef __cplusplus +extern "C" +{ +#endif +#ifdef __cplusplus +} +#endif +#endif diff --git a/libabyss/include/libabyss.hpp b/libabyss/include/libabyss.hpp new file mode 100644 index 000000000..1327d5405 --- /dev/null +++ b/libabyss/include/libabyss.hpp @@ -0,0 +1,39 @@ +#ifndef __LIB_ABYSS_HPP__ +#define __LIB_ABYSS_HPP__ + +#include +#include +#include +#include +#include + +namespace abyss +{ + namespace http + { + // forward declare + struct ConnHandler; + + struct BaseReqHandler + { + BaseReqHandler(llarp_time_t req_timeout); + ~BaseReqHandler(); + + bool + ServeAsync(llarp_ev_loop* loop, llarp_logic* logic, + const sockaddr* bindaddr); + + private: + static void + OnAccept(struct llarp_tcp_acceptor*, struct llarp_tcp_conn*); + + llarp_ev_loop* m_loop; + llarp_logic* m_Logic; + llarp_tcp_acceptor m_acceptor; + std::vector< std::unique_ptr< ConnHandler > > m_Conns; + llarp_time_t m_ReqTimeout; + }; + } // namespace http +} // namespace abyss + +#endif diff --git a/libabyss/src/client.cpp b/libabyss/src/client.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/libabyss/src/http.cpp b/libabyss/src/http.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/libabyss/src/lib.c b/libabyss/src/lib.c new file mode 100644 index 000000000..e69de29bb diff --git a/libabyss/src/lib.cpp b/libabyss/src/lib.cpp new file mode 100644 index 000000000..e69de29bb diff --git a/libabyss/src/server.cpp b/libabyss/src/server.cpp new file mode 100644 index 000000000..4b23558d0 --- /dev/null +++ b/libabyss/src/server.cpp @@ -0,0 +1,56 @@ +#include +#include + +namespace abyss +{ + namespace http + { + struct ConnHandler + { + llarp_tcp_conn* _conn; + + llarp_time_t m_LastActive; + llarp_time_t m_ReadTimeout; + + ConnHandler(llarp_tcp_conn* c, llarp_time_t readtimeout) : _conn(c) + { + m_LastActive = llarp_time_now_ms(); + m_ReadTimeout = readtimeout; + } + + bool + ShouldClose(llarp_time_t now) const + { + return now - m_LastActive > m_ReadTimeout; + } + + void + Begin() + { + } + }; + + BaseReqHandler::BaseReqHandler(llarp_time_t reqtimeout) + : m_ReqTimeout(reqtimeout) + { + m_loop = nullptr; + m_Logic = nullptr; + m_acceptor.accepted = &BaseReqHandler::OnAccept; + m_acceptor.user = this; + } + + BaseReqHandler::~BaseReqHandler() + { + llarp_tcp_acceptor_close(&m_acceptor); + } + + void + BaseReqHandler::OnAccept(llarp_tcp_acceptor* acceptor, llarp_tcp_conn* conn) + { + BaseReqHandler* self = static_cast< BaseReqHandler* >(acceptor->user); + ConnHandler* handler = new ConnHandler(conn, self->m_ReqTimeout); + conn->user = handler; + self->m_Conns.emplace_back(handler); + } + } // namespace http +} // namespace abyss diff --git a/llarp/arpc.cpp b/llarp/arpc.cpp deleted file mode 100644 index 63b2557e8..000000000 --- a/llarp/arpc.cpp +++ /dev/null @@ -1,405 +0,0 @@ -#include - -namespace llarp -{ - namespace arpc - { - /// interface for request messages - struct IRequest - { - /// returns false if errmsg is set - /// returns true if retval is set - virtual bool - HandleRequest(Server* ctx, std::unique_ptr< BaseMessage >& retval, - std::string& errmsg) const = 0; - }; - - struct BaseMessage : public llarp::IBEncodeMessage, public IRequest - { - static constexpr size_t MaxIDSize = 128; - /// maximum size of a message - static constexpr size_t MaxSize = 1024 * 8; - - BaseMessage() - { - timestamp = llarp_time_now_ms(); - zkey.Zero(); - zsig.Zero(); - } - - std::string m_id; - llarp_time_t timestamp; - llarp::PubKey zkey; - llarp::Signature zsig; - - /// override me - virtual std::string - Method() const = 0; - - /// encode the entire message - bool - BEncode(llarp_buffer_t* buf) const - { - if(!bencode_start_dict(buf)) - return false; - if(!BEncodeWriteDictString("aRPC-method", Method(), buf)) - return false; - if(!BEncodeWriteDictString("id", m_id, buf)) - return false; - if(!BEncodeBody(buf)) - return false; - if(!zkey.IsZero()) - { - if(!BEncodeWriteDictEntry("z-key", zkey, buf)) - return false; - if(!BEncodeWriteDictEntry("z-sig", zsig, buf)) - return false; - } - return bencode_end(buf); - } - - bool - DecodeKey(llarp_buffer_t k, llarp_buffer_t* buf) - { - if(llarp_buffer_eq(k, "id")) - { - return DecodeID(buf); - } - if(llarp_buffer_eq(k, "params")) - { - return DecodeParams(buf); - } - return false; - } - - protected: - typedef bool (*ParamDecoder)(dict_reader*, llarp_buffer_t*); - - virtual ParamDecoder - GetParamDecoder() const = 0; - - bool - DecodeParams(llarp_buffer_t* buf) - { - dict_reader r; - r.user = this; - r.on_key = GetParamDecoder(); - return bencode_read_dict(buf, &r); - } - - bool - DecodeID(llarp_buffer_t* buf) - { - llarp_buffer_t strbuf; - if(!bencode_read_string(buf, &strbuf)) - return false; - if(strbuf.sz > MaxIDSize) // too big - return false; - m_id = std::string((char*)strbuf.base, strbuf.sz); - return true; - } - - /// encode body of message - virtual bool - BEncodeBody(llarp_buffer_t* buf) const = 0; - }; - - struct ConnHandler - { - ConnHandler(Server* s, llarp_tcp_conn* c) : parent(s), m_conn(c) - { - left = 0; - readingHeader = true; - } - - bool readingHeader; - Server* parent; - llarp_tcp_conn* m_conn; - AlignedBuffer< BaseMessage::MaxSize > buf; - uint16_t left; - - void - ParseMessage(); - - void - Close() - { - llarp_tcp_conn_close(m_conn); - } - - static void - OnClosed(llarp_tcp_conn* conn) - { - ConnHandler* self = static_cast< ConnHandler* >(conn->user); - delete self; - } - - static void - OnRead(llarp_tcp_conn* conn, const void* buf, size_t sz) - { - ConnHandler* self = static_cast< ConnHandler* >(conn->user); - const byte_t* ptr = (const byte_t*)buf; - do - { - if(self->readingHeader) - { - self->left = bufbe16toh(ptr); - sz -= 2; - ptr += 2; - self->readingHeader = false; - } - size_t dlt = std::min((size_t)self->left, sz); - memcpy(self->buf.data() + (self->buf.size() - self->left), ptr, dlt); - self->left -= dlt; - sz -= dlt; - if(self->left == 0) - { - self->ParseMessage(); - self->readingHeader = true; - } - } while(sz > 0); - } - }; - - /// base type for ping req/resp - struct Ping : public BaseMessage - { - Ping() : BaseMessage() - { - } - - uint64_t ping; - - std::string - Method() const - { - return "llarp.rpc.ping"; - } - - bool - BEncodeBody(llarp_buffer_t* buf) const - { - if(!bencode_write_bytestring(buf, "params", 6)) - return false; - if(!bencode_start_dict(buf)) - return false; - - if(!BEncodeWriteDictInt("ping", ping, buf)) - return false; - return bencode_end(buf); - } - - static bool - OnParamKey(dict_reader* r, llarp_buffer_t* k) - { - Ping* self = static_cast< Ping* >(r->user); - if(k && llarp_buffer_eq(*k, "ping")) - { - return bencode_read_integer(r->buffer, &self->ping); - } - else - return k == nullptr; - } - - virtual ParamDecoder - GetParamDecoder() const - { - return &OnParamKey; - } - }; - - struct PingResponse : public Ping - { - PingResponse(uint64_t p) : Ping() - { - ping = p; - } - - bool - HandleRequest(Server*, std::unique_ptr< BaseMessage >&, - std::string&) const - { - /// TODO: handle client response - llarp::LogInfo(Method(), "pong ", ping); - return false; - } - }; - - struct PingRequest : public Ping - { - bool - HandleRequest(Server* serv, std::unique_ptr< BaseMessage >& retval, - std::string& errmsg) const - { - PingResponse* resp = new PingResponse(ping); - if(!serv->Sign(resp)) - { - errmsg = "failed to sign response"; - return false; - } - retval.reset(resp); - return true; - } - }; - - struct MessageReader - { - dict_reader m_reader; - BaseMessage* msg = nullptr; - - MessageReader() - { - m_reader.user = this; - m_reader.on_key = &OnKey; - } - - static bool - OnKey(dict_reader* r, llarp_buffer_t* key) - { - static std::unordered_map< std::string, - const std::function< BaseMessage*(void) > > - msgConstructors = { - {"llarp.rpc.ping", - []() -> BaseMessage* { return new PingRequest(); }}, - }; - - MessageReader* self = static_cast< MessageReader* >(r->user); - if(self->msg == nullptr) - { - // first key - if(key == nullptr || !llarp_buffer_eq(*key, "aRPC-method")) - { - // bad value - return false; - } - llarp_buffer_t strbuf; - if(!bencode_read_string(r->buffer, &strbuf)) - return false; - std::string method = std::string((char*)strbuf.base, strbuf.sz); - auto itr = msgConstructors.find(method); - if(itr == msgConstructors.end()) - { - // no such method - return false; - } - else - self->msg = itr->second(); - return true; - } - else if(key) - return self->msg->DecodeKey(*key, r->buffer); - else - return true; - } - - bool - DecodeMessage(llarp_buffer_t* buf, - std::unique_ptr< BaseMessage >& request) - { - msg = nullptr; - if(!bencode_read_dict(buf, &m_reader)) - return false; - request.reset(msg); - return true; - } - }; - - Server::Server(llarp_router* r) - { - router = r; - m_acceptor.user = this; - m_acceptor.accepted = &OnAccept; - } - - bool - Server::Start(const std::string& bindaddr) - { - llarp::Addr addr; - sockaddr* saddr = nullptr; -#ifndef _WIN32 - sockaddr_un unaddr; - if(bindaddr.find("unix:") == 0) - { - unaddr.sun_family = AF_UNIX; - - strncpy(unaddr.sun_path, bindaddr.substr(5).c_str(), - sizeof(unaddr.sun_path)); - saddr = (sockaddr*)&unaddr; - } - else -#endif - { - // TODO: ipv6 - auto idx = bindaddr.find(':'); - std::string host = bindaddr.substr(0, idx); - uint16_t port = std::stoi(bindaddr.substr(idx + 1)); - addr = llarp::Addr(host, port); - saddr = (sockaddr*)addr; - } - return llarp_tcp_serve(&m_acceptor, saddr); - } - - void - Server::OnAccept(llarp_tcp_acceptor* a, llarp_tcp_conn* conn) - { - Server* self = static_cast< Server* >(a->user); - conn->user = new ConnHandler(self, conn); - conn->read = &ConnHandler::OnRead; - conn->closed = &ConnHandler::OnClosed; - } - - bool - Server::Sign(BaseMessage* msg) const - { - msg->zkey = SigningPublicKey(); - msg->zsig.Zero(); - llarp::Signature sig; - // - byte_t tmp[BaseMessage::MaxSize]; - auto buf = llarp::StackBuffer< decltype(tmp) >(tmp); - - if(!msg->BEncode(&buf)) - return false; - // rewind buffer - buf.sz = buf.cur - buf.base; - buf.cur = buf.base; - - if(!Crypto()->sign(sig, SigningPrivateKey(), buf)) - return false; - - msg->zsig = sig; - return true; - } - - void - ConnHandler::ParseMessage() - { - std::unique_ptr< BaseMessage > msg; - std::unique_ptr< BaseMessage > response; - std::string errmsg; - MessageReader r; - auto tmp = llarp::Buffer(buf); - if(!r.DecodeMessage(&tmp, msg)) - { - llarp::LogError("failed to decode message"); - Close(); - return; - } - - // handle request - if(!msg->HandleRequest(parent, response, errmsg)) - { - // TODO: send error reply - llarp::LogError("failed to handle api message: ", errmsg); - Close(); - return; - } - - if(!parent->Sign(response.get())) - { - llarp::LogError("failed to sign response"); - Close(); - } - } - } // namespace arpc -} // namespace llarp diff --git a/llarp/dht/context.cpp b/llarp/dht/context.cpp index 12aab82b6..60ea97868 100644 --- a/llarp/dht/context.cpp +++ b/llarp/dht/context.cpp @@ -132,12 +132,6 @@ namespace llarp ctx->ScheduleCleanupTimer(); } - void - Context::LookupTagForPath(const service::Tag &tag, uint64_t txid, - const llarp::PathID_t &path, const Key_t &askpeer) - { - } - std::set< service::IntroSet > Context::FindRandomIntroSetsWithTagExcluding( const service::Tag &tag, size_t max, @@ -567,7 +561,8 @@ namespace llarp void Start(const TXOwner &peer) { - parent->DHTSendTo(peer.node, new FindIntroMessage(target, peer.txid)); + parent->DHTSendTo(peer.node, + new FindIntroMessage(target, peer.txid, R)); } bool @@ -590,10 +585,10 @@ namespace llarp found.insert(remoteTag); } // collect our local values if we haven't hit a limit - if(found.size() < 8) + if(found.size() < 3) { for(const auto &localTag : - parent->FindRandomIntroSetsWithTagExcluding(target, 2, found)) + parent->FindRandomIntroSetsWithTagExcluding(target, 1, found)) { found.insert(localTag); } @@ -616,6 +611,53 @@ namespace llarp TXOwner asker(whoasked, whoaskedTX); TXOwner peer(askpeer, ++ids); pendingTagLookups.NewTX(peer, tag, new TagLookup(asker, tag, this, R)); + llarp::LogInfo("ask ", askpeer, " for ", tag, " on behalf of ", whoasked, + " R=", R); + } + + struct LocalTagLookup : public TagLookup + { + PathID_t localPath; + + LocalTagLookup(const PathID_t &path, uint64_t txid, + const service::Tag &target, Context *ctx) + : TagLookup(TXOwner{ctx->OurKey(), txid}, target, ctx, 3) + , localPath(path) + { + } + + void + SendReply() + { + auto path = + parent->router->paths.GetByUpstream(parent->OurKey(), localPath); + if(!path) + { + llarp::LogWarn( + "did not send reply for relayed dht request, no such local path " + "for pathid=", + localPath); + return; + } + routing::DHTMessage msg; + msg.M.emplace_back(new GotIntroMessage(valuesFound, whoasked.txid)); + if(!path->SendRoutingMessage(&msg, parent->router)) + { + llarp::LogWarn( + "failed to send routing message when informing result of dht " + "request, pathid=", + localPath); + } + } + }; + + void + Context::LookupTagForPath(const service::Tag &tag, uint64_t txid, + const llarp::PathID_t &path, const Key_t &askpeer) + { + TXOwner peer(askpeer, ++ids); + pendingTagLookups.NewTX(peer, tag, + new LocalTagLookup(path, txid, tag, this)); } bool diff --git a/llarp/dht/find_intro.cpp b/llarp/dht/find_intro.cpp index eb4b017c8..1df2e9061 100644 --- a/llarp/dht/find_intro.cpp +++ b/llarp/dht/find_intro.cpp @@ -182,13 +182,22 @@ namespace llarp replies.emplace_back(new GotIntroMessage(reply, T)); return true; } - else + else if(R < 5) { // tag lookup if(dht.nodes->GetRandomNodeExcluding(peer, exclude)) { dht.LookupTagRecursive(N, From, T, peer, R - 1); } + else + { + replies.emplace_back(new GotIntroMessage({}, T)); + } + } + else + { + // too big R value + replies.emplace_back(new GotIntroMessage({}, T)); } } } diff --git a/llarp/dht/got_intro.cpp b/llarp/dht/got_intro.cpp index 8e28881e1..57c15378d 100644 --- a/llarp/dht/got_intro.cpp +++ b/llarp/dht/got_intro.cpp @@ -41,14 +41,14 @@ namespace llarp auto tagLookup = dht.pendingTagLookups.GetPendingLookupFrom(owner); if(tagLookup) { - dht.pendingTagLookups.Inform(owner, tagLookup->target, I); + dht.pendingTagLookups.Found(owner, tagLookup->target, I); return true; } auto serviceLookup = dht.pendingIntrosetLookups.GetPendingLookupFrom(owner); if(serviceLookup) { - dht.pendingIntrosetLookups.Inform(owner, serviceLookup->target, I); + dht.pendingIntrosetLookups.Found(owner, serviceLookup->target, I); return true; } llarp::LogError("no pending TX for GIM from ", From, " txid=", T); diff --git a/llarp/ev.cpp b/llarp/ev.cpp index b947479d5..41c29926c 100644 --- a/llarp/ev.cpp +++ b/llarp/ev.cpp @@ -111,7 +111,6 @@ llarp_ev_add_tun(struct llarp_ev_loop *loop, struct llarp_tun_io *tun) tun->impl = dev; if(dev) { - loop->tun_listeners.push_back(tun); return loop->add_ev(dev, true); } return false; @@ -131,6 +130,12 @@ llarp_tcp_serve(struct llarp_tcp_acceptor *tcp, const struct sockaddr *bindaddr) return false; } +void +llarp_tcp_acceptor_close(struct llarp_tcp_acceptor *tcp) +{ + // TODO: implement me +} + void llarp_tcp_conn_close(struct llarp_tcp_conn *conn) { diff --git a/llarp/ev.hpp b/llarp/ev.hpp index c214caa66..35981d525 100644 --- a/llarp/ev.hpp +++ b/llarp/ev.hpp @@ -49,6 +49,9 @@ namespace llarp virtual int sendto(const sockaddr* dst, const void* data, size_t sz) = 0; + virtual void + tick(){}; + /// used for tun interface and tcp conn virtual bool do_write(void* data, size_t sz) @@ -191,23 +194,13 @@ struct llarp_ev_loop virtual ~llarp_ev_loop(){}; - std::vector< llarp_udp_io* > udp_listeners; - std::vector< llarp_tun_io* > tun_listeners; + std::vector< std::unique_ptr< llarp::ev_io > > handlers; void tick_listeners() { - for(auto& l : udp_listeners) - if(l->tick) - l->tick(l); - for(auto& l : tun_listeners) - { - if(l->tick) - l->tick(l); - if(l->before_write) - l->before_write(l); - static_cast< llarp::ev_io* >(l->impl)->flush_write(); - } + for(const auto& h : handlers) + h->tick(); } }; diff --git a/llarp/ev_epoll.hpp b/llarp/ev_epoll.hpp index c31d3910a..a20a5c6c4 100644 --- a/llarp/ev_epoll.hpp +++ b/llarp/ev_epoll.hpp @@ -16,6 +16,14 @@ namespace llarp { + struct tcp_serv : public ev_io + { + }; + + struct tcp_conn : public ev_io + { + }; + struct udp_listener : public ev_io { llarp_udp_io* udp; @@ -26,6 +34,13 @@ namespace llarp { } + virtual void + tick() + { + if(udp->tick) + udp->tick(udp); + } + virtual int read(void* buf, size_t sz) { @@ -84,6 +99,14 @@ namespace llarp return -1; } + virtual void + tick() + { + if(t->tick) + t->tick(t); + flush_write(); + } + void flush_write() { @@ -270,7 +293,14 @@ struct llarp_epoll_loop : public llarp_ev_loop bool close_ev(llarp::ev_io* ev) { - return epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) != -1; + if(epoll_ctl(epollfd, EPOLL_CTL_DEL, ev->fd, nullptr) == -1) + return false; + // deallocate + std::remove_if(handlers.begin(), handlers.end(), + [ev](const std::unique_ptr< llarp::ev_io >& i) -> bool { + return i.get() == ev; + }); + return true; } llarp::ev_io* @@ -278,7 +308,10 @@ struct llarp_epoll_loop : public llarp_ev_loop { llarp::tun* t = new llarp::tun(tun); if(t->setup()) + { + handlers.emplace_back(t); return t; + } delete t; return nullptr; } @@ -289,9 +322,9 @@ struct llarp_epoll_loop : public llarp_ev_loop int fd = udp_bind(src); if(fd == -1) return nullptr; - llarp::udp_listener* listener = new llarp::udp_listener(fd, l); - l->impl = listener; - udp_listeners.push_back(l); + handlers.emplace_back(new llarp::udp_listener(fd, l)); + llarp::ev_io* listener = handlers.back().get(); + l->impl = listener; return listener; } @@ -321,9 +354,7 @@ struct llarp_epoll_loop : public llarp_ev_loop { close_ev(listener); l->impl = nullptr; - delete listener; - std::remove_if(udp_listeners.begin(), udp_listeners.end(), - [l](llarp_udp_io* i) -> bool { return i == l; }); + ret = true; } return ret; } diff --git a/llarp/net.cpp b/llarp/net.cpp index fb5e1b476..3698e1d00 100644 --- a/llarp/net.cpp +++ b/llarp/net.cpp @@ -985,10 +985,14 @@ namespace llarp bool IsBogon(const in6_addr& addr) { +#ifdef TESTNET + return false; +#else if(!ipv6_is_siit(addr)) return false; return IsIPv4Bogon(ipaddr_ipv4_bits(addr.s6_addr[12], addr.s6_addr[13], addr.s6_addr[14], addr.s6_addr[15])); +#endif } bool diff --git a/llarp/path.cpp b/llarp/path.cpp index 535257ebc..f82b4f714 100644 --- a/llarp/path.cpp +++ b/llarp/path.cpp @@ -143,9 +143,10 @@ namespace llarp bool PathContext::HasTransitHop(const TransitHopInfo& info) { - return MapHas(m_TransitPaths, info.txID, [info](TransitHop* hop) -> bool { - return info == hop->info; - }); + return MapHas(m_TransitPaths, info.txID, + [info](const std::shared_ptr< TransitHop >& hop) -> bool { + return info == hop->info; + }); } IHopHandler* @@ -163,20 +164,24 @@ namespace llarp return own; return MapGet(m_TransitPaths, id, - [remote](const TransitHop* hop) -> bool { + [remote](const std::shared_ptr< TransitHop >& hop) -> bool { return hop->info.upstream == remote; }, - [](TransitHop* h) -> IHopHandler* { return h; }); + [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { + return h.get(); + }); } IHopHandler* PathContext::GetByDownstream(const RouterID& remote, const PathID_t& id) { return MapGet(m_TransitPaths, id, - [remote](const TransitHop* hop) -> bool { + [remote](const std::shared_ptr< TransitHop >& hop) -> bool { return hop->info.downstream == remote; }, - [](TransitHop* h) -> IHopHandler* { return h; }); + [](const std::shared_ptr< TransitHop >& h) -> IHopHandler* { + return h.get(); + }); } PathSet* @@ -215,14 +220,14 @@ namespace llarp for(auto i = range.first; i != range.second; ++i) { if(i->second->info.upstream == us) - return i->second; + return i->second.get(); } } return nullptr; } void - PathContext::PutTransitHop(TransitHop* hop) + PathContext::PutTransitHop(std::shared_ptr< TransitHop > hop) { MapPut(m_TransitPaths, hop->info.txID, hop); MapPut(m_TransitPaths, hop->info.rxID, hop); @@ -235,23 +240,16 @@ namespace llarp auto now = llarp_time_now_ms(); auto& map = m_TransitPaths.second; auto itr = map.begin(); - std::set< TransitHop* > removePaths; while(itr != map.end()) { if(itr->second->Expired(now)) { - TransitHop* path = itr->second; - llarp::LogDebug("transit path expired ", path->info); - removePaths.insert(path); + itr = map.erase(itr); } - ++itr; - } - for(auto& p : removePaths) - { - map.erase(p->info.txID); - map.erase(p->info.rxID); - delete p; + else + ++itr; } + for(auto& builder : m_PathBuilders) { if(builder) @@ -298,7 +296,7 @@ namespace llarp for(auto i = range.first; i != range.second; ++i) { if(i->second->info.upstream == us) - return i->second; + return i->second.get(); } } return nullptr; diff --git a/llarp/relay_commit.cpp b/llarp/relay_commit.cpp index d2f490661..94b3f68eb 100644 --- a/llarp/relay_commit.cpp +++ b/llarp/relay_commit.cpp @@ -169,7 +169,7 @@ namespace llarp // decrypted record LR_CommitRecord record; // the actual hop - Hop* hop; + std::shared_ptr< Hop > hop; LRCMFrameDecrypt(Context* ctx, Decrypter* dec, const LR_CommitMessage* commit) @@ -196,6 +196,8 @@ namespace llarp self->hop->ExpireTime()); self->context->Router()->PersistSessionUntil(self->hop->info.upstream, self->hop->ExpireTime()); + // put hop + self->context->PutTransitHop(self->hop); // forward to next hop self->context->ForwardLRCM(self->hop->info.upstream, self->frames); self->hop = nullptr; @@ -210,6 +212,8 @@ namespace llarp // persist session to downstream until path expiration self->context->Router()->PersistSessionUntil(self->hop->info.downstream, self->hop->ExpireTime()); + // put hop + self->context->PutTransitHop(self->hop); // send path confirmation llarp::routing::PathConfirmMessage confirm(self->hop->lifetime); if(!self->hop->SendRoutingMessage(&confirm, self->context->Router())) @@ -279,8 +283,6 @@ namespace llarp // TODO: check if we really want to accept it self->hop->started = llarp_time_now_ms(); - llarp::LogDebug("Accepted ", self->hop->info); - self->context->PutTransitHop(self->hop); size_t sz = self->frames[0].size(); // shift diff --git a/llarp/router.cpp b/llarp/router.cpp index 9da1a57fd..8532c7a74 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include "buffer.hpp" #include "encode.hpp" @@ -655,7 +655,7 @@ llarp_router::Run() { rpcBindAddr = DefaultRPCBindAddr; } - rpcServer = std::make_unique< llarp::arpc::Server >(this); + rpcServer = std::make_unique< llarp::rpc::Server >(this); if(!rpcServer->Start(rpcBindAddr)) { llarp::LogError("Binding rpc server to ", rpcBindAddr, " failed"); @@ -1252,19 +1252,4 @@ namespace llarp } } } - - namespace arpc - { - const byte_t * - Server::SigningPrivateKey() const - { - return router->identity; - } - - const llarp_crypto * - Server::Crypto() const - { - return &router->crypto; - } - } // namespace arpc } // namespace llarp diff --git a/llarp/router.hpp b/llarp/router.hpp index 8b06aa9c8..26f7ae895 100644 --- a/llarp/router.hpp +++ b/llarp/router.hpp @@ -5,7 +5,7 @@ #include #include #include -#include +#include #include #include @@ -105,7 +105,7 @@ struct llarp_router std::string DefaultRPCBindAddr = "127.0.0.1:1190"; bool enableRPCServer = false; - std::unique_ptr< llarp::arpc::Server > rpcServer; + std::unique_ptr< llarp::rpc::Server > rpcServer; std::string rpcBindAddr = DefaultRPCBindAddr; std::unique_ptr< llarp::ILinkLayer > outboundLink; diff --git a/llarp/rpc.cpp b/llarp/rpc.cpp new file mode 100644 index 000000000..ed186c6ab --- /dev/null +++ b/llarp/rpc.cpp @@ -0,0 +1,40 @@ +#include +#include + +namespace llarp +{ + namespace rpc + { + struct ServerImpl : public ::abyss::http::BaseReqHandler + { + llarp_router* router; + + ServerImpl(llarp_router* r) + : ::abyss::http::BaseReqHandler(2000), router(r) + { + } + + bool + Start(const std::string& addr) + { + return false; + } + }; + + Server::Server(llarp_router* r) : m_Impl(new ServerImpl(r)) + { + } + + Server::~Server() + { + delete m_Impl; + } + + bool + Server::Start(const std::string& addr) + { + return m_Impl->Start(addr); + } + + } // namespace rpc +} // namespace llarp diff --git a/llarp/service/endpoint.cpp b/llarp/service/endpoint.cpp index 9558e459d..d5068d27d 100644 --- a/llarp/service/endpoint.cpp +++ b/llarp/service/endpoint.cpp @@ -186,7 +186,7 @@ namespace llarp } } } - +#ifdef TESTNET // prefetch tags for(const auto& tag : m_PrefetchTags) { @@ -201,12 +201,12 @@ namespace llarp { if(HasPendingPathToService(introset.A.Addr())) continue; - if(!EnsurePathToService(introset.A.Addr(), - [](Address addr, OutboundContext* ctx) {}, - 10000)) + byte_t tmp[1024] = {0}; + auto buf = StackBuffer< decltype(tmp) >(tmp); + if(!SendToOrQueue(introset.A.Addr(), buf, eProtocolText)) { - llarp::LogWarn("failed to ensure path to ", introset.A.Addr(), - " for tag ", tag.ToString()); + llarp::LogWarn(Name(), " failed to send/queue data to ", + introset.A.Addr(), " for tag ", tag.ToString()); } } itr->second.Expire(now); @@ -216,10 +216,16 @@ namespace llarp if(path) { auto job = new TagLookupJob(this, &itr->second); - job->SendRequestViaPath(path, Router()); + if(!job->SendRequestViaPath(path, Router())) + llarp::LogError(Name(), " failed to send tag lookup"); + } + else + { + llarp::LogError(Name(), " has no paths for tag lookup"); } } } +#endif // tick remote sessions {