root/trunk/parallelFuture/parallelFuture.d

Revision 782, 61.7 kB (checked in by dsimcha, 4 years ago)

Allow ref parallel foreach over non-random access ranges.

1 /**A library that implements various high-level parallelism primitives, such as
2  * parallel foreach over arbitrary ranges, parallel map and reduce,
3  * futures, and a task pool.
4  *
5  * By:  David Simcha
6  *
7  * Examples:
8  * ---
9     // Create a TaskPool object with the default number of threads.
10     auto poolInstance = new TaskPool();
11
12     // Create some data to work on.
13     uint[] numbers = new uint[1_000];
14
15     // Fill in this array in parallel, using default block size.
16     // Note:  Be careful when writing to adjacent elements of an array from
17     // different threads, as this can cause word tearing bugs when
18     // the elements aren't properly aligned or aren't the machine's native
19     // word size.  In this case, though, we're ok.
20     foreach(i; poolInstance.parallel( iota(0, numbers.length)) ) {
21         numbers[i] = i;
22     }
23
24     // Make sure it works.
25     foreach(i; 0..numbers.length) {
26         assert(numbers[i] == i);
27     }
28
29     stderr.writeln("Done creating nums.");
30
31     // Parallel foreach also works on non-random access ranges, albeit
32     // less efficiently.
33     auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
34     foreach(num; poolInstance.parallel(myNumbers)) {
35         assert(num % 7 > 0 && num < 1000);
36     }
37     stderr.writeln("Done modulus test.");
38
39     // Use parallel map to calculate the square of each element in numbers,
40     // and make sure it's right.
41     uint[] squares = poolInstance.map!"a * a"(numbers, 100);
42     assert(squares.length == numbers.length);
43     foreach(i, number; numbers) {
44         assert(squares[i] == number * number);
45     }
46     stderr.writeln("Done squares.");
47
48     // Sum up the array in parallel with the current thread.
49     auto sumFuture = poolInstance.task!( reduce!"a + b" )(numbers);
50
51     // Go off and do other stuff while that future executes:
52     // Find the sum of squares of numbers.
53     ulong sumSquares = 0;
54     foreach(elem; numbers) {
55         sumSquares += elem * elem;
56     }
57
58     // Ask for our result.  If the pool has not yet started working on
59     // this task, spinWait() automatically steals it and executes it in this
60     // thread.
61     uint mySum = sumFuture.spinWait();
62     assert(mySum == 999 * 1000 / 2);
63
64     // We could have also computed this sum in parallel using parallel
65     // reduce.
66     auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
67     assert(mySum == mySumParallel);
68     stderr.writeln("Done sums.");
69
70     // Execute an anonymous delegate as a task.
71     auto myTask = poolInstance.task({
72         synchronized writeln("Our lives are parallel...Our lives are parallel.");
73     });
74
75     // Parallel foreach loops can also be nested:
76     auto nestedOuter = "abcd"d;
77     auto nestedInner =  iota(0, 5);
78
79     foreach(letter; poolInstance.parallel(nestedOuter, 1)) {
80         foreach(number; poolInstance.parallel(nestedInner, 1)) {
81             synchronized writeln(cast(char) letter, number);
82         }
83     }
84
85     // Block until all jobs are finished and then shut down the thread pool.
86     poolInstance.waitStop();
87  * ---
88  *
89  *
90  * License:
91  * Boost Software License - Version 1.0 - August 17th, 2003
92  *
93  * Permission is hereby granted, free of charge, to any person or organization
94  * obtaining a copy of the software and accompanying documentation covered by
95  * this license (the "Software") to use, reproduce, display, distribute,
96  * execute, and transmit the Software, and to prepare derivative works of the
97  * Software, and to permit third-parties to whom the Software is furnished to
98  * do so, all subject to the following:
99  *
100  * The copyright notices in the Software and this entire statement, including
101  * the above license grant, this restriction and the following disclaimer,
102  * must be included in all copies of the Software, in whole or in part, and
103  * all derivative works of the Software, unless such copies or derivative
104  * works are solely in the form of machine-executable object code generated by
105  * a source language processor.
106  *
107  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
108  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
109  * FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
110  * SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
111  * FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
112  * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
113  * DEALINGS IN THE SOFTWARE.
114  */
115 module parallelfuture;
116
117 import core.thread, core.cpuid, std.algorithm, std.range, std.c.stdlib, std.stdio,
118     std.contracts, std.functional, std.conv, std.math, core.memory, std.traits,
119     std.typetuple;
120
121 import core.sync.condition, core.sync.mutex;
122
123 // Workaround for bug 3753.
124 version(Posix) {
125     // Can't use alloca() because it can't be used with exception handling.
126     // Use the GC instead even though it's slightly less efficient.
127     void* alloca(size_t nBytes) {
128         return GC.malloc(nBytes);
129     }
130 } else {
131     // The real alloca() can be used safely here.
132     import core.stdc.stdlib : alloca;
133 }
134
135 /* Atomics code.  The ASM routines were borrowed from Tango on an ad-hoc basis.
136  * These snippets are likely not copyrightable.  Nonetheless,
137  * these should be replaced with generic functions once D2 gets a decent
138  * standard atomics lib.
139  */
140 void atomicSetUbyte(ref ubyte stuff, ubyte newVal) {
141     asm {
142         mov EAX, stuff;
143         mov DL, newVal;
144         lock;
145         xchg [EAX], DL;
146     }
147 }
148
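// Descriptive note (added): atomically reads a ubyte by doing a lock cmpxchg with both
// AL and DL set to zero, so the memory value is never modified and the old value is
// returned in AL, the return register.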
149 ubyte atomicReadUbyte(ref ubyte stuff) {
150     asm {
151         mov DL, 0;
152         mov AL, 0;
153         mov ECX, stuff;
154         lock;
155         cmpxchg [ECX], DL;
156     }
157 }
158
159 bool atomicCasUbyte(ref ubyte stuff, ubyte testVal, ubyte newVal) {
160     asm {
161         mov DL, newVal;
162         mov AL, testVal;
163         mov ECX, stuff;
164         lock;
165         cmpxchg [ECX], DL;
166         setz AL;
167     }
168 }
169
170 void atomicIncUint(ref uint num) {
171     asm {
172         mov EAX, num;
173         lock;
174         inc int ptr [EAX];
175         mov EAX, [EAX];
176     }
177 }
178
179 //-----------------------------------------------------------------------------
180
181
182 /*--------------------- Generic helper functions, etc.------------------------*/
183 private template MapType(alias fun, R) {
184     alias typeof(unaryFun!(fun)(ElementType!(R).init)) MapType;
185 }
186
187 private template ReduceType(alias fun, R, E) {
188     alias typeof(binaryFun!(fun)(E.init, ElementType!(R).init)) ReduceType;
189 }
190
191 private template hasLength(R) {
192     enum bool hasLength = __traits(compiles, R.init.length);
193 }
194
195 private template lValueElements(R) {
196     enum bool lValueElements = lValueElementsImpl!(R).ret;
197 }
198
199 private template lValueElementsImpl(R) {
200     private R myRange;
201     enum ret = is(typeof(&(myRange.front())));
202 }
203
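// Descriptive note (added): Thread.sleep() takes its interval in 100-nanosecond units,
// so one millisecond is 10_000 units.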
204 private void sleepMillisecond(long nMilliseconds) {
205     Thread.sleep(nMilliseconds * 10_000);
206 }
207
208
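// Descriptive note (added): copies a value onto the GC heap and returns a pointer to the
// copy.  If the type's TypeInfo says it contains no pointers, the block is allocated with
// NO_SCAN so the GC won't scan it.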
209 private T* toHeap(T)(T object) {
210     GC.BlkAttr gcFlags = (typeid(T).flags & 1) ?
211                           cast(GC.BlkAttr) 0 :
212                           GC.BlkAttr.NO_SCAN;
213     T* myPtr = cast(T*) GC.malloc(T.sizeof, gcFlags);
214     *myPtr = object;
215     return myPtr;
216 }
217
218 //------------------------------------------------------------------------------
219 /* Various classes of task.  These use manual C-style polymorphism, the kind
220  * with lots of structs and pointer casting.  This is because real classes
221  * would prevent some of the allocation tricks I'm using and waste space on
222  * monitors and vtbls for something that needs to be ultra-efficient.
223  */
224
225 private enum TaskState : ubyte {
226     notStarted,
227     inProgress,
228     done
229 }
230
231 private template BaseMixin(ubyte initTaskStatus) {
232     AbstractTask* prev;
233     AbstractTask* next;
234
235     static if(is(typeof(&impl))) {
236         void function(void*) runTask = &impl;
237     } else {
238         void function(void*) runTask;
239     }
240
241     Object exception;
242     ubyte taskStatus = initTaskStatus;
243
244
245     /* Kludge:  Some tasks need to re-submit themselves after they finish.
246      * In this case, they will set themselves to TaskState.notStarted before
247      * resubmitting themselves.  Setting this flag to false prevents the work
248      * stealing loop from setting them to done.*/
249     bool shouldSetDone = true;
250
251     bool done() {
252         if(atomicReadUbyte(taskStatus) == TaskState.done) {
253             if(exception) {
254                 throw exception;
255             }
256
257             return true;
258         }
259
260         return false;
261     }
262 }
263
264 // The base "class" for all of the other tasks.
265 private struct AbstractTask {
266     mixin BaseMixin!(TaskState.notStarted);
267
268     void job() {
269         runTask(&this);
270     }
271 }
272
273 private template AliasReturn(alias fun, T...) {
274     alias AliasReturnImpl!(fun, T).ret AliasReturn;
275 }
276
277 private template AliasReturnImpl(alias fun, T...) {
278     private T args;
279     alias typeof(fun(args)) ret;
280 }
281
282 /**Calls a delegate or function pointer with args.  This is basically an
283  * adapter that makes Task work with delegates, function pointers and
284  * functors instead of just aliases.
285  */
286 ReturnType!(F) runCallable(F, Args...)(F fpOrDelegate, Args args) {
287     return fpOrDelegate(args);
288 }
289
290 /**A struct that encapsulates the information about a task, including
291  * its current status, what pool it was submitted to, and its arguments.
292  *
293  * Note:  If a Task has been submitted to the pool, is being stored in a stack
294  * frame, and has not yet finished, the destructor for this struct will
295  * automatically call yieldWait() so that the task can finish and the
296  * stack frame can be destroyed safely.
297  *
298  * Bugs:  Doesn't work with ref/out arguments.
299  */
300 struct Task(alias fun, Args...) {
301 private:
302     static void impl(void* myTask) {
303         Task* myCastedTask = cast(typeof(this)*) myTask;
304         static if(is(ReturnType == void)) {
305             fun(myCastedTask.args);
306         } else {
307             myCastedTask.returnVal = fun(myCastedTask.args);
308         }
309     }
310     mixin BaseMixin!(TaskState.notStarted) Base;
311
312     TaskPool pool;  // For work stealing stuff.
313     Args args;
314
315     alias typeof(fun(args)) ReturnType;
316     static if(!is(ReturnType == void)) {
317         ReturnType returnVal;
318     }
319
320     void enforcePool() {
321         enforce(this.pool !is null, "Job not submitted yet.");
322     }
323
324     this(Args args) {
325         static if(args.length > 0) {
326             this.args = args;
327         }
328     }
329
330 public:
331     /**If the task isn't started yet, steal it and do it in this thread.
332      * If it's done, return its return value, if any.  If it's in progress,
333      * busy spin until it's done, then return the return value.
334      *
335      * This function should be used when you expect the result of the
336      * task to be available relatively quickly, on a timescale shorter
337      * than that of an OS context switch.
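     *
     * Examples:
     * ---
     * // A minimal sketch:  spin-wait on a quick task, stealing it if no worker
     * // thread has picked it up yet.  Assumes reduce from std.algorithm, as in
     * // the module-level example.
     * auto pool = new TaskPool();
     * auto myTask = pool.task!( reduce!"a + b" )([1, 2, 3]);
     * assert(myTask.spinWait() == 6);
     * ---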
338      */
339     ReturnType spinWait() {
340         enforcePool();
341
342         this.pool.tryStealDelete( cast(AbstractTask*) &this);
343
344         while(atomicReadUbyte(this.taskStatus) != TaskState.done) {}
345
346         if(exception) {
347             throw exception;
348         }
349
350         static if(!is(ReturnType == void)) {
351             return this.returnVal;
352         }
353     }
354
355     /**If the task isn't started yet, steal it and do it in this thread.
356      * If it's done, return its return value, if any.  If it's in progress,
357      * wait on a condition variable.
358      *
359      * This function should be used when you expect the result of the
360      * task to take a while, as waiting on a condition variable
361      * introduces latency, but results in negligible wasted CPU cycles.
362      */
363     ReturnType yieldWait() {
364         enforcePool();
365         this.pool.tryStealDelete( cast(AbstractTask*) &this);
366         if(atomicReadUbyte(this.taskStatus) == TaskState.done) {
367
368             static if(is(ReturnType == void)) {
369                 return;
370             } else {
371                 return this.returnVal;
372             }
373         }
374
375         pool.lock();
376         scope(exit) pool.unlock();
377
378         while(atomicReadUbyte(this.taskStatus) != TaskState.done) {
379             pool.waitUntilCompletion();
380         }
381
382         if(exception) {
383             throw exception;
384         }
385
386         static if(!is(ReturnType == void)) {
387             return this.returnVal;
388         }
389     }
390
391     /**If this task is not started yet, steal it and execute it in the current
392      * thread.  If it is finished, return its result.  If it is in progress,
393      * execute any other available tasks from the task pool until this one
394      * is finished.  If no other tasks are available, yield wait.
395      */
396     ReturnType workWait() {
397         enforcePool();
398         this.pool.tryStealDelete( cast(AbstractTask*) &this);
399
400         while(true) {
401             if(done) {
402                 static if(is(ReturnType == void)) {
403                     return;
404                 } else {
405                     return this.returnVal;
406                 }
407             }
408
409             pool.lock();
410             AbstractTask* job;
411             try {
412                 // Locking explicitly and calling popNoSync() because
413                 // pop() waits on a condition variable if there are no jobs
414                 // in the queue.
415                 job = pool.popNoSync();
416             } finally {
417                 pool.unlock();
418             }
419
420             if(job !is null) {
421
422                 version(verboseUnittest) {
423                     stderr.writeln("Doing workWait work.");
424                 }
425
426                 pool.doJob(job);
427
428                 if(done) {
429                     static if(is(ReturnType == void)) {
430                         return;
431                     } else {
432                         return this.returnVal;
433                     }
434                 }
435             } else {
436                 version(verboseUnittest) {
437                     stderr.writeln("Yield from workWait.");
438                 }
439
440                 return yieldWait();
441             }
442         }
443     }
444
445     ///
446     bool done() {
447         // Explicitly forwarded for documentation purposes.
448         return Base.done();
449     }
450
451     ~this() {
452         if(pool !is null && taskStatus != TaskState.done) {
453             yieldWait();
454         }
455     }
456 }
457
458 /**Creates a task that calls an alias.
459  *
460  * Examples:
461  * ---
462  * auto pool = new TaskPool();
463  * uint[] foo = someFunction();
464  *
465  * // Create a task to sum this array in the background.
466  * auto myTask = task!( reduce!"a + b" )(foo);
467  * pool.put(myTask);
468  *
469  * // Do other stuff.
470  *
471  * // Get value.  Steals the job and executes it in this thread if it
472  * // hasn't been started by a worker thread yet.
473  * writeln("Sum = ", myTask.spinWait());
474  * ---
475  *
476  * Note:
477  * This method of creating tasks allocates on the stack and requires an explicit
478  * submission to the pool.  It is designed for tasks that are to finish before
479  * the function in which they are created returns.  If you want to escape the
480  * Task object from the function in which it was created or prefer to heap
481  * allocate and automatically submit to the pool, see pool.task().
482  */
483 Task!(fun, Args) task(alias fun, Args...)(Args args) {
484     alias Task!(fun, Args) RetType;
485     return RetType(args);
486 }
487
488 /**Create a task that calls a function pointer, delegate, or functor.
489  * This works for anonymous delegates.
490  *
491  * Examples:
492  * ---
493  * auto pool = new TaskPool();
494  * auto myTask = task({
495  *     stderr.writeln("I've completed a task.");
496  * });
497  *
498  * pool.put(myTask);
499  *
500  * // Do other stuff.
501  *
502  * myTask.yieldWait();
503  * ---
504  *
505  * Notes:
506  * This method of creating tasks allocates on the stack and requires an explicit
507  * submission to the pool.  It is designed for tasks that are to finish before
508  * the function in which they are created returns.  If you want to escape the
509  * Task object from the function in which it was created or prefer to heap
510  * allocate and automatically submit to the pool, see pool.task().
511  *
512  * In the case of delegates, this function takes a scope delegate to prevent
513  * the allocation of closures, since its intended use is for tasks that will
514  * be finished before the function in which they're created returns.
515  * pool.task() takes a non-scope delegate and will allow the use of closures.
516  */
517 Task!(runCallable, TypeTuple!(F, Args))
518 task(F, Args...)(scope F delegateOrFp, Args args)
519 if(is(typeof(delegateOrFp(args)))) {
520     alias typeof(return) RT;
521     return RT(delegateOrFp, args);
522 }
523
524 private struct ParallelForeachTask(R, Delegate)
525 if(isRandomAccessRange!R && hasLength!R) {
526     enum withIndex = ParameterTypeTuple!(Delegate).length == 2;
527
528     static void impl(void* myTask) {
529         auto myCastedTask = cast(ParallelForeachTask!(R, Delegate)*) myTask;
530         foreach(i; myCastedTask.lowerBound..myCastedTask.upperBound) {
531
532             static if(lValueElements!R) {
533                 static if(withIndex) {
534                     myCastedTask.runMe(i, myCastedTask.myRange[i]);
535                 } else {
536                     myCastedTask.runMe( myCastedTask.myRange[i]);
537                 }
538             } else {
539                 auto valToPass = myCastedTask.myRange[i];
540                 static if(withIndex) {
541                     myCastedTask.runMe(i, valToPass);
542                 } else {
543                     myCastedTask.runMe(valToPass);
544                 }
545             }
546         }
547
548         // Allow some memory reclamation.
549         myCastedTask.myRange = R.init;
550         myCastedTask.runMe = null;
551     }
552
553     mixin BaseMixin!(TaskState.done);
554
555     TaskPool pool;
556
557     // More specific stuff.
558     size_t lowerBound;
559     size_t upperBound;
560     R myRange;
561     Delegate runMe;
562
563     void wait() {
564         if(pool is null) {
565             // Never submitted.  No need to wait.
566             return;
567         }
568
569         pool.lock();
570         scope(exit) pool.unlock();
571
572         // No work stealing here b/c the function that waits on this task
573         // wants to recycle it as soon as it finishes.
574         while(!done()) {
575             pool.waitUntilCompletion();
576         }
577
578         if(exception) {
579             throw exception;
580         }
581     }
582 }
583
584 private struct ParallelForeachTask(R, Delegate)
585 if(!isRandomAccessRange!R || !hasLength!R) {
586     enum withIndex = ParameterTypeTuple!(Delegate).length == 2;
587
588     static void impl(void* myTask) {
589         auto myCastedTask = cast(ParallelForeachTask!(R, Delegate)*) myTask;
590
591         static ref ElementType!(R) getElement(T)(ref T elemOrPtr) {
592             static if(is(typeof(*elemOrPtr) == ElementType!R)) {
593                 return *elemOrPtr;
594             } else {
595                 return elemOrPtr;
596             }
597         }
598
599         foreach(i, element; myCastedTask.elements) {
600             static if(withIndex) {
601                 size_t lValueIndex = i + myCastedTask.startIndex;
602                 myCastedTask.runMe(lValueIndex, getElement(element));
603             } else {
604                 myCastedTask.runMe(getElement(element));
605             }
606         }
607
608         // Make memory easier to reclaim.
609         myCastedTask.runMe = null;
610     }
611
612     mixin BaseMixin!(TaskState.done);
613
614     TaskPool pool;
615
616     // More specific stuff.
617     alias ElementType!R E;
618     Delegate runMe;
619
620     static if(lValueElements!(R)) {
621         E*[] elements;
622     } else {
623         E[] elements;
624     }
625     size_t startIndex;
626
627     void wait() {
628         if(pool is null) {
629             // Never submitted.  No need to wait.
630             return;
631         }
632
633         pool.lock();
634         scope(exit) pool.unlock();
635
636         // No work stealing here b/c the function that waits on this task
637         // wants to recycle it as soon as it finishes.
638
639         while(!done()) {
640             pool.waitUntilCompletion();
641         }
642
643         if(exception) {
644             throw exception;
645         }
646     }
647 }
648
649 private struct MapTask(alias fun, R) if(isRandomAccessRange!R && hasLength!R) {
650     static void impl(void* myTask) {
651         auto myCastedTask = cast(MapTask!(fun, R)*) myTask;
652
653         foreach(i; myCastedTask.lowerBound..myCastedTask.upperBound) {
654             myCastedTask.results[i] = uFun(myCastedTask.range[i]);
655         }
656
657         // Nullify stuff, make GC's life easier.
658         myCastedTask.results = null;
659         myCastedTask.range = R.init;
660     }
661
662     mixin BaseMixin!(TaskState.done);
663
664     TaskPool pool;
665
666     // More specific stuff.
667     alias unaryFun!fun uFun;
668     R range;
669     alias ElementType!R E;
670     alias typeof(uFun(E.init)) ReturnType;
671     ReturnType[] results;
672     size_t lowerBound;
673     size_t upperBound;
674
675     void wait() {
676         if(pool is null) {
677             // Never submitted.  No need to wait on it.
678             return;
679         }
680
681         pool.lock();
682         scope(exit) pool.unlock();
683
684         // Again, no work stealing.
685
686         while(!done()) {
687             pool.waitUntilCompletion();
688         }
689
690         if(exception) {
691             throw exception;
692         }
693     }
694 }
695
696 /**Struct for creating worker-local storage.  Worker-local storage is basically
697  * thread-local storage that exists only for workers in a given pool, is
698  * allocated on the heap in a way that avoids false sharing,
699  * and doesn't necessarily have global scope within any
700  * thread.  It can be accessed from any worker thread in the pool that created
701  * it, or the thread that created the pool that created it.  Accessing from
702  * other threads will result in undefined behavior.
703  *
704  * Since the underlying data for this struct is heap-allocated, this struct
705  * has reference semantics when passed around.
706  *
707  * At a more concrete level, the main use case for WorkerLocal is performing
708  * parallel reductions with an imperative, as opposed to functional,
709  * programming style.  Therefore, it's useful to treat WorkerLocal as local
710  * to each thread for only the parallel portion of an algorithm.
711  *
712  * Examples:
713  * ---
714  * auto pool = new TaskPool;
715  * auto sumParts = pool.createWorkerLocal!(uint)();
716  * foreach(i; pool.parallel(iota(0U, someLargeNumber))) {
717  *     // Do complicated stuff.
718  *     sumParts.get += resultOfComplicatedStuff;
719  * }
720  *
721  * writeln("Sum = ", reduce!"a + b"(sumParts.toRange));
722  * ---
723  */
724 struct WorkerLocal(T) {
725 private:
726     size_t subtractThis;
727     size_t size;
728
729     static immutable uint cacheLineSize;
730     size_t elemSize;
731     bool* stillThreadLocal;
732
733     static this() {
734         size_t lineSize = 0;
735         foreach(cachelevel; datacache) {
736             if(cachelevel.lineSize > lineSize && cachelevel.lineSize < uint.max) {
737                 lineSize = cachelevel.lineSize;
738             }
739         }
740
741         cacheLineSize = lineSize;
742     }
743
744     static size_t roundToLine(size_t num) pure nothrow {
745         if(num % cacheLineSize == 0) {
746             return num;
747         } else {
748             return ((num / cacheLineSize) + 1) * cacheLineSize;
749         }
750     }
751
752     void* data;
753
754     void initialize(TaskPool pool) {
755         subtractThis = pool.instanceStartIndex - 1;
756         size = pool.size() + 1;
757         stillThreadLocal = new bool;
758         *stillThreadLocal = true;
759
760         // Determines whether the GC should scan the array.
761         auto blkInfo = (typeid(T).flags & 1) ?
762                        cast(GC.BlkAttr) 0 :
763                        GC.BlkAttr.NO_SCAN;
764
765         immutable nElem = pool.size + 1;
766         elemSize = roundToLine(T.sizeof);
767
768         // The + 3 is to pad one full cache line worth of space on either side
769         // of the data structure to make sure false sharing with completely
770         // unrelated heap data is prevented, and to provide enough padding to
771         // make sure that data is cache line-aligned.
772         data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
773
774         // Cache line align data ptr.
775         data = cast(void*) roundToLine(cast(size_t) data);
776
777         foreach(i; 0..nElem) {
778             this.opIndex(i) = T.init;
779         }
780     }
781
782     ref T opIndex(size_t index) {
783         assert(index < size, text(index, '\t', size));
784         return *(cast(T*) (data + elemSize * index));
785     }
786
787     void opIndexAssign(T val, size_t index) {
788         assert(index < size);
789         *(cast(T*) (data + elemSize * index)) = val;
790     }
791
792 public:
793     /**Get the current thread's instance.  Returns by reference even though
794      * ddoc refuses to say so.  Undefined behavior will result if the current
795      * thread is not a worker in the pool that created this instance or the
796      * thread that created the pool that created this instance.
797      *
798      * If assertions are enabled and toRange() has been called, then this
799      * WorkerLocal instance is no longer worker-local and an assertion
800      * failure will result when calling this method.  This is not checked
801      * when assertions are disabled for performance reasons.
802      */
803     ref T get() @property {
804         assert(*stillThreadLocal,
805                "Cannot call get() on this instance of WorkerLocal because it" ~
806                " is no longer worker-local."
807         );
808         auto index = TaskPool.threadIndex;
809
810         // Assume it's the thread that created the pool.
811         if(index < subtractThis) {
812             index = subtractThis;
813         }
814         return opIndex(index - subtractThis);
815     }
816
817     /**Assign a value to the current thread's instance.  This function has
818      * the same caveats as its overload.
819      */
820     void get(T val) @property {
821         assert(*stillThreadLocal,
822                "Cannot call get() on this instance of WorkerLocal because it" ~
823                " is no longer worker-local."
824         );
825
826         auto index = TaskPool.threadIndex;
827
828         // Assume it's the thread that created the pool.
829         if(index < subtractThis) {
830             index = subtractThis;
831         }
832         opIndexAssign(val, index - subtractThis);
833     }
834
835     /**Returns a range view of the values for all threads, which can be used
836      * to process each thread's results after the parallel
837      * part of your algorithm has finished.  Do NOT use this method in the parallel portion
838      * of your algorithm.
839      *
840      * Calling this function will also set a flag indicating
841      * that this struct is no longer thread-local, and attempting to use the
842      * get() method again will result in an assertion failure if assertions
843      * are enabled.
844      */
845     WorkerLocalRange!T toRange() @property {
846         if(*stillThreadLocal) {
847             *stillThreadLocal = false;
848
849             // Make absolutely sure results are visible to all threads.
850             synchronized {}
851         }
852
853        return WorkerLocalRange!(T)(this);
854     }
855 }
856
857 /**Range primitives for worker-local storage.  The purpose of this is to
858  * access results produced by each worker thread from a single thread once you
859  * are no longer using the worker-local storage from multiple threads.
860  * Do NOT use this struct in the parallel portion of your algorithm.
861  */
862 struct WorkerLocalRange(T) {
863 private:
864     WorkerLocal!T workerLocal;
865
866     size_t _length;
867     size_t beginOffset;
868
869     this(WorkerLocal!(T) wl) {
870         this.workerLocal = wl;
871         _length = wl.size;
872     }
873
874 public:
875     ///
876     ref T front() @property {
877         return this[0];
878     }
879
880     ///
881     ref T back() @property {
882         return this[_length - 1];
883     }
884
885     ///
886     void popFront() {
887         if(_length > 0) {
888             beginOffset++;
889             _length--;
890         }
891     }
892
893     ///
894     void popBack() {
895         if(_length > 0) {
896             _length--;
897         }
898     }
899
900     ///
901     typeof(this) save() @property {
902         return this;
903     }
904
905     ///
906     ref T opIndex(size_t index) {
907         assert(index < _length);
908         return workerLocal[index + beginOffset];
909     }
910
911     ///
912     void opIndexAssign(T val, size_t index) {
913         assert(index < _length);
914         workerLocal[index + beginOffset] = val;
915     }
916
917     ///
918     bool empty() @property {
919         return length == 0;
920     }
921
922     ///
923     size_t length() @property {
924         return _length;
925     }
926 }
927
928 /**The task pool class that is the workhorse of this library.
929  */
930 class TaskPool {
931 private:
932     Thread[] pool;
933     AbstractTask* head;
934     AbstractTask* tail;
935     PoolState status = PoolState.running;  // All operations on this are done atomically.
936     Condition workerCondition;
937     Condition waiterCondition;
938     Mutex mutex;
939
940     // The instanceStartIndex of the next instance that will be created.
941     __gshared static size_t nextInstanceIndex = 1;
942
943     // The index of the current thread.
944     static size_t threadIndex;
945
946     // The index of the first worker thread in this pool instance.
947     immutable size_t instanceStartIndex;
948
949     // The index that the next thread to be initialized in this pool will have.
950     size_t nextThreadIndex;
951
952     enum PoolState : ubyte {
953         running,
954         finishing,
955         stopNow
956     }
957
958     void doJob(AbstractTask* job) {
959         assert(job.taskStatus == TaskState.inProgress);
960         assert(job.next is null);
961         assert(job.prev is null);
962
963         scope(exit) {
964             lock();
965             notifyWaiters();
966             unlock();
967         }
968
969         try {
970             job.job();
971             if(job.shouldSetDone) {
972                 atomicSetUbyte(job.taskStatus, TaskState.done);
973             }
974         } catch(Object e) {
975             job.exception = e;
976             if(job.shouldSetDone) {
977                 atomicSetUbyte(job.taskStatus, TaskState.done);
978             }
979         }
980     }
981
982     void workLoop() {
983         // Initialize thread index.
984         synchronized(this) {
985             threadIndex = nextThreadIndex;
986             nextThreadIndex++;
987         }
988
989         while(atomicReadUbyte(status) != PoolState.stopNow) {
990             AbstractTask* task = pop();
991             if (task is null) {
992                 if(atomicReadUbyte(status) == PoolState.finishing) {
993                     atomicSetUbyte(status, PoolState.stopNow);
994                     return;
995                 }
996             } else {
997                 doJob(task);
998             }
999         }
1000     }
1001
1002     bool deleteItem(AbstractTask* item) {
1003         lock();
1004         auto ret = deleteItemNoSync(item);
1005         unlock();
1006         return ret;
1007     }
1008
1009     bool deleteItemNoSync(AbstractTask* item)
1010     out {
1011         assert(item.next is null);
1012         assert(item.prev is null);
1013     } body {
1014         if(item.taskStatus != TaskState.notStarted) {
1015             return false;
1016         }
1017         item.taskStatus = TaskState.inProgress;
1018
1019         if(item is head) {
1020             // Make sure head gets set properly.
1021             popNoSync();
1022             return true;
1023         }
1024         if(item is tail) {
1025             tail = tail.prev;
1026             if(tail !is null) {
1027                 tail.next = null;
1028             }
1029             item.next = null;
1030             item.prev = null;
1031             return true;
1032         }
1033         if(item.next !is null) {
1034             assert(item.next.prev is item);  // Check queue consistency.
1035             item.next.prev = item.prev;
1036         }
1037         if(item.prev !is null) {
1038             assert(item.prev.next is item);  // Check queue consistency.
1039             item.prev.next = item.next;
1040         }
1041         item.next = null;
1042         item.prev = null;
1043         return true;
1044     }
1045
1046     // Pop a task off the queue.  Should only be called by worker threads.
1047     AbstractTask* pop() {
1048         lock();
1049         auto ret = popNoSync();
1050         if(ret is null && status == PoolState.running) {
1051             wait();
1052            // stderr.writeln("AWAKE");
1053         }
1054         unlock();
1055         return ret;
1056     }
1057
1058     AbstractTask* popNoSync()
1059     out(returned) {
1060         /* If task.prev and task.next aren't null, then another thread
1061          * can try to delete this task from the pool after it's
1062          * already been deleted/popped.
1063          */
1064         if(returned !is null) {
1065             assert(returned.next is null);
1066             assert(returned.prev is null);
1067         }
1068     } body {
1069         AbstractTask* returned = head;
1070         if (head !is null) {
1071             head = head.next;
1072             returned.prev = null;
1073             returned.next = null;
1074             returned.taskStatus = TaskState.inProgress;
1075         }
1076         if(head !is null) {
1077             head.prev = null;
1078         }
1079
1080         return returned;
1081     }
1082
1083     // Push a task onto the queue.
1084     void abstractPut(AbstractTask* task) {
1085         lock();
1086         abstractPutNoSync(task);
1087         unlock();
1088     }
1089
1090     void abstractPutNoSync(AbstractTask* task)
1091     out {
1092         assert(tail.prev !is tail);
1093         assert(tail.next is null, text(tail.prev, '\t', tail.next));
1094         if(tail.prev !is null) {
1095             assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
1096         }
1097     } body {
1098         task.next = null;
1099         if (head is null) { //Queue is empty.
1100             head = task;
1101             tail = task;
1102             tail.prev = null;
1103         } else {
1104             task.prev = tail;
1105             tail.next = task;
1106             tail = task;
1107         }
1108         notify();
1109     }
1110
1111     // Same as trySteal, but also deletes the task from the queue so the
1112     // Task object can be recycled.
1113     bool tryStealDelete(AbstractTask* toSteal) {
1114         if( !deleteItem(toSteal) ) {
1115             return false;
1116         }
1117
1118         toSteal.job();
1119
1120         /* shouldSetDone should always be true except if the task re-submits
1121          * itself to the pool and needs to bypass this.*/
1122         if(toSteal.shouldSetDone == 1) {
1123             atomicSetUbyte(toSteal.taskStatus, TaskState.done);
1124         }
1125
1126         return true;
1127     }
1128
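    // Descriptive note (added): picks a block size that divides rangeLen into roughly
    // 2 * (this.size + 1) work units, i.e. about twice as many work units as there are
    // threads working on the range (the workers plus the submitting thread), rounding up.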
1129     size_t defaultBlockSize(size_t rangeLen) {
1130         if(this.size == 0) {
1131             return rangeLen;
1132         }
1133
1134         immutable size_t twoSize = 2 * (this.size + 1);
1135         return (rangeLen / twoSize) + ((rangeLen % twoSize == 0) ? 0 : 1);
1136     }
1137
1138     void lock() {
1139         mutex.lock();
1140     }
1141
1142     void unlock() {
1143         mutex.unlock();
1144     }
1145
1146     void wait() {
1147         workerCondition.wait();
1148     }
1149
1150     void notify() {
1151         workerCondition.notify();
1152     }
1153
1154     void notifyAll() {
1155         workerCondition.notifyAll();
1156     }
1157
1158     void waitUntilCompletion() {
1159         waiterCondition.wait();
1160     }
1161
1162     void notifyWaiters() {
1163         waiterCondition.notifyAll();
1164     }
1165
1166 public:
1167
1168     /**Default constructor that initializes a TaskPool with
1169      * however many cores are on your CPU, minus 1 because the thread
1170      * that initialized the pool will also do work.
1171      *
1172      * Note:  Initializing a pool with zero threads (as would happen in the
1173      * case of a single-core CPU) is well-tested and does work.
1174      */
1175     this() {
1176         this(coresPerCPU - 1);
1177     }
1178
1179     /**Creates a pool with the specified number of worker threads.*/
1180     this(size_t poolSize) {
1181         synchronized(TaskPool.classinfo) {
1182             instanceStartIndex = nextInstanceIndex;
1183
1184             // The first worker thread to be initialized will have this index,
1185             // and will increment it.  The second worker to be initialized will
1186             // have this index plus 1.
1187             nextThreadIndex = instanceStartIndex;
1188
1189             nextInstanceIndex += poolSize;
1190         }
1191
1192         mutex = new Mutex(this);
1193         workerCondition = new Condition(mutex);
1194         waiterCondition = new Condition(mutex);
1195
1196         pool = new Thread[poolSize];
1197         foreach(ref poolThread; pool) {
1198             poolThread = new Thread(&workLoop);
1199             poolThread.start();
1200         }
1201     }
1202
1203     /**Gets the index of the current thread relative to this pool.  The thread
1204      * that created this pool receives an index of 0.  The worker threads in
1205      * this pool receive indices of 1 through poolSize.
1206      *
1207      * The worker index is useful mainly for maintaining worker-local storage.
1208      *
1209      * Bugs:  Subject to integer overflow errors if more than size_t.max threads
1210      *        are ever created during the course of a program's execution.  This
1211      *        will likely never be fixed because it's an extreme corner case
1212      *        on 32-bit and it's completely implausible on 64-bit.
1213      *
1214      *        Will silently return undefined results for threads that are not
1215      *        workers in this pool and did not create this pool.
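     *
     * Examples:
     * ---
     * // A minimal sketch:  the thread that created the pool always sees index 0.
     * auto pool = new TaskPool();
     * assert(pool.workerIndex() == 0);
     * ---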
1216      */
1217     final size_t workerIndex() {
1218         immutable rawInd = threadIndex;
1219         return (rawInd >= instanceStartIndex) ?
1220                 (rawInd - instanceStartIndex + 1) : 0;
1221     }
1222
1223     /**Create an instance of worker-local storage, initialized with a given
1224      * value.  The value is lazy so that you can, for example, easily
1225      * create one instance of a class for each worker.
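     *
     * Examples:
     * ---
     * // A minimal sketch:  give each worker thread its own array to append to,
     * // then combine the per-thread results afterwards via toRange().
     * auto pool = new TaskPool();
     * auto perWorker = pool.createWorkerLocal!(uint[])();
     * foreach(i; pool.parallel(iota(0U, 1_000))) {
     *     perWorker.get ~= i;
     * }
     * ---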
1226      */
1227     WorkerLocal!(T) createWorkerLocal(T)(lazy T initialVal = T.init) {
1228         WorkerLocal!(T) ret;
1229         ret.initialize(this);
1230         foreach(i; 0..size + 1) {
1231             ret[i] = initialVal;
1232         }
1233         synchronized {}  // Make sure updates are visible in all threads.
1234         return ret;
1235     }
1236
1237     /**Kills the pool immediately without waiting for jobs to finish.  Use only if you
1238      * have waited on every job and therefore know there can't possibly be more
1239      * in queue, or if you speculatively executed a bunch of tasks and realized
1240      * you don't need those results anymore.
1241      *
1242      * Note:  Does not affect jobs that are already executing, only those
1243      * in queue.
1244      */
1245     void stop() {
1246         lock();
1247         scope(exit) unlock();
1248         atomicSetUbyte(status, PoolState.stopNow);
1249         notifyAll();
1250     }
1251
1252     /// Waits for all jobs to finish, then shuts down the pool.
1253     void waitStop() {
1254         finish();
1255         foreach(t; pool) {
1256             t.join();
1257         }
1258     }
1259
1260     /**Instructs worker threads to stop when the queue becomes empty, but does
1261      * not block.
1262      */
1263     void finish() {
1264         lock();
1265         scope(exit) unlock();
1266         atomicCasUbyte(status, PoolState.running, PoolState.finishing);
1267         notifyAll();
1268     }
1269
1270     /// Returns the number of worker threads in the pool.
1271     final uint size() {
1272         return pool.length;
1273     }
1274
1275     // Kept public for backwards compatibility, but not documented.
1276     // Using ref parameters is a nicer API and is made safe because the
1277     // d'tor for Task waits until the task is finished before destroying the
1278     // stack frame.  This function will eventually be made private and/or
1279     // deprecated.
1280     void put(alias fun, Args...)(Task!(fun, Args)* task) {
1281         task.pool = this;
1282         abstractPut( cast(AbstractTask*) task);
1283     }
1284
1285     /**Put a task on the queue.
1286     *
1287     * Note:  While this function takes the address of variables that may
1288     * potentially be on the stack, it is safe and will be marked as @trusted
1289     * once SafeD is fully implemented.  Task objects include a destructor that
1290     * waits for the task to complete before destroying the stack frame that
1291     * they are allocated on.  Therefore, it is impossible for the stack
1292     * frame to be destroyed before the task is complete and out of the queue.
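    *
    * Examples:
    * ---
    * // A minimal sketch:  create a task on the stack, submit it by reference,
    * // then wait for its result.  Assumes reduce from std.algorithm, as in the
    * // module-level example.
    * auto pool = new TaskPool();
    * auto myTask = task!( reduce!"a + b" )([1, 2, 3, 4]);
    * pool.put(myTask);
    * assert(myTask.yieldWait() == 10);
    * ---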
1293     */
1294     void put(alias fun, Args...)(ref Task!(fun, Args) task) {
1295         task.pool = this;
1296         abstractPut( cast(AbstractTask*) &task);
1297     }
1298
1299     /**Convenience method that automatically creates a Task calling an alias on
1300      * the GC heap and submits it to the pool.  See examples for the
1301      * non-member function task().
1302      *
1303      * Returns:  A pointer to the Task object.
1304      */
1305     Task!(fun, Args)* task(alias fun, Args...)(Args args) {
1306         auto ret = toHeap(.task!(fun)(args));
1307         put(ret);
1308         return ret;
1309     }
1310
1311     /**Convenience method that automatically creates a Task calling a delegate,
1312      * function pointer, or functor on the GC heap and submits it to the pool.
1313      * See examples for the non-member function task().
1314      *
1315      * Returns:  A pointer to the Task object.
1316      *
1317      * Note:  This function takes a non-scope delegate, meaning it can be
1318      * used with closures.  If you can't allocate a closure due to objects
1319      * on the stack that have scoped destruction, see the global function
1320      * task(), which takes a scope delegate.
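     *
     * Examples:
     * ---
     * // A minimal sketch:  the delegate below is a closure over message.
     * auto pool = new TaskPool();
     * string message = "Hello from a worker thread.";
     * auto myTask = pool.task({ stderr.writeln(message); });
     * myTask.yieldWait();
     * ---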
1321      */
1322      Task!(runCallable, TypeTuple!(F, Args))*
1323      task(F, Args...)(F delegateOrFp, Args args)
1324      if(is(ReturnType!(F))) {
1325          auto ptr = toHeap(.task(delegateOrFp, args));
1326          put(ptr);
1327          return ptr;
1328      }
1329
1330     /**Implements a parallel foreach loop over a range.  blockSize is the
1331      * number of elements to process in one work unit.
1332      *
1333      * Examples:
1334      * ---
1335      * auto pool = new TaskPool();
1336      *
1337      * uint[] squares = new uint[1_000];
1338      * foreach(i; pool.parallel( iota(0, squares.length), 100)) {
1339      *     // Iterate over squares using work units of size 100.
1340      *     squares[i] = i * i;
1341      * }
1342      * ---
1343      *
1344      * Note:  Since breaking from a loop that's being executed in parallel
1345      * doesn't make much sense, it is considered undefined behavior in this
1346      * implementation of parallel foreach.
1347      *
1348      */
1349     ParallelForeach!R parallel(R)(R range, size_t blockSize) {
1350         alias ParallelForeach!R RetType;
1351         return RetType(this, range, blockSize);
1352     }
1353
1354     /**Parallel foreach with default block size.  For ranges that don't have
1355      * a length, the default is 512 elements.  For ranges that do, the default
1356      * is whatever number creates roughly twice as many work units as there are
1357      * threads working on the loop (the worker threads plus the submitting thread).
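     *
     * For example, with 3 worker threads (4 threads total, counting the submitting
     * thread) and a range of length 1_000, the default block size is
     * 1_000 / 8 = 125, giving 8 work units.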
1358      */
1359     ParallelForeach!R parallel(R)(R range) {
1360         static if(hasLength!R) {
1361             // Default block size is such that we would use 2x as many
1362             // slots as are in this thread pool.
1363             size_t blockSize = defaultBlockSize(range.length);
1364             return parallel(range, blockSize);
1365         } else {
1366             // Just use a really, really dumb guess if the user is too lazy to
1367             // specify.
1368             return parallel(range, 512);
1369         }
1370     }
1371
1372     /**Parallel map.  Unlike std.algorithm.map, this is evaluated eagerly
1373      * because it wouldn't make sense to evaluate a parallel map lazily.
1374      *
1375      * fun is the function to be evaluated, range is the range to evaluate
1376      * this function on.  range must be a random access range with length
1377      * for now, though this restriction may be lifted in the future.
1378      * blockSize is the size of the work unit to submit to the thread pool,
1379      * in elements.  buf is an optional buffer to store the results in.
1380      * If none is provided, one is allocated.  If one is provided, it
1381      * must have the same length as range.
1382      *
1383      * Examples:
1384      * ---
1385      * auto pool = new TaskPool();
1386      *
1387      * real[] numbers = new real[1_000];
1388      * foreach(i, ref num; numbers) {
1389      *     num = i;
1390      * }
1391      *
1392      * // Find the squares of numbers[] using work units of size 100.
1393      * real[] squares = pool.map!"a * a"(numbers, 100);
1394      * ---
1395      */
1396     // Bug:  Weird behavior trying to do the overload the normal way.
1397     // Doing it with templating on the type of blockSize b/c that's the
1398     // only way it compiles.
1399     MapType!(fun, R)[] map(alias fun, R, I : size_t)(R range, I blockSize,
1400         MapType!(fun, R)[] buf = null) {
1401         immutable len = range.length;
1402
1403         if(buf.length == 0) {
1404             // Create buffer without initializing contents.
1405             alias MapType!(fun, R) MT;
1406             GC.BlkAttr gcFlags = (typeid(MT).flags & 1) ?
1407                                   cast(GC.BlkAttr) 0 :
1408                                   GC.BlkAttr.NO_SCAN;
1409             auto myPtr = cast(MT*) GC.malloc(len * MT.sizeof, gcFlags);
1410             buf = myPtr[0..len];
1411         }
1412         enforce(buf.length == len,
1413             text("Can't use a user-supplied buffer that's the wrong size.  ",
1414             "(Expected: ", len, "  Got: ", buf.length));
1415         if(blockSize > len) {
1416             blockSize = len;
1417         }
1418
1419         // Handle as a special case:
1420         if(size == 0) {
1421             size_t index = 0;
1422             foreach(elem; range) {
1423                 buf[index++] = unaryFun!(fun)(elem);
1424             }
1425             return buf;
1426         }
1427
1428         alias MapTask!(fun, R) MTask;
1429         MTask[] tasks = (cast(MTask*) alloca(this.size * MTask.sizeof * 2))
1430                         [0..this.size * 2];
1431         tasks[] = MTask.init;
1432
1433         size_t curPos;
1434         void useTask(ref MTask task) {
1435             task.lowerBound = curPos;
1436             task.upperBound = min(len, curPos + blockSize);
1437             task.range = range;
1438             task.results = buf;
1439             task.pool = this;
1440             curPos += blockSize;
1441
1442             lock();
1443             atomicSetUbyte(task.taskStatus, TaskState.notStarted);
1444             abstractPutNoSync(cast(AbstractTask*) &task);
1445             unlock();
1446         }
1447
1448         ubyte doneSubmitting = 0;
1449
1450         Task!(runCallable, void delegate()) submitNextBatch;
1451
1452         void submitJobs() {
1453             // Search for slots, then sleep.
1454             foreach(ref task; tasks) if(task.done) {
1455                 useTask(task);
1456                 if(curPos >= len) {
1457                     atomicSetUbyte(doneSubmitting, 1);
1458                     return;
1459                 }
1460             }
1461
1462             // Now that we've submitted all the worker tasks, submit
1463             // the next submission task.  Synchronizing on the pool
1464             // to prevent the stealing thread from deleting the job
1465             // before it's submitted.
1466             lock();
1467             atomicSetUbyte(submitNextBatch.taskStatus, TaskState.notStarted);
1468             abstractPutNoSync( cast(AbstractTask*) &submitNextBatch);
1469             unlock();
1470         }
1471
1472         submitNextBatch = .task(&submitJobs);
1473
1474         // The submitAndSteal mixin relies on the TaskPool instance
1475         // being called pool.
1476         TaskPool pool = this;
1477
1478         mixin(submitAndSteal);
1479
1480         return buf;
1481     }
1482
1483     /**Parallel map with default block size.*/
1484     MapType!(fun, R)[] map(alias fun, R)(R range, MapType!(fun, R)[] buf = null) {
1485         size_t blkSize = defaultBlockSize(range.length);
1486         alias map!(fun, R, size_t) mapFun;
1487
1488         return mapFun(range, blkSize, buf);
1489     }
1490
1491     /**Parallel reduce.  For now, the range must offer random access and have
1492      * a length.  In the future, this restriction may be lifted.
1493      *
1494      * Note:  Because this operation is being carried out in parallel,
1495      * fun must be associative.  For notational simplicity, let # be an
1496      * infix operator representing fun.  Then, (a # b) # c must equal
1497      * a # (b # c).  This is NOT the same thing as commutativity.  Matrix
1498      * multiplication, for example, is associative but not commutative.
1499      *
1500      * Examples:
1501      * ---
1502      * // Find the max of an array in parallel.  Note that this is a toy example
1503      * // and unless the comparison function was very expensive, it would
1504      * // almost always be faster to do this in serial.
1505      *
1506      * auto pool = new TaskPool();
1507      *
1508      * auto myArr = somethingExpensiveToCompare();
1509      * auto myMax = pool.reduce!max(myArr);
1510      * ---
1511      */
1512     ReduceType!(fun, R, E)
1513     reduce(alias fun, R, E)(E startVal, R range, size_t blockSize = 0) {
1514
1515         if(size == 0) {
1516             return std.algorithm.reduce!(fun)(startVal, range);
1517         }
1518
1519         // Unlike the rest of the functions here, I can't use the Task object
1520         // recycling trick here because this has to work on non-commutative
1521         // operations.  After all the tasks are done executing, fun() has to
1522         // be applied on the results of these to get a final result, but
1523         // it can't be evaluated out of order.
1524
1525         immutable len = range.length;
1526         if(blockSize == 0) {
1527             blockSize = defaultBlockSize(len);
1528         }
1529
1530         if(blockSize > len) {
1531             blockSize = len;
1532         }
1533
1534         immutable size_t nWorkUnits = (len / blockSize) +
1535             ((len % blockSize == 0) ? 0 : 1);
1536         assert(nWorkUnits * blockSize >= len);
1537
1538         alias binaryFun!fun compiledFun;
1539
1540         static E reduceOnRange
1541         (E startVal, R range, size_t lowerBound, size_t upperBound) {
1542             E result = startVal;
1543             foreach(i; lowerBound..upperBound) {
1544                 result = compiledFun(result, range[i]);
1545             }
1546             return result;
1547         }
1548
1549         alias Task!(reduceOnRange, E, R, size_t, size_t) RTask;
1550         RTask[] tasks;
1551
1552         enum MAX_STACK = 512;
1553         immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
1554
1555         if(nBytesNeeded < MAX_STACK) {
1556             tasks = (cast(RTask*) alloca(nBytesNeeded))[0..nWorkUnits];
1557             tasks[] = RTask.init;
1558         } else {
1559             tasks = new RTask[nWorkUnits];
1560         }
1561
1562         size_t curPos = 0;
1563         void useTask(ref RTask task) {
1564             task.args[2] = curPos + 1; // lower bound.
1565             task.args[3] = min(len, curPos + blockSize);  // upper bound.
1566             task.args[1] = range;  // range
1567             task.args[0] = range[curPos];  // Start val.
1568
1569             curPos += blockSize;
1570             put(task);
1571         }
1572
1573         foreach(ref task; tasks) {
1574             useTask(task);
1575         }
1576
1577         // Try to steal each of these.
1578         foreach(ref task; tasks) {
1579             tryStealDelete( cast(AbstractTask*) &task);
1580         }
1581
1582         // Now that we've tried to steal every task, they're all either done
1583         // or in progress.  Wait on all of them.
1584         E result = startVal;
1585         foreach(ref task; tasks) {
1586             task.yieldWait();
1587             result = compiledFun(result, task.returnVal);
1588         }
1589         return result;
1590     }
1591
1592     /**Parallel reduce with the first element of the range as the start value.*/
1593     ReduceType!(fun, R, ElementType!R)
1594     reduce(alias fun, R)(R range, size_t blockSize = 0) {
1595         enforce(!range.empty,
1596             "Cannot reduce an empty range with first element as start value.");
1597         auto startVal = range.front;
1598         range.popFront;
1599         return reduce!(fun, R, ElementType!R)(startVal, range, blockSize);
1600     }
1601 }
1602
1603 // Where the magic happens.  This mixin causes tasks to be submitted lazily to
1604 // the task pool.  Attempts are then made by the calling thread to steal
1605 // them.
1606 enum submitAndSteal = q{
1607
1608     // See documentation for BaseMixin.shouldSetDone.
1609     submitNextBatch.shouldSetDone = false;
1610
1611     // Submit first batch from this thread.
1612     submitJobs();
1613
1614     while( !atomicReadUbyte(doneSubmitting) ) {
1615         // Try to steal parallel foreach tasks.
1616         foreach(ref task; tasks) {
1617             pool.tryStealDelete( cast(AbstractTask*) &task);
1618         }
1619
1620         // All tasks in progress or done unless next
1621         // submission task started running.  Try to steal the submission task.
1622         pool.tryStealDelete(cast(AbstractTask*) &submitNextBatch);
1623     }
1624
1625     // Steal one last time, after they're all submitted.
1626     foreach(ref task; tasks) {
1627         pool.tryStealDelete( cast(AbstractTask*) &task);
1628     }
1629
1630
1631     foreach(ref task; tasks) {
1632         task.wait();
1633     }
1634 };
1635
1636 /*------Structs that implement opApply for parallel foreach.------------------*/
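     // True when R can be indexed and its length is known up front.  In that
     // case parallel foreach hands each task an index range instead of
     // buffering elements.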
1637 template randLen(R) {
1638     enum randLen = isRandomAccessRange!R && hasLength!R;
1639 }
1640
1641 private enum string parallelApplyMixin = q{
1642     alias ParallelForeachTask!(R, typeof(dg)) PTask;
1643
1644     // Handle an empty thread pool as a special case:  run the loop serially.
1645     if(pool.size == 0) {
1646         int res = 0;
1647         size_t index = 0;
1648
1649         // The explicit ElementType!R in the foreach loops is necessary for
1650         // correct behavior when iterating over strings.
1651         static if(lValueElements!(R)) {
1652             foreach(ref ElementType!R elem; range) {
1653                 static if(ParameterTypeTuple!(dg).length == 2) {
1654                     res = dg(index, elem);
1655                 } else {
1656                     res = dg(elem);
1657                 }
1658                 index++;
1659             }
1660         } else {
1661             foreach(ElementType!R elem; range) {
1662                 static if(ParameterTypeTuple!(dg).length == 2) {
1663                     res = dg(index, elem);
1664                 } else {
1665                     res = dg(elem);
1666                 }
1667                 index++;
1668             }
1669         }
1670         return res;
1671     }
1672
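         // Allocate a fixed buffer of 2 * pool.size task slots on the stack.
         // submitJobs() recycles slots whose tasks have completed, so only a
         // bounded number of work units is ever in flight regardless of the
         // length of the range.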
1673     PTask[] tasks = (cast(PTask*) alloca(pool.size * PTask.sizeof * 2))
1674                     [0..pool.size * 2];
1675     tasks[] = PTask.init;
1676     Task!(runCallable, void delegate()) submitNextBatch;
1677
1678     static if(randLen!R) {
1679
1680         immutable size_t len = range.length;
1681         size_t curPos = 0;
1682
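             // Each task gets the half-open index interval
             // [curPos, min(len, curPos + blockSize)); the elements themselves
             // are never copied.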
1683         void useTask(ref PTask task) {
1684             task.lowerBound = curPos;
1685             task.upperBound = min(len, curPos + blockSize);
1686             task.myRange = range;
1687             task.runMe = dg;
1688             task.pool = pool;
1689             curPos += blockSize;
1690
1691             pool.lock();
1692             atomicSetUbyte(task.taskStatus, TaskState.notStarted);
1693             pool.abstractPutNoSync(cast(AbstractTask*) &task);
1694             pool.unlock();
1695         }
1696
1697         void submitJobs() {
1698             // Search for slots to recycle.
1699             foreach(ref task; tasks) if(task.done) {
1700                 useTask(task);
1701                 if(curPos >= len) {
1702                     atomicSetUbyte(doneSubmitting, 1);
1703                     return;
1704                 }
1705             }
1706
1707             // Now that we've refilled all the free worker task slots, submit
1708             // the next submission task.  We synchronize on the pool to
1709             // prevent the stealing thread from deleting the job before
1710             // it's fully submitted.
1711             pool.lock();
1712             atomicSetUbyte(submitNextBatch.taskStatus, TaskState.notStarted);
1713             pool.abstractPutNoSync( cast(AbstractTask*) &submitNextBatch);
1714             pool.unlock();
1715         }
1716
1717     } else {
1718
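             // R is not random access, so worker threads can't index into it.
             // Instead, the submitting thread pops up to blockSize elements off
             // the range and copies them (or, for ranges with lvalue elements,
             // pointers to them) into a per-task buffer.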
1719         void useTask(ref PTask task) {
1720             task.runMe = dg;
1721             task.pool = pool;
1722             size_t copyIndex = 0;
1723             if(task.elements.length == 0) {
1724                 task.elements.length = blockSize;
1725             }
1726
1727             for(; copyIndex < blockSize && !range.empty; copyIndex++) {
1728                 static if(lValueElements!R) {
1729                     task.elements[copyIndex] = &range.front();
1730                 } else {
1731                     task.elements[copyIndex] = range.front;
1732                 }
1733                 range.popFront;
1734             }
1735
1736             // We only actually change the array size on the last task, when the
1737             // range is empty.
1738             task.elements = task.elements[0..copyIndex];
1739
1740             pool.lock();
1741             task.startIndex = this.startIndex;
1742             this.startIndex += copyIndex;
1743             atomicSetUbyte(task.taskStatus, TaskState.notStarted);
1744             pool.abstractPutNoSync(cast(AbstractTask*) &task);
1745             pool.unlock();
1746         }
1747
1748
1749         void submitJobs() {
1750             // Search for slots to recycle.
1751             foreach(ref task; tasks) if(task.done) {
1752                 useTask(task);
1753                 if(range.empty) {
1754                     atomicSetUbyte(doneSubmitting, 1);
1755                     return;
1756                 }
1757             }
1758
1759             // Now that we've refilled all the free worker task slots, submit
1760             // the next submission task.  We synchronize on the pool to
1761             // prevent the stealing thread from deleting the job before
1762             // it's fully submitted.
1763             pool.lock();
1764             atomicSetUbyte(submitNextBatch.taskStatus, TaskState.notStarted);
1765             pool.abstractPutNoSync( cast(AbstractTask*) &submitNextBatch);
1766             pool.unlock();
1767         }
1768
1769     }
1770     submitNextBatch = task(&submitJobs);
1771
1772     mixin(submitAndSteal);
1773
1774     return 0;
1775 };
1776
1777 private struct ParallelForeach(R) {
1778     TaskPool pool;
1779     R range;
1780     size_t blockSize;
1781     size_t startIndex;
1782     ubyte doneSubmitting;
1783
1784     alias ElementType!R E;
1785
1786     int opApply(scope int delegate(ref E) dg) {
1787         mixin(parallelApplyMixin);
1788     }
1789
1790     int opApply(scope int delegate(ref size_t, ref E) dg) {
1791         mixin(parallelApplyMixin);
1792     }
1793 }
1794
1795 version(unittest) {
1796     // This was the only way I could get nested maps to work.
1797     __gshared TaskPool poolInstance;
1798 }
1799
1800 // These unittests are intended to also function as an example of how to
1801 // use this module.
1802 unittest {
1803     foreach(attempt; 0..10)
1804     foreach(poolSize; [0, 4]) {
1805         // Create a TaskPool object with poolSize worker threads.
1806         poolInstance = new TaskPool(poolSize);
1807
1808         // Create some data to work on.
1809         uint[] numbers = new uint[1_000];
1810
1811         // Fill in this array in parallel, using default block size.
1812         // Note:  Be careful when writing to adjacent elements of an array from
1813         // different threads, as this can cause word tearing bugs when
1814         // the elements aren't properly aligned or aren't the machine's native
1815         // word size.  In this case, though, we're ok.
1816         foreach(i; poolInstance.parallel( iota(0, numbers.length)) ) {
1817             numbers[i] = i;
1818         }
1819
1820         // Make sure it works.
1821         foreach(i; 0..numbers.length) {
1822             assert(numbers[i] == i);
1823         }
1824
1825         stderr.writeln("Done creating nums.");
1826
1827         // Parallel foreach also works on non-random access ranges, albeit
1828         // less efficiently.
1829         auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
1830         foreach(num; poolInstance.parallel(myNumbers)) {
1831             assert(num % 7 > 0 && num < 1000);
1832         }
1833         stderr.writeln("Done modulus test.");
1834
1835         // Use parallel map to calculate the square of each element in numbers,
1836         // and make sure it's right.
1837         uint[] squares = poolInstance.map!"a * a"(numbers, 100);
1838         assert(squares.length == numbers.length);
1839         foreach(i, number; numbers) {
1840             assert(squares[i] == number * number);
1841         }
1842         stderr.writeln("Done squares.");
1843
1844         // Sum up the array in parallel with the current thread.
1845         auto sumFuture = poolInstance.task!( reduce!"a + b" )(numbers);
1846
1847         // Go off and do other stuff while that future executes:
1848         // Find the sum of squares of numbers.
1849         ulong sumSquares = 0;
1850         foreach(elem; numbers) {
1851             sumSquares += elem * elem;
1852         }
1853
1854         // Ask for our result.  If the pool has not yet started working on
1855         // this task, spinWait() automatically steals it and executes it in this
1856         // thread.
1857         uint mySum = sumFuture.spinWait();
1858         assert(mySum == 999 * 1000 / 2);
1859
1860         // We could have also computed this sum in parallel using parallel
1861         // reduce.
1862         auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
1863         assert(mySum == mySumParallel);
1864         stderr.writeln("Done sums.");
1865
1866         // Execute an anonymous delegate as a task.
1867         auto myTask = task({
1868             synchronized writeln("Our lives are parallel...Our lives are parallel.");
1869         });
1870         poolInstance.put(myTask);
1871
1872         // Parallel foreach loops can also be nested, and can have an index
1873         // variable attached to the foreach loop.
1874         auto nestedOuter = "abcd";
1875         auto nestedInner =  iota(0, 10, 2);
1876
1877         foreach(i, letter; poolInstance.parallel(nestedOuter, 1)) {
1878             foreach(j, number; poolInstance.parallel(nestedInner, 1)) {
1879                 synchronized writeln(
1880                     i, ": ", letter, "  ", j, ": ", number);
1881             }
1882         }
1883
1884         // Block until all jobs are finished and then shut down the thread pool.
1885         poolInstance.waitStop();
1886     }
1887
1888     writeln("Press enter to go to the next round of unittests.");
1889     readln();
1890 }
1891
1892 // These unittests are intended more for actual testing than as usage
1893 // examples.
1894 unittest {
1895     foreach(attempt; 0..10)
1896     foreach(poolSize; [0, 4]) {
1897         poolInstance = new TaskPool(poolSize);
1898
1899         // Test indexing.
1900         stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
1901         assert(poolInstance.workerIndex() == 0);
1902
1903         // Test worker-local storage.
1904         auto workerLocal = poolInstance.createWorkerLocal!(uint)(1);
1905         foreach(i; poolInstance.parallel(iota(0U, 1_000_000))) {
1906             workerLocal.get++;
1907         }
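             // Each of the poolInstance.size worker threads plus this thread
             // has its own copy of workerLocal, initialized to 1, and the
             // 1_000_000 increments are spread across those copies, hence the
             // expected sum below.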
1908         assert(reduce!"a + b"(workerLocal.toRange) ==
1909             1_000_000 + poolInstance.size + 1);
1910
1911         // Make sure work is reasonably balanced among threads.  This test is
1912         // non-deterministic and is more of a sanity check than something with
1913         // an absolute pass/fail criterion.
1914         uint[void*] nJobsByThread;
1915         foreach(thread; poolInstance.pool) {
1916             nJobsByThread[cast(void*) thread] = 0;
1917         }
1918         nJobsByThread[ cast(void*) Thread.getThis] = 0;
1919
1920         foreach(i; poolInstance.parallel( iota(0, 1_000_000), 100 )) {
1921             atomicIncUint( nJobsByThread[ cast(void*) Thread.getThis() ]);
1922         }
1923
1924         stderr.writeln("\nCurrent (stealing) thread is:  ",
1925             cast(void*) Thread.getThis());
1926         stderr.writeln("Workload distribution:  ");
1927         foreach(k, v; nJobsByThread) {
1928             stderr.writeln(k, '\t', v);
1929         }
1930
1931         // Test whether map can be nested.
1932         real[][] matrix = new real[][](1000, 1000);
1933         foreach(i; poolInstance.parallel( iota(0, matrix.length) )) {
1934             foreach(j; poolInstance.parallel( iota(0, matrix[0].length) )) {
1935                 matrix[i][j] = i * j;
1936             }
1937         }
1938
1939         // Get around weird bugs having to do with sqrt being an intrinsic:
1940         static real mySqrt(real num) {
1941             return sqrt(num);
1942         }
1943
1944         static real[] parallelSqrt(real[] nums) {
1945             return poolInstance.map!mySqrt(nums);
1946         }
1947
1948         real[][] sqrtMatrix = poolInstance.map!parallelSqrt(matrix);
1949
1950         foreach(i, row; sqrtMatrix) {
1951             foreach(j, elem; row) {
1952                 real shouldBe = sqrt( cast(real) i * j);
1953                 assert(approxEqual(shouldBe, elem));
1954                 sqrtMatrix[i][j] = shouldBe;
1955             }
1956         }
1957
1958         auto saySuccess = task({
1959             stderr.writeln(
1960                 "Success doing matrix stuff that involves nested pool use.");
1961         });
1962         poolInstance.put(saySuccess);
1963         saySuccess.workWait();
1964
1965         // A more thorough test of map and reduce:  Find the sum of the square
1966         // roots of all elements in matrix.
1967
1968         static real parallelSum(real[] input) {
1969             return poolInstance.reduce!"a + b"(input);
1970         }
1971
1972         auto sumSqrt = poolInstance.reduce!"a + b"(
1973             poolInstance.map!parallelSum(
1974                 sqrtMatrix
1975             )
1976         );
1977
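             // Sanity check on the expected value:  the sum factors as
             // (sum over i in 0..1000 of sqrt(i)) ^^ 2, and the inner sum is
             // roughly 2.1e4, so the total is on the order of 4.4e8.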
1978         assert(approxEqual(sumSqrt, 4.437e8));
1979         stderr.writeln("Done sum of square roots.");
1980
1981         // Test whether tasks work with function pointers.
1982         auto nanTask = poolInstance.task(&isNaN, 1.0L);
1983         assert(nanTask.spinWait == false);
1984
1985         if(poolInstance.size > 0) {
1986             // Test work waiting.
1987             static void uselessFun() {
1988                 foreach(i; 0..1_000_000) {}
1989             }
1990
1991             auto uselessTasks = new typeof(task(&uselessFun))[1000];
1992             foreach(ref uselessTask; uselessTasks) {
1993                 uselessTask = task(&uselessFun);
1994             }
1995             foreach(ref uselessTask; uselessTasks) {
1996                 poolInstance.put(uselessTask);
1997             }
1998             foreach(ref uselessTask; uselessTasks) {
1999                 uselessTask.workWait();
2000             }
2001         }
2002
2003         // Test the case of non-random access + ref returns.
2004         int[] nums = [1,2,3,4,5];
2005         static struct RemoveRandom {
2006             int[] arr;
2007
2008             ref int front() { return arr.front; }
2009             void popFront() { arr.popFront(); }
2010             bool empty() { return arr.empty; }
2011         }
2012
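             // Since front() returns by ref, the parallel foreach machinery
             // passes pointers to the underlying array elements to the worker
             // threads, so the increments below are visible in nums.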
2013         auto refRange = RemoveRandom(nums);
2014         foreach(ref elem; poolInstance.parallel(refRange)) {
2015             elem++;
2016         }
2017         assert(nums == [2,3,4,5,6]);
2018         stderr.writeln("Nums:  ", nums);
2019
2020         poolInstance.stop();
2021     }
2022 }
2023
2024 version(unittest) {
2025     void main() {}
2026 }