diff --git a/CMakeLists.txt b/CMakeLists.txt index 54dfc8f79..fc1293642 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -241,6 +241,7 @@ set(LIB_SRC llarp/iwp/frame_header.cpp llarp/iwp/frame_state.cpp llarp/iwp/session.cpp + llarp/iwp/server.cpp llarp/iwp/transit_message.cpp llarp/iwp/xmit.cpp llarp/link/encoder.cpp diff --git a/daemon.ini b/daemon.ini index 978ba9439..b08452887 100644 --- a/daemon.ini +++ b/daemon.ini @@ -1,17 +1,12 @@ [router] -worker-threads=8 -net-threads=1 -contact-file=router.signed -ident-privkey=server-ident.key -#public-address=0.0.0.0 -#public-port=1090 +threads = 2 +net-threads = 1 +contact-file = /home/jeff/git/llarp/self.signed +transport-privkey = /home/jeff/git/llarp/transport.key +identity-privkey = /home/jeff/git/llarp/identity.key [netdb] -dir=/tmp/nodes - -[connect] -#i2p.rocks=i2p.rocks.signed.txt -#named-node1=/path/to/routercontact1.signed +dir = /home/jeff/git/llarp/netdb [bind] -#en0=1090 + diff --git a/doc/dht_v0.txt b/doc/dht_v0.txt index 391655db0..57d56deb2 100644 --- a/doc/dht_v0.txt +++ b/doc/dht_v0.txt @@ -85,7 +85,8 @@ find a router by long term RC.k public key { A: "R", - I: 0 or 1 if iterative lookup + E: 0 or 1 if exploritory lookup, + I: 0 or 1 if iterative lookup, K: "<32 byte public key of router>", T: transaction_id_uint64, V: 0 diff --git a/include/llarp/aligned.hpp b/include/llarp/aligned.hpp index 211ea3c26..af4d20abb 100644 --- a/include/llarp/aligned.hpp +++ b/include/llarp/aligned.hpp @@ -12,12 +12,16 @@ namespace llarp { /// aligned buffer, sz must be multiple of 8 bytes - template < size_t sz > + template < size_t sz, bool randomize = false > struct AlignedBuffer { static_assert(sz % 8 == 0, "aligned buffer size is not a multiple of 8"); - AlignedBuffer() = default; + AlignedBuffer() + { + if(randomize) + Randomize(); + } AlignedBuffer(const AlignedBuffer& other) { @@ -174,6 +178,7 @@ namespace llarp uint64_t l[sz / 8]; }; }; + } // namespace llarp #endif diff --git a/include/llarp/dtls.h b/include/llarp/dtls.h index 4daf85b69..2e482704f 100644 --- a/include/llarp/dtls.h +++ b/include/llarp/dtls.h @@ -1,27 +1,5 @@ #ifndef LLARP_DTLS_H_ #define LLARP_DTLS_H_ -#include - -/** - * dtls.h - * - * Datagram TLS functions - * https://en.wikipedia.org/wiki/Datagram_Transport_Layer_Security for more info - * on DTLS - */ - -/// DTLS configuration -struct llarp_dtls_args -{ - struct llarp_alloc* mem; - const char* keyfile; - const char* certfile; -}; - -/// allocator -void -dtls_link_init(struct llarp_link* link, struct llarp_dtls_args args, - struct llarp_msg_muxer* muxer); #endif diff --git a/include/llarp/iwp/server.hpp b/include/llarp/iwp/server.hpp index 66e79d954..46e7a2206 100644 --- a/include/llarp/iwp/server.hpp +++ b/include/llarp/iwp/server.hpp @@ -1,11 +1,9 @@ #pragma once - +#include #include -#include "llarp/iwp.h" #include "llarp/iwp/establish_job.hpp" #include "router.hpp" #include "session.hpp" -#include "str.hpp" #include #include @@ -36,6 +34,7 @@ struct llarp_link { return m_name; } + const char *m_name; typedef std::unordered_map< llarp::Addr, llarp_link_session *, @@ -60,452 +59,102 @@ struct llarp_link llarp::SecretKey seckey; - llarp_link(const llarp_iwp_args &args) - : router(args.router) - , crypto(args.crypto) - , logic(args.logic) - , worker(args.cryptoworker) - , m_name("IWP") - { - strncpy(keyfile, args.keyfile, sizeof(keyfile)); - iwp = llarp_async_iwp_new(crypto, logic, worker); - pumpingLogic.store(false); - } + llarp_link(const llarp_iwp_args &args); - ~llarp_link() - { - llarp_async_iwp_free(iwp); - } + ~llarp_link(); bool - has_intro_from(const llarp::Addr &from) - { - lock_t lock(m_PendingSessions_Mutex); - return m_PendingSessions.find(from) != m_PendingSessions.end(); - } + has_intro_from(const llarp::Addr &from); void - put_intro_from(llarp_link_session *s) - { - lock_t lock(m_PendingSessions_Mutex); - m_PendingSessions[s->addr] = s; - } + put_intro_from(llarp_link_session *s); void - remove_intro_from(const llarp::Addr &from) - { - lock_t lock(m_PendingSessions_Mutex); - m_PendingSessions.erase(from); - } + remove_intro_from(const llarp::Addr &from); // set that src address has identity pubkey void - MapAddr(const llarp::Addr &src, const llarp::PubKey &identity) - { - lock_t lock(m_Connected_Mutex); - m_Connected[identity] = src; - } + MapAddr(const llarp::Addr &src, const llarp::PubKey &identity); - static bool - has_session_to(llarp_link *serv, const byte_t *pubkey) - { - llarp::PubKey pk(pubkey); - lock_t lock(serv->m_Connected_Mutex); - return serv->m_Connected.find(pk) != serv->m_Connected.end(); - } + bool + has_session_to(const byte_t *pubkey); void - TickSessions() - { - auto now = llarp_time_now_ms(); - { - lock_t lock(m_PendingSessions_Mutex); - auto itr = m_PendingSessions.begin(); - while(itr != m_PendingSessions.end()) - { - if(itr->second->timedout(now)) - { - itr->second->done(); - delete itr->second; - itr = m_PendingSessions.erase(itr); - } - else - ++itr; - } - } - { - lock_t lock(m_sessions_Mutex); - auto itr = m_sessions.begin(); - while(itr != m_sessions.end()) - { - if(itr->second->Tick(now)) - { - itr->second->done(); - delete itr->second; - itr = m_sessions.erase(itr); - } - else - ++itr; - } - } - } + TickSessions(); - static bool - sendto(llarp_link *serv, const byte_t *pubkey, llarp_buffer_t buf) - { - llarp_link_session *link = nullptr; - { - lock_t lock(serv->m_Connected_Mutex); - auto itr = serv->m_Connected.find(pubkey); - if(itr != serv->m_Connected.end()) - { - lock_t innerlock(serv->m_sessions_Mutex); - auto inner_itr = serv->m_sessions.find(itr->second); - if(inner_itr != serv->m_sessions.end()) - { - link = inner_itr->second; - } - } - } - return link && link->sendto(buf); - } + bool + sendto(const byte_t *pubkey, llarp_buffer_t buf); void - UnmapAddr(const llarp::Addr &src) - { - lock_t lock(m_Connected_Mutex); - // std::unordered_map< llarp::pubkey, llarp::Addr, llarp::pubkeyhash > - auto itr = std::find_if( - m_Connected.begin(), m_Connected.end(), - [src](const std::pair< llarp::PubKey, llarp::Addr > &item) -> bool { - return src == item.second; - }); - if(itr == std::end(m_Connected)) - return; - - // tell router we are done with this session - router->SessionClosed(itr->first); - - m_Connected.erase(itr); - } + UnmapAddr(const llarp::Addr &src); llarp_link_session * - create_session(const llarp::Addr &src) - { - return new llarp_link_session(this, seckey, src); - } + create_session(const llarp::Addr &src); bool - has_session_to(const llarp::Addr &dst) - { - lock_t lock(m_sessions_Mutex); - return m_sessions.find(dst) != m_sessions.end(); - } + has_session_via(const llarp::Addr &dst); llarp_link_session * - find_session(const llarp::Addr &addr) - { - lock_t lock(m_sessions_Mutex); - auto itr = m_sessions.find(addr); - if(itr == m_sessions.end()) - return nullptr; - else - return itr->second; - } + find_session(const llarp::Addr &addr); void - put_session(const llarp::Addr &src, llarp_link_session *impl) - { - lock_t lock(m_sessions_Mutex); - m_sessions.insert(std::make_pair(src, impl)); - impl->our_router = &router->rc; - } + put_session(const llarp::Addr &src, llarp_link_session *impl); void - clear_sessions() - { - lock_t lock(m_sessions_Mutex); - auto itr = m_sessions.begin(); - while(itr != m_sessions.end()) - { - delete itr->second; - itr = m_sessions.erase(itr); - } - } + clear_sessions(); /// safe iterate sessions void - iterate_sessions(std::function< bool(llarp_link_session *) > visitor) - { - auto now = llarp_time_now_ms(); - std::list< llarp_link_session * > slist; - { - lock_t lock(m_sessions_Mutex); - auto itr = m_sessions.begin(); - while(itr != m_sessions.end()) - { - // if not timing out soon add to list to iterate on - if(!itr->second->timedout(now, 11500)) - slist.push_back(itr->second); - ++itr; - } - } - for(auto &s : slist) - if(!visitor(s)) - return; - } + iterate_sessions(std::function< bool(llarp_link_session *) > visitor); static void - handle_logic_pump(void *user) - { - llarp_link *self = static_cast< llarp_link * >(user); - auto now = llarp_time_now_ms(); - self->iterate_sessions([now](llarp_link_session *s) -> bool { - s->TickLogic(now); - return true; - }); - self->pumpingLogic = false; - } + handle_logic_pump(void *user); void - PumpLogic() - { - if(pumpingLogic) - return; - pumpingLogic = true; - llarp_logic_queue_job(logic, {this, &handle_logic_pump}); - } + PumpLogic(); void - RemoveSession(llarp_link_session *s) - { - lock_t lock(m_sessions_Mutex); - auto itr = m_sessions.find(s->addr); - if(itr != m_sessions.end()) - { - UnmapAddr(s->addr); - s->done(); - m_sessions.erase(itr); - delete s; - } - } + RemoveSession(llarp_link_session *s); uint8_t * - pubkey() - { - return llarp::seckey_topublic(seckey); - } + pubkey(); bool - ensure_privkey() - { - llarp::LogDebug("ensure transport private key at ", keyfile); - std::error_code ec; - if(!fs::exists(keyfile, ec)) - { - if(!keygen(keyfile)) - return false; - } - std::ifstream f(keyfile); - if(f.is_open()) - { - f.read((char *)seckey.data(), seckey.size()); - return true; - } - return false; - } + ensure_privkey(); bool - keygen(const char *fname) - { - crypto->encryption_keygen(seckey); - llarp::LogInfo("new transport key generated"); - std::ofstream f(fname); - if(f.is_open()) - { - f.write((char *)seckey.data(), seckey.size()); - return true; - } - return false; - } + keygen(const char *fname); static void - handle_cleanup_timer(void *l, uint64_t orig, uint64_t left) - { - if(left) - return; - llarp_link *link = static_cast< llarp_link * >(l); - link->timeout_job_id = 0; - link->TickSessions(); - link->issue_cleanup_timer(orig); - } + handle_cleanup_timer(void *l, uint64_t orig, uint64_t left); // this is called in net threadpool static void handle_recvfrom(struct llarp_udp_io *udp, const struct sockaddr *saddr, - const void *buf, ssize_t sz) - { - llarp_link *link = static_cast< llarp_link * >(udp->user); - - llarp_link_session *s = link->find_session(*saddr); - if(s == nullptr) - { - // new inbound session - s = link->create_session(*saddr); - } - s->recv(buf, sz); - } + const void *buf, ssize_t sz); void - cancel_timer() - { - if(timeout_job_id) - { - llarp_logic_cancel_call(logic, timeout_job_id); - } - timeout_job_id = 0; - } + cancel_timer(); void - issue_cleanup_timer(uint64_t timeout) - { - timeout_job_id = llarp_logic_call_later( - logic, {timeout, this, &llarp_link::handle_cleanup_timer}); - } + issue_cleanup_timer(uint64_t timeout); void - get_our_address(struct llarp_ai *addr) - { - addr->rank = 1; - strncpy(addr->dialect, "IWP", sizeof(addr->dialect)); - memcpy(addr->enc_key, pubkey(), 32); - memcpy(addr->ip.s6_addr, this->addr.addr6(), 16); - addr->port = this->addr.port(); - } + get_our_address(struct llarp_ai *addr); static void - after_recv(llarp_udp_io *udp) - { - llarp_link *self = static_cast< llarp_link * >(udp->user); - self->PumpLogic(); - } + after_recv(llarp_udp_io *udp); bool configure(struct llarp_ev_loop *netloop, const char *ifname, int af, - uint16_t port) - { - if(!ensure_privkey()) - { - llarp::LogError("failed to ensure private key"); - return false; - } - - llarp::LogDebug("configure link ifname=", ifname, " af=", af, - " port=", port); - // bind - sockaddr_in ip4addr; - sockaddr_in6 ip6addr; - sockaddr *addr = nullptr; - switch(af) - { - case AF_INET: - addr = (sockaddr *)&ip4addr; - llarp::Zero(addr, sizeof(ip4addr)); - break; - case AF_INET6: - addr = (sockaddr *)&ip6addr; - llarp::Zero(addr, sizeof(ip6addr)); - break; - // TODO: AF_PACKET - default: - llarp::LogError(__FILE__, "unsupported address family", af); - return false; - } - - addr->sa_family = af; - - if(!llarp::StrEq(ifname, "*")) - { - if(!llarp_getifaddr(ifname, af, addr)) - { - llarp::LogError("failed to get address of network interface ", ifname); - return false; - } - } - else - m_name = "OWP"; // outboundLink_name; - - switch(af) - { - case AF_INET: - ip4addr.sin_port = htons(port); - break; - case AF_INET6: - ip6addr.sin6_port = htons(port); - break; - // TODO: AF_PACKET - default: - return false; - } - - this->addr = *addr; - this->netloop = netloop; - udp.recvfrom = &llarp_link::handle_recvfrom; - udp.user = this; - udp.tick = &llarp_link::after_recv; - llarp::LogDebug("bind IWP link to ", addr); - if(llarp_ev_add_udp(netloop, &udp, addr) == -1) - { - llarp::LogError("failed to bind to ", addr); - return false; - } - return true; - } + uint16_t port); bool - start_link(struct llarp_logic *pLogic) - { - // give link implementations - // link->parent = l; - timeout_job_id = 0; - this->logic = pLogic; - // start cleanup timer - issue_cleanup_timer(500); - return true; - } + start_link(struct llarp_logic *pLogic); bool - stop_link() - { - cancel_timer(); - llarp_ev_close_udp(&udp); - clear_sessions(); - return true; - } + stop_link(); bool - try_establish(struct llarp_link_establish_job *job) - { - llarp::Addr dst(job->ai); - llarp::LogDebug("establish session to ", dst); - llarp_link_session *s = find_session(dst); - if(s == nullptr) - { - s = create_session(dst); - put_session(dst, s); - } - else - return false; - s->establish_job = job; - s->frame.alive(); // mark it alive - s->introduce(job->ai.enc_key); - - return true; - } - - void - mark_session_active(llarp_link_session *s) - { - s->frame.alive(); - } + try_establish(struct llarp_link_establish_job *job); }; diff --git a/include/llarp/iwp/session.hpp b/include/llarp/iwp/session.hpp index 7f5cc292e..56245a878 100644 --- a/include/llarp/iwp/session.hpp +++ b/include/llarp/iwp/session.hpp @@ -61,19 +61,25 @@ struct llarp_link_session void done(); + void pump(); + void introduce(uint8_t *pub); void intro_ack(); + void on_intro_ack(const void *buf, size_t sz); + void on_intro(const void *buf, size_t sz); + void on_session_start(const void *buf, size_t sz); + void encrypt_frame_async_send(const void *buf, size_t sz); @@ -112,6 +118,7 @@ struct llarp_link_session /// cached timestamp for frame creation llarp_time_t now; + llarp_time_t createdAt = 0; llarp_time_t lastKeepalive = 0; uint32_t establish_job_id = 0; uint32_t frames = 0; @@ -128,14 +135,15 @@ struct llarp_link_session FrameCompareTime > decryptedFrames; - uint32_t pump_send_timer_id = 0; - uint32_t pump_recv_timer_id = 0; - llarp::Addr addr; iwp_async_intro intro; iwp_async_introack introack; iwp_async_session_start start; + /// timestamp last intro packet sent at + llarp_time_t lastIntroSentAt = 0; + uint32_t intro_resend_job_id = 0; + byte_t token[32]; byte_t workbuf[MAX_PAD + 128]; @@ -168,6 +176,9 @@ struct llarp_link_session static void handle_frame_decrypt(iwp_async_frame *f); + static void + handle_introack_timeout(void *user, uint64_t timeout, uint64_t left); + frame_state frame; }; @@ -176,4 +187,4 @@ struct llarp_link_session_iter void *user; struct llarp_link *link; bool (*visit)(struct llarp_link_session_iter *, struct llarp_link_session *); -}; \ No newline at end of file +}; diff --git a/llarp/config.cpp b/llarp/config.cpp index ea3aae25c..277dc3615 100644 --- a/llarp/config.cpp +++ b/llarp/config.cpp @@ -30,6 +30,7 @@ namespace llarp dns = find_section(top, "dns", section_t{}); iwp_links = find_section(top, "bind", section_t{}); services = find_section(top, "services", section_t{}); + dns = find_section(top, "dns", section_t{}); return true; } return false; @@ -65,12 +66,9 @@ llarp_config_iter(struct llarp_config *conf, struct llarp_config_iterator *iter) { iter->conf = conf; std::map< std::string, llarp::Config::section_t & > sections = { - {"network", conf->impl.network}, - {"connect", conf->impl.connect}, - {"bind", conf->impl.iwp_links}, - {"netdb", conf->impl.netdb}, - {"dns", conf->impl.dns}, - {"services", conf->impl.services}}; + {"network", conf->impl.network}, {"connect", conf->impl.connect}, + {"bind", conf->impl.iwp_links}, {"netdb", conf->impl.netdb}, + {"dns", conf->impl.dns}, {"services", conf->impl.services}}; for(const auto item : conf->impl.router) iter->visit(iter, "router", item.first.c_str(), item.second.c_str()); diff --git a/llarp/crypto_async.cpp b/llarp/crypto_async.cpp index 2f545cac9..83cd27fda 100644 --- a/llarp/crypto_async.cpp +++ b/llarp/crypto_async.cpp @@ -68,8 +68,8 @@ namespace iwp buf.sz = intro->sz - 32; crypto->hmac(intro->buf, buf, sharedkey); // inform result - intro->hook(intro); - // llarp_logic_queue_job(intro->iwp->logic, {intro, &inform_intro}); + // intro->hook(intro); + llarp_logic_queue_job(intro->iwp->logic, {intro, &inform_intro}); } void @@ -123,7 +123,7 @@ namespace iwp { iwp_async_introack *introack = static_cast< iwp_async_introack * >(user); auto crypto = introack->iwp->crypto; - // auto logic = introack->iwp->logic; + auto logic = introack->iwp->logic; llarp::ShortHash digest; llarp::SharedSecret sharedkey; @@ -160,8 +160,8 @@ namespace iwp // copy token memcpy(introack->token, token, 32); } - introack->hook(introack); - // llarp_logic_queue_job(logic, {introack, &inform_introack}); + // introack->hook(introack); + llarp_logic_queue_job(logic, {introack, &inform_introack}); } void @@ -189,9 +189,8 @@ namespace iwp buf.sz = introack->sz - 32; buf.cur = buf.base; crypto->hmac(introack->buf, buf, sharedkey); - introack->hook(introack); - // llarp_logic_queue_job(introack->iwp->logic, {introack, - // &inform_introack}); + // introack->hook(introack); + llarp_logic_queue_job(introack->iwp->logic, {introack, &inform_introack}); } void @@ -265,7 +264,6 @@ namespace iwp auto hmac = crypto->hmac; auto decrypt = crypto->xchacha20; - // auto logic = session->iwp->logic; auto b_sK = session->secretkey; auto a_K = session->remote_pubkey; auto N = session->nonce; diff --git a/llarp/iwp/server.cpp b/llarp/iwp/server.cpp new file mode 100644 index 000000000..1e00ce10c --- /dev/null +++ b/llarp/iwp/server.cpp @@ -0,0 +1,442 @@ +#include +#include "str.hpp" + +llarp_link::llarp_link(const llarp_iwp_args& args) + : router(args.router) + , crypto(args.crypto) + , logic(args.logic) + , worker(args.cryptoworker) + , m_name("IWP") +{ + strncpy(keyfile, args.keyfile, sizeof(keyfile)); + iwp = llarp_async_iwp_new(crypto, logic, worker); + pumpingLogic.store(false); +} + +llarp_link::~llarp_link() +{ + llarp_async_iwp_free(iwp); +} + +bool +llarp_link::has_intro_from(const llarp::Addr& from) +{ + lock_t lock(m_PendingSessions_Mutex); + return m_PendingSessions.find(from) != m_PendingSessions.end(); +} + +void +llarp_link::put_intro_from(llarp_link_session* s) +{ + lock_t lock(m_PendingSessions_Mutex); + m_PendingSessions[s->addr] = s; +} + +void +llarp_link::remove_intro_from(const llarp::Addr& from) +{ + lock_t lock(m_PendingSessions_Mutex); + m_PendingSessions.erase(from); +} + +void +llarp_link::MapAddr(const llarp::Addr& src, const llarp::PubKey& identity) +{ + lock_t lock(m_Connected_Mutex); + m_Connected[identity] = src; +} + +bool +llarp_link::has_session_to(const byte_t* pubkey) +{ + llarp::PubKey pk(pubkey); + lock_t lock(m_Connected_Mutex); + return m_Connected.find(pk) != m_Connected.end(); +} + +void +llarp_link::TickSessions() +{ + auto now = llarp_time_now_ms(); + { + lock_t lock(m_PendingSessions_Mutex); + auto itr = m_PendingSessions.begin(); + while(itr != m_PendingSessions.end()) + { + if(itr->second->timedout(now)) + { + itr->second->done(); + delete itr->second; + itr = m_PendingSessions.erase(itr); + } + else + ++itr; + } + } + { + lock_t lock(m_sessions_Mutex); + auto itr = m_sessions.begin(); + while(itr != m_sessions.end()) + { + if(itr->second->Tick(now)) + { + itr->second->done(); + delete itr->second; + itr = m_sessions.erase(itr); + } + else + ++itr; + } + } +} + +bool +llarp_link::sendto(const byte_t* pubkey, llarp_buffer_t buf) +{ + llarp_link_session* link = nullptr; + { + lock_t lock(m_Connected_Mutex); + auto itr = m_Connected.find(pubkey); + if(itr != m_Connected.end()) + { + lock_t innerlock(m_sessions_Mutex); + auto inner_itr = m_sessions.find(itr->second); + if(inner_itr != m_sessions.end()) + { + link = inner_itr->second; + } + } + } + return link && link->sendto(buf); +} + +void +llarp_link::UnmapAddr(const llarp::Addr& src) +{ + lock_t lock(m_Connected_Mutex); + // std::unordered_map< llarp::pubkey, llarp::Addr, llarp::pubkeyhash > + auto itr = std::find_if( + m_Connected.begin(), m_Connected.end(), + [src](const std::pair< llarp::PubKey, llarp::Addr >& item) -> bool { + return src == item.second; + }); + if(itr == std::end(m_Connected)) + return; + + // tell router we are done with this session + router->SessionClosed(itr->first); + + m_Connected.erase(itr); +} + +llarp_link_session* +llarp_link::create_session(const llarp::Addr& src) +{ + return new llarp_link_session(this, seckey, src); +} + +bool +llarp_link::has_session_via(const llarp::Addr& dst) +{ + lock_t lock(m_sessions_Mutex); + return m_sessions.find(dst) != m_sessions.end(); +} + +llarp_link_session* +llarp_link::find_session(const llarp::Addr& addr) +{ + lock_t lock(m_sessions_Mutex); + auto itr = m_sessions.find(addr); + if(itr == m_sessions.end()) + return nullptr; + else + return itr->second; +} + +void +llarp_link::put_session(const llarp::Addr& src, llarp_link_session* impl) +{ + lock_t lock(m_sessions_Mutex); + m_sessions.insert(std::make_pair(src, impl)); + impl->our_router = &router->rc; +} + +void +llarp_link::clear_sessions() +{ + lock_t lock(m_sessions_Mutex); + auto itr = m_sessions.begin(); + while(itr != m_sessions.end()) + { + delete itr->second; + itr = m_sessions.erase(itr); + } +} + +void +llarp_link::iterate_sessions(std::function< bool(llarp_link_session*) > visitor) +{ + auto now = llarp_time_now_ms(); + std::list< llarp_link_session* > slist; + { + lock_t lock(m_sessions_Mutex); + auto itr = m_sessions.begin(); + while(itr != m_sessions.end()) + { + // if not timing out soon add to list to iterate on + if(!itr->second->timedout(now, 11500)) + slist.push_back(itr->second); + ++itr; + } + } + for(auto& s : slist) + if(!visitor(s)) + return; +} + +void +llarp_link::handle_logic_pump(void* user) +{ + llarp_link* self = static_cast< llarp_link* >(user); + auto now = llarp_time_now_ms(); + self->iterate_sessions([now](llarp_link_session* s) -> bool { + s->TickLogic(now); + return true; + }); + self->pumpingLogic = false; +} + +void +llarp_link::PumpLogic() +{ + if(pumpingLogic) + return; + pumpingLogic = true; + llarp_logic_queue_job(logic, {this, &handle_logic_pump}); +} + +void +llarp_link::RemoveSession(llarp_link_session* s) +{ + lock_t lock(m_sessions_Mutex); + auto itr = m_sessions.find(s->addr); + if(itr != m_sessions.end()) + { + UnmapAddr(s->addr); + s->done(); + m_sessions.erase(itr); + delete s; + } +} + +uint8_t* +llarp_link::pubkey() +{ + return llarp::seckey_topublic(seckey); +} + +bool +llarp_link::ensure_privkey() +{ + llarp::LogDebug("ensure transport private key at ", keyfile); + std::error_code ec; + if(!fs::exists(keyfile, ec)) + { + if(!keygen(keyfile)) + return false; + } + std::ifstream f(keyfile); + if(f.is_open()) + { + f.read((char*)seckey.data(), seckey.size()); + return true; + } + return false; +} + +bool +llarp_link::keygen(const char* fname) +{ + crypto->encryption_keygen(seckey); + llarp::LogInfo("new transport key generated"); + std::ofstream f(fname); + if(f.is_open()) + { + f.write((char*)seckey.data(), seckey.size()); + return true; + } + return false; +} + +void +llarp_link::handle_cleanup_timer(void* l, uint64_t orig, uint64_t left) +{ + if(left) + return; + llarp_link* link = static_cast< llarp_link* >(l); + link->timeout_job_id = 0; + link->TickSessions(); + link->issue_cleanup_timer(orig); +} + +void +llarp_link::handle_recvfrom(struct llarp_udp_io* udp, + const struct sockaddr* saddr, const void* buf, + ssize_t sz) +{ + llarp_link* link = static_cast< llarp_link* >(udp->user); + + llarp_link_session* s = link->find_session(*saddr); + if(s == nullptr) + { + // new inbound session + s = link->create_session(*saddr); + } + s->recv(buf, sz); +} + +void +llarp_link::cancel_timer() +{ + if(timeout_job_id) + { + llarp_logic_cancel_call(logic, timeout_job_id); + } + timeout_job_id = 0; +} + +void +llarp_link::issue_cleanup_timer(uint64_t timeout) +{ + timeout_job_id = llarp_logic_call_later( + logic, {timeout, this, &llarp_link::handle_cleanup_timer}); +} + +void +llarp_link::get_our_address(llarp_ai* addr) +{ + addr->rank = 1; + strncpy(addr->dialect, "IWP", sizeof(addr->dialect)); + memcpy(addr->enc_key, pubkey(), 32); + memcpy(addr->ip.s6_addr, this->addr.addr6(), 16); + addr->port = this->addr.port(); +} + +void +llarp_link::after_recv(llarp_udp_io* udp) +{ + llarp_link* self = static_cast< llarp_link* >(udp->user); + self->PumpLogic(); +} + +bool +llarp_link::configure(struct llarp_ev_loop* netloop, const char* ifname, int af, + uint16_t port) +{ + if(!ensure_privkey()) + { + llarp::LogError("failed to ensure private key"); + return false; + } + + llarp::LogDebug("configure link ifname=", ifname, " af=", af, " port=", port); + // bind + sockaddr_in ip4addr; + sockaddr_in6 ip6addr; + sockaddr* addr = nullptr; + switch(af) + { + case AF_INET: + addr = (sockaddr*)&ip4addr; + llarp::Zero(addr, sizeof(ip4addr)); + break; + case AF_INET6: + addr = (sockaddr*)&ip6addr; + llarp::Zero(addr, sizeof(ip6addr)); + break; + // TODO: AF_PACKET + default: + llarp::LogError(__FILE__, "unsupported address family", af); + return false; + } + + addr->sa_family = af; + + if(!llarp::StrEq(ifname, "*")) + { + if(!llarp_getifaddr(ifname, af, addr)) + { + llarp::LogError("failed to get address of network interface ", ifname); + return false; + } + } + else + m_name = "OWP"; // outboundLink_name; + + switch(af) + { + case AF_INET: + ip4addr.sin_port = htons(port); + break; + case AF_INET6: + ip6addr.sin6_port = htons(port); + break; + // TODO: AF_PACKET + default: + return false; + } + + this->addr = *addr; + this->netloop = netloop; + udp.recvfrom = &llarp_link::handle_recvfrom; + udp.user = this; + udp.tick = &llarp_link::after_recv; + llarp::LogDebug("bind IWP link to ", addr); + if(llarp_ev_add_udp(netloop, &udp, addr) == -1) + { + llarp::LogError("failed to bind to ", addr); + return false; + } + return true; +} + +bool +llarp_link::start_link(struct llarp_logic* pLogic) +{ + // give link implementations + // link->parent = l; + timeout_job_id = 0; + this->logic = pLogic; + // start cleanup timer + issue_cleanup_timer(500); + return true; +} + +bool +llarp_link::stop_link() +{ + cancel_timer(); + llarp_ev_close_udp(&udp); + clear_sessions(); + return true; +} + +bool +llarp_link::try_establish(struct llarp_link_establish_job* job) +{ + llarp::Addr dst(job->ai); + llarp::LogDebug("establish session to ", dst); + llarp_link_session* s = find_session(dst); + if(s == nullptr) + { + s = create_session(dst); + put_session(dst, s); + } + else + return false; + s->establish_job = job; + s->frame.alive(); // mark it alive + s->introduce(job->ai.enc_key); + + return true; +} \ No newline at end of file diff --git a/llarp/iwp/session.cpp b/llarp/iwp/session.cpp index 7aa05b44d..fb5d9531f 100644 --- a/llarp/iwp/session.cpp +++ b/llarp/iwp/session.cpp @@ -1,10 +1,10 @@ +#include #include #include #include #include "address_info.hpp" #include "buffer.hpp" #include "link/encoder.hpp" - #include "llarp/ev.h" // for handle_frame_encrypt static void @@ -45,6 +45,7 @@ llarp_link_session::llarp_link_session(llarp_link *l, const byte_t *seckey, llarp::LogInfo("session created"); frame.alive(); working.store(false); + createdAt = llarp_time_now_ms(); } llarp_link_session::~llarp_link_session() @@ -239,6 +240,7 @@ handle_verify_introack(iwp_async_introack *introack) { llarp_link_session *link = static_cast< llarp_link_session * >(introack->user); + auto logic = link->serv->logic; link->working = false; if(introack->buf == nullptr) @@ -249,6 +251,9 @@ handle_verify_introack(iwp_async_introack *introack) link->serv->RemoveSession(link); return; } + // cancel resend + llarp_logic_cancel_call(logic, link->intro_resend_job_id); + link->EnterState(llarp_link_session::eIntroAckRecv); link->session_start(); } @@ -262,9 +267,9 @@ handle_establish_timeout(void *user, uint64_t orig, uint64_t left) self->establish_job_id = 0; if(self->establish_job) { - auto job = self->establish_job; - self->establish_job = nullptr; - job->link = self->serv; + llarp_link_establish_job *job = self->establish_job; + self->establish_job = nullptr; + job->link = self->serv; if(self->IsEstablished()) { job->session = self; @@ -287,13 +292,10 @@ llarp_link_session::done() llarp_logic_remove_call(logic, establish_job_id); handle_establish_timeout(this, 0, 0); } - if(pump_recv_timer_id) + if(intro_resend_job_id) { - llarp_logic_remove_call(logic, pump_recv_timer_id); - } - if(pump_send_timer_id) - { - llarp_logic_remove_call(logic, pump_send_timer_id); + llarp_logic_remove_call(logic, intro_resend_job_id); + handle_introack_timeout(this, 0, 0); } } @@ -503,7 +505,7 @@ handle_introack_generated(iwp_async_introack *i) if(i->buf && link->serv->has_intro_from(link->addr)) { // track it with the server here - if(link->serv->has_session_to(link->addr)) + if(link->serv->has_session_via(link->addr)) { // duplicate session llarp::LogWarn("duplicate session to ", link->addr); @@ -538,6 +540,11 @@ handle_generated_intro(iwp_async_intro *i) return; } link->EnterState(llarp_link_session::eIntroSent); + link->lastIntroSentAt = llarp_time_now_ms(); + auto dlt = (link->createdAt - link->lastIntroSentAt); + auto logic = link->serv->logic; + link->intro_resend_job_id = llarp_logic_call_later( + logic, {dlt, link, &llarp_link_session::handle_introack_timeout}); } else { @@ -545,11 +552,25 @@ handle_generated_intro(iwp_async_intro *i) } } +void +llarp_link_session::handle_introack_timeout(void *user, uint64_t timeout, + uint64_t left) +{ + if(timeout && left == 0) + { + // timeout reached + llarp_link_session *self = static_cast< llarp_link_session * >(user); + // retry introduce + self->introduce(nullptr); + } +} + void llarp_link_session::introduce(uint8_t *pub) { llarp::LogDebug("session introduce"); - memcpy(remote, pub, PUBKEYSIZE); + if(pub) + memcpy(remote, pub, PUBKEYSIZE); intro.buf = workbuf; size_t w0sz = (llarp_randint() % MAX_PAD); intro.sz = (32 * 3) + w0sz; @@ -571,8 +592,9 @@ llarp_link_session::introduce(uint8_t *pub) working = true; iwp_call_async_gen_intro(iwp, &intro); // start introduce timer - establish_job_id = llarp_logic_call_later( - serv->logic, {5000, this, &handle_establish_timeout}); + if(pub) + establish_job_id = llarp_logic_call_later( + serv->logic, {5000, this, &handle_establish_timeout}); } void @@ -667,7 +689,7 @@ llarp_link_session::on_session_start(const void *buf, size_t sz) void llarp_link_session::intro_ack() { - if(serv->has_session_to(addr)) + if(serv->has_session_via(addr)) { llarp::LogWarn("won't ack intro for duplicate session from ", addr); return; diff --git a/llarp/pathbuilder.cpp b/llarp/pathbuilder.cpp index ae8d294ac..61c66f155 100644 --- a/llarp/pathbuilder.cpp +++ b/llarp/pathbuilder.cpp @@ -132,7 +132,6 @@ namespace llarp AsyncPathKeyExchangeContext< llarp_pathbuild_job >* ctx) { auto remote = ctx->path->Upstream(); - llarp::LogInfo("Generated LRCM to ", remote); auto router = ctx->user->router; if(!router->SendToOrQueue(remote, ctx->LRCM)) { diff --git a/llarp/pathfinder.cpp b/llarp/pathfinder.cpp deleted file mode 100644 index 2583e1288..000000000 --- a/llarp/pathfinder.cpp +++ /dev/null @@ -1 +0,0 @@ -#include "pathfinder.hpp" diff --git a/llarp/router.cpp b/llarp/router.cpp index d5eb938d8..57d6881cc 100644 --- a/llarp/router.cpp +++ b/llarp/router.cpp @@ -61,11 +61,11 @@ llarp_router::SendToOrQueue(const llarp::RouterID &remote, const llarp::ILinkMessage *msg) { llarp_link *chosen = nullptr; - if(!outboundLink->has_session_to(outboundLink, remote)) + if(!outboundLink->has_session_to(remote)) { for(auto link : inboundLinks) { - if(link->has_session_to(link, remote)) + if(link->has_session_to(remote)) { chosen = link; break; @@ -428,17 +428,17 @@ llarp_router::SendTo(llarp::RouterID remote, const llarp::ILinkMessage *msg, buf.cur = buf.base; if(link) { - link->sendto(link, remote, buf); + link->sendto(remote, buf); return; } - bool sent = outboundLink->sendto(outboundLink, remote, buf); + bool sent = outboundLink->sendto(remote, buf); if(!sent) { for(auto link : inboundLinks) { if(!sent) { - sent = link->sendto(link, remote, buf); + sent = link->sendto(remote, buf); } } } @@ -469,10 +469,10 @@ llarp_router::GetLinkWithSessionByPubkey(const llarp::RouterID &pubkey) { for(auto &link : inboundLinks) { - if(link->has_session_to(link, pubkey)) + if(link->has_session_to(pubkey)) return link; } - if(outboundLink->has_session_to(outboundLink, pubkey)) + if(outboundLink->has_session_to(pubkey)) return outboundLink; return nullptr; } @@ -509,7 +509,7 @@ llarp_router::FlushOutboundFor(const llarp::RouterID &remote, // set size of message buf.sz = buf.cur - buf.base; buf.cur = buf.base; - if(!chosen->sendto(chosen, remote, buf)) + if(!chosen->sendto(remote, buf)) llarp::LogWarn("failed to send outboud message to ", remote, " via ", chosen->name()); diff --git a/readme.md b/readme.md index e5f2a38fc..6c7cb1c19 100644 --- a/readme.md +++ b/readme.md @@ -11,6 +11,7 @@ You have 2 ways the build this project ### Recommended Method (for stable builds) + $ sudo apt install build-essential libtool autoconf cmake git python3-venv $ git clone --recursive https://github.com/majestrate/llarpd-builder $ cd llarpd-builder $ make @@ -38,7 +39,7 @@ Right now the reference daemon connects to nodes you tell it to and that's it. If you built using the recommended way just run: - $ ./llarpd + $ ./lokinet It'll attempt to connect to a test node I run and keep the session alive. That's it.