From 5a5f7012d823d15f9d5ee9fce3422ea3a9fde51c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20D=C3=A9saulniers?= <rostydela@gmail.com> Date: Tue, 19 Jan 2016 02:15:55 -0500 Subject: [PATCH] add pht insert test --- .gitignore | 10 ++ include/opendht/indexation/pht.h | 2 +- python/opendht.pyx | 7 +- python/opendht_cpp.pxd | 1 + python/tools/benchmark.py | 10 +- python/tools/dht/tests.py | 179 ++++++++++++++++++++++++++----- 6 files changed, 180 insertions(+), 29 deletions(-) diff --git a/.gitignore b/.gitignore index 5a9e61f2..9d50e4d5 100644 --- a/.gitignore +++ b/.gitignore @@ -45,3 +45,13 @@ Makefile.in # KDevelop .kdev4/ *.kdev4 + +# Python +*.pyc + +# git backup files +*.orig + +# vim swap files +*.swp +*.swo diff --git a/include/opendht/indexation/pht.h b/include/opendht/indexation/pht.h index 910e97e2..8cfc521f 100644 --- a/include/opendht/indexation/pht.h +++ b/include/opendht/indexation/pht.h @@ -132,12 +132,12 @@ struct IndexEntry : public dht::Value::Serializable<IndexEntry> { class Pht { static constexpr const char* INDEX_PREFIX = "index.pht."; +public: /* This is the maximum number of entries per node. This parameter is * critical and influences the traffic alot during a lookup operation. */ static constexpr const size_t MAX_NODE_ENTRY_COUNT {16}; -public: using Key = std::map<std::string, Prefix>; using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>; diff --git a/python/opendht.pyx b/python/opendht.pyx index 4854de13..8e846efb 100644 --- a/python/opendht.pyx +++ b/python/opendht.pyx @@ -393,9 +393,11 @@ cdef class DhtRunner(_WithID): cdef class IndexValue(object): cdef cpp.shared_ptr[cpp.IndexValue] _value - def __init__(self, InfoHash h, cpp.uint64_t vid=0): + def __init__(self, InfoHash h=None, cpp.uint64_t vid=0): cdef cpp.InfoHash hh = h._infohash self._value.reset(new cpp.IndexValue(hh, vid)) + def __str__(self): + return "(" + self.getKey().toString().decode() +", "+ str(self.getValueId()) +")" def getKey(self): h = InfoHash() h._infohash = self._value.get().first @@ -407,6 +409,9 @@ cdef class Pht(object): cdef cpp.Pht* thisptr def __cinit__(self, bytes name, DhtRunner dht): self.thisptr = new cpp.Pht(name, dht.thisptr) + property MAX_NODE_ENTRY_COUNT: + def __get__(self): + return cpp.PHT_MAX_NODE_ENTRY_COUNT def lookup(self, key, lookup_cb=None, done_cb=None): """Query the Index with a specified key. diff --git a/python/opendht_cpp.pxd b/python/opendht_cpp.pxd index 8f83c175..4b283ee9 100644 --- a/python/opendht_cpp.pxd +++ b/python/opendht_cpp.pxd @@ -159,6 +159,7 @@ cdef extern from "opendht/log.h" namespace "dht::log": void enableFileLogging(DhtRunner& dht, const string& path) cdef extern from "opendht/indexation/pht.h" namespace "dht::indexation": + size_t PHT_MAX_NODE_ENTRY_COUNT "dht::indexation::Pht::MAX_NODE_ENTRY_COUNT" cdef cppclass Prefix: Prefix() except + Prefix(vector[uint8_t]) except + diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index 80fd5676..ad36a40c 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -27,7 +27,7 @@ import random from dht.network import DhtNetwork from dht.network import DhtNetworkSubProcess -from dht.tests import PerformanceTest, PersistenceTest +from dht.tests import PerformanceTest, PersistenceTest, PhtTest from dht import virtual_network_builder from dht import network as dhtnetwork @@ -168,6 +168,10 @@ if __name__ == '__main__': featureArgs = parser.add_mutually_exclusive_group(required=True) featureArgs.add_argument('--performance', action='store_true', default=False, help='Launches performance benchmark test. Available args for "-t" are: gets.') + featureArgs.add_argument('--pht', action='store_true', default=False, + help='Launches PHT benchmark test. '\ + 'Available args for "-t" are: insert. '\ + 'Use "-m" option for fixing number of keys to create during the test.') featureArgs.add_argument('--data-persistence', action='store_true', default=0, help='Launches data persistence benchmark test. '\ 'Available args for "-t" are: delete, replace, mult_time. '\ @@ -217,6 +221,10 @@ if __name__ == '__main__': PerformanceTest(args.test, wb, test_opt).run() elif args.data_persistence: PersistenceTest(args.test, wb, test_opt).run() + elif args.pht: + if args.m: + test_opt.update({ 'num_keys' : args.m }) + PhtTest(args.test, wb, test_opt).run() except Exception as e: print(e) diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index a4a216ab..f8e84fcb 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -13,9 +13,12 @@ import subprocess import re import traceback +from matplotlib.ticker import FuncFormatter +import math + import numpy as np import matplotlib.pyplot as plt -from matplotlib.ticker import FuncFormatter +import networkx as nx from opendht import * from dht.network import DhtNetwork, DhtNetworkSubProcess @@ -156,13 +159,13 @@ def iftop_traffic_data(ifname, interval=2, rate_type='send_receive'): ########### class FeatureTest(object): - done = 0 - lock = None - """ This is a base test. """ + done = 0 + lock = None + def __init__(self, test, workbench): """ @param test: The test string indicating the test to run. This string is @@ -174,6 +177,7 @@ class FeatureTest(object): """ self._test = test self._workbench = workbench + self._bootstrap = self._workbench.get_bootstrap() def _reset(self): """ @@ -188,6 +192,138 @@ class FeatureTest(object): raise NotImplementedError('This method must be implemented.') +################################## +# PHT # +################################## + +class PhtTest(FeatureTest): + """TODO + """ + + indexEntries = None + prefix = None + key = None + + def __init__(self, test, workbench, opts): + """ + @param test: is one of the following: + - 'insert': indexes a considerable amount of data in + the PHT structure. + TODO + @type test: string + + @param opts: Dictionnary containing options for the test. Allowed + options are: + - 'num_keys': this specifies the number of keys to insert + in the PHT during the test. + @type opts: dict + """ + super(PhtTest, self).__init__(test, workbench) + self._num_keys = opts['num_keys'] if 'num_keys' in opts else 32 + + def _reset(self): + super(PhtTest, self)._reset() + PhtTest.indexEntries = [] + + @staticmethod + def lookupCb(vals, prefix): + PhtTest.indexEntries = list(vals) + PhtTest.prefix = prefix.decode() + DhtNetwork.log('Index name: <todo>') + DhtNetwork.log('Leaf prefix:', prefix) + for v in vals: + DhtNetwork.log('[ENTRY]:', v) + + @staticmethod + def lookupDoneCb(ok): + DhtNetwork.log('[LOOKUP]:', PhtTest.key, "--", "success!" if ok else "Fail...") + with FeatureTest.lock: + FeatureTest.lock.notify() + + @staticmethod + def insertDoneCb(ok): + DhtNetwork.log('[INSERT]:', PhtTest.key, "--", "success!" if ok else "Fail...") + with FeatureTest.lock: + FeatureTest.lock.notify() + + @staticmethod + def drawTrie(trie_dict): + """ + Draws the trie structure of the PHT from dictionnary. + + @param trie_dict: Dictionnary of index entries (prefix -> entry). + @type trie_dict: dict + """ + prefixes = list(trie_dict.keys()) + if len(prefixes) == 0: + return + + edges = set([]) + for prefix in prefixes: + for i in range(-1, len(prefix)-1): + u = prefix[:i+1] + edges.add( ("" if i == -1 else u, u+"0") ) + edges.add( ("" if i == -1 else u, u+"1") ) + + # TODO: use a binary tree position layout... + G = nx.Graph(list(edges)) + nx.draw(G, with_labels=True, node_color='white') + plt.show() + + def run(self): + try: + if self._test == 'insert': + self._massIndexTest() + except Exception as e: + print(e) + finally: + self._bootstrap.resize(1) + + ########### + # Tests # + ########### + + @reset_before_test + def _insertTest(self): + """TODO: Docstring for _massIndexTest. + """ + bootstrap = self._bootstrap + bootstrap.resize(2) + + dht = bootstrap.get(1) + pht = Pht(b'foo_index', dht) + + DhtNetwork.log('PHT has', + pht.MAX_NODE_ENTRY_COUNT, + 'node'+ ('s' if pht.MAX_NODE_ENTRY_COUNT > 1 else ''), + 'per leaf bucket.') + NUM_DIG = max(math.log(self._num_keys, 2)/4, 5) # at least 5 digit keys. + keys = [{ + 'foo' : + ''.join(random.SystemRandom().choice(string.hexdigits) + for _ in range(NUM_DIG)).encode() + } for n in range(self._num_keys)] + all_entries = {} + + # Index all entries. + for key in keys: + PhtTest.key = key + pht.insert(key, IndexValue(random_hash()), PhtTest.insertDoneCb) + with FeatureTest.lock: + FeatureTest.lock.wait() + + # Recover entries now that the trie is complete. + for key in keys: + PhtTest.key = key + pht.lookup(key, PhtTest.lookupCb, PhtTest.lookupDoneCb) + with FeatureTest.lock: + FeatureTest.lock.wait() + + all_entries[PhtTest.prefix] = [e.__str__() + for e in PhtTest.indexEntries] + + PhtTest.drawTrie(all_entries) + ################################## # DHT # @@ -195,7 +331,7 @@ class FeatureTest(object): class DhtFeatureTest(FeatureTest): """ - This is base test. A method run() implementation is required. + This is a base dht test. """ #static variables used by class callbacks successfullTransfer = lambda lv,fv: len(lv) == len(fv) @@ -204,7 +340,6 @@ class DhtFeatureTest(FeatureTest): def __init__(self, test, workbench): super(DhtFeatureTest, self).__init__(test, workbench) - self.bootstrap = self._workbench.get_bootstrap() def _reset(self): super(DhtFeatureTest, self)._reset() @@ -323,9 +458,9 @@ class PersistenceTest(DhtFeatureTest): config.setNodeId(InfoHash(_hash_str.encode())) n = DhtRunner() n.run(config=config) - n.bootstrap(self.bootstrap.ip4, - str(self.bootstrap.port)) - DhtNetwork.Log.log('Node','['+_hash_str+']', + n.bootstrap(self._bootstrap.ip4, + str(self._bootstrap.port)) + DhtNetwork.log('Node','['+_hash_str+']', 'started around', _hash.toString().decode() if n.isRunning() else 'failed to start...' @@ -333,7 +468,7 @@ class PersistenceTest(DhtFeatureTest): trigger_nodes.append(n) def _result(self, local_values, new_nodes): - bootstrap = self.bootstrap + bootstrap = self._bootstrap if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): DhtNetwork.Log.log('[GET]: Only %s on %s values persisted.' % (len(DhtFeatureTest.foreignValues), len(local_values))) @@ -372,7 +507,7 @@ class PersistenceTest(DhtFeatureTest): plot_fname = "traffic-plot" print('plot saved to', plot_fname) plt.savefig(plot_fname) - self.bootstrap.resize(1) + self._bootstrap.resize(1) ########### # Tests # @@ -385,8 +520,7 @@ class PersistenceTest(DhtFeatureTest): """ trigger_nodes = [] wb = self._workbench - bootstrap = self.bootstrap - + bootstrap = self._bootstrap # Value representing an ICE packet. Each ICE packet is around 1KB. VALUE_SIZE = 1024 num_values_per_hash = self._num_values/wb.node_num if self._num_values else 5 @@ -481,13 +615,7 @@ class PersistenceTest(DhtFeatureTest): It uses Dht shutdown call from the API to gracefuly finish the nodes one after the other. """ - wb = self._workbench - if self._traffic_plot: - traffic_plot_thread = threading.Thread(target=display_traffic_plot, args=tuple(['br'+wb.ifname])) - traffic_plot_thread.daemon = True - traffic_plot_thread.start() - - bootstrap = self.bootstrap + bootstrap = self._bootstrap ops_count = [] @@ -573,7 +701,7 @@ class PersistenceTest(DhtFeatureTest): """ clusters = 8 - bootstrap = self.bootstrap + bootstrap = self._bootstrap bootstrap.resize(3) consumer = bootstrap.get(1) @@ -610,7 +738,7 @@ class PersistenceTest(DhtFeatureTest): minutes for the nodes to trigger storage maintenance. """ trigger_nodes = [] - bootstrap = self.bootstrap + bootstrap = self._bootstrap N_PRODUCERS = self._num_producers if self._num_values else 16 DP_TIMEOUT = 1 @@ -693,7 +821,7 @@ class PerformanceTest(DhtFeatureTest): traceback.print_tb(e.__traceback__) print(type(e).__name__+':', e, file=sys.stderr) finally: - self.bootstrap.resize(1) + self._bootstrap.resize(1) ########### @@ -705,7 +833,7 @@ class PerformanceTest(DhtFeatureTest): """ Tests for performance of the DHT doing multiple get() operation. """ - bootstrap = self.bootstrap + bootstrap = self._bootstrap plt.ion() @@ -794,8 +922,7 @@ class PerformanceTest(DhtFeatureTest): deleting around the target hash. """ - - bootstrap = self.bootstrap + bootstrap = self._bootstrap bootstrap.resize(3) consumer = bootstrap.get(1) -- GitLab