NLS Engine  v0.1
The Next Logical Step in game engine design.
 All Classes Namespaces Files Functions Variables Enumerations Enumerator Defines
MessageRouter.cpp
Go to the documentation of this file.
00001 
00011 #include "MessageRouter.h"
00012 
00013 // Standard Includes
00014 #include <algorithm>
00015 
00016 // Library Includes
00017 #include <boost/foreach.hpp>
00018 
00019 // Local Includes
00020 #include "Envelope.h"
00021 
00022 // Static class member initialization
00023 
00024 // Class methods in the order they are defined within the class header
00025 
00026 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00027 MessageRouter::MessageRouter( void ) : routing(true) {
00028   SubscriberSPTR sshutdown(new Subscriber(boost::bind(&MessageRouter::Shutdown, this, _1)));
00029   Subscribe(CORE_MESSAGE::SHUTDOWN, sshutdown);
00030   SubscriberSPTR squit(new Subscriber(boost::bind(&MessageRouter::Quit, this, _1)));
00031   Subscribe(CORE_MESSAGE::QUIT, squit);
00032 }
00033 
00034 MessageRouter::~MessageRouter( void ) {
00035   while (!this->backlog.empty()) {
00036     this->backlog.pop();
00037   }
00038 }
00039 
00040 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00041 void MessageRouter::Subscribe(int id, SubscriberSPTR& subscriber) {
00042   Threading::WriteLock w_lock(this->subscriptionsMutex);
00043   
00044   this->subscriptions[id].insert(subscriber);
00045 }
00046 
00047 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00048 void MessageRouter::Subscribe( std::vector<int>& ids, SubscriberSPTR& subscriber) {
00049   BOOST_FOREACH(int id, ids) {
00050     this->Subscribe(id, subscriber);
00051   }
00052 }
00053 
00054 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00055 void MessageRouter::SubscribeDirected( int id, std::shared_ptr<DirectedSubscriber>& subscriber ) {
00056   Threading::WriteLock w_lock(this->directedSubscriptionMutex);
00057   
00058   this->directedSubscription[id] = subscriber;
00059 }
00060 
00061 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00062 void MessageRouter::Unsubscribe( int id, SubscriberSPTR& subscriber ) {
00063   // We need to retrieve the obj_ptr from both functors in order to see if we have a match
00064   Subscriber func1 = (*subscriber.get());
00065   boost::detail::function::function_buffer functor = func1.functor;
00066   Subscriber func2;
00067   boost::detail::function::function_buffer functor2;
00068 
00069   Threading::WriteLock w_lock(this->subscriptionsMutex);
00070   
00071   SubscriptionsType::iterator subscriptions_it = this->subscriptions.find(id);
00072   if (subscriptions_it != this->subscriptions.end()) {
00073     for (SubscriberCollectionType::iterator subscribers_it = subscriptions_it->second.begin(); subscribers_it != subscriptions_it->second.end();) {
00074       SubscriberSPTR subscriber = *subscribers_it;
00075       ++subscribers_it;
00076       
00077       func2 = *subscriber;
00078       functor2 = func2.functor;
00079       if (functor.obj_ptr == functor2.obj_ptr) {
00080         this->subscriptions.at(id).erase(subscriber);
00081       }
00082     }
00083   }
00084 }
00085 
00086 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00087 void MessageRouter::Unsubscribe( std::vector<int>& ids, SubscriberSPTR& subscriber ) {
00088   BOOST_FOREACH(int id, ids) {
00089     this->Unsubscribe(id, subscriber);
00090   }
00091 }
00092 
00093 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00094 EnvelopeSPTR MessageRouter::SendDirected( EnvelopeSPTR envelope, int id ) const {
00095   Threading::ReadLock r_lock(this->directedSubscriptionMutex);
00096   
00097   DirectedSubscriptionsType::const_iterator it = this->directedSubscription.find(id);
00098   if (it != this->directedSubscription.end()) {
00099     std::shared_ptr<DirectedSubscriber> sub = (*it).second;
00100     return (*sub.get())(envelope);
00101   }
00102   
00103   return nullptr;
00104 }
00105 
00106 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00107 void MessageRouter::Route( void ) {
00108   EnvelopeSPTR envelope;
00109   while (routing) {
00110     while (!this->backlog.empty()) {
00111       // Get the next Envelope from the queue and pop it from the queue
00112       {
00113         Threading::WriteLock w_lock(this->backlogMutex);
00114         
00115         envelope = this->backlog.front();
00116         this->backlog.pop();
00117       }
00118       
00119       // See if anyone is subscribed to the message id, get the functor, and call it
00120       this->SendSP(envelope, false);
00121     }
00122     Sleep(1);
00123   }
00124 }
00125 
00126 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00127 void MessageRouter::Shutdown( EnvelopeSPTR envelope ) {
00128   Threading::WriteLock w_lock(this->backlogMutex);
00129   
00130   while (!this->backlog.empty()) {
00131     this->backlog.pop();
00132   }
00133 }
00134 
00135 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00136 void MessageRouter::Quit( EnvelopeSPTR envelope ) {
00137   this->routing = false;
00138 }
00139 
00140 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
00141 void MessageRouter::SendSP( EnvelopeSPTR envelope, bool async /*= true*/ ) {
00142   if (async) {
00143     Threading::WriteLock w_lock(this->backlogMutex);
00144     
00145     this->backlog.push(envelope);
00146   }
00147   else {
00148     SubscriberCollectionType subscribers;
00149     
00150     {
00151       Threading::ReadLock r_lock(this->subscriptionsMutex);
00152       
00153       SubscriptionsType::iterator it = this->subscriptions.find(envelope->msgid);
00154       if (it != this->subscriptions.end()) {
00155         subscribers = (*it).second;
00156       }
00157     }
00158     
00159     BOOST_FOREACH(SubscriberSPTR subscriber, subscribers) {
00160       (*subscriber)(envelope);
00161     }
00162   }
00163 }