1 /** 2 * The condition module provides a primitive for synchronized condition 3 * checking. 4 * 5 * Copyright: Copyright Sean Kelly 2005 - 2009. 6 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 7 * Authors: Sean Kelly 8 * Source: $(DRUNTIMESRC core/sync/_condition.d) 9 */ 10 11 /* Copyright Sean Kelly 2005 - 2009. 12 * Distributed under the Boost Software License, Version 1.0. 13 * (See accompanying file LICENSE or copy at 14 * http://www.boost.org/LICENSE_1_0.txt) 15 */ 16 module core.sync.condition; 17 18 19 public import core.sync.exception; 20 public import core.sync.mutex; 21 public import core.time; 22 23 import core.exception : AssertError, staticError; 24 25 26 version (Windows) 27 { 28 import core.sync.semaphore; 29 import core.sys.windows.basetsd /+: HANDLE+/; 30 import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION, 31 DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection, 32 LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/; 33 import core.sys.windows.windef /+: BOOL, DWORD+/; 34 import core.sys.windows.winerror /+: WAIT_TIMEOUT+/; 35 } 36 else version (Posix) 37 { 38 import core.sync.config; 39 import core.stdc.errno; 40 import core.sys.posix.pthread; 41 import core.sys.posix.time; 42 } 43 else 44 { 45 static assert(false, "Platform not supported"); 46 } 47 48 49 //////////////////////////////////////////////////////////////////////////////// 50 // Condition 51 // 52 // void wait(); 53 // void notify(); 54 // void notifyAll(); 55 //////////////////////////////////////////////////////////////////////////////// 56 57 /** 58 * This class represents a condition variable as conceived by C.A.R. Hoare. As 59 * per Mesa type monitors however, "signal" has been replaced with "notify" to 60 * indicate that control is not transferred to the waiter when a notification 61 * is sent. 62 */ 63 class Condition 64 { 65 //////////////////////////////////////////////////////////////////////////// 66 // Initialization 67 //////////////////////////////////////////////////////////////////////////// 68 69 /** 70 * Initializes a condition object which is associated with the supplied 71 * mutex object. 72 * 73 * Params: 74 * m = The mutex with which this condition will be associated. 75 * 76 * Throws: 77 * SyncError on error. 78 */ 79 this( Mutex m ) nothrow @safe @nogc 80 { 81 this(m, true); 82 } 83 84 /// ditto 85 this( shared Mutex m ) shared nothrow @safe @nogc 86 { 87 import core.atomic : atomicLoad; 88 this(atomicLoad(m), true); 89 } 90 91 // 92 private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted @nogc 93 if ((is(Q == Condition) && is(M == Mutex)) || 94 (is(Q == shared Condition) && is(M == shared Mutex))) 95 { 96 version (Windows) 97 { 98 static if (is(Q == Condition)) 99 { 100 alias HANDLE_TYPE = void*; 101 } 102 else 103 { 104 alias HANDLE_TYPE = shared(void*); 105 } 106 m_blockLock = cast(HANDLE_TYPE) CreateSemaphoreA( null, 1, 1, null ); 107 if ( m_blockLock == m_blockLock.init ) 108 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 109 scope(failure) CloseHandle( cast(void*) m_blockLock ); 110 111 m_blockQueue = cast(HANDLE_TYPE) CreateSemaphoreA( null, 0, int.max, null ); 112 if ( m_blockQueue == m_blockQueue.init ) 113 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 114 scope(failure) CloseHandle( cast(void*) m_blockQueue ); 115 116 InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock ); 117 m_assocMutex = m; 118 } 119 else version (Posix) 120 { 121 static if (is(Q == shared)) 122 { 123 import core.atomic : atomicLoad; 124 m_assocMutex = atomicLoad(m); 125 } 126 else 127 { 128 m_assocMutex = m; 129 } 130 static if ( is( typeof( pthread_condattr_setclock ) ) ) 131 { 132 () @trusted 133 { 134 pthread_condattr_t attr = void; 135 int rc = pthread_condattr_init( &attr ); 136 if ( rc ) 137 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 138 rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC ); 139 if ( rc ) 140 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 141 rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, &attr ); 142 if ( rc ) 143 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 144 rc = pthread_condattr_destroy( &attr ); 145 if ( rc ) 146 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 147 } (); 148 } 149 else 150 { 151 int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, null ); 152 if ( rc ) 153 throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__); 154 } 155 } 156 } 157 158 ~this() @nogc 159 { 160 version (Windows) 161 { 162 BOOL rc = CloseHandle( m_blockLock ); 163 assert( rc, "Unable to destroy condition" ); 164 rc = CloseHandle( m_blockQueue ); 165 assert( rc, "Unable to destroy condition" ); 166 DeleteCriticalSection( &m_unblockLock ); 167 } 168 else version (Posix) 169 { 170 int rc = pthread_cond_destroy( &m_hndl ); 171 assert( !rc, "Unable to destroy condition" ); 172 } 173 } 174 175 176 //////////////////////////////////////////////////////////////////////////// 177 // General Properties 178 //////////////////////////////////////////////////////////////////////////// 179 180 181 /** 182 * Gets the mutex associated with this condition. 183 * 184 * Returns: 185 * The mutex associated with this condition. 186 */ 187 @property Mutex mutex() 188 { 189 return m_assocMutex; 190 } 191 192 /// ditto 193 @property shared(Mutex) mutex() shared 194 { 195 import core.atomic : atomicLoad; 196 return atomicLoad(m_assocMutex); 197 } 198 199 // undocumented function for internal use 200 final @property Mutex mutex_nothrow() pure nothrow @safe @nogc 201 { 202 return m_assocMutex; 203 } 204 205 // ditto 206 final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc 207 { 208 import core.atomic : atomicLoad; 209 return atomicLoad(m_assocMutex); 210 } 211 212 //////////////////////////////////////////////////////////////////////////// 213 // General Actions 214 //////////////////////////////////////////////////////////////////////////// 215 216 217 /** 218 * Wait until notified. 219 * 220 * Throws: 221 * SyncError on error. 222 */ 223 void wait() 224 { 225 wait!(typeof(this))(true); 226 } 227 228 /// ditto 229 void wait() shared 230 { 231 wait!(typeof(this))(true); 232 } 233 234 /// ditto 235 void wait(this Q)( bool _unused_ ) 236 if (is(Q == Condition) || is(Q == shared Condition)) 237 { 238 version (Windows) 239 { 240 timedWait( INFINITE ); 241 } 242 else version (Posix) 243 { 244 int rc = pthread_cond_wait( cast(pthread_cond_t*) &m_hndl, (cast(Mutex) m_assocMutex).handleAddr() ); 245 if ( rc ) 246 throw staticError!AssertError("Unable to wait for condition", __FILE__, __LINE__); 247 } 248 } 249 250 /** 251 * Suspends the calling thread until a notification occurs or until the 252 * supplied time period has elapsed. 253 * 254 * Params: 255 * val = The time to wait. 256 * 257 * In: 258 * val must be non-negative. 259 * 260 * Throws: 261 * SyncError on error. 262 * 263 * Returns: 264 * true if notified before the timeout and false if not. 265 */ 266 bool wait( Duration val ) 267 { 268 return wait!(typeof(this))(val, true); 269 } 270 271 /// ditto 272 bool wait( Duration val ) shared 273 { 274 return wait!(typeof(this))(val, true); 275 } 276 277 /// ditto 278 bool wait(this Q)( Duration val, bool _unused_ ) 279 if (is(Q == Condition) || is(Q == shared Condition)) 280 in 281 { 282 assert( !val.isNegative ); 283 } 284 do 285 { 286 version (Windows) 287 { 288 auto maxWaitMillis = dur!("msecs")( uint.max - 1 ); 289 290 while ( val > maxWaitMillis ) 291 { 292 if ( timedWait( cast(uint) 293 maxWaitMillis.total!"msecs" ) ) 294 return true; 295 val -= maxWaitMillis; 296 } 297 return timedWait( cast(uint) val.total!"msecs" ); 298 } 299 else version (Posix) 300 { 301 timespec t = void; 302 mktspec( t, val ); 303 304 int rc = pthread_cond_timedwait( cast(pthread_cond_t*) &m_hndl, 305 (cast(Mutex) m_assocMutex).handleAddr(), 306 &t ); 307 if ( !rc ) 308 return true; 309 if ( rc == ETIMEDOUT ) 310 return false; 311 throw staticError!AssertError("Unable to wait for condition", __FILE__, __LINE__); 312 } 313 } 314 315 /** 316 * Notifies one waiter. 317 * 318 * Throws: 319 * SyncError on error. 320 */ 321 void notify() 322 { 323 notify!(typeof(this))(true); 324 } 325 326 /// ditto 327 void notify() shared 328 { 329 notify!(typeof(this))(true); 330 } 331 332 /// ditto 333 void notify(this Q)( bool _unused_ ) 334 if (is(Q == Condition) || is(Q == shared Condition)) 335 { 336 version (Windows) 337 { 338 notify_( false ); 339 } 340 else version (Posix) 341 { 342 // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times, 343 // so need to retrying while it returns EAGAIN. 344 // 345 // 10.7.0 (Lion): http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c 346 // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c 347 // 10.10.0 (Yosemite): http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c 348 // 10.11.0 (El Capitan): http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c 349 // 10.12.0 (Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c 350 // 10.13.0 (High Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c 351 // 10.14.0 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c 352 // 10.14.1 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c 353 354 int rc; 355 do { 356 rc = pthread_cond_signal( cast(pthread_cond_t*) &m_hndl ); 357 } while ( rc == EAGAIN ); 358 if ( rc ) 359 throw staticError!AssertError("Unable to notify condition", __FILE__, __LINE__); 360 } 361 } 362 363 /** 364 * Notifies all waiters. 365 * 366 * Throws: 367 * SyncError on error. 368 */ 369 void notifyAll() 370 { 371 notifyAll!(typeof(this))(true); 372 } 373 374 /// ditto 375 void notifyAll() shared 376 { 377 notifyAll!(typeof(this))(true); 378 } 379 380 /// ditto 381 void notifyAll(this Q)( bool _unused_ ) 382 if (is(Q == Condition) || is(Q == shared Condition)) 383 { 384 version (Windows) 385 { 386 notify_( true ); 387 } 388 else version (Posix) 389 { 390 // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times, 391 // so need to retrying while it returns EAGAIN. 392 // 393 // 10.7.0 (Lion): http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c 394 // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c 395 // 10.10.0 (Yosemite): http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c 396 // 10.11.0 (El Capitan): http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c 397 // 10.12.0 (Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c 398 // 10.13.0 (High Sierra): http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c 399 // 10.14.0 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c 400 // 10.14.1 (Mojave): http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c 401 402 int rc; 403 do { 404 rc = pthread_cond_broadcast( cast(pthread_cond_t*) &m_hndl ); 405 } while ( rc == EAGAIN ); 406 if ( rc ) 407 throw staticError!AssertError("Unable to notify condition", __FILE__, __LINE__); 408 } 409 } 410 411 private: 412 version (Windows) 413 { 414 bool timedWait(this Q)( DWORD timeout ) 415 if (is(Q == Condition) || is(Q == shared Condition)) 416 { 417 static if (is(Q == Condition)) 418 { 419 auto op(string o, T, V1)(ref T val, V1 mod) 420 { 421 return mixin("val " ~ o ~ "mod"); 422 } 423 } 424 else 425 { 426 auto op(string o, T, V1)(ref shared T val, V1 mod) 427 { 428 import core.atomic: atomicOp; 429 return atomicOp!o(val, mod); 430 } 431 } 432 433 int numSignalsLeft; 434 int numWaitersGone; 435 DWORD rc; 436 437 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE ); 438 assert( rc == WAIT_OBJECT_0 ); 439 440 op!"+="(m_numWaitersBlocked, 1); 441 442 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); 443 assert( rc ); 444 445 m_assocMutex.unlock(); 446 scope(failure) m_assocMutex.lock(); 447 448 rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout ); 449 assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT ); 450 bool timedOut = (rc == WAIT_TIMEOUT); 451 452 EnterCriticalSection( &m_unblockLock ); 453 scope(failure) LeaveCriticalSection( &m_unblockLock ); 454 455 if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 ) 456 { 457 if ( timedOut ) 458 { 459 // timeout (or canceled) 460 if ( m_numWaitersBlocked != 0 ) 461 { 462 op!"-="(m_numWaitersBlocked, 1); 463 // do not unblock next waiter below (already unblocked) 464 numSignalsLeft = 0; 465 } 466 else 467 { 468 // spurious wakeup pending!! 469 m_numWaitersGone = 1; 470 } 471 } 472 if ( op!"-="(m_numWaitersToUnblock, 1) == 0 ) 473 { 474 if ( m_numWaitersBlocked != 0 ) 475 { 476 // open the gate 477 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); 478 assert( rc ); 479 // do not open the gate below again 480 numSignalsLeft = 0; 481 } 482 else if ( (numWaitersGone = m_numWaitersGone) != 0 ) 483 { 484 m_numWaitersGone = 0; 485 } 486 } 487 } 488 else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 ) 489 { 490 // timeout/canceled or spurious event :-) 491 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE ); 492 assert( rc == WAIT_OBJECT_0 ); 493 // something is going on here - test of timeouts? 494 op!"-="(m_numWaitersBlocked, m_numWaitersGone); 495 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); 496 assert( rc == WAIT_OBJECT_0 ); 497 m_numWaitersGone = 0; 498 } 499 500 LeaveCriticalSection( &m_unblockLock ); 501 502 if ( numSignalsLeft == 1 ) 503 { 504 // better now than spurious later (same as ResetEvent) 505 for ( ; numWaitersGone > 0; --numWaitersGone ) 506 { 507 rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE ); 508 assert( rc == WAIT_OBJECT_0 ); 509 } 510 // open the gate 511 rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null ); 512 assert( rc ); 513 } 514 else if ( numSignalsLeft != 0 ) 515 { 516 // unblock next waiter 517 rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null ); 518 assert( rc ); 519 } 520 m_assocMutex.lock(); 521 return !timedOut; 522 } 523 524 525 void notify_(this Q)( bool all ) 526 if (is(Q == Condition) || is(Q == shared Condition)) 527 { 528 static if (is(Q == Condition)) 529 { 530 auto op(string o, T, V1)(ref T val, V1 mod) 531 { 532 return mixin("val " ~ o ~ "mod"); 533 } 534 } 535 else 536 { 537 auto op(string o, T, V1)(ref shared T val, V1 mod) 538 { 539 import core.atomic: atomicOp; 540 return atomicOp!o(val, mod); 541 } 542 } 543 544 DWORD rc; 545 546 EnterCriticalSection( &m_unblockLock ); 547 scope(failure) LeaveCriticalSection( &m_unblockLock ); 548 549 if ( m_numWaitersToUnblock != 0 ) 550 { 551 if ( m_numWaitersBlocked == 0 ) 552 { 553 LeaveCriticalSection( &m_unblockLock ); 554 return; 555 } 556 if ( all ) 557 { 558 op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked); 559 m_numWaitersBlocked = 0; 560 } 561 else 562 { 563 op!"+="(m_numWaitersToUnblock, 1); 564 op!"-="(m_numWaitersBlocked, 1); 565 } 566 LeaveCriticalSection( &m_unblockLock ); 567 } 568 else if ( m_numWaitersBlocked > m_numWaitersGone ) 569 { 570 rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE ); 571 assert( rc == WAIT_OBJECT_0 ); 572 if ( 0 != m_numWaitersGone ) 573 { 574 op!"-="(m_numWaitersBlocked, m_numWaitersGone); 575 m_numWaitersGone = 0; 576 } 577 if ( all ) 578 { 579 m_numWaitersToUnblock = m_numWaitersBlocked; 580 m_numWaitersBlocked = 0; 581 } 582 else 583 { 584 m_numWaitersToUnblock = 1; 585 op!"-="(m_numWaitersBlocked, 1); 586 } 587 LeaveCriticalSection( &m_unblockLock ); 588 rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null ); 589 assert( rc ); 590 } 591 else 592 { 593 LeaveCriticalSection( &m_unblockLock ); 594 } 595 } 596 597 598 // NOTE: This implementation uses Algorithm 8c as described here: 599 // http://groups.google.com/group/comp.programming.threads/ 600 // browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a 601 HANDLE m_blockLock; // auto-reset event (now semaphore) 602 HANDLE m_blockQueue; // auto-reset event (now semaphore) 603 Mutex m_assocMutex; // external mutex/CS 604 CRITICAL_SECTION m_unblockLock; // internal mutex/CS 605 int m_numWaitersGone = 0; 606 int m_numWaitersBlocked = 0; 607 int m_numWaitersToUnblock = 0; 608 } 609 else version (Posix) 610 { 611 Mutex m_assocMutex; 612 pthread_cond_t m_hndl; 613 } 614 } 615 616 617 //////////////////////////////////////////////////////////////////////////////// 618 // Unit Tests 619 //////////////////////////////////////////////////////////////////////////////// 620 621 unittest 622 { 623 import core.thread; 624 import core.sync.mutex; 625 import core.sync.semaphore; 626 627 628 void testNotify() 629 { 630 auto mutex = new Mutex; 631 auto condReady = new Condition( mutex ); 632 auto semDone = new Semaphore; 633 auto synLoop = new Object; 634 int numWaiters = 10; 635 int numTries = 10; 636 int numReady = 0; 637 int numTotal = 0; 638 int numDone = 0; 639 int numPost = 0; 640 641 void waiter() 642 { 643 for ( int i = 0; i < numTries; ++i ) 644 { 645 synchronized( mutex ) 646 { 647 while ( numReady < 1 ) 648 { 649 condReady.wait(); 650 } 651 --numReady; 652 ++numTotal; 653 } 654 655 synchronized( synLoop ) 656 { 657 ++numDone; 658 } 659 semDone.wait(); 660 } 661 } 662 663 auto group = new ThreadGroup; 664 665 for ( int i = 0; i < numWaiters; ++i ) 666 group.create( &waiter ); 667 668 for ( int i = 0; i < numTries; ++i ) 669 { 670 for ( int j = 0; j < numWaiters; ++j ) 671 { 672 synchronized( mutex ) 673 { 674 ++numReady; 675 condReady.notify(); 676 } 677 } 678 while ( true ) 679 { 680 synchronized( synLoop ) 681 { 682 if ( numDone >= numWaiters ) 683 break; 684 } 685 Thread.yield(); 686 } 687 for ( int j = 0; j < numWaiters; ++j ) 688 { 689 semDone.notify(); 690 } 691 } 692 693 group.joinAll(); 694 assert( numTotal == numWaiters * numTries ); 695 } 696 697 698 void testNotifyAll() 699 { 700 auto mutex = new Mutex; 701 auto condReady = new Condition( mutex ); 702 int numWaiters = 10; 703 int numReady = 0; 704 int numDone = 0; 705 bool alert = false; 706 707 void waiter() 708 { 709 synchronized( mutex ) 710 { 711 ++numReady; 712 while ( !alert ) 713 condReady.wait(); 714 ++numDone; 715 } 716 } 717 718 auto group = new ThreadGroup; 719 720 for ( int i = 0; i < numWaiters; ++i ) 721 group.create( &waiter ); 722 723 while ( true ) 724 { 725 synchronized( mutex ) 726 { 727 if ( numReady >= numWaiters ) 728 { 729 alert = true; 730 condReady.notifyAll(); 731 break; 732 } 733 } 734 Thread.yield(); 735 } 736 group.joinAll(); 737 assert( numReady == numWaiters && numDone == numWaiters ); 738 } 739 740 741 void testWaitTimeout() 742 { 743 auto mutex = new Mutex; 744 auto condReady = new Condition( mutex ); 745 bool waiting = false; 746 bool alertedOne = true; 747 bool alertedTwo = true; 748 749 void waiter() 750 { 751 synchronized( mutex ) 752 { 753 waiting = true; 754 // we never want to miss the notification (30s) 755 alertedOne = condReady.wait( dur!"seconds"(30) ); 756 // but we don't want to wait long for the timeout (10ms) 757 alertedTwo = condReady.wait( dur!"msecs"(10) ); 758 } 759 } 760 761 auto thread = new Thread( &waiter ); 762 thread.start(); 763 764 while ( true ) 765 { 766 synchronized( mutex ) 767 { 768 if ( waiting ) 769 { 770 condReady.notify(); 771 break; 772 } 773 } 774 Thread.yield(); 775 } 776 thread.join(); 777 assert( waiting ); 778 assert( alertedOne ); 779 assert( !alertedTwo ); 780 } 781 782 testNotify(); 783 testNotifyAll(); 784 testWaitTimeout(); 785 } 786 787 unittest 788 { 789 import core.thread; 790 import core.sync.mutex; 791 import core.sync.semaphore; 792 793 794 void testNotify() 795 { 796 auto mutex = new shared Mutex; 797 auto condReady = new shared Condition( mutex ); 798 auto semDone = new Semaphore; 799 auto synLoop = new Object; 800 int numWaiters = 10; 801 int numTries = 10; 802 int numReady = 0; 803 int numTotal = 0; 804 int numDone = 0; 805 int numPost = 0; 806 807 void waiter() 808 { 809 for ( int i = 0; i < numTries; ++i ) 810 { 811 synchronized( mutex ) 812 { 813 while ( numReady < 1 ) 814 { 815 condReady.wait(); 816 } 817 --numReady; 818 ++numTotal; 819 } 820 821 synchronized( synLoop ) 822 { 823 ++numDone; 824 } 825 semDone.wait(); 826 } 827 } 828 829 auto group = new ThreadGroup; 830 831 for ( int i = 0; i < numWaiters; ++i ) 832 group.create( &waiter ); 833 834 for ( int i = 0; i < numTries; ++i ) 835 { 836 for ( int j = 0; j < numWaiters; ++j ) 837 { 838 synchronized( mutex ) 839 { 840 ++numReady; 841 condReady.notify(); 842 } 843 } 844 while ( true ) 845 { 846 synchronized( synLoop ) 847 { 848 if ( numDone >= numWaiters ) 849 break; 850 } 851 Thread.yield(); 852 } 853 for ( int j = 0; j < numWaiters; ++j ) 854 { 855 semDone.notify(); 856 } 857 } 858 859 group.joinAll(); 860 assert( numTotal == numWaiters * numTries ); 861 } 862 863 864 void testNotifyAll() 865 { 866 auto mutex = new shared Mutex; 867 auto condReady = new shared Condition( mutex ); 868 int numWaiters = 10; 869 int numReady = 0; 870 int numDone = 0; 871 bool alert = false; 872 873 void waiter() 874 { 875 synchronized( mutex ) 876 { 877 ++numReady; 878 while ( !alert ) 879 condReady.wait(); 880 ++numDone; 881 } 882 } 883 884 auto group = new ThreadGroup; 885 886 for ( int i = 0; i < numWaiters; ++i ) 887 group.create( &waiter ); 888 889 while ( true ) 890 { 891 synchronized( mutex ) 892 { 893 if ( numReady >= numWaiters ) 894 { 895 alert = true; 896 condReady.notifyAll(); 897 break; 898 } 899 } 900 Thread.yield(); 901 } 902 group.joinAll(); 903 assert( numReady == numWaiters && numDone == numWaiters ); 904 } 905 906 907 void testWaitTimeout() 908 { 909 auto mutex = new shared Mutex; 910 auto condReady = new shared Condition( mutex ); 911 bool waiting = false; 912 bool alertedOne = true; 913 bool alertedTwo = true; 914 915 void waiter() 916 { 917 synchronized( mutex ) 918 { 919 waiting = true; 920 // we never want to miss the notification (30s) 921 alertedOne = condReady.wait( dur!"seconds"(30) ); 922 // but we don't want to wait long for the timeout (10ms) 923 alertedTwo = condReady.wait( dur!"msecs"(10) ); 924 } 925 } 926 927 auto thread = new Thread( &waiter ); 928 thread.start(); 929 930 while ( true ) 931 { 932 synchronized( mutex ) 933 { 934 if ( waiting ) 935 { 936 condReady.notify(); 937 break; 938 } 939 } 940 Thread.yield(); 941 } 942 thread.join(); 943 assert( waiting ); 944 assert( alertedOne ); 945 assert( !alertedTwo ); 946 } 947 948 testNotify(); 949 testNotifyAll(); 950 testWaitTimeout(); 951 }