diff --git a/python/tools/dht/tests.py b/python/tools/dht/tests.py index 287c5ed2aa5c9a00f502d6075af7820340939725..f1d2e70c17fb484c2fe287eb8c75141186513ca4 100644 --- a/python/tools/dht/tests.py +++ b/python/tools/dht/tests.py @@ -389,59 +389,56 @@ class PersistenceTest(DhtFeatureTest): num_values_per_hash = self._num_values/wb.node_num if self._num_values else 5 # nodes and values counters - total_nr_values = num_values_per_hash * wb.node_num + total_nr_values = 0 nr_nodes = wb.node_num nr_nodes_cv = threading.Condition() + op_cv = threading.Condition() + # values string in string format. Used for sending cluster request. hashes = [random_hash() for _ in range(wb.node_num)] - # initial set of values - for h in hashes: - self._dhtPut(bootstrap.front(), h, *[Value(random_str_val(size=VALUE_SIZE).encode()) - for _ in range(num_values_per_hash)]) - - def normalBehavior(do, t, log=None): - nonlocal total_nr_values + def normalBehavior(do, t): + nonlocal total_nr_values, op_cv while True: - do() - time.sleep(random.choice(range(t))) + # makes sure we receive our response from the subprocess and + # avoids using transaction numbers + with op_cv: + do() + time.sleep(random.uniform(0.0, float(t))) def putRequest(): nonlocal hashes, VALUE_SIZE, total_nr_values total_nr_values += 1 DhtNetwork.log("Random value put on the DHT.", "(now "+ str(total_nr_values)+" values on the dht)") random.choice(wb.procs).sendClusterPutRequest(random.choice(hashes).toString(), - random_str_val(size=VALUE_SIZE)) - puts = threading.Thread(target=normalBehavior, args=(putRequest, 30)) + random_str_val(size=VALUE_SIZE).encode()) + puts = threading.Thread(target=normalBehavior, args=(putRequest, 30.0/wb.node_num)) puts.daemon = True puts.start() def newNodeRequest(): - nonlocal nr_nodes + nonlocal nr_nodes, nr_nodes_cv with nr_nodes_cv: nr_nodes += 1 DhtNetwork.log("Node joining the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") nr_nodes_cv.notify() random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.NEW_NODE_REQ) - connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*60)) + connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*50.0/wb.node_num)) connections.daemon = True connections.start() def shutdownNodeRequest(): - nonlocal nr_nodes + nonlocal nr_nodes, nr_nodes_cv with nr_nodes_cv: nr_nodes -= 1 DhtNetwork.log("Node quitting the DHT.", "(now "+str(nr_nodes)+" nodes on the dht)") nr_nodes_cv.notify() random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ) - shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60)) + shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60.0/wb.node_num)) shutdowns.daemon = True shutdowns.start() - for h in hashes: - self._trigger_dp(trigger_nodes, h) - if self._traffic_plot: display_traffic_plot('br'+wb.ifname) else: