diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index eede0af1a80a119a34ed5f72bfb0f74a4e90770a..3288e8bb499de3124983d810d7b6b6242e620b35 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -354,98 +354,100 @@ class FeatureTest(object): """ This is base test. A method run() implementation is required. """ - def run(self): - raise NotImplementedError('This method must be implemented.') - -class PersistenceTest(FeatureTest): - """ - This tests persistence of data on the network. - """ - #static variables used by class callbacks - bootstrap = None + successfullTransfer = lambda lv,fv: len(lv) == len(fv) done = 0 lock = None foreign_nodes = None foreign_values = None - successfullTransfer = lambda lv,fv: len(lv) == len(fv) - - def __init__(self, test, workbench, *opts): - """ - @param test: is one of the following: - - 'mult_time': test persistence of data based on internal - OpenDHT storage maintenance timings. - - 'delete': test persistence of data upon deletion of - nodes. - - 'replace': replacing cluster successively. - @type test: string - - OPTIONS - - - dump_str_log: enables storage log at test ending. - """ + def __init__(self, test, workbench): self._test = test - self.wb = workbench - PersistenceTest.bootstrap = self.wb.get_bootstrap() - - # opts - self._dump_storage = True if 'dump_str_log' in opts else False - self._plot = True if 'plot' in opts else False + self.bootstrap = self.wb.get_bootstrap() @staticmethod def getcb(value): DhtNetwork.log('[GET]: %s' % value) - PersistenceTest.foreign_values.append(value) + FeatureTest.foreign_values.append(value) return True @staticmethod def putDoneCb(ok, nodes): if not ok: DhtNetwork.log("[PUT]: failed!") - with PersistenceTest.lock: - PersistenceTest.done -= 1 - PersistenceTest.lock.notify() + with FeatureTest.lock: + FeatureTest.done -= 1 + FeatureTest.lock.notify() @staticmethod def getDoneCb(ok, nodes): - with PersistenceTest.lock: + with FeatureTest.lock: if not ok: DhtNetwork.log("[GET]: failed!") else: for node in nodes: if not node.getNode().isExpired(): - PersistenceTest.foreign_nodes.append(node.getId().toString()) - PersistenceTest.done -= 1 - PersistenceTest.lock.notify() + FeatureTest.foreign_nodes.append(node.getId().toString()) + FeatureTest.done -= 1 + FeatureTest.lock.notify() def _dhtPut(self, producer, _hash, *values): for val in values: - with PersistenceTest.lock: + with FeatureTest.lock: DhtNetwork.log('[PUT]: %s' % val) - PersistenceTest.done += 1 - producer.put(_hash, val, PersistenceTest.putDoneCb) - while PersistenceTest.done > 0: - PersistenceTest.lock.wait() + FeatureTest.done += 1 + producer.put(_hash, val, FeatureTest.putDoneCb) + while FeatureTest.done > 0: + FeatureTest.lock.wait() def _dhtGet(self, consumer, _hash): - PersistenceTest.foreign_values = [] - PersistenceTest.foreign_nodes = [] - with PersistenceTest.lock: - PersistenceTest.done += 1 - consumer.get(_hash, PersistenceTest.getcb, PersistenceTest.getDoneCb) - while PersistenceTest.done > 0: - PersistenceTest.lock.wait() + FeatureTest.foreign_values = [] + FeatureTest.foreign_nodes = [] + with FeatureTest.lock: + FeatureTest.done += 1 + consumer.get(_hash, FeatureTest.getcb, FeatureTest.getDoneCb) + while FeatureTest.done > 0: + FeatureTest.lock.wait() + + + def run(self): + raise NotImplementedError('This method must be implemented.') + +class PersistenceTest(FeatureTest): + """ + This tests persistence of data on the network. + """ + + def __init__(self, test, workbench, *opts): + """ + @param test: is one of the following: + - 'mult_time': test persistence of data based on internal + OpenDHT storage maintenance timings. + - 'delete': test persistence of data upon deletion of + nodes. + - 'replace': replacing cluster successively. + @type test: string + + + OPTIONS + + - dump_str_log: enables storage log at test ending. + """ + + # opts + super(PersistenceTest, self).__init__(test, workbench) + self._dump_storage = True if 'dump_str_log' in opts else False + self._plot = True if 'plot' in opts else False def _result(self, local_values, new_nodes): - bootstrap = PersistenceTest.bootstrap - if not PersistenceTest.successfullTransfer(local_values, PersistenceTest.foreign_values): + bootstrap = self.bootstrap + if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): DhtNetwork.log('[GET]: Only %s on %s values persisted.' % - (len(PersistenceTest.foreign_values), len(local_values))) + (len(FeatureTest.foreign_values), len(local_values))) else: DhtNetwork.log('[GET]: All values successfully persisted.') - if PersistenceTest.foreign_values: + if FeatureTest.foreign_values: if new_nodes: DhtNetwork.log('Values are newly found on:') for node in new_nodes: @@ -455,7 +457,7 @@ class PersistenceTest(FeatureTest): 'hosting nodes.') for proc in self.wb.procs: - proc.sendDumpStorage(PersistenceTest.foreign_nodes) + proc.sendDumpStorage(FeatureTest.foreign_nodes) else: DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.") @@ -476,12 +478,12 @@ class PersistenceTest(FeatureTest): It uses Dht shutdown call from the API to gracefuly finish the nodes one after the other. """ - PersistenceTest.done = 0 - PersistenceTest.lock = threading.Condition() - PersistenceTest.foreign_nodes = [] - PersistenceTest.foreign_values = [] + FeatureTest.done = 0 + FeatureTest.lock = threading.Condition() + FeatureTest.foreign_nodes = [] + FeatureTest.foreign_values = [] - bootstrap = PersistenceTest.bootstrap + bootstrap = self.bootstrap ops_count = [] @@ -497,17 +499,17 @@ class PersistenceTest(FeatureTest): #checking if values were transfered self._dhtGet(consumer, myhash) - if not PersistenceTest.successfullTransfer(local_values, PersistenceTest.foreign_values): - if PersistenceTest.foreign_values: - DhtNetwork.log('[GET]: Only ', len(PersistenceTest.foreign_values) ,' on ', + if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): + if FeatureTest.foreign_values: + DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ', len(local_values), ' values successfully put.') else: DhtNetwork.log('[GET]: 0 values successfully put') - if PersistenceTest.foreign_values and PersistenceTest.foreign_nodes: + if FeatureTest.foreign_values and FeatureTest.foreign_nodes: DhtNetwork.log('Values are found on :') - for node in PersistenceTest.foreign_nodes: + for node in FeatureTest.foreign_nodes: DhtNetwork.log(node) DhtNetwork.log("Waiting a minute for the network to settle down.") @@ -520,7 +522,7 @@ class PersistenceTest(FeatureTest): DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) proc.sendNodesRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, - PersistenceTest.foreign_nodes + FeatureTest.foreign_nodes ) DhtNetwork.log('sending message stats request') stats = proc.sendGetMessageStats() @@ -530,10 +532,10 @@ class PersistenceTest(FeatureTest): ops_count.append(cluster_ops_count/self.wb.node_num) # checking if values were transfered to new nodes - foreign_nodes_before_delete = PersistenceTest.foreign_nodes + foreign_nodes_before_delete = FeatureTest.foreign_nodes DhtNetwork.log('[GET]: trying to fetch persistent values') self._dhtGet(consumer, myhash) - new_nodes = set(PersistenceTest.foreign_nodes) - set(foreign_nodes_before_delete) + new_nodes = set(FeatureTest.foreign_nodes) - set(foreign_nodes_before_delete) self._result(local_values, new_nodes) @@ -554,14 +556,14 @@ class PersistenceTest(FeatureTest): """ It replaces all clusters one after the other. """ - PersistenceTest.done = 0 - PersistenceTest.lock = threading.Condition() - PersistenceTest.foreign_nodes = [] - PersistenceTest.foreign_values = [] + FeatureTest.done = 0 + FeatureTest.lock = threading.Condition() + FeatureTest.foreign_nodes = [] + FeatureTest.foreign_values = [] clusters = opts['clusters'] if 'clusters' in opts else 5 - bootstrap = PersistenceTest.bootstrap + bootstrap = self.bootstrap try: bootstrap.resize(3) @@ -573,7 +575,7 @@ class PersistenceTest(FeatureTest): self._dhtPut(producer, myhash, *local_values) self._dhtGet(consumer, myhash) - initial_nodes = PersistenceTest.foreign_nodes + initial_nodes = FeatureTest.foreign_nodes DhtNetwork.log('Replacing', clusters, 'random clusters successively...') for n in range(clusters): @@ -586,7 +588,7 @@ class PersistenceTest(FeatureTest): DhtNetwork.log('[GET]: trying to fetch persistent values') self._dhtGet(consumer, myhash) - new_nodes = set(PersistenceTest.foreign_nodes) - set(initial_nodes) + new_nodes = set(FeatureTest.foreign_nodes) - set(initial_nodes) self._result(local_values, new_nodes) @@ -602,11 +604,11 @@ class PersistenceTest(FeatureTest): enable storage maintenance each nodes. Therefor, this tests will wait 10 minutes for the nodes to trigger storage maintenance. """ - PersistenceTest.done = 0 - PersistenceTest.lock = threading.Condition() - PersistenceTest.foreign_nodes = [] - PersistenceTest.foreign_values = [] - bootstrap = PersistenceTest.bootstrap + FeatureTest.done = 0 + FeatureTest.lock = threading.Condition() + FeatureTest.foreign_nodes = [] + FeatureTest.foreign_values = [] + bootstrap = self.bootstrap N_PRODUCERS = 16 @@ -622,7 +624,7 @@ class PersistenceTest(FeatureTest): for h in hashes: self._dhtGet(consumer, h) if nodes is not None: - for n in PersistenceTest.foreign_nodes: + for n in FeatureTest.foreign_nodes: nodes.add(n) def createNodesAroundHash(_hash, radius=4): @@ -636,8 +638,8 @@ class PersistenceTest(FeatureTest): config.setNodeId(InfoHash(_hash_str.encode())) n = DhtRunner() n.run(config=config) - n.bootstrap(PersistenceTest.bootstrap.ip4, - str(PersistenceTest.bootstrap.port)) + n.bootstrap(self.bootstrap.ip4, + str(self.bootstrap.port)) flood_nodes.append(n) try: @@ -687,13 +689,16 @@ class PerformanceTest(FeatureTest): Tests for general performance of dht operations. """ - bootstrap = None - def __init__(self, test, workbench, *opts): - self._test = test - - self.wb = workbench - PerformanceTest.bootstrap = wb.get_bootstrap() + """ + @param test: is one of the following: + - 'gets': multiple get operations and statistical results. + - 'delete_put': perform multiple put() operations followed + by targeted deletion of nodes hosting the values. Doing + so until half of the nodes on the network remain. + @type test: string + """ + super(PerformanceTest, self).__init__(test, workbench) def run(self): if self._test == 'gets': @@ -703,7 +708,7 @@ class PerformanceTest(FeatureTest): """ Tests for performance of the DHT doing multiple get() operation. """ - bootstrap = PerformanceTest.bootstrap + bootstrap = self.bootstrap plt.ion()