include/dmlite/cpp/utils/poolcontainer.h

Go to the documentation of this file.
00001 /// @file    include/dmlite/cpp/utils/poolcontainer.h
00002 /// @brief   Pooling
00003 /// @author  Alejandro Álvarez Ayllón <aalvarez@cern.ch>
00004 #ifndef DMLITE_CPP_UTILS_POOLCONTAINER_H
00005 #define DMLITE_CPP_UTILS_POOLCONTAINER_H
00006 
00007 #include <boost/thread/mutex.hpp>
00008 #include <boost/thread/condition.hpp>
00009 #include <boost/date_time/posix_time/posix_time.hpp>
00010 #include <map>
00011 #include <syslog.h>
00012 #include <queue>
00013 #include "../exceptions.h"
00014 
00015 namespace dmlite {
00016 
/// Interface for the object that manages the life cycle of pooled elements.
/// The pool itself is agnostic about what it stores, so every operation that
/// depends on the concrete element type is delegated to an implementation
/// of this interface.
template <class E>
class PoolElementFactory {
 public:
  /// Virtual destructor, so implementations can be deleted through a
  /// pointer to this interface.
  virtual ~PoolElementFactory() {}

  /// Builds a brand new element.
  /// @return The newly created element.
  virtual E create() = 0;

  /// Disposes of an element.
  /// @param element The element to dispose of.
  virtual void destroy(E element) = 0;

  /// Tells whether an element can still be used.
  /// @param element The element to check.
  /// @return true if the element is still valid.
  virtual bool isValid(E element) = 0;
};
00034 
00035 
00036   /// Implements a pool of whichever resource
00037   template <class E>
00038   class PoolContainer {
00039    public:
00040     /// Constructor
00041     /// @param factory The factory to use when spawning a new resource.
00042     /// @param n       The number of resources to keep in the pool. Up to 2*n slots can be created without penalty (but only n will be pooled)
00043     PoolContainer(PoolElementFactory<E>* factory, int n): max_(n), factory_(factory), freeSlots_(2*n)
00044     {
00045     }
00046 
00047     /// Destructor
00048     ~PoolContainer()
00049     {
00050       boost::mutex::scoped_lock lock(mutex_);
00051       // Free 'free'
00052       while (free_.size() > 0) {
00053         E e = free_.front();
00054         free_.pop_front();
00055         factory_->destroy(e);
00056       }
00057       // Freeing used is dangerous, as we might block if the client code
00058       // forgot about something. Assume the memory leak :(
00059       if (used_.size() > 0) {
00060         syslog(LOG_USER | LOG_WARNING, "%ld used elements from a pool not released on destruction!", (long)used_.size());
00061       }
00062     }
00063 
00064     /// Acquires a free resource.
00065     E  acquire(bool block = true)
00066     {
00067       
00068       E e;
00069       boost::mutex::scoped_lock lock(mutex_);
00070       
00071       // Wait for one free
00072       if (!block && (freeSlots_ == 0)) {
00073         throw DmException(DMLITE_SYSERR(EBUSY),
00074                           std::string("No resources available"));
00075       }
00076 
00077 
00078       boost::system_time const timeout = boost::get_system_time() + boost::posix_time::seconds(60);
00079       
00080       while (freeSlots_ < 1) {
00081         if (boost::get_system_time() >= timeout) {
00082            syslog(LOG_USER | LOG_WARNING, "Timeout...%d seconds", 60);
00083            break;
00084         }
00085         available_.timed_wait(lock, timeout);
00086       }
00087 
00088       // If there is any in the queue, give one from there
00089       if (free_.size() > 0) {
00090         e = free_.front();
00091         free_.pop_front();
00092         // May have expired!
00093         if (!factory_->isValid(e)) {
00094           factory_->destroy(e);
00095           e = factory_->create();
00096         }
00097       }
00098       else {
00099         // None created, so create it now
00100         e = factory_->create();
00101       }
00102       // Keep track of used
00103       used_.insert(std::pair<E, unsigned>(e, 1));
00104 
00105       // Note that in case of timeout freeSlots_ can become negative
00106       --freeSlots_;
00107 
00108       return e;
00109     }
00110 
00111     /// Increases the reference count of a resource.
00112     E acquire(E e)
00113     {
00114       boost::mutex::scoped_lock lock(mutex_);
00115 
00116       // Make sure it is there
00117       typename std::map<E, unsigned>::const_iterator i = used_.find(e);
00118       if (i == used_.end()) {
00119         throw DmException(DMLITE_SYSERR(EINVAL), std::string("The resource has not been locked previously!"));
00120       }
00121 
00122       // Increase
00123       used_[e]++;
00124 
00125       // End
00126       return e;
00127     }
00128 
00129     /// Releases a resource
00130     /// @param e The resource to release.
00131     /// @return  The reference count after releasing.
00132     unsigned release(E e)
00133     {
00134       boost::mutex::scoped_lock lock(mutex_);
00135       // Decrease reference count
00136       unsigned remaining = --used_[e];
00137       // No one else using it (hopefully...)
00138       if (used_[e] == 0) {
00139         // Remove from used
00140         used_.erase(e);
00141         // If the free size is less than the maximum, push to free and notify
00142         if ((long)free_.size() < max_) {
00143           free_.push_back(e);
00144         }
00145         else {
00146           // If we are fine, destroy
00147           factory_->destroy(e);
00148         }
00149       }
00150       available_.notify_one();
00151       ++freeSlots_;
00152 
00153       return remaining;
00154     }
00155 
00156     /// Count the number of instances
00157     unsigned refCount(E e)
00158     {
00159       typename std::map<E, unsigned>::const_iterator i = used_.find(e);
00160       if (i == used_.end())
00161         return 0;
00162       return used_[e];
00163     }
00164 
00165     /// Change the pool size
00166     /// @param ns The new size.
00167     void resize(int ns)
00168     {
00169       // The resizing will be done as we get requests
00170       boost::mutex::scoped_lock lock(mutex_);
00171       max_ = ns;
00172 
00173 
00174       freeSlots_ = 2*max_ - used_.size();
00175       // Increment the semaphore size if needed
00176       // Take into account the used
00177       if (freeSlots_ > 0)
00178         available_.notify_all();
00179     }
00180 
00181    private:
00182     // The max count of pooled instances
00183     int max_;
00184 
00185     PoolElementFactory<E> *factory_;
00186 
00187     std::deque<E>         free_;
00188     std::map<E, unsigned> used_;
00189     unsigned freeSlots_;
00190 
00191     boost::mutex              mutex_;
00192     boost::condition_variable available_;
00193   };
00194 
00195   /// Convenience class that releases a resource on destruction
00196   template <class E>
00197   class PoolGrabber {
00198    public:
00199     PoolGrabber(PoolContainer<E>& pool, bool block = true): pool_(pool)
00200     {
00201       element_ = pool_.acquire(block);
00202     }
00203 
00204     ~PoolGrabber() {
00205       pool_.release(element_);
00206     }
00207 
00208     operator E ()
00209     {
00210       return element_;
00211     }
00212 
00213    private:
00214     PoolContainer<E>& pool_;
00215     E element_;
00216   };
00217 };
00218 
00219 #endif // DMLITE_CPP_UTILS_POOLCONTAINER_H

Generated on 18 Nov 2014 for dmlite by  doxygen 1.4.7