diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index b94c3794c2a7eaec091a18c67aff70b7c9f0cdc2..42d9922d8c67a75f6e6f4dac2424fd3dbd0302b4 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -10,31 +10,70 @@ import time from opendht import * from dht.network import DhtNetwork, DhtNetworkSubProcess +###################### +# Common functions # +###################### def random_hash(): return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode()) class FeatureTest(object): + done = 0 + lock = None + + """ + This is a base test. + """ + + def __init__(self, test, workbench): + """ + @param test: The test string indicating the test to run. This string is + determined in the child classes. + @type test: string + + @param workbench: A WorkBench object to use inside this test. + @type workbench: WorkBench + """ + self._test = test + self._workbench = workbench + + def reset(self): + FeatureTest.done = 0 + FeatureTest.lock = threading.Condition() + + def run(self): + raise NotImplementedError('This method must be implemented.') + + + +################################## +# DHT # +################################## + +class DhtFeatureTest(FeatureTest): """ This is base test. A method run() implementation is required. """ #static variables used by class callbacks successfullTransfer = lambda lv,fv: len(lv) == len(fv) - done = 0 - lock = None - foreign_nodes = None - foreign_values = None + foreignNodes = None + foreignValues = None def __init__(self, test, workbench): self._test = test self.wb = workbench self.bootstrap = self.wb.get_bootstrap() + def reset(self): + super(DhtFeatureTest, self).reset() + DhtFeatureTest.foreignNodes = [] + DhtFeatureTest.foreignValues = [] + @staticmethod def getcb(value): DhtNetwork.log('[GET]: %s' % value) - FeatureTest.foreign_values.append(value) + DhtFeatureTest.foreignValues.append(value) return True @staticmethod @@ -53,7 +92,7 @@ class FeatureTest(object): else: for node in nodes: if not node.getNode().isExpired(): - FeatureTest.foreign_nodes.append(node.getId().toString()) + DhtFeatureTest.foreignNodes.append(node.getId().toString()) FeatureTest.done -= 1 FeatureTest.lock.notify() @@ -62,25 +101,21 @@ class FeatureTest(object): with FeatureTest.lock: DhtNetwork.log('[PUT]: %s' % val) FeatureTest.done += 1 - producer.put(_hash, val, FeatureTest.putDoneCb) + producer.put(_hash, val, DhtFeatureTest.putDoneCb) while FeatureTest.done > 0: FeatureTest.lock.wait() def _dhtGet(self, consumer, _hash): - FeatureTest.foreign_values = [] - FeatureTest.foreign_nodes = [] + DhtFeatureTest.foreignValues = [] + DhtFeatureTest.foreignNodes = [] with FeatureTest.lock: FeatureTest.done += 1 - consumer.get(_hash, FeatureTest.getcb, FeatureTest.getDoneCb) + consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb) while FeatureTest.done > 0: FeatureTest.lock.wait() - def run(self): - raise NotImplementedError('This method must be implemented.') - - -class PersistenceTest(FeatureTest): +class PersistenceTest(DhtFeatureTest): """ This tests persistence of data on the network. """ @@ -108,12 +143,12 @@ class PersistenceTest(FeatureTest): def _result(self, local_values, new_nodes): bootstrap = self.bootstrap - if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): + if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): DhtNetwork.log('[GET]: Only %s on %s values persisted.' % - (len(FeatureTest.foreign_values), len(local_values))) + (len(DhtFeatureTest.foreignValues), len(local_values))) else: DhtNetwork.log('[GET]: All values successfully persisted.') - if FeatureTest.foreign_values: + if DhtFeatureTest.foreignValues: if new_nodes: DhtNetwork.log('Values are newly found on:') for node in new_nodes: @@ -122,8 +157,8 @@ class PersistenceTest(FeatureTest): DhtNetwork.log('Dumping all storage log from '\ 'hosting nodes.') - for proc in self.wb.procs: - proc.sendDumpStorage(FeatureTest.foreign_nodes) + for proc in self._workbench.procs: + proc.sendDumpStorage(DhtFeatureTest.foreignNodes) else: DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.") @@ -140,20 +175,16 @@ class PersistenceTest(FeatureTest): finally: self.bootstrap.resize(1) - #----------- - #- Tests - - #----------- + ########### + # Tests # + ########### def _deleteTest(self): """ It uses Dht shutdown call from the API to gracefuly finish the nodes one after the other. """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] - + self.reset(); bootstrap = self.bootstrap ops_count = [] @@ -169,43 +200,43 @@ class PersistenceTest(FeatureTest): #checking if values were transfered self._dhtGet(consumer, myhash) - if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): - if FeatureTest.foreign_values: - DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ', + if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): + if DhtFeatureTest.foreignValues: + DhtNetwork.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ', len(local_values), ' values successfully put.') else: DhtNetwork.log('[GET]: 0 values successfully put') - if FeatureTest.foreign_values and FeatureTest.foreign_nodes: + if DhtFeatureTest.foreignValues and DhtFeatureTest.foreignNodes: DhtNetwork.log('Values are found on :') - for node in FeatureTest.foreign_nodes: + for node in DhtFeatureTest.foreignNodes: DhtNetwork.log(node) #DhtNetwork.log("Waiting a minute for the network to settle down.") #time.sleep(60) - for _ in range(max(1, int(self.wb.node_num/32))): + for _ in range(max(1, int(self._workbench.node_num/32))): DhtNetwork.log('Removing all nodes hosting target values...') cluster_ops_count = 0 - for proc in self.wb.procs: + for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) proc.sendNodesRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, - FeatureTest.foreign_nodes + DhtFeatureTest.foreignNodes ) DhtNetwork.log('sending message stats request') stats = proc.sendGetMessageStats() cluster_ops_count += sum(stats[1:]) DhtNetwork.log("3 seconds wait...") time.sleep(3) - ops_count.append(cluster_ops_count/self.wb.node_num) + ops_count.append(cluster_ops_count/self._workbench.node_num) # checking if values were transfered to new nodes - foreign_nodes_before_delete = FeatureTest.foreign_nodes + foreignNodes_before_delete = DhtFeatureTest.foreignNodes DhtNetwork.log('[GET]: trying to fetch persistent values') self._dhtGet(consumer, myhash) - new_nodes = set(FeatureTest.foreign_nodes) - set(foreign_nodes_before_delete) + new_nodes = set(DhtFeatureTest.foreignNodes) - set(foreignNodes_before_delete) self._result(local_values, new_nodes) @@ -222,10 +253,7 @@ class PersistenceTest(FeatureTest): """ It replaces all clusters one after the other. """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] + self.reset(); #clusters = opts['clusters'] if 'clusters' in opts else 5 clusters = 5 @@ -241,20 +269,20 @@ class PersistenceTest(FeatureTest): self._dhtPut(producer, myhash, *local_values) self._dhtGet(consumer, myhash) - initial_nodes = FeatureTest.foreign_nodes + initial_nodes = DhtFeatureTest.foreignNodes DhtNetwork.log('Replacing', clusters, 'random clusters successively...') for n in range(clusters): - i = random.randint(0, len(self.wb.procs)-1) - proc = self.wb.procs[i] + i = random.randint(0, len(self._workbench.procs)-1) + proc = self._workbench.procs[i] DhtNetwork.log('Replacing', proc) proc.sendShutdown() - self.wb.stop_cluster(i) - self.wb.start_cluster(i) + self._workbench.stop_cluster(i) + self._workbench.start_cluster(i) DhtNetwork.log('[GET]: trying to fetch persistent values') self._dhtGet(consumer, myhash) - new_nodes = set(FeatureTest.foreign_nodes) - set(initial_nodes) + new_nodes = set(DhtFeatureTest.foreignNodes) - set(initial_nodes) self._result(local_values, new_nodes) @@ -266,10 +294,7 @@ class PersistenceTest(FeatureTest): enable storage maintenance each nodes. Therefor, this tests will wait 10 minutes for the nodes to trigger storage maintenance. """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] + self.reset(); bootstrap = self.bootstrap N_PRODUCERS = 16 @@ -286,7 +311,7 @@ class PersistenceTest(FeatureTest): for h in hashes: self._dhtGet(consumer, h) if nodes is not None: - for n in FeatureTest.foreign_nodes: + for n in DhtFeatureTest.foreignNodes: nodes.add(n) def createNodesAroundHash(_hash, radius=4): @@ -325,7 +350,7 @@ class PersistenceTest(FeatureTest): time.sleep(10*60) DhtNetwork.log('Deleting old nodes from previous search.') - for proc in self.wb.procs: + for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) proc.sendNodesRequest( DhtNetworkSubProcess.REMOVE_NODE_REQ, @@ -341,7 +366,7 @@ class PersistenceTest(FeatureTest): self._result(values, nodes_after_time - nodes) -class PerformanceTest(FeatureTest): +class PerformanceTest(DhtFeatureTest): """ Tests for general performance of dht operations. """ @@ -437,7 +462,7 @@ class PerformanceTest(FeatureTest): times = [] for n in range(10): - self.wb.replace_cluster() + self._workbench.replace_cluster() plt.pause(2) DhtNetwork.log("Getting 50 random hashes succesively.") for i in range(50): @@ -462,10 +487,7 @@ class PerformanceTest(FeatureTest): deleting around the target hash. """ - FeatureTest.done = 0 - FeatureTest.lock = threading.Condition() - FeatureTest.foreign_nodes = [] - FeatureTest.foreign_values = [] + self.reset(); bootstrap = self.bootstrap @@ -476,7 +498,7 @@ class PerformanceTest(FeatureTest): myhash = random_hash() local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')] - for _ in range(max(1, int(self.wb.node_num/32))): + for _ in range(max(1, int(self._workbench.node_num/32))): self._dhtGet(consumer, myhash) DhtNetwork.log("Waiting 15 seconds...") time.sleep(15) @@ -486,20 +508,20 @@ class PerformanceTest(FeatureTest): #checking if values were transfered self._dhtGet(consumer, myhash) DhtNetwork.log('Values are found on :') - for node in FeatureTest.foreign_nodes: + for node in DhtFeatureTest.foreignNodes: DhtNetwork.log(node) - if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values): - if FeatureTest.foreign_values: - DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ', + if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues): + if DhtFeatureTest.foreignValues: + DhtNetwork.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ', len(local_values), ' values successfully put.') else: DhtNetwork.log('[GET]: 0 values successfully put') DhtNetwork.log('Removing all nodes hosting target values...') - for proc in self.wb.procs: + for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) proc.sendNodesRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, - FeatureTest.foreign_nodes + DhtFeatureTest.foreignNodes )