1 /** 2 * The read/write mutex module provides a primitive for maintaining shared read 3 * access and mutually exclusive write access. 4 * 5 * Copyright: Copyright Sean Kelly 2005 - 2009. 6 * License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost License 1.0) 7 * Authors: Sean Kelly 8 * Source: $(DRUNTIMESRC core/sync/_rwmutex.d) 9 */ 10 11 /* Copyright Sean Kelly 2005 - 2009. 12 * Distributed under the Boost Software License, Version 1.0. 13 * (See accompanying file LICENSE or copy at 14 * http://www.boost.org/LICENSE_1_0.txt) 15 */ 16 module core.sync.rwmutex; 17 18 19 public import core.sync.exception; 20 import core.sync.condition; 21 import core.sync.mutex; 22 import core.memory; 23 24 version (Posix) 25 { 26 import core.sys.posix.pthread; 27 } 28 29 30 //////////////////////////////////////////////////////////////////////////////// 31 // ReadWriteMutex 32 // 33 // Reader reader(); 34 // Writer writer(); 35 //////////////////////////////////////////////////////////////////////////////// 36 37 38 /** 39 * This class represents a mutex that allows any number of readers to enter, 40 * but when a writer enters, all other readers and writers are blocked. 41 * 42 * Please note that this mutex is not recursive and is intended to guard access 43 * to data only. Also, no deadlock checking is in place because doing so would 44 * require dynamic memory allocation, which would reduce performance by an 45 * unacceptable amount. As a result, any attempt to recursively acquire this 46 * mutex may well deadlock the caller, particularly if a write lock is acquired 47 * while holding a read lock, or vice-versa. In practice, this should not be 48 * an issue however, because it is uncommon to call deeply into unknown code 49 * while holding a lock that simply protects data. 50 */ 51 class ReadWriteMutex 52 { 53 /** 54 * Defines the policy used by this mutex. Currently, two policies are 55 * defined. 56 * 57 * The first will queue writers until no readers hold the mutex, then 58 * pass the writers through one at a time. If a reader acquires the mutex 59 * while there are still writers queued, the reader will take precedence. 60 * 61 * The second will queue readers if there are any writers queued. Writers 62 * are passed through one at a time, and once there are no writers present, 63 * all queued readers will be alerted. 64 * 65 * Future policies may offer a more even balance between reader and writer 66 * precedence. 67 */ 68 enum Policy 69 { 70 PREFER_READERS, /// Readers get preference. This may starve writers. 71 PREFER_WRITERS /// Writers get preference. This may starve readers. 72 } 73 74 75 //////////////////////////////////////////////////////////////////////////// 76 // Initialization 77 //////////////////////////////////////////////////////////////////////////// 78 79 80 /** 81 * Initializes a read/write mutex object with the supplied policy. 82 * 83 * Params: 84 * policy = The policy to use. 85 * 86 * Throws: 87 * SyncError on error. 88 */ 89 this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow 90 { 91 m_commonMutex = new Mutex; 92 if ( !m_commonMutex ) 93 throw new SyncError( "Unable to initialize mutex" ); 94 95 m_readerQueue = new Condition( m_commonMutex ); 96 if ( !m_readerQueue ) 97 throw new SyncError( "Unable to initialize mutex" ); 98 99 m_writerQueue = new Condition( m_commonMutex ); 100 if ( !m_writerQueue ) 101 throw new SyncError( "Unable to initialize mutex" ); 102 103 m_policy = policy; 104 m_reader = new Reader; 105 m_writer = new Writer; 106 } 107 108 /// ditto 109 shared this( Policy policy = Policy.PREFER_WRITERS ) @safe nothrow 110 { 111 m_commonMutex = new shared Mutex; 112 if ( !m_commonMutex ) 113 throw new SyncError( "Unable to initialize mutex" ); 114 115 m_readerQueue = new shared Condition( m_commonMutex ); 116 if ( !m_readerQueue ) 117 throw new SyncError( "Unable to initialize mutex" ); 118 119 m_writerQueue = new shared Condition( m_commonMutex ); 120 if ( !m_writerQueue ) 121 throw new SyncError( "Unable to initialize mutex" ); 122 123 m_policy = policy; 124 m_reader = new shared Reader; 125 m_writer = new shared Writer; 126 } 127 128 //////////////////////////////////////////////////////////////////////////// 129 // General Properties 130 //////////////////////////////////////////////////////////////////////////// 131 132 133 /** 134 * Gets the policy used by this mutex. 135 * 136 * Returns: 137 * The policy used by this mutex. 138 */ 139 @property Policy policy() @safe nothrow 140 { 141 return m_policy; 142 } 143 144 ///ditto 145 @property Policy policy() shared @safe nothrow 146 { 147 return m_policy; 148 } 149 150 //////////////////////////////////////////////////////////////////////////// 151 // Reader/Writer Handles 152 //////////////////////////////////////////////////////////////////////////// 153 154 155 /** 156 * Gets an object representing the reader lock for the associated mutex. 157 * 158 * Returns: 159 * A reader sub-mutex. 160 */ 161 @property Reader reader() @safe nothrow 162 { 163 return m_reader; 164 } 165 166 ///ditto 167 @property shared(Reader) reader() shared @safe nothrow 168 { 169 return m_reader; 170 } 171 172 /** 173 * Gets an object representing the writer lock for the associated mutex. 174 * 175 * Returns: 176 * A writer sub-mutex. 177 */ 178 @property Writer writer() @safe nothrow 179 { 180 return m_writer; 181 } 182 183 ///ditto 184 @property shared(Writer) writer() shared @safe nothrow 185 { 186 return m_writer; 187 } 188 189 190 //////////////////////////////////////////////////////////////////////////// 191 // Reader 192 //////////////////////////////////////////////////////////////////////////// 193 194 195 /** 196 * This class can be considered a mutex in its own right, and is used to 197 * negotiate a read lock for the enclosing mutex. 198 */ 199 class Reader : 200 Object.Monitor 201 { 202 /** 203 * Initializes a read/write mutex reader proxy object. 204 */ 205 this(this Q)() @trusted nothrow 206 if (is(Q == Reader) || is(Q == shared Reader)) 207 { 208 m_proxy.link = this; 209 this.__monitor = cast(void*) &m_proxy; 210 } 211 212 /** 213 * Acquires a read lock on the enclosing mutex. 214 */ 215 @trusted void lock() 216 { 217 synchronized( m_commonMutex ) 218 { 219 ++m_numQueuedReaders; 220 scope(exit) --m_numQueuedReaders; 221 222 while ( shouldQueueReader ) 223 m_readerQueue.wait(); 224 ++m_numActiveReaders; 225 } 226 } 227 228 /// ditto 229 @trusted void lock() shared 230 { 231 synchronized( m_commonMutex ) 232 { 233 ++(cast()m_numQueuedReaders); 234 scope(exit) --(cast()m_numQueuedReaders); 235 236 while ( shouldQueueReader ) 237 m_readerQueue.wait(); 238 ++(cast()m_numActiveReaders); 239 } 240 } 241 242 /** 243 * Releases a read lock on the enclosing mutex. 244 */ 245 @trusted void unlock() 246 { 247 synchronized( m_commonMutex ) 248 { 249 if ( --m_numActiveReaders < 1 ) 250 { 251 if ( m_numQueuedWriters > 0 ) 252 m_writerQueue.notify(); 253 } 254 } 255 } 256 257 /// ditto 258 @trusted void unlock() shared 259 { 260 synchronized( m_commonMutex ) 261 { 262 if ( --(cast()m_numActiveReaders) < 1 ) 263 { 264 if ( m_numQueuedWriters > 0 ) 265 m_writerQueue.notify(); 266 } 267 } 268 } 269 270 /** 271 * Attempts to acquire a read lock on the enclosing mutex. If one can 272 * be obtained without blocking, the lock is acquired and true is 273 * returned. If not, the lock is not acquired and false is returned. 274 * 275 * Returns: 276 * true if the lock was acquired and false if not. 277 */ 278 @trusted bool tryLock() 279 { 280 synchronized( m_commonMutex ) 281 { 282 if ( shouldQueueReader ) 283 return false; 284 ++m_numActiveReaders; 285 return true; 286 } 287 } 288 289 /// ditto 290 @trusted bool tryLock() shared 291 { 292 synchronized( m_commonMutex ) 293 { 294 if ( shouldQueueReader ) 295 return false; 296 ++(cast()m_numActiveReaders); 297 return true; 298 } 299 } 300 301 /** 302 * Attempts to acquire a read lock on the enclosing mutex. If one can 303 * be obtained without blocking, the lock is acquired and true is 304 * returned. If not, the function blocks until either the lock can be 305 * obtained or the time elapsed exceeds $(D_PARAM timeout), returning 306 * true if the lock was acquired and false if the function timed out. 307 * 308 * Params: 309 * timeout = maximum amount of time to wait for the lock 310 * Returns: 311 * true if the lock was acquired and false if not. 312 */ 313 @trusted bool tryLock(Duration timeout) 314 { 315 synchronized( m_commonMutex ) 316 { 317 if (!shouldQueueReader) 318 { 319 ++m_numActiveReaders; 320 return true; 321 } 322 323 enum zero = Duration.zero(); 324 if (timeout <= zero) 325 return false; 326 327 ++m_numQueuedReaders; 328 scope(exit) --m_numQueuedReaders; 329 330 enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration. 331 const initialTime = MonoTime.currTime; 332 m_readerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall); 333 while (shouldQueueReader) 334 { 335 const timeElapsed = MonoTime.currTime - initialTime; 336 if (timeElapsed >= timeout) 337 return false; 338 auto nextWait = timeout - timeElapsed; 339 m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall); 340 } 341 ++m_numActiveReaders; 342 return true; 343 } 344 } 345 346 /// ditto 347 @trusted bool tryLock(Duration timeout) shared 348 { 349 const initialTime = MonoTime.currTime; 350 synchronized( m_commonMutex ) 351 { 352 ++(cast()m_numQueuedReaders); 353 scope(exit) --(cast()m_numQueuedReaders); 354 355 while (shouldQueueReader) 356 { 357 const timeElapsed = MonoTime.currTime - initialTime; 358 if (timeElapsed >= timeout) 359 return false; 360 auto nextWait = timeout - timeElapsed; 361 // Avoid problems calling wait(Duration) with huge arguments. 362 enum maxWaitPerCall = dur!"hours"(24 * 365); 363 m_readerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall); 364 } 365 ++(cast()m_numActiveReaders); 366 return true; 367 } 368 } 369 370 371 private: 372 @property bool shouldQueueReader(this Q)() nothrow @safe @nogc 373 if (is(Q == Reader) || is(Q == shared Reader)) 374 { 375 if ( m_numActiveWriters > 0 ) 376 return true; 377 378 switch ( m_policy ) 379 { 380 case Policy.PREFER_WRITERS: 381 return m_numQueuedWriters > 0; 382 383 case Policy.PREFER_READERS: 384 default: 385 break; 386 } 387 388 return false; 389 } 390 391 struct MonitorProxy 392 { 393 Object.Monitor link; 394 } 395 396 MonitorProxy m_proxy; 397 } 398 399 400 //////////////////////////////////////////////////////////////////////////// 401 // Writer 402 //////////////////////////////////////////////////////////////////////////// 403 404 405 /** 406 * This class can be considered a mutex in its own right, and is used to 407 * negotiate a write lock for the enclosing mutex. 408 */ 409 class Writer : 410 Object.Monitor 411 { 412 /** 413 * Initializes a read/write mutex writer proxy object. 414 */ 415 this(this Q)() @trusted nothrow 416 if (is(Q == Writer) || is(Q == shared Writer)) 417 { 418 m_proxy.link = this; 419 this.__monitor = cast(void*) &m_proxy; 420 } 421 422 423 /** 424 * Acquires a write lock on the enclosing mutex. 425 */ 426 @trusted void lock() 427 { 428 synchronized( m_commonMutex ) 429 { 430 ++m_numQueuedWriters; 431 scope(exit) --m_numQueuedWriters; 432 433 while ( shouldQueueWriter ) 434 m_writerQueue.wait(); 435 ++m_numActiveWriters; 436 } 437 } 438 439 /// ditto 440 @trusted void lock() shared 441 { 442 synchronized( m_commonMutex ) 443 { 444 ++(cast()m_numQueuedWriters); 445 scope(exit) --(cast()m_numQueuedWriters); 446 447 while ( shouldQueueWriter ) 448 m_writerQueue.wait(); 449 ++(cast()m_numActiveWriters); 450 } 451 } 452 453 454 /** 455 * Releases a write lock on the enclosing mutex. 456 */ 457 @trusted void unlock() 458 { 459 synchronized( m_commonMutex ) 460 { 461 if ( --m_numActiveWriters < 1 ) 462 { 463 switch ( m_policy ) 464 { 465 default: 466 case Policy.PREFER_READERS: 467 if ( m_numQueuedReaders > 0 ) 468 m_readerQueue.notifyAll(); 469 else if ( m_numQueuedWriters > 0 ) 470 m_writerQueue.notify(); 471 break; 472 case Policy.PREFER_WRITERS: 473 if ( m_numQueuedWriters > 0 ) 474 m_writerQueue.notify(); 475 else if ( m_numQueuedReaders > 0 ) 476 m_readerQueue.notifyAll(); 477 } 478 } 479 } 480 } 481 482 /// ditto 483 @trusted void unlock() shared 484 { 485 synchronized( m_commonMutex ) 486 { 487 if ( --(cast()m_numActiveWriters) < 1 ) 488 { 489 switch ( m_policy ) 490 { 491 default: 492 case Policy.PREFER_READERS: 493 if ( m_numQueuedReaders > 0 ) 494 m_readerQueue.notifyAll(); 495 else if ( m_numQueuedWriters > 0 ) 496 m_writerQueue.notify(); 497 break; 498 case Policy.PREFER_WRITERS: 499 if ( m_numQueuedWriters > 0 ) 500 m_writerQueue.notify(); 501 else if ( m_numQueuedReaders > 0 ) 502 m_readerQueue.notifyAll(); 503 } 504 } 505 } 506 } 507 508 509 /** 510 * Attempts to acquire a write lock on the enclosing mutex. If one can 511 * be obtained without blocking, the lock is acquired and true is 512 * returned. If not, the lock is not acquired and false is returned. 513 * 514 * Returns: 515 * true if the lock was acquired and false if not. 516 */ 517 @trusted bool tryLock() 518 { 519 synchronized( m_commonMutex ) 520 { 521 if ( shouldQueueWriter ) 522 return false; 523 ++m_numActiveWriters; 524 return true; 525 } 526 } 527 528 /// ditto 529 @trusted bool tryLock() shared 530 { 531 synchronized( m_commonMutex ) 532 { 533 if ( shouldQueueWriter ) 534 return false; 535 ++(cast()m_numActiveWriters); 536 return true; 537 } 538 } 539 540 /** 541 * Attempts to acquire a write lock on the enclosing mutex. If one can 542 * be obtained without blocking, the lock is acquired and true is 543 * returned. If not, the function blocks until either the lock can be 544 * obtained or the time elapsed exceeds $(D_PARAM timeout), returning 545 * true if the lock was acquired and false if the function timed out. 546 * 547 * Params: 548 * timeout = maximum amount of time to wait for the lock 549 * Returns: 550 * true if the lock was acquired and false if not. 551 */ 552 @trusted bool tryLock(Duration timeout) 553 { 554 synchronized( m_commonMutex ) 555 { 556 if (!shouldQueueWriter) 557 { 558 ++m_numActiveWriters; 559 return true; 560 } 561 562 enum zero = Duration.zero(); 563 if (timeout <= zero) 564 return false; 565 566 ++m_numQueuedWriters; 567 scope(exit) --m_numQueuedWriters; 568 569 enum maxWaitPerCall = dur!"hours"(24 * 365); // Avoid problems calling wait with huge Duration. 570 const initialTime = MonoTime.currTime; 571 m_writerQueue.wait(timeout < maxWaitPerCall ? timeout : maxWaitPerCall); 572 while (shouldQueueWriter) 573 { 574 const timeElapsed = MonoTime.currTime - initialTime; 575 if (timeElapsed >= timeout) 576 return false; 577 auto nextWait = timeout - timeElapsed; 578 m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall); 579 } 580 ++m_numActiveWriters; 581 return true; 582 } 583 } 584 585 /// ditto 586 @trusted bool tryLock(Duration timeout) shared 587 { 588 const initialTime = MonoTime.currTime; 589 synchronized( m_commonMutex ) 590 { 591 ++(cast()m_numQueuedWriters); 592 scope(exit) --(cast()m_numQueuedWriters); 593 594 while (shouldQueueWriter) 595 { 596 const timeElapsed = MonoTime.currTime - initialTime; 597 if (timeElapsed >= timeout) 598 return false; 599 auto nextWait = timeout - timeElapsed; 600 // Avoid problems calling wait(Duration) with huge arguments. 601 enum maxWaitPerCall = dur!"hours"(24 * 365); 602 m_writerQueue.wait(nextWait < maxWaitPerCall ? nextWait : maxWaitPerCall); 603 } 604 ++(cast()m_numActiveWriters); 605 return true; 606 } 607 } 608 609 private: 610 @property bool shouldQueueWriter(this Q)() 611 if (is(Q == Writer) || is(Q == shared Writer)) 612 { 613 if ( m_numActiveWriters > 0 || 614 m_numActiveReaders > 0 ) 615 return true; 616 switch ( m_policy ) 617 { 618 case Policy.PREFER_READERS: 619 return m_numQueuedReaders > 0; 620 621 case Policy.PREFER_WRITERS: 622 default: 623 break; 624 } 625 626 return false; 627 } 628 629 struct MonitorProxy 630 { 631 Object.Monitor link; 632 } 633 634 MonitorProxy m_proxy; 635 } 636 637 638 private: 639 Policy m_policy; 640 Reader m_reader; 641 Writer m_writer; 642 643 Mutex m_commonMutex; 644 Condition m_readerQueue; 645 Condition m_writerQueue; 646 647 int m_numQueuedReaders; 648 int m_numActiveReaders; 649 int m_numQueuedWriters; 650 int m_numActiveWriters; 651 } 652 653 654 //////////////////////////////////////////////////////////////////////////////// 655 // Unit Tests 656 //////////////////////////////////////////////////////////////////////////////// 657 658 659 unittest 660 { 661 import core.atomic, core.thread, core.sync.semaphore; 662 663 static void runTest(ReadWriteMutex.Policy policy) 664 { 665 scope mutex = new ReadWriteMutex(policy); 666 scope rdSemA = new Semaphore, rdSemB = new Semaphore, 667 wrSemA = new Semaphore, wrSemB = new Semaphore; 668 shared size_t numReaders, numWriters; 669 670 void readerFn() 671 { 672 synchronized (mutex.reader) 673 { 674 atomicOp!"+="(numReaders, 1); 675 rdSemA.notify(); 676 rdSemB.wait(); 677 atomicOp!"-="(numReaders, 1); 678 } 679 } 680 681 void writerFn() 682 { 683 synchronized (mutex.writer) 684 { 685 atomicOp!"+="(numWriters, 1); 686 wrSemA.notify(); 687 wrSemB.wait(); 688 atomicOp!"-="(numWriters, 1); 689 } 690 } 691 692 void waitQueued(size_t queuedReaders, size_t queuedWriters) 693 { 694 for (;;) 695 { 696 synchronized (mutex.m_commonMutex) 697 { 698 if (mutex.m_numQueuedReaders == queuedReaders && 699 mutex.m_numQueuedWriters == queuedWriters) 700 break; 701 } 702 Thread.yield(); 703 } 704 } 705 706 scope group = new ThreadGroup; 707 708 // 2 simultaneous readers 709 group.create(&readerFn); group.create(&readerFn); 710 rdSemA.wait(); rdSemA.wait(); 711 assert(numReaders == 2); 712 rdSemB.notify(); rdSemB.notify(); 713 group.joinAll(); 714 assert(numReaders == 0); 715 foreach (t; group) group.remove(t); 716 717 // 1 writer at a time 718 group.create(&writerFn); group.create(&writerFn); 719 wrSemA.wait(); 720 assert(!wrSemA.tryWait()); 721 assert(numWriters == 1); 722 wrSemB.notify(); 723 wrSemA.wait(); 724 assert(numWriters == 1); 725 wrSemB.notify(); 726 group.joinAll(); 727 assert(numWriters == 0); 728 foreach (t; group) group.remove(t); 729 730 // reader and writer are mutually exclusive 731 group.create(&readerFn); 732 rdSemA.wait(); 733 group.create(&writerFn); 734 waitQueued(0, 1); 735 assert(!wrSemA.tryWait()); 736 assert(numReaders == 1 && numWriters == 0); 737 rdSemB.notify(); 738 wrSemA.wait(); 739 assert(numReaders == 0 && numWriters == 1); 740 wrSemB.notify(); 741 group.joinAll(); 742 assert(numReaders == 0 && numWriters == 0); 743 foreach (t; group) group.remove(t); 744 745 // writer and reader are mutually exclusive 746 group.create(&writerFn); 747 wrSemA.wait(); 748 group.create(&readerFn); 749 waitQueued(1, 0); 750 assert(!rdSemA.tryWait()); 751 assert(numReaders == 0 && numWriters == 1); 752 wrSemB.notify(); 753 rdSemA.wait(); 754 assert(numReaders == 1 && numWriters == 0); 755 rdSemB.notify(); 756 group.joinAll(); 757 assert(numReaders == 0 && numWriters == 0); 758 foreach (t; group) group.remove(t); 759 760 // policy determines whether queued reader or writers progress first 761 group.create(&writerFn); 762 wrSemA.wait(); 763 group.create(&readerFn); 764 group.create(&writerFn); 765 waitQueued(1, 1); 766 assert(numReaders == 0 && numWriters == 1); 767 wrSemB.notify(); 768 769 if (policy == ReadWriteMutex.Policy.PREFER_READERS) 770 { 771 rdSemA.wait(); 772 assert(numReaders == 1 && numWriters == 0); 773 rdSemB.notify(); 774 wrSemA.wait(); 775 assert(numReaders == 0 && numWriters == 1); 776 wrSemB.notify(); 777 } 778 else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS) 779 { 780 wrSemA.wait(); 781 assert(numReaders == 0 && numWriters == 1); 782 wrSemB.notify(); 783 rdSemA.wait(); 784 assert(numReaders == 1 && numWriters == 0); 785 rdSemB.notify(); 786 } 787 group.joinAll(); 788 assert(numReaders == 0 && numWriters == 0); 789 foreach (t; group) group.remove(t); 790 } 791 runTest(ReadWriteMutex.Policy.PREFER_READERS); 792 runTest(ReadWriteMutex.Policy.PREFER_WRITERS); 793 } 794 795 unittest 796 { 797 import core.atomic, core.thread; 798 __gshared ReadWriteMutex rwmutex; 799 shared static bool threadTriedOnceToGetLock; 800 shared static bool threadFinallyGotLock; 801 802 rwmutex = new ReadWriteMutex(); 803 atomicFence; 804 const maxTimeAllowedForTest = dur!"seconds"(20); 805 // Test ReadWriteMutex.Reader.tryLock(Duration). 806 { 807 static void testReaderTryLock() 808 { 809 assert(!rwmutex.reader.tryLock(Duration.min)); 810 threadTriedOnceToGetLock.atomicStore(true); 811 assert(rwmutex.reader.tryLock(Duration.max)); 812 threadFinallyGotLock.atomicStore(true); 813 rwmutex.reader.unlock; 814 } 815 assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking"); 816 auto otherThread = new Thread(&testReaderTryLock).start; 817 const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest; 818 Thread.yield; 819 // We started otherThread with the writer lock held so otherThread's 820 // first rwlock.reader.tryLock with timeout Duration.min should fail. 821 while (!threadTriedOnceToGetLock.atomicLoad) 822 { 823 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); 824 Thread.yield; 825 } 826 rwmutex.writer.unlock; 827 // Soon after we release the writer lock otherThread's second 828 // rwlock.reader.tryLock with timeout Duration.max should succeed. 829 while (!threadFinallyGotLock.atomicLoad) 830 { 831 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); 832 Thread.yield; 833 } 834 otherThread.join; 835 } 836 threadTriedOnceToGetLock.atomicStore(false); // Reset. 837 threadFinallyGotLock.atomicStore(false); // Reset. 838 // Test ReadWriteMutex.Writer.tryLock(Duration). 839 { 840 static void testWriterTryLock() 841 { 842 assert(!rwmutex.writer.tryLock(Duration.min)); 843 threadTriedOnceToGetLock.atomicStore(true); 844 assert(rwmutex.writer.tryLock(Duration.max)); 845 threadFinallyGotLock.atomicStore(true); 846 rwmutex.writer.unlock; 847 } 848 assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking"); 849 auto otherThread = new Thread(&testWriterTryLock).start; 850 const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest; 851 Thread.yield; 852 // We started otherThread with the reader lock held so otherThread's 853 // first rwlock.writer.tryLock with timeout Duration.min should fail. 854 while (!threadTriedOnceToGetLock.atomicLoad) 855 { 856 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); 857 Thread.yield; 858 } 859 rwmutex.reader.unlock; 860 // Soon after we release the reader lock otherThread's second 861 // rwlock.writer.tryLock with timeout Duration.max should succeed. 862 while (!threadFinallyGotLock.atomicLoad) 863 { 864 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); 865 Thread.yield; 866 } 867 otherThread.join; 868 } 869 } 870 871 unittest 872 { 873 import core.atomic, core.thread, core.sync.semaphore; 874 875 static void runTest(ReadWriteMutex.Policy policy) 876 { 877 shared scope mutex = new shared ReadWriteMutex(policy); 878 scope rdSemA = new Semaphore, rdSemB = new Semaphore, 879 wrSemA = new Semaphore, wrSemB = new Semaphore; 880 shared size_t numReaders, numWriters; 881 882 void readerFn() 883 { 884 synchronized (mutex.reader) 885 { 886 atomicOp!"+="(numReaders, 1); 887 rdSemA.notify(); 888 rdSemB.wait(); 889 atomicOp!"-="(numReaders, 1); 890 } 891 } 892 893 void writerFn() 894 { 895 synchronized (mutex.writer) 896 { 897 atomicOp!"+="(numWriters, 1); 898 wrSemA.notify(); 899 wrSemB.wait(); 900 atomicOp!"-="(numWriters, 1); 901 } 902 } 903 904 void waitQueued(size_t queuedReaders, size_t queuedWriters) 905 { 906 for (;;) 907 { 908 synchronized (mutex.m_commonMutex) 909 { 910 if (mutex.m_numQueuedReaders == queuedReaders && 911 mutex.m_numQueuedWriters == queuedWriters) 912 break; 913 } 914 Thread.yield(); 915 } 916 } 917 918 scope group = new ThreadGroup; 919 920 // 2 simultaneous readers 921 group.create(&readerFn); group.create(&readerFn); 922 rdSemA.wait(); rdSemA.wait(); 923 assert(numReaders == 2); 924 rdSemB.notify(); rdSemB.notify(); 925 group.joinAll(); 926 assert(numReaders == 0); 927 foreach (t; group) group.remove(t); 928 929 // 1 writer at a time 930 group.create(&writerFn); group.create(&writerFn); 931 wrSemA.wait(); 932 assert(!wrSemA.tryWait()); 933 assert(numWriters == 1); 934 wrSemB.notify(); 935 wrSemA.wait(); 936 assert(numWriters == 1); 937 wrSemB.notify(); 938 group.joinAll(); 939 assert(numWriters == 0); 940 foreach (t; group) group.remove(t); 941 942 // reader and writer are mutually exclusive 943 group.create(&readerFn); 944 rdSemA.wait(); 945 group.create(&writerFn); 946 waitQueued(0, 1); 947 assert(!wrSemA.tryWait()); 948 assert(numReaders == 1 && numWriters == 0); 949 rdSemB.notify(); 950 wrSemA.wait(); 951 assert(numReaders == 0 && numWriters == 1); 952 wrSemB.notify(); 953 group.joinAll(); 954 assert(numReaders == 0 && numWriters == 0); 955 foreach (t; group) group.remove(t); 956 957 // writer and reader are mutually exclusive 958 group.create(&writerFn); 959 wrSemA.wait(); 960 group.create(&readerFn); 961 waitQueued(1, 0); 962 assert(!rdSemA.tryWait()); 963 assert(numReaders == 0 && numWriters == 1); 964 wrSemB.notify(); 965 rdSemA.wait(); 966 assert(numReaders == 1 && numWriters == 0); 967 rdSemB.notify(); 968 group.joinAll(); 969 assert(numReaders == 0 && numWriters == 0); 970 foreach (t; group) group.remove(t); 971 972 // policy determines whether queued reader or writers progress first 973 group.create(&writerFn); 974 wrSemA.wait(); 975 group.create(&readerFn); 976 group.create(&writerFn); 977 waitQueued(1, 1); 978 assert(numReaders == 0 && numWriters == 1); 979 wrSemB.notify(); 980 981 if (policy == ReadWriteMutex.Policy.PREFER_READERS) 982 { 983 rdSemA.wait(); 984 assert(numReaders == 1 && numWriters == 0); 985 rdSemB.notify(); 986 wrSemA.wait(); 987 assert(numReaders == 0 && numWriters == 1); 988 wrSemB.notify(); 989 } 990 else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS) 991 { 992 wrSemA.wait(); 993 assert(numReaders == 0 && numWriters == 1); 994 wrSemB.notify(); 995 rdSemA.wait(); 996 assert(numReaders == 1 && numWriters == 0); 997 rdSemB.notify(); 998 } 999 group.joinAll(); 1000 assert(numReaders == 0 && numWriters == 0); 1001 foreach (t; group) group.remove(t); 1002 } 1003 runTest(ReadWriteMutex.Policy.PREFER_READERS); 1004 runTest(ReadWriteMutex.Policy.PREFER_WRITERS); 1005 } 1006 1007 unittest 1008 { 1009 import core.atomic, core.thread; 1010 shared static ReadWriteMutex rwmutex; 1011 shared static bool threadTriedOnceToGetLock; 1012 shared static bool threadFinallyGotLock; 1013 1014 rwmutex = new shared ReadWriteMutex(); 1015 atomicFence; 1016 const maxTimeAllowedForTest = dur!"seconds"(20); 1017 // Test ReadWriteMutex.Reader.tryLock(Duration). 1018 { 1019 static void testReaderTryLock() 1020 { 1021 assert(!rwmutex.reader.tryLock(Duration.min)); 1022 threadTriedOnceToGetLock.atomicStore(true); 1023 assert(rwmutex.reader.tryLock(Duration.max)); 1024 threadFinallyGotLock.atomicStore(true); 1025 rwmutex.reader.unlock; 1026 } 1027 assert(rwmutex.writer.tryLock(Duration.zero), "should have been able to obtain lock without blocking"); 1028 auto otherThread = new Thread(&testReaderTryLock).start; 1029 const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest; 1030 Thread.yield; 1031 // We started otherThread with the writer lock held so otherThread's 1032 // first rwlock.reader.tryLock with timeout Duration.min should fail. 1033 while (!threadTriedOnceToGetLock.atomicLoad) 1034 { 1035 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); 1036 Thread.yield; 1037 } 1038 rwmutex.writer.unlock; 1039 // Soon after we release the writer lock otherThread's second 1040 // rwlock.reader.tryLock with timeout Duration.max should succeed. 1041 while (!threadFinallyGotLock.atomicLoad) 1042 { 1043 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); 1044 Thread.yield; 1045 } 1046 otherThread.join; 1047 } 1048 threadTriedOnceToGetLock.atomicStore(false); // Reset. 1049 threadFinallyGotLock.atomicStore(false); // Reset. 1050 // Test ReadWriteMutex.Writer.tryLock(Duration). 1051 { 1052 static void testWriterTryLock() 1053 { 1054 assert(!rwmutex.writer.tryLock(Duration.min)); 1055 threadTriedOnceToGetLock.atomicStore(true); 1056 assert(rwmutex.writer.tryLock(Duration.max)); 1057 threadFinallyGotLock.atomicStore(true); 1058 rwmutex.writer.unlock; 1059 } 1060 assert(rwmutex.reader.tryLock(Duration.zero), "should have been able to obtain lock without blocking"); 1061 auto otherThread = new Thread(&testWriterTryLock).start; 1062 const failIfThisTimeisReached = MonoTime.currTime + maxTimeAllowedForTest; 1063 Thread.yield; 1064 // We started otherThread with the reader lock held so otherThread's 1065 // first rwlock.writer.tryLock with timeout Duration.min should fail. 1066 while (!threadTriedOnceToGetLock.atomicLoad) 1067 { 1068 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); 1069 Thread.yield; 1070 } 1071 rwmutex.reader.unlock; 1072 // Soon after we release the reader lock otherThread's second 1073 // rwlock.writer.tryLock with timeout Duration.max should succeed. 1074 while (!threadFinallyGotLock.atomicLoad) 1075 { 1076 assert(MonoTime.currTime < failIfThisTimeisReached, "timed out"); 1077 Thread.yield; 1078 } 1079 otherThread.join; 1080 } 1081 }