Merge pull request #833 from majestrate/gut-utp-2019-09-19

remove libutp and all such code related to utp
pull/836/head
Jeff 5 years ago committed by GitHub
commit 35230adbe5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -245,7 +245,6 @@ endif()
set(LIBS ${MALLOC_LIB} ${FS_LIB} ${LIBUV_LIBRARY})
add_subdirectory(crypto)
add_subdirectory(libutp)
add_subdirectory(llarp)
add_subdirectory(libabyss)

@ -73,7 +73,7 @@ else()
endif()
set(EXE_LIBS ${STATIC_LIB} libutp)
set(EXE_LIBS ${STATIC_LIB})
if(RELEASE_MOTTO)
add_definitions(-DLLARP_RELEASE_MOTTO="${RELEASE_MOTTO}")

@ -1,19 +0,0 @@
set(UTP_SRC
utp_callbacks.cpp
utp_utils.cpp
utp_internal.cpp
utp_api.cpp
utp_packedsockaddr.cpp
utp_hash.cpp
)
if(WIN32)
list(APPEND UTP_SRC libutp_inet_ntop.cpp)
endif(WIN32)
add_library(libutp STATIC ${UTP_SRC})
if(WIN32)
target_link_libraries(libutp ws2_32)
endif(WIN32)
target_include_directories(libutp PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${CORE_INCLUDE})
add_log_tag(libutp)

@ -1,19 +0,0 @@
Copyright (c) 2010-2013 BitTorrent, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

@ -1,48 +0,0 @@
OBJS = utp_internal.o utp_utils.o utp_hash.o utp_callbacks.o utp_api.o utp_packedsockaddr.o
CFLAGS = -Wall -DPOSIX -g -fno-exceptions $(OPT)
OPT ?= -O3
CXXFLAGS = $(CFLAGS) -fPIC -fno-rtti
CC = gcc
CXX = g++
CXXFLAGS += -Wno-sign-compare
CXXFLAGS += -fpermissive
# Uncomment to enable utp_get_stats(), and a few extra sanity checks
#CFLAGS += -D_DEBUG
# Uncomment to enable debug logging
#CFLAGS += -DUTP_DEBUG_LOGGING
# Dynamically determine if librt is available. If so, assume we need to link
# against it for clock_gettime(2). This is required for clean builds on OSX;
# see <https://github.com/bittorrent/libutp/issues/1> for more. This should
# probably be ported to CMake at some point, but is suitable for now.
lrt := $(shell echo 'int main() {}' | $(CC) -xc -o /dev/null - -lrt >/dev/null 2>&1; echo $$?)
ifeq ($(strip $(lrt)),0)
LDFLAGS += -lrt
endif
all: libutp.so libutp.a ucat ucat-static
libutp.so: $(OBJS)
$(CXX) $(CXXFLAGS) -o libutp.so -shared $(OBJS)
libutp.a: $(OBJS)
ar rvs libutp.a $(OBJS)
ucat: ucat.o libutp.so
$(CC) $(CFLAGS) -o ucat ucat.o -L. -lutp $(LDFLAGS)
ucat-static: ucat.o libutp.a
$(CXX) $(CXXFLAGS) -o ucat-static ucat.o libutp.a $(LDFLAGS)
clean:
rm -f *.o libutp.so libutp.a ucat ucat-static
tags: $(shell ls *.cpp *.h)
rm -f tags
ctags *.cpp *.h
anyway: clean all
.PHONY: clean all anyway

@ -1,68 +0,0 @@
# libutp - The uTorrent Transport Protocol library.
Copyright (c) 2010 BitTorrent, Inc.
uTP is a TCP-like implementation of [LEDBAT][ledbat] documented as a BitTorrent
extension in [BEP-29][bep29]. uTP provides reliable, ordered delivery
while maintaining minimum extra delay. It is implemented on top of UDP to be
cross-platform and functional today. As a result, uTP is the primary transport
for uTorrent peer-to-peer connections.
uTP is written in C++, but the external interface is strictly C (ANSI C89).
## The Interface
The uTP socket interface is a bit different from the Berkeley socket API to
avoid the need for our own select() implementation, and to make it easier to
write event-based code with minimal buffering.
When you create a uTP socket, you register a set of callbacks. Most notably, the
on_read callback is a reactive callback which occurs when bytes arrive off the
network. The write side of the socket is proactive, and you call UTP_Write to
indicate the number of bytes you wish to write. As packets are created, the
on_write callback is called for each packet, so you can fill the buffers with
data.
The libutp interface is not thread-safe. It was designed for use in a
single-threaded asyncronous context, although with proper synchronization
it may be used from a multi-threaded environment as well.
See utp.h for more details and other API documentation.
## Example
See ucat.c. Build with:
make ucat
## Building
uTP has been known to build on Windows with MSVC and on linux and OS X with gcc.
On Windows, use the MSVC project files (utp.sln, and friends). On other platforms,
building the shared library is as simple as:
make
To build one of the examples, which will statically link in everything it needs
from libutp:
cd utp_test && make
## Packaging and API
The libutp API is considered unstable, and probably always will be. We encourage
you to test with the version of libutp you have, and be mindful when upgrading.
For this reason, it is probably also a good idea to bundle libutp with your
application.
## License
libutp is released under the [MIT][lic] license.
## Related Work
Research and analysis of congestion control mechanisms can be found [here.][survey]
[ledbat]: http://datatracker.ietf.org/wg/ledbat/charter/
[bep29]: http://www.bittorrent.org/beps/bep_0029.html
[lic]: http://www.opensource.org/licenses/mit-license.php
[survey]: http://datatracker.ietf.org/doc/draft-ietf-ledbat-survey/

@ -1,57 +0,0 @@
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <wspiapi.h>
#include "libutp_inet_ntop.h"
// we already have our own definition of these
// -despair
#if _WIN32_WINNT < 0x600
namespace
{
extern "C"
{
const char *
inet_ntop(int af, const void *src, char *dst, size_t size);
int
inet_pton(int af, const char *src, void *dst);
}
} // namespace
#endif
//######################################################################
const char *
libutp::inet_ntop(int af, const void *src, char *dest, size_t length)
{
return ::inet_ntop(af, (void *)src, dest, length);
}
//######################################################################
int
libutp::inet_pton(int af, const char *src, void *dest)
{
return ::inet_pton(af, src, dest);
}

@ -1,63 +0,0 @@
#ifndef LIBUTP_INET_NTOP_H
#define LIBUTP_INET_NTOP_H
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
// About us linking the system inet_pton and inet_ntop symbols:
// 1) These symbols are usually defined on POSIX systems
// 2) They are not defined on Windows versions earlier than Vista
// Defined in:
// ut_utils/src/sockaddr.cpp
// libutp/win32_inet_ntop.obj
//
#if defined(_WIN32_WINNT)
#if _WIN32_WINNT >= 0x600 // Win32, post-XP
#include <ws2tcpip.h> // for inet_ntop, inet_pton
#define INET_NTOP inet_ntop
#define INET_PTON inet_pton
#else
#define INET_NTOP libutp::inet_ntop // Win32, pre-XP: Use ours
#define INET_PTON libutp::inet_pton
#endif
#else // not WIN32
#include <arpa/inet.h> // for inet_ntop, inet_pton
#define INET_NTOP inet_ntop
#define INET_PTON inet_pton
#endif
//######################################################################
//######################################################################
namespace libutp {
//######################################################################
const char *inet_ntop(int af, const void *src, char *dest, size_t length);
//######################################################################
int inet_pton(int af, const char* src, void* dest);
} //namespace libutp
#endif // LIBUTP_INET_NTOP_H

@ -1,295 +0,0 @@
import os
import sys
# usage: parse_log.py log-file [socket-index to focus on]
socket_filter = None
if len(sys.argv) >= 3:
socket_filter = sys.argv[2].strip()
if socket_filter is None:
print "scanning for socket with the most packets"
file = open(sys.argv[1], 'rb')
sockets = {}
for l in file:
if not 'our_delay' in l:
continue
try:
a = l.strip().split(" ")
socket_index = a[1][:-1]
except:
continue
# msvc's runtime library doesn't prefix pointers
# with '0x'
# if socket_index[:2] != '0x':
# continue
if socket_index in sockets:
sockets[socket_index] += 1
else:
sockets[socket_index] = 1
items = sockets.items()
items.sort(lambda x, y: y[1] - x[1])
count = 0
for i in items:
print '%s: %d' % (i[0], i[1])
count += 1
if count > 5:
break
file.close()
socket_filter = items[0][0]
print '\nfocusing on socket %s' % socket_filter
file = open(sys.argv[1], 'rb')
out_file = 'utp.out%s' % socket_filter
out = open(out_file, 'wb')
delay_samples = 'dots lc rgb "blue"'
delay_base = 'steps lw 2 lc rgb "purple"'
target_delay = 'steps lw 2 lc rgb "red"'
off_target = 'dots lc rgb "blue"'
cwnd = 'steps lc rgb "green"'
window_size = 'steps lc rgb "sea-green"'
rtt = 'lines lc rgb "light-blue"'
metrics = {
'our_delay': ['our delay (ms)', 'x1y2', delay_samples],
'upload_rate': ['send rate (B/s)', 'x1y1', 'lines'],
'max_window': ['cwnd (B)', 'x1y1', cwnd],
'target_delay': ['target delay (ms)', 'x1y2', target_delay],
'cur_window': ['bytes in-flight (B)', 'x1y1', window_size],
'cur_window_packets': ['number of packets in-flight', 'x1y2', 'steps'],
'packet_size': ['current packet size (B)', 'x1y2', 'steps'],
'rtt': ['rtt (ms)', 'x1y2', rtt],
'off_target': ['off-target (ms)', 'x1y2', off_target],
'delay_sum': ['delay sum (ms)', 'x1y2', 'steps'],
'their_delay': ['their delay (ms)', 'x1y2', delay_samples],
'get_microseconds': ['clock (us)', 'x1y1', 'steps'],
'wnduser': ['advertised window size (B)', 'x1y1', 'steps'],
'delay_base': ['delay base (us)', 'x1y1', delay_base],
'their_delay_base': ['their delay base (us)', 'x1y1', delay_base],
'their_actual_delay': ['their actual delay (us)', 'x1y1', delay_samples],
'actual_delay': ['actual_delay (us)', 'x1y1', delay_samples]
}
histogram_quantization = 1
socket_index = None
columns = []
begin = None
title = "-"
packet_loss = 0
packet_timeout = 0
delay_histogram = {}
window_size = {'0': 0, '1': 0}
# [35301484] 0x00ec1190: actual_delay:1021583 our_delay:102 their_delay:-1021345 off_target:297 max_window:2687 upload_rate:18942 delay_base:1021481154 delay_sum:-1021242 target_delay:400 acked_bytes:1441 cur_window:2882 scaled_gain:2.432
counter = 0
print "reading log file"
for l in file:
if "UTP_Connect" in l:
title = l[:-2]
if socket_filter != None:
title += ' socket: %s' % socket_filter
else:
title += ' sum of all sockets'
continue
try:
a = l.strip().split(" ")
t = a[0][1:-1]
socket_index = a[1][:-1]
except:
continue
# if socket_index[:2] != '0x':
# continue
if socket_filter != None and socket_index != socket_filter:
continue
counter += 1
if (counter % 300 == 0):
print "\r%d " % counter,
if "lost." in l:
packet_loss = packet_loss + 1
continue
if "Packet timeout" in l:
packet_timeout = packet_timeout + 1
continue
if "our_delay:" not in l:
continue
# used for Logf timestamps
# t, m = t.split(".")
# t = time.strptime(t, "%H:%M:%S")
# t = list(t)
# t[0] += 107
# t = tuple(t)
# m = float(m)
# m /= 1000.0
# t = time.mktime(t) + m
# used for tick count timestamps
t = int(t)
if begin is None:
begin = t
t = t - begin
# print time. Convert from milliseconds to seconds
print >>out, '%f\t' % (float(t)/1000.),
# if t > 200000:
# break
fill_columns = not columns
for i in a[2:]:
try:
n, v = i.split(':')
except:
continue
v = float(v)
if n == "our_delay":
bucket = v / histogram_quantization
delay_histogram[bucket] = 1 + delay_histogram.get(bucket, 0)
if not n in metrics:
continue
if fill_columns:
columns.append(n)
if n == "max_window":
window_size[socket_index] = v
print >>out, '%f\t' % int(
reduce(lambda a, b: a+b, window_size.values())),
else:
print >>out, '%f\t' % v,
print >>out, float(packet_loss * 8000), float(packet_timeout * 8000)
packet_loss = 0
packet_timeout = 0
out.close()
out = open('%s.histogram' % out_file, 'wb')
for d, f in delay_histogram.iteritems():
print >>out, float(d*histogram_quantization) + \
histogram_quantization / 2, f
out.close()
plot = [
{
'data': ['upload_rate', 'max_window', 'cur_window', 'wnduser', 'cur_window_packets', 'packet_size', 'rtt'],
'title': 'send-packet-size',
'y1': 'Bytes',
'y2': 'Time (ms)'
},
{
'data': ['our_delay', 'max_window', 'target_delay', 'cur_window', 'wnduser', 'cur_window_packets'],
'title': 'uploading',
'y1': 'Bytes',
'y2': 'Time (ms)'
},
{
'data': ['our_delay', 'max_window', 'target_delay', 'cur_window', 'cur_window_packets'],
'title': 'uploading_packets',
'y1': 'Bytes',
'y2': 'Time (ms)'
},
{
'data': ['get_microseconds'],
'title': 'timer',
'y1': 'Time microseconds',
'y2': 'Time (ms)'
},
{
'data': ['their_delay', 'target_delay', 'rtt'],
'title': 'their_delay',
'y1': '',
'y2': 'Time (ms)'
},
{
'data': ['their_actual_delay', 'their_delay_base'],
'title': 'their_delay_base',
'y1': 'Time (us)',
'y2': ''
},
{
'data': ['our_delay', 'target_delay', 'rtt'],
'title': 'our-delay',
'y1': '',
'y2': 'Time (ms)'
},
{
'data': ['actual_delay', 'delay_base'],
'title': 'our_delay_base',
'y1': 'Time (us)',
'y2': ''
}
]
out = open('utp.gnuplot', 'w+')
files = ''
#print >>out, 'set xtics 0, 20'
print >>out, "set term png size 1280,800"
print >>out, 'set output "%s.delays.png"' % out_file
print >>out, 'set xrange [0:250]'
print >>out, 'set xlabel "delay (ms)"'
print >>out, 'set boxwidth 1'
print >>out, 'set style fill solid'
print >>out, 'set ylabel "number of packets"'
print >>out, 'plot "%s.histogram" using 1:2 with boxes' % out_file
print >>out, "set style data steps"
#print >>out, "set yrange [0:*]"
print >>out, "set y2range [*:*]"
files += out_file + '.delays.png '
# set hidden3d
# set title "Peer bandwidth distribution"
# set xlabel "Ratio"
for p in plot:
print >>out, 'set title "%s %s"' % (p['title'], title)
print >>out, 'set xlabel "time (s)"'
print >>out, 'set ylabel "%s"' % p['y1']
print >>out, "set tics nomirror"
print >>out, 'set y2tics'
print >>out, 'set y2label "%s"' % p['y2']
print >>out, 'set xrange [0:*]'
print >>out, "set key box"
print >>out, "set term png size 1280,800"
print >>out, 'set output "%s-%s.png"' % (out_file, p['title'])
files += '%s-%s.png ' % (out_file, p['title'])
comma = ''
print >>out, "plot",
for c in p['data']:
if not c in metrics:
continue
i = columns.index(c)
print >>out, '%s"%s" using 1:%d title "%s-%s" axes %s with %s' % (
comma, out_file, i + 2, metrics[c][0], metrics[c][1], metrics[c][1], metrics[c][2]),
comma = ', '
print >>out, ''
out.close()
os.system("gnuplot utp.gnuplot")
os.system("open %s" % files)

@ -1,139 +0,0 @@
// vim:set ts=4 sw=4 ai:
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <stdio.h>
#include "utp_internal.h"
#include "utp_utils.h"
extern "C" {
const char * utp_callback_names[] = {
"UTP_ON_FIREWALL",
"UTP_ON_ACCEPT",
"UTP_ON_CONNECT",
"UTP_ON_ERROR",
"UTP_ON_READ",
"UTP_ON_OVERHEAD_STATISTICS",
"UTP_ON_STATE_CHANGE",
"UTP_GET_READ_BUFFER_SIZE",
"UTP_ON_DELAY_SAMPLE",
"UTP_GET_UDP_MTU",
"UTP_GET_UDP_OVERHEAD",
"UTP_GET_MILLISECONDS",
"UTP_GET_MICROSECONDS",
"UTP_GET_RANDOM",
"UTP_LOG",
"UTP_SENDTO",
};
const char * utp_error_code_names[] = {
"UTP_ECONNREFUSED",
"UTP_ECONNRESET",
"UTP_ETIMEDOUT",
};
const char *utp_state_names[] = {
NULL,
"UTP_STATE_CONNECT",
"UTP_STATE_WRITABLE",
"UTP_STATE_EOF",
"UTP_STATE_DESTROYING",
};
struct_utp_context::struct_utp_context()
: userdata(NULL)
, current_ms(0)
, last_utp_socket(NULL)
, log_normal(false)
, log_mtu(false)
, log_debug(false)
{
memset(&context_stats, 0, sizeof(context_stats));
memset(callbacks, 0, sizeof(callbacks));
target_delay = CCONTROL_TARGET;
utp_sockets = new UTPSocketHT;
callbacks[UTP_GET_UDP_MTU] = &utp_default_get_udp_mtu;
callbacks[UTP_GET_UDP_OVERHEAD] = &utp_default_get_udp_overhead;
callbacks[UTP_GET_MILLISECONDS] = &utp_default_get_milliseconds;
callbacks[UTP_GET_MICROSECONDS] = &utp_default_get_microseconds;
callbacks[UTP_GET_RANDOM] = &utp_default_get_random;
// 1 MB of receive buffer (i.e. max bandwidth delay product)
// means that from a peer with 200 ms RTT, we cannot receive
// faster than 5 MB/s
// from a peer with 10 ms RTT, we cannot receive faster than
// 100 MB/s. This is assumed to be good enough, since bandwidth
// often is proportional to RTT anyway
// when setting a download rate limit, all sockets should have
// their receive buffer set much lower, to say 60 kiB or so
opt_rcvbuf = opt_sndbuf = 1024 * 1024;
last_check = 0;
}
struct_utp_context::~struct_utp_context() {
delete this->utp_sockets;
}
utp_context* utp_init (int version)
{
assert(version == 2);
if (version != 2)
return NULL;
utp_context *ctx = new utp_context;
return ctx;
}
void utp_destroy(utp_context *ctx) {
assert(ctx);
if (ctx) delete ctx;
}
void utp_set_callback(utp_context *ctx, int callback_name, utp_callback_t *proc) {
assert(ctx);
if (ctx) ctx->callbacks[callback_name] = proc;
}
void* utp_context_set_userdata(utp_context *ctx, void *userdata) {
assert(ctx);
if (ctx) ctx->userdata = userdata;
return ctx ? ctx->userdata : NULL;
}
void* utp_context_get_userdata(utp_context *ctx) {
assert(ctx);
return ctx ? ctx->userdata : NULL;
}
utp_context_stats* utp_get_context_stats(utp_context *ctx) {
assert(ctx);
return ctx ? &ctx->context_stats : NULL;
}
ssize_t utp_write(utp_socket *socket, void *buf, size_t len) {
struct utp_iovec iovec = { buf, len };
return utp_writev(socket, &iovec, 1);
}
}

@ -1,208 +0,0 @@
// vim:set ts=4 sw=4 ai:
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "utp_callbacks.h"
int utp_call_on_firewall(utp_context *ctx, const struct sockaddr *address, socklen_t address_len)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_ON_FIREWALL]) return 0;
args.callback_type = UTP_ON_FIREWALL;
args.context = ctx;
args.socket = NULL;
args.address = address;
args.address_len = address_len;
return (int)ctx->callbacks[UTP_ON_FIREWALL](&args);
}
void utp_call_on_accept(utp_context *ctx, utp_socket *socket, const struct sockaddr *address, socklen_t address_len)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_ON_ACCEPT]) return;
args.callback_type = UTP_ON_ACCEPT;
args.context = ctx;
args.socket = socket;
args.address = address;
args.address_len = address_len;
ctx->callbacks[UTP_ON_ACCEPT](&args);
}
void utp_call_on_connect(utp_context *ctx, utp_socket *socket)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_ON_CONNECT]) return;
args.callback_type = UTP_ON_CONNECT;
args.context = ctx;
args.socket = socket;
ctx->callbacks[UTP_ON_CONNECT](&args);
}
void utp_call_on_error(utp_context *ctx, utp_socket *socket, int error_code)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_ON_ERROR]) return;
args.callback_type = UTP_ON_ERROR;
args.context = ctx;
args.socket = socket;
args.error_code = error_code;
ctx->callbacks[UTP_ON_ERROR](&args);
}
void utp_call_on_read(utp_context *ctx, utp_socket *socket, const byte *buf, size_t len)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_ON_READ]) return;
args.callback_type = UTP_ON_READ;
args.context = ctx;
args.socket = socket;
args.buf = buf;
args.len = len;
ctx->callbacks[UTP_ON_READ](&args);
}
void utp_call_on_overhead_statistics(utp_context *ctx, utp_socket *socket, int send, size_t len, int type)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_ON_OVERHEAD_STATISTICS]) return;
args.callback_type = UTP_ON_OVERHEAD_STATISTICS;
args.context = ctx;
args.socket = socket;
args.send = send;
args.len = len;
args.type = type;
ctx->callbacks[UTP_ON_OVERHEAD_STATISTICS](&args);
}
void utp_call_on_delay_sample(utp_context *ctx, utp_socket *socket, int sample_ms)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_ON_DELAY_SAMPLE]) return;
args.callback_type = UTP_ON_DELAY_SAMPLE;
args.context = ctx;
args.socket = socket;
args.sample_ms = sample_ms;
ctx->callbacks[UTP_ON_DELAY_SAMPLE](&args);
}
void utp_call_on_state_change(utp_context *ctx, utp_socket *socket, int state)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_ON_STATE_CHANGE]) return;
args.callback_type = UTP_ON_STATE_CHANGE;
args.context = ctx;
args.socket = socket;
args.state = state;
ctx->callbacks[UTP_ON_STATE_CHANGE](&args);
}
uint16 utp_call_get_udp_mtu(utp_context *ctx, utp_socket *socket, const struct sockaddr *address, socklen_t address_len)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_GET_UDP_MTU]) return 0;
args.callback_type = UTP_GET_UDP_MTU;
args.context = ctx;
args.socket = socket;
args.address = address;
args.address_len = address_len;
return (uint16)ctx->callbacks[UTP_GET_UDP_MTU](&args);
}
uint16 utp_call_get_udp_overhead(utp_context *ctx, utp_socket *socket, const struct sockaddr *address, socklen_t address_len)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_GET_UDP_OVERHEAD]) return 0;
args.callback_type = UTP_GET_UDP_OVERHEAD;
args.context = ctx;
args.socket = socket;
args.address = address;
args.address_len = address_len;
return (uint16)ctx->callbacks[UTP_GET_UDP_OVERHEAD](&args);
}
uint64 utp_call_get_milliseconds(utp_context *ctx, utp_socket *socket)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_GET_MILLISECONDS]) return 0;
args.callback_type = UTP_GET_MILLISECONDS;
args.context = ctx;
args.socket = socket;
return ctx->callbacks[UTP_GET_MILLISECONDS](&args);
}
uint64 utp_call_get_microseconds(utp_context *ctx, utp_socket *socket)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_GET_MICROSECONDS]) return 0;
args.callback_type = UTP_GET_MICROSECONDS;
args.context = ctx;
args.socket = socket;
return ctx->callbacks[UTP_GET_MICROSECONDS](&args);
}
uint32 utp_call_get_random(utp_context *ctx, utp_socket *socket)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_GET_RANDOM]) return 0;
args.callback_type = UTP_GET_RANDOM;
args.context = ctx;
args.socket = socket;
return (uint32)ctx->callbacks[UTP_GET_RANDOM](&args);
}
size_t utp_call_get_read_buffer_size(utp_context *ctx, utp_socket *socket)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_GET_READ_BUFFER_SIZE]) return 0;
args.callback_type = UTP_GET_READ_BUFFER_SIZE;
args.context = ctx;
args.socket = socket;
return (size_t)ctx->callbacks[UTP_GET_READ_BUFFER_SIZE](&args);
}
void utp_call_log(utp_context *ctx, utp_socket *socket, const byte *buf)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_LOG]) return;
args.callback_type = UTP_LOG;
args.context = ctx;
args.socket = socket;
args.buf = buf;
ctx->callbacks[UTP_LOG](&args);
}
void utp_call_sendto(utp_context *ctx, utp_socket *socket, const byte *buf, size_t len, const struct sockaddr *address, socklen_t address_len, uint32 flags)
{
utp_callback_arguments args;
if (!ctx->callbacks[UTP_SENDTO]) return;
args.callback_type = UTP_SENDTO;
args.context = ctx;
args.socket = socket;
args.buf = buf;
args.len = len;
args.address = address;
args.address_len = address_len;
args.flags = flags;
ctx->callbacks[UTP_SENDTO](&args);
}

@ -1,47 +0,0 @@
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef __UTP_CALLBACKS_H__
#define __UTP_CALLBACKS_H__
#include "utp.h"
#include "utp_internal.h"
// Generated by running: grep ^[a-z] utp_callbacks.cpp | sed 's/$/;/'
int utp_call_on_firewall(utp_context *ctx, const struct sockaddr *address, socklen_t address_len);
void utp_call_on_accept(utp_context *ctx, utp_socket *s, const struct sockaddr *address, socklen_t address_len);
void utp_call_on_connect(utp_context *ctx, utp_socket *s);
void utp_call_on_error(utp_context *ctx, utp_socket *s, int error_code);
void utp_call_on_read(utp_context *ctx, utp_socket *s, const byte *buf, size_t len);
void utp_call_on_overhead_statistics(utp_context *ctx, utp_socket *s, int send, size_t len, int type);
void utp_call_on_delay_sample(utp_context *ctx, utp_socket *s, int sample_ms);
void utp_call_on_state_change(utp_context *ctx, utp_socket *s, int state);
uint16 utp_call_get_udp_mtu(utp_context *ctx, utp_socket *s, const struct sockaddr *address, socklen_t address_len);
uint16 utp_call_get_udp_overhead(utp_context *ctx, utp_socket *s, const struct sockaddr *address, socklen_t address_len);
uint64 utp_call_get_milliseconds(utp_context *ctx, utp_socket *s);
uint64 utp_call_get_microseconds(utp_context *ctx, utp_socket *s);
uint32 utp_call_get_random(utp_context *ctx, utp_socket *s);
size_t utp_call_get_read_buffer_size(utp_context *ctx, utp_socket *s);
void utp_call_log(utp_context *ctx, utp_socket *s, const byte *buf);
void utp_call_sendto(utp_context *ctx, utp_socket *s, const byte *buf, size_t len, const struct sockaddr *address, socklen_t address_len, uint32 flags);
#endif // __UTP_CALLBACKS_H__

@ -1,246 +0,0 @@
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "utp_hash.h"
#include "utp_types.h"
#define LIBUTP_HASH_UNUSED ((utp_link_t)-1)
#ifdef STRICT_ALIGN
inline uint32 Read32(const void *p)
{
uint32 tmp;
memcpy(&tmp, p, sizeof tmp);
return tmp;
}
#else
inline uint32 Read32(const void *p) { return *(uint32*)p; }
#endif
// Get the amount of memory required for the hash parameters and the bucket set
// Waste a space for an unused bucket in order to ensure the following managed memory have 32-bit aligned addresses
// TODO: make this 64-bit clean
#define BASE_SIZE(bc) (sizeof(utp_hash_t) + sizeof(utp_link_t) * ((bc) + 1))
// Get a pointer to the base of the structure array managed by the hash table
#define get_bep(h) ((byte*)(h)) + BASE_SIZE((h)->N)
// Get the address of the information associated with a specific structure in the array,
// given the address of the base of the structure.
// This assumes a utp_link_t link member is at the end of the structure.
// Given compilers filling out the memory to a 32-bit clean value, this may mean that
// the location named in the structure may not be the location actually used by the hash table,
// since the compiler may have padded the end of the structure with 2 bytes after the utp_link_t member.
// TODO: this macro should not require that the variable pointing at the hash table be named 'hash'
#define ptr_to_link(p) (utp_link_t *) (((byte *) (p)) + hash->E - sizeof(utp_link_t))
// Calculate how much to allocate for a hash table with bucket count, total size, and structure count
// TODO: make this 64-bit clean
#define ALLOCATION_SIZE(bc, ts, sc) (BASE_SIZE((bc)) + (ts) * (sc))
utp_hash_t *utp_hash_create(int N, int key_size, int total_size, int initial, utp_hash_compute_t hashfun, utp_hash_equal_t compfun)
{
// Must have odd number of hash buckets (prime number is best)
assert(N % 2);
// Ensure structures will be at aligned memory addresses
// TODO: make this 64-bit clean
assert(0 == (total_size % 4));
int size = ALLOCATION_SIZE(N, total_size, initial);
utp_hash_t *hash = (utp_hash_t *) malloc( size );
memset( hash, 0, size );
for (int i = 0; i < N + 1; ++i)
hash->inits[i] = LIBUTP_HASH_UNUSED;
hash->N = N;
hash->K = key_size;
hash->E = total_size;
hash->hash_compute = hashfun;
hash->hash_equal = compfun;
hash->allocated = initial;
hash->count = 0;
hash->used = 0;
hash->free = LIBUTP_HASH_UNUSED;
return hash;
}
uint utp_hash_mem(const void *keyp, size_t keysize)
{
uint hash = 0;
uint n = keysize;
while (n >= 4) {
hash ^= Read32(keyp);
keyp = (byte*)keyp + sizeof(uint32);
hash = (hash << 13) | (hash >> 19);
n -= 4;
}
while (n != 0) {
hash ^= *(byte*)keyp;
keyp = (byte*)keyp + sizeof(byte);
hash = (hash << 8) | (hash >> 24);
n--;
}
return hash;
}
uint utp_hash_mkidx(utp_hash_t *hash, const void *keyp)
{
// Generate a key from the hash
return hash->hash_compute(keyp, hash->K) % hash->N;
}
static inline bool compare(byte *a, byte *b,int n)
{
assert(n >= 4);
if (Read32(a) != Read32(b)) return false;
return memcmp(a+4, b+4, n-4) == 0;
}
#define COMPARE(h,k1,k2,ks) (((h)->hash_equal) ? (h)->hash_equal((void*)k1,(void*)k2,ks) : compare(k1,k2,ks))
// Look-up a key in the hash table.
// Returns NULL if not found
void *utp_hash_lookup(utp_hash_t *hash, const void *key)
{
utp_link_t idx = utp_hash_mkidx(hash, key);
// base pointer
byte *bep = get_bep(hash);
utp_link_t cur = hash->inits[idx];
while (cur != LIBUTP_HASH_UNUSED) {
byte *key2 = bep + (cur * hash->E);
if (COMPARE(hash, (byte*)key, key2, hash->K))
return key2;
cur = *ptr_to_link(key2);
}
return NULL;
}
// Add a new element to the hash table.
// Returns a pointer to the new element.
// This assumes the element is not already present!
void *utp_hash_add(utp_hash_t **hashp, const void *key)
{
//Allocate a new entry
byte *elemp;
utp_link_t elem;
utp_hash_t *hash = *hashp;
utp_link_t idx = utp_hash_mkidx(hash, key);
if ((elem=hash->free) == LIBUTP_HASH_UNUSED) {
utp_link_t all = hash->allocated;
if (hash->used == all) {
utp_hash_t *nhash;
if (all <= (LIBUTP_HASH_UNUSED/2)) {
all *= 2;
} else if (all != LIBUTP_HASH_UNUSED) {
all = LIBUTP_HASH_UNUSED;
} else {
// too many items! can't grow!
assert(0);
return NULL;
}
// otherwise need to allocate.
nhash = (utp_hash_t*)realloc(hash, ALLOCATION_SIZE(hash->N, hash->E, all));
if (!nhash) {
// out of memory (or too big to allocate)
assert(nhash);
return NULL;
}
hash = *hashp = nhash;
hash->allocated = all;
}
elem = hash->used++;
elemp = get_bep(hash) + elem * hash->E;
} else {
elemp = get_bep(hash) + elem * hash->E;
hash->free = *ptr_to_link(elemp);
}
*ptr_to_link(elemp) = hash->inits[idx];
hash->inits[idx] = elem;
hash->count++;
// copy key into it
memcpy(elemp, key, hash->K);
return elemp;
}
// Delete an element from the utp_hash_t
// Returns a pointer to the already deleted element.
void *utp_hash_del(utp_hash_t *hash, const void *key)
{
utp_link_t idx = utp_hash_mkidx(hash, key);
// base pointer
byte *bep = get_bep(hash);
utp_link_t *curp = &hash->inits[idx];
utp_link_t cur;
while ((cur=*curp) != LIBUTP_HASH_UNUSED) {
byte *key2 = bep + (cur * hash->E);
if (COMPARE(hash,(byte*)key,(byte*)key2, hash->K )) {
// found an item that matched. unlink it
*curp = *ptr_to_link(key2);
// Insert into freelist
*ptr_to_link(key2) = hash->free;
hash->free = cur;
hash->count--;
return key2;
}
curp = ptr_to_link(key2);
}
return NULL;
}
void *utp_hash_iterate(utp_hash_t *hash, utp_hash_iterator_t *iter)
{
utp_link_t elem;
if ((elem=iter->elem) == LIBUTP_HASH_UNUSED) {
// Find a bucket with an element
utp_link_t buck = iter->bucket + 1;
for(;;) {
if (buck >= hash->N)
return NULL;
if ((elem = hash->inits[buck]) != LIBUTP_HASH_UNUSED)
break;
buck++;
}
iter->bucket = buck;
}
byte *elemp = get_bep(hash) + (elem * hash->E);
iter->elem = *ptr_to_link(elemp);
return elemp;
}
void utp_hash_free_mem(utp_hash_t* hash)
{
free(hash);
}

@ -1,209 +0,0 @@
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef __UTP_HASH_H__
#define __UTP_HASH_H__
#include <string.h> // memset
#include <stdlib.h> // malloc
#include "utp_types.h"
#include "utp_templates.h"
// TODO: make utp_link_t a template parameter to HashTable
typedef uint32 utp_link_t;
#ifdef _MSC_VER
// Silence the warning about the C99-compliant zero-length array at the end of
// the structure
#pragma warning(disable : 4200)
#endif
#include <absl/base/attributes.h>
typedef uint32 (*utp_hash_compute_t)(const void *keyp, size_t keysize);
typedef uint (*utp_hash_equal_t)(const void *key_a, const void *key_b,
size_t keysize);
// In memory the HashTable is laid out as follows:
// ---------------------------- low
// | hash table data members |
// ---------------------------- _
// | indices | ^
// | . | | utp_link_t indices into the key-values.
// | . | .
// ---------------------------- - <----- bep
// | keys and values | each key-value pair has size total_size
// | . |
// | . |
// ---------------------------- high
//
// The code depends on the ability of the compiler to pad the length
// of the hash table data members structure to
// a length divisible by 32-bits with no remainder.
//
// Since the number of hash buckets (indices) should be odd, the code
// asserts this and adds one to the hash bucket count to ensure that the
// following key-value pairs array starts on a 32-bit boundary.
//
// The key-value pairs array should start on a 32-bit boundary, otherwise
// processors like the ARM will silently mangle 32-bit data in these structures
// (e.g., turning 0xABCD into 0XCDAB when moving a value from memory to register
// when the memory address is 16 bits offset from a 32-bit boundary),
// also, the value will be stored at an address two bytes lower than the address
// value would ordinarily indicate.
//
// The key-value pair is of type T. The first field in T must
// be the key, i.e., the first K bytes of T contains the key.
// total_size = sizeof(T) and thus sizeof(T) >= sizeof(K)
//
// N is the number of buckets.
//
struct utp_hash_t
{
utp_link_t N;
byte K;
byte E;
size_t count;
utp_hash_compute_t hash_compute;
utp_hash_equal_t hash_equal;
utp_link_t allocated;
utp_link_t used;
utp_link_t free;
utp_link_t inits[0];
};
#ifdef _MSC_VER
#pragma warning(default : 4200)
#endif
struct utp_hash_iterator_t
{
utp_link_t bucket;
utp_link_t elem;
utp_hash_iterator_t() : bucket(0xffffffff), elem(0xffffffff)
{
}
};
uint
utp_hash_mem(const void *keyp, size_t keysize);
uint
utp_hash_comp(const void *key_a, const void *key_b, size_t keysize);
utp_hash_t *
utp_hash_create(int N, int key_size, int total_size, int initial,
utp_hash_compute_t hashfun = utp_hash_mem,
utp_hash_equal_t eqfun = NULL);
void *
utp_hash_lookup(utp_hash_t *hash, const void *key);
void *
utp_hash_add(utp_hash_t **hashp, const void *key);
void *
utp_hash_del(utp_hash_t *hash, const void *key);
void *
utp_hash_iterate(utp_hash_t *hash, utp_hash_iterator_t *iter);
void
utp_hash_free_mem(utp_hash_t *hash);
/*
This HashTable requires that T have at least
sizeof(K)+sizeof(utp_link_t) bytes. Usually done like this:
struct K {
int whatever;
};
struct T {
K wtf;
utp_link_t link; // also wtf
};
*/
template < typename K, typename T >
class utpHashTable
{
utp_hash_t *hash;
public:
static uint
compare(const void *k1, const void *k2, ABSL_ATTRIBUTE_UNUSED size_t ks)
{
return *((K *)k1) == *((K *)k2);
}
static uint32
compute_hash(const void *k, ABSL_ATTRIBUTE_UNUSED size_t ks)
{
return ((K *)k)->compute_hash();
}
void
Init()
{
hash = NULL;
}
bool
Allocated()
{
return (hash != NULL);
}
void
Free()
{
utp_hash_free_mem(hash);
hash = NULL;
}
void
Create(int N, int initial)
{
hash = utp_hash_create(N, sizeof(K), sizeof(T), initial, &compute_hash,
&compare);
}
T *
Lookup(const K &key)
{
return (T *)utp_hash_lookup(hash, &key);
}
T *
Add(const K &key)
{
return (T *)utp_hash_add(&hash, &key);
}
T *
Delete(const K &key)
{
return (T *)utp_hash_del(hash, &key);
}
T *
Iterate(utp_hash_iterator_t &iterator)
{
return (T *)utp_hash_iterate(hash, &iterator);
}
size_t
GetCount()
{
return hash->count;
}
};
#endif //__UTP_HASH_H__

File diff suppressed because it is too large Load Diff

@ -1,161 +0,0 @@
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef __UTP_INTERNAL_H__
#define __UTP_INTERNAL_H__
#include <stdarg.h>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include "utp.h"
#include "utp_callbacks.h"
#include "utp_templates.h"
#include "utp_hash.h"
#include "utp_hash.h"
#include "utp_packedsockaddr.h"
/* These originally lived in utp_config.h */
#define CCONTROL_TARGET (100 * 1000) // us
enum bandwidth_type_t
{
payload_bandwidth,
connect_overhead,
close_overhead,
ack_overhead,
header_overhead,
retransmit_overhead
};
#ifdef WIN32
#ifdef _MSC_VER
#include "libutp_inet_ntop.h"
#endif
// newer versions of MSVC define these in errno.h
#ifndef ECONNRESET
#define ECONNRESET WSAECONNRESET
#define EMSGSIZE WSAEMSGSIZE
#define ECONNREFUSED WSAECONNREFUSED
#define ETIMEDOUT WSAETIMEDOUT
#endif
#endif
struct PACKED_ATTRIBUTE RST_Info
{
PackedSockAddr addr;
uint32 connid;
uint16 ack_nr;
uint64 timestamp;
};
// It's really important that we don't have duplicate keys in the hash table.
// If we do, we'll eventually crash. if we try to remove the second instance
// of the key, we'll accidentally remove the first instead. then later,
// checkTimeouts will try to access the second one's already freed memory.
void
UTP_FreeAll(struct UTPSocketHT *utp_sockets);
struct UTPSocketKey
{
PackedSockAddr addr;
uint32 recv_id; // "conn_seed", "conn_id"
UTPSocketKey(const PackedSockAddr &_addr, uint32 _recv_id)
: addr(_addr)
, recv_id(_recv_id)
{
}
bool
operator==(const UTPSocketKey &other) const
{
return recv_id == other.recv_id && addr == other.addr;
}
uint32
compute_hash() const
{
return recv_id ^ addr.compute_hash();
}
};
struct UTPSocketKeyData
{
UTPSocketKey key;
UTPSocket *socket;
utp_link_t link;
};
#define UTP_SOCKET_BUCKETS 79
#define UTP_SOCKET_INIT 15
struct UTPSocketHT : utpHashTable< UTPSocketKey, UTPSocketKeyData >
{
UTPSocketHT()
{
const int buckets = UTP_SOCKET_BUCKETS;
const int initial = UTP_SOCKET_INIT;
this->Create(buckets, initial);
}
~UTPSocketHT()
{
UTP_FreeAll(this);
this->Free();
}
};
struct struct_utp_context
{
void *userdata;
utp_callback_t *callbacks[UTP_ARRAY_SIZE];
uint64 current_ms;
utp_context_stats context_stats;
UTPSocket *last_utp_socket;
Array< UTPSocket * > ack_sockets;
Array< RST_Info > rst_info;
UTPSocketHT *utp_sockets;
size_t target_delay;
size_t opt_sndbuf;
size_t opt_rcvbuf;
uint64 last_check;
struct_utp_context();
~struct_utp_context();
void
log(int level, utp_socket *socket, char const *fmt, ...);
void
log_unchecked(utp_socket *socket, char const *fmt, ...);
bool
would_log(int level);
bool log_normal : 1; // log normal events?
bool log_mtu : 1; // log MTU related events?
bool log_debug : 1; // log debugging events? (Must also compile with
// UTP_DEBUG_LOGGING defined)
};
#endif //__UTP_INTERNAL_H__

@ -1,166 +0,0 @@
// vim:set ts=4 sw=4 ai:
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include "utp_types.h"
#include "utp_hash.h"
#include "utp_packedsockaddr.h"
#include "libutp_inet_ntop.h"
byte
PackedSockAddr::get_family() const
{
#if defined(__sh__)
return ((_sin6d[0] == 0) && (_sin6d[1] == 0)
&& (_sin6d[2] == htonl(0xffff)) != 0)
? AF_INET
: AF_INET6;
#else
return (IN6_IS_ADDR_V4MAPPED(&_in._in6addr) != 0) ? AF_INET : AF_INET6;
#endif // defined(__sh__)
}
bool
PackedSockAddr::operator==(const PackedSockAddr &rhs) const
{
if(&rhs == this)
return true;
if(_port != rhs._port)
return false;
return memcmp(_sin6, rhs._sin6, sizeof(_sin6)) == 0;
}
bool
PackedSockAddr::operator!=(const PackedSockAddr &rhs) const
{
return !(*this == rhs);
}
uint32
PackedSockAddr::compute_hash() const
{
return utp_hash_mem(&_in, sizeof(_in)) ^ _port;
}
void
PackedSockAddr::set(const SOCKADDR_STORAGE *sa, socklen_t len)
{
// on unix, the cast does nothing, socklen_t is _already_ unsigned
if(sa->ss_family == AF_INET)
{
assert((unsigned)len >= sizeof(sockaddr_in));
const sockaddr_in *sin = (sockaddr_in *)sa;
_sin6w[0] = 0;
_sin6w[1] = 0;
_sin6w[2] = 0;
_sin6w[3] = 0;
_sin6w[4] = 0;
_sin6w[5] = 0xffff;
_sin4 = sin->sin_addr.s_addr;
_port = ntohs(sin->sin_port);
}
else
{
assert((unsigned)len >= sizeof(sockaddr_in6));
const sockaddr_in6 *sin6 = (sockaddr_in6 *)sa;
_in._in6addr = sin6->sin6_addr;
_port = ntohs(sin6->sin6_port);
}
(void)len;
}
PackedSockAddr::PackedSockAddr(const SOCKADDR_STORAGE *sa, socklen_t len)
{
set(sa, len);
}
PackedSockAddr::PackedSockAddr(void)
{
SOCKADDR_STORAGE sa;
socklen_t len = sizeof(SOCKADDR_STORAGE);
memset(&sa, 0, len);
sa.ss_family = AF_INET;
set(&sa, len);
}
SOCKADDR_STORAGE
PackedSockAddr::get_sockaddr_storage(socklen_t *len = NULL) const
{
SOCKADDR_STORAGE sa;
const byte family = get_family();
if(family == AF_INET)
{
sockaddr_in *sin = (sockaddr_in *)&sa;
if(len)
*len = sizeof(sockaddr_in);
memset(sin, 0, sizeof(sockaddr_in));
sin->sin_family = family;
sin->sin_port = htons(_port);
sin->sin_addr.s_addr = _sin4;
}
else
{
sockaddr_in6 *sin6 = (sockaddr_in6 *)&sa;
memset(sin6, 0, sizeof(sockaddr_in6));
if(len)
*len = sizeof(sockaddr_in6);
sin6->sin6_family = family;
sin6->sin6_addr = _in._in6addr;
sin6->sin6_port = htons(_port);
}
return sa;
}
// #define addrfmt(x, s) x.fmt(s, sizeof(s))
cstr
PackedSockAddr::fmt(str s, size_t len) const
{
memset(s, 0, len);
const byte family = get_family();
str i;
if(family == AF_INET)
{
INET_NTOP(family, (uint32 *)&_sin4, s, len);
i = s;
while(*++i)
{
}
}
else
{
i = s;
*i++ = '[';
INET_NTOP(family, (in6_addr *)&_in._in6addr, i, len - 1);
while(*++i)
{
}
*i++ = ']';
}
snprintf(i, len - (i - s), ":%u", _port);
return s;
}

@ -1,60 +0,0 @@
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef __UTP_PACKEDSOCKADDR_H__
#define __UTP_PACKEDSOCKADDR_H__
#include "utp_types.h"
struct PACKED_ATTRIBUTE PackedSockAddr {
// The values are always stored here in network byte order
union {
byte _in6[16]; // IPv6
uint16 _in6w[8]; // IPv6, word based (for convenience)
uint32 _in6d[4]; // Dword access
in6_addr _in6addr; // For convenience
} _in;
// Host byte order
uint16 _port;
#define _sin4 _in._in6d[3] // IPv4 is stored where it goes if mapped
#define _sin6 _in._in6
#define _sin6w _in._in6w
#define _sin6d _in._in6d
byte get_family() const;
bool operator==(const PackedSockAddr& rhs) const;
bool operator!=(const PackedSockAddr& rhs) const;
void set(const SOCKADDR_STORAGE* sa, socklen_t len);
PackedSockAddr(const SOCKADDR_STORAGE* sa, socklen_t len);
PackedSockAddr(void);
SOCKADDR_STORAGE get_sockaddr_storage(socklen_t *len) const;
cstr fmt(str s, size_t len) const;
uint32 compute_hash() const;
} ALIGNED_ATTRIBUTE(4);
#endif //__UTP_PACKEDSOCKADDR_H__

@ -1,195 +0,0 @@
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef __TEMPLATES_H__
#define __TEMPLATES_H__
#include "utp_types.h"
#include <assert.h>
#if defined(POSIX)
/* Allow over-writing FORCEINLINE from makefile because gcc 3.4.4 for buffalo
doesn't seem to support __attribute__((always_inline)) in -O0 build
(strangely, it works in -Os build) */
#ifndef FORCEINLINE
// The always_inline attribute asks gcc to inline the function even if no optimization is being requested.
// This macro should be used exclusive-or with the inline directive (use one or the other but not both)
// since Microsoft uses __forceinline to also mean inline,
// and this code is following a Microsoft compatibility model.
// Just setting the attribute without also specifying the inline directive apparently won't inline the function,
// as evidenced by multiply-defined symbols found at link time.
#define FORCEINLINE inline __attribute__((always_inline))
#endif
#endif
// Utility templates
#undef min
#undef max
template <typename T> static inline T min(T a, T b) { if (a < b) return a; return b; }
template <typename T> static inline T max(T a, T b) { if (a > b) return a; return b; }
template <typename T> static inline T min(T a, T b, T c) { return min(min(a,b),c); }
template <typename T> static inline T max(T a, T b, T c) { return max(max(a,b),c); }
template <typename T> static inline T clamp(T v, T mi, T ma)
{
if (v > ma) v = ma;
if (v < mi) v = mi;
return v;
}
#if (defined(__SVR4) && defined(__sun))
#pragma pack(1)
#else
#pragma pack(push,1)
#endif
namespace aux
{
FORCEINLINE uint16 host_to_network(uint16 i) { return htons(i); }
FORCEINLINE uint32 host_to_network(uint32 i) { return htonl(i); }
FORCEINLINE int32 host_to_network(int32 i) { return htonl(i); }
FORCEINLINE uint16 network_to_host(uint16 i) { return ntohs(i); }
FORCEINLINE uint32 network_to_host(uint32 i) { return ntohl(i); }
FORCEINLINE int32 network_to_host(int32 i) { return ntohl(i); }
}
template <class T>
struct PACKED_ATTRIBUTE big_endian
{
T operator=(T i) { m_integer = aux::host_to_network(i); return i; }
operator T() const { return aux::network_to_host(m_integer); }
private:
T m_integer;
};
typedef big_endian<int32> int32_big;
typedef big_endian<uint32> uint32_big;
typedef big_endian<uint16> uint16_big;
#if (defined(__SVR4) && defined(__sun))
#pragma pack(0)
#else
#pragma pack(pop)
#endif
template<typename T> static inline void zeromem(T *a, size_t count = 1) { memset(a, 0, count * sizeof(T)); }
typedef int SortCompareProc(const void *, const void *);
template<typename T> static FORCEINLINE void QuickSortT(T *base, size_t num, int (*comp)(const T *, const T *)) { qsort(base, num, sizeof(T), (SortCompareProc*)comp); }
// WARNING: The template parameter MUST be a POD type!
template <typename T, size_t minsize = 16> class Array {
protected:
T *mem;
size_t alloc,count;
public:
Array(size_t init) { Init(init); }
Array() { Init(); }
~Array() { Free(); }
void inline Init() { mem = NULL; alloc = count = 0; }
void inline Init(size_t init) { Init(); if (init) Resize(init); }
size_t inline GetCount() const { return count; }
size_t inline GetAlloc() const { return alloc; }
void inline SetCount(size_t c) { count = c; }
inline T& operator[](size_t offset) { assert(offset ==0 || offset<alloc); return mem[offset]; }
inline const T& operator[](size_t offset) const { assert(offset ==0 || offset<alloc); return mem[offset]; }
void inline Resize(size_t a) {
if (a == 0) { free(mem); Init(); }
else { mem = (T*)realloc(mem, (alloc=a) * sizeof(T)); }
}
void Grow() { Resize(::max<size_t>(minsize, alloc * 2)); }
inline size_t Append(const T &t) {
if (count >= alloc) Grow();
size_t r=count++;
mem[r] = t;
return r;
}
T inline &Append() {
if (count >= alloc) Grow();
return mem[count++];
}
void inline Compact() {
Resize(count);
}
void inline Free() {
free(mem);
Init();
}
void inline Clear() {
count = 0;
}
bool inline MoveUpLast(size_t index) {
assert(index < count);
size_t c = --count;
if (index != c) {
mem[index] = mem[c];
return true;
}
return false;
}
bool inline MoveUpLastExist(const T &v) {
return MoveUpLast(LookupElementExist(v));
}
size_t inline LookupElement(const T &v) const {
for(size_t i = 0; i != count; i++)
if (mem[i] == v)
return i;
return (size_t) -1;
}
bool inline HasElement(const T &v) const {
return LookupElement(v) != -1;
}
typedef int SortCompareProc(const T *a, const T *b);
void Sort(SortCompareProc* proc, size_t start, size_t end) {
QuickSortT(&mem[start], end - start, proc);
}
void Sort(SortCompareProc* proc, size_t start) {
Sort(proc, start, count);
}
void Sort(SortCompareProc* proc) {
Sort(proc, 0, count);
}
};
#endif //__TEMPLATES_H__

@ -1,307 +0,0 @@
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <stdlib.h>
#include <assert.h>
#include "utp.h"
#include "utp_types.h"
#ifdef WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#else //! WIN32
#include <time.h>
#include <sys/time.h> // Linux needs both time.h and sys/time.h
#endif
#if defined(__APPLE__)
#include <mach/mach_time.h>
#endif
#include <absl/base/attributes.h>
#include "utp_utils.h"
#ifdef WIN32
typedef ULONGLONG(WINAPI GetTickCount64Proc)(void);
static GetTickCount64Proc *pt2GetTickCount64;
static GetTickCount64Proc *pt2RealGetTickCount;
static uint64 startPerformanceCounter;
static uint64 startGetTickCount;
// MSVC 6 standard doesn't like division with uint64s
static double counterPerMicrosecond;
static uint64
UTGetTickCount64()
{
if(pt2GetTickCount64)
{
return pt2GetTickCount64();
}
if(pt2RealGetTickCount)
{
uint64 v = pt2RealGetTickCount();
// fix return value from GetTickCount
return (DWORD)v | ((v >> 0x18) & 0xFFFFFFFF00000000);
}
return (uint64)GetTickCount();
}
static void
Time_Initialize()
{
HMODULE kernel32 = GetModuleHandleA("kernel32.dll");
pt2GetTickCount64 =
(GetTickCount64Proc *)GetProcAddress(kernel32, "GetTickCount64");
// not a typo. GetTickCount actually returns 64 bits
pt2RealGetTickCount =
(GetTickCount64Proc *)GetProcAddress(kernel32, "GetTickCount");
uint64 frequency;
QueryPerformanceCounter((LARGE_INTEGER *)&startPerformanceCounter);
QueryPerformanceFrequency((LARGE_INTEGER *)&frequency);
counterPerMicrosecond = (double)frequency / 1000000.0f;
startGetTickCount = UTGetTickCount64();
}
static int64
abs64(int64 x)
{
return x < 0 ? -x : x;
}
static uint64
__GetMicroseconds()
{
static bool time_init = false;
if(!time_init)
{
time_init = true;
Time_Initialize();
}
uint64 counter;
uint64 tick;
QueryPerformanceCounter((LARGE_INTEGER *)&counter);
tick = UTGetTickCount64();
// unfortunately, QueryPerformanceCounter is not guaranteed
// to be monotonic. Make it so.
int64 ret = (int64)(((int64)counter - (int64)startPerformanceCounter)
/ counterPerMicrosecond);
// if the QPC clock leaps more than one second off GetTickCount64()
// something is seriously fishy. Adjust QPC to stay monotonic
int64 tick_diff = tick - startGetTickCount;
if(abs64(ret / 100000 - tick_diff / 100) > 10)
{
startPerformanceCounter -=
(uint64)((int64)(tick_diff * 1000 - ret) * counterPerMicrosecond);
ret = (int64)((counter - startPerformanceCounter) / counterPerMicrosecond);
}
return ret;
}
static inline uint64
UTP_GetMilliseconds()
{
return GetTickCount();
}
#else //! WIN32
static inline uint64
UTP_GetMicroseconds(void);
static inline uint64
UTP_GetMilliseconds()
{
return UTP_GetMicroseconds() / 1000;
}
#if defined(__APPLE__)
static uint64
__GetMicroseconds()
{
// http://developer.apple.com/mac/library/qa/qa2004/qa1398.html
// http://www.macresearch.org/tutorial_performance_and_time
static mach_timebase_info_data_t sTimebaseInfo;
static uint64_t start_tick = 0;
uint64_t tick;
// Returns a counter in some fraction of a nanoseconds
tick = mach_absolute_time();
if(sTimebaseInfo.denom == 0)
{
// Get the timer ratio to convert mach_absolute_time to nanoseconds
mach_timebase_info(&sTimebaseInfo);
start_tick = tick;
}
// Calculate the elapsed time, convert it to microseconds and return it.
return ((tick - start_tick) * sTimebaseInfo.numer)
/ (sTimebaseInfo.denom * 1000);
}
#else // !__APPLE__
// While _POSIX_TIMERS == -1 in openbsd, clock_gettime(2) _does_ support monotonic.
// this is true for all the BSDs
#if !(__OpenBSD__ || __NetBSD__ || __FreeBSD__)
#if !(defined(_POSIX_TIMERS) && _POSIX_TIMERS > 0 && defined(CLOCK_MONOTONIC))
#warning "Using non-monotonic function gettimeofday() in UTP_GetMicroseconds()"
#endif
#endif
/* Unfortunately, #ifdef CLOCK_MONOTONIC is not enough to make sure that
POSIX clocks work -- we could be running a recent libc with an ancient
kernel (think OpenWRT). -- jch */
static uint64_t
__GetMicroseconds()
{
struct timeval tv;
#if defined(CLOCK_MONOTONIC)
static int have_posix_clocks = -1;
int rc;
if(have_posix_clocks < 0)
{
struct timespec ts;
rc = clock_gettime(CLOCK_MONOTONIC, &ts);
if(rc < 0)
{
have_posix_clocks = 0;
}
else
{
have_posix_clocks = 1;
}
}
if(have_posix_clocks)
{
struct timespec ts;
rc = clock_gettime(CLOCK_MONOTONIC, &ts);
return uint64(ts.tv_sec) * 1000000 + uint64(ts.tv_nsec) / 1000;
}
#endif
gettimeofday(&tv, NULL);
return uint64(tv.tv_sec) * 1000000 + tv.tv_usec;
}
#endif //!__APPLE__
#endif //! WIN32
/*
* Whew. Okay. After that #ifdef maze above, we now know we have a working
* __GetMicroseconds() implementation on all platforms.
*
* Because there are a number of assertions in libutp that will cause a crash
* if monotonic time isn't monotonic, now apply some safety checks. While in
* principle we're already protecting ourselves in cases where non-monotonic
* time is likely to happen, this protects all versions.
*/
static inline uint64
UTP_GetMicroseconds()
{
static uint64 offset = 0, previous = 0;
uint64 now = __GetMicroseconds() + offset;
if(previous > now)
{
/* Eek! */
offset += previous - now;
now = previous;
}
previous = now;
return now;
}
#define ETHERNET_MTU 1500
#define IPV4_HEADER_SIZE 20
#define IPV6_HEADER_SIZE 40
#define UDP_HEADER_SIZE 8
#define GRE_HEADER_SIZE 24
#define PPPOE_HEADER_SIZE 8
#define MPPE_HEADER_SIZE 2
// packets have been observed in the wild that were fragmented
// with a payload of 1416 for the first fragment
// There are reports of routers that have MTU sizes as small as 1392
#define FUDGE_HEADER_SIZE 36
#define TEREDO_MTU 1280
#define UDP_IPV4_OVERHEAD (IPV4_HEADER_SIZE + UDP_HEADER_SIZE)
#define UDP_IPV6_OVERHEAD (IPV6_HEADER_SIZE + UDP_HEADER_SIZE)
#define UDP_TEREDO_OVERHEAD (UDP_IPV4_OVERHEAD + UDP_IPV6_OVERHEAD)
#define UDP_IPV4_MTU \
(ETHERNET_MTU - IPV4_HEADER_SIZE - UDP_HEADER_SIZE - GRE_HEADER_SIZE \
- PPPOE_HEADER_SIZE - MPPE_HEADER_SIZE - FUDGE_HEADER_SIZE)
#define UDP_IPV6_MTU \
(ETHERNET_MTU - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - GRE_HEADER_SIZE \
- PPPOE_HEADER_SIZE - MPPE_HEADER_SIZE - FUDGE_HEADER_SIZE)
#define UDP_TEREDO_MTU (TEREDO_MTU - IPV6_HEADER_SIZE - UDP_HEADER_SIZE)
uint64
utp_default_get_udp_mtu(utp_callback_arguments *args)
{
// Since we don't know the local address of the interface,
// be conservative and assume all IPv6 connections are Teredo.
return (args->address->sa_family == AF_INET6) ? UDP_TEREDO_MTU : UDP_IPV4_MTU;
}
uint64
utp_default_get_udp_overhead(utp_callback_arguments *args)
{
// Since we don't know the local address of the interface,
// be conservative and assume all IPv6 connections are Teredo.
return (args->address->sa_family == AF_INET6) ? UDP_TEREDO_OVERHEAD
: UDP_IPV4_OVERHEAD;
}
uint64
utp_default_get_random(ABSL_ATTRIBUTE_UNUSED utp_callback_arguments *args)
{
return rand();
}
uint64
utp_default_get_milliseconds(ABSL_ATTRIBUTE_UNUSED
utp_callback_arguments *args)
{
return UTP_GetMilliseconds();
}
uint64
utp_default_get_microseconds(ABSL_ATTRIBUTE_UNUSED
utp_callback_arguments *args)
{
return UTP_GetMicroseconds();
}

@ -1,27 +0,0 @@
/*
* Copyright (c) 2010-2013 BitTorrent, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
uint64 utp_default_get_udp_mtu(utp_callback_arguments *args);
uint64 utp_default_get_udp_overhead(utp_callback_arguments *args);
uint64 utp_default_get_random(utp_callback_arguments *args);
uint64 utp_default_get_milliseconds(utp_callback_arguments *args);
uint64 utp_default_get_microseconds(utp_callback_arguments *args);

@ -105,7 +105,7 @@ else()
endif(WIN32)
add_library(${PLATFORM_LIB} STATIC ${LIB_PLATFORM_SRC})
target_link_libraries(${PLATFORM_LIB} PUBLIC ${CRYPTOGRAPHY_LIB} ${UTIL_LIB} libutp Threads::Threads ${LIBS})
target_link_libraries(${PLATFORM_LIB} PUBLIC ${CRYPTOGRAPHY_LIB} ${UTIL_LIB} Threads::Threads ${LIBS})
if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
if(NON_PC_TARGET)
@ -247,17 +247,12 @@ set(LIB_SRC
service/tag_lookup_job.cpp
service/tag.cpp
service/vanity.cpp
utp/inbound_message.cpp
utp/linklayer.cpp
utp/session.cpp
utp/utp.cpp
)
if(TESTNET)
set(LIB_SRC ${LIB_SRC} testnet.c)
endif()
add_library(${STATIC_LIB} STATIC ${LIB_SRC})
set(LIBS ${LIBS} libutp)
target_link_libraries(${STATIC_LIB} PUBLIC cxxopts ${ABYSS_LIB} ${PLATFORM_LIB} ${UTIL_LIB} ${CRYPTOGRAPHY_LIB} ${FS_LIB})
if(${CMAKE_SYSTEM_NAME} MATCHES "FreeBSD")

@ -1,14 +1,11 @@
#include <link/factory.hpp>
#include <iwp/iwp.hpp>
#include <utp/utp.hpp>
namespace llarp
{
LinkFactory::LinkType
LinkFactory::TypeFromName(string_view str)
{
if(str == "utp")
return LinkType::eLinkUTP;
if(str == "iwp")
return LinkType::eLinkIWP;
if(str == "mempipe")
@ -21,8 +18,6 @@ namespace llarp
{
switch(tp)
{
case LinkType::eLinkUTP:
return "utp";
case LinkType::eLinkIWP:
return "iwp";
case LinkType::eLinkMempipe:
@ -37,10 +32,6 @@ namespace llarp
{
switch(tp)
{
case LinkType::eLinkUTP:
if(permitInbound)
return llarp::utp::NewInboundLink;
return llarp::utp::NewOutboundLink;
case LinkType::eLinkIWP:
if(permitInbound)
return llarp::iwp::NewInboundLink;

@ -21,7 +21,7 @@
#include <util/meta/memfn.hpp>
#include <util/metrics/metrics.hpp>
#include <util/str.hpp>
#include <utp/utp.hpp>
#include <ev/ev.hpp>
#include <fstream>
#include <cstdlib>

@ -1,27 +0,0 @@
#include <utp/inbound_message.hpp>
#include <cstring>
namespace llarp
{
namespace utp
{
bool
InboundMessage::IsExpired(llarp_time_t now) const
{
return now > lastActive && now - lastActive >= 2000;
}
bool
InboundMessage::AppendData(const byte_t* ptr, uint16_t sz)
{
if(buffer.size_left() < sz)
return false;
std::copy_n(ptr, sz, buffer.cur);
buffer.cur += sz;
return true;
}
} // namespace utp
} // namespace llarp

@ -1,86 +0,0 @@
#ifndef LLARP_UTP_INBOUND_MESSAGE_HPP
#define LLARP_UTP_INBOUND_MESSAGE_HPP
#include <constants/link_layer.hpp>
#include <util/aligned.hpp>
#include <util/types.hpp>
#include <utp_types.h> // for uint32
#include <cstring>
namespace llarp
{
namespace utp
{
/// size of keyed hash
constexpr size_t FragmentHashSize = 32;
/// size of outer nonce
constexpr size_t FragmentNonceSize = 32;
/// size of outer overhead
constexpr size_t FragmentOverheadSize =
FragmentHashSize + FragmentNonceSize;
/// max fragment payload size
constexpr size_t FragmentBodyPayloadSize = 512;
/// size of inner nonce
constexpr size_t FragmentBodyNonceSize = 24;
/// size of fragment body overhead
constexpr size_t FragmentBodyOverhead = FragmentBodyNonceSize
+ sizeof(uint32) + sizeof(uint16_t) + sizeof(uint16_t);
/// size of fragment body
constexpr size_t FragmentBodySize =
FragmentBodyOverhead + FragmentBodyPayloadSize;
/// size of fragment
constexpr size_t FragmentBufferSize =
FragmentOverheadSize + FragmentBodySize;
static_assert(FragmentBufferSize == 608, "Fragment Buffer Size is not 608");
/// buffer for a single utp fragment
using FragmentBuffer = AlignedBuffer< FragmentBufferSize >;
/// buffer for a link layer message
using MessageBuffer = AlignedBuffer< MAX_LINK_MSG_SIZE >;
/// pending inbound message being received
struct InboundMessage
{
/// timestamp of last activity
llarp_time_t lastActive{0};
/// the underlying message buffer
MessageBuffer _msg;
/// for accessing message buffer
llarp_buffer_t buffer;
/// return true if this inbound message can be removed due to expiration
bool
IsExpired(llarp_time_t now) const;
/// append data at ptr of size sz bytes to message buffer
/// increment current position
/// return false if we don't have enough room
/// return true on success
bool
AppendData(const byte_t* ptr, uint16_t sz);
InboundMessage() : _msg(), buffer(_msg)
{
}
InboundMessage(const InboundMessage& other)
: lastActive(other.lastActive), _msg(other._msg), buffer(_msg)
{
}
};
inline bool
operator==(const InboundMessage& lhs, const InboundMessage& rhs)
{
return lhs.buffer.base == rhs.buffer.base;
}
} // namespace utp
} // namespace llarp
#endif

@ -1,341 +0,0 @@
#include <utp/linklayer.hpp>
#include <utp/session.hpp>
#ifdef __linux__
#include <linux/errqueue.h>
#include <netinet/ip_icmp.h>
#endif
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#include <wspiapi.h>
#endif
#ifndef IP_DONTFRAGMENT
#define IP_DONTFRAGMENT IP_DONTFRAG
#endif
#include <functional>
#include <cstring>
namespace llarp
{
namespace utp
{
uint64
LinkLayer::OnConnect(utp_callback_arguments* arg)
{
auto* l =
static_cast< LinkLayer* >(utp_context_get_userdata(arg->context));
auto* session = static_cast< Session* >(utp_get_userdata(arg->socket));
if(session && l)
session->OutboundLinkEstablished(l);
return 0;
}
uint64
LinkLayer::SendTo(utp_callback_arguments* arg)
{
auto* l =
static_cast< LinkLayer* >(utp_context_get_userdata(arg->context));
if(l == nullptr)
return 0;
LogDebug("utp_sendto ", Addr(*arg->address), " ", arg->len, " bytes");
return l->m_udp.sendto(&l->m_udp, arg->address, arg->buf, arg->len);
}
uint64
LinkLayer::OnError(utp_callback_arguments* arg)
{
auto* session = static_cast< Session* >(utp_get_userdata(arg->socket));
auto* link =
static_cast< LinkLayer* >(utp_context_get_userdata(arg->context));
if(session && link)
{
link->HandleTimeout(session);
session->Close();
}
return 0;
}
uint64
LinkLayer::OnLog(utp_callback_arguments* arg)
{
LogDebug(arg->buf);
return 0;
}
LinkLayer::LinkLayer(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SignBufferFunc sign,
SessionEstablishedHandler established,
SessionRenegotiateHandler reneg,
TimeoutHandler timeout, SessionClosedHandler closed,
bool permitInbound)
: ILinkLayer(routerEncSecret, getrc, h, sign, established, reneg,
timeout, closed)
{
_utp_ctx = utp_init(2);
utp_context_set_userdata(_utp_ctx, this);
utp_set_callback(_utp_ctx, UTP_SENDTO, &LinkLayer::SendTo);
if(permitInbound)
utp_set_callback(_utp_ctx, UTP_ON_ACCEPT, &LinkLayer::OnAccept);
utp_set_callback(_utp_ctx, UTP_ON_CONNECT, &LinkLayer::OnConnect);
utp_set_callback(_utp_ctx, UTP_ON_STATE_CHANGE,
&LinkLayer::OnStateChange);
utp_set_callback(_utp_ctx, UTP_ON_READ, &LinkLayer::OnRead);
utp_set_callback(_utp_ctx, UTP_ON_ERROR, &LinkLayer::OnError);
utp_set_callback(_utp_ctx, UTP_LOG, &LinkLayer::OnLog);
utp_context_set_option(_utp_ctx, UTP_LOG_NORMAL, 1);
utp_context_set_option(_utp_ctx, UTP_LOG_MTU, 1);
utp_context_set_option(_utp_ctx, UTP_LOG_DEBUG, 1);
utp_context_set_option(
_utp_ctx, UTP_SNDBUF,
(MAX_LINK_MSG_SIZE * MaxSendQueueSize * size_t{3}) / size_t{2});
utp_context_set_option(
_utp_ctx, UTP_RCVBUF,
(MAX_LINK_MSG_SIZE * MaxSendQueueSize * size_t{3}) / size_t{2});
}
LinkLayer::~LinkLayer()
{
m_Pending.clear();
m_AuthedLinks.clear();
utp_destroy(_utp_ctx);
}
uint16_t
LinkLayer::Rank() const
{
return 1;
}
void
LinkLayer::RecvFrom(const Addr& from, const void* buf, size_t sz)
{
utp_process_udp(_utp_ctx, (const byte_t*)buf, sz, from, from.SockLen());
}
#ifdef __linux__
void
LinkLayer::ProcessICMP()
{
#ifndef TESTNET
do
{
byte_t vec_buf[4096], ancillary_buf[4096];
struct iovec iov = {vec_buf, sizeof(vec_buf)};
struct sockaddr_in remote;
struct msghdr msg;
ssize_t len;
struct cmsghdr* cmsg;
struct sock_extended_err* e;
struct sockaddr* icmp_addr;
struct sockaddr_in* icmp_sin;
memset(&msg, 0, sizeof(msg));
msg.msg_name = &remote;
msg.msg_namelen = sizeof(remote);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_flags = 0;
msg.msg_control = ancillary_buf;
msg.msg_controllen = sizeof(ancillary_buf);
len = recvmsg(m_udp.fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT);
if(len < 0)
{
if(errno == EAGAIN || errno == EWOULDBLOCK)
errno = 0;
else
LogError("failed to read icmp for utp ", strerror(errno));
return;
}
for(cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg))
{
if(cmsg->cmsg_type != IP_RECVERR)
{
continue;
}
if(cmsg->cmsg_level != SOL_IP)
{
continue;
}
e = (struct sock_extended_err*)CMSG_DATA(cmsg);
if(!e)
continue;
if(e->ee_origin != SO_EE_ORIGIN_ICMP)
{
continue;
}
icmp_addr = (struct sockaddr*)SO_EE_OFFENDER(e);
icmp_sin = (struct sockaddr_in*)icmp_addr;
if(icmp_sin->sin_port != 0)
{
continue;
}
if(e->ee_type == 3 && e->ee_code == 4)
{
utp_process_icmp_fragmentation(_utp_ctx, vec_buf, len,
(struct sockaddr*)&remote,
sizeof(remote), e->ee_info);
}
else
{
utp_process_icmp_error(_utp_ctx, vec_buf, len,
(struct sockaddr*)&remote, sizeof(remote));
}
}
} while(true);
#endif
}
#endif
void
LinkLayer::Pump()
{
#ifdef __linux__
ProcessICMP();
#endif
utp_issue_deferred_acks(_utp_ctx);
std::set< RouterID > sessions;
{
Lock l(&m_AuthedLinksMutex);
auto itr = m_AuthedLinks.begin();
while(itr != m_AuthedLinks.end())
{
sessions.insert(itr->first);
++itr;
}
}
ILinkLayer::Pump();
{
Lock l(&m_AuthedLinksMutex);
for(const auto& pk : sessions)
{
if(m_AuthedLinks.count(pk) == 0)
{
// all sessions were removed
SessionClosed(pk);
}
}
}
}
void
LinkLayer::Stop()
{
ForEachSession([](ILinkSession* s) { s->Close(); });
}
bool
LinkLayer::KeyGen(SecretKey& k)
{
CryptoManager::instance()->encryption_keygen(k);
return true;
}
void
LinkLayer::Tick(llarp_time_t now)
{
utp_check_timeouts(_utp_ctx);
ILinkLayer::Tick(now);
}
utp_socket*
LinkLayer::NewSocket()
{
return utp_create_socket(_utp_ctx);
}
const char*
LinkLayer::Name() const
{
return "utp";
}
std::shared_ptr< ILinkSession >
LinkLayer::NewOutboundSession(const RouterContact& rc,
const AddressInfo& addr)
{
return std::make_shared< OutboundSession >(this, NewSocket(), rc, addr);
}
uint64
LinkLayer::OnRead(utp_callback_arguments* arg)
{
auto* self = static_cast< Session* >(utp_get_userdata(arg->socket));
if(self)
{
if(self->state == Session::eClose)
{
return 0;
}
if(!self->Recv(arg->buf, arg->len))
{
LogDebug("recv fail for ", self->remoteAddr);
self->Close();
return 0;
}
utp_read_drained(arg->socket);
}
else
{
LogWarn("utp_socket got data with no underlying session");
utp_close(arg->socket);
}
return 0;
}
uint64
LinkLayer::OnStateChange(utp_callback_arguments* arg)
{
auto* session = static_cast< Session* >(utp_get_userdata(arg->socket));
if(session)
{
if(arg->state == UTP_STATE_WRITABLE)
{
session->Pump();
}
else if(arg->state == UTP_STATE_EOF)
{
LogDebug("got eof from ", session->remoteAddr);
session->Close();
}
}
return 0;
}
uint64
LinkLayer::OnAccept(utp_callback_arguments* arg)
{
auto* self =
static_cast< LinkLayer* >(utp_context_get_userdata(arg->context));
Addr remote(*arg->address);
std::shared_ptr< ILinkSession > session =
std::make_shared< InboundSession >(self, arg->socket, remote);
if(!self->PutSession(session))
{
LogWarn("dropping inbound utp session from ", remote);
// close later
self->m_Logic->call_later(50, [=]() { session->Close(); });
}
else
{
LogDebug("utp accepted from ", remote);
session->OnLinkEstablished(self);
}
return 0;
}
} // namespace utp
} // namespace llarp

@ -1,105 +0,0 @@
#ifndef LLARP_UTP_LINKLAYER_HPP
#define LLARP_UTP_LINKLAYER_HPP
#include <utp/inbound_message.hpp>
#include <crypto/crypto.hpp>
#include <crypto/types.hpp>
#include <link/server.hpp>
#include <utp.h>
#include <deque>
namespace llarp
{
namespace utp
{
struct LinkLayer final : public ILinkLayer
{
utp_context* _utp_ctx = nullptr;
// low level read callback
static uint64
OnRead(utp_callback_arguments* arg);
// low level sendto callback
static uint64
SendTo(utp_callback_arguments* arg);
/// error callback
static uint64
OnError(utp_callback_arguments* arg);
/// state change callback
static uint64
OnStateChange(utp_callback_arguments*);
static uint64
OnConnect(utp_callback_arguments*);
/// accept callback
static uint64
OnAccept(utp_callback_arguments*);
/// logger callback
static uint64
OnLog(utp_callback_arguments* arg);
/// construct
LinkLayer(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SignBufferFunc sign,
SessionEstablishedHandler established,
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
SessionClosedHandler closed, bool acceptInbound);
/// destruct
~LinkLayer() override;
/// get AI rank
uint16_t
Rank() const override;
/// handle low level recv
void
RecvFrom(const Addr& from, const void* buf, size_t sz) override;
#ifdef __linux__
/// process ICMP stuff on linux
void
ProcessICMP();
#endif
/// pump sessions
void
Pump() override;
/// stop link layer
void
Stop() override;
/// regenerate transport keypair
bool
KeyGen(SecretKey& k) override;
/// do tick
void
Tick(llarp_time_t now);
/// create new outbound session
std::shared_ptr< ILinkSession >
NewOutboundSession(const RouterContact& rc,
const AddressInfo& addr) override;
/// create new socket
utp_socket*
NewSocket();
/// get ai name
const char*
Name() const override;
};
} // namespace utp
} // namespace llarp
#endif

@ -1,810 +0,0 @@
#include <utp/session.hpp>
#include <utp/linklayer.hpp>
#include <messages/discard.hpp>
#include <messages/link_intro.hpp>
#include <util/metrics/metrics.hpp>
#include <util/meta/memfn.hpp>
namespace llarp
{
namespace utp
{
void
Session::OnLinkEstablished(ILinkLayer* p)
{
parent = p;
EnterState(eLinkEstablished);
LogDebug("link established with ", remoteAddr);
}
/// pump tx queue
void
Session::PumpWrite(size_t numSend)
{
if(!sock)
return;
ssize_t expect = 0;
std::vector< utp_iovec > send;
for(const auto& msg : sendq)
{
for(const auto& vec : msg.vecs)
{
if(vec.iov_len > 0)
{
expect += vec.iov_len;
send.emplace_back(vec);
numSend--;
}
}
}
if(expect)
{
ssize_t s = utp_writev(sock, send.data(), send.size());
if(s <= 0)
return;
lastSend = parent->Now();
metrics::integerTick("utp.session.tx", "writes", s, "id",
RouterID(remoteRC.pubkey).ToString());
m_TXRate += s;
size_t sz = s;
do
{
auto& msg = sendq.front();
while(msg.vecs.size() > 0 && sz >= msg.vecs.front().iov_len)
{
sz -= msg.vecs.front().iov_len;
msg.vecs.pop_front();
}
if(msg.vecs.size() == 0)
{
msg.Delivered();
sendq.pop_front();
}
else if(sz)
{
auto& front = msg.vecs.front();
front.iov_len -= sz;
front.iov_base = ((byte_t*)front.iov_base) + sz;
return;
}
else
return;
} while(sendq.size());
}
}
/// prune expired inbound messages
void
Session::PruneInboundMessages(llarp_time_t now)
{
auto itr = m_RecvMsgs.begin();
while(itr != m_RecvMsgs.end())
{
if(itr->second.IsExpired(now))
{
itr = m_RecvMsgs.erase(itr);
}
else
++itr;
}
}
void
Session::OutboundLinkEstablished(LinkLayer* p)
{
OnLinkEstablished(p);
metrics::integerTick("utp.session.open", "to", 1, "id",
RouterID(remoteRC.pubkey).ToString());
OutboundHandshake();
}
template < bool (Crypto::*dh_func)(SharedSecret&, const PubKey&,
const SecretKey&, const TunnelNonce&) >
bool
Session::DoKeyExchange(SharedSecret& K, const KeyExchangeNonce& n,
const PubKey& other, const SecretKey& secret)
{
ShortHash t_h;
static constexpr size_t TMP_SIZE = 64;
static_assert(SharedSecret::SIZE + KeyExchangeNonce::SIZE == TMP_SIZE,
"Invalid sizes");
AlignedBuffer< TMP_SIZE > tmp;
std::copy(K.begin(), K.end(), tmp.begin());
std::copy(n.begin(), n.end(), tmp.begin() + K.size());
// t_h = HS(K + L.n)
if(!CryptoManager::instance()->shorthash(t_h, llarp_buffer_t(tmp)))
{
LogError("failed to mix key to ", remoteAddr);
return false;
}
// K = TKE(a.p, B_a.e, sk, t_h)
if(!(CryptoManager::instance()->*dh_func)(K, other, secret, t_h))
{
LogError("key exchange with ", other, " failed");
return false;
}
LogDebug("keys mixed with session to ", remoteAddr);
return true;
}
bool
Session::MutateKey(SharedSecret& K, const AlignedBuffer< 24 >& A)
{
AlignedBuffer< 56 > tmp;
llarp_buffer_t buf{tmp};
std::copy(K.begin(), K.end(), buf.cur);
buf.cur += K.size();
std::copy(A.begin(), A.end(), buf.cur);
buf.cur = buf.base;
return CryptoManager::instance()->shorthash(K, buf);
}
void
Session::Tick(llarp_time_t now)
{
PruneInboundMessages(now);
// ensure that this section is called every 1s or so
if(now > m_LastTick && now - m_LastTick >= 1000)
{
m_TXRate = 0;
m_RXRate = 0;
metrics::integerTick("utp.session.sendq", "size", sendq.size(), "id",
RouterID(remoteRC.pubkey).ToString());
m_LastTick = now;
}
else
{
// try sending 1 segment
PumpWrite(1);
}
}
/// low level read
bool
Session::Recv(const byte_t* buf, size_t sz)
{
// mark we are alive
Alive();
m_RXRate += sz;
size_t s = sz;
metrics::integerTick("utp.session.rx", "size", s, "id",
RouterID(remoteRC.pubkey).ToString());
// process leftovers
if(recvBufOffset)
{
auto left = FragmentBufferSize - recvBufOffset;
if(s >= left)
{
// yes it fills it
LogDebug("process leftovers, offset=", recvBufOffset, " sz=", s,
" left=", left);
std::copy(buf, buf + left, recvBuf.begin() + recvBufOffset);
s -= left;
recvBufOffset = 0;
buf += left;
if(!VerifyThenDecrypt(recvBuf.data()))
return false;
}
}
// process full fragments
while(s >= FragmentBufferSize)
{
recvBufOffset = 0;
LogDebug("process full sz=", s);
if(!VerifyThenDecrypt(buf))
return false;
buf += FragmentBufferSize;
s -= FragmentBufferSize;
}
if(s)
{
// hold onto leftovers
LogDebug("leftovers sz=", s);
std::copy(buf, buf + s, recvBuf.begin() + recvBufOffset);
recvBufOffset += s;
}
return true;
}
bool
Session::TimedOut(llarp_time_t now) const
{
if(state == eClose)
return true;
if(state == eConnecting)
return now - lastActive > 5000;
if(state == eSessionReady)
{
const bool remoteIsSNode = remoteRC.IsPublicRouter();
const bool weAreSnode = parent->GetOurRC().IsPublicRouter();
const bool recvTimeout =
(now > lastActive) && now - lastActive > sessionTimeout;
const bool sendTimeout =
(now > lastSend) && now - lastSend > sessionTimeout;
if(remoteIsSNode && weAreSnode)
{
/// for s2s connections only check write direction
return sendTimeout;
}
else if(weAreSnode)
{
// for edge connection as service node check either direction for
// timeout
return recvTimeout || sendTimeout;
}
else
{
/// for edge connections as client we check if both directions have
/// been silent for 60s
return recvTimeout && sendTimeout;
}
}
if(state == eLinkEstablished)
return now - lastActive
> 10000; /// 10 second timeout for the whole handshake
return true;
}
PubKey
Session::GetPubKey() const
{
return remoteRC.pubkey;
}
Addr
Session::GetRemoteEndpoint() const
{
return remoteAddr;
}
/// base constructor
Session::Session(LinkLayer* p)
{
state = eInitial;
m_NextTXMsgID = 0;
m_NextRXMsgID = 0;
parent = p;
remoteTransportPubKey.Zero();
gotLIM = false;
recvBufOffset = 0;
lastActive = parent->Now();
}
bool
Session::ShouldPing() const
{
if(state != eSessionReady)
return false;
const auto dlt = parent->Now() - lastSend;
return dlt >= 10000;
}
ILinkLayer*
Session::GetLinkLayer() const
{
return parent;
}
void
Session::Pump()
{
if(sendq.size() > (MaxSendQueueSize / 4))
PumpWrite(sendq.size() / 2);
else
PumpWrite(sendq.size());
// prune inbound messages
PruneInboundMessages(parent->Now());
}
bool
Session::SendMessageBuffer(
const llarp_buffer_t& buf,
ILinkSession::CompletionHandler completionHandler)
{
if(SendQueueBacklog() >= MaxSendQueueSize)
{
// pump write queue if we seem to be full
PumpWrite(MaxSendQueueSize / 2);
}
if(SendQueueBacklog() >= MaxSendQueueSize)
{
// we didn't pump anything wtf
// this means we're stalled
return false;
}
size_t sz = buf.sz;
byte_t* ptr = buf.base;
uint32_t msgid = m_NextTXMsgID++;
sendq.emplace_back(msgid, completionHandler);
while(sz)
{
uint32_t s = std::min(FragmentBodyPayloadSize, sz);
if(!EncryptThenHash(ptr, msgid, s, sz - s))
{
LogError("EncryptThenHash failed?!");
Close();
return false;
}
LogDebug("encrypted ", s, " bytes");
ptr += s;
sz -= s;
}
if(state != eSessionReady)
PumpWrite(sendq.size());
return true;
}
bool
Session::SendKeepAlive()
{
if(ShouldPing())
{
DiscardMessage msg;
std::array< byte_t, 128 > tmp;
llarp_buffer_t buf(tmp);
if(!msg.BEncode(&buf))
return false;
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
return this->SendMessageBuffer(buf, nullptr);
}
return true;
}
void
Session::OutboundHandshake()
{
std::array< byte_t, LinkIntroMessage::MaxSize > tmp;
llarp_buffer_t buf(tmp);
// build our RC
LinkIntroMessage msg;
msg.rc = parent->GetOurRC();
if(!msg.rc.Verify(parent->Now()))
{
LogError("our RC is invalid? closing session to", remoteAddr);
Close();
return;
}
msg.N.Randomize();
msg.P = DefaultLinkSessionLifetime;
if(!msg.Sign(parent->Sign))
{
LogError("failed to sign LIM for outbound handshake to ", remoteAddr);
Close();
return;
}
// encode
if(!msg.BEncode(&buf))
{
LogError("failed to encode LIM for handshake to ", remoteAddr);
Close();
return;
}
// rewind
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
if(!SendMessageBuffer(buf, nullptr))
{
LogError("failed to send handshake to ", remoteAddr);
Close();
return;
}
if(!DoClientKeyExchange(txKey, msg.N, remoteTransportPubKey,
parent->RouterEncryptionSecret()))
{
LogError("failed to mix keys for outbound session to ", remoteAddr);
Close();
return;
}
}
Session::~Session()
{
if(sock)
{
utp_set_userdata(sock, nullptr);
sock = nullptr;
}
}
bool
Session::EncryptThenHash(const byte_t* ptr, uint32_t msgid, uint16_t length,
uint16_t remaining)
{
auto& msg = sendq.back();
msg.vecs.emplace_back();
auto& vec = msg.vecs.back();
msg.fragments.emplace_back();
auto& buf = msg.fragments.back();
vec.iov_base = buf.data();
vec.iov_len = FragmentBufferSize;
buf.Randomize();
byte_t* noncePtr = buf.data() + FragmentHashSize;
byte_t* body = noncePtr + FragmentNonceSize;
byte_t* base = body;
AlignedBuffer< 24 > A(base);
// skip inner nonce
body += A.size();
// put msgid
htobe32buf(body, msgid);
body += sizeof(uint32_t);
// put length
htobe16buf(body, length);
body += sizeof(uint16_t);
// put remaining
htobe16buf(body, remaining);
body += sizeof(uint16_t);
// put body
memcpy(body, ptr, length);
llarp_buffer_t payload(base, base,
FragmentBufferSize - FragmentOverheadSize);
TunnelNonce nonce(noncePtr);
// encrypt
if(!CryptoManager::instance()->xchacha20(payload, txKey, nonce))
return false;
payload.base = noncePtr;
payload.cur = payload.base;
payload.sz = FragmentBufferSize - FragmentHashSize;
// key'd hash
if(!CryptoManager::instance()->hmac(buf.data(), payload, txKey))
return false;
return MutateKey(txKey, A);
}
void
Session::EnterState(State st)
{
state = st;
if(st != eClose)
{
Alive();
}
if(st == eSessionReady)
{
parent->MapAddr(remoteRC.pubkey.as_array(), this);
if(!parent->SessionEstablished(this))
Close();
}
}
util::StatusObject
Session::ExtractStatus() const
{
return {{"client", !remoteRC.IsPublicRouter()},
{"sendBacklog", uint64_t(SendQueueBacklog())},
{"tx", m_TXRate},
{"rx", m_RXRate},
{"remoteAddr", remoteAddr.ToString()},
{"pubkey", remoteRC.pubkey.ToHex()}};
}
bool
Session::GotSessionRenegotiate(const LinkIntroMessage* msg)
{
// check with parent and possibly process and store new rc
if(!parent->SessionRenegotiate(msg->rc, remoteRC))
{
// failed to renegotiate
Close();
return false;
}
// set remote rc
remoteRC = msg->rc;
// recalculate rx key
return DoServerKeyExchange(rxKey, msg->N, remoteRC.enckey,
parent->RouterEncryptionSecret());
}
bool
Session::RenegotiateSession()
{
LinkIntroMessage lim;
lim.rc = parent->GetOurRC();
lim.N.Randomize();
lim.P = 60 * 1000 * 10;
if(!lim.Sign(parent->Sign))
return false;
std::array< byte_t, LinkIntroMessage::MaxSize > tmp;
llarp_buffer_t buf(tmp);
if(!lim.BEncode(&buf))
return false;
// rewind and resize buffer
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send message
if(!SendMessageBuffer(buf, nullptr))
return false;
// regen our tx Key
return DoClientKeyExchange(txKey, lim.N, remoteRC.enckey,
parent->RouterEncryptionSecret());
}
bool
Session::VerifyThenDecrypt(const byte_t* ptr)
{
LogDebug("verify then decrypt ", remoteAddr);
ShortHash digest;
llarp_buffer_t hbuf(ptr + FragmentHashSize,
FragmentBufferSize - FragmentHashSize);
if(!CryptoManager::instance()->hmac(digest.data(), hbuf, rxKey))
{
LogError("keyed hash failed");
return false;
}
const ShortHash expected(ptr);
if(expected != digest)
{
LogError("Message Integrity Failed: got ", digest, " from ", remoteAddr,
" instead of ", expected);
Close();
return false;
}
llarp_buffer_t in(ptr + FragmentOverheadSize,
FragmentBufferSize - FragmentOverheadSize);
llarp_buffer_t out(rxFragBody);
// decrypt
if(!CryptoManager::instance()->xchacha20_alt(out, in, rxKey,
ptr + FragmentHashSize))
{
LogError("failed to decrypt message from ", remoteAddr);
return false;
}
// get inner nonce
AlignedBuffer< 24 > A(out.base);
// advance buffer
out.cur += A.size();
// read msgid
uint32_t msgid;
if(!out.read_uint32(msgid))
{
LogError("failed to read msgid");
return false;
}
// read length and remaining
uint16_t length, remaining;
if(!(out.read_uint16(length) && out.read_uint16(remaining)))
{
LogError("failed to read the rest of the header");
return false;
}
if(length > (out.sz - (out.cur - out.base)))
{
// too big length
LogError("fragment body too big");
return false;
}
if(msgid < m_NextRXMsgID)
return false;
m_NextRXMsgID = msgid;
// get message
if(m_RecvMsgs.find(msgid) == m_RecvMsgs.end())
{
m_RecvMsgs.emplace(msgid, InboundMessage());
}
auto itr = m_RecvMsgs.find(msgid);
// add message activity
itr->second.lastActive = parent->Now();
// append data
if(!itr->second.AppendData(out.cur, length))
{
LogError("inbound buffer is full");
m_RecvMsgs.erase(itr);
return false; // not enough room
}
// mutate key
if(!MutateKey(rxKey, A))
{
LogError("failed to mutate rx key");
return false;
}
if(remaining == 0)
{
ManagedBuffer buf{itr->second.buffer};
// resize
buf.underlying.sz = buf.underlying.cur - buf.underlying.base;
// rewind
buf.underlying.cur = buf.underlying.base;
// process buffer
LogDebug("got message ", msgid, " from ", remoteAddr);
parent->HandleMessage(this, buf.underlying);
m_RecvMsgs.erase(itr);
}
return true;
}
void
Session::Close()
{
if(state != eClose)
{
if(sock)
{
if(state == eLinkEstablished || state == eSessionReady)
{
// only call shutdown when we are actually connected
utp_shutdown(sock, SHUT_RDWR);
}
utp_close(sock);
utp_set_userdata(sock, nullptr);
sock = nullptr;
LogDebug("utp_close ", remoteAddr);
if(remoteRC.IsPublicRouter())
{
metrics::integerTick("utp.session.close", "to", 1, "id",
RouterID(remoteRC.pubkey).ToString());
}
// discard sendq
// TODO: retry on another session ?
while(sendq.size())
{
sendq.front().Dropped();
sendq.pop_front();
}
}
}
EnterState(eClose);
}
void
Session::Alive()
{
lastActive = parent->Now();
}
InboundSession::InboundSession(LinkLayer* p, utp_socket* s,
const Addr& addr)
: Session(p)
{
sock = s;
remoteAddr = addr;
RouterID rid = p->GetOurRC().pubkey;
CryptoManager::instance()->shorthash(rxKey, llarp_buffer_t(rid));
remoteRC.Clear();
ABSL_ATTRIBUTE_UNUSED void* res = utp_set_userdata(sock, this);
assert(res == this);
assert(s == sock);
GotLIM = util::memFn(&InboundSession::InboundLIM, this);
}
bool
InboundSession::InboundLIM(const LinkIntroMessage* msg)
{
if(gotLIM && remoteRC.pubkey != msg->rc.pubkey)
{
Close();
return false;
}
if(!gotLIM)
{
remoteRC = msg->rc;
CryptoManager::instance()->shorthash(txKey,
llarp_buffer_t(remoteRC.pubkey));
if(!DoServerKeyExchange(rxKey, msg->N, remoteRC.enckey,
parent->TransportSecretKey()))
return false;
std::array< byte_t, LinkIntroMessage::MaxSize > tmp;
llarp_buffer_t buf(tmp);
LinkIntroMessage replymsg;
replymsg.rc = parent->GetOurRC();
if(!replymsg.rc.Verify(parent->Now()))
{
LogError("our RC is invalid? closing session to", remoteAddr);
Close();
return false;
}
replymsg.N.Randomize();
replymsg.P = DefaultLinkSessionLifetime;
if(!replymsg.Sign(parent->Sign))
{
LogError("failed to sign LIM for inbound handshake from ",
remoteAddr);
Close();
return false;
}
// encode
if(!replymsg.BEncode(&buf))
{
LogError("failed to encode LIM for handshake from ", remoteAddr);
Close();
return false;
}
// rewind
buf.sz = buf.cur - buf.base;
buf.cur = buf.base;
// send
if(!SendMessageBuffer(buf, nullptr))
{
LogError("failed to repl to handshake from ", remoteAddr);
Close();
return false;
}
if(!DoClientKeyExchange(txKey, replymsg.N, remoteRC.enckey,
parent->RouterEncryptionSecret()))
return false;
LogDebug("Sent reply LIM");
gotLIM = true;
EnterState(eSessionReady);
/// future LIM are used for session renegotiation
GotLIM = util::memFn(&Session::GotSessionRenegotiate, this);
}
return true;
}
OutboundSession::OutboundSession(LinkLayer* p, utp_socket* s,
const RouterContact& rc,
const AddressInfo& addr)
: Session(p)
{
remoteTransportPubKey = addr.pubkey;
remoteRC = rc;
sock = s;
remoteAddr = addr;
RouterID rid = remoteRC.pubkey;
CryptoManager::instance()->shorthash(txKey, llarp_buffer_t(rid));
rid = p->GetOurRC().pubkey;
CryptoManager::instance()->shorthash(rxKey, llarp_buffer_t(rid));
ABSL_ATTRIBUTE_UNUSED void* res = utp_set_userdata(sock, this);
assert(res == this);
assert(s == sock);
GotLIM = util::memFn(&OutboundSession::OutboundLIM, this);
}
void
OutboundSession::Start()
{
utp_connect(sock, remoteAddr, remoteAddr.SockLen());
EnterState(eConnecting);
}
bool
OutboundSession::OutboundLIM(const LinkIntroMessage* msg)
{
if(gotLIM && remoteRC.pubkey != msg->rc.pubkey)
{
return false;
}
remoteRC = msg->rc;
gotLIM = true;
if(!DoServerKeyExchange(rxKey, msg->N, remoteRC.enckey,
parent->RouterEncryptionSecret()))
{
Close();
return false;
}
/// future LIM are used for session renegotiation
GotLIM = util::memFn(&Session::GotSessionRenegotiate, this);
EnterState(eSessionReady);
return true;
}
} // namespace utp
} // namespace llarp

@ -1,301 +0,0 @@
#ifndef LLARP_UTP_SESSION_HPP
#define LLARP_UTP_SESSION_HPP
#include <crypto/crypto.hpp>
#include <link/session.hpp>
#include <utility>
#include <utp/inbound_message.hpp>
#include <deque>
#include <utp.h>
namespace llarp
{
namespace utp
{
struct LinkLayer;
struct Session : public ILinkSession,
public std::enable_shared_from_this< Session >
{
/// remote router's rc
RouterContact remoteRC;
/// underlying socket
utp_socket* sock;
/// link layer parent
ILinkLayer* parent;
/// did we get a LIM from the remote yet?
bool gotLIM;
/// remote router's transport pubkey
PubKey remoteTransportPubKey;
/// remote router's transport ip
Addr remoteAddr;
/// rx session key
SharedSecret rxKey;
/// tx session key
SharedSecret txKey;
/// timestamp last active
llarp_time_t lastActive = 0;
/// timestamp last send success
llarp_time_t lastSend = 0;
/// session timeout (60s)
const static llarp_time_t sessionTimeout = DefaultLinkSessionLifetime;
struct OutboundMessage
{
OutboundMessage(uint32_t id, CompletionHandler func)
: msgid{id}, completed{std::move(func)}
{
}
const uint32_t msgid;
std::deque< utp_iovec > vecs;
std::deque< FragmentBuffer > fragments;
CompletionHandler completed;
void
Dropped()
{
if(completed)
{
completed(DeliveryStatus::eDeliveryDropped);
completed = nullptr;
}
}
void
Delivered()
{
if(completed)
{
completed(DeliveryStatus::eDeliverySuccess);
completed = nullptr;
}
}
bool
operator<(const OutboundMessage& other) const
{
return msgid < other.msgid;
}
};
/// current rx fragment buffer
FragmentBuffer recvBuf;
/// current offset in current rx fragment buffer
size_t recvBufOffset;
/// rx fragment message body
AlignedBuffer< FragmentBodySize > rxFragBody;
/// the next message id for tx
uint32_t m_NextTXMsgID;
/// the next message id for rx
uint32_t m_NextRXMsgID;
using SendQueue_t = std::deque< OutboundMessage >;
/// messages we are currently sending
SendQueue_t sendq;
/// messages we are recving right now
std::unordered_map< uint32_t, InboundMessage > m_RecvMsgs;
/// are we stalled or nah?
bool stalled = false;
uint64_t m_RXRate = 0;
uint64_t m_TXRate = 0;
llarp_time_t m_LastTick = 0;
/// mark session as alive
void
Alive();
util::StatusObject
ExtractStatus() const override;
~Session() override = 0;
/// base
explicit Session(LinkLayer* p);
enum State
{
eInitial, // initial state
eConnecting, // we are connecting
eLinkEstablished, // when utp connection is established
eCryptoHandshake, // crypto handshake initiated
eSessionReady, // session is ready
eClose // utp connection is closed
};
/// session state, call EnterState(State) to set
State state;
/// hook for utp for when we have established a connection
void
OnLinkEstablished(ILinkLayer* p) override;
/// switch states
void
EnterState(State st);
/// handle LIM after handshake
bool
GotSessionRenegotiate(const LinkIntroMessage* msg);
/// re negotiate session with our new local RC
bool
RenegotiateSession() override;
bool
ShouldPing() const override;
/// pump tx queue
void
PumpWrite(size_t numMessages);
void
Pump() override;
std::shared_ptr< ILinkSession >
BorrowSelf() override
{
return shared_from_this();
}
bool
SendKeepAlive() override;
bool
IsEstablished() const override
{
return state == eSessionReady;
}
bool
TimedOut(llarp_time_t now) const override;
/// verify a fragment buffer and the decrypt it
/// buf is assumed to be FragmentBufferSize bytes long
bool
VerifyThenDecrypt(const byte_t* buf);
/// encrypt a fragment then hash the ciphertext
bool
EncryptThenHash(const byte_t* ptr, uint32_t msgid, uint16_t sz,
uint16_t remain);
/// queue a fully formed message
bool
SendMessageBuffer(const llarp_buffer_t& buf,
ILinkSession::CompletionHandler) override;
/// prune expired inbound messages
void
PruneInboundMessages(llarp_time_t now);
/// do low level connect
void
Connect();
/// handle outbound connection made
void
OutboundLinkEstablished(LinkLayer* p);
// send first message
void
OutboundHandshake();
// do key exchange for handshake
template < bool (Crypto::*dh_func)(SharedSecret&, const PubKey&,
const SecretKey&, const TunnelNonce&) >
bool
DoKeyExchange(SharedSecret& K, const KeyExchangeNonce& n,
const PubKey& other, const SecretKey& secret);
bool
DoClientKeyExchange(SharedSecret& K, const KeyExchangeNonce& n,
const PubKey& other, const SecretKey& secret)
{
return DoKeyExchange< &Crypto::transport_dh_client >(K, n, other,
secret);
}
bool
DoServerKeyExchange(SharedSecret& K, const KeyExchangeNonce& n,
const PubKey& other, const SecretKey& secret)
{
return DoKeyExchange< &Crypto::transport_dh_server >(K, n, other,
secret);
}
/// does K = HS(K + A)
bool
MutateKey(SharedSecret& K, const AlignedBuffer< 24 >& A);
void
Tick(llarp_time_t now) override;
/// close session
void
Close() override;
/// low level read
bool
Recv(const byte_t* buf, size_t sz);
/// get remote identity pubkey
PubKey
GetPubKey() const override;
/// get remote address
Addr
GetRemoteEndpoint() const override;
RouterContact
GetRemoteRC() const override
{
return remoteRC;
}
/// get parent link
ILinkLayer*
GetLinkLayer() const override;
void
MarkEstablished();
size_t
SendQueueBacklog() const override
{
return sendq.size();
}
};
struct InboundSession final : public Session
{
InboundSession(LinkLayer* p, utp_socket* s, const Addr& addr);
bool
InboundLIM(const LinkIntroMessage* msg);
void
Start() override
{
}
};
struct OutboundSession final : public Session
{
OutboundSession(LinkLayer* p, utp_socket* s, const RouterContact& rc,
const AddressInfo& addr);
bool
OutboundLIM(const LinkIntroMessage* msg);
void
Start() override;
};
} // namespace utp
} // namespace llarp
#endif

@ -1,35 +0,0 @@
#include <utp/utp.hpp>
#include <router/abstractrouter.hpp>
#include <util/meta/memfn.hpp>
#include <utp/linklayer.hpp>
namespace llarp
{
namespace utp
{
LinkLayer_ptr
NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SignBufferFunc sign,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
SessionClosedHandler closed)
{
return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est,
reneg, timeout, closed, false);
}
LinkLayer_ptr
NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SignBufferFunc sign,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
SessionClosedHandler closed)
{
return std::make_shared< LinkLayer >(routerEncSecret, getrc, h, sign, est,
reneg, timeout, closed, true);
}
} // namespace utp
} // namespace llarp

@ -1,28 +0,0 @@
#ifndef LLARP_UTP_UTP_HPP
#define LLARP_UTP_UTP_HPP
#include <memory>
#include <link/server.hpp>
namespace llarp
{
namespace utp
{
LinkLayer_ptr
NewInboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SignBufferFunc sign,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
SessionClosedHandler closed);
LinkLayer_ptr
NewOutboundLink(const SecretKey& routerEncSecret, GetRCFunc getrc,
LinkMessageHandler h, SignBufferFunc sign,
SessionEstablishedHandler est,
SessionRenegotiateHandler reneg, TimeoutHandler timeout,
SessionClosedHandler closed);
/// shim
const auto NewServer = NewInboundLink;
} // namespace utp
} // namespace llarp
#endif

@ -5,7 +5,7 @@
#include <iwp/iwp.hpp>
#include <messages/link_intro.hpp>
#include <messages/discard.hpp>
#include <utp/utp.hpp>
#include <test_util.hpp>
@ -261,188 +261,3 @@ TEST_F(LinkLayerTest, TestIWP)
ASSERT_TRUE(Bob.gotLIM);
#endif
};
TEST_F(LinkLayerTest, TestUTPAliceRenegWithBob)
{
#ifdef WIN32
GTEST_SKIP();
#else
Alice.link = utp::NewServer(
Alice.encryptionKey,
[&]() -> const RouterContact& { return Alice.GetRC(); },
[&](ILinkSession* s, const llarp_buffer_t& buf) -> bool {
if(Alice.gotLIM)
{
Alice.Regen();
return s->RenegotiateSession();
}
else
{
LinkIntroMessage msg;
ManagedBuffer copy{buf};
if(!msg.BDecode(&copy.underlying))
return false;
if(!s->GotLIM(&msg))
return false;
Alice.gotLIM = true;
return true;
}
},
[&](Signature& sig, const llarp_buffer_t& buf) -> bool {
return m_crypto.sign(sig, Alice.signingKey, buf);
},
[&](ILinkSession* s) -> bool {
const auto rc = s->GetRemoteRC();
return rc.pubkey == Bob.GetRC().pubkey;
},
[&](RouterContact, RouterContact) -> bool { return true; },
[&](ILinkSession* session) {
ASSERT_FALSE(session->IsEstablished());
Stop();
},
[&](RouterID router) { ASSERT_EQ(router, Bob.GetRouterID()); });
auto sendDiscardMessage = [](ILinkSession* s) -> bool {
// send discard message in reply to complete unit test
std::array< byte_t, 32 > tmp;
llarp_buffer_t otherBuf(tmp);
DiscardMessage discard;
if(!discard.BEncode(&otherBuf))
return false;
otherBuf.sz = otherBuf.cur - otherBuf.base;
otherBuf.cur = otherBuf.base;
return s->SendMessageBuffer(otherBuf, nullptr);
};
Bob.link = utp::NewServer(
Bob.encryptionKey, [&]() -> const RouterContact& { return Bob.GetRC(); },
[&](ILinkSession* s, const llarp_buffer_t& buf) -> bool {
LinkIntroMessage msg;
ManagedBuffer copy{buf};
if(!msg.BDecode(&copy.underlying))
return false;
if(!s->GotLIM(&msg))
return false;
Bob.gotLIM = true;
return sendDiscardMessage(s);
},
[&](Signature& sig, const llarp_buffer_t& buf) -> bool {
return m_crypto.sign(sig, Bob.signingKey, buf);
},
[&](ILinkSession* s) -> bool {
if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey)
return false;
LogInfo("bob established with alice");
return Bob.link->VisitSessionByPubkey(Alice.GetRC().pubkey.as_array(),
sendDiscardMessage);
},
[&](RouterContact newrc, RouterContact oldrc) -> bool {
success = newrc.pubkey == oldrc.pubkey;
return true;
},
[&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); },
[&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); });
ASSERT_TRUE(Alice.Start(m_logic, netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(m_logic, netLoop, BobPort));
ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC()));
RunMainloop();
ASSERT_TRUE(Bob.gotLIM);
ASSERT_TRUE(success);
#endif
}
TEST_F(LinkLayerTest, TestUTPAliceConnectToBob)
{
#ifdef WIN32
GTEST_SKIP();
#else
Alice.link = utp::NewServer(
Alice.encryptionKey,
[&]() -> const RouterContact& { return Alice.GetRC(); },
[&](ILinkSession* s, const llarp_buffer_t& buf) -> bool {
LinkIntroMessage lim;
llarp_buffer_t copy(buf.base, buf.sz);
if(lim.BDecode(&copy))
{
if(s->GotLIM(&lim))
{
Alice.gotLIM = true;
return true;
}
return false;
}
return AliceGotMessage(buf);
},
[&](Signature& sig, const llarp_buffer_t& buf) -> bool {
return m_crypto.sign(sig, Alice.signingKey, buf);
},
[&](ILinkSession* s) -> bool {
if(s->GetRemoteRC().pubkey != Bob.GetRC().pubkey)
return false;
LogInfo("alice established with bob");
return true;
},
[&](RouterContact, RouterContact) -> bool { return true; },
[&](ILinkSession* session) {
ASSERT_FALSE(session->IsEstablished());
Stop();
},
[&](RouterID router) { ASSERT_EQ(router, Bob.GetRouterID()); });
Bob.link = utp::NewServer(
Bob.encryptionKey, [&]() -> const RouterContact& { return Bob.GetRC(); },
[&](ILinkSession* s, const llarp_buffer_t& buf) -> bool {
LinkIntroMessage lim;
llarp_buffer_t copy(buf.base, buf.sz);
if(lim.BDecode(&copy))
{
if(s->GotLIM(&lim))
{
Bob.gotLIM = true;
return true;
}
return false;
}
return true;
},
[&](Signature& sig, const llarp_buffer_t& buf) -> bool {
return m_crypto.sign(sig, Bob.signingKey, buf);
},
[&](ILinkSession* s) -> bool {
if(s->GetRemoteRC().pubkey != Alice.GetRC().pubkey)
return false;
LogInfo("bob established with alice");
m_logic->queue_job({s, [](void* u) {
ILinkSession* self =
static_cast< ILinkSession* >(u);
std::array< byte_t, 32 > tmp;
llarp_buffer_t otherBuf(tmp);
DiscardMessage discard;
if(!discard.BEncode(&otherBuf))
return;
otherBuf.sz = otherBuf.cur - otherBuf.base;
otherBuf.cur = otherBuf.base;
self->SendMessageBuffer(otherBuf, nullptr);
}});
return true;
},
[&](RouterContact, RouterContact) -> bool { return true; },
[&](ILinkSession* session) { ASSERT_FALSE(session->IsEstablished()); },
[&](RouterID router) { ASSERT_EQ(router, Alice.GetRouterID()); });
ASSERT_TRUE(Alice.Start(m_logic, netLoop, AlicePort));
ASSERT_TRUE(Bob.Start(m_logic, netLoop, BobPort));
ASSERT_TRUE(Alice.link->TryEstablishTo(Bob.GetRC()));
RunMainloop();
ASSERT_TRUE(Bob.gotLIM);
ASSERT_TRUE(success);
#endif
}

Loading…
Cancel
Save