diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index 21a32956ea36e85bac45487dae9bd6f407fff1b8..accf83111c80ff19e4cf8cdad8b2d07028451306 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -23,9 +23,6 @@ import signal import argparse import time -import numpy as np -import matplotlib.pyplot as plt - from dht.network import DhtNetwork from dht.network import DhtNetworkSubProcess from dht.tests import PerformanceTest, PersistenceTest diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index b94c3794c2a7eaec091a18c67aff70b7c9f0cdc2..46f984b717de7810f237a26fd8e270252428563e 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -7,34 +7,96 @@ import random import string import time +import numpy as np +import matplotlib.pyplot as plt + from opendht import * from dht.network import DhtNetwork, DhtNetworkSubProcess +###################### +# Common functions # +###################### def random_hash(): return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode()) +def reset_before_test(featureTestMethod): + """ + This is a decorator for all test methods needing reset(). + + @param featureTestMethod: The method to be decorated. All decorated methods + must have 'self' object as first arg. + @type featureTestMethod: function + """ + def call(*args, **kwargs): + self = args[0] + if isinstance(self, FeatureTest): + self._reset() + return featureTestMethod(*args, **kwargs) + return call + + class FeatureTest(object): + done = 0 + lock = None + + """ + This is a base test. + """ + + def __init__(self, test, workbench): + """ + @param test: The test string indicating the test to run. This string is + determined in the child classes. + @type test: string + + @param workbench: A WorkBench object to use inside this test. + @type workbench: WorkBench + """ + self._test = test + self._workbench = workbench + + def _reset(self): + """ + Resets some static variables. + + This method is most likely going to be called before each tests. + """ + FeatureTest.done = 0 + FeatureTest.lock = threading.Condition() + + def run(self): + raise NotImplementedError('This method must be implemented.') + + + +################################## +# DHT # +################################## + +class DhtFeatureTest(FeatureTest): """ This is base test. A method run() implementation is required. """ #static variables used by class callbacks successfullTransfer = lambda lv,fv: len(lv) == len(fv) - done = 0 - lock = None - foreign_nodes = None - foreign_values = None + foreignNodes = None + foreignValues = None def __init__(self, test, workbench): - self._test = test - self.wb = workbench - self.bootstrap = self.wb.get_bootstrap() + super(DhtFeatureTest, self).__init__(test, workbench) + self.bootstrap = self._workbench.get_bootstrap() + + def _reset(self): + super(DhtFeatureTest, self)._reset() + DhtFeatureTest.foreignNodes = [] + DhtFeatureTest.foreignValues = [] @staticmethod def getcb(value): DhtNetwork.log('[GET]: %s' % value) - FeatureTest.foreign_values.append(value) + DhtFeatureTest.foreignValues.append(value) return True @staticmethod @@ -53,7 +115,7 @@ class FeatureTest(object): else: for node in nodes: if not node.getNode().isExpired(): - FeatureTest.foreign_nodes.append(node.getId().toString()) + DhtFeatureTest.foreignNodes.append(node.getId().toString()) FeatureTest.done -= 1 FeatureTest.lock.notify() @@ -62,25 +124,21 @@ class FeatureTest(object): with FeatureTest.lock: DhtNetwork.log('[PUT]: %s' % val) FeatureTest.done += 1 - producer.put(_hash, val, FeatureTest.putDoneCb) + producer.put(_hash, val, DhtFeatureTest.putDoneCb) while FeatureTest.done > 0: FeatureTest.lock.wait() def _dhtGet(self, consumer, _hash): - FeatureTest.foreign_values = [] - FeatureTest.foreign_nodes = [] + DhtFeatureTest.foreignValues = [] + DhtFeatureTest.foreignNodes = [] with FeatureTest.lock: FeatureTest.done += 1 - consumer.get(_hash, FeatureTest.getcb, FeatureTest.getDoneCb) + consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb) while FeatureTest.done > 0: FeatureTest.lock.wait() - def run(self): - raise NotImplementedError('This method must be implemented.') - - -class PersistenceTest(FeatureTest): +class PersistenceTest(DhtFeatureTest): """ This tests persistence of data on the network. """ @@ -108,12 +166,12 @@ class PersistenceTest(FeatureTest): def _result(self, local_values, new_nodes): bootstrap = self.bootstrap - if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): + if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): DhtNetwork.log('[GET]: Only %s on %s values persisted.' % - (len(FeatureTest.foreign_values), len(local_values))) + (len(DhtFeatureTest.foreignValues), len(local_values))) else: DhtNetwork.log('[GET]: All values successfully persisted.') - if FeatureTest.foreign_values: + if DhtFeatureTest.foreignValues: if new_nodes: DhtNetwork.log('Values are newly found on:') for node in new_nodes: @@ -122,8 +180,8 @@ class PersistenceTest(FeatureTest): DhtNetwork.log('Dumping all storage log from '\ 'hosting nodes.') - for proc in self.wb.procs: - proc.sendDumpStorage(FeatureTest.foreign_nodes) + for proc in self._workbench.procs: + proc.sendDumpStorage(DhtFeatureTest.foreignNodes) else: DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.") @@ -140,20 +198,16 @@ class PersistenceTest(FeatureTest): finally: self.bootstrap.resize(1) - #----------- - #- Tests - - #----------- + ########### + # Tests # + ########### + @reset_before_test def _deleteTest(self): """ It uses Dht shutdown call from the API to gracefuly finish the nodes one after the other. """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] - bootstrap = self.bootstrap ops_count = [] @@ -169,43 +223,43 @@ class PersistenceTest(FeatureTest): #checking if values were transfered self._dhtGet(consumer, myhash) - if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): - if FeatureTest.foreign_values: - DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ', + if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): + if DhtFeatureTest.foreignValues: + DhtNetwork.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ', len(local_values), ' values successfully put.') else: DhtNetwork.log('[GET]: 0 values successfully put') - if FeatureTest.foreign_values and FeatureTest.foreign_nodes: + if DhtFeatureTest.foreignValues and DhtFeatureTest.foreignNodes: DhtNetwork.log('Values are found on :') - for node in FeatureTest.foreign_nodes: + for node in DhtFeatureTest.foreignNodes: DhtNetwork.log(node) #DhtNetwork.log("Waiting a minute for the network to settle down.") #time.sleep(60) - for _ in range(max(1, int(self.wb.node_num/32))): + for _ in range(max(1, int(self._workbench.node_num/32))): DhtNetwork.log('Removing all nodes hosting target values...') cluster_ops_count = 0 - for proc in self.wb.procs: + for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) proc.sendNodesRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, - FeatureTest.foreign_nodes + DhtFeatureTest.foreignNodes ) DhtNetwork.log('sending message stats request') stats = proc.sendGetMessageStats() cluster_ops_count += sum(stats[1:]) DhtNetwork.log("3 seconds wait...") time.sleep(3) - ops_count.append(cluster_ops_count/self.wb.node_num) + ops_count.append(cluster_ops_count/self._workbench.node_num) # checking if values were transfered to new nodes - foreign_nodes_before_delete = FeatureTest.foreign_nodes + foreignNodes_before_delete = DhtFeatureTest.foreignNodes DhtNetwork.log('[GET]: trying to fetch persistent values') self._dhtGet(consumer, myhash) - new_nodes = set(FeatureTest.foreign_nodes) - set(foreign_nodes_before_delete) + new_nodes = set(DhtFeatureTest.foreignNodes) - set(foreignNodes_before_delete) self._result(local_values, new_nodes) @@ -218,14 +272,11 @@ class PersistenceTest(FeatureTest): DhtNetwork.log("[GET]: either couldn't fetch values or nodes hosting values...") #TODO: complete this test. + @reset_before_test def _replaceClusterTest(self): """ It replaces all clusters one after the other. """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] #clusters = opts['clusters'] if 'clusters' in opts else 5 clusters = 5 @@ -241,24 +292,25 @@ class PersistenceTest(FeatureTest): self._dhtPut(producer, myhash, *local_values) self._dhtGet(consumer, myhash) - initial_nodes = FeatureTest.foreign_nodes + initial_nodes = DhtFeatureTest.foreignNodes DhtNetwork.log('Replacing', clusters, 'random clusters successively...') for n in range(clusters): - i = random.randint(0, len(self.wb.procs)-1) - proc = self.wb.procs[i] + i = random.randint(0, len(self._workbench.procs)-1) + proc = self._workbench.procs[i] DhtNetwork.log('Replacing', proc) proc.sendShutdown() - self.wb.stop_cluster(i) - self.wb.start_cluster(i) + self._workbench.stop_cluster(i) + self._workbench.start_cluster(i) DhtNetwork.log('[GET]: trying to fetch persistent values') self._dhtGet(consumer, myhash) - new_nodes = set(FeatureTest.foreign_nodes) - set(initial_nodes) + new_nodes = set(DhtFeatureTest.foreignNodes) - set(initial_nodes) self._result(local_values, new_nodes) #TODO: complete this test. + @reset_before_test def _multTimeTest(self): """ Multiple put() calls are made from multiple nodes to multiple hashes @@ -266,10 +318,6 @@ class PersistenceTest(FeatureTest): enable storage maintenance each nodes. Therefor, this tests will wait 10 minutes for the nodes to trigger storage maintenance. """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] bootstrap = self.bootstrap N_PRODUCERS = 16 @@ -286,7 +334,7 @@ class PersistenceTest(FeatureTest): for h in hashes: self._dhtGet(consumer, h) if nodes is not None: - for n in FeatureTest.foreign_nodes: + for n in DhtFeatureTest.foreignNodes: nodes.add(n) def createNodesAroundHash(_hash, radius=4): @@ -325,7 +373,7 @@ class PersistenceTest(FeatureTest): time.sleep(10*60) DhtNetwork.log('Deleting old nodes from previous search.') - for proc in self.wb.procs: + for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) proc.sendNodesRequest( DhtNetworkSubProcess.REMOVE_NODE_REQ, @@ -341,7 +389,7 @@ class PersistenceTest(FeatureTest): self._result(values, nodes_after_time - nodes) -class PerformanceTest(FeatureTest): +class PerformanceTest(DhtFeatureTest): """ Tests for general performance of dht operations. """ @@ -373,6 +421,7 @@ class PerformanceTest(FeatureTest): # Tests # ########### + @reset_before_test def _getsTimesTest(self): """ Tests for performance of the DHT doing multiple get() operation. @@ -437,7 +486,7 @@ class PerformanceTest(FeatureTest): times = [] for n in range(10): - self.wb.replace_cluster() + self._workbench.replace_cluster() plt.pause(2) DhtNetwork.log("Getting 50 random hashes succesively.") for i in range(50): @@ -456,16 +505,13 @@ class PerformanceTest(FeatureTest): plt.ioff() plt.show() + @reset_before_test def _delete(self): """ Tests for performance of get() and put() operations on the network while deleting around the target hash. """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] bootstrap = self.bootstrap @@ -476,7 +522,7 @@ class PerformanceTest(FeatureTest): myhash = random_hash() local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')] - for _ in range(max(1, int(self.wb.node_num/32))): + for _ in range(max(1, int(self._workbench.node_num/32))): self._dhtGet(consumer, myhash) DhtNetwork.log("Waiting 15 seconds...") time.sleep(15) @@ -486,20 +532,20 @@ class PerformanceTest(FeatureTest): #checking if values were transfered self._dhtGet(consumer, myhash) DhtNetwork.log('Values are found on :') - for node in FeatureTest.foreign_nodes: + for node in DhtFeatureTest.foreignNodes: DhtNetwork.log(node) - if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): - if FeatureTest.foreign_values: - DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ', + if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): + if DhtFeatureTest.foreignValues: + DhtNetwork.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ', len(local_values), ' values successfully put.') else: DhtNetwork.log('[GET]: 0 values successfully put') DhtNetwork.log('Removing all nodes hosting target values...') - for proc in self.wb.procs: + for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) proc.sendNodesRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, - FeatureTest.foreign_nodes + DhtFeatureTest.foreignNodes )