Commit 9b3f4a27 authored by Éloi Bail's avatar Éloi Bail Committed by gerrit2

tools: refactor automatic call test tool

Refactor of automatic call test tool
Now do ./dringctrl --help : all test name will be listed
./dringctrl --test <testName> to execute a test

Issue: #80498
Change-Id: I86d16e7d96f04f040133f16856ba9ca8a061374d
parent 6432f74f
......@@ -457,6 +457,13 @@ class DRingCtrl(Thread):
account = self._valid_account(account)
return [int(x) for x in self.configurationmanager.getActiveCodecList(account)]
def setCodecBitrate(self, account, bitrate):
""" Change bitrate for all codecs on given account"""
for codecId in self.configurationmanager.getActiveCodecList(account):
details = self.configurationmanager.getCodecDetails(account, codecId)
details['CodecInfo.bitrate'] = str(bitrate)
self.configurationmanager.setCodecDetails(account, codecId, details)
#
# Call management
#
......@@ -599,6 +606,11 @@ class DRingCtrl(Thread):
self.callmanager.hangUpConference(confId)
def switchInput(self, callid, inputName):
"""switch to input if exist"""
return self.callmanager.switchInput(callid, inputName)
def run(self):
"""Processing method for this thread"""
......
#!/usr/bin/env python3
#
# Copyright (C) 2015 Savoir-Faire Linux Inc.
# Copyright (C) 2015 Savoir-faire Linux Inc.
# Author: Guillaume Roguez <guillaume.roguez@savoirfairelinux.com>
#
# This program is free software; you can redistribute it and/or modify
......@@ -16,18 +16,6 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Additional permission under GNU GPL version 3 section 7:
#
# If you modify this program, or any covered work, by linking or
# combining it with the OpenSSL project's OpenSSL library (or a
# modified version of that library), containing parts covered by the
# terms of the OpenSSL or SSLeay licenses, Savoir-Faire Linux Inc.
# grants you additional permission to convey the resulting work.
# Corresponding Source for a non-source form of such a combination
# shall include the source code for the parts of OpenSSL used as well
# as that of the covered work.
#
import sys
import os
......@@ -100,7 +88,7 @@ if __name__ == "__main__":
parser.add_argument('--dtmf', help='Send DTMF', metavar='<key>')
parser.add_argument('--toggleVideo', help='Launch toggle video tests', action='store_true')
parser.add_argument('--test', help='Launch automatic tests', action='store_true')
parser.add_argument('--test', help=' '.join(str(test) for test in DRingTester().getTestName() ), metavar='<testName>')
args = parser.parse_args()
......@@ -179,7 +167,7 @@ if __name__ == "__main__":
ctrl.Dtmf(args.dtmf)
if args.test:
DRingTester().start(ctrl)
DRingTester().start(ctrl, args.test)
if args.toggleVideo:
if not ctrl.videomanager:
......
......@@ -36,29 +36,52 @@ from random import shuffle
from errors import *
# Dht Client
SIP_TEST_ACCOUNT = 'sf1'
# Dht Client
DHT_TEST_ACCOUNT = '6c8d591e7c55b348338209bb6f9f638688d50034'
# Ring Client
RING_TEST_ACCOUNT = '192.168.48.125'
# RING_TEST_ACCOUNT = '192.168.48.158'
# Polycom Client
POLYCOM_TEST_ACCOUNT = '192.168.40.38'
WITH_HOLD = True
WITHOUT_HOLD = False
ALL_TEST_NAME = {
'TestConfig': 'testConfig',
'LoopCallDht': 'testLoopCallDht',
'VideoBirtateDHT': 'testLoopCallDhtWithIncBitrate',
'SimultenousDHTCall' : 'testSimultaneousLoopCallDht',
'DhtCallHold' : 'testLoopCallDhtWithHold'
}
class DRingTester():
DHT_ACCOUNT_ID = ''
SIP_ACCOUNT_ID = ''
# Dht Client
SIP_TEST_ACCOUNT = 'sf1'
# Dht Client
DHT_TEST_ACCOUNT = '280ca11317ec90a939c86fbfa06532dbb8a08f8a'
# Ring Client
RING_TEST_ACCOUNT = '192.168.50.143'
# Polycom Client
POLYCOM_TEST_ACCOUNT = '192.168.40.38'
FILE_TEST = '/home/eloi/Videos/Mad_Max_Fury_Road_2015_Trailer_F4_5.1-1080p-HDTN.mp4'
minBitrate = 400
maxBitrate = 4000
incBitrate = 100
def testConfig(self, ctrl):
print("**[BEGIN] test config")
allCodecs = ctrl.getAllCodecs()
if len(allCodecs) == 0:
print("error no codec on the system")
return 0
print("**[SUCCESS] test config")
print("**[END] test config")
return 1
#
# Helpers
#
def getTestName(self):
return ALL_TEST_NAME.keys()
def checkIP2IPAccount(self, ctrl):
ipAccount = ctrl.getAllAccounts()
......@@ -87,66 +110,117 @@ class DRingTester():
print("UnHolding: " + callId)
ctrl.UnHold(callId)
def startCallTests(self, ctrl, localAccount, destAccount, nbIteration,
delay, withHold):
print("**[BEGIN] Call Test for account:" + localAccount)
if localAccount == 'IP2IP':
ctrl.setAccount(localAccount)
#
# tests
#
# testLoopCallDht
# perform <nbIteration> DHT calls using <delay> between each call
def testLoopCallDht(self, ctrl, nbIteration, delay):
print("**[BEGIN] DHT Call Test")
count = 0
while count < nbIteration:
print("[%s/%s]" % (count, nbIteration))
self.setRandomActiveCodecs(ctrl, localAccount)
ctrl.Call(destAccount)
callId = ctrl.getAllCalls()[0]
self.setRandomActiveCodecs(ctrl, self.DHT_ACCOUNT_ID)
if withHold:
delayHold = 5
nbHold = delay / (delayHold * 2)
countHold = 0
callId = ctrl.Call(self.DHT_TEST_ACCOUNT)
while countHold < nbHold:
self.holdToggle(ctrl, callId, delayHold)
countHold += 1
else:
time.sleep(delay)
# switch to file input
ctrl.switchInput(callId,'file://'+self.FILE_TEST)
time.sleep(delay)
ctrl.HangUp(callId)
count += 1
print("**[END] Call Test for account:" + localAccount)
print("**[END] DHT Call Test")
# testLoopCallDhtWithHold
# perform <nbIteration> DHT calls using <delay> between each call
# perform stress hold/unhold between each call
def testLoopCallDhtWithHold(self, ctrl, nbIteration, delay):
print("**[BEGIN] DHT Call Test")
count = 0
while count < nbIteration:
print("[%s/%s]" % (count, nbIteration))
self.setRandomActiveCodecs(ctrl, self.DHT_ACCOUNT_ID)
callId = ctrl.Call(self.DHT_TEST_ACCOUNT)
def startSimultaneousCallTests(self, ctrl, localAccount, destAccount,
nbIteration, delay, withHold, nbCalls):
print("**[BEGIN] Call Test for account:" + localAccount)
if localAccount == 'IP2IP':
ctrl.setAccount(localAccount)
# switch to file input
ctrl.switchInput(callId,'file://'+self.FILE_TEST)
delayHold = 5
nbHold = delay / (delayHold * 2)
countHold = 0
while countHold < nbHold:
self.holdToggle(ctrl, callId, delayHold)
countHold = countHold + 1
ctrl.HangUp(callId)
count += 1
print("**[END] DHT Call Test")
# testLoopCallDhtWithIncBitrate
# perform <nbIteration> DHT calls using <delay> between each call
# inc bitrate between each iteration
def testLoopCallDhtWithIncBitrate(self, ctrl, nbIteration, delay):
print("**[BEGIN] VIDEO Bitrate Test")
count = 0
currBitrate = self.minBitrate
while count < nbIteration:
print("[%s/%s]" % (count, nbIteration))
self.setRandomActiveCodecs(ctrl, localAccount)
self.setRandomActiveCodecs(ctrl, self.DHT_ACCOUNT_ID)
print("setting video bitrate to "+str(currBitrate))
ctrl.setCodecBitrate(self.DHT_ACCOUNT_ID, currBitrate)
callId = ctrl.Call(self.DHT_TEST_ACCOUNT)
# switch to file input
ctrl.switchInput(callId,'file://'+self.FILE_TEST)
time.sleep(delay)
ctrl.HangUp(callId)
count += 1
currBitrate += self.incBitrate
if (currBitrate > self.maxBitrate):
currBitrate = self.minBitrate
print("**[END] VIDEO Bitrate Test")
# testSimultaneousLoopCallDht
# perform <nbIteration> simultaneous DHT calls using <delay> between each call
def testSimultaneousLoopCallDht(self, ctrl, nbIteration, delay):
count = 0
while count < nbIteration:
print("[%s/%s]" % (count, nbIteration))
self.setRandomActiveCodecs(ctrl, self.DHT_ACCOUNT_ID)
# do all the calls
currCall = 0
while currCall < nbCalls:
ctrl.Call(destAccount)
NB_SIMULTANEOUS_CALL = 10;
while currCall <= NB_SIMULTANEOUS_CALL:
ctrl.Call(self.DHT_TEST_ACCOUNT)
time.sleep(1)
currCall = currCall + 1
# hold or wait for each call
if withHold:
delayHold = 5
nbHold = delay / (delayHold * 2)
countHold = 0
while countHold < nbHold:
for callId in ctrl.getAllCalls():
self.holdToggle(ctrl, callId, delayHold)
countHold = countHold + 1
else:
time.sleep(delay)
# hangup each call
......@@ -157,31 +231,48 @@ class DRingTester():
print("**[END] Call Test for account:" + localAccount)
def start(self, ctrl):
# testConfig
if not (self.testConfig(ctrl)):
print(("testConfig [KO]"))
# Main function
def start(self, ctrl, testName):
if not testName in ALL_TEST_NAME:
print(("wrong test name"))
return
else:
print(("testConfig [OK]"))
#getConfig
self.DHT_ACCOUNT_ID = ctrl.getAllAccounts('RING')[0]
self.SIP_ACCOUNT_ID = ctrl.getAllAccounts('SIP')[0]
TEST_NB_ITERATION = 100
TEST_DELAY = 30
getattr(self, ALL_TEST_NAME[testName])(ctrl,TEST_NB_ITERATION, TEST_DELAY)
"""
# DHT tests
dhtAccount = ctrl.getAllAccounts('RING')[0]
self.startDynamicBitrateCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 100, 60, 250, 4000, 100)
self.startCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 100, 60, WITHOUT_HOLD)
self.startCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 100, 60, WITH_HOLD)
# RING IP2IP tests
self.startCallTests(ctrl, 'IP2IP', RING_TEST_ACCOUNT, 1000, 20, WITHOUT_HOLD)
self.startCallTests(ctrl, 'IP2IP', RING_TEST_ACCOUNT, 1000, 20, WITH_HOLD)
# Polycom IP2IP tests
self.startCallTests(ctrl, 'IP2IP', POLYCOM_TEST_ACCOUNT, 1000, 20, WITHOUT_HOLD)
self.startCallTests(ctrl, 'IP2IP', POLYCOM_TEST_ACCOUNT, 1000, 20, WITH_HOLD)
# SIP tests
sipAccount = ctrl.getAllAccounts('SIP')[0]
self.registerAccount(ctrl, sipAccount)
# self.startSimultaneousCallTests(ctrl, sipAccount, SIP_TEST_ACCOUNT, 10, 40, WITHOUT_HOLD,10)
self.startCallTests(ctrl, sipAccount, SIP_TEST_ACCOUNT, 1000, 20, WITH_HOLD)
self.startCallTests(ctrl, sipAccount, SIP_TEST_ACCOUNT, 1000, 20, WITHOUT_HOLD)
# DHT tests
dhtAccount = ctrl.getAllAccounts('RING')[0]
self.registerAccount(ctrl, dhtAccount)
self.startCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 10, 60, WITHOUT_HOLD)
self.startCallTests(ctrl, dhtAccount, DHT_TEST_ACCOUNT, 10, 60, WITH_HOLD)
"""
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment