Skip to content
Snippets Groups Projects
Commit 079cad67 authored by yanmorin's avatar yanmorin
Browse files

Don't take 100% CPU
Send and receive events: OK
Remove waiting requests: not tested yet
parent d2de28ad
No related branches found
No related tags found
No related merge requests found
...@@ -40,9 +40,6 @@ class ObjectPool ...@@ -40,9 +40,6 @@ class ObjectPool
*/ */
bool pop(T &value, unsigned long time = ULONG_MAX); bool pop(T &value, unsigned long time = ULONG_MAX);
typename std::list< T >::iterator begin();
typename std::list< T >::iterator end();
private: private:
std::list< T > mPool; std::list< T > mPool;
......
...@@ -31,50 +31,25 @@ ObjectPool< T >::push(const T &value) ...@@ -31,50 +31,25 @@ ObjectPool< T >::push(const T &value)
ost::MutexLock guard(mMutex); ost::MutexLock guard(mMutex);
mPool.push_back(value); mPool.push_back(value);
mSemaphore.post(); mSemaphore.post();
std::cerr << "push value..." << std::endl;
} }
template< typename T > template< typename T >
bool bool
ObjectPool< T >::pop(T &value, unsigned long time) ObjectPool< T >::pop(T &value, unsigned long time)
{ {
ost::MutexLock guard(mMutex); //ost::MutexLock guard(mMutex);
mSemaphore.wait(time); if (mSemaphore.wait(time)) {
if(mPool.begin() == mPool.end()) { if(mPool.begin() == mPool.end()) {
std::cerr << "empty list" << std::endl;
return false; return false;
} else { } else {
std::cerr << "pop value..." << std::endl;
typename std::list< T >::iterator pos = mPool.begin(); typename std::list< T >::iterator pos = mPool.begin();
value = (*pos); value = (*pos);
mPool.pop_front(); mPool.pop_front();
return true; return true;
} }
} }
return false;
template< typename T >
typename std::list< T >::iterator
ObjectPool< T >::begin()
{
ost::MutexLock guard(mMutex);
std::cerr << mPool.size();
typename std::list< T >::iterator iter = mPool.begin();
mSemaphore.post();
return iter;
}
template< typename T >
typename std::list< T >::iterator
ObjectPool< T >::end()
{
ost::MutexLock guard(mMutex);
std::cerr << mPool.size();
typename std::list< T >::iterator iter = mPool.end();
mSemaphore.post();
return iter;
} }
#endif #endif
...@@ -27,11 +27,15 @@ ...@@ -27,11 +27,15 @@
void void
TCPSessionIO::run() { TCPSessionIO::run() {
std::string response;
while(!testCancel() && good()) { while(!testCancel() && good()) {
if (isPending(ost::TCPSocket::pendingInput)) { if (isPending(ost::TCPSocket::pendingInput, 10)) {
std::string output; std::string input;
std::getline(*this, output); std::getline(*this, input);
_gui->pushRequestMessage(output); _gui->pushRequestMessage(input);
}
if (_outputPool.pop(response, 10)) {
*this << response << std::endl;
} }
} }
} }
...@@ -45,6 +49,14 @@ GUIServer::GUIServer() ...@@ -45,6 +49,14 @@ GUIServer::GUIServer()
// destructor // destructor
GUIServer::~GUIServer() GUIServer::~GUIServer()
{ {
// Waiting Requests cleanup
std::map<std::string, Request*>::iterator iter = _waitingRequests.begin();
while (iter != _waitingRequests.end()) {
_waitingRequests.erase(iter);
delete (iter->second);
iter++;
}
} }
int int
...@@ -75,8 +87,7 @@ GUIServer::exec() { ...@@ -75,8 +87,7 @@ GUIServer::exec() {
while(_sessionIO->good()) { while(_sessionIO->good()) {
if ( _requests.pop(request, 1000)) { if ( _requests.pop(request, 1000)) {
output = request->execute(*this); output = request->execute(*this);
pushResponseMessage(output); handleExecutedRequest(request, output);
delete request;
} }
} }
} }
...@@ -93,7 +104,6 @@ GUIServer::pushRequestMessage(const std::string &request) ...@@ -93,7 +104,6 @@ GUIServer::pushRequestMessage(const std::string &request)
{ {
Request *tempRequest = _factory.create(request); Request *tempRequest = _factory.create(request);
std::cout << "pushRequestMessage" << std::endl; std::cout << "pushRequestMessage" << std::endl;
//ost::MutexLock lock(_mutex);
_requests.push(tempRequest); _requests.push(tempRequest);
} }
...@@ -101,28 +111,36 @@ void ...@@ -101,28 +111,36 @@ void
GUIServer::pushResponseMessage(const ResponseMessage &response) GUIServer::pushResponseMessage(const ResponseMessage &response)
{ {
std::cout << "pushResponseMessage" << std::endl; std::cout << "pushResponseMessage" << std::endl;
//ost::MutexLock lock(_mutex); _sessionIO->push(response.toString());
*_sessionIO << response.toString() << std::endl;
// remove the request from the list // remove the request from the list
//if (response.isFinal()) { if (response.isFinal()) {
// removeRequest(response.sequenceId()); std::map<std::string, Request*>::iterator iter = _waitingRequests.find(response.sequenceId());
//} if (iter != _waitingRequests.end()) {
_waitingRequests.erase(iter);
delete (iter->second);
}
}
} }
/** /**
* Remove a request with its sequence id * Delete the request from the list of request
* or send it into the waitingRequest map
*/ */
void void
GUIServer::removeRequest(const std::string& sequenceId) GUIServer::handleExecutedRequest(Request * const request, const ResponseMessage& response)
{ {
ost::MutexLock lock(_mutex); if (response.isFinal()) {
std::list<Request*>::iterator iter; delete request;
for(iter=_requests.begin(); iter!=_requests.end(); iter++) { } else {
if ( (*iter)->sequenceId() == sequenceId ) { if (_waitingRequests.find(request->sequenceId()) == _waitingRequests.end()) {
delete (*iter); _waitingRequests[response.sequenceId()] = request;
} else {
// we don't deal with requests with a sequenceId already send...
delete request;
} }
} }
_sessionIO->push(response.toString());
} }
/** /**
......
...@@ -40,9 +40,13 @@ public: ...@@ -40,9 +40,13 @@ public:
_gui(gui) {} _gui(gui) {}
void run(); void run();
void push(const std::string &response) {
_outputPool.push(response);
}
private: private:
GUIServer *_gui; GUIServer *_gui;
ObjectPool<std::string> _outputPool;
}; };
typedef std::map<short, SubCall> CallMap; typedef std::map<short, SubCall> CallMap;
...@@ -56,10 +60,11 @@ public: ...@@ -56,10 +60,11 @@ public:
// exec loop // exec loop
int exec(void); int exec(void);
//void handleExecutedRequest(Request* request, const
void pushRequestMessage(const std::string& request); void pushRequestMessage(const std::string& request);
Request *popRequest(void); Request *popRequest(void);
void pushResponseMessage(const ResponseMessage& response); void pushResponseMessage(const ResponseMessage& response);
void removeRequest(const std::string& sequenceId); void handleExecutedRequest(Request * const request, const ResponseMessage& response);
void insertSubCall(short id, SubCall& subCall); void insertSubCall(short id, SubCall& subCall);
void removeSubCall(short id); void removeSubCall(short id);
...@@ -87,7 +92,7 @@ public: ...@@ -87,7 +92,7 @@ public:
void hangup(const std::string& callId); void hangup(const std::string& callId);
private: private:
ost::TCPSession* _sessionIO; TCPSessionIO* _sessionIO;
/** /**
* This callMap is necessary because * This callMap is necessary because
...@@ -96,7 +101,11 @@ private: ...@@ -96,7 +101,11 @@ private:
* and also a sequence number * and also a sequence number
*/ */
CallMap _callMap; CallMap _callMap;
// Incoming requests not executed
ObjectPool<Request*> _requests; ObjectPool<Request*> _requests;
// Requests executed but waiting for a final response
std::map<std::string, Request*> _waitingRequests;
RequestFactory _factory; RequestFactory _factory;
ost::Mutex _mutex; ost::Mutex _mutex;
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment