1 /**
2  * The condition module provides a primitive for synchronized condition
3  * checking.
4  *
5  * Copyright: Copyright Sean Kelly 2005 - 2009.
6  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7  * Authors:   Sean Kelly
8  * Source:    $(DRUNTIMESRC core/sync/_condition.d)
9  */
10 
11 /*          Copyright Sean Kelly 2005 - 2009.
12  * Distributed under the Boost Software License, Version 1.0.
13  *    (See accompanying file LICENSE or copy at
14  *          http://www.boost.org/LICENSE_1_0.txt)
15  */
16 module core.sync.condition;
17 
18 
19 public import core.sync.exception;
20 public import core.sync.mutex;
21 public import core.time;
22 
23 import core.exception : AssertError, staticError;
24 
25 
26 version (Windows)
27 {
28     import core.sync.semaphore;
29     import core.sys.windows.basetsd /+: HANDLE+/;
30     import core.sys.windows.winbase /+: CloseHandle, CreateSemaphoreA, CRITICAL_SECTION,
31         DeleteCriticalSection, EnterCriticalSection, INFINITE, InitializeCriticalSection,
32         LeaveCriticalSection, ReleaseSemaphore, WAIT_OBJECT_0, WaitForSingleObject+/;
33     import core.sys.windows.windef /+: BOOL, DWORD+/;
34     import core.sys.windows.winerror /+: WAIT_TIMEOUT+/;
35 }
36 else version (Posix)
37 {
38     import core.sync.config;
39     import core.stdc.errno;
40     import core.sys.posix.pthread;
41     import core.sys.posix.time;
42 }
43 else
44 {
45     static assert(false, "Platform not supported");
46 }
47 
48 
49 ////////////////////////////////////////////////////////////////////////////////
50 // Condition
51 //
52 // void wait();
53 // void notify();
54 // void notifyAll();
55 ////////////////////////////////////////////////////////////////////////////////
56 
57 /**
58  * This class represents a condition variable as conceived by C.A.R. Hoare.  As
59  * per Mesa type monitors however, "signal" has been replaced with "notify" to
60  * indicate that control is not transferred to the waiter when a notification
61  * is sent.
62  */
class Condition
{
    ////////////////////////////////////////////////////////////////////////////
    // Initialization
    ////////////////////////////////////////////////////////////////////////////

    /**
     * Initializes a condition object which is associated with the supplied
     * mutex object.
     *
     * Params:
     *  m = The mutex with which this condition will be associated.
     *
     * Throws:
     *  AssertError if the underlying OS primitives cannot be initialized.
     */
    this( Mutex m ) nothrow @safe @nogc
    {
        this(m, true);
    }

    /// ditto
    this( shared Mutex m ) shared nothrow @safe @nogc
    {
        import core.atomic : atomicLoad;
        this(atomicLoad(m), true);
    }

    //
    // Shared implementation behind both public constructors.  The `_unused_`
    // flag only distinguishes this overload from the public ones.  Q is the
    // (possibly shared) type of `this` and M the matching mutex type.
    //
    private this(this Q, M)( M m, bool _unused_ ) nothrow @trusted @nogc
        if ((is(Q == Condition) && is(M == Mutex)) ||
            (is(Q == shared Condition) && is(M == shared Mutex)))
    {
        version (Windows)
        {
            static if (is(Q == Condition))
            {
                alias HANDLE_TYPE = void*;
            }
            else
            {
                alias HANDLE_TYPE = shared(void*);
            }
            // Gate semaphore: created with one permit available (count 1, max 1).
            m_blockLock = cast(HANDLE_TYPE) CreateSemaphoreA( null, 1, 1, null );
            if ( m_blockLock == m_blockLock.init )
                throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
            scope(failure) CloseHandle( cast(void*) m_blockLock );

            // Queue semaphore: created empty; waiters block on it until notified.
            m_blockQueue = cast(HANDLE_TYPE) CreateSemaphoreA( null, 0, int.max, null );
            if ( m_blockQueue == m_blockQueue.init )
                throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
            scope(failure) CloseHandle( cast(void*) m_blockQueue );

            InitializeCriticalSection( cast(RTL_CRITICAL_SECTION*) &m_unblockLock );
            m_assocMutex = m;
        }
        else version (Posix)
        {
            static if (is(Q == shared))
            {
                import core.atomic : atomicLoad;
                m_assocMutex = atomicLoad(m);
            }
            else
            {
                m_assocMutex = m;
            }
            // When the platform exposes pthread_condattr_setclock, bind the
            // condition to CLOCK_MONOTONIC; otherwise use default attributes.
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                () @trusted
                {
                    pthread_condattr_t attr = void;
                    int rc  = pthread_condattr_init( &attr );
                    if ( rc )
                        throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
                    rc = pthread_condattr_setclock( &attr, CLOCK_MONOTONIC );
                    if ( rc )
                        throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
                    rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, &attr );
                    if ( rc )
                        throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
                    rc = pthread_condattr_destroy( &attr );
                    if ( rc )
                        throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
                } ();
            }
            else
            {
                int rc = pthread_cond_init( cast(pthread_cond_t*) &m_hndl, null );
                if ( rc )
                    throw staticError!AssertError("Unable to initialize condition", __FILE__, __LINE__);
            }
        }
    }

    // Releases the OS resources acquired by the constructor.  Failures are
    // only checked through assert.
    ~this() @nogc
    {
        version (Windows)
        {
            BOOL rc = CloseHandle( m_blockLock );
            assert( rc, "Unable to destroy condition" );
            rc = CloseHandle( m_blockQueue );
            assert( rc, "Unable to destroy condition" );
            DeleteCriticalSection( &m_unblockLock );
        }
        else version (Posix)
        {
            int rc = pthread_cond_destroy( &m_hndl );
            assert( !rc, "Unable to destroy condition" );
        }
    }


    ////////////////////////////////////////////////////////////////////////////
    // General Properties
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Gets the mutex associated with this condition.
     *
     * Returns:
     *  The mutex associated with this condition.
     */
    @property Mutex mutex()
    {
        return m_assocMutex;
    }

    /// ditto
    @property shared(Mutex) mutex() shared
    {
        import core.atomic : atomicLoad;
        return atomicLoad(m_assocMutex);
    }

    // undocumented function for internal use
    final @property Mutex mutex_nothrow() pure nothrow @safe @nogc
    {
        return m_assocMutex;
    }

    // ditto
    final @property shared(Mutex) mutex_nothrow() shared pure nothrow @safe @nogc
    {
        import core.atomic : atomicLoad;
        return atomicLoad(m_assocMutex);
    }

    ////////////////////////////////////////////////////////////////////////////
    // General Actions
    ////////////////////////////////////////////////////////////////////////////


    /**
     * Wait until notified.
     *
     * Throws:
     *  AssertError on error.
     */
    void wait()
    {
        wait!(typeof(this))(true);
    }

    /// ditto
    void wait() shared
    {
        wait!(typeof(this))(true);
    }

    /// ditto
    void wait(this Q)( bool _unused_ )
        if (is(Q == Condition) || is(Q == shared Condition))
    {
        version (Windows)
        {
            timedWait( INFINITE );
        }
        else version (Posix)
        {
            int rc = pthread_cond_wait( cast(pthread_cond_t*) &m_hndl, (cast(Mutex) m_assocMutex).handleAddr() );
            if ( rc )
                throw staticError!AssertError("Unable to wait for condition", __FILE__, __LINE__);
        }
    }

    /**
     * Suspends the calling thread until a notification occurs or until the
     * supplied time period has elapsed.
     *
     * Params:
     *  val = The time to wait.
     *
     * In:
     *  val must be non-negative.
     *
     * Throws:
     *  AssertError on error.
     *
     * Returns:
     *  true if notified before the timeout and false if not.
     */
    bool wait( Duration val )
    {
        return wait!(typeof(this))(val, true);
    }

    /// ditto
    bool wait( Duration val ) shared
    {
        return wait!(typeof(this))(val, true);
    }

    /// ditto
    bool wait(this Q)( Duration val, bool _unused_ )
        if (is(Q == Condition) || is(Q == shared Condition))
    in
    {
        assert( !val.isNegative );
    }
    do
    {
        version (Windows)
        {
            // The timeout handed to timedWait is a 32-bit millisecond count,
            // so longer durations are waited for in slices of at most
            // uint.max - 1 milliseconds.
            auto maxWaitMillis = dur!("msecs")( uint.max - 1 );

            while ( val > maxWaitMillis )
            {
                if ( timedWait( cast(uint)
                               maxWaitMillis.total!"msecs" ) )
                    return true;
                val -= maxWaitMillis;
            }
            return timedWait( cast(uint) val.total!"msecs" );
        }
        else version (Posix)
        {
            timespec t = void;
            mktspec( t, val );

            int rc = pthread_cond_timedwait( cast(pthread_cond_t*) &m_hndl,
                                             (cast(Mutex) m_assocMutex).handleAddr(),
                                             &t );
            if ( !rc )
                return true;
            if ( rc == ETIMEDOUT )
                return false;
            throw staticError!AssertError("Unable to wait for condition", __FILE__, __LINE__);
        }
    }

    /**
     * Notifies one waiter.
     *
     * Throws:
     *  AssertError on error.
     */
    void notify()
    {
        notify!(typeof(this))(true);
    }

    /// ditto
    void notify() shared
    {
        notify!(typeof(this))(true);
    }

    /// ditto
    void notify(this Q)( bool _unused_ )
        if (is(Q == Condition) || is(Q == shared Condition))
    {
        version (Windows)
        {
            notify_( false );
        }
        else version (Posix)
        {
            // Since OS X 10.7 (Lion), pthread_cond_signal returns EAGAIN after retrying 8192 times,
            // so we need to keep retrying while it returns EAGAIN.
            //
            // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
            // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
            // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
            // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
            // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
            // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
            // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
            // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

            int rc;
            do {
                rc = pthread_cond_signal( cast(pthread_cond_t*) &m_hndl );
            } while ( rc == EAGAIN );
            if ( rc )
                throw staticError!AssertError("Unable to notify condition", __FILE__, __LINE__);
        }
    }

    /**
     * Notifies all waiters.
     *
     * Throws:
     *  AssertError on error.
     */
    void notifyAll()
    {
        notifyAll!(typeof(this))(true);
    }

    /// ditto
    void notifyAll() shared
    {
        notifyAll!(typeof(this))(true);
    }

    /// ditto
    void notifyAll(this Q)( bool _unused_ )
        if (is(Q == Condition) || is(Q == shared Condition))
    {
        version (Windows)
        {
            notify_( true );
        }
        else version (Posix)
        {
            // Since OS X 10.7 (Lion), pthread_cond_broadcast returns EAGAIN after retrying 8192 times,
            // so we need to keep retrying while it returns EAGAIN.
            //
            // 10.7.0 (Lion):          http://www.opensource.apple.com/source/Libc/Libc-763.11/pthreads/pthread_cond.c
            // 10.8.0 (Mountain Lion): http://www.opensource.apple.com/source/Libc/Libc-825.24/pthreads/pthread_cond.c
            // 10.10.0 (Yosemite):     http://www.opensource.apple.com/source/libpthread/libpthread-105.1.4/src/pthread_cond.c
            // 10.11.0 (El Capitan):   http://www.opensource.apple.com/source/libpthread/libpthread-137.1.1/src/pthread_cond.c
            // 10.12.0 (Sierra):       http://www.opensource.apple.com/source/libpthread/libpthread-218.1.3/src/pthread_cond.c
            // 10.13.0 (High Sierra):  http://www.opensource.apple.com/source/libpthread/libpthread-301.1.6/src/pthread_cond.c
            // 10.14.0 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.201.1/src/pthread_cond.c
            // 10.14.1 (Mojave):       http://www.opensource.apple.com/source/libpthread/libpthread-330.220.2/src/pthread_cond.c

            int rc;
            do {
                rc = pthread_cond_broadcast( cast(pthread_cond_t*) &m_hndl );
            } while ( rc == EAGAIN );
            if ( rc )
                throw staticError!AssertError("Unable to notify condition", __FILE__, __LINE__);
        }
    }

private:
    version (Windows)
    {
        //
        // Windows implementation of wait.  Registers the caller as a blocked
        // waiter, unlocks the associated mutex, then waits on m_blockQueue
        // for at most `timeout` milliseconds (INFINITE for no limit).  The
        // associated mutex is locked again before returning.
        //
        // Returns true if the wait on m_blockQueue was satisfied and false if
        // it timed out.
        //
        bool timedWait(this Q)( DWORD timeout )
            if (is(Q == Condition) || is(Q == shared Condition))
        {
            // Applies the compound-assignment operator `o` to a counter:
            // a plain operation for an unshared Condition, an atomicOp for a
            // shared one.
            static if (is(Q == Condition))
            {
                auto op(string o, T, V1)(ref T val, V1 mod)
                {
                    return mixin("val " ~ o ~ "mod");
                }
            }
            else
            {
                auto op(string o, T, V1)(ref shared T val, V1 mod)
                {
                    import core.atomic: atomicOp;
                    return atomicOp!o(val, mod);
                }
            }

            int   numSignalsLeft;
            int   numWaitersGone;
            DWORD rc;

            // Pass through the gate to register as a blocked waiter.
            rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
            assert( rc == WAIT_OBJECT_0 );

            op!"+="(m_numWaitersBlocked, 1);

            rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
            assert( rc );

            m_assocMutex.unlock();
            scope(failure) m_assocMutex.lock();

            rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, timeout );
            assert( rc == WAIT_OBJECT_0 || rc == WAIT_TIMEOUT );
            bool timedOut = (rc == WAIT_TIMEOUT);

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if ( (numSignalsLeft = m_numWaitersToUnblock) != 0 )
            {
                if ( timedOut )
                {
                    // timeout (or canceled)
                    if ( m_numWaitersBlocked != 0 )
                    {
                        op!"-="(m_numWaitersBlocked, 1);
                        // do not unblock next waiter below (already unblocked)
                        numSignalsLeft = 0;
                    }
                    else
                    {
                        // spurious wakeup pending!!
                        m_numWaitersGone = 1;
                    }
                }
                if ( op!"-="(m_numWaitersToUnblock, 1) == 0 )
                {
                    if ( m_numWaitersBlocked != 0 )
                    {
                        // open the gate
                        rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                        assert( rc );
                        // do not open the gate below again
                        numSignalsLeft = 0;
                    }
                    else if ( (numWaitersGone = m_numWaitersGone) != 0 )
                    {
                        m_numWaitersGone = 0;
                    }
                }
            }
            else if ( op!"+="(m_numWaitersGone, 1) == int.max / 2 )
            {
                // timeout/canceled or spurious event :-)
                rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                // something is going on here - test of timeouts?
                op!"-="(m_numWaitersBlocked, m_numWaitersGone);
                rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                // ReleaseSemaphore returns a nonzero BOOL on success, so it
                // must be tested for truthiness like every other release in
                // this class; comparing against WAIT_OBJECT_0 (zero) would
                // assert that the call failed.
                assert( rc );
                m_numWaitersGone = 0;
            }

            LeaveCriticalSection( &m_unblockLock );

            if ( numSignalsLeft == 1 )
            {
                // better now than spurious later (same as ResetEvent)
                for ( ; numWaitersGone > 0; --numWaitersGone )
                {
                    rc = WaitForSingleObject( cast(HANDLE) m_blockQueue, INFINITE );
                    assert( rc == WAIT_OBJECT_0 );
                }
                // open the gate
                rc = ReleaseSemaphore( cast(HANDLE) m_blockLock, 1, null );
                assert( rc );
            }
            else if ( numSignalsLeft != 0 )
            {
                // unblock next waiter
                rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
                assert( rc );
            }
            m_assocMutex.lock();
            return !timedOut;
        }


        //
        // Windows implementation of notify (all == false) and notifyAll
        // (all == true).  Moves blocked waiters into the to-unblock count
        // under m_unblockLock and, when no unblocking is already in progress,
        // releases one permit on m_blockQueue to start waking them.
        //
        void notify_(this Q)( bool all )
            if (is(Q == Condition) || is(Q == shared Condition))
        {
            // Same counter helper as in timedWait: plain operation when
            // unshared, atomicOp when shared.
            static if (is(Q == Condition))
            {
                auto op(string o, T, V1)(ref T val, V1 mod)
                {
                    return mixin("val " ~ o ~ "mod");
                }
            }
            else
            {
                auto op(string o, T, V1)(ref shared T val, V1 mod)
                {
                    import core.atomic: atomicOp;
                    return atomicOp!o(val, mod);
                }
            }

            DWORD rc;

            EnterCriticalSection( &m_unblockLock );
            scope(failure) LeaveCriticalSection( &m_unblockLock );

            if ( m_numWaitersToUnblock != 0 )
            {
                // An unblock is already in progress: just add to it.
                if ( m_numWaitersBlocked == 0 )
                {
                    LeaveCriticalSection( &m_unblockLock );
                    return;
                }
                if ( all )
                {
                    op!"+="(m_numWaitersToUnblock, m_numWaitersBlocked);
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    op!"+="(m_numWaitersToUnblock, 1);
                    op!"-="(m_numWaitersBlocked, 1);
                }
                LeaveCriticalSection( &m_unblockLock );
            }
            else if ( m_numWaitersBlocked > m_numWaitersGone )
            {
                // Take the gate (m_blockLock) so no new waiter can register
                // while the counters are adjusted; it is not released here.
                rc = WaitForSingleObject( cast(HANDLE) m_blockLock, INFINITE );
                assert( rc == WAIT_OBJECT_0 );
                if ( 0 != m_numWaitersGone )
                {
                    op!"-="(m_numWaitersBlocked, m_numWaitersGone);
                    m_numWaitersGone = 0;
                }
                if ( all )
                {
                    m_numWaitersToUnblock = m_numWaitersBlocked;
                    m_numWaitersBlocked = 0;
                }
                else
                {
                    m_numWaitersToUnblock = 1;
                    op!"-="(m_numWaitersBlocked, 1);
                }
                LeaveCriticalSection( &m_unblockLock );
                rc = ReleaseSemaphore( cast(HANDLE) m_blockQueue, 1, null );
                assert( rc );
            }
            else
            {
                // Nobody is waiting: nothing to do.
                LeaveCriticalSection( &m_unblockLock );
            }
        }


        // NOTE: This implementation uses Algorithm 8c as described here:
        //       http://groups.google.com/group/comp.programming.threads/
        //              browse_frm/thread/1692bdec8040ba40/e7a5f9d40e86503a
        HANDLE              m_blockLock;    // auto-reset event (now semaphore)
        HANDLE              m_blockQueue;   // auto-reset event (now semaphore)
        Mutex               m_assocMutex;   // external mutex/CS
        CRITICAL_SECTION    m_unblockLock;  // internal mutex/CS
        int                 m_numWaitersGone        = 0;
        int                 m_numWaitersBlocked     = 0;
        int                 m_numWaitersToUnblock   = 0;
    }
    else version (Posix)
    {
        Mutex               m_assocMutex;   // mutex passed to the pthread wait calls
        pthread_cond_t      m_hndl;         // underlying pthread condition variable
    }
}
615 
616 
617 ////////////////////////////////////////////////////////////////////////////////
618 // Unit Tests
619 ////////////////////////////////////////////////////////////////////////////////
620 
// Exercises notify, notifyAll and timed wait on an unshared Condition.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // numWaiters threads each consume numTries "ready" tokens that the main
    // thread hands out one notify() at a time.
    void testNotify()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;    // tokens currently available, guarded by mutex
        int  numTotal   = 0;    // tokens consumed overall, guarded by mutex
        int  numDone    = 0;    // completed consumptions, guarded by synLoop

        void waiter()
        {
            for ( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex )
                {
                    // Loop on the predicate so a wakeup without an available
                    // token simply waits again.
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        for ( int i = 0; i < numTries; ++i )
        {
            for ( int j = 0; j < numWaiters; ++j )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // numDone is cumulative (never reset), so this loop only blocks
            // during the first round.
            // NOTE(review): resetting it per round looks unsafe - a fast
            // waiter can take a second semDone permit meant for a slower one,
            // which could leave a later round short of numWaiters completions.
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            for ( int j = 0; j < numWaiters; ++j )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    // All waiters block on the condition until a single notifyAll() releases
    // them.
    void testNotifyAll()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        // Poll until every waiter has registered, then wake them all at once.
        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    // The first timed wait is expected to be notified, the second to time out.
    void testWaitTimeout()
    {
        auto mutex      = new Mutex;
        auto condReady  = new Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        // Notify only once the waiter has set `waiting` under the mutex.
        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}
786 
// Exercises notify, notifyAll and timed wait on a shared Condition that is
// associated with a shared Mutex.
unittest
{
    import core.thread;
    import core.sync.mutex;
    import core.sync.semaphore;


    // numWaiters threads each consume numTries "ready" tokens that the main
    // thread hands out one notify() at a time.
    void testNotify()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        auto semDone    = new Semaphore;
        auto synLoop    = new Object;
        int  numWaiters = 10;
        int  numTries   = 10;
        int  numReady   = 0;    // tokens currently available, guarded by mutex
        int  numTotal   = 0;    // tokens consumed overall, guarded by mutex
        int  numDone    = 0;    // completed consumptions, guarded by synLoop

        void waiter()
        {
            for ( int i = 0; i < numTries; ++i )
            {
                synchronized( mutex )
                {
                    // Loop on the predicate so a wakeup without an available
                    // token simply waits again.
                    while ( numReady < 1 )
                    {
                        condReady.wait();
                    }
                    --numReady;
                    ++numTotal;
                }

                synchronized( synLoop )
                {
                    ++numDone;
                }
                semDone.wait();
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        for ( int i = 0; i < numTries; ++i )
        {
            for ( int j = 0; j < numWaiters; ++j )
            {
                synchronized( mutex )
                {
                    ++numReady;
                    condReady.notify();
                }
            }
            // numDone is cumulative (never reset), so this loop only blocks
            // during the first round.
            // NOTE(review): resetting it per round looks unsafe - a fast
            // waiter can take a second semDone permit meant for a slower one,
            // which could leave a later round short of numWaiters completions.
            while ( true )
            {
                synchronized( synLoop )
                {
                    if ( numDone >= numWaiters )
                        break;
                }
                Thread.yield();
            }
            for ( int j = 0; j < numWaiters; ++j )
            {
                semDone.notify();
            }
        }

        group.joinAll();
        assert( numTotal == numWaiters * numTries );
    }


    // All waiters block on the condition until a single notifyAll() releases
    // them.
    void testNotifyAll()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        int  numWaiters = 10;
        int  numReady   = 0;
        int  numDone    = 0;
        bool alert      = false;

        void waiter()
        {
            synchronized( mutex )
            {
                ++numReady;
                while ( !alert )
                    condReady.wait();
                ++numDone;
            }
        }

        auto group = new ThreadGroup;

        for ( int i = 0; i < numWaiters; ++i )
            group.create( &waiter );

        // Poll until every waiter has registered, then wake them all at once.
        while ( true )
        {
            synchronized( mutex )
            {
                if ( numReady >= numWaiters )
                {
                    alert = true;
                    condReady.notifyAll();
                    break;
                }
            }
            Thread.yield();
        }
        group.joinAll();
        assert( numReady == numWaiters && numDone == numWaiters );
    }


    // The first timed wait is expected to be notified, the second to time out.
    void testWaitTimeout()
    {
        auto mutex      = new shared Mutex;
        auto condReady  = new shared Condition( mutex );
        bool waiting    = false;
        bool alertedOne = true;
        bool alertedTwo = true;

        void waiter()
        {
            synchronized( mutex )
            {
                waiting    = true;
                // we never want to miss the notification (30s)
                alertedOne = condReady.wait( dur!"seconds"(30) );
                // but we don't want to wait long for the timeout (10ms)
                alertedTwo = condReady.wait( dur!"msecs"(10) );
            }
        }

        auto thread = new Thread( &waiter );
        thread.start();

        // Notify only once the waiter has set `waiting` under the mutex.
        while ( true )
        {
            synchronized( mutex )
            {
                if ( waiting )
                {
                    condReady.notify();
                    break;
                }
            }
            Thread.yield();
        }
        thread.join();
        assert( waiting );
        assert( alertedOne );
        assert( !alertedTwo );
    }

    testNotify();
    testNotifyAll();
    testWaitTimeout();
}