diff --git a/python/tools/benchmark.py b/python/tools/benchmark.py index 9bf18468dd63ff2cbe1038e8a9354d7ac7f34421..eb1064eb32b9fe7a436e325e50c05a379ff74f2e 100755 --- a/python/tools/benchmark.py +++ b/python/tools/benchmark.py @@ -161,7 +161,7 @@ if __name__ == '__main__': help='Launches data persistence benchmark test. '\ 'Available args for "-t" are: delete, replace, mult_time. '\ 'Available args for "-o" are : dump_str_log, keep_alive, trigger, traffic_plot, op_plot. '\ - 'Use "-m" to specify the number of producers on the DHT.'\ + 'Use "-m" to specify the number of producers on the DHT. '\ 'Use "-e" to specify the number of values to put on the DHT.') args = parser.parse_args() diff --git a/python/tools/dht/network.py b/python/tools/dht/network.py index 0d186c2e3bfb3775e6168224d37b7317f8f1e03a..fd340f35f1ce2079e76266e6b8042c9cb470d714 100755 --- a/python/tools/dht/network.py +++ b/python/tools/dht/network.py @@ -231,7 +231,7 @@ class DhtNetworkSubProcess(NSPopen): self._sendRequest(DhtNetworkSubProcess.MESSAGE_STATS, answer_cb=cb) return stats - def sendNodePutRequest(self, _hash, value): + def sendClusterPutRequest(self, _hash, value): """ Sends a put operation request. @@ -243,7 +243,7 @@ class DhtNetworkSubProcess(NSPopen): self._sendRequest(b_space_join(DhtNetworkSubProcess.NODE_PUT_REQ, _hash, value)) - def sendNodesRequest(self, request, ids=b''): + def sendClusterRequest(self, request, ids=b''): """ Send request to a list of nodes or the whole cluster. @@ -268,7 +268,7 @@ class DhtNetwork(object): def log(*to_print): BOLD = "\033[1m" NORMAL = "\033[0m" - print('%s[DhtNetwork-%s]%s' % (BOLD, DhtNetwork.iface, NORMAL), ':' , *to_print, file=sys.stderr) + print('%s[DhtNetwork-%s]%s' % (BOLD, DhtNetwork.iface, NORMAL), *to_print, file=sys.stderr) @staticmethod def run_node(ip4, ip6, p, bootstrap=[], is_bootstrap=False): diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index 7b23a9319151d6dba588b9ac830bcda0ed7d1add..436eab198bd496c2ff980f69882386e4719f118f 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -27,8 +27,22 @@ bit_format = None Kbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-1) + 'Kb') Mbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-2) + 'Mb') +def random_str_val(size=1024): + """Creates a random string value of specified size. + + @param size: Size, in bytes, of the value. + @type size: int + + @return: Random string value + @rtype : str + """ + return ''.join(random.choice(string.hexdigits) for _ in range(size)) + + def random_hash(): - return InfoHash(''.join(random.SystemRandom().choice(string.hexdigits) for _ in range(40)).encode()) + """Creates random InfoHash. + """ + return InfoHash(random_str_val(size=40).encode()) def reset_before_test(featureTestMethod): """ @@ -204,9 +218,9 @@ class DhtFeatureTest(FeatureTest): @staticmethod def putDoneCb(ok, nodes): - if not ok: - DhtNetwork.log("[PUT]: failed!") with FeatureTest.lock: + if not ok: + DhtNetwork.log("[PUT]: failed!") FeatureTest.done -= 1 FeatureTest.lock.notify() @@ -332,7 +346,7 @@ class PersistenceTest(DhtFeatureTest): DhtNetwork.log('Dumping all storage log from '\ 'hosting nodes.') for proc in self._workbench.procs: - proc.sendNodesRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes) + proc.sendClusterRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes) else: DhtNetwork.log("Values didn't reach new hosting nodes after shutdown.") @@ -369,55 +383,58 @@ class PersistenceTest(DhtFeatureTest): trigger_nodes = [] wb = self._workbench bootstrap = self.bootstrap + # Value representing an ICE packet. Each ICE packet is around 1KB. VALUE_SIZE = 1024 - NUM_VALUES = self._num_values/wb.node_num if self._num_values else 5 - nr_values = NUM_VALUES * wb.node_num + num_values_per_hash = self._num_values/wb.node_num if self._num_values else 5 + + # nodes and values counters + total_nr_values = num_values_per_hash * wb.node_num nr_nodes = wb.node_num nr_nodes_cv = threading.Condition() - values = [b''.join(random.choice(string.hexdigits).encode() for _ in range(VALUE_SIZE)) for __ in range(NUM_VALUES)] + # values string in string format. Used for sending cluster request. hashes = [random_hash() for _ in range(wb.node_num)] # initial set of values - i = 0 for h in hashes: - # TODO: bloque dans le put?? - self._dhtPut(bootstrap.front(), h, *[Value(v) for v in values]) - print("at: ", i) - i += 1 + self._dhtPut(bootstrap.front(), h, *[Value(random_str_val(size=VALUE_SIZE).encode()) + for _ in range(num_values_per_hash)]) def normalBehavior(do, t, log=None): - nonlocal nr_values + nonlocal total_nr_values while True: do() time.sleep(random.choice(range(t))) def putRequest(): - nonlocal hashes, values, nr_values - nr_values += 1 - DhtNetwork.log("Random value put on the DHT.", "(now "+ str(nr_values)+" values on the dht)") - random.choice(wb.procs).sendNodePutRequest(random.choice(hashes).toString(), random.choice(values)) + nonlocal hashes, VALUE_SIZE, total_nr_values + total_nr_values += 1 + DhtNetwork.log("Random value put on the DHT.", "(now "+ str(total_nr_values)+" values on the dht)") + random.choice(wb.procs).sendClusterPutRequest(random.choice(hashes).toString(), + random_str_val(size=VALUE_SIZE)) puts = threading.Thread(target=normalBehavior, args=(putRequest, 30)) puts.daemon = True puts.start() + def newNodeRequest(): nonlocal nr_nodes with nr_nodes_cv: nr_nodes += 1 DhtNetwork.log("Node joining the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") nr_nodes_cv.notify() - random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.NEW_NODE_REQ) + random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.NEW_NODE_REQ) connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*60)) connections.daemon = True connections.start() + def shutdownNodeRequest(): nonlocal nr_nodes with nr_nodes_cv: nr_nodes -= 1 DhtNetwork.log("Node quitting the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") nr_nodes_cv.notify() - random.choice(wb.procs).sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ) + random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ) shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60)) shutdowns.daemon = True shutdowns.start() @@ -472,7 +489,7 @@ class PersistenceTest(DhtFeatureTest): cluster_ops_count = 0 for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) - proc.sendNodesRequest( + proc.sendClusterRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, DhtFeatureTest.foreignNodes ) @@ -521,7 +538,7 @@ class PersistenceTest(DhtFeatureTest): i = random.randint(0, len(self._workbench.procs)-1) proc = self._workbench.procs[i] DhtNetwork.log('Replacing', proc) - proc.sendNodesRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ) + proc.sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ) self._workbench.stop_cluster(i) self._workbench.start_cluster(i) @@ -550,7 +567,7 @@ class PersistenceTest(DhtFeatureTest): # Generating considerable amount of values of size 1KB. VALUE_SIZE = 1024 NUM_VALUES = self._num_values if self._num_values else 50 - values = [Value(''.join(random.choice(string.hexdigits) for _ in range(VALUE_SIZE)).encode()) for _ in range(NUM_VALUES)] + values = [Value(random_str_val(size=VALUE_SIZE).encode()) for _ in range(NUM_VALUES)] bootstrap.resize(N_PRODUCERS+2) consumer = bootstrap.get(N_PRODUCERS+1) @@ -578,7 +595,7 @@ class PersistenceTest(DhtFeatureTest): DhtNetwork.log('Deleting old nodes from previous search.') for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending delete request to', proc) - proc.sendNodesRequest( + proc.sendClusterRequest( DhtNetworkSubProcess.REMOVE_NODE_REQ, nodes) @@ -755,7 +772,7 @@ class PerformanceTest(DhtFeatureTest): DhtNetwork.log('Removing all nodes hosting target values...') for proc in self._workbench.procs: DhtNetwork.log('[REMOVE]: sending shutdown request to', proc) - proc.sendNodesRequest( + proc.sendClusterRequest( DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, DhtFeatureTest.foreignNodes )