1 /**
2  * The read/write mutex module provides a primitive for maintaining shared read
3  * access and mutually exclusive write access.
4  *
5  * Copyright: Copyright Sean Kelly 2005 - 2009.
6  * License:   $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0)
7  * Authors:   Sean Kelly
8  * Source:    $(DRUNTIMESRC core/sync/_rwmutex.d)
9  */
10 
11 /*          Copyright Sean Kelly 2005 - 2009.
12  * Distributed under the Boost Software License, Version 1.0.
13  *    (See accompanying file LICENSE or copy at
14  *          http://www.boost.org/LICENSE_1_0.txt)
15  */
16 module core.sync.rwmutex;
17 
18 
19 public import core.sync.exception;
20 import core.sync.condition;
21 import core.sync.mutex;
22 import core.memory;
23 
24 version (Posix)
25 {
26     import core.sys.posix.pthread;
27 }
28 
29 
30 ////////////////////////////////////////////////////////////////////////////////
31 // ReadWriteMutex
32 //
33 // Reader reader();
34 // Writer writer();
35 ////////////////////////////////////////////////////////////////////////////////
36 
37 
38 /**
39  * This class represents a mutex that allows any number of readers to enter,
40  * but when a writer enters, all other readers and writers are blocked.
41  *
42  * Please note that this mutex is not recursive and is intended to guard access
43  * to data only.  Also, no deadlock checking is in place because doing so would
44  * require dynamic memory allocation, which would reduce performance by an
45  * unacceptable amount.  As a result, any attempt to recursively acquire this
46  * mutex may well deadlock the caller, particularly if a write lock is acquired
47  * while holding a read lock, or vice-versa.  In practice, this should not be
48  * an issue however, because it is uncommon to call deeply into unknown code
49  * while holding a lock that simply protects data.
50  */
51 class ReadWriteMutex
52 {
53     /**
54      * Defines the policy used by this mutex.  Currently, two policies are
55      * defined.
56      *
57      * The first will queue writers until no readers hold the mutex, then
58      * pass the writers through one at a time.  If a reader acquires the mutex
59      * while there are still writers queued, the reader will take precedence.
60      *
61      * The second will queue readers if there are any writers queued.  Writers
62      * are passed through one at a time, and once there are no writers present,
63      * all queued readers will be alerted.
64      *
65      * Future policies may offer a more even balance between reader and writer
66      * precedence.
67      */
68     enum Policy
69     {
70         PREFER_READERS, /// Readers get preference.  This may starve writers.
71         PREFER_WRITERS  /// Writers get preference.  This may starve readers.
72     }
73 
74 
75     ////////////////////////////////////////////////////////////////////////////
76     // Initialization
77     ////////////////////////////////////////////////////////////////////////////
78 
79 
80     /**
81      * Initializes a read/write mutex object with the supplied policy.
82      *
83      * Params:
84      *  policy = The policy to use.
85      *
86      * Throws:
87      *  SyncError on error.
88      */
89     this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
90     {
91         m_commonMutex = new Mutex;
92         if ( !m_commonMutex )
93             throw new SyncError( "Unable to initialize mutex" );
94 
95         m_readerQueue = new Condition( m_commonMutex );
96         if ( !m_readerQueue )
97             throw new SyncError( "Unable to initialize mutex" );
98 
99         m_writerQueue = new Condition( m_commonMutex );
100         if ( !m_writerQueue )
101             throw new SyncError( "Unable to initialize mutex" );
102 
103         m_policy = policy;
104         m_reader = new Reader;
105         m_writer = new Writer;
106     }
107 
108     /// ditto
109     shared this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow
110     {
111         m_commonMutex = new shared Mutex;
112         if ( !m_commonMutex )
113             throw new SyncError( "Unable to initialize mutex" );
114 
115         m_readerQueue = new shared Condition( m_commonMutex );
116         if ( !m_readerQueue )
117             throw new SyncError( "Unable to initialize mutex" );
118 
119         m_writerQueue = new shared Condition( m_commonMutex );
120         if ( !m_writerQueue )
121             throw new SyncError( "Unable to initialize mutex" );
122 
123         m_policy = policy;
124         m_reader = new shared Reader;
125         m_writer = new shared Writer;
126     }
127 
128     ////////////////////////////////////////////////////////////////////////////
129     // General Properties
130     ////////////////////////////////////////////////////////////////////////////
131 
132 
133     /**
134      * Gets the policy used by this mutex.
135      *
136      * Returns:
137      *  The policy used by this mutex.
138      */
139     @property Policy policy() @safe nothrow
140     {
141         return m_policy;
142     }
143 
144     ///ditto
145     @property Policy policy() shared @safe nothrow
146     {
147         return m_policy;
148     }
149 
150     ////////////////////////////////////////////////////////////////////////////
151     // Reader/Writer Handles
152     ////////////////////////////////////////////////////////////////////////////
153 
154 
155     /**
156      * Gets an object representing the reader lock for the associated mutex.
157      *
158      * Returns:
159      *  A reader sub-mutex.
160      */
161     @property Reader reader() @safe nothrow
162     {
163         return m_reader;
164     }
165 
166     ///ditto
167     @property shared(Reader) reader() shared @safe nothrow
168     {
169         return m_reader;
170     }
171 
172     /**
173      * Gets an object representing the writer lock for the associated mutex.
174      *
175      * Returns:
176      *  A writer sub-mutex.
177      */
178     @property Writer writer() @safe nothrow
179     {
180         return m_writer;
181     }
182 
183     ///ditto
184     @property shared(Writer) writer() shared @safe nothrow
185     {
186         return m_writer;
187     }
188 
189 
190     ////////////////////////////////////////////////////////////////////////////
191     // Reader
192     ////////////////////////////////////////////////////////////////////////////
193 
194 
195     /**
196      * This class can be considered a mutex in its own right, and is used to
197      * negotiate a read lock for the enclosing mutex.
198      */
199     class Reader :
200         Object.Monitor
201     {
202         /**
203          * Initializes a read/write mutex reader proxy object.
204          */
205         this(this Q)() @trusted nothrow
206             if (is(Q == Reader) || is(Q == shared Reader))
207         {
208             m_proxy.link = this;
209             this.__monitor = cast(void*) &m_proxy;
210         }
211 
212         /**
213          * Acquires a read lock on the enclosing mutex.
214          */
215         @trusted void lock()
216         {
217             synchronized( m_commonMutex )
218             {
219                 ++m_numQueuedReaders;
220                 scope(exit) --m_numQueuedReaders;
221 
222                 while ( shouldQueueReader )
223                     m_readerQueue.wait();
224                 ++m_numActiveReaders;
225             }
226         }
227 
228         /// ditto
229         @trusted void lock() shared
230         {
231             synchronized( m_commonMutex )
232             {
233                 ++(cast()m_numQueuedReaders);
234                 scope(exit) --(cast()m_numQueuedReaders);
235 
236                 while ( shouldQueueReader )
237                     m_readerQueue.wait();
238                 ++(cast()m_numActiveReaders);
239             }
240         }
241 
242         /**
243          * Releases a read lock on the enclosing mutex.
244          */
245         @trusted void unlock()
246         {
247             synchronized( m_commonMutex )
248             {
249                 if ( --m_numActiveReaders < 1 )
250                 {
251                     if ( m_numQueuedWriters > 0 )
252                         m_writerQueue.notify();
253                 }
254             }
255         }
256 
257         /// ditto
258         @trusted void unlock() shared
259         {
260             synchronized( m_commonMutex )
261             {
262                 if ( --(cast()m_numActiveReaders) < 1 )
263                 {
264                     if ( m_numQueuedWriters > 0 )
265                         m_writerQueue.notify();
266                 }
267             }
268         }
269 
270         /**
271          * Attempts to acquire a read lock on the enclosing mutex.  If one can
272          * be obtained without blocking, the lock is acquired and true is
273          * returned.  If not, the lock is not acquired and false is returned.
274          *
275          * Returns:
276          *  true if the lock was acquired and false if not.
277          */
278         @trusted bool tryLock()
279         {
280             synchronized( m_commonMutex )
281             {
282                 if ( shouldQueueReader )
283                     return false;
284                 ++m_numActiveReaders;
285                 return true;
286             }
287         }
288 
289         /// ditto
290         @trusted bool tryLock() shared
291         {
292             synchronized( m_commonMutex )
293             {
294                 if ( shouldQueueReader )
295                     return false;
296                 ++(cast()m_numActiveReaders);
297                 return true;
298             }
299         }
300 
301         /**
302          * Attempts to acquire a read lock on the enclosing mutex. If one can
303          * be obtained without blocking, the lock is acquired and true is
304          * returned. If not, the function blocks until either the lock can be
305          * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
306          * true if the lock was acquired and false if the function timed out.
307          *
308          * Params:
309          *  timeout = maximum amount of time to wait for the lock
310          * Returns:
311          *  true if the lock was acquired and false if not.
312          */
313         @trusted bool tryLock(Duration timeout)
314         {
315             synchronized( m_commonMutex )
316             {
317                 if (!shouldQueueReader)
318                 {
319                     ++m_numActiveReaders;
320                     return true;
321                 }
322 
323                 enum zero = Duration.zero();
324                 if (timeout <= zero)
325                     return false;
326 
327                 ++m_numQueuedReaders;
328                 scope(exit) --m_numQueuedReaders;
329 
330                 enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
331                 const initialTime = MonoTime.currTime;
332                 m_readerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
333                 while (shouldQueueReader)
334                 {
335                     const timeElapsed = MonoTime.currTime - initialTime;
336                     if (timeElapsed >= timeout)
337                         return false;
338                     auto nextWait = timeout - timeElapsed;
339                     m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
340                 }
341                 ++m_numActiveReaders;
342                 return true;
343             }
344         }
345 
346         /// ditto
347         @trusted bool tryLock(Duration timeout) shared
348         {
349             const initialTime = MonoTime.currTime;
350             synchronized( m_commonMutex )
351             {
352                 ++(cast()m_numQueuedReaders);
353                 scope(exit) --(cast()m_numQueuedReaders);
354 
355                 while (shouldQueueReader)
356                 {
357                     const timeElapsed = MonoTime.currTime - initialTime;
358                     if (timeElapsed >= timeout)
359                         return false;
360                     auto nextWait = timeout - timeElapsed;
361                     // Avoid problems calling wait(Duration) with huge arguments.
362                     enum maxWaitPerCall = dur!"hours"(24 * 365);
363                     m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
364                 }
365                 ++(cast()m_numActiveReaders);
366                 return true;
367             }
368         }
369 
370 
371     private:
372         @property bool shouldQueueReader(this Q)() nothrow @safe @nogc
373             if (is(Q == Reader) || is(Q == shared Reader))
374         {
375             if ( m_numActiveWriters > 0 )
376                 return true;
377 
378             switch ( m_policy )
379             {
380             case Policy.PREFER_WRITERS:
381                  return m_numQueuedWriters > 0;
382 
383             case Policy.PREFER_READERS:
384             default:
385                  break;
386             }
387 
388         return false;
389         }
390 
391         struct MonitorProxy
392         {
393             Object.Monitor link;
394         }
395 
396         MonitorProxy    m_proxy;
397     }
398 
399 
400     ////////////////////////////////////////////////////////////////////////////
401     // Writer
402     ////////////////////////////////////////////////////////////////////////////
403 
404 
405     /**
406      * This class can be considered a mutex in its own right, and is used to
407      * negotiate a write lock for the enclosing mutex.
408      */
409     class Writer :
410         Object.Monitor
411     {
412         /**
413          * Initializes a read/write mutex writer proxy object.
414          */
415         this(this Q)() @trusted nothrow
416             if (is(Q == Writer) || is(Q == shared Writer))
417         {
418             m_proxy.link = this;
419             this.__monitor = cast(void*) &m_proxy;
420         }
421 
422 
423         /**
424          * Acquires a write lock on the enclosing mutex.
425          */
426         @trusted void lock()
427         {
428             synchronized( m_commonMutex )
429             {
430                 ++m_numQueuedWriters;
431                 scope(exit) --m_numQueuedWriters;
432 
433                 while ( shouldQueueWriter )
434                     m_writerQueue.wait();
435                 ++m_numActiveWriters;
436             }
437         }
438 
439         /// ditto
440         @trusted void lock() shared
441         {
442             synchronized( m_commonMutex )
443             {
444                 ++(cast()m_numQueuedWriters);
445                 scope(exit) --(cast()m_numQueuedWriters);
446 
447                 while ( shouldQueueWriter )
448                     m_writerQueue.wait();
449                 ++(cast()m_numActiveWriters);
450             }
451         }
452 
453 
454         /**
455          * Releases a write lock on the enclosing mutex.
456          */
457         @trusted void unlock()
458         {
459             synchronized( m_commonMutex )
460             {
461                 if ( --m_numActiveWriters < 1 )
462                 {
463                     switch ( m_policy )
464                     {
465                     default:
466                     case Policy.PREFER_READERS:
467                         if ( m_numQueuedReaders > 0 )
468                             m_readerQueue.notifyAll();
469                         else if ( m_numQueuedWriters > 0 )
470                             m_writerQueue.notify();
471                         break;
472                     case Policy.PREFER_WRITERS:
473                         if ( m_numQueuedWriters > 0 )
474                             m_writerQueue.notify();
475                         else if ( m_numQueuedReaders > 0 )
476                             m_readerQueue.notifyAll();
477                     }
478                 }
479             }
480         }
481 
482         /// ditto
483         @trusted void unlock() shared
484         {
485             synchronized( m_commonMutex )
486             {
487                 if ( --(cast()m_numActiveWriters) < 1 )
488                 {
489                     switch ( m_policy )
490                     {
491                     default:
492                     case Policy.PREFER_READERS:
493                         if ( m_numQueuedReaders > 0 )
494                             m_readerQueue.notifyAll();
495                         else if ( m_numQueuedWriters > 0 )
496                             m_writerQueue.notify();
497                         break;
498                     case Policy.PREFER_WRITERS:
499                         if ( m_numQueuedWriters > 0 )
500                             m_writerQueue.notify();
501                         else if ( m_numQueuedReaders > 0 )
502                             m_readerQueue.notifyAll();
503                     }
504                 }
505             }
506         }
507 
508 
509         /**
510          * Attempts to acquire a write lock on the enclosing mutex.  If one can
511          * be obtained without blocking, the lock is acquired and true is
512          * returned.  If not, the lock is not acquired and false is returned.
513          *
514          * Returns:
515          *  true if the lock was acquired and false if not.
516          */
517         @trusted bool tryLock()
518         {
519             synchronized( m_commonMutex )
520             {
521                 if ( shouldQueueWriter )
522                     return false;
523                 ++m_numActiveWriters;
524                 return true;
525             }
526         }
527 
528         /// ditto
529         @trusted bool tryLock() shared
530         {
531             synchronized( m_commonMutex )
532             {
533                 if ( shouldQueueWriter )
534                     return false;
535                 ++(cast()m_numActiveWriters);
536                 return true;
537             }
538         }
539 
540         /**
541          * Attempts to acquire a write lock on the enclosing mutex. If one can
542          * be obtained without blocking, the lock is acquired and true is
543          * returned. If not, the function blocks until either the lock can be
544          * obtained or the time elapsed exceeds $(D_PARAM timeout), returning
545          * true if the lock was acquired and false if the function timed out.
546          *
547          * Params:
548          *  timeout = maximum amount of time to wait for the lock
549          * Returns:
550          *  true if the lock was acquired and false if not.
551          */
552         @trusted bool tryLock(Duration timeout)
553         {
554             synchronized( m_commonMutex )
555             {
556                 if (!shouldQueueWriter)
557                 {
558                     ++m_numActiveWriters;
559                     return true;
560                 }
561 
562                 enum zero = Duration.zero();
563                 if (timeout <= zero)
564                     return false;
565 
566                 ++m_numQueuedWriters;
567                 scope(exit) --m_numQueuedWriters;
568 
569                 enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration.
570                 const initialTime = MonoTime.currTime;
571                 m_writerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall);
572                 while (shouldQueueWriter)
573                 {
574                     const timeElapsed = MonoTime.currTime - initialTime;
575                     if (timeElapsed >= timeout)
576                         return false;
577                     auto nextWait = timeout - timeElapsed;
578                     m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
579                 }
580                 ++m_numActiveWriters;
581                 return true;
582             }
583         }
584 
585         /// ditto
586         @trusted bool tryLock(Duration timeout) shared
587         {
588             const initialTime = MonoTime.currTime;
589             synchronized( m_commonMutex )
590             {
591                 ++(cast()m_numQueuedWriters);
592                 scope(exit) --(cast()m_numQueuedWriters);
593 
594                 while (shouldQueueWriter)
595                 {
596                     const timeElapsed = MonoTime.currTime - initialTime;
597                     if (timeElapsed >= timeout)
598                         return false;
599                     auto nextWait = timeout - timeElapsed;
600                     // Avoid problems calling wait(Duration) with huge arguments.
601                     enum maxWaitPerCall = dur!"hours"(24 * 365);
602                     m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall);
603                 }
604                 ++(cast()m_numActiveWriters);
605                 return true;
606             }
607         }
608 
609     private:
610         @property bool shouldQueueWriter(this Q)()
611             if (is(Q == Writer) || is(Q == shared Writer))
612         {
613             if ( m_numActiveWriters > 0 ||
614                 m_numActiveReaders > 0 )
615                 return true;
616             switch ( m_policy )
617             {
618             case Policy.PREFER_READERS:
619                 return m_numQueuedReaders > 0;
620 
621             case Policy.PREFER_WRITERS:
622             default:
623                  break;
624             }
625 
626         return false;
627         }
628 
629         struct MonitorProxy
630         {
631             Object.Monitor link;
632         }
633 
634         MonitorProxy    m_proxy;
635     }
636 
637 
638 private:
639     Policy      m_policy;
640     Reader      m_reader;
641     Writer      m_writer;
642 
643     Mutex       m_commonMutex;
644     Condition   m_readerQueue;
645     Condition   m_writerQueue;
646 
647     int         m_numQueuedReaders;
648     int         m_numActiveReaders;
649     int         m_numQueuedWriters;
650     int         m_numActiveWriters;
651 }
652 
653 
654 ////////////////////////////////////////////////////////////////////////////////
655 // Unit Tests
656 ////////////////////////////////////////////////////////////////////////////////
657 
658 
659 unittest
660 {
661     import core.atomic, core.thread, core.sync.semaphore;
662 
663     static void runTest(ReadWriteMutex.Policy policy)
664     {
665         scope mutex = new ReadWriteMutex(policy);
666         scope rdSemA = new Semaphore, rdSemB = new Semaphore,
667               wrSemA = new Semaphore, wrSemB = new Semaphore;
668         shared size_t numReaders, numWriters;
669 
670         void readerFn()
671         {
672             synchronized (mutex.reader)
673             {
674                 atomicOp!"+="(numReaders, 1);
675                 rdSemA.notify();
676                 rdSemB.wait();
677                 atomicOp!"-="(numReaders, 1);
678             }
679         }
680 
681         void writerFn()
682         {
683             synchronized (mutex.writer)
684             {
685                 atomicOp!"+="(numWriters, 1);
686                 wrSemA.notify();
687                 wrSemB.wait();
688                 atomicOp!"-="(numWriters, 1);
689             }
690         }
691 
692         void waitQueued(size_t queuedReaders, size_t queuedWriters)
693         {
694             for (;;)
695             {
696                 synchronized (mutex.m_commonMutex)
697                 {
698                     if (mutex.m_numQueuedReaders == queuedReaders &&
699                         mutex.m_numQueuedWriters == queuedWriters)
700                         break;
701                 }
702                 Thread.yield();
703             }
704         }
705 
706         scope group = new ThreadGroup;
707 
708         // 2 simultaneous readers
709         group.create(&readerFn); group.create(&readerFn);
710         rdSemA.wait(); rdSemA.wait();
711         assert(numReaders == 2);
712         rdSemB.notify(); rdSemB.notify();
713         group.joinAll();
714         assert(numReaders == 0);
715         foreach (t; group) group.remove(t);
716 
717         // 1 writer at a time
718         group.create(&writerFn); group.create(&writerFn);
719         wrSemA.wait();
720         assert(!wrSemA.tryWait());
721         assert(numWriters == 1);
722         wrSemB.notify();
723         wrSemA.wait();
724         assert(numWriters == 1);
725         wrSemB.notify();
726         group.joinAll();
727         assert(numWriters == 0);
728         foreach (t; group) group.remove(t);
729 
730         // reader and writer are mutually exclusive
731         group.create(&readerFn);
732         rdSemA.wait();
733         group.create(&writerFn);
734         waitQueued(0, 1);
735         assert(!wrSemA.tryWait());
736         assert(numReaders == 1 && numWriters == 0);
737         rdSemB.notify();
738         wrSemA.wait();
739         assert(numReaders == 0 && numWriters == 1);
740         wrSemB.notify();
741         group.joinAll();
742         assert(numReaders == 0 && numWriters == 0);
743         foreach (t; group) group.remove(t);
744 
745         // writer and reader are mutually exclusive
746         group.create(&writerFn);
747         wrSemA.wait();
748         group.create(&readerFn);
749         waitQueued(1, 0);
750         assert(!rdSemA.tryWait());
751         assert(numReaders == 0 && numWriters == 1);
752         wrSemB.notify();
753         rdSemA.wait();
754         assert(numReaders == 1 && numWriters == 0);
755         rdSemB.notify();
756         group.joinAll();
757         assert(numReaders == 0 && numWriters == 0);
758         foreach (t; group) group.remove(t);
759 
760         // policy determines whether queued reader or writers progress first
761         group.create(&writerFn);
762         wrSemA.wait();
763         group.create(&readerFn);
764         group.create(&writerFn);
765         waitQueued(1, 1);
766         assert(numReaders == 0 && numWriters == 1);
767         wrSemB.notify();
768 
769         if (policy == ReadWriteMutex.Policy.PREFER_READERS)
770         {
771             rdSemA.wait();
772             assert(numReaders == 1 && numWriters == 0);
773             rdSemB.notify();
774             wrSemA.wait();
775             assert(numReaders == 0 && numWriters == 1);
776             wrSemB.notify();
777         }
778         else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
779         {
780             wrSemA.wait();
781             assert(numReaders == 0 && numWriters == 1);
782             wrSemB.notify();
783             rdSemA.wait();
784             assert(numReaders == 1 && numWriters == 0);
785             rdSemB.notify();
786         }
787         group.joinAll();
788         assert(numReaders == 0 && numWriters == 0);
789         foreach (t; group) group.remove(t);
790     }
791     runTest(ReadWriteMutex.Policy.PREFER_READERS);
792     runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
793 }
794 
795 unittest
796 {
797     import core.atomic, core.thread;
798     __gshared ReadWriteMutex rwmutex;
799     shared static bool threadTriedOnceToGetLock;
800     shared static bool threadFinallyGotLock;
801 
802     rwmutex = new ReadWriteMutex();
803     atomicFence;
804     const maxTimeAllowedForTest = dur!"seconds"(20);
805     // Test ReadWriteMutex.Reader.tryLock(Duration).
806     {
807         static void testReaderTryLock()
808         {
809             assert(!rwmutex.reader.tryLock(Duration.min));
810             threadTriedOnceToGetLock.atomicStore(true);
811             assert(rwmutex.reader.tryLock(Duration.max));
812             threadFinallyGotLock.atomicStore(true);
813             rwmutex.reader.unlock;
814         }
815         assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
816         auto otherThread = new Thread(&testReaderTryLock).start;
817         const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
818         Thread.yield;
819         // We started otherThread with the writer lock held so otherThread's
820         // first rwlock.reader.tryLock with timeout Duration.min should fail.
821         while (!threadTriedOnceToGetLock.atomicLoad)
822         {
823             assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
824             Thread.yield;
825         }
826         rwmutex.writer.unlock;
827         // Soon after we release the writer lock otherThread's second
828         // rwlock.reader.tryLock with timeout Duration.max should succeed.
829         while (!threadFinallyGotLock.atomicLoad)
830         {
831             assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
832             Thread.yield;
833         }
834         otherThread.join;
835     }
836     threadTriedOnceToGetLock.atomicStore(false); // Reset.
837     threadFinallyGotLock.atomicStore(false); // Reset.
838     // Test ReadWriteMutex.Writer.tryLock(Duration).
839     {
840         static void testWriterTryLock()
841         {
842             assert(!rwmutex.writer.tryLock(Duration.min));
843             threadTriedOnceToGetLock.atomicStore(true);
844             assert(rwmutex.writer.tryLock(Duration.max));
845             threadFinallyGotLock.atomicStore(true);
846             rwmutex.writer.unlock;
847         }
848         assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
849         auto otherThread = new Thread(&testWriterTryLock).start;
850         const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
851         Thread.yield;
852         // We started otherThread with the reader lock held so otherThread's
853         // first rwlock.writer.tryLock with timeout Duration.min should fail.
854         while (!threadTriedOnceToGetLock.atomicLoad)
855         {
856             assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
857             Thread.yield;
858         }
859         rwmutex.reader.unlock;
860         // Soon after we release the reader lock otherThread's second
861         // rwlock.writer.tryLock with timeout Duration.max should succeed.
862         while (!threadFinallyGotLock.atomicLoad)
863         {
864             assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
865             Thread.yield;
866         }
867         otherThread.join;
868     }
869 }
870 
871 unittest
872 {
873     import core.atomic, core.thread, core.sync.semaphore;
874 
875     static void runTest(ReadWriteMutex.Policy policy)
876     {
877         shared scope mutex = new shared ReadWriteMutex(policy);
878         scope rdSemA = new Semaphore, rdSemB = new Semaphore,
879               wrSemA = new Semaphore, wrSemB = new Semaphore;
880         shared size_t numReaders, numWriters;
881 
882         void readerFn()
883         {
884             synchronized (mutex.reader)
885             {
886                 atomicOp!"+="(numReaders, 1);
887                 rdSemA.notify();
888                 rdSemB.wait();
889                 atomicOp!"-="(numReaders, 1);
890             }
891         }
892 
893         void writerFn()
894         {
895             synchronized (mutex.writer)
896             {
897                 atomicOp!"+="(numWriters, 1);
898                 wrSemA.notify();
899                 wrSemB.wait();
900                 atomicOp!"-="(numWriters, 1);
901             }
902         }
903 
904         void waitQueued(size_t queuedReaders, size_t queuedWriters)
905         {
906             for (;;)
907             {
908                 synchronized (mutex.m_commonMutex)
909                 {
910                     if (mutex.m_numQueuedReaders == queuedReaders &&
911                         mutex.m_numQueuedWriters == queuedWriters)
912                         break;
913                 }
914                 Thread.yield();
915             }
916         }
917 
918         scope group = new ThreadGroup;
919 
920         // 2 simultaneous readers
921         group.create(&readerFn); group.create(&readerFn);
922         rdSemA.wait(); rdSemA.wait();
923         assert(numReaders == 2);
924         rdSemB.notify(); rdSemB.notify();
925         group.joinAll();
926         assert(numReaders == 0);
927         foreach (t; group) group.remove(t);
928 
929         // 1 writer at a time
930         group.create(&writerFn); group.create(&writerFn);
931         wrSemA.wait();
932         assert(!wrSemA.tryWait());
933         assert(numWriters == 1);
934         wrSemB.notify();
935         wrSemA.wait();
936         assert(numWriters == 1);
937         wrSemB.notify();
938         group.joinAll();
939         assert(numWriters == 0);
940         foreach (t; group) group.remove(t);
941 
942         // reader and writer are mutually exclusive
943         group.create(&readerFn);
944         rdSemA.wait();
945         group.create(&writerFn);
946         waitQueued(0, 1);
947         assert(!wrSemA.tryWait());
948         assert(numReaders == 1 && numWriters == 0);
949         rdSemB.notify();
950         wrSemA.wait();
951         assert(numReaders == 0 && numWriters == 1);
952         wrSemB.notify();
953         group.joinAll();
954         assert(numReaders == 0 && numWriters == 0);
955         foreach (t; group) group.remove(t);
956 
957         // writer and reader are mutually exclusive
958         group.create(&writerFn);
959         wrSemA.wait();
960         group.create(&readerFn);
961         waitQueued(1, 0);
962         assert(!rdSemA.tryWait());
963         assert(numReaders == 0 && numWriters == 1);
964         wrSemB.notify();
965         rdSemA.wait();
966         assert(numReaders == 1 && numWriters == 0);
967         rdSemB.notify();
968         group.joinAll();
969         assert(numReaders == 0 && numWriters == 0);
970         foreach (t; group) group.remove(t);
971 
972         // policy determines whether queued reader or writers progress first
973         group.create(&writerFn);
974         wrSemA.wait();
975         group.create(&readerFn);
976         group.create(&writerFn);
977         waitQueued(1, 1);
978         assert(numReaders == 0 && numWriters == 1);
979         wrSemB.notify();
980 
981         if (policy == ReadWriteMutex.Policy.PREFER_READERS)
982         {
983             rdSemA.wait();
984             assert(numReaders == 1 && numWriters == 0);
985             rdSemB.notify();
986             wrSemA.wait();
987             assert(numReaders == 0 && numWriters == 1);
988             wrSemB.notify();
989         }
990         else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
991         {
992             wrSemA.wait();
993             assert(numReaders == 0 && numWriters == 1);
994             wrSemB.notify();
995             rdSemA.wait();
996             assert(numReaders == 1 && numWriters == 0);
997             rdSemB.notify();
998         }
999         group.joinAll();
1000         assert(numReaders == 0 && numWriters == 0);
1001         foreach (t; group) group.remove(t);
1002     }
1003     runTest(ReadWriteMutex.Policy.PREFER_READERS);
1004     runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
1005 }
1006 
1007 unittest
1008 {
1009     import core.atomic, core.thread;
1010     shared static ReadWriteMutex rwmutex;
1011     shared static bool threadTriedOnceToGetLock;
1012     shared static bool threadFinallyGotLock;
1013 
1014     rwmutex = new shared ReadWriteMutex();
1015     atomicFence;
1016     const maxTimeAllowedForTest = dur!"seconds"(20);
1017     // Test ReadWriteMutex.Reader.tryLock(Duration).
1018     {
1019         static void testReaderTryLock()
1020         {
1021             assert(!rwmutex.reader.tryLock(Duration.min));
1022             threadTriedOnceToGetLock.atomicStore(true);
1023             assert(rwmutex.reader.tryLock(Duration.max));
1024             threadFinallyGotLock.atomicStore(true);
1025             rwmutex.reader.unlock;
1026         }
1027         assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
1028         auto otherThread = new Thread(&testReaderTryLock).start;
1029         const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
1030         Thread.yield;
1031         // We started otherThread with the writer lock held so otherThread's
1032         // first rwlock.reader.tryLock with timeout Duration.min should fail.
1033         while (!threadTriedOnceToGetLock.atomicLoad)
1034         {
1035             assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
1036             Thread.yield;
1037         }
1038         rwmutex.writer.unlock;
1039         // Soon after we release the writer lock otherThread's second
1040         // rwlock.reader.tryLock with timeout Duration.max should succeed.
1041         while (!threadFinallyGotLock.atomicLoad)
1042         {
1043             assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
1044             Thread.yield;
1045         }
1046         otherThread.join;
1047     }
1048     threadTriedOnceToGetLock.atomicStore(false); // Reset.
1049     threadFinallyGotLock.atomicStore(false); // Reset.
1050     // Test ReadWriteMutex.Writer.tryLock(Duration).
1051     {
1052         static void testWriterTryLock()
1053         {
1054             assert(!rwmutex.writer.tryLock(Duration.min));
1055             threadTriedOnceToGetLock.atomicStore(true);
1056             assert(rwmutex.writer.tryLock(Duration.max));
1057             threadFinallyGotLock.atomicStore(true);
1058             rwmutex.writer.unlock;
1059         }
1060         assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking");
1061         auto otherThread = new Thread(&testWriterTryLock).start;
1062         const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest;
1063         Thread.yield;
1064         // We started otherThread with the reader lock held so otherThread's
1065         // first rwlock.writer.tryLock with timeout Duration.min should fail.
1066         while (!threadTriedOnceToGetLock.atomicLoad)
1067         {
1068             assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
1069             Thread.yield;
1070         }
1071         rwmutex.reader.unlock;
1072         // Soon after we release the reader lock otherThread's second
1073         // rwlock.writer.tryLock with timeout Duration.max should succeed.
1074         while (!threadFinallyGotLock.atomicLoad)
1075         {
1076             assert(MonoTime.currTime < failIfThisTimeisReached, "timed out");
1077             Thread.yield;
1078         }
1079         otherThread.join;
1080     }
1081 }