From a608ad3cf65b83f9c0722c2755a6244eace48746 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20D=C3=A9saulniers?= <sim.desaulniers@gmail.com> Date: Tue, 23 Aug 2016 17:56:18 -0400 Subject: [PATCH] benchmark: send new distinct values during test During totallyNormalTest, an initial set of values would be put on the dht. Then, random values from this same set would be put over time, but since values would already be there, no put were really made. --- python/tools/benchmark.py | 2 +- python/tools/dht/network.py | 6 ++-- python/tools/dht/tests.py | 65 +++++++++++++++++++++++-------------- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index 9bf18468..eb1064eb 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -161,7 +161,7 @@ if __name__ == '__main__': help='Launches data persistence benchmark test. '\ 'Available args for "-t" are: delete, replace, mult_time. '\ 'Available args for "-o" are : dump_str_log, keep_alive, trigger, traffic_plot, op_plot. '\ - 'Use "-m" to specify the number of producers on the DHT.'\ + 'Use "-m" to specify the number of producers on the DHT. '\ 'Use "-e" to specify the number of values to put on the DHT.') args = parser.parse_args() diff --git a/python/tools/dht/network.py b/python/tools/dht/network.py index 0d186c2e..fd340f35 100755 --- a/python/tools/dht/network.py +++ b/python/tools/dht/network.py @@ -231,7 +231,7 @@ class DhtNetworkSubProcess(NSPopen): self._sendRequest(DhtNetworkSubProcess.MESSAGE_STATS, answer_cb=cb) return stats - def sendNodePutRequest(self, _hash, value): + def sendClusterPutRequest(self, _hash, value): """ Sends a put operation request. @@ -243,7 +243,7 @@ class DhtNetworkSubProcess(NSPopen): self._sendRequest(b_space_join(DhtNetworkSubProcess.NODE_PUT_REQ, _hash, value)) - def sendNodesRequest(self, request, ids=b''): + def sendClusterRequest(self, request, ids=b''): """ Send request to a list of nodes or the whole cluster. @@ -268,7 +268,7 @@ class DhtNetwork(object): def log(*to_print): BOLD = "\033[1m" NORMAL = "\033[0m" - print('%s[DhtNetwork-%s]%s' % (BOLD, DhtNetwork.iface, NORMAL), ':' , *to_print, file=sys.stderr) + print('%s[DhtNetwork-%s]%s' % (BOLD, DhtNetwork.iface, NORMAL), *to_print, file=sys.stderr) @staticmethod def run_node(ip4, ip6, p, bootstrap=[], is_bootstrap=False): diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index 7b23a931..436eab19 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -27,8 +27,22 @@ bit_format = None Kbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-1) + 'Kb') Mbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-2) + 'Mb') +def random_str_val(size=1024): + """Creates a random string value of specified size. + + @param size: Size, in bytes, of the value. + @type size: int + + @return: Random string value + @rtype : str + """ + return ''.join(random.choice(string.hexdigits) for _ in range(size)) + + def random_hash(): - return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode()) + """Creates random InfoHash. + """ + return InfoHash(random_str_val(size=40).encode()) def reset_before_test(featureTestMethod): """ @@ -204,9 +218,9 @@ class DhtFeatureTest(FeatureTest): @staticmethod def putDoneCb(ok, nodes): - if not ok: - DhtNetwork.log("[PUT]: failed!") with FeatureTest.lock: + if not ok: + DhtNetwork.log("[PUT]: failed!") FeatureTest.done -= 1 FeatureTest.lock.notify() @@ -332,7 +346,7 @@ class PersistenceTest(DhtFeatureTest): DhtNetwork.log('Dumping all storage log from '\ 'hosting nodes.') for proc in self._workbench.procs: - proc.sendNodesRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes) + proc.sendClusterRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes) else: DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.") @@ -369,55 +383,58 @@ class PersistenceTest(DhtFeatureTest): trigger_nodes = [] wb = self._workbench bootstrap = self.bootstrap + # Value representing an ICE packet. Each ICE packet is around 1KB. VALUE_SIZE = 1024 - NUM_VALUES = self._num_values/wb.node_num if self._num_values else 5 - nr_values = NUM_VALUES * wb.node_num + num_values_per_hash = self._num_values/wb.node_num if self._num_values else 5 + + # nodes and values counters + total_nr_values = num_values_per_hash * wb.node_num nr_nodes = wb.node_num nr_nodes_cv = threading.Condition() - values = [b''.join(random.choice(string.hexdigits).encode() for _ in range(VALUE_SIZE)) for __ in range(NUM_VALUES)] + # values string in string format. Used for sending cluster request. hashes = [random_hash() for _ in range(wb.node_num)] # initial set of values - i = 0 for h in hashes: - # TODO: bloque dans le put?? - self._dhtPut(bootstrap.front(), h, *[Value(v) for v in values]) - print("at: ", i) - i += 1 + self._dhtPut(bootstrap.front(), h, *[Value(random_str_val(size=VALUE_SIZE).encode()) + for _ in range(num_values_per_hash)]) def normalBehavior(do, t, log=None): - nonlocal nr_values + nonlocal total_nr_values while True: do() time.sleep(random.choice(range(t))) def putRequest(): - nonlocal hashes, values, nr_values - nr_values += 1 - DhtNetwork.log("Random value put on the DHT.", "(now "+ str(nr_values)+" values on the dht)") - random.choice(wb.procs).sendNodePutRequest(random.choice(hashes).toString(), random.choice(values)) + nonlocal hashes, VALUE_SIZE, total_nr_values + total_nr_values += 1 + DhtNetwork.log("Random value put on the DHT.", "(now "+ str(total_nr_values)+" values on the dht)") + random.choice(wb.procs).sendClusterPutRequest(random.choice(hashes).toString(), + random_str_val(size=VALUE_SIZE)) puts = threading.Thread(target=normalBehavior, args=(putRequest, 30)) puts.daemon = True puts.start() + def newNodeRequest(): nonlocal nr_nodes with nr_nodes_cv: nr_nodes += 1 DhtNetwork.log("Node joining the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") nr_nodes_cv.notify() - random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.NEW_NODE_REQ) + random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.NEW_NODE_REQ) connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*60)) connections.daemon = True connections.start() + def shutdownNodeRequest(): nonlocal nr_nodes with nr_nodes_cv: nr_nodes -= 1 DhtNetwork.log("Node quitting the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") nr_nodes_cv.notify() - random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ) + random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ) shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60)) shutdowns.daemon = True shutdowns.start() @@ -472,7 +489,7 @@ class PersistenceTest(DhtFeatureTest): cluster_ops_count = 0 for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) - proc.sendNodesRequest( + proc.sendClusterRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, DhtFeatureTest.foreignNodes ) @@ -521,7 +538,7 @@ class PersistenceTest(DhtFeatureTest): i = random.randint(0, len(self._workbench.procs)-1) proc = self._workbench.procs[i] DhtNetwork.log('Replacing', proc) - proc.sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ) + proc.sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ) self._workbench.stop_cluster(i) self._workbench.start_cluster(i) @@ -550,7 +567,7 @@ class PersistenceTest(DhtFeatureTest): # Generating considerable amount of values of size 1KB. VALUE_SIZE = 1024 NUM_VALUES = self._num_values if self._num_values else 50 - values = [Value(''.join(random.choice(string.hexdigits) for _ in range(VALUE_SIZE)).encode()) for _ in range(NUM_VALUES)] + values = [Value(random_str_val(size=VALUE_SIZE).encode()) for _ in range(NUM_VALUES)] bootstrap.resize(N_PRODUCERS+2) consumer = bootstrap.get(N_PRODUCERS+1) @@ -578,7 +595,7 @@ class PersistenceTest(DhtFeatureTest): DhtNetwork.log('Deleting old nodes from previous search.') for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending delete request to', proc) - proc.sendNodesRequest( + proc.sendClusterRequest( DhtNetworkSubProcess.REMOVE_NODE_REQ, nodes) @@ -755,7 +772,7 @@ class PerformanceTest(DhtFeatureTest): DhtNetwork.log('Removing all nodes hosting target values...') for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) - proc.sendNodesRequest( + proc.sendClusterRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, DhtFeatureTest.foreignNodes ) -- GitLab