mirror of
https://github.com/ComradCollective/Comrad
synced 2024-11-09 13:10:44 +00:00
122 lines
4.4 KiB
Python
122 lines
4.4 KiB
Python
|
from random import shuffle
|
||
|
from kademlia.routing import KBucket, TableTraverser
|
||
|
|
||
|
|
||
|
class TestKBucket:
|
||
|
def test_split(self, mknode): # pylint: disable=no-self-use
|
||
|
bucket = KBucket(0, 10, 5)
|
||
|
bucket.add_node(mknode(intid=5))
|
||
|
bucket.add_node(mknode(intid=6))
|
||
|
one, two = bucket.split()
|
||
|
assert len(one) == 1
|
||
|
assert one.range == (0, 5)
|
||
|
assert len(two) == 1
|
||
|
assert two.range == (6, 10)
|
||
|
|
||
|
def test_split_no_overlap(self): # pylint: disable=no-self-use
|
||
|
left, right = KBucket(0, 2 ** 160, 20).split()
|
||
|
assert (right.range[0] - left.range[1]) == 1
|
||
|
|
||
|
def test_add_node(self, mknode): # pylint: disable=no-self-use
|
||
|
# when full, return false
|
||
|
bucket = KBucket(0, 10, 2)
|
||
|
assert bucket.add_node(mknode()) is True
|
||
|
assert bucket.add_node(mknode()) is True
|
||
|
assert bucket.add_node(mknode()) is False
|
||
|
assert len(bucket) == 2
|
||
|
|
||
|
# make sure when a node is double added it's put at the end
|
||
|
bucket = KBucket(0, 10, 3)
|
||
|
nodes = [mknode(), mknode(), mknode()]
|
||
|
for node in nodes:
|
||
|
bucket.add_node(node)
|
||
|
for index, node in enumerate(bucket.get_nodes()):
|
||
|
assert node == nodes[index]
|
||
|
|
||
|
def test_remove_node(self, mknode): # pylint: disable=no-self-use
|
||
|
k = 3
|
||
|
bucket = KBucket(0, 10, k)
|
||
|
nodes = [mknode() for _ in range(10)]
|
||
|
for node in nodes:
|
||
|
bucket.add_node(node)
|
||
|
|
||
|
replacement_nodes = bucket.replacement_nodes
|
||
|
assert list(bucket.nodes.values()) == nodes[:k]
|
||
|
assert list(replacement_nodes.values()) == nodes[k:]
|
||
|
|
||
|
bucket.remove_node(nodes.pop())
|
||
|
assert list(bucket.nodes.values()) == nodes[:k]
|
||
|
assert list(replacement_nodes.values()) == nodes[k:]
|
||
|
|
||
|
bucket.remove_node(nodes.pop(0))
|
||
|
assert list(bucket.nodes.values()) == nodes[:k-1] + nodes[-1:]
|
||
|
assert list(replacement_nodes.values()) == nodes[k-1:-1]
|
||
|
|
||
|
shuffle(nodes)
|
||
|
for node in nodes:
|
||
|
bucket.remove_node(node)
|
||
|
assert not bucket
|
||
|
assert not replacement_nodes
|
||
|
|
||
|
def test_in_range(self, mknode): # pylint: disable=no-self-use
|
||
|
bucket = KBucket(0, 10, 10)
|
||
|
assert bucket.has_in_range(mknode(intid=5)) is True
|
||
|
assert bucket.has_in_range(mknode(intid=11)) is False
|
||
|
assert bucket.has_in_range(mknode(intid=10)) is True
|
||
|
assert bucket.has_in_range(mknode(intid=0)) is True
|
||
|
|
||
|
def test_replacement_factor(self, mknode): # pylint: disable=no-self-use
|
||
|
k = 3
|
||
|
factor = 2
|
||
|
bucket = KBucket(0, 10, k, replacementNodeFactor=factor)
|
||
|
nodes = [mknode() for _ in range(10)]
|
||
|
for node in nodes:
|
||
|
bucket.add_node(node)
|
||
|
|
||
|
replacement_nodes = bucket.replacement_nodes
|
||
|
assert len(list(replacement_nodes.values())) == k * factor
|
||
|
assert list(replacement_nodes.values()) == nodes[k + 1:]
|
||
|
assert nodes[k] not in list(replacement_nodes.values())
|
||
|
|
||
|
|
||
|
# pylint: disable=too-few-public-methods
|
||
|
class TestRoutingTable:
|
||
|
# pylint: disable=no-self-use
|
||
|
def test_add_contact(self, fake_server, mknode):
|
||
|
fake_server.router.add_contact(mknode())
|
||
|
assert len(fake_server.router.buckets) == 1
|
||
|
assert len(fake_server.router.buckets[0].nodes) == 1
|
||
|
|
||
|
|
||
|
# pylint: disable=too-few-public-methods
|
||
|
class TestTableTraverser:
|
||
|
# pylint: disable=no-self-use
|
||
|
def test_iteration(self, fake_server, mknode):
|
||
|
"""
|
||
|
Make 10 nodes, 5 buckets, two nodes add to one bucket in order,
|
||
|
All buckets: [node0, node1], [node2, node3], [node4, node5],
|
||
|
[node6, node7], [node8, node9]
|
||
|
Test traver result starting from node4.
|
||
|
"""
|
||
|
|
||
|
nodes = [mknode(intid=x) for x in range(10)]
|
||
|
|
||
|
buckets = []
|
||
|
for i in range(5):
|
||
|
bucket = KBucket(2 * i, 2 * i + 1, 2)
|
||
|
bucket.add_node(nodes[2 * i])
|
||
|
bucket.add_node(nodes[2 * i + 1])
|
||
|
buckets.append(bucket)
|
||
|
|
||
|
# replace router's bucket with our test buckets
|
||
|
fake_server.router.buckets = buckets
|
||
|
|
||
|
# expected nodes order
|
||
|
expected_nodes = [nodes[5], nodes[4], nodes[3], nodes[2], nodes[7],
|
||
|
nodes[6], nodes[1], nodes[0], nodes[9], nodes[8]]
|
||
|
|
||
|
start_node = nodes[4]
|
||
|
table_traverser = TableTraverser(fake_server.router, start_node)
|
||
|
for index, node in enumerate(table_traverser):
|
||
|
assert node == expected_nodes[index]
|