1 /** 2 * The semaphore module provides a general use semaphore for synchronization. 3 * 4 * Copyright: Copyright Sean Kelly 2005 - 2009. 5 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 6 * Authors: Sean Kelly 7 * Source: $(DRUNTIMESRC core/sync/_semaphore.d) 8 */ 9 10 /* Copyright Sean Kelly 2005 - 2009. 11 * Distributed under the Boost Software License, Version 1.0. 12 * (See accompanying file LICENSE or copy at 13 * http://www.boost.org/LICENSE_1_0.txt) 14 */ 15 module core.sync.semaphore; 16 17 18 public import core.sync.exception; 19 public import core.time; 20 21 version (OSX) 22 version = Darwin; 23 else version (iOS) 24 version = Darwin; 25 else version (TVOS) 26 version = Darwin; 27 else version (WatchOS) 28 version = Darwin; 29 30 version (Windows) 31 { 32 import core.sys.windows.basetsd /+: HANDLE+/; 33 import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, INFINITE, 34 ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/; 35 import core.sys.windows.windef /+: BOOL, DWORD+/; 36 import core.sys.windows.winerror /+: WAIT_TIMEOUT+/; 37 } 38 else version (Darwin) 39 { 40 import core.sync.config; 41 import core.stdc.errno; 42 import core.sys.posix.time; 43 import core.sys.darwin.mach.semaphore; 44 } 45 else version (Posix) 46 { 47 import core.sync.config; 48 import core.stdc.errno; 49 import core.sys.posix.pthread; 50 import core.sys.posix.semaphore; 51 } 52 else 53 { 54 static assert(false, "Platform not supported"); 55 } 56 57 58 //////////////////////////////////////////////////////////////////////////////// 59 // Semaphore 60 // 61 // void wait(); 62 // void notify(); 63 // bool tryWait(); 64 //////////////////////////////////////////////////////////////////////////////// 65 66 67 /** 68 * This class represents a general counting semaphore as concieved by Edsger 69 * Dijkstra. As per Mesa type monitors however, "signal" has been replaced 70 * with "notify" to indicate that control is not transferred to the waiter when 71 * a notification is sent. 72 */ 73 class Semaphore 74 { 75 //////////////////////////////////////////////////////////////////////////// 76 // Initialization 77 //////////////////////////////////////////////////////////////////////////// 78 79 80 /** 81 * Initializes a semaphore object with the specified initial count. 82 * 83 * Params: 84 * count = The initial count for the semaphore. 85 * 86 * Throws: 87 * SyncError on error. 88 */ 89 this( uint count = 0 ) 90 { 91 version (Windows) 92 { 93 m_hndl = CreateSemaphoreA( null, count, int.max, null ); 94 if ( m_hndl == m_hndl.init ) 95 throw new SyncError( "Unable to create semaphore" ); 96 } 97 else version (Darwin) 98 { 99 auto rc = semaphore_create( mach_task_self(), &m_hndl, SYNC_POLICY_FIFO, count ); 100 if ( rc ) 101 throw new SyncError( "Unable to create semaphore" ); 102 } 103 else version (Posix) 104 { 105 int rc = sem_init( &m_hndl, 0, count ); 106 if ( rc ) 107 throw new SyncError( "Unable to create semaphore" ); 108 } 109 } 110 111 112 ~this() 113 { 114 version (Windows) 115 { 116 BOOL rc = CloseHandle( m_hndl ); 117 assert( rc, "Unable to destroy semaphore" ); 118 } 119 else version (Darwin) 120 { 121 auto rc = semaphore_destroy( mach_task_self(), m_hndl ); 122 assert( !rc, "Unable to destroy semaphore" ); 123 } 124 else version (Posix) 125 { 126 int rc = sem_destroy( &m_hndl ); 127 assert( !rc, "Unable to destroy semaphore" ); 128 } 129 } 130 131 132 //////////////////////////////////////////////////////////////////////////// 133 // General Actions 134 //////////////////////////////////////////////////////////////////////////// 135 136 137 /** 138 * Wait until the current count is above zero, then atomically decrement 139 * the count by one and return. 140 * 141 * Throws: 142 * SyncError on error. 143 */ 144 void wait() 145 { 146 version (Windows) 147 { 148 DWORD rc = WaitForSingleObject( m_hndl, INFINITE ); 149 if ( rc != WAIT_OBJECT_0 ) 150 throw new SyncError( "Unable to wait for semaphore" ); 151 } 152 else version (Darwin) 153 { 154 while ( true ) 155 { 156 auto rc = semaphore_wait( m_hndl ); 157 if ( !rc ) 158 return; 159 if ( rc == KERN_ABORTED && errno == EINTR ) 160 continue; 161 throw new SyncError( "Unable to wait for semaphore" ); 162 } 163 } 164 else version (Posix) 165 { 166 while ( true ) 167 { 168 if ( !sem_wait( &m_hndl ) ) 169 return; 170 if ( errno != EINTR ) 171 throw new SyncError( "Unable to wait for semaphore" ); 172 } 173 } 174 } 175 176 177 /** 178 * Suspends the calling thread until the current count moves above zero or 179 * until the supplied time period has elapsed. If the count moves above 180 * zero in this interval, then atomically decrement the count by one and 181 * return true. Otherwise, return false. 182 * 183 * Params: 184 * period = The time to wait. 185 * 186 * In: 187 * period must be non-negative. 188 * 189 * Throws: 190 * SyncError on error. 191 * 192 * Returns: 193 * true if notified before the timeout and false if not. 194 */ 195 bool wait( Duration period ) 196 in 197 { 198 assert( !period.isNegative ); 199 } 200 do 201 { 202 version (Windows) 203 { 204 auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); 205 206 while ( period > maxWaitMillis ) 207 { 208 auto rc = WaitForSingleObject( m_hndl, cast(uint) 209 maxWaitMillis.total!"msecs" ); 210 switch ( rc ) 211 { 212 case WAIT_OBJECT_0: 213 return true; 214 case WAIT_TIMEOUT: 215 period -= maxWaitMillis; 216 continue; 217 default: 218 throw new SyncError( "Unable to wait for semaphore" ); 219 } 220 } 221 switch ( WaitForSingleObject( m_hndl, cast(uint) period.total!"msecs" ) ) 222 { 223 case WAIT_OBJECT_0: 224 return true; 225 case WAIT_TIMEOUT: 226 return false; 227 default: 228 throw new SyncError( "Unable to wait for semaphore" ); 229 } 230 } 231 else version (Darwin) 232 { 233 mach_timespec_t t = void; 234 (cast(byte*) &t)[0 .. t.sizeof] = 0; 235 236 if ( period.total!"seconds" > t.tv_sec.max ) 237 { 238 t.tv_sec = t.tv_sec.max; 239 t.tv_nsec = cast(typeof(t.tv_nsec)) period.split!("seconds", "nsecs")().nsecs; 240 } 241 else 242 period.split!("seconds", "nsecs")(t.tv_sec, t.tv_nsec); 243 while ( true ) 244 { 245 auto rc = semaphore_timedwait( m_hndl, t ); 246 if ( !rc ) 247 return true; 248 if ( rc == KERN_OPERATION_TIMED_OUT ) 249 return false; 250 if ( rc != KERN_ABORTED || errno != EINTR ) 251 throw new SyncError( "Unable to wait for semaphore" ); 252 } 253 } 254 else version (Posix) 255 { 256 import core.sys.posix.time : clock_gettime, CLOCK_REALTIME; 257 258 timespec t = void; 259 clock_gettime( CLOCK_REALTIME, &t ); 260 mvtspec( t, period ); 261 262 while ( true ) 263 { 264 if ( !sem_timedwait( &m_hndl, &t ) ) 265 return true; 266 if ( errno == ETIMEDOUT ) 267 return false; 268 if ( errno != EINTR ) 269 throw new SyncError( "Unable to wait for semaphore" ); 270 } 271 } 272 } 273 274 275 /** 276 * Atomically increment the current count by one. This will notify one 277 * waiter, if there are any in the queue. 278 * 279 * Throws: 280 * SyncError on error. 281 */ 282 void notify() 283 { 284 version (Windows) 285 { 286 if ( !ReleaseSemaphore( m_hndl, 1, null ) ) 287 throw new SyncError( "Unable to notify semaphore" ); 288 } 289 else version (Darwin) 290 { 291 auto rc = semaphore_signal( m_hndl ); 292 if ( rc ) 293 throw new SyncError( "Unable to notify semaphore" ); 294 } 295 else version (Posix) 296 { 297 int rc = sem_post( &m_hndl ); 298 if ( rc ) 299 throw new SyncError( "Unable to notify semaphore" ); 300 } 301 } 302 303 304 /** 305 * If the current count is equal to zero, return. Otherwise, atomically 306 * decrement the count by one and return true. 307 * 308 * Throws: 309 * SyncError on error. 310 * 311 * Returns: 312 * true if the count was above zero and false if not. 313 */ 314 bool tryWait() 315 { 316 version (Windows) 317 { 318 switch ( WaitForSingleObject( m_hndl, 0 ) ) 319 { 320 case WAIT_OBJECT_0: 321 return true; 322 case WAIT_TIMEOUT: 323 return false; 324 default: 325 throw new SyncError( "Unable to wait for semaphore" ); 326 } 327 } 328 else version (Darwin) 329 { 330 return wait( dur!"hnsecs"(0) ); 331 } 332 else version (Posix) 333 { 334 while ( true ) 335 { 336 if ( !sem_trywait( &m_hndl ) ) 337 return true; 338 if ( errno == EAGAIN ) 339 return false; 340 if ( errno != EINTR ) 341 throw new SyncError( "Unable to wait for semaphore" ); 342 } 343 } 344 } 345 346 347 protected: 348 349 /// Aliases the operating-system-specific semaphore type. 350 version (Windows) alias Handle = HANDLE; 351 /// ditto 352 else version (Darwin) alias Handle = semaphore_t; 353 /// ditto 354 else version (Posix) alias Handle = sem_t; 355 356 /// Handle to the system-specific semaphore. 357 Handle m_hndl; 358 } 359 360 361 //////////////////////////////////////////////////////////////////////////////// 362 // Unit Tests 363 //////////////////////////////////////////////////////////////////////////////// 364 365 unittest 366 { 367 import core.thread, core.atomic; 368 369 void testWait() 370 { 371 auto semaphore = new Semaphore; 372 shared bool stopConsumption = false; 373 immutable numToProduce = 20; 374 immutable numConsumers = 10; 375 shared size_t numConsumed; 376 shared size_t numComplete; 377 378 void consumer() 379 { 380 while (true) 381 { 382 semaphore.wait(); 383 384 if (atomicLoad(stopConsumption)) 385 break; 386 atomicOp!"+="(numConsumed, 1); 387 } 388 atomicOp!"+="(numComplete, 1); 389 } 390 391 void producer() 392 { 393 assert(!semaphore.tryWait()); 394 395 foreach (_; 0 .. numToProduce) 396 semaphore.notify(); 397 398 // wait until all items are consumed 399 while (atomicLoad(numConsumed) != numToProduce) 400 Thread.yield(); 401 402 // mark consumption as finished 403 atomicStore(stopConsumption, true); 404 405 // wake all consumers 406 foreach (_; 0 .. numConsumers) 407 semaphore.notify(); 408 409 // wait until all consumers completed 410 while (atomicLoad(numComplete) != numConsumers) 411 Thread.yield(); 412 413 assert(!semaphore.tryWait()); 414 semaphore.notify(); 415 assert(semaphore.tryWait()); 416 assert(!semaphore.tryWait()); 417 } 418 419 auto group = new ThreadGroup; 420 421 for ( int i = 0; i < numConsumers; ++i ) 422 group.create(&consumer); 423 group.create(&producer); 424 group.joinAll(); 425 } 426 427 428 void testWaitTimeout() 429 { 430 auto sem = new Semaphore; 431 shared bool semReady; 432 bool alertedOne, alertedTwo; 433 434 void waiter() 435 { 436 while (!atomicLoad(semReady)) 437 Thread.yield(); 438 alertedOne = sem.wait(dur!"msecs"(1)); 439 alertedTwo = sem.wait(dur!"msecs"(1)); 440 assert(alertedOne && !alertedTwo); 441 } 442 443 auto thread = new Thread(&waiter); 444 thread.start(); 445 446 sem.notify(); 447 atomicStore(semReady, true); 448 thread.join(); 449 assert(alertedOne && !alertedTwo); 450 } 451 452 testWait(); 453 testWaitTimeout(); 454 }