From f943678ec88e88809dd9c82be2368030f13f5b0e Mon Sep 17 00:00:00 2001
From: Alexandre Savard <alexandre.savard@savoirfairelinux.com>
Date: Wed, 11 Apr 2012 14:57:01 -0400
Subject: [PATCH] #9621 Make registered account call as functional tests

---
 tools/pysflphone/sflphonectrlsimple.py        |  40 +++--
 tools/pysflphone/sippwrap.py                  | 157 +++++++++++++++++
 .../test_sflphone_dbus_interface.py           | 164 ++++++++++++++++--
 3 files changed, 335 insertions(+), 26 deletions(-)
 create mode 100644 tools/pysflphone/sippwrap.py

diff --git a/tools/pysflphone/sflphonectrlsimple.py b/tools/pysflphone/sflphonectrlsimple.py
index 1838ba5371..a8aa40c8b8 100755
--- a/tools/pysflphone/sflphonectrlsimple.py
+++ b/tools/pysflphone/sflphonectrlsimple.py
@@ -75,10 +75,13 @@ class SflPhoneCtrlSimple(Thread):
 
 	self.test = test
 	self.onIncomingCall_cb = None
+        self.onCallRinging_cb = None
+        self.onCallCurrent_cb = None
+        self.onCallFailure_cb = None
 	self.event = Event()
 
 	gobject.threads_init()
-	
+
 
 
     def __del__(self):
@@ -90,7 +93,7 @@ class SflPhoneCtrlSimple(Thread):
     def stopThread(self):
         print "Stop PySFLphone"
         self.isStop = True
-	
+
 
 
     def register(self):
@@ -192,31 +195,46 @@ class SflPhoneCtrlSimple(Thread):
 	if(self.test):
             # TODO fix this bug in daemon, cannot answer too fast
             time.sleep(0.5)
-	    if self.onIncomingCall_cb(self) is not None:
+	    if self.onIncomingCall_cb(self):
                 self.onIncomingCall_cb(self)
-	
+
 
 
     # On call state changed event, set the values for new calls,
     # or delete the call from the list of active calls
     def onCallStateChanged(self, callid, state):
         print "Call state changed: " + callid + ", " + state
-        if state == "HUNGUP":
+        self.currentCallId = callid
+        if state is "HUNGUP":
             try:
                 del self.activeCalls[callid]
             except KeyError:
                 print "Call " + callid + " didn't exist. Cannot delete."
 
-        elif state in [ "RINGING", "CURRENT", "INCOMING", "HOLD" ]:
+        elif state is "RINGING":
             try:
                 self.activeCalls[callid]['State'] = state
+                if self.onCallRinging_cb:
+                    self.onCallRinging_cb(self)
             except KeyError, e:
                 print "This call didn't exist!: " + callid + ". Adding it to the list."
                 callDetails = self.getCallDetails(callid)
                 self.activeCalls[callid] = {'Account': callDetails['ACCOUNTID'],
 					    'To': callDetails['PEER_NUMBER'], 'State': state }
+        elif state in [ "CURRENT", "INCOMING", "HOLD" ]:
+            try:
+                self.activeCalls[callid]['State'] = state
+                if self.onCallCurrent_cb:
+                    self.onCallCurrent_cb(self)
+                callDetails = self.getCallDetails(callid)
+                self.activeCalls[callid] = {'Account': callDetails['ACCOUNTID'],
+					    'To': callDetails['PEER_NUMBER'], 'State': state }
+            except KeyError, e:
+                print "This call didn't exist!: " + callid + ". Adding it to the list."
         elif state in [ "BUSY", "FAILURE" ]:
             try:
+	        if self.onCallFailure_cb:
+                    self.onCallFailure_cb(self)
                 del self.activeCalls[callid]
             except KeyError, e:
                 print "This call didn't exist!: " + callid
@@ -237,7 +255,7 @@ class SflPhoneCtrlSimple(Thread):
 	Required parameters are type, alias, hostname, username and password
 
 	input details
-	
+
 	"""
 
 	if details is None:
@@ -381,7 +399,7 @@ class SflPhoneCtrlSimple(Thread):
         if account is None:
             raise SflPhoneError("No provided or current account !")
         return account in self.getAllAccounts()
-			
+
     def getAllRegisteredAccounts(self):
         """Return a list of registered accounts"""
 
@@ -496,7 +514,7 @@ class SflPhoneCtrlSimple(Thread):
 
 	if dest is None or dest == "":
             raise SflPhoneError("Invalid call destination")
-	
+
         # Set the account to be used for this call
 	if dest.find('sip:') is 0 or dest.find('sips:') is 0:
             print "Ip 2 IP call"
@@ -508,7 +526,7 @@ class SflPhoneCtrlSimple(Thread):
             raise SflPhoneError("Can't place a call without a registered account")
 
         # Generate a call ID for this call
-        callid = self.GenerateCallID()	
+        callid = self.GenerateCallID()
 
         # Add the call to the list of active calls and set status to SENT
         self.activeCalls[callid] = {'Account': self.account, 'To': dest, 'State': 'SENT' }
@@ -576,7 +594,7 @@ class SflPhoneCtrlSimple(Thread):
 
         if callid is None or callid == "":
             raise SflPhoneError("Invalid callID")
-	
+
         self.callmanager.accept(callid)
 
 
diff --git a/tools/pysflphone/sippwrap.py b/tools/pysflphone/sippwrap.py
new file mode 100644
index 0000000000..b959478241
--- /dev/null
+++ b/tools/pysflphone/sippwrap.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2009 by the Free Software Foundation, Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+import os
+
+class SippWrapper:
+    """ Wrapper taht allow for managing sipp command line easily """
+
+    def __init__(self):
+        self.commandLine = "sipp"
+        self.remoteServer = ""
+        self.remotePort = ""
+        self.localInterface = ""
+        self.localPort = ""
+        self.customScenarioFile = ""
+        self.isUserAgenClient = True
+        self.launchInBackground = False
+        self.numberOfCall = 0
+        self.numberOfSimultaneousCall = 0
+        self.enableTraceMsg = False
+        self.enableTraceShormsg = False
+        self.enableTraceScreen = False
+        self.enableTraceError = False
+        self.enableTraceStat = False
+        self.enableTraceCounts = False
+        self.enableTraceRtt = False
+        self.enableTraceLogs = False
+
+    def buildCommandLine(self):
+	""" Fill the command line arguments based on specified parameters """
+
+        if not self.remotePort and not self.remoteServer:
+            self.isUserAgentClient = False
+        elif self.remotePort and not self.remoteServer:
+	    print "Error cannot have remote port specified with no server"
+            return
+
+        if self.remoteServer:
+            self.commandLine += " " + self.remoteServer
+
+        if self.remotePort:
+            self.commandLine += ":" + self.remotePort
+
+        if self.localInterface:
+            self.commandLine += " -i " + self.localInterface
+
+        if self.localPort:
+            self.commandLine += " -p " + self.localPort
+
+        if self.customScenarioFile:
+            self.commandLine += " -sf " + self.customScenarioFile
+        elif self.isUserAgentClient is True:
+            self.commandLine += " -sn uac"
+        elif self.isUserAgentClient is False:
+            self.commandLine += " -sn uas"
+
+        if self.launchInBackground:
+            self.commandLine += " -bg"
+
+        if self.numberOfCall:
+            self.commandLine += " -m " + str(self.numberOfCall)
+
+        if self.numberOfSimultaneousCall:
+            self.commandLine += " -l " + str(self.numberOfSimultaneousCall)
+
+        if self.enableTraceMsg:
+            self.commandLine += " -trace_msg"
+
+        if self.enableTraceShormsg:
+            self.commandLine += " -trace_shortmsg"
+
+        if self.enableTraceScreen:
+            self.commandLine += " -trace_screen"
+
+        if self.enableTraceError:
+            self.commandLine += " -trace_err"
+
+        if self.enableTraceStat:
+            self.commandLine += " -trace_stat"
+
+        if self.enableTraceCounts:
+            self.commandLine += " -trace_counts"
+
+        if self.enableTraceRtt:
+            self.commandLine += " -trace_rtt"
+
+        if self.enableTraceLogs:
+            self.commandLine += " -trace_logs"
+
+
+    def launch(self):
+        """ Launch the sipp instance using the specified arguments """
+
+        self.buildCommandLine()
+        print self.commandLine
+        return os.system(self.commandLine)
+
+
+class SippScreenStatParser:
+    """ Class that parse statistic reported by a sipp instance
+        report some of the most important value """
+
+    def __init__(self, filename):
+        print "Opening " + filename
+        self.logfile = open(filename, "r").readlines()
+        print self.logfile[39]
+        print self.logfile[40]
+
+    def isAnyFailedCall(self):
+        """ Look for any failed call
+            Return true if there are failed call, false elsewhere """
+
+        # TODO: Find a better way to determine which line to consider
+        if "Failed call" not in self.logfile[40]:
+            print "Error: Could not find 'Failed call' statistics"
+            # We consider this as a failure
+            return True
+
+        return "1" in self.logfile[40]
+
+    def isAnySuccessfulCall(self):
+        """ Look for any successful call
+            Return true if there are successful call, false elsewhere """
+
+        # TODO: Find a better way to determine which line to consider
+        if "Successful call" not in self.logfile[39]:
+            print "Error: Could not find 'Successful call' statistics"
+            return False
+
+        return "1" in self.logfile[39]
+
+
+
+def test_result_parsing():
+    dirlist = os.listdir("./")
+
+    logfile = [x for x in dirlist if "screen.log" in x]
+    testResult = SippScreenStatParser(logfile[0])
+
+    assert(not testResult.isAnyFailedCall())
+
+    assert(testResult.isAnySuccessfulCall())
diff --git a/tools/pysflphone/test_sflphone_dbus_interface.py b/tools/pysflphone/test_sflphone_dbus_interface.py
index c45156decf..e865efaa4d 100644
--- a/tools/pysflphone/test_sflphone_dbus_interface.py
+++ b/tools/pysflphone/test_sflphone_dbus_interface.py
@@ -16,8 +16,11 @@
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 
+import os
 import time
 import logging
+from sippwrap import SippWrapper
+from sippwrap import SippScreenStatParser
 from sflphonectrlsimple import SflPhoneCtrlSimple
 
 from nose.tools import nottest
@@ -26,14 +29,42 @@ from nose.tools import nottest
 ### function starting with 'test' are executed.
 ###
 
-# Open sflphone and connect to sflphoned through dbus
-sflphone = SflPhoneCtrlSimple(True)
-
 accountList = ["IP2IP", "Account:1332798167"]
 
+SCENARIO_PATH = "../sippxml/"
+
+def callHangup(sflphone):
+    """ On incoming call, answer the callm, then hangup """
+
+    print "Hangup Call with id " + sflphone.currentCallId
+    sflphone.HangUp(sflphone.currentCallId)
+
+    time.sleep(3)
+
+    print "Stopping Thread"
+    sflphone.stopThread()
+
+
+def callIsRinging(sflphone):
+    """ Display messages when call is ringing """
+
+    print "The call is ringing"
+
+
+def leaveThreadOnFailure(sflphone):
+    """ If a failure occurs duing the call, just leave the running thread """
+
+    print "Stopping Thread"
+    sflphone.stopThread()
+
+
+
 class TestSFLPhoneAccountConfig:
+    """ The test suite for account configuration """
 
     def __init__(self):
+        self.sflphone = SflPhoneCtrlSimple(True)
+
         self.logger = logging.getLogger("TestSFLPhoneAccountConfig")
         filehdlr = logging.FileHandler("/tmp/sflphonedbustest.log")
         formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
@@ -44,28 +75,28 @@ class TestSFLPhoneAccountConfig:
     @nottest
     def test_get_account_list(self):
         self.logger.info("Test get account list")
-        accList = sflphone.getAllAccounts()
+        accList = self.sflphone.getAllAccounts()
         listIntersection = set(accList) & set(accountList)
         assert len(listIntersection) == len(accountList)
 
     @nottest
     def test_account_registration(self):
         self.logger.info("Test account registration")
-        accList = [x for x in sflphone.getAllAccounts() if x != "IP2IP"]
+        accList = [x for x in self.sflphone.getAllAccounts() if x != "IP2IP"]
         for acc in accList:
 	    self.logger.info("Registering account " + acc)
 
-            if sflphone.isAccountEnable(acc):
-               sflphone.setAccountEnable(acc, False)
+            if self.sflphone.isAccountEnable(acc):
+               self.sflphone.setAccountEnable(acc, False)
                time.sleep(2)
 
             # Account should not be registered
-            assert sflphone.isAccountRegistered(acc)
+            assert self.sflphone.isAccountRegistered(acc)
 
-            sflphone.setAccountEnable(acc, True)
+            self.sflphone.setAccountEnable(acc, True)
             time.sleep(2)
 
-            assert sflphone.isAccountRegistered(acc)
+            assert self.sflphone.isAccountRegistered(acc)
 
     @nottest
     def test_add_remove_account(self):
@@ -74,20 +105,123 @@ class TestSFLPhoneAccountConfig:
         newAccList = []
 
         # consider only true accounts
-        accList = [x for x in sflphone.getAllAccounts() if x != "IP2IP"]
+        accList = [x for x in self.sflphone.getAllAccounts() if x != "IP2IP"]
 
         # Store the account details localy
         for acc in accList:
-            accountDetails[acc] = sflphone.getAccountDetails(acc)
+            accountDetails[acc] = self.sflphone.getAccountDetails(acc)
 
         # Remove all accounts from sflphone
         for acc in accountDetails:
-            sflphone.removeAccount(acc)
+            self.sflphone.removeAccount(acc)
 
         # Recreate all accounts
         for acc in accountDetails:
-            newAccList.append(sflphone.addAccount(accountDetails[acc]))
+            newAccList.append(self.sflphone.addAccount(accountDetails[acc]))
 
         # New accounts should be automatically registered
         for acc in newAccList:
-            assert sflphone.isAccountRegistered(acc)
+            assert self.sflphone.isAccountRegistered(acc)
+
+
+
+class TestSFLPhoneRegisteredCalls:
+    """ The test suite for call interaction """
+
+    def __init__(self):
+        self.sflphone = SflPhoneCtrlSimple(True)
+
+        self.logger = logging.getLogger("TestSFLPhoneRegisteredCalls")
+        filehdlr = logging.FileHandler("/tmp/sfltestregisteredcall.log")
+        formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
+        filehdlr.setFormatter(formatter)
+        self.logger.addHandler(filehdlr)
+        self.logger.setLevel(logging.INFO)
+        self.sippRegistrationInstance = SippWrapper()
+        self.sippCallInstance = SippWrapper()
+        self.localInterface = "127.0.0.1"
+        self.localPort = str(5064)
+        self.sflphone.onCallRinging_cb = callIsRinging
+        self.sflphone.onCallCurrent_cb = callHangup
+        self.sflphone.onCallFailure_cb = leaveThreadOnFailure
+
+        # Make sure the test directory is populated with most recent log files
+        self.clean_log_directory()
+
+    def clean_log_directory(self):
+        dirlist = os.listdir("./")
+        files = [x for x in dirlist if "screen.log" in x]
+        for f in files:
+	    os.remove(f)
+
+    def find_sipp_pid(self):
+        # Retreive the PID of the last
+        # The /proc/PID/cmdline contain the command line from
+        pids = [int(x) for x in os.listdir("/proc") if x.isdigit()]
+        sippPid = [pid for pid in pids if "sipp" in open("/proc/" + str(pid) + "/cmdline").readline()]
+
+        return sippPid[0]
+
+    def parse_results(self):
+        dirlist = os.listdir("./")
+        logfile = [x for x in dirlist if "screen.log" in x]
+        print logfile
+
+        fullpath = os.path.dirname(os.path.realpath(__file__)) + "/"
+
+        # there should be only one screen.log file (see clean_log_directory)
+        resultParser = SippScreenStatParser(fullpath + logfile[0])
+
+        assert(not resultParser.isAnyFailedCall())
+        assert(resultParser.isAnySuccessfulCall())
+
+    def test_registered_call(self):
+        self.logger.info("Test Registered Call")
+
+        # launch the sipp instance in background
+        # sipp 127.0.0.1:5060 -sf uac_register_no_cvs.xml -i 127.0.0.1 -p 5062
+        self.sippRegistrationInstance.remoteServer = "127.0.0.1"
+        self.sippRegistrationInstance.remotePort = str(5062)
+        self.sippRegistrationInstance.localInterface = self.localInterface
+        self.sippRegistrationInstance.localPort = self.localPort
+        self.sippRegistrationInstance.customScenarioFile = SCENARIO_PATH + "uac_register_no_cvs.xml"
+        self.sippRegistrationInstance.launchInBackground = True
+        self.sippRegistrationInstance.numberOfCall = 1
+        self.sippRegistrationInstance.numberOfSimultaneousCall = 1
+
+        self.sippRegistrationInstance.launch()
+
+        # wait for this instance of sipp to complete registration
+        sippPid = self.find_sipp_pid()
+        while os.path.exists("/proc/" + str(sippPid)):
+            time.sleep(1)
+
+        # sipp -sn uas -p 5062 -i 127.0.0.1
+        self.sippCallInstance.localInterface = self.localInterface
+        self.sippCallInstance.localPort = self.localPort
+        self.sippCallInstance.launchInBackground = True
+        self.sippCallInstance.numberOfCall = 1
+        self.sippCallInstance.numberOfSimultaneousCall = 1
+        self.sippCallInstance.enableTraceScreen = True
+
+        self.sippCallInstance.launch()
+
+        sippPid = self.find_sipp_pid()
+
+        # make sure every account are enabled
+        accList = [x for x in self.sflphone.getAllAccounts() if x != "IP2IP"]
+        for acc in accList:
+            if not self.sflphone.isAccountRegistered(acc):
+                self.sflphone.setAccountEnable(acc, True)
+
+        # Make a call to the SIPP instance
+        self.sflphone.Call("300")
+
+        # Start Glib mainloop to process callbacks
+        self.sflphone.start()
+
+        # Wait the sipp instance to dump log files
+        while os.path.exists("/proc/" + str(sippPid)):
+            time.sleep(1)
+
+        self.parse_results()
-- 
GitLab