00001 //------------------------------------------------------------------------------ 00002 // Copyright (c) 2013 by European Organization for Nuclear Research (CERN) 00003 // Author: Lukasz Janyst <ljanyst@cern.ch> 00004 //------------------------------------------------------------------------------ 00005 // This file is part of the XRootD software suite. 00006 // 00007 // XRootD is free software: you can redistribute it and/or modify 00008 // it under the terms of the GNU Lesser General Public License as published by 00009 // the Free Software Foundation, either version 3 of the License, or 00010 // (at your option) any later version. 00011 // 00012 // XRootD is distributed in the hope that it will be useful, 00013 // but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 // GNU General Public License for more details. 00016 // 00017 // You should have received a copy of the GNU Lesser General Public License 00018 // along with XRootD. If not, see <http://www.gnu.org/licenses/>. 00019 // 00020 // In applying this licence, CERN does not waive the privileges and immunities 00021 // granted to it by virtue of its status as an Intergovernmental Organization 00022 // or submit itself to any jurisdiction. 00023 //------------------------------------------------------------------------------ 00024 00025 #ifndef __XRD_SYS_LINUX_SEMAPHORE__ 00026 #define __XRD_SYS_LINUX_SEMAPHORE__ 00027 00028 #if defined(__linux__) && defined(HAVE_ATOMICS) 00029 00030 #include <pthread.h> 00031 #include <linux/futex.h> 00032 #include <sys/syscall.h> 00033 #include <unistd.h> 00034 #include <cerrno> 00035 #include <exception> 00036 #include <string> 00037 #include <cstdlib> 00038 00039 namespace XrdSys 00040 { 00041 //---------------------------------------------------------------------------- 00043 //---------------------------------------------------------------------------- 00044 class LinuxSemaphoreError: public std::exception 00045 { 00046 public: 00047 LinuxSemaphoreError( const std::string &error ): pError( error ) {} 00048 virtual ~LinuxSemaphoreError() throw() {}; 00049 00050 virtual const char *what() const throw() 00051 { 00052 return pError.c_str(); 00053 } 00054 00055 private: 00056 std::string pError; 00057 }; 00058 00059 //---------------------------------------------------------------------------- 00077 //---------------------------------------------------------------------------- 00078 class LinuxSemaphore 00079 { 00080 public: 00081 //------------------------------------------------------------------------ 00085 //------------------------------------------------------------------------ 00086 inline int CondWait() 00087 { 00088 int value = 0; 00089 int val = 0; 00090 int waiters = 0; 00091 int newVal = 0; 00092 00093 //---------------------------------------------------------------------- 00094 // We get the value of the semaphore try to atomically decrement it if 00095 // it's larger than 0. 00096 //---------------------------------------------------------------------- 00097 while( 1 ) 00098 { 00099 Unpack( pValue, value, val, waiters ); 00100 if( val == 0 ) 00101 return 0; 00102 newVal = Pack( --val, waiters ); 00103 if( __sync_bool_compare_and_swap( pValue, value, newVal ) ) 00104 return 1; 00105 } 00106 } 00107 00108 //------------------------------------------------------------------------ 00113 //------------------------------------------------------------------------ 00114 inline void Wait() 00115 { 00116 //---------------------------------------------------------------------- 00117 // Examine the state of the semaphore and atomically decrement it if 00118 // possible. If CondWait fails, it means that the semaphore value was 0. 00119 // In this case we atomically bump the number of waiters and go to sleep 00120 //---------------------------------------------------------------------- 00121 while( !CondWait() ) 00122 { 00123 int value = 0; 00124 int val = 0; 00125 int waiters = 0; 00126 int cancelType = 0; 00127 00128 Unpack( pValue, value, val, waiters ); 00129 00130 //-------------------------------------------------------------------- 00131 // We need to make sure again that the value of the semaphore is 0 00132 // because we fetched it again (first time was in CondWait()) and 00133 // it may have changed in the mean time. 00134 //-------------------------------------------------------------------- 00135 if( val != 0 ) 00136 continue; 00137 00138 if( waiters == WaitersMask ) 00139 throw LinuxSemaphoreError( "Reached maximum number of waiters" ); 00140 00141 int newVal = Pack( val, ++waiters ); 00142 00143 //-------------------------------------------------------------------- 00144 // We have bumped the number of waiters successfuly if neither the 00145 // semaphore value nor the number of waiters changed in the mean time. 00146 // We can safely go to sleep. 00147 // 00148 // Once the number of waiters was bumped we cannot get cancelled 00149 // without decrementing it. 00150 //-------------------------------------------------------------------- 00151 pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, &cancelType ); 00152 if( __sync_bool_compare_and_swap( pValue, value, newVal ) ) 00153 { 00154 while( 1 ) 00155 { 00156 int r = 0; 00157 00158 pthread_cleanup_push( Cleanup, pValue ); 00159 pthread_setcanceltype( PTHREAD_CANCEL_ASYNCHRONOUS, 0 ); 00160 00161 r = syscall( SYS_futex, pValue, FUTEX_WAIT, newVal, 0, 0, 0 ); 00162 00163 pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, 0 ); 00164 pthread_cleanup_pop( 0 ); 00165 00166 if( r == 0 ) // we've been woken up 00167 break; 00168 00169 if( errno == EINTR ) // interrupt 00170 continue; 00171 00172 if( errno == EWOULDBLOCK ) // futex value changed 00173 break; 00174 00175 throw LinuxSemaphoreError( "FUTEX_WAIT syscall error" ); 00176 } 00177 00178 //------------------------------------------------------------------ 00179 // We have been woken up, so we need to decrement the number of 00180 // waiters 00181 //------------------------------------------------------------------ 00182 do 00183 { 00184 Unpack( pValue, value, val, waiters ); 00185 newVal = Pack( val, --waiters ); 00186 } 00187 while( !__sync_bool_compare_and_swap( pValue, value, newVal ) ); 00188 } 00189 00190 //-------------------------------------------------------------------- 00191 // We are here if: 00192 // 1) we were unable to increase the number of waiters bacause the 00193 // atomic changed in the mean time in another execution thread 00194 // 2) *pValue != newVal upon futex call, this indicates the state 00195 // change in another thread 00196 // 3) we have been woken up by another thread 00197 // 00198 // In either of the above cases we need to re-examine the atomic and 00199 // decide whether we need to sleep or are free to proceed 00200 //-------------------------------------------------------------------- 00201 pthread_setcanceltype( cancelType, 0 ); 00202 } 00203 } 00204 00205 //------------------------------------------------------------------------ 00210 //------------------------------------------------------------------------ 00211 inline void Post() 00212 { 00213 int value = 0; 00214 int val = 0; 00215 int waiters = 0; 00216 int newVal = 0; 00217 00218 //---------------------------------------------------------------------- 00219 // We atomically increment the value of the semaphore and wake one of 00220 // the threads that was waiting for the semaphore value to change 00221 //---------------------------------------------------------------------- 00222 while( 1 ) 00223 { 00224 Unpack( pValue, value, val, waiters ); 00225 00226 if( val == ValueMask ) 00227 throw LinuxSemaphoreError( "Reached maximum value" ); 00228 00229 newVal = Pack( ++val, waiters ); 00230 if( __sync_bool_compare_and_swap( pValue, value, newVal ) ) 00231 { 00232 if( waiters ) 00233 syscall( SYS_futex, pValue, FUTEX_WAKE, 1, 0, 0, 0 ); 00234 return; 00235 } 00236 } 00237 } 00238 00239 //------------------------------------------------------------------------ 00241 //------------------------------------------------------------------------ 00242 int GetValue() const 00243 { 00244 int value = __sync_fetch_and_add( pValue, 0 ); 00245 return value & ValueMask; 00246 } 00247 00248 //------------------------------------------------------------------------ 00252 //------------------------------------------------------------------------ 00253 LinuxSemaphore( int value ) 00254 { 00255 pValue = (int *)malloc(sizeof(int)); 00256 *pValue = (value & ValueMask); 00257 } 00258 00259 //------------------------------------------------------------------------ 00261 //------------------------------------------------------------------------ 00262 ~LinuxSemaphore() 00263 { 00264 free( pValue ); 00265 } 00266 00267 private: 00268 static const int ValueMask = 0x000fffff; 00269 static const int WaitersOffset = 20; 00270 static const int WaitersMask = 0x00000fff; 00271 00272 //------------------------------------------------------------------------ 00273 // Unpack the semaphore value 00274 //------------------------------------------------------------------------ 00275 static inline void Unpack( int *sourcePtr, 00276 int &source, 00277 int &value, 00278 int &nwaiters ) 00279 { 00280 source = __sync_fetch_and_add( sourcePtr, 0 ); 00281 value = source & ValueMask; 00282 nwaiters = (source >> WaitersOffset) & WaitersMask; 00283 } 00284 00285 //------------------------------------------------------------------------ 00286 // Pack the semaphore value 00287 //------------------------------------------------------------------------ 00288 static inline int Pack( int value, int nwaiters ) 00289 { 00290 return (nwaiters << WaitersOffset) | (value & ValueMask); 00291 } 00292 00293 //------------------------------------------------------------------------ 00294 // Cancellation cleaner 00295 //------------------------------------------------------------------------ 00296 static void Cleanup( void *param ) 00297 { 00298 int *iParam = (int*)param; 00299 int value = 0; 00300 int val = 0; 00301 int waiters = 0; 00302 int newVal = 0; 00303 00304 do 00305 { 00306 Unpack( iParam, value, val, waiters ); 00307 newVal = Pack( val, --waiters ); 00308 } 00309 while( !__sync_bool_compare_and_swap( iParam, value, newVal ) ); 00310 } 00311 00312 int *pValue; 00313 }; 00314 }; 00315 00316 #endif // __linux__ && HAVE_ATOMICS 00317 00318 #endif // __XRD_SYS_LINUX_SEMAPHORE__