Skip to content
Snippets Groups Projects
Commit d93a8635 authored by jpbl's avatar jpbl
Browse files

*** empty log message ***

parent 61b39686
Branches
No related tags found
No related merge requests found
...@@ -38,7 +38,7 @@ class ObjectPool ...@@ -38,7 +38,7 @@ class ObjectPool
/** /**
* This function will wait for an available line. * This function will wait for an available line.
*/ */
T pop(); bool pop(T &value, unsigned long time = ULONG_MAX);
private: private:
std::list< T > mPool; std::list< T > mPool;
......
...@@ -31,17 +31,21 @@ ObjectPool< T >::push(const T &value) ...@@ -31,17 +31,21 @@ ObjectPool< T >::push(const T &value)
} }
template< typename T > template< typename T >
T bool
ObjectPool< T >::pop() ObjectPool< T >::pop(T &value, unsigned long time)
{ {
QMutexLocker guard(&mMutex); QMutexLocker guard(&mMutex);
while(mPool.begin() == mPool.end()) { mDataAvailable.wait(guard.mutex(), time);
mDataAvailable.wait(guard.mutex());
}
if(mPool.begin() == mPool.end()) {
return false;
}
else {
typename std::list< T >::iterator pos = mPool.begin(); typename std::list< T >::iterator pos = mPool.begin();
mPool.pop_front(); mPool.pop_front();
return (*pos); value = (*pos);
return true;
}
} }
#endif #endif
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <stdexcept> #include <stdexcept>
#include <sstream> #include <sstream>
#include "global.h"
#include "requesterimpl.h" #include "requesterimpl.h"
#include "sessionio.h" #include "sessionio.h"
#include "answerreceiver.h" #include "answerreceiver.h"
...@@ -171,3 +172,21 @@ RequesterImpl::generateSequenceId() ...@@ -171,3 +172,21 @@ RequesterImpl::generateSequenceId()
return id.str(); return id.str();
} }
void
RequesterImpl::inputIsDown(const std::string &sessionId)
{
std::map< std::string, SessionIO * >::iterator pos;
pos = mSessions.find(sessionId);
if(pos == mSessions.end()) {
// we will not thow an exception, but this is
// a logic error
_debug("SessionIO input for session %s is down, but we don't have that session.\n",
sessionId.c_str());
}
else {
// If we no longer can receive, it means it's
// not possible to receive answer for new requests,
// so we close the session.
pos->second->stop();
}
}
...@@ -79,6 +79,24 @@ class RequesterImpl ...@@ -79,6 +79,24 @@ class RequesterImpl
*/ */
void registerSession(const std::string &id, SessionIO *io); void registerSession(const std::string &id, SessionIO *io);
/**
* This function is used to notify that the SessionIO
* input of a session is down. It means that we no longer
* can receive answers.
*
* Note: Only SessionIO related classes should call this function.
*/
void inputIsDown(const std::string &sessionId);
/**
* This function is used to notify that the SessionIO
* output of a session is down. It means that we no longer
* can send requests.
*
* Note: Only SessionIO related classes should call this function.
*/
void outputIsDown(const std::string &sessionId);
private: private:
/** /**
......
...@@ -32,7 +32,7 @@ Session::Session(const std::string &id) ...@@ -32,7 +32,7 @@ Session::Session(const std::string &id)
Session::Session() Session::Session()
{ {
mId = Requester::instance().generateSessionId(); mId = Requester::instance().generateSessionId();
SessionIO *s = new SessionIO(&std::cin, &std::cout); SessionIO *s = new SessionIO(mId, &std::cin, &std::cout);
Requester::instance().registerSession(mId, s); Requester::instance().registerSession(mId, s);
} }
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
*/ */
#include "global.h" #include "global.h"
#include "requester.h"
#include "sessionio.h" #include "sessionio.h"
InputStreamer::InputStreamer(SessionIO *sessionIO) InputStreamer::InputStreamer(SessionIO *sessionIO)
...@@ -33,6 +34,7 @@ InputStreamer::run() ...@@ -33,6 +34,7 @@ InputStreamer::run()
} }
_debug("The Session input is down.\n"); _debug("The Session input is down.\n");
Requester::instance().inputIsDown(mSessionIO->id);
} }
OutputStreamer::OutputStreamer(SessionIO *sessionIO) OutputStreamer::OutputStreamer(SessionIO *sessionIO)
...@@ -49,8 +51,11 @@ OutputStreamer::run() ...@@ -49,8 +51,11 @@ OutputStreamer::run()
_debug("The Session output is down.\n"); _debug("The Session output is down.\n");
} }
SessionIO::SessionIO(std::istream *input, std::ostream *output) SessionIO::SessionIO(const std::string &sessionId,
: mIsUp(false) std::istream *input,
std::ostream *output)
: id(sessionId)
, mIsUp(false)
, mInput(input) , mInput(input)
, mOutput(output) , mOutput(output)
, mInputStreamer(this) , mInputStreamer(this)
...@@ -60,6 +65,7 @@ SessionIO::SessionIO(std::istream *input, std::ostream *output) ...@@ -60,6 +65,7 @@ SessionIO::SessionIO(std::istream *input, std::ostream *output)
SessionIO::~SessionIO() SessionIO::~SessionIO()
{ {
stop(); stop();
waitStop();
} }
bool bool
...@@ -75,6 +81,7 @@ void ...@@ -75,6 +81,7 @@ void
SessionIO::start() SessionIO::start()
{ {
stop(); stop();
waitStop();
//just protecting the mutex //just protecting the mutex
{ {
QMutexLocker guard(&mMutex); QMutexLocker guard(&mMutex);
...@@ -90,7 +97,11 @@ SessionIO::stop() ...@@ -90,7 +97,11 @@ SessionIO::stop()
mMutex.lock(); mMutex.lock();
mIsUp = false; mIsUp = false;
mMutex.unlock(); mMutex.unlock();
}
void
SessionIO::waitStop()
{
mInputStreamer.wait(); mInputStreamer.wait();
mOutputStreamer.wait(); mOutputStreamer.wait();
} }
...@@ -104,7 +115,7 @@ SessionIO::send(const std::string &request) ...@@ -104,7 +115,7 @@ SessionIO::send(const std::string &request)
void void
SessionIO::receive(std::string &answer) SessionIO::receive(std::string &answer)
{ {
answer = mInputPool.pop(); mInputPool.pop(answer);
} }
void void
...@@ -116,10 +127,13 @@ SessionIO::send() ...@@ -116,10 +127,13 @@ SessionIO::send()
mMutex.unlock(); mMutex.unlock();
} }
else { else {
(*mOutput) << mOutputPool.pop(); std::string output;
if(mOutputPool.pop(output, 100)) {
(*mOutput) << output;
mOutput->flush(); mOutput->flush();
} }
} }
}
void void
SessionIO::receive() SessionIO::receive()
......
...@@ -68,7 +68,6 @@ class OutputStreamer : public QThread ...@@ -68,7 +68,6 @@ class OutputStreamer : public QThread
SessionIO *mSessionIO; SessionIO *mSessionIO;
}; };
/** /**
* This is the main class that will handle * This is the main class that will handle
* the IO. * the IO.
...@@ -82,7 +81,9 @@ class SessionIO ...@@ -82,7 +81,9 @@ class SessionIO
/** /**
* Those streams will be the streams read or write to. * Those streams will be the streams read or write to.
*/ */
SessionIO(std::istream *input, std::ostream *output); SessionIO(const std::string &id,
std::istream *input,
std::ostream *output);
~SessionIO(); ~SessionIO();
/** /**
...@@ -93,10 +94,18 @@ class SessionIO ...@@ -93,10 +94,18 @@ class SessionIO
/** /**
* This function will stop the streaming * This function will stop the streaming
* processing. * processing. On return, the service
* might not be completly down. You need
* to use waitStop if you want to be sure.
*/ */
void stop(); void stop();
/**
* This function will wait until the
* service is really down.
*/
void waitStop();
/** /**
* You can use this function for sending request. * You can use this function for sending request.
* The sending is non-blocking. This function will * The sending is non-blocking. This function will
...@@ -128,6 +137,9 @@ class SessionIO ...@@ -128,6 +137,9 @@ class SessionIO
*/ */
void receive(); void receive();
public:
const std::string id;
private: private:
QMutex mMutex; QMutex mMutex;
bool mIsUp; bool mIsUp;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment