Skip to content
Snippets Groups Projects
Commit 5a5f7012 authored by Simon Désaulniers's avatar Simon Désaulniers Committed by Adrien Béraud
Browse files

add pht insert test

parent 5a239cdd
No related branches found
No related tags found
No related merge requests found
...@@ -45,3 +45,13 @@ Makefile.in ...@@ -45,3 +45,13 @@ Makefile.in
# KDevelop # KDevelop
.kdev4/ .kdev4/
*.kdev4 *.kdev4
# Python
*.pyc
# git backup files
*.orig
# vim swap files
*.swp
*.swo
...@@ -132,12 +132,12 @@ struct IndexEntry : public dht::Value::Serializable<IndexEntry> { ...@@ -132,12 +132,12 @@ struct IndexEntry : public dht::Value::Serializable<IndexEntry> {
class Pht { class Pht {
static constexpr const char* INDEX_PREFIX = "index.pht."; static constexpr const char* INDEX_PREFIX = "index.pht.";
public:
/* This is the maximum number of entries per node. This parameter is /* This is the maximum number of entries per node. This parameter is
* critical and influences the traffic alot during a lookup operation. * critical and influences the traffic alot during a lookup operation.
*/ */
static constexpr const size_t MAX_NODE_ENTRY_COUNT {16}; static constexpr const size_t MAX_NODE_ENTRY_COUNT {16};
public:
using Key = std::map<std::string, Prefix>; using Key = std::map<std::string, Prefix>;
using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>; using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>;
......
...@@ -393,9 +393,11 @@ cdef class DhtRunner(_WithID): ...@@ -393,9 +393,11 @@ cdef class DhtRunner(_WithID):
cdef class IndexValue(object): cdef class IndexValue(object):
cdef cpp.shared_ptr[cpp.IndexValue] _value cdef cpp.shared_ptr[cpp.IndexValue] _value
def __init__(self, InfoHash h, cpp.uint64_t vid=0): def __init__(self, InfoHash h=None, cpp.uint64_t vid=0):
cdef cpp.InfoHash hh = h._infohash cdef cpp.InfoHash hh = h._infohash
self._value.reset(new cpp.IndexValue(hh, vid)) self._value.reset(new cpp.IndexValue(hh, vid))
def __str__(self):
return "(" + self.getKey().toString().decode() +", "+ str(self.getValueId()) +")"
def getKey(self): def getKey(self):
h = InfoHash() h = InfoHash()
h._infohash = self._value.get().first h._infohash = self._value.get().first
...@@ -407,6 +409,9 @@ cdef class Pht(object): ...@@ -407,6 +409,9 @@ cdef class Pht(object):
cdef cpp.Pht* thisptr cdef cpp.Pht* thisptr
def __cinit__(self, bytes name, DhtRunner dht): def __cinit__(self, bytes name, DhtRunner dht):
self.thisptr = new cpp.Pht(name, dht.thisptr) self.thisptr = new cpp.Pht(name, dht.thisptr)
property MAX_NODE_ENTRY_COUNT:
def __get__(self):
return cpp.PHT_MAX_NODE_ENTRY_COUNT
def lookup(self, key, lookup_cb=None, done_cb=None): def lookup(self, key, lookup_cb=None, done_cb=None):
"""Query the Index with a specified key. """Query the Index with a specified key.
......
...@@ -159,6 +159,7 @@ cdef extern from "opendht/log.h" namespace "dht::log": ...@@ -159,6 +159,7 @@ cdef extern from "opendht/log.h" namespace "dht::log":
void enableFileLogging(DhtRunner& dht, const string& path) void enableFileLogging(DhtRunner& dht, const string& path)
cdef extern from "opendht/indexation/pht.h" namespace "dht::indexation": cdef extern from "opendht/indexation/pht.h" namespace "dht::indexation":
size_t PHT_MAX_NODE_ENTRY_COUNT "dht::indexation::Pht::MAX_NODE_ENTRY_COUNT"
cdef cppclass Prefix: cdef cppclass Prefix:
Prefix() except + Prefix() except +
Prefix(vector[uint8_t]) except + Prefix(vector[uint8_t]) except +
......
...@@ -27,7 +27,7 @@ import random ...@@ -27,7 +27,7 @@ import random
from dht.network import DhtNetwork from dht.network import DhtNetwork
from dht.network import DhtNetworkSubProcess from dht.network import DhtNetworkSubProcess
from dht.tests import PerformanceTest, PersistenceTest from dht.tests import PerformanceTest, PersistenceTest, PhtTest
from dht import virtual_network_builder from dht import virtual_network_builder
from dht import network as dhtnetwork from dht import network as dhtnetwork
...@@ -168,6 +168,10 @@ if __name__ == '__main__': ...@@ -168,6 +168,10 @@ if __name__ == '__main__':
featureArgs = parser.add_mutually_exclusive_group(required=True) featureArgs = parser.add_mutually_exclusive_group(required=True)
featureArgs.add_argument('--performance', action='store_true', default=False, featureArgs.add_argument('--performance', action='store_true', default=False,
help='Launches performance benchmark test. Available args for "-t" are: gets.') help='Launches performance benchmark test. Available args for "-t" are: gets.')
featureArgs.add_argument('--pht', action='store_true', default=False,
help='Launches PHT benchmark test. '\
'Available args for "-t" are: insert. '\
'Use "-m" option for fixing number of keys to create during the test.')
featureArgs.add_argument('--data-persistence', action='store_true', default=0, featureArgs.add_argument('--data-persistence', action='store_true', default=0,
help='Launches data persistence benchmark test. '\ help='Launches data persistence benchmark test. '\
'Available args for "-t" are: delete, replace, mult_time. '\ 'Available args for "-t" are: delete, replace, mult_time. '\
...@@ -217,6 +221,10 @@ if __name__ == '__main__': ...@@ -217,6 +221,10 @@ if __name__ == '__main__':
PerformanceTest(args.test, wb, test_opt).run() PerformanceTest(args.test, wb, test_opt).run()
elif args.data_persistence: elif args.data_persistence:
PersistenceTest(args.test, wb, test_opt).run() PersistenceTest(args.test, wb, test_opt).run()
elif args.pht:
if args.m:
test_opt.update({ 'num_keys' : args.m })
PhtTest(args.test, wb, test_opt).run()
except Exception as e: except Exception as e:
print(e) print(e)
......
...@@ -13,9 +13,12 @@ import subprocess ...@@ -13,9 +13,12 @@ import subprocess
import re import re
import traceback import traceback
from matplotlib.ticker import FuncFormatter
import math
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter import networkx as nx
from opendht import * from opendht import *
from dht.network import DhtNetwork, DhtNetworkSubProcess from dht.network import DhtNetwork, DhtNetworkSubProcess
...@@ -156,13 +159,13 @@ def iftop_traffic_data(ifname, interval=2, rate_type='send_receive'): ...@@ -156,13 +159,13 @@ def iftop_traffic_data(ifname, interval=2, rate_type='send_receive'):
########### ###########
class FeatureTest(object): class FeatureTest(object):
done = 0
lock = None
""" """
This is a base test. This is a base test.
""" """
done = 0
lock = None
def __init__(self, test, workbench): def __init__(self, test, workbench):
""" """
@param test: The test string indicating the test to run. This string is @param test: The test string indicating the test to run. This string is
...@@ -174,6 +177,7 @@ class FeatureTest(object): ...@@ -174,6 +177,7 @@ class FeatureTest(object):
""" """
self._test = test self._test = test
self._workbench = workbench self._workbench = workbench
self._bootstrap = self._workbench.get_bootstrap()
def _reset(self): def _reset(self):
""" """
...@@ -188,6 +192,138 @@ class FeatureTest(object): ...@@ -188,6 +192,138 @@ class FeatureTest(object):
raise NotImplementedError('This method must be implemented.') raise NotImplementedError('This method must be implemented.')
##################################
# PHT #
##################################
class PhtTest(FeatureTest):
"""TODO
"""
indexEntries = None
prefix = None
key = None
def __init__(self, test, workbench, opts):
"""
@param test: is one of the following:
- 'insert': indexes a considerable amount of data in
the PHT structure.
TODO
@type test: string
@param opts: Dictionnary containing options for the test. Allowed
options are:
- 'num_keys': this specifies the number of keys to insert
in the PHT during the test.
@type opts: dict
"""
super(PhtTest, self).__init__(test, workbench)
self._num_keys = opts['num_keys'] if 'num_keys' in opts else 32
def _reset(self):
super(PhtTest, self)._reset()
PhtTest.indexEntries = []
@staticmethod
def lookupCb(vals, prefix):
PhtTest.indexEntries = list(vals)
PhtTest.prefix = prefix.decode()
DhtNetwork.log('Index name: <todo>')
DhtNetwork.log('Leaf prefix:', prefix)
for v in vals:
DhtNetwork.log('[ENTRY]:', v)
@staticmethod
def lookupDoneCb(ok):
DhtNetwork.log('[LOOKUP]:', PhtTest.key, "--", "success!" if ok else "Fail...")
with FeatureTest.lock:
FeatureTest.lock.notify()
@staticmethod
def insertDoneCb(ok):
DhtNetwork.log('[INSERT]:', PhtTest.key, "--", "success!" if ok else "Fail...")
with FeatureTest.lock:
FeatureTest.lock.notify()
@staticmethod
def drawTrie(trie_dict):
"""
Draws the trie structure of the PHT from dictionnary.
@param trie_dict: Dictionnary of index entries (prefix -> entry).
@type trie_dict: dict
"""
prefixes = list(trie_dict.keys())
if len(prefixes) == 0:
return
edges = set([])
for prefix in prefixes:
for i in range(-1, len(prefix)-1):
u = prefix[:i+1]
edges.add( ("" if i == -1 else u, u+"0") )
edges.add( ("" if i == -1 else u, u+"1") )
# TODO: use a binary tree position layout...
G = nx.Graph(list(edges))
nx.draw(G, with_labels=True, node_color='white')
plt.show()
def run(self):
try:
if self._test == 'insert':
self._massIndexTest()
except Exception as e:
print(e)
finally:
self._bootstrap.resize(1)
###########
# Tests #
###########
@reset_before_test
def _insertTest(self):
"""TODO: Docstring for _massIndexTest.
"""
bootstrap = self._bootstrap
bootstrap.resize(2)
dht = bootstrap.get(1)
pht = Pht(b'foo_index', dht)
DhtNetwork.log('PHT has',
pht.MAX_NODE_ENTRY_COUNT,
'node'+ ('s' if pht.MAX_NODE_ENTRY_COUNT > 1 else ''),
'per leaf bucket.')
NUM_DIG = max(math.log(self._num_keys, 2)/4, 5) # at least 5 digit keys.
keys = [{
'foo' :
''.join(random.SystemRandom().choice(string.hexdigits)
for _ in range(NUM_DIG)).encode()
} for n in range(self._num_keys)]
all_entries = {}
# Index all entries.
for key in keys:
PhtTest.key = key
pht.insert(key, IndexValue(random_hash()), PhtTest.insertDoneCb)
with FeatureTest.lock:
FeatureTest.lock.wait()
# Recover entries now that the trie is complete.
for key in keys:
PhtTest.key = key
pht.lookup(key, PhtTest.lookupCb, PhtTest.lookupDoneCb)
with FeatureTest.lock:
FeatureTest.lock.wait()
all_entries[PhtTest.prefix] = [e.__str__()
for e in PhtTest.indexEntries]
PhtTest.drawTrie(all_entries)
################################## ##################################
# DHT # # DHT #
...@@ -195,7 +331,7 @@ class FeatureTest(object): ...@@ -195,7 +331,7 @@ class FeatureTest(object):
class DhtFeatureTest(FeatureTest): class DhtFeatureTest(FeatureTest):
""" """
This is base test. A method run() implementation is required. This is a base dht test.
""" """
#static variables used by class callbacks #static variables used by class callbacks
successfullTransfer = lambda lv,fv: len(lv) == len(fv) successfullTransfer = lambda lv,fv: len(lv) == len(fv)
...@@ -204,7 +340,6 @@ class DhtFeatureTest(FeatureTest): ...@@ -204,7 +340,6 @@ class DhtFeatureTest(FeatureTest):
def __init__(self, test, workbench): def __init__(self, test, workbench):
super(DhtFeatureTest, self).__init__(test, workbench) super(DhtFeatureTest, self).__init__(test, workbench)
self.bootstrap = self._workbench.get_bootstrap()
def _reset(self): def _reset(self):
super(DhtFeatureTest, self)._reset() super(DhtFeatureTest, self)._reset()
...@@ -323,9 +458,9 @@ class PersistenceTest(DhtFeatureTest): ...@@ -323,9 +458,9 @@ class PersistenceTest(DhtFeatureTest):
config.setNodeId(InfoHash(_hash_str.encode())) config.setNodeId(InfoHash(_hash_str.encode()))
n = DhtRunner() n = DhtRunner()
n.run(config=config) n.run(config=config)
n.bootstrap(self.bootstrap.ip4, n.bootstrap(self._bootstrap.ip4,
str(self.bootstrap.port)) str(self._bootstrap.port))
DhtNetwork.Log.log('Node','['+_hash_str+']', DhtNetwork.log('Node','['+_hash_str+']',
'started around', _hash.toString().decode() 'started around', _hash.toString().decode()
if n.isRunning() else if n.isRunning() else
'failed to start...' 'failed to start...'
...@@ -333,7 +468,7 @@ class PersistenceTest(DhtFeatureTest): ...@@ -333,7 +468,7 @@ class PersistenceTest(DhtFeatureTest):
trigger_nodes.append(n) trigger_nodes.append(n)
def _result(self, local_values, new_nodes): def _result(self, local_values, new_nodes):
bootstrap = self.bootstrap bootstrap = self._bootstrap
if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
DhtNetwork.Log.log('[GET]: Only %s on %s values persisted.' % DhtNetwork.Log.log('[GET]: Only %s on %s values persisted.' %
(len(DhtFeatureTest.foreignValues), len(local_values))) (len(DhtFeatureTest.foreignValues), len(local_values)))
...@@ -372,7 +507,7 @@ class PersistenceTest(DhtFeatureTest): ...@@ -372,7 +507,7 @@ class PersistenceTest(DhtFeatureTest):
plot_fname = "traffic-plot" plot_fname = "traffic-plot"
print('plot saved to', plot_fname) print('plot saved to', plot_fname)
plt.savefig(plot_fname) plt.savefig(plot_fname)
self.bootstrap.resize(1) self._bootstrap.resize(1)
########### ###########
# Tests # # Tests #
...@@ -385,8 +520,7 @@ class PersistenceTest(DhtFeatureTest): ...@@ -385,8 +520,7 @@ class PersistenceTest(DhtFeatureTest):
""" """
trigger_nodes = [] trigger_nodes = []
wb = self._workbench wb = self._workbench
bootstrap = self.bootstrap bootstrap = self._bootstrap
# Value representing an ICE packet. Each ICE packet is around 1KB. # Value representing an ICE packet. Each ICE packet is around 1KB.
VALUE_SIZE = 1024 VALUE_SIZE = 1024
num_values_per_hash = self._num_values/wb.node_num if self._num_values else 5 num_values_per_hash = self._num_values/wb.node_num if self._num_values else 5
...@@ -481,13 +615,7 @@ class PersistenceTest(DhtFeatureTest): ...@@ -481,13 +615,7 @@ class PersistenceTest(DhtFeatureTest):
It uses Dht shutdown call from the API to gracefuly finish the nodes one It uses Dht shutdown call from the API to gracefuly finish the nodes one
after the other. after the other.
""" """
wb = self._workbench bootstrap = self._bootstrap
if self._traffic_plot:
traffic_plot_thread = threading.Thread(target=display_traffic_plot, args=tuple(['br'+wb.ifname]))
traffic_plot_thread.daemon = True
traffic_plot_thread.start()
bootstrap = self.bootstrap
ops_count = [] ops_count = []
...@@ -573,7 +701,7 @@ class PersistenceTest(DhtFeatureTest): ...@@ -573,7 +701,7 @@ class PersistenceTest(DhtFeatureTest):
""" """
clusters = 8 clusters = 8
bootstrap = self.bootstrap bootstrap = self._bootstrap
bootstrap.resize(3) bootstrap.resize(3)
consumer = bootstrap.get(1) consumer = bootstrap.get(1)
...@@ -610,7 +738,7 @@ class PersistenceTest(DhtFeatureTest): ...@@ -610,7 +738,7 @@ class PersistenceTest(DhtFeatureTest):
minutes for the nodes to trigger storage maintenance. minutes for the nodes to trigger storage maintenance.
""" """
trigger_nodes = [] trigger_nodes = []
bootstrap = self.bootstrap bootstrap = self._bootstrap
N_PRODUCERS = self._num_producers if self._num_values else 16 N_PRODUCERS = self._num_producers if self._num_values else 16
DP_TIMEOUT = 1 DP_TIMEOUT = 1
...@@ -693,7 +821,7 @@ class PerformanceTest(DhtFeatureTest): ...@@ -693,7 +821,7 @@ class PerformanceTest(DhtFeatureTest):
traceback.print_tb(e.__traceback__) traceback.print_tb(e.__traceback__)
print(type(e).__name__+':', e, file=sys.stderr) print(type(e).__name__+':', e, file=sys.stderr)
finally: finally:
self.bootstrap.resize(1) self._bootstrap.resize(1)
########### ###########
...@@ -705,7 +833,7 @@ class PerformanceTest(DhtFeatureTest): ...@@ -705,7 +833,7 @@ class PerformanceTest(DhtFeatureTest):
""" """
Tests for performance of the DHT doing multiple get() operation. Tests for performance of the DHT doing multiple get() operation.
""" """
bootstrap = self.bootstrap bootstrap = self._bootstrap
plt.ion() plt.ion()
...@@ -794,8 +922,7 @@ class PerformanceTest(DhtFeatureTest): ...@@ -794,8 +922,7 @@ class PerformanceTest(DhtFeatureTest):
deleting around the target hash. deleting around the target hash.
""" """
bootstrap = self._bootstrap
bootstrap = self.bootstrap
bootstrap.resize(3) bootstrap.resize(3)
consumer = bootstrap.get(1) consumer = bootstrap.get(1)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment