Skip to content
Snippets Groups Projects
Commit 5f9e2099 authored by Simon Désaulniers's avatar Simon Désaulniers
Browse files

python tests refactoring

parent 6910a7c0
Branches
Tags
No related merge requests found
......@@ -10,31 +10,70 @@ import time
from opendht import *
from dht.network import DhtNetwork, DhtNetworkSubProcess
######################
# Common functions #
######################
def random_hash():
return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode())
class FeatureTest(object):
done = 0
lock = None
"""
This is a base test.
"""
def __init__(self, test, workbench):
"""
@param test: The test string indicating the test to run. This string is
determined in the child classes.
@type test: string
@param workbench: A WorkBench object to use inside this test.
@type workbench: WorkBench
"""
self._test = test
self._workbench = workbench
def reset(self):
FeatureTest.done = 0
FeatureTest.lock = threading.Condition()
def run(self):
raise NotImplementedError('This method must be implemented.')
##################################
# DHT #
##################################
class DhtFeatureTest(FeatureTest):
"""
This is base test. A method run() implementation is required.
"""
#static variables used by class callbacks
successfullTransfer = lambda lv,fv: len(lv) == len(fv)
done = 0
lock = None
foreign_nodes = None
foreign_values = None
foreignNodes = None
foreignValues = None
def __init__(self, test, workbench):
self._test = test
self.wb = workbench
self.bootstrap = self.wb.get_bootstrap()
def reset(self):
super(DhtFeatureTest, self).reset()
DhtFeatureTest.foreignNodes = []
DhtFeatureTest.foreignValues = []
@staticmethod
def getcb(value):
DhtNetwork.log('[GET]: %s' % value)
FeatureTest.foreign_values.append(value)
DhtFeatureTest.foreignValues.append(value)
return True
@staticmethod
......@@ -53,7 +92,7 @@ class FeatureTest(object):
else:
for node in nodes:
if not node.getNode().isExpired():
FeatureTest.foreign_nodes.append(node.getId().toString())
DhtFeatureTest.foreignNodes.append(node.getId().toString())
FeatureTest.done -= 1
FeatureTest.lock.notify()
......@@ -62,25 +101,21 @@ class FeatureTest(object):
with FeatureTest.lock:
DhtNetwork.log('[PUT]: %s' % val)
FeatureTest.done += 1
producer.put(_hash, val, FeatureTest.putDoneCb)
producer.put(_hash, val, DhtFeatureTest.putDoneCb)
while FeatureTest.done > 0:
FeatureTest.lock.wait()
def _dhtGet(self, consumer, _hash):
FeatureTest.foreign_values = []
FeatureTest.foreign_nodes = []
DhtFeatureTest.foreignValues = []
DhtFeatureTest.foreignNodes = []
with FeatureTest.lock:
FeatureTest.done += 1
consumer.get(_hash, FeatureTest.getcb, FeatureTest.getDoneCb)
consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb)
while FeatureTest.done > 0:
FeatureTest.lock.wait()
def run(self):
raise NotImplementedError('This method must be implemented.')
class PersistenceTest(FeatureTest):
class PersistenceTest(DhtFeatureTest):
"""
This tests persistence of data on the network.
"""
......@@ -108,12 +143,12 @@ class PersistenceTest(FeatureTest):
def _result(self, local_values, new_nodes):
bootstrap = self.bootstrap
if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values):
if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
DhtNetwork.log('[GET]: Only %s on %s values persisted.' %
(len(FeatureTest.foreign_values), len(local_values)))
(len(DhtFeatureTest.foreignValues), len(local_values)))
else:
DhtNetwork.log('[GET]: All values successfully persisted.')
if FeatureTest.foreign_values:
if DhtFeatureTest.foreignValues:
if new_nodes:
DhtNetwork.log('Values are newly found on:')
for node in new_nodes:
......@@ -122,8 +157,8 @@ class PersistenceTest(FeatureTest):
DhtNetwork.log('Dumping all storage log from '\
'hosting nodes.')
for proc in self.wb.procs:
proc.sendDumpStorage(FeatureTest.foreign_nodes)
for proc in self._workbench.procs:
proc.sendDumpStorage(DhtFeatureTest.foreignNodes)
else:
DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.")
......@@ -140,20 +175,16 @@ class PersistenceTest(FeatureTest):
finally:
self.bootstrap.resize(1)
#-----------
#- Tests -
#-----------
###########
# Tests #
###########
def _deleteTest(self):
"""
It uses Dht shutdown call from the API to gracefuly finish the nodes one
after the other.
"""
FeatureTest.done = 0
FeatureTest.lock = threading.Condition()
FeatureTest.foreign_nodes = []
FeatureTest.foreign_values = []
self.reset();
bootstrap = self.bootstrap
ops_count = []
......@@ -169,43 +200,43 @@ class PersistenceTest(FeatureTest):
#checking if values were transfered
self._dhtGet(consumer, myhash)
if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values):
if FeatureTest.foreign_values:
DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ',
if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
if DhtFeatureTest.foreignValues:
DhtNetwork.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ',
len(local_values), ' values successfully put.')
else:
DhtNetwork.log('[GET]: 0 values successfully put')
if FeatureTest.foreign_values and FeatureTest.foreign_nodes:
if DhtFeatureTest.foreignValues and DhtFeatureTest.foreignNodes:
DhtNetwork.log('Values are found on :')
for node in FeatureTest.foreign_nodes:
for node in DhtFeatureTest.foreignNodes:
DhtNetwork.log(node)
#DhtNetwork.log("Waiting a minute for the network to settle down.")
#time.sleep(60)
for _ in range(max(1, int(self.wb.node_num/32))):
for _ in range(max(1, int(self._workbench.node_num/32))):
DhtNetwork.log('Removing all nodes hosting target values...')
cluster_ops_count = 0
for proc in self.wb.procs:
for proc in self._workbench.procs:
DhtNetwork.log('[REMOVE]: sending shutdown request to', proc)
proc.sendNodesRequest(
DhtNetworkSubProcess.SHUTDOWN_NODE_REQ,
FeatureTest.foreign_nodes
DhtFeatureTest.foreignNodes
)
DhtNetwork.log('sending message stats request')
stats = proc.sendGetMessageStats()
cluster_ops_count += sum(stats[1:])
DhtNetwork.log("3 seconds wait...")
time.sleep(3)
ops_count.append(cluster_ops_count/self.wb.node_num)
ops_count.append(cluster_ops_count/self._workbench.node_num)
# checking if values were transfered to new nodes
foreign_nodes_before_delete = FeatureTest.foreign_nodes
foreignNodes_before_delete = DhtFeatureTest.foreignNodes
DhtNetwork.log('[GET]: trying to fetch persistent values')
self._dhtGet(consumer, myhash)
new_nodes = set(FeatureTest.foreign_nodes) - set(foreign_nodes_before_delete)
new_nodes = set(DhtFeatureTest.foreignNodes) - set(foreignNodes_before_delete)
self._result(local_values, new_nodes)
......@@ -222,10 +253,7 @@ class PersistenceTest(FeatureTest):
"""
It replaces all clusters one after the other.
"""
FeatureTest.done = 0
FeatureTest.lock = threading.Condition()
FeatureTest.foreign_nodes = []
FeatureTest.foreign_values = []
self.reset();
#clusters = opts['clusters'] if 'clusters' in opts else 5
clusters = 5
......@@ -241,20 +269,20 @@ class PersistenceTest(FeatureTest):
self._dhtPut(producer, myhash, *local_values)
self._dhtGet(consumer, myhash)
initial_nodes = FeatureTest.foreign_nodes
initial_nodes = DhtFeatureTest.foreignNodes
DhtNetwork.log('Replacing', clusters, 'random clusters successively...')
for n in range(clusters):
i = random.randint(0, len(self.wb.procs)-1)
proc = self.wb.procs[i]
i = random.randint(0, len(self._workbench.procs)-1)
proc = self._workbench.procs[i]
DhtNetwork.log('Replacing', proc)
proc.sendShutdown()
self.wb.stop_cluster(i)
self.wb.start_cluster(i)
self._workbench.stop_cluster(i)
self._workbench.start_cluster(i)
DhtNetwork.log('[GET]: trying to fetch persistent values')
self._dhtGet(consumer, myhash)
new_nodes = set(FeatureTest.foreign_nodes) - set(initial_nodes)
new_nodes = set(DhtFeatureTest.foreignNodes) - set(initial_nodes)
self._result(local_values, new_nodes)
......@@ -266,10 +294,7 @@ class PersistenceTest(FeatureTest):
enable storage maintenance each nodes. Therefor, this tests will wait 10
minutes for the nodes to trigger storage maintenance.
"""
FeatureTest.done = 0
FeatureTest.lock = threading.Condition()
FeatureTest.foreign_nodes = []
FeatureTest.foreign_values = []
self.reset();
bootstrap = self.bootstrap
N_PRODUCERS = 16
......@@ -286,7 +311,7 @@ class PersistenceTest(FeatureTest):
for h in hashes:
self._dhtGet(consumer, h)
if nodes is not None:
for n in FeatureTest.foreign_nodes:
for n in DhtFeatureTest.foreignNodes:
nodes.add(n)
def createNodesAroundHash(_hash, radius=4):
......@@ -325,7 +350,7 @@ class PersistenceTest(FeatureTest):
time.sleep(10*60)
DhtNetwork.log('Deleting old nodes from previous search.')
for proc in self.wb.procs:
for proc in self._workbench.procs:
DhtNetwork.log('[REMOVE]: sending shutdown request to', proc)
proc.sendNodesRequest(
DhtNetworkSubProcess.REMOVE_NODE_REQ,
......@@ -341,7 +366,7 @@ class PersistenceTest(FeatureTest):
self._result(values, nodes_after_time - nodes)
class PerformanceTest(FeatureTest):
class PerformanceTest(DhtFeatureTest):
"""
Tests for general performance of dht operations.
"""
......@@ -437,7 +462,7 @@ class PerformanceTest(FeatureTest):
times = []
for n in range(10):
self.wb.replace_cluster()
self._workbench.replace_cluster()
plt.pause(2)
DhtNetwork.log("Getting 50 random hashes succesively.")
for i in range(50):
......@@ -462,10 +487,7 @@ class PerformanceTest(FeatureTest):
deleting around the target hash.
"""
FeatureTest.done = 0
FeatureTest.lock = threading.Condition()
FeatureTest.foreign_nodes = []
FeatureTest.foreign_values = []
self.reset();
bootstrap = self.bootstrap
......@@ -476,7 +498,7 @@ class PerformanceTest(FeatureTest):
myhash = random_hash()
local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')]
for _ in range(max(1, int(self.wb.node_num/32))):
for _ in range(max(1, int(self._workbench.node_num/32))):
self._dhtGet(consumer, myhash)
DhtNetwork.log("Waiting 15 seconds...")
time.sleep(15)
......@@ -486,20 +508,20 @@ class PerformanceTest(FeatureTest):
#checking if values were transfered
self._dhtGet(consumer, myhash)
DhtNetwork.log('Values are found on :')
for node in FeatureTest.foreign_nodes:
for node in DhtFeatureTest.foreignNodes:
DhtNetwork.log(node)
if not FeatureTest.successfullTransfer(local_values, FeatureTest.foreign_values):
if FeatureTest.foreign_values:
DhtNetwork.log('[GET]: Only ', len(FeatureTest.foreign_values) ,' on ',
if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
if DhtFeatureTest.foreignValues:
DhtNetwork.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ',
len(local_values), ' values successfully put.')
else:
DhtNetwork.log('[GET]: 0 values successfully put')
DhtNetwork.log('Removing all nodes hosting target values...')
for proc in self.wb.procs:
for proc in self._workbench.procs:
DhtNetwork.log('[REMOVE]: sending shutdown request to', proc)
proc.sendNodesRequest(
DhtNetworkSubProcess.SHUTDOWN_NODE_REQ,
FeatureTest.foreign_nodes
DhtFeatureTest.foreignNodes
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment