Skip to content
Snippets Groups Projects
Commit ddc7b2fb authored by Simon Désaulniers's avatar Simon Désaulniers Committed by Adrien Béraud
Browse files

benchmark refactor for manual cluster start

parent ac08beb5
No related branches found
No related tags found
No related merge requests found
......@@ -11,34 +11,77 @@ from dhtnetwork import DhtNetwork
sys.path.append('..')
from opendht import *
def start_cluster(i):
global procs
cmd = ["python3", "dhtnetwork.py", "-n", str(node_per_loc), '-I', args.ifname+str(i)+'.1']
if args.bootstrap:
cmd.extend(['-b', args.bootstrap, '-bp', "5000"])
class WorkBench():
"""docstring for WorkBench"""
def __init__(self, ifname='ethdht', virtual_locs=8, node_num=32, remote_bootstrap=None, loss=0, delay=0, disable_ipv4=False,
disable_ipv6=False):
self.ifname = ifname
self.virtual_locs = virtual_locs
self.node_num = node_num
self.clusters = min(virtual_locs, node_num)
self.node_per_loc = int(self.node_num / self.clusters)
self.loss = loss
self.delay = delay
self.disable_ipv4 = disable_ipv4
self.disable_ipv6 = disable_ipv6
self.remote_bootstrap = remote_bootstrap
self.local_bootstrap = None
self.procs = [None for _ in range(self.clusters)]
def get_bootstrap(self):
if not self.local_bootstrap:
self.local_bootstrap = DhtNetwork(iface='br'+self.ifname,
first_bootstrap=False if self.remote_bootstrap else True,
bootstrap=[(self.remote_bootstrap, "5000")] if self.remote_bootstrap else [])
return self.local_bootstrap
def create_virtual_net(self):
if self.virtual_locs > 1:
cmd = ["python3", "virtual_network_builder.py", "-i", self.ifname, "-n", str(self.clusters), '-l', str(self.loss), '-d', str(self.delay)]
if not self.disable_ipv4:
cmd.append('-4')
if not self.disable_ipv6:
cmd.append('-6')
print(cmd)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
output, err = p.communicate()
print(output.decode())
def destroy_virtual_net(self):
print('Shuting down the virtual IP network.')
subprocess.call(["python3", "virtual_network_builder.py", "-i", self.ifname, "-n", str(self.clusters), "-r"])
def start_cluster(self, i):
if self.local_bootstrap:
cmd = ["python3", "dhtnetwork.py", "-n", str(self.node_per_loc), '-I', self.ifname+str(i)+'.1']
if self.remote_bootstrap:
cmd.extend(['-b', self.remote_bootstrap, '-bp', "5000"])
else:
if not args.disable_ipv4 and bootstrap.ip4:
cmd.extend(['-b', bootstrap.ip4])
if not args.disable_ipv6 and bootstrap.ip6:
cmd.extend(['-b6', bootstrap.ip6])
procs[i] = NSPopen('node'+str(i), cmd)
#plt.pause(2)
def stop_cluster(i):
global procs
if procs[i]:
if not self.disable_ipv4 and self.local_bootstrap.ip4:
cmd.extend(['-b', self.local_bootstrap.ip4])
if not self.disable_ipv6 and self.local_bootstrap.ip6:
cmd.extend(['-b6', self.local_bootstrap.ip6])
self.procs[i] = NSPopen('node'+str(i), cmd)
else:
raise Exception('First create bootstrap.')
def stop_cluster(self, i):
if self.procs[i]:
try:
procs[i].send_signal(signal.SIGINT);
procs[i].wait()
procs[i].release()
self.procs[i].send_signal(signal.SIGINT);
self.procs[i].wait()
self.procs[i].release()
except Exception as e:
print(e)
procs[i] = None
self.procs[i] = None
def replace_cluster(self):
n = random.randrange(0, self.clusters)
self.stop_cluster(n)
self.start_cluster(n)
def replace_cluster():
n = random.randrange(0, clusters)
stop_cluster(n)
start_cluster(n)
def getsTimesTest():
"""TODO: Docstring for
......@@ -102,14 +145,14 @@ def getsTimesTest():
times = []
for n in range(10):
replace_cluster()
wb.replace_cluster()
plt.pause(2)
print("Getting 50 random hashes succesively.")
for i in range(50):
with lock:
done += 1
start = time.time()
bootstrap.front().get(PyInfoHash.getRandom(), getcb, donecb)
bootstrap.front().get(InfoHash.getRandom(), getcb, donecb)
while done > 0:
lock.wait()
update_plot()
......@@ -140,30 +183,18 @@ if __name__ == '__main__':
print('No test specified... Quitting.', file=sys.stderr)
sys.exit(1)
clusters = min(args.virtual_locs, args.node_num)
node_per_loc = int(args.node_num / clusters)
print("Launching", args.node_num, "nodes (", clusters, "clusters of", node_per_loc, "nodes)")
wb = WorkBench(args.ifname, args.virtual_locs, args.node_num, loss=args.loss,
delay=args.delay, disable_ipv4=args.disable_ipv4,
disable_ipv6=args.disable_ipv6)
wb.create_virtual_net()
if args.virtual_locs > 1:
cmd = ["python3", "virtual_network_builder.py", "-i", args.ifname, "-n", str(clusters), '-l', str(args.loss), '-d', str(args.delay)]
if not args.disable_ipv4:
cmd.append('-4')
if not args.disable_ipv6:
cmd.append('-6')
print(cmd)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
output, err = p.communicate()
print(output.decode())
bootstrap = DhtNetwork(iface='br'+args.ifname, first_bootstrap=False if args.bootstrap else True, bootstrap=[(args.bootstrap, "5000")] if args.bootstrap else [])
bootstrap = wb.get_bootstrap()
bootstrap.resize(1)
procs = [None for _ in range(clusters)]
print("Launching", wb.node_num, "nodes (", wb.clusters, "clusters of", wb.node_per_loc, "nodes)")
try:
for i in range(clusters):
start_cluster(i)
for i in range(wb.clusters):
wb.start_cluster(i)
if args.gets:
getsTimesTest()
......@@ -171,13 +202,12 @@ if __name__ == '__main__':
except Exception as e:
print(e)
finally:
for p in procs:
for p in wb.procs:
if p:
p.send_signal(signal.SIGINT);
bootstrap.resize(0)
print('Shuting down the virtual IP network.')
subprocess.call(["python3", "virtual_network_builder.py", "-i", args.ifname, "-n", str(clusters), "-r"])
for p in procs:
wb.destroy_virtual_net()
for p in wb.procs:
if p:
try:
p.wait()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment