1 /**
2  * The event module provides a primitive for lightweight signaling of other threads
3  * (emulating Windows events on Posix)
4  *
5  * Copyright: Copyright (c) 2019 D Language Foundation
6  * License: Distributed under the
7  *    $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
8  *    (See accompanying file LICENSE)
9  * Authors: Rainer Schuetze
10  * Source:    $(DRUNTIMESRC core/sync/event.d)
11  */
12 module core.sync.event;
13 
14 version (Windows)
15 {
16     import core.sys.windows.basetsd /+: HANDLE +/;
17     import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
18     import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
19         WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
20 }
21 else version (Posix)
22 {
23     import core.sys.posix.pthread;
24     import core.sys.posix.sys.types;
25     import core.sys.posix.time;
26 }
27 else
28 {
29     static assert(false, "Platform not supported");
30 }
31 
32 import core.time;
33 import core.internal.abort : abort;
34 
35 /**
36  * represents an event. Clients of an event are suspended while waiting
37  * for the event to be "signaled".
38  *
39  * Implemented using `pthread_mutex` and `pthread_condition` on Posix and
40  * `CreateEvent` and `SetEvent` on Windows.
41 ---
42 import core.sync.event, core.thread, std.file;
43 
44 struct ProcessFile
45 {
46     ThreadGroup group;
47     Event event;
48     void[] buffer;
49 
50     void doProcess()
51     {
52         event.wait();
53         // process buffer
54     }
55 
56     void process(string filename)
57     {
58         event.initialize(true, false);
59         group = new ThreadGroup;
60         for (int i = 0; i < 10; ++i)
61             group.create(&doProcess);
62 
63         buffer = std.file.read(filename);
64         event.set();
65         group.joinAll();
66         event.terminate();
67     }
68 }
69 ---
70  */
71 struct Event
72 {
73 nothrow @nogc:
74     /**
75      * Creates an event object.
76      *
77      * Params:
78      *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
79      *  initialState = initial state of the signal
80      */
81     this(bool manualReset, bool initialState)
82     {
83         initialize(manualReset, initialState);
84     }
85 
86     /**
87      * Initializes an event object. Does nothing if the event is already initialized.
88      *
89      * Params:
90      *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
91      *  initialState = initial state of the signal
92      */
93     void initialize(bool manualReset, bool initialState)
94     {
95         version (Windows)
96         {
97             if (m_event)
98                 return;
99             m_event = CreateEvent(null, manualReset, initialState, null);
100             m_event || abort("Error: CreateEvent failed.");
101         }
102         else version (Posix)
103         {
104             if (m_initalized)
105                 return;
106             pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
107                 abort("Error: pthread_mutex_init failed.");
108             static if ( is( typeof( pthread_condattr_setclock ) ) )
109             {
110                 pthread_condattr_t attr = void;
111                 pthread_condattr_init(&attr) == 0 ||
112                     abort("Error: pthread_condattr_init failed.");
113                 pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
114                     abort("Error: pthread_condattr_setclock failed.");
115                 pthread_cond_init(&m_cond, &attr) == 0 ||
116                     abort("Error: pthread_cond_init failed.");
117                 pthread_condattr_destroy(&attr) == 0 ||
118                     abort("Error: pthread_condattr_destroy failed.");
119             }
120             else
121             {
122                 pthread_cond_init(&m_cond, null) == 0 ||
123                     abort("Error: pthread_cond_init failed.");
124             }
125             m_state = initialState;
126             m_manualReset = manualReset;
127             m_initalized = true;
128         }
129     }
130 
131     // copying not allowed, can produce resource leaks
132     @disable this(this);
133     @disable void opAssign(Event);
134 
135     ~this()
136     {
137         terminate();
138     }
139 
140     /**
141      * deinitialize event. Does nothing if the event is not initialized. There must not be
142      * threads currently waiting for the event to be signaled.
143     */
144     void terminate()
145     {
146         version (Windows)
147         {
148             if (m_event)
149                 CloseHandle(m_event);
150             m_event = null;
151         }
152         else version (Posix)
153         {
154             if (m_initalized)
155             {
156                 pthread_mutex_destroy(&m_mutex) == 0 ||
157                     abort("Error: pthread_mutex_destroy failed.");
158                 pthread_cond_destroy(&m_cond) == 0 ||
159                     abort("Error: pthread_cond_destroy failed.");
160                 m_initalized = false;
161             }
162         }
163     }
164 
165 
166     /// Set the event to "signaled", so that waiting clients are resumed
167     void set()
168     {
169         version (Windows)
170         {
171             if (m_event)
172                 SetEvent(m_event);
173         }
174         else version (Posix)
175         {
176             if (m_initalized)
177             {
178                 pthread_mutex_lock(&m_mutex);
179                 m_state = true;
180                 pthread_cond_broadcast(&m_cond);
181                 pthread_mutex_unlock(&m_mutex);
182             }
183         }
184     }
185 
186     /// Reset the event manually
187     void reset()
188     {
189         version (Windows)
190         {
191             if (m_event)
192                 ResetEvent(m_event);
193         }
194         else version (Posix)
195         {
196             if (m_initalized)
197             {
198                 pthread_mutex_lock(&m_mutex);
199                 m_state = false;
200                 pthread_mutex_unlock(&m_mutex);
201             }
202         }
203     }
204 
205     /**
206      * Wait for the event to be signaled without timeout.
207      *
208      * Returns:
209      *  `true` if the event is in signaled state, `false` if the event is uninitialized or another error occured
210      */
211     bool wait()
212     {
213         version (Windows)
214         {
215             return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
216         }
217         else version (Posix)
218         {
219             return wait(Duration.max);
220         }
221     }
222 
223     /**
224      * Wait for the event to be signaled with timeout.
225      *
226      * Params:
227      *  tmout = the maximum time to wait
228      * Returns:
229      *  `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
230      *  the event is uninitialized or another error occured
231      */
232     bool wait(Duration tmout)
233     {
234         version (Windows)
235         {
236             if (!m_event)
237                 return false;
238 
239             auto maxWaitMillis = dur!("msecs")(uint.max - 1);
240 
241             while (tmout > maxWaitMillis)
242             {
243                 auto res = WaitForSingleObject(m_event, uint.max - 1);
244                 if (res != WAIT_TIMEOUT)
245                     return res == WAIT_OBJECT_0;
246                 tmout -= maxWaitMillis;
247             }
248             auto ms = cast(uint)(tmout.total!"msecs");
249             return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
250         }
251         else version (Posix)
252         {
253             if (!m_initalized)
254                 return false;
255 
256             pthread_mutex_lock(&m_mutex);
257 
258             int result = 0;
259             if (!m_state)
260             {
261                 if (tmout == Duration.max)
262                 {
263                     result = pthread_cond_wait(&m_cond, &m_mutex);
264                 }
265                 else
266                 {
267                     import core.sync.config;
268 
269                     timespec t = void;
270                     mktspec(t, tmout);
271 
272                     result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
273                 }
274             }
275             if (result == 0 && !m_manualReset)
276                 m_state = false;
277 
278             pthread_mutex_unlock(&m_mutex);
279 
280             return result == 0;
281         }
282     }
283 
284 private:
285     version (Windows)
286     {
287         HANDLE m_event;
288     }
289     else version (Posix)
290     {
291         pthread_mutex_t m_mutex;
292         pthread_cond_t m_cond;
293         bool m_initalized;
294         bool m_state;
295         bool m_manualReset;
296     }
297 }
298 
299 // Test single-thread (non-shared) use.
300 @nogc nothrow unittest
301 {
302     // auto-reset, initial state false
303     Event ev1 = Event(false, false);
304     assert(!ev1.wait(1.dur!"msecs"));
305     ev1.set();
306     assert(ev1.wait());
307     assert(!ev1.wait(1.dur!"msecs"));
308 
309     // manual-reset, initial state true
310     Event ev2 = Event(true, true);
311     assert(ev2.wait());
312     assert(ev2.wait());
313     ev2.reset();
314     assert(!ev2.wait(1.dur!"msecs"));
315 }
316 
317 unittest
318 {
319     import core.thread, core.atomic;
320 
321     scope event      = new Event(true, false);
322     int  numThreads = 10;
323     shared int numRunning = 0;
324 
325     void testFn()
326     {
327         event.wait(8.dur!"seconds"); // timeout below limit for druntime test_runner
328         numRunning.atomicOp!"+="(1);
329     }
330 
331     auto group = new ThreadGroup;
332 
333     for (int i = 0; i < numThreads; ++i)
334         group.create(&testFn);
335 
336     auto start = MonoTime.currTime;
337     assert(numRunning == 0);
338 
339     event.set();
340     group.joinAll();
341 
342     assert(numRunning == numThreads);
343 
344     assert(MonoTime.currTime - start < 5.dur!"seconds");
345 }