Skip to content
Snippets Groups Projects
Unverified Commit a608ad3c authored by Simon Désaulniers's avatar Simon Désaulniers
Browse files

benchmark: send new distinct values during test

During totallyNormalTest, an initial set of values would be put on the dht.
Then, random values from this same set would be put over time, but since values
would already be there, no put were really made.
parent a135f810
Branches
Tags
No related merge requests found
......@@ -231,7 +231,7 @@ class DhtNetworkSubProcess(NSPopen):
self._sendRequest(DhtNetworkSubProcess.MESSAGE_STATS, answer_cb=cb)
return stats
def sendNodePutRequest(self, _hash, value):
def sendClusterPutRequest(self, _hash, value):
"""
Sends a put operation request.
......@@ -243,7 +243,7 @@ class DhtNetworkSubProcess(NSPopen):
self._sendRequest(b_space_join(DhtNetworkSubProcess.NODE_PUT_REQ, _hash,
value))
def sendNodesRequest(self, request, ids=b''):
def sendClusterRequest(self, request, ids=b''):
"""
Send request to a list of nodes or the whole cluster.
......@@ -268,7 +268,7 @@ class DhtNetwork(object):
def log(*to_print):
BOLD = "\033[1m"
NORMAL = "\033[0m"
print('%s[DhtNetwork-%s]%s' % (BOLD, DhtNetwork.iface, NORMAL), ':' , *to_print, file=sys.stderr)
print('%s[DhtNetwork-%s]%s' % (BOLD, DhtNetwork.iface, NORMAL), *to_print, file=sys.stderr)
@staticmethod
def run_node(ip4, ip6, p, bootstrap=[], is_bootstrap=False):
......
......@@ -27,8 +27,22 @@ bit_format = None
Kbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-1) + 'Kb')
Mbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-2) + 'Mb')
def random_str_val(size=1024):
"""Creates a random string value of specified size.
@param size: Size, in bytes, of the value.
@type size: int
@return: Random string value
@rtype : str
"""
return ''.join(random.choice(string.hexdigits) for _ in range(size))
def random_hash():
return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode())
"""Creates random InfoHash.
"""
return InfoHash(random_str_val(size=40).encode())
def reset_before_test(featureTestMethod):
"""
......@@ -204,9 +218,9 @@ class DhtFeatureTest(FeatureTest):
@staticmethod
def putDoneCb(ok, nodes):
with FeatureTest.lock:
if not ok:
DhtNetwork.log("[PUT]: failed!")
with FeatureTest.lock:
FeatureTest.done -= 1
FeatureTest.lock.notify()
......@@ -332,7 +346,7 @@ class PersistenceTest(DhtFeatureTest):
DhtNetwork.log('Dumping all storage log from '\
'hosting nodes.')
for proc in self._workbench.procs:
proc.sendNodesRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes)
proc.sendClusterRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes)
else:
DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.")
......@@ -369,55 +383,58 @@ class PersistenceTest(DhtFeatureTest):
trigger_nodes = []
wb = self._workbench
bootstrap = self.bootstrap
# Value representing an ICE packet. Each ICE packet is around 1KB.
VALUE_SIZE = 1024
NUM_VALUES = self._num_values/wb.node_num if self._num_values else 5
nr_values = NUM_VALUES * wb.node_num
num_values_per_hash = self._num_values/wb.node_num if self._num_values else 5
# nodes and values counters
total_nr_values = num_values_per_hash * wb.node_num
nr_nodes = wb.node_num
nr_nodes_cv = threading.Condition()
values = [b''.join(random.choice(string.hexdigits).encode() for _ in range(VALUE_SIZE)) for __ in range(NUM_VALUES)]
# values string in string format. Used for sending cluster request.
hashes = [random_hash() for _ in range(wb.node_num)]
# initial set of values
i = 0
for h in hashes:
# TODO: bloque dans le put??
self._dhtPut(bootstrap.front(), h, *[Value(v) for v in values])
print("at: ", i)
i += 1
self._dhtPut(bootstrap.front(), h, *[Value(random_str_val(size=VALUE_SIZE).encode())
for _ in range(num_values_per_hash)])
def normalBehavior(do, t, log=None):
nonlocal nr_values
nonlocal total_nr_values
while True:
do()
time.sleep(random.choice(range(t)))
def putRequest():
nonlocal hashes, values, nr_values
nr_values += 1
DhtNetwork.log("Random value put on the DHT.", "(now "+ str(nr_values)+" values on the dht)")
random.choice(wb.procs).sendNodePutRequest(random.choice(hashes).toString(), random.choice(values))
nonlocal hashes, VALUE_SIZE, total_nr_values
total_nr_values += 1
DhtNetwork.log("Random value put on the DHT.", "(now "+ str(total_nr_values)+" values on the dht)")
random.choice(wb.procs).sendClusterPutRequest(random.choice(hashes).toString(),
random_str_val(size=VALUE_SIZE))
puts = threading.Thread(target=normalBehavior, args=(putRequest, 30))
puts.daemon = True
puts.start()
def newNodeRequest():
nonlocal nr_nodes
with nr_nodes_cv:
nr_nodes += 1
DhtNetwork.log("Node joining the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)")
nr_nodes_cv.notify()
random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.NEW_NODE_REQ)
random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.NEW_NODE_REQ)
connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*60))
connections.daemon = True
connections.start()
def shutdownNodeRequest():
nonlocal nr_nodes
with nr_nodes_cv:
nr_nodes -= 1
DhtNetwork.log("Node quitting the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)")
nr_nodes_cv.notify()
random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ)
random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ)
shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60))
shutdowns.daemon = True
shutdowns.start()
......@@ -472,7 +489,7 @@ class PersistenceTest(DhtFeatureTest):
cluster_ops_count = 0
for proc in self._workbench.procs:
DhtNetwork.log('[REMOVE]: sending shutdown request to', proc)
proc.sendNodesRequest(
proc.sendClusterRequest(
DhtNetworkSubProcess.SHUTDOWN_NODE_REQ,
DhtFeatureTest.foreignNodes
)
......@@ -521,7 +538,7 @@ class PersistenceTest(DhtFeatureTest):
i = random.randint(0, len(self._workbench.procs)-1)
proc = self._workbench.procs[i]
DhtNetwork.log('Replacing', proc)
proc.sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ)
proc.sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ)
self._workbench.stop_cluster(i)
self._workbench.start_cluster(i)
......@@ -550,7 +567,7 @@ class PersistenceTest(DhtFeatureTest):
# Generating considerable amount of values of size 1KB.
VALUE_SIZE = 1024
NUM_VALUES = self._num_values if self._num_values else 50
values = [Value(''.join(random.choice(string.hexdigits) for _ in range(VALUE_SIZE)).encode()) for _ in range(NUM_VALUES)]
values = [Value(random_str_val(size=VALUE_SIZE).encode()) for _ in range(NUM_VALUES)]
bootstrap.resize(N_PRODUCERS+2)
consumer = bootstrap.get(N_PRODUCERS+1)
......@@ -578,7 +595,7 @@ class PersistenceTest(DhtFeatureTest):
DhtNetwork.log('Deleting old nodes from previous search.')
for proc in self._workbench.procs:
DhtNetwork.log('[REMOVE]: sending delete request to', proc)
proc.sendNodesRequest(
proc.sendClusterRequest(
DhtNetworkSubProcess.REMOVE_NODE_REQ,
nodes)
......@@ -755,7 +772,7 @@ class PerformanceTest(DhtFeatureTest):
DhtNetwork.log('Removing all nodes hosting target values...')
for proc in self._workbench.procs:
DhtNetwork.log('[REMOVE]: sending shutdown request to', proc)
proc.sendNodesRequest(
proc.sendClusterRequest(
DhtNetworkSubProcess.SHUTDOWN_NODE_REQ,
DhtFeatureTest.foreignNodes
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment