From 9b3f4a27e59f77a650643ecdc9a876723b2aa67f Mon Sep 17 00:00:00 2001 From: Eloi BAIL <eloi.bail@savoirfairelinux.com> Date: Wed, 16 Sep 2015 09:44:31 -0400 Subject: [PATCH] tools: refactor automatic call test tool Refactor of automatic call test tool Now do ./dringctrl --help : all test name will be listed ./dringctrl --test <testName> to execute a test Issue: #80498 Change-Id: I86d16e7d96f04f040133f16856ba9ca8a061374d --- tools/dringctrl/controler.py | 12 ++ tools/dringctrl/dringctrl.py | 18 +-- tools/dringctrl/tester.py | 211 +++++++++++++++++++++++++---------- 3 files changed, 166 insertions(+), 75 deletions(-) diff --git a/tools/dringctrl/controler.py b/tools/dringctrl/controler.py index e71b2c46b6..e719219302 100644 --- a/tools/dringctrl/controler.py +++ b/tools/dringctrl/controler.py @@ -457,6 +457,13 @@ class DRingCtrl(Thread): account = self._valid_account(account) return [int(x) for x in self.configurationmanager.getActiveCodecList(account)] + def setCodecBitrate(self, account, bitrate): + """ Change bitrate for all codecs on given account""" + + for codecId in self.configurationmanager.getActiveCodecList(account): + details = self.configurationmanager.getCodecDetails(account, codecId) + details['CodecInfo.bitrate'] = str(bitrate) + self.configurationmanager.setCodecDetails(account, codecId, details) # # Call management # @@ -599,6 +606,11 @@ class DRingCtrl(Thread): self.callmanager.hangUpConference(confId) + def switchInput(self, callid, inputName): + """switch to input if exist""" + + return self.callmanager.switchInput(callid, inputName) + def run(self): """Processing method for this thread""" diff --git a/tools/dringctrl/dringctrl.py b/tools/dringctrl/dringctrl.py index 7e1e64f029..82434a9898 100755 --- a/tools/dringctrl/dringctrl.py +++ b/tools/dringctrl/dringctrl.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright (C) 2015 Savoir-Faire Linux Inc. +# Copyright (C) 2015 Savoir-faire Linux Inc. # Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com> # # This program is free software; you can redistribute it and/or modify @@ -16,18 +16,6 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# -# Additional permission under GNU GPL version 3 section 7: -# -# If you modify this program, or any covered work, by linking or -# combining it with the OpenSSL project's OpenSSL library (or a -# modified version of that library), containing parts covered by the -# terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc. -# grants you additional permission to convey the resulting work. -# Corresponding Source for a non-source form of such a combination -# shall include the source code for the parts of OpenSSL used as well -# as that of the covered work. -# import sys import os @@ -100,7 +88,7 @@ if __name__ == "__main__": parser.add_argument('--dtmf', help='Send DTMF', metavar='<key>') parser.add_argument('--toggleVideo', help='Launch toggle video tests', action='store_true') - parser.add_argument('--test', help='Launch automatic tests', action='store_true') + parser.add_argument('--test', help=' '.join(str(test) for test in DRingTester().getTestName() ), metavar='<testName>') args = parser.parse_args() @@ -179,7 +167,7 @@ if __name__ == "__main__": ctrl.Dtmf(args.dtmf) if args.test: - DRingTester().start(ctrl) + DRingTester().start(ctrl, args.test) if args.toggleVideo: if not ctrl.videomanager: diff --git a/tools/dringctrl/tester.py b/tools/dringctrl/tester.py index f1c747569e..1c934f761f 100644 --- a/tools/dringctrl/tester.py +++ b/tools/dringctrl/tester.py @@ -36,29 +36,52 @@ from random import shuffle from errors import * -# Dht Client -SIP_TEST_ACCOUNT = 'sf1' -# Dht Client -DHT_TEST_ACCOUNT = '6c8d591e7c55b348338209bb6f9f638688d50034' -# Ring Client -RING_TEST_ACCOUNT = '192.168.48.125' -# RING_TEST_ACCOUNT = '192.168.48.158' -# Polycom Client -POLYCOM_TEST_ACCOUNT = '192.168.40.38' + WITH_HOLD = True WITHOUT_HOLD = False +ALL_TEST_NAME = { + 'TestConfig': 'testConfig', + 'LoopCallDht': 'testLoopCallDht', + 'VideoBirtateDHT': 'testLoopCallDhtWithIncBitrate', + 'SimultenousDHTCall' : 'testSimultaneousLoopCallDht', + 'DhtCallHold' : 'testLoopCallDhtWithHold' + } class DRingTester(): + + DHT_ACCOUNT_ID = '' + SIP_ACCOUNT_ID = '' + # Dht Client + SIP_TEST_ACCOUNT = 'sf1' + # Dht Client + DHT_TEST_ACCOUNT = '280ca11317ec90a939c86fbfa06532dbb8a08f8a' + # Ring Client + RING_TEST_ACCOUNT = '192.168.50.143' + # Polycom Client + POLYCOM_TEST_ACCOUNT = '192.168.40.38' + FILE_TEST = '/home/eloi/Videos/Mad_Max_Fury_Road_2015_Trailer_F4_5.1-1080p-HDTN.mp4' + + minBitrate = 400 + maxBitrate = 4000 + incBitrate = 100 + def testConfig(self, ctrl): print("**[BEGIN] test config") allCodecs = ctrl.getAllCodecs() if len(allCodecs) == 0: print("error no codec on the system") return 0 + + print("**[SUCCESS] test config") print("**[END] test config") return 1 +# +# Helpers +# + def getTestName(self): + return ALL_TEST_NAME.keys() def checkIP2IPAccount(self, ctrl): ipAccount = ctrl.getAllAccounts() @@ -87,66 +110,117 @@ class DRingTester(): print("UnHolding: " + callId) ctrl.UnHold(callId) - def startCallTests(self, ctrl, localAccount, destAccount, nbIteration, - delay, withHold): - print("**[BEGIN] Call Test for account:" + localAccount) - if localAccount == 'IP2IP': - ctrl.setAccount(localAccount) +# +# tests +# + +# testLoopCallDht +# perform <nbIteration> DHT calls using <delay> between each call + + def testLoopCallDht(self, ctrl, nbIteration, delay): + print("**[BEGIN] DHT Call Test") count = 0 while count < nbIteration: print("[%s/%s]" % (count, nbIteration)) - self.setRandomActiveCodecs(ctrl, localAccount) - ctrl.Call(destAccount) - callId = ctrl.getAllCalls()[0] + self.setRandomActiveCodecs(ctrl, self.DHT_ACCOUNT_ID) - if withHold: - delayHold = 5 - nbHold = delay / (delayHold * 2) - countHold = 0 + callId = ctrl.Call(self.DHT_TEST_ACCOUNT) - while countHold < nbHold: - self.holdToggle(ctrl, callId, delayHold) - countHold += 1 - else: - time.sleep(delay) + # switch to file input + ctrl.switchInput(callId,'file://'+self.FILE_TEST) + + time.sleep(delay) ctrl.HangUp(callId) count += 1 - print("**[END] Call Test for account:" + localAccount) + print("**[END] DHT Call Test") + +# testLoopCallDhtWithHold +# perform <nbIteration> DHT calls using <delay> between each call +# perform stress hold/unhold between each call + + def testLoopCallDhtWithHold(self, ctrl, nbIteration, delay): + print("**[BEGIN] DHT Call Test") + + count = 0 + while count < nbIteration: + print("[%s/%s]" % (count, nbIteration)) + + self.setRandomActiveCodecs(ctrl, self.DHT_ACCOUNT_ID) + + callId = ctrl.Call(self.DHT_TEST_ACCOUNT) - def startSimultaneousCallTests(self, ctrl, localAccount, destAccount, - nbIteration, delay, withHold, nbCalls): - print("**[BEGIN] Call Test for account:" + localAccount) - if localAccount == 'IP2IP': - ctrl.setAccount(localAccount) + # switch to file input + ctrl.switchInput(callId,'file://'+self.FILE_TEST) + + delayHold = 5 + nbHold = delay / (delayHold * 2) + countHold = 0 + + while countHold < nbHold: + self.holdToggle(ctrl, callId, delayHold) + countHold = countHold + 1 + + + ctrl.HangUp(callId) + count += 1 + + print("**[END] DHT Call Test") + + +# testLoopCallDhtWithIncBitrate +# perform <nbIteration> DHT calls using <delay> between each call +# inc bitrate between each iteration + + def testLoopCallDhtWithIncBitrate(self, ctrl, nbIteration, delay): + print("**[BEGIN] VIDEO Bitrate Test") count = 0 + currBitrate = self.minBitrate + while count < nbIteration: print("[%s/%s]" % (count, nbIteration)) - self.setRandomActiveCodecs(ctrl, localAccount) + + self.setRandomActiveCodecs(ctrl, self.DHT_ACCOUNT_ID) + print("setting video bitrate to "+str(currBitrate)) + ctrl.setCodecBitrate(self.DHT_ACCOUNT_ID, currBitrate) + + callId = ctrl.Call(self.DHT_TEST_ACCOUNT) + + # switch to file input + ctrl.switchInput(callId,'file://'+self.FILE_TEST) + + time.sleep(delay) + + ctrl.HangUp(callId) + count += 1 + + currBitrate += self.incBitrate + if (currBitrate > self.maxBitrate): + currBitrate = self.minBitrate + + print("**[END] VIDEO Bitrate Test") + +# testSimultaneousLoopCallDht +# perform <nbIteration> simultaneous DHT calls using <delay> between each call + + def testSimultaneousLoopCallDht(self, ctrl, nbIteration, delay): + count = 0 + while count < nbIteration: + print("[%s/%s]" % (count, nbIteration)) + self.setRandomActiveCodecs(ctrl, self.DHT_ACCOUNT_ID) # do all the calls currCall = 0 - while currCall < nbCalls: - ctrl.Call(destAccount) + NB_SIMULTANEOUS_CALL = 10; + while currCall <= NB_SIMULTANEOUS_CALL: + ctrl.Call(self.DHT_TEST_ACCOUNT) time.sleep(1) currCall = currCall + 1 - # hold or wait for each call - if withHold: - delayHold = 5 - nbHold = delay / (delayHold * 2) - countHold = 0 - - while countHold < nbHold: - for callId in ctrl.getAllCalls(): - self.holdToggle(ctrl, callId, delayHold) - countHold = countHold + 1 - - else: time.sleep(delay) # hangup each call @@ -157,31 +231,48 @@ class DRingTester(): print("**[END] Call Test for account:" + localAccount) - def start(self, ctrl): - # testConfig - if not (self.testConfig(ctrl)): - print(("testConfig [KO]")) + + + +# Main function + + def start(self, ctrl, testName): + + if not testName in ALL_TEST_NAME: + print(("wrong test name")) return - else: - print(("testConfig [OK]")) + + #getConfig + self.DHT_ACCOUNT_ID = ctrl.getAllAccounts('RING')[0] + self.SIP_ACCOUNT_ID = ctrl.getAllAccounts('SIP')[0] + TEST_NB_ITERATION = 100 + TEST_DELAY = 30 + + getattr(self, ALL_TEST_NAME[testName])(ctrl,TEST_NB_ITERATION, TEST_DELAY) + + + +""" + + # DHT tests + dhtAccount = ctrl.getAllAccounts('RING')[0] + self.startDynamicBitrateCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 100, 60, 250, 4000, 100) + self.startCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 100, 60, WITHOUT_HOLD) + self.startCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 100, 60, WITH_HOLD) # RING IP2IP tests self.startCallTests(ctrl, 'IP2IP', RING_TEST_ACCOUNT, 1000, 20, WITHOUT_HOLD) self.startCallTests(ctrl, 'IP2IP', RING_TEST_ACCOUNT, 1000, 20, WITH_HOLD) + + # Polycom IP2IP tests self.startCallTests(ctrl, 'IP2IP', POLYCOM_TEST_ACCOUNT, 1000, 20, WITHOUT_HOLD) self.startCallTests(ctrl, 'IP2IP', POLYCOM_TEST_ACCOUNT, 1000, 20, WITH_HOLD) # SIP tests sipAccount = ctrl.getAllAccounts('SIP')[0] - self.registerAccount(ctrl, sipAccount) # self.startSimultaneousCallTests(ctrl, sipAccount, SIP_TEST_ACCOUNT, 10, 40, WITHOUT_HOLD,10) self.startCallTests(ctrl, sipAccount, SIP_TEST_ACCOUNT, 1000, 20, WITH_HOLD) self.startCallTests(ctrl, sipAccount, SIP_TEST_ACCOUNT, 1000, 20, WITHOUT_HOLD) - - # DHT tests - dhtAccount = ctrl.getAllAccounts('RING')[0] - self.registerAccount(ctrl, dhtAccount) - self.startCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 10, 60, WITHOUT_HOLD) - self.startCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 10, 60, WITH_HOLD) +""" -- GitLab