root/trunk/parallelFuture/std_parallelism.d

Revision 834, 87.3 kB (checked in by dsimcha, 4 years ago)

64-bit support.

/**
$(D std.parallelism) is a library that implements some high-level primitives
for shared memory SMP parallelism.  These include parallel foreach, parallel
reduce, parallel eager map and basic task parallelism primitives.

This module is geared towards parallelism, not concurrency.  In particular,
the default behavior on single-core machines is to use no multithreading at
all, since there are no opportunities for parallelism on such machines.

Warning:  Most of this module completely subverts D's type system to achieve
          unchecked data sharing and cannot be used with SafeD.
          If you're looking for D's flagship message passing concurrency
          model, which can be used with SafeD, you should use
          $(D std.concurrency) instead.  However, the one exception is that
          tasks can be used safely (i.e. from SafeD) under a limited set of
          circumstances, detailed in the documentation for $(D task).

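Examples:
---
// A minimal sketch (not from the original docs):  square each index in
// parallel using the lazily initialized default pool.  parallel() and
// taskPool are documented below; iota() is from std.range.
auto squares = new uint[1_000];
foreach(i; parallel(iota(squares.length))) {
    squares[i] = cast(uint) (i * i);
}
---
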
Author:  David Simcha
Copyright:  Copyright (c) 2009-2010, David Simcha.
License:    $(WEB boost.org/LICENSE_1_0.txt, Boost License 1.0)
*/
module std.parallelism;

import core.thread, core.cpuid, std.algorithm, std.range, std.c.stdlib, std.stdio,
    std.exception, std.functional, std.conv, std.math, core.memory, std.traits,
    std.typetuple, core.stdc.string, std.typecons;

import core.sync.condition, core.sync.mutex, core.atomic;

// Workaround for bug 3753.
version(Posix) {
    // Can't use alloca() because it can't be used with exception handling.
    // Use the GC instead even though it's slightly less efficient.
    void* alloca(size_t nBytes) {
        return GC.malloc(nBytes);
    }
} else {
    // The real alloca() can be used here.
    import core.stdc.stdlib : alloca;
}

/* Atomics code.  These just forward to core.atomic, but are written like this
   for two reasons:

   1.  They used to actually contain ASM code and I don't want to have to
       change to directly calling core.atomic in a zillion different places.

   2.  core.atomic has some misc. issues that make my use cases difficult
       without wrapping it.  If I didn't wrap it, casts would be required
       basically everywhere.
*/
void atomicSetUbyte(ref ubyte stuff, ubyte newVal) {
    core.atomic.cas(cast(shared) &stuff, stuff, newVal);
}

// Cut and pasted from core.atomic.  See Bug 4760.
version(D_InlineAsm_X86) {
    ubyte atomicReadUbyte(ref ubyte val) {
        asm {
            mov DL, 0;
            mov AL, 0;
            mov ECX, val;
            lock; // lock always needed to make this op atomic
            cmpxchg [ECX], DL;
        }
    }
} else version(D_InlineAsm_X86_64) {
    ubyte atomicReadUbyte(ref ubyte val) {
        asm {
            mov DL, 0;
            mov AL, 0;
            mov RCX, val;
            lock; // lock always needed to make this op atomic
            cmpxchg [RCX], DL;
        }
    }
}

// This gets rid of the need for a lot of annoying casts in other parts of the
// code, when enums are involved.
bool atomicCasUbyte(ref ubyte stuff, ubyte testVal, ubyte newVal) {
    return core.atomic.cas(cast(shared) &stuff, testVal, newVal);
}

void atomicIncUint(ref uint num) {
    atomicOp!"+="(num, 1U);
}

//-----------------------------------------------------------------------------


/*--------------------- Generic helper functions, etc.------------------------*/
private template MapType(R, functions...) {
    static if(functions.length == 1) {
        alias typeof(unaryFun!(functions[0])(ElementType!(R).init)) MapType;
    } else {
        alias typeof(adjoin!(staticMap!(unaryFun, functions))
            (ElementType!(R).init)) MapType;
    }
}

private template ReduceType(alias fun, R, E) {
    alias typeof(binaryFun!(fun)(E.init, ElementType!(R).init)) ReduceType;
}

private template noUnsharedAliasing(T) {
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}

private template isSafeTask(F) {
    enum bool isSafeTask =
    (!hasUnsharedAliasing!(ReturnType!F) ||
        (functionAttributes!F & FunctionAttribute.PURE)) &&
    !(functionAttributes!F & FunctionAttribute.REF) &&
    (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
    allSatisfy!(noUnsharedAliasing, ParameterTypeTuple!F);
}

unittest {
    static assert(isSafeTask!(void function()));
    static assert(isSafeTask!(void function(uint, string)));
    static assert(!isSafeTask!(void function(uint, char[])));

    alias uint[] function(uint, string) pure F1;
    alias uint[] function(uint, string) F2;
    static assert(isSafeTask!(F1));
    static assert(!isSafeTask!(F2));
}


private void sleepMillisecond(long nMilliseconds) {
    // Thread.sleep() takes hnsecs (100 ns units):  1 ms == 10_000 hnsecs.
    Thread.sleep(nMilliseconds * 10_000);
}

private T* moveToHeap(T)(ref T object) {
    // Skip GC scanning of the new block if T cannot contain pointers.
    GC.BlkAttr gcFlags = (typeid(T).flags & 1) ?
                          cast(GC.BlkAttr) 0 :
                          GC.BlkAttr.NO_SCAN;
    T* myPtr = cast(T*) GC.malloc(T.sizeof, gcFlags);

    core.stdc.string.memcpy(myPtr, &object, T.sizeof);
    object = T.init;

    return myPtr;
}

//------------------------------------------------------------------------------
/* Various classes of task.  These use manual C-style polymorphism, the kind
 * with lots of structs and pointer casting.  This is because real classes
 * would prevent some of the allocation tricks I'm using and waste space on
 * monitors and vtbls for something that needs to be ultra-efficient.
 */

private enum TaskState : ubyte {
    notStarted,
    inProgress,
    done
}

private template BaseMixin(ubyte initTaskStatus) {
    AbstractTask* prev;
    AbstractTask* next;

    static if(is(typeof(&impl))) {
        void function(void*) runTask = &impl;
    } else {
        void function(void*) runTask;
    }

    Throwable exception;
    ubyte taskStatus = initTaskStatus;


    /* Kludge:  Some tasks need to re-submit themselves after they finish.
     * In this case, they will set themselves to TaskState.notStarted before
     * resubmitting themselves.  Setting this flag to false prevents the work
     * stealing loop from setting them to done.*/
    bool shouldSetDone = true;

    bool done() {
        if(atomicReadUbyte(taskStatus) == TaskState.done) {
            if(exception) {
                throw exception;
            }

            return true;
        }

        return false;
    }
}

// The base "class" for all of the other tasks.
private struct AbstractTask {
    mixin BaseMixin!(TaskState.notStarted);

    void job() {
        runTask(&this);
    }
}

private template AliasReturn(alias fun, T...) {
    alias AliasReturnImpl!(fun, T).ret AliasReturn;
}

private template AliasReturnImpl(alias fun, T...) {
    private T args;
    alias typeof(fun(args)) ret;
}

// Should be private, but std.algorithm.reduce is used in the zero-thread case
// and won't work w/ private.
template reduceAdjoin(functions...) {
    static if(functions.length == 1) {
        alias binaryFun!(functions[0]) reduceAdjoin;
    } else {
        T reduceAdjoin(T, U)(T lhs, U rhs) {
            alias staticMap!(binaryFun, functions) funs;

            foreach(i, Unused; typeof(lhs.field)) {
                lhs.field[i] = funs[i](lhs.field[i], rhs);
            }

            return lhs;
        }
    }
}

private template reduceFinish(functions...) {
    static if(functions.length == 1) {
        alias binaryFun!(functions[0]) reduceFinish;
    } else {
        T reduceFinish(T)(T lhs, T rhs) {
            alias staticMap!(binaryFun, functions) funs;

            foreach(i, Unused; typeof(lhs.field)) {
                lhs.field[i] = funs[i](lhs.field[i], rhs.field[i]);
            }

            return lhs;
        }
    }
}

template ElementsCompatible(R, A) {
    static if(!isArray!A) {
        enum bool ElementsCompatible = false;
    } else {
        enum bool ElementsCompatible =
            is(ElementType!R : ElementType!A);
    }
}

/**
The task pool class that is the workhorse of this library.
 */
final class TaskPool {
private:
    Thread[] pool;
    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;  // All operations on this are done atomically.
    Condition workerCondition;
    Condition waiterCondition;
    Mutex mutex;

    // The instanceStartIndex of the next instance that will be created.
    __gshared static size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in the next instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    enum PoolState : ubyte {
        running,
        finishing,
        stopNow
    }

    void doJob(AbstractTask* job) {
        assert(job.taskStatus == TaskState.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit) {
            lock();
            notifyWaiters();
            unlock();
        }

        try {
            job.job();
            if(job.shouldSetDone) {
                atomicSetUbyte(job.taskStatus, TaskState.done);
            }
        } catch(Throwable e) {
            job.exception = e;
            if(job.shouldSetDone) {
                atomicSetUbyte(job.taskStatus, TaskState.done);
            }
        }
    }

    void workLoop() {
        // Initialize thread index.
        lock();
        threadIndex = nextThreadIndex;
        nextThreadIndex++;
        unlock();

        while(atomicReadUbyte(status) != PoolState.stopNow) {
            AbstractTask* task = pop();
            if (task is null) {
                if(atomicReadUbyte(status) == PoolState.finishing) {
                    atomicSetUbyte(status, PoolState.stopNow);
                    return;
                }
            } else {
                doJob(task);
            }
        }
    }

    bool deleteItem(AbstractTask* item) {
        lock();
        auto ret = deleteItemNoSync(item);
        unlock();
        return ret;
    }

    bool deleteItemNoSync(AbstractTask* item)
    out {
        assert(item.next is null);
        assert(item.prev is null);
    } body {
        if(item.taskStatus != TaskState.notStarted) {
            return false;
        }
        item.taskStatus = TaskState.inProgress;

        if(item is head) {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if(item is tail) {
            tail = tail.prev;
            if(tail !is null) {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        if(item.next !is null) {
            assert(item.next.prev is item);  // Check queue consistency.
            item.next.prev = item.prev;
        }
        if(item.prev !is null) {
            assert(item.prev.next is item);  // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }

    // Pop a task off the queue.  Should only be called by worker threads.
    AbstractTask* pop() {
        lock();
        auto ret = popNoSync();
        while(ret is null && status == PoolState.running) {
            wait();
            ret = popNoSync();
        }
        unlock();
        return ret;
    }

    AbstractTask* popNoSync()
    out(returned) {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if(returned !is null) {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    } body {
        AbstractTask* returned = head;
        if (head !is null) {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskState.inProgress;
        }
        if(head !is null) {
            head.prev = null;
        }

        return returned;
    }

    // Push a task onto the queue.
    void abstractPut(AbstractTask* task) {
        lock();
        abstractPutNoSync(task);
        unlock();
    }

    void abstractPutNoSync(AbstractTask* task)
    out {
        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if(tail.prev !is null) {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    } body {
        task.next = null;
        if (head is null) { //Queue is empty.
            head = task;
            tail = task;
            tail.prev = null;
        } else {
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        notify();
    }

    // Same as trySteal, but also deletes the task from the queue so the
    // Task object can be recycled.
    bool tryStealDelete(AbstractTask* toSteal) {
        if( !deleteItem(toSteal) ) {
            return false;
        }

        toSteal.job();

        /* shouldSetDone should always be true except if the task re-submits
         * itself to the pool and needs to bypass this.*/
        if(toSteal.shouldSetDone) {
            atomicSetUbyte(toSteal.taskStatus, TaskState.done);
        }

        return true;
    }

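    // Worked example (a sketch, not from the original comments):  with 7
    // worker threads, size == 7, so fourSize == 4 * 8 == 32.  For a range
    // of length 1000 this returns 31 + 1 == 32, i.e. ceil(1000 / 32):
    // roughly four work units per thread, counting the submitting thread.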
    size_t defaultBlockSize(size_t rangeLen) const pure nothrow {
        if(this.size == 0) {
            return rangeLen;
        }

        immutable size_t fourSize = 4 * (this.size + 1);
        return (rangeLen / fourSize) + ((rangeLen % fourSize == 0) ? 0 : 1);
    }

    void lock() {
        mutex.lock();
    }

    void unlock() {
        mutex.unlock();
    }

    void wait() {
        workerCondition.wait();
    }

    void notify() {
        workerCondition.notify();
    }

    void notifyAll() {
        workerCondition.notifyAll();
    }

    void waitUntilCompletion() {
        waiterCondition.wait();
    }

    void notifyWaiters() {
        waiterCondition.notifyAll();
    }

public:

    /**
    Default constructor that initializes a TaskPool with
    however many cores are on your CPU, minus 1 because the thread
    that initialized the pool will also do work.

    BUGS:  Will initialize with the wrong number of threads in cases where
           core.cpuid is buggy.

    Note:  Initializing a pool with zero threads (as would happen in the
           case of a single-core CPU) is well-tested and does work.
     */
    this() @trusted {
        this(coresPerCPU - 1);
    }

    /**
    Allows for custom pool size.
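
    Examples:
    ---
    // A sketch:  create a pool with exactly 8 worker threads, regardless of
    // how many cores the machine reports.
    auto pool8 = new TaskPool(8);
    scope(exit) pool8.join();
    ---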
    */
    this(size_t poolSize) @trusted {
        synchronized(TaskPool.classinfo) {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it.  The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;

            nextInstanceIndex += poolSize;
        }

        mutex = new Mutex(this);
        workerCondition = new Condition(mutex);
        waiterCondition = new Condition(mutex);

        pool = new Thread[poolSize];
        foreach(ref poolThread; pool) {
            poolThread = new Thread(&workLoop);
            poolThread.start();
        }
    }

    /**
    Implements a parallel foreach loop over a range.  blockSize is the
    number of elements to process in one work unit.

    Examples:
    ---
    auto pool = new TaskPool();

    uint[] squares = new uint[1_000];
    foreach(i; pool.parallel( iota(squares.length), 100)) {
        // Iterate over squares using work units of size 100.
        squares[i] = i * i;
    }

    // Parallel foreach also works with ref parameters and index variables.
    auto nums = [1, 2, 3, 4, 5];

    foreach(index, num; pool.parallel(nums, 1)) {
        // Do something interesting.
    }
    ---

    Notes:

    Breaking from a parallel foreach loop breaks from the current work unit,
    but still executes other work units.  A goto from inside the parallel
    foreach loop to a label outside the loop will result in undefined
    behavior.

    In the case of non-random access ranges, parallel foreach is still usable
    but buffers lazily to an array of size $(D blockSize) before executing
    the parallel portion of the loop.  The exception is that, if a parallel
    foreach is executed over an $(D AsyncBuf) or $(D LazyMap), the copying is
    elided and the buffers are simply swapped.  However, note that in this case
    the $(D blockSize) parameter of this function will be ignored and the
    work unit size will be set to the block size of the $(D AsyncBuf) or
    $(D LazyMap).
     */
    ParallelForeach!R parallel(R)(R range, size_t blockSize) {
        alias ParallelForeach!R RetType;
        return RetType(this, range, blockSize);
    }

    /**
    Parallel foreach with default block size.  For ranges that don't have
    a length, the default is 512 elements.  For ranges that do, the default
    is whatever number would create exactly 4x as many work units as
    we have worker threads.
     */
    ParallelForeach!R parallel(R)(R range) {
        static if(hasLength!R) {
            // The default block size creates roughly 4x as many work units
            // as there are threads (the workers plus the submitting thread),
            // to allow for load balancing.
            size_t blockSize = defaultBlockSize(range.length);
            return parallel(range, blockSize);
        } else {
            // Just use a really, really dumb guess if the user is too lazy to
            // specify.
            return parallel(range, 512);
        }
    }

    /**
    Eager parallel map.  $(D functions) are the functions to be evaluated.
    The first argument must be a random access range.  Immediately after the
    range argument, an optional block size argument may be provided.  If none
    is provided, the default block size is used.  An optional buffer may be
    provided as the last argument.  If one is not provided, one will
    be automatically allocated.  If one is provided, it must be the same
    length as the range.

    Examples:
    ---
    auto pool = new TaskPool();

    real[] numbers = new real[1_000];
    foreach(i, ref num; numbers) {
        num = i;
    }

    // Find the squares of numbers[].
    real[] squares = pool.map!"a * a"(numbers);

    // Same thing, but make work units explicitly of size 100.
    squares = pool.map!"a * a"(numbers, 100);

    // Same thing, but explicitly pre-allocate a buffer.
    auto buf = new real[numbers.length];
    pool.map!"a * a"(numbers, buf);

    // Multiple functions, explicit buffer, and explicit block size.
    auto results = new Tuple!(real, real)[numbers.length];
    pool.map!("a * a", "-a")(numbers, 100, results);
    ---
     */
    template map(functions...) {
        ///
        auto map(Args...)(Args args) {
            static if(functions.length == 1) {
                alias unaryFun!(functions[0]) fun;
            } else {
                alias adjoin!(staticMap!(unaryFun, functions)) fun;
            }

            static if(Args.length > 1 && isArray!(Args[$ - 1]) &&
                is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1]))) {
                alias args[$ - 1] buf;
                alias args[0..$ - 1] args2;
                alias Args[0..$ - 1] Args2;
            } else {
                MapType!(Args[0], functions)[] buf;
                alias args args2;
                alias Args Args2;
            }

            static if(isIntegral!(Args2[$ - 1])) {
                static assert(args2.length == 2);
                alias args2[0] range;
                auto blockSize = cast(size_t) args2[1];
            } else {
                static assert(args2.length == 1, Args);
                alias args2[0] range;
                auto blockSize = defaultBlockSize(range.length);
            }

            alias typeof(range) R;
            immutable len = range.length;

            if(buf.length == 0) {
                // Create buffer without initializing contents.
                alias MapType!(R, functions) MT;
                GC.BlkAttr gcFlags = (typeid(MT).flags & 1) ?
                                      cast(GC.BlkAttr) 0 :
                                      GC.BlkAttr.NO_SCAN;
                auto myPtr = cast(MT*) GC.malloc(len * MT.sizeof, gcFlags);
                buf = myPtr[0..len];
            }
            enforce(buf.length == len,
                text("Can't use a user-supplied buffer that's the wrong size.  ",
                "(Expected:  ", len, "  Got:  ", buf.length, ")"));
            if(blockSize > len) {
                blockSize = len;
            }

            // Handle as a special case:
            if(size == 0) {
                size_t index = 0;
                foreach(elem; range) {
                    buf[index++] = fun(elem);
                }
                return buf;
            }

            alias MapTask!(fun, R, typeof(buf)) MTask;
            MTask[] tasks = (cast(MTask*) alloca(this.size * MTask.sizeof * 2))
                            [0..this.size * 2];
            tasks[] = MTask.init;

            size_t curPos;
            void useTask(ref MTask task) {
                task.lowerBound = curPos;
                task.upperBound = min(len, curPos + blockSize);
                task.range = range;
                task.results = buf;
                task.pool = this;
                curPos += blockSize;

                lock();
                atomicSetUbyte(task.taskStatus, TaskState.notStarted);
                abstractPutNoSync(cast(AbstractTask*) &task);
                unlock();
            }

            ubyte doneSubmitting = 0;

            Task!(run, void delegate()) submitNextBatch;

            void submitJobs() {
                // Search for slots, then sleep.
                foreach(ref task; tasks) if(task.done) {
                    useTask(task);
                    if(curPos >= len) {
                        atomicSetUbyte(doneSubmitting, 1);
                        return;
                    }
                }

                // Now that we've submitted all the worker tasks, submit
                // the next submission task.  Synchronizing on the pool
                // to prevent the stealing thread from deleting the job
                // before it's submitted.
                lock();
                atomicSetUbyte(submitNextBatch.taskStatus, TaskState.notStarted);
                abstractPutNoSync( cast(AbstractTask*) &submitNextBatch);
                unlock();
            }

            submitNextBatch = .task(&submitJobs);

            // The submitAndSteal mixin relies on the TaskPool instance
            // being named pool.
            TaskPool pool = this;

            mixin(submitAndSteal);

            return buf;
        }
    }

    /**
    A semi-lazy parallel map.  The map functions are evaluated for the first
    $(D bufSize) elements and stored in a temporary buffer and made available
    to $(D popFront).  Meanwhile, in the background a second buffer of the
    same size is filled.  When the first buffer is exhausted, it is swapped
    with the second buffer and filled while the values from what was originally
    the second buffer are read.  This can be used for pipelining.

    Parameters:

    range:  The range to be mapped.  This must be an input range, though it
    should preferably be a random access range to avoid needing to buffer
    to a temporary array before mapping.  If the $(D range) is not random access
    it will be lazily buffered to an array of size $(D bufSize) before the map
    function is evaluated.  (For an exception to this rule, see Notes.)

    bufSize:  The size of the buffer to store the evaluated elements.

    blockSize:  The number of elements to evaluate in a single thread.
      Must be less than or equal to bufSize, and in practice should be a
      fraction of bufSize such that all worker threads can be used.  If
      the default of size_t.max is used, blockSize will be set to the
      pool-wide default.

    Notes:

    If a $(D LazyMap) or an $(D AsyncBuf) is used as an input to lazyMap(),
    then as an optimization the copying from the output buffer of the first
    range to the input buffer of the second range is elided, even though
    $(D LazyMap) and $(D AsyncBuf) are input ranges.  However, this means
    that the $(D bufSize) parameter passed to the current call to $(D lazyMap())
    will be ignored and the size of the buffer will be the buffer size of
    $(D range).

    Examples:
    ---
    // Pipeline reading a file, converting each line to a number, squaring
    // the numbers, and performing the additions necessary to find the sum of
    // the squares.

    auto lineRange = File("numberList.txt").byLine();
    auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
    auto nums = taskPool.lazyMap!(to!double)(dupedLines);
    auto squares = taskPool.lazyMap!"a * a"(nums);

    double sum = 0;
    foreach(elem; squares) {
        sum += elem;
    }
    ---
    */
    template lazyMap(functions...) {
        LazyMap!(functions).LazyMap!(R)
        lazyMap(R)(R range, size_t bufSize = 100, size_t blockSize = size_t.max)
        if(isInputRange!R) {
            enforce(blockSize == size_t.max || blockSize <= bufSize,
                "Work unit size must not be larger than buffer size.");

            return new typeof(return)(range, bufSize, blockSize, this);
        }
    }

    /**
    Given an input range that is expensive to iterate over, returns an
    $(D AsyncBuf) object that asynchronously buffers the contents of
    $(D range) into a buffer of $(D bufSize) elements in a background thread,
    while making previously buffered elements from a second buffer, also of size
    $(D bufSize), available via the range interface of the $(D AsyncBuf)
    object.  This is useful, for example, when performing expensive operations
    on the elements of ranges that represent data on a disk or network.

    Examples:
    ---
    auto lines = File("foo.txt").byLine();
    auto duped = map!"a.idup"(lines);  // Necessary b/c byLine() recycles buffer

    // Fetch more lines in the background while we do stuff with the lines that
    // are currently available.
    auto asyncReader = taskPool.asyncBuf(duped);
    foreach(line; asyncReader) {
        // Do something expensive with line.
    }
    ---
    */
    AsyncBuf!R asyncBuf(R)(R range, size_t bufSize = 100) {
        return new AsyncBuf!R(range, bufSize, this);
    }

    /**
    Parallel reduce.  The first argument must be the range to be reduced.
    It must offer random access and have a length.  An explicit block size
    may optionally be provided as the second argument.

    Note:  Because this operation is being carried out in parallel,
    fun must be associative.  For notational simplicity, let # be an
    infix operator representing fun.  Then, (a # b) # c must equal
    a # (b # c).  This is NOT the same thing as commutativity.  Matrix
    multiplication, for example, is associative but not commutative.

    Examples:
    ---
    // Find the max of an array in parallel.  Note that this is a toy example
    // and unless the comparison function was very expensive, it would
    // almost always be faster to do this in serial.

    auto pool = new TaskPool();

    auto myArr = somethingExpensiveToCompare();
    auto myMax = pool.reduce!max(myArr);

    // Find both the min and max.
    auto minMax = pool.reduce!(min, max)(myArr);
    assert(minMax.field[0] == reduce!min(myArr));
    assert(minMax.field[1] == reduce!max(myArr));
    ---
     */
    template reduce(functions...) {

        ///
        auto reduce(Args...)(Args args) {
            alias reduceAdjoin!(functions) fun;
            alias reduceFinish!(functions) finishFun;

            static if(isIntegral!(Args[$ - 1])) {
                size_t blockSize = cast(size_t) args[$ - 1];
                alias args[0..$ - 1] args2;
                alias Args[0..$ - 1] Args2;
            } else {
                alias args args2;
                alias Args Args2;

                // No explicit block size was passed.  The pool-wide default
                // is filled in below, once the range is known.  (Declaring
                // blockSize only here and in the branch above avoids a
                // duplicate declaration when an explicit size is given.)
                size_t blockSize = 0;
            }

            static auto makeStartValue(Type)(Type e) {
                static if(functions.length == 1) {
                    return e;
                } else {
                    typeof(adjoin!(staticMap!(binaryFun, functions))(e, e))
                        startVal = void;
                    foreach (i, T; startVal.Types) {
                        auto p = (cast(void*) &startVal.field[i])
                            [0 .. startVal.field[i].sizeof];
                        emplace!T(p, e);
                    }

                    return startVal;
                }
            }

            static if(args2.length == 2) {
                static assert(isInputRange!(Args2[1]));
                alias args2[1] range;
                alias args2[0] startVal;

                if(blockSize == 0) {
                    blockSize = defaultBlockSize(range.length);
                }
            } else {
                static assert(args2.length == 1);
                alias args2[0] range;

                if(blockSize == 0) {
                    blockSize = defaultBlockSize(range.length);
                }

                enforce(!range.empty,
                    "Cannot reduce an empty range with first element as start value.");

                auto startVal = makeStartValue(range.front);
                range.popFront();
            }

            alias typeof(startVal) E;
            alias typeof(range) R;

            if(this.size == 0) {
                return std.algorithm.reduce!(fun)(startVal, range);
            }

            // Unlike the rest of the functions here, I can't use the Task object
            // recycling trick here because this has to work on non-commutative
            // operations.  After all the tasks are done executing, fun() has to
            // be applied on the results of these to get a final result, but
            // it can't be evaluated out of order.

            immutable len = range.length;
            if(blockSize > len) {
                blockSize = len;
            }

            immutable size_t nWorkUnits = (len / blockSize) +
                ((len % blockSize == 0) ? 0 : 1);
            assert(nWorkUnits * blockSize >= len);

            static E reduceOnRange
            (E startVal, R range, size_t lowerBound, size_t upperBound) {
                E result = startVal;
                foreach(i; lowerBound..upperBound) {
                    result = fun(result, range[i]);
                }
                return result;
            }

            alias Task!(reduceOnRange, E, R, size_t, size_t) RTask;
            RTask[] tasks;

            enum MAX_STACK = 512;
            immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;

            if(nBytesNeeded < MAX_STACK) {
                tasks = (cast(RTask*) alloca(nBytesNeeded))[0..nWorkUnits];
                tasks[] = RTask.init;
            } else {
                tasks = new RTask[nWorkUnits];
            }

            size_t curPos = 0;
            void useTask(ref RTask task) {
                // The element at curPos seeds the start value, so the task's
                // lower bound starts one past it.
                task.args[2] = curPos + 1;  // Lower bound.
                task.args[3] = min(len, curPos + blockSize);  // Upper bound.
                task.args[1] = range;  // Range.
                task.args[0] = makeStartValue(range[curPos]);  // Start value.

                curPos += blockSize;
                put(task);
            }

            foreach(ref task; tasks) {
                useTask(task);
            }

            // Try to steal each of these.
            foreach(ref task; tasks) {
                tryStealDelete( cast(AbstractTask*) &task);
            }

            // Now that we've tried to steal every task, they're all either done
            // or in progress.  Wait on all of them.
            E result = startVal;
            foreach(ref task; tasks) {
                task.yieldWait();
                result = finishFun(result, task.returnVal);
            }
            return result;
        }
    }

    /**
    Gets the index of the current thread relative to this pool.  Any thread
    not in this pool will receive an index of 0.  The worker threads in
    this pool receive indices of 1 through poolSize.

    The worker index is useful mainly for maintaining worker-local storage.

    BUGS:  Subject to integer overflow errors if more than size_t.max threads
           are ever created during the course of a program's execution.  This
           will likely never be fixed because it's an extreme corner case
           on 32-bit and it's completely implausible on 64-bit.
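
    Examples:
    ---
    // A sketch:  one accumulator slot per thread (the workers plus slot 0
    // for threads outside the pool) avoids any synchronization.
    auto perThreadSums = new real[taskPool.size + 1];
    perThreadSums[] = 0;

    foreach(i; taskPool.parallel(iota(10_000))) {
        perThreadSums[taskPool.workerIndex] += i;
    }

    real total = 0;
    foreach(s; perThreadSums) {
        total += s;
    }
    ---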
     */
    size_t workerIndex() {
        immutable rawInd = threadIndex;
        return (rawInd >= instanceStartIndex &&
                rawInd < instanceStartIndex + size) ?
                (rawInd - instanceStartIndex + 1) : 0;
    }

    /**
    Create an instance of worker-local storage, initialized with a given
    value.  The value is $(D lazy) so that you can, for example, easily
    create one instance of a class for each worker.
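
    Examples:
    ---
    // A sketch:  because initialVal is lazy, it is evaluated once per slot,
    // giving each worker its own scratch buffer that no other thread shares.
    auto buffers = taskPool.createWorkerLocal!(ubyte[])(new ubyte[1024]);
    ---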
     */
    WorkerLocal!(T) createWorkerLocal(T)(lazy T initialVal = T.init) {
        WorkerLocal!(T) ret;
        ret.initialize(this);
        foreach(i; 0..size + 1) {
            ret[i] = initialVal;
        }
        synchronized {}  // Make sure updates are visible in all threads.
        return ret;
    }

    /**
    Kills the pool immediately w/o waiting for jobs to finish.  Use only if you
    have waited on every job and therefore know there can't possibly be more
    in queue, or if you speculatively executed a bunch of stuff and realized
    you don't need those results anymore.

    Note:  Does not affect jobs that are already executing, only those
    in queue.
     */
    void stop() @trusted {
        lock();
        scope(exit) unlock();
        atomicSetUbyte(status, PoolState.stopNow);
        notifyAll();
    }

    /// Waits for all jobs to finish, then shuts down the pool.
    void join() @trusted {
        finish();
        foreach(t; pool) {
            t.join();
        }
    }

    /**
    Instructs worker threads to stop when the queue becomes empty, but does
    not block.
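
    Examples:
    ---
    // Typical shutdown sequence (a sketch):  finish() is non-blocking, so
    // the workers drain the queue and terminate in the background.
    // join() would additionally block until every worker has terminated.
    auto pool = new TaskPool();
    // ... submit tasks ...
    pool.finish();
    ---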
     */
    void finish() @trusted {
        lock();
        scope(exit) unlock();
        atomicCasUbyte(status, PoolState.running, PoolState.finishing);
        notifyAll();
    }

    /// Returns the number of worker threads in the pool.
    @property uint size() @safe const pure nothrow {
        // Not plausible to have billions of threads.
        assert(pool.length <= uint.max);
        return cast(uint) pool.length;
    }

    // Kept public for backwards compatibility, but not documented.
    // Using ref parameters is a nicer API and is made safe because the
    // d'tor for Task waits until the task is finished before destroying the
    // stack frame.  This function will eventually be made private and/or
    // deprecated.
    void put(alias fun, Args...)(Task!(fun, Args)* task) {
        task.pool = this;
        abstractPut( cast(AbstractTask*) task);
    }

    /**
    Put a task on the queue.

    Note:  While this function takes the address of variables that may
    potentially be on the stack, it is safely marked as @trusted.  Task objects
    include a destructor that waits for the task to complete before destroying
    the stack frame that they are allocated on.  Therefore, it is impossible
    for the stack frame to be destroyed before the task is complete and out
    of the queue.
    */
    void put(alias fun, Args...)(ref Task!(fun, Args) task) @trusted {
        task.pool = this;
        abstractPut( cast(AbstractTask*) &task);
    }

    /**
    Turns the pool's threads into daemon threads so that, when the main thread
    of the program exits, the threads in the pool are automatically terminated.
    */
    void makeDaemon() {
        lock();
        scope(exit) unlock();
        foreach(thread; pool) {
            thread.isDaemon = true;
        }
    }

    /**
    Undoes a call to $(D makeDaemon).  Turns the threads in this pool into
    threads that will prevent a program from terminating even if the main
    thread has already terminated.
    */
    void makeAngel() {
        lock();
        scope(exit) unlock();
        foreach(thread; pool) {
            thread.isDaemon = false;
        }
    }

    /**
    Convenience method that automatically creates a Task calling an alias on
    the GC heap and submits it to the pool.  See examples for the
    non-member function task().

    Returns:  A pointer to the Task object.
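
    Examples:
    ---
    // A sketch:  heap allocation means the returned pointer may safely
    // outlive the stack frame that created it.  twice() is a hypothetical
    // helper, not part of this module.
    static double twice(double x) { return 2 * x; }

    auto t = taskPool.task!twice(3.14);
    // ... do other work ...
    auto result = t.yieldWait;
    ---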
     */
    Task!(fun, Args)* task(alias fun, Args...)(Args args) {
        auto stuff = .task!(fun)(args);
        auto ret = moveToHeap(stuff);
        put(ret);
        return ret;
    }

    /**
    Convenience method that automatically creates a Task calling a delegate,
    function pointer, or functor on the GC heap and submits it to the pool.
    See examples for the non-member function task().

    Returns:  A pointer to the Task object.

    Note:  This function takes a non-scope delegate, meaning it can be
    used with closures.  If you can't allocate a closure due to objects
    on the stack that have scoped destruction, see the global function
    task(), which takes a scope delegate.
     */
    Task!(run, TypeTuple!(F, Args))*
    task(F, Args...)(F delegateOrFp, Args args)
    if(is(ReturnType!(F))) {
        auto stuff = .task(delegateOrFp, args);
        auto ptr = moveToHeap(stuff);
        put(ptr);
        return ptr;
    }
}

/**
Returns a lazily initialized default instantiation of $(D TaskPool).
This function can safely be called concurrently from multiple non-worker
threads.  One instance is shared across the entire program.
*/
@property TaskPool taskPool() @trusted {
    // initialized is thread-local, so each thread pays the synchronization
    // cost only on its first call.
    static bool initialized;
    __gshared static TaskPool pool;

    if(!initialized) {
        synchronized {
            if(!pool) {
                pool = new TaskPool(defaultPoolThreads);
                pool.makeDaemon();
            }
        }

        initialized = true;
    }

    return pool;
}

private shared uint _defaultPoolThreads;
shared static this() {
    cas(&_defaultPoolThreads, _defaultPoolThreads, core.cpuid.coresPerCPU - 1U);
}

/**
These functions get and set the number of threads in the default pool
returned by $(D taskPool()).  If the setter is never called, the default value
is the number of threads returned by $(D core.cpuid.coresPerCPU) - 1.  Any
changes made via the setter after the default pool is initialized via the
first call to $(D taskPool()) have no effect.
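
Examples:
---
// A sketch:  this must run before anything triggers the first call to
// taskPool in order to affect the default pool's size.
defaultPoolThreads = 4;
assert(taskPool.size == 4);
---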
*/
@property uint defaultPoolThreads() @trusted {
    // Kludge around lack of atomic load.
    return atomicOp!"+"(_defaultPoolThreads, 0U);
}

/// Ditto
@property void defaultPoolThreads(uint newVal) @trusted {
    cas(&_defaultPoolThreads, _defaultPoolThreads, newVal);
}

/**
Convenience functions that simply forward to taskPool.parallel().
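
Examples:
---
// A sketch:  parallel foreach over an array via the global default pool,
// using ref access to modify the elements in place.  sqrt is from std.math.
auto data = new double[10_000];
foreach(i, ref elem; parallel(data)) {
    elem = sqrt(cast(double) i);
}
---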
*/
ParallelForeach!R parallel(R)(R range) {
    return taskPool.parallel(range);
}

/// Ditto
ParallelForeach!R parallel(R)(R range, size_t blockSize) {
    return taskPool.parallel(range, blockSize);
}

/**
Calls a delegate or function pointer with $(D args).  This is basically an
adapter that makes Task work with delegates, function pointers and
functors instead of just aliases.
 */
ReturnType!(F) run(F, Args...)(F fpOrDelegate, ref Args args) {
    return fpOrDelegate(args);
}

/**
A struct that encapsulates the information about a task, including
its current status, what pool it was submitted to, and its arguments.

Notes:  If a Task has been submitted to the pool, is being stored in a stack
frame, and has not yet finished, the destructor for this struct will
automatically call yieldWait() so that the task can finish and the
stack frame can be destroyed safely.

Function results are returned from $(D yieldWait) and friends by ref.  If
$(D fun) returns by ref, the reference will point directly to the return
reference of $(D fun).  Otherwise it will point to a field in this struct.

Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do it by reference or
pointer.

Bugs:  Changes to $(D ref) and $(D out) arguments are not propagated to the
       call site, only to $(D args) in this struct.

       Copying is not actually disabled yet due to compiler bugs.  In the
       meantime, please understand that if you copy this struct, you're
       relying on implementation bugs.
*/
struct Task(alias fun, Args...) {
    // Work around syntactic ambiguity w.r.t. address of function return vals.
    private static T* addressOf(T)(ref T val) {
        return &val;
    }

    private static void impl(void* myTask) {
        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if(is(ReturnType == void)) {
            fun(myCastedTask._args);
        } else static if(is(typeof(addressOf(fun(myCastedTask._args))))) {
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        } else {
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }
    mixin BaseMixin!(TaskState.notStarted) Base;

    TaskPool pool;

    Args _args;

    /**
    The arguments the function was called with.  Changes to $(D out) and
    $(D ref) arguments will be reflected here when the function is done
    executing.
    */
    static if(__traits(isSame, fun, run)) {
        alias _args[1..$] args;
    } else {
        alias _args args;
    }

    alias typeof(fun(_args)) ReturnType;
    static if(!is(ReturnType == void)) {
        static if(is(typeof(&fun(_args)))) {
            // Ref return.
            ReturnType* returnVal;

            ref ReturnType fixRef(ReturnType* val) {
                return *val;
            }

        } else {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val) {
                return val;
            }
        }
    }

    void enforcePool() {
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    private this(Args args) {
        static if(args.length > 0) {
            _args = args;
        }
    }

    /**
    If the task isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    busy spin until it's done, then return the return value.

    This function should be used when you expect the result of the
    task to be available relatively quickly, on a timescale shorter
    than that of an OS context switch.
     */
    @property ref ReturnType spinWait() @trusted {
        enforcePool();

        this.pool.tryStealDelete( cast(AbstractTask*) &this);

        while(atomicReadUbyte(this.taskStatus) != TaskState.done) {}

        if(exception) {
            throw exception;
        }

        static if(!is(ReturnType == void)) {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the task isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    wait on a condition variable.

    This function should be used when you expect the result of the
    task to take a while, as waiting on a condition variable
    introduces latency, but results in negligible wasted CPU cycles.
     */
    @property ref ReturnType yieldWait() @trusted {
        enforcePool();
        this.pool.tryStealDelete( cast(AbstractTask*) &this);
        if(atomicReadUbyte(this.taskStatus) == TaskState.done) {

            static if(is(ReturnType == void)) {
                return;
            } else {
                return fixRef(this.returnVal);
            }
        }

        pool.lock();
        scope(exit) pool.unlock();

        while(atomicReadUbyte(this.taskStatus) != TaskState.done) {
            pool.waitUntilCompletion();
        }

        if(exception) {
            throw exception;
        }

        static if(!is(ReturnType == void)) {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this task is not started yet, execute it in the current
    thread.  If it is finished, return its result.  If it is in progress,
    execute any other available tasks from the task pool until this one
    is finished.  If no other tasks are available, yield wait.
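
    Examples:
    ---
    // A sketch:  waiting on a subtask from within other pool work.
    // workWait keeps this thread busy executing queued tasks instead of
    // idling, which helps when the pool is saturated.
    auto sub = task({
        // ... expensive subcomputation ...
    });
    taskPool.put(sub);
    sub.workWait();
    ---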
     */
    @property ref ReturnType workWait() @trusted {
        enforcePool();
        this.pool.tryStealDelete( cast(AbstractTask*) &this);

        while(true) {
            if(done) {
                static if(is(ReturnType == void)) {
                    return;
                } else {
                    return fixRef(this.returnVal);
                }
            }

            pool.lock();
            AbstractTask* job;
            try {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no jobs
                // in the queue.
                job = pool.popNoSync();
            } finally {
                pool.unlock();
            }

            if(job !is null) {

                version(verboseUnittest) {
                    stderr.writeln("Doing workWait work.");
                }

                pool.doJob(job);

                if(done) {
                    static if(is(ReturnType == void)) {
                        return;
                    } else {
                        return fixRef(this.returnVal);
                    }
                }
            } else {
                version(verboseUnittest) {
                    stderr.writeln("Yield from workWait.");
                }

                return yieldWait();
            }
        }
    }

    ///
    @property bool done() @trusted {
        // Explicitly forwarded for documentation purposes.
        return Base.done();
    }

    @safe ~this() {
        if(pool !is null && taskStatus != TaskState.done) {
            yieldWait();
        }
    }

    // When this is uncommented, it somehow gets called even though it's
    // disabled and Bad Things Happen.
    //@disable this(this) { assert(0);}
}

/**
Creates a task that calls an alias.

Examples:
---
auto pool = new TaskPool();
uint[] foo = [1, 2, 3, 4, 5];

// Create a task to sum this array in the background.
auto myTask = task!( reduce!"a + b" )(foo);
pool.put(myTask);

// Do other stuff.

// Get value.  Execute it in the current thread if it hasn't been started by a
// worker thread yet.
writeln("Sum = ", myTask.spinWait());
---

Note:
This method of creating tasks allocates on the stack and requires an explicit
submission to the pool.  It is designed for tasks that are to finish before
the function in which they are created returns.  If you want to escape the
Task object from the function in which it was created or prefer to heap
allocate and automatically submit to the pool, see $(D TaskPool.task()).
 */
Task!(fun, Args) task(alias fun, Args...)(Args args) {
    alias Task!(fun, Args) RetType;
    return RetType(args);
}

/**
Create a task that calls a function pointer, delegate, or functor.
This works for anonymous delegates.

Examples:
---
auto pool = new TaskPool();
auto myTask = task({
    stderr.writeln("I've completed a task.");
});
pool.put(myTask);

// Do other stuff.

myTask.yieldWait();
---

Notes:
This method of creating tasks allocates on the stack and requires an explicit
submission to the pool.  It is designed for tasks that are to finish before
the function in which they are created returns.  If you want to escape the
Task object from the function in which it was created or prefer to heap
allocate and automatically submit to the pool, see $(D TaskPool.task()).

In the case of delegates, this function takes a $(D scope) delegate to prevent
the allocation of closures, since its intended use is for tasks that will
be finished before the function in which they're created returns.
pool.task() takes a non-scope delegate and will allow the use of closures.
 */
Task!(run, TypeTuple!(F, Args))
task(F, Args...)(scope F delegateOrFp, Args args)
if(is(typeof(delegateOrFp(args))) && !isSafeTask!F) {
    alias typeof(return) RT;
    return RT(delegateOrFp, args);
}

/**
Safe version of Task, usable from $(D @safe) code.  This has the following
restrictions:

1.  $(D F) must not have any unshared aliasing.  Basically, this means that it
    may not be an unshared delegate.  This also precludes accepting template
    alias parameters.

2.  $(D Args) must not have unshared aliasing.

3.  The return type must not have unshared aliasing unless $(D fun) is pure.

4.  $(D fun) must return by value, not by reference.
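
Examples:
---
// A sketch of @safe usage:  a function pointer with no unshared aliasing
// in its parameters or return type satisfies the restrictions above.
// cube() is a hypothetical helper, not part of this module.
static int cube(int x) pure @safe {
    return x * x * x;
}

void useSafeTask() @safe {
    auto t = task(&cube, 3);
    taskPool.put(t);
    assert(t.yieldWait == 27);
}
---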
*/
@trusted Task!(run, TypeTuple!(F, Args))
task(F, Args...)(F fun, Args args)
if(is(typeof(fun(args))) && isSafeTask!F) {
    alias typeof(return) RT;
    return RT(fun, args);
}

private struct ParallelForeachTask(R, Delegate)
if(isRandomAccessRange!R && hasLength!R) {
    enum withIndex = ParameterTypeTuple!(Delegate).length == 2;

    static void impl(void* myTask) {
        auto myCastedTask = cast(ParallelForeachTask!(R, Delegate)*) myTask;
        foreach(i; myCastedTask.lowerBound..myCastedTask.upperBound) {

            static if(hasLvalueElements!R) {
                static if(withIndex) {
                    if(myCastedTask.runMe(i, myCastedTask.myRange[i])) break;
                } else {
                    if(myCastedTask.runMe( myCastedTask.myRange[i])) break;
                }
            } else {
                auto valToPass = myCastedTask.myRange[i];
                static if(withIndex) {
                    if(myCastedTask.runMe(i, valToPass)) break;
                } else {
                    if(myCastedTask.runMe(valToPass)) break;
                }
            }
        }

        // Allow some memory reclamation.
        myCastedTask.myRange = R.init;
        myCastedTask.runMe = null;
    }

    mixin BaseMixin!(TaskState.done);

    TaskPool pool;

    // More specific stuff.
    size_t lowerBound;
    size_t upperBound;
    R myRange;
    Delegate runMe;

    void wait() {
        if(pool is null) {
            // Never submitted.  No need to wait.
            return;
        }

        pool.lock();
        scope(exit) pool.unlock();

        // No work stealing here b/c the function that waits on this task
        // wants to recycle it as soon as it finishes.
        while(!done()) {
            pool.waitUntilCompletion();
        }

        if(exception) {
            throw exception;
        }
    }
}
1585
1586 private struct ParallelForeachTask(R, Delegate)
1587 if(!isRandomAccessRange!R || !hasLength!R) {
1588     enum withIndex = ParameterTypeTuple!(Delegate).length == 2;
1589
1590     static void impl(void* myTask) {
1591         auto myCastedTask = cast(ParallelForeachTask!(R, Delegate)*) myTask;
1592
1593         static ref ElementType!(R) getElement(T)(ref T elemOrPtr) {
1594             static if(is(typeof(*elemOrPtr) == ElementType!R)) {
1595                 return *elemOrPtr;
1596             } else {
1597                 return elemOrPtr;
1598             }
1599         }
1600
1601         foreach(i, element; myCastedTask.elements) {
1602             static if(withIndex) {
1603                 size_t lValueIndex = i + myCastedTask.startIndex;
1604                 if(myCastedTask.runMe(lValueIndex, getElement(element))) break;
1605             } else {
1606                 if(myCastedTask.runMe(getElement(element))) break;
1607             }
1608         }
1609
1610         // Make memory easier to reclaim.
1611         myCastedTask.runMe = null;
1612     }
1613
1614     mixin BaseMixin!(TaskState.done);
1615
1616     TaskPool pool;
1617
1618     // More specific stuff.
1619     alias ElementType!R E;
1620     Delegate runMe;
1621
1622     static if(hasLvalueElements!(R)) {
1623         E*[] elements;
1624     } else {
1625         E[] elements;
1626     }
1627     size_t startIndex;
1628
1629     void wait() {
1630         if(pool is null) {
1631             // Never submitted.  No need to wait.
1632             return;
1633         }
1634
1635         pool.lock();
1636         scope(exit) pool.unlock();
1637
1638         // No work stealing here b/c the function that waits on this task
1639         // wants to recycle it as soon as it finishes.
1640
1641         while(!done()) {
1642             pool.waitUntilCompletion();
1643         }
1644
1645         if(exception) {
1646             throw exception;
1647         }
1648     }
1649 }
1650
1651 private struct MapTask(alias fun, R, ReturnType)
1652 if(isRandomAccessRange!R && hasLength!R) {
1653     static void impl(void* myTask) {
1654         auto myCastedTask = cast(MapTask!(fun, R, ReturnType)*) myTask;
1655
1656         foreach(i; myCastedTask.lowerBound..myCastedTask.upperBound) {
1657             myCastedTask.results[i] = uFun(myCastedTask.range[i]);
1658         }
1659
1660         // Nullify stuff, make GC's life easier.
1661         myCastedTask.results = null;
1662         myCastedTask.range = R.init;
1663     }
1664
1665     mixin BaseMixin!(TaskState.done);
1666
1667     TaskPool pool;
1668
1669     // More specific stuff.
1670     alias unaryFun!fun uFun;
1671     R range;
1672     alias ElementType!R E;
1673     ReturnType results;
1674     size_t lowerBound;
1675     size_t upperBound;
1676
1677     void wait() {
1678         if(pool is null) {
1679             // Never submitted.  No need to wait on it.
1680             return;
1681         }
1682
1683         pool.lock();
1684         scope(exit) pool.unlock();
1685
1686         // Again, no work stealing.
1687
1688         while(!done()) {
1689             pool.waitUntilCompletion();
1690         }
1691
1692         if(exception) {
1693             throw exception;
1694         }
1695     }
1696 }
1697
1698 ///
1699 template LazyMap(functions...) {
1700     static if(functions.length == 1) {
1701         alias unaryFun!(functions[0]) fun;
1702     } else {
1703          alias adjoin!(staticMap!(unaryFun, functions)) fun;
1704     }
1705
1706     /**
1707     Maps a function onto a range in a semi-lazy fashion, using a finite
1708     buffer.  $(D LazyMap) is a forward range.  For usage details
1709     see $(D TaskPool.lazyMap).
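
    Example (adapted from this module's unittests; the buffer and block
    sizes are illustrative):
    ---
    // Lazily square each element, keeping a 10_000-element buffer filled
    // in the background in blocks of 1_000 elements.
    auto squares = taskPool.lazyMap!"a * a"(iota(30_000_001), 10_000, 1_000);

    foreach(sq; squares) {
        // Consume one mapped element at a time.
    }
    ---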
1710     */
1711     final class LazyMap(R)
1712     if(isInputRange!R) {
1713
1714         // This is a class because the task needs to be located on the heap
1715         // and in the non-random access case the range needs to be on the
1716         // heap, too.
1717
1718     private:
1719         alias MapType!(R, functions) E;
1720         E[] buf1, buf2;
1721         R range;
1722         TaskPool pool;
1723         Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1724         size_t blockSize;
1725         size_t bufPos;
1726         bool lastTaskWaited;
1727
1728         static if(isRandomAccessRange!R) {
1729             alias R FromType;
1730
1731             void popRange() {
1732                 static if(__traits(compiles, range[0..range.length])) {
1733                     range = range[min(buf1.length, range.length)..range.length];
1734                 } else static if(__traits(compiles, range[0..$])) {
1735                     range = range[min(buf1.length, range.length)..$];
1736                 } else {
1737                     static assert(0, "R must have slicing for LazyMap."
1738                         ~ "  " ~ R.stringof ~ " doesn't.");
1739                 }
1740             }
1741
1742         } else static if(is(typeof(range.buf1)) && is(typeof(range.bufPos)) &&
1743           is(typeof(range.doBufSwap()))) {
1744
1745             version(unittest) {
1746                 pragma(msg, "LazyRange Special Case:  "
1747                     ~ typeof(range).stringof);
1748             }
1749
1750             alias typeof(range.buf1) FromType;
1751             FromType from;
1752
1753             // Just swap our input buffer with range's output buffer and get
1754             // range mapping again.  No need to copy element by element.
1755             FromType dumpToFrom() {
1756                 assert(range.buf1.length <= from.length);
1757                 from.length = range.buf1.length;
1758                 swap(range.buf1, from);
1759                 range._length -= (from.length - range.bufPos);
1760                 range.doBufSwap();
1761
1762                 return from;
1763             }
1764
1765         } else {
1766             alias ElementType!(R)[] FromType;
1767
1768             // The temporary array that data is copied to before being
1769             // mapped.
1770             FromType from;
1771
1772             FromType dumpToFrom() {
1773                 assert(from !is null);
1774
1775                 size_t i;
1776                 for(; !range.empty && i < from.length; range.popFront()) {
1777                     from[i++] = range.front;
1778                 }
1779
1780                 from = from[0..i];
1781                 return from;
1782             }
1783         }
1784
1785         static if(hasLength!R) {
1786             size_t _length;
1787
1788             /// Available if hasLength!(R).
1789             public @property size_t length() const pure nothrow @safe {
1790                 return _length;
1791             }
1792         }
1793
1794         this(R range, size_t bufSize, size_t blockSize, TaskPool pool) {
1795             static if(is(typeof(range.bufSize)) &&
1796             is(typeof(range.doBufSwap()))) {
1797                 bufSize = range.bufSize;
1798             }
1799
1800             buf1.length = bufSize;
1801             buf2.length = bufSize;
1802
1803             static if(!isRandomAccessRange!R) {
1804                 from.length = bufSize;
1805             }
1806
1807             this.blockSize = (blockSize == size_t.max) ?
1808                     pool.defaultBlockSize(bufSize) : blockSize;
1809             this.range = range;
1810             this.pool = pool;
1811
1812             static if(hasLength!R) {
1813                 _length = range.length;
1814             }
1815
1816             fillBuf(buf1);
1817             submitBuf2();
1818         }
1819
1820         // In the random access case, input comes straight from the range;
1821         // otherwise dumpToFrom() first copies it into the from buffer.
1822         E[] fillBuf(E[] buf) {
1823             static if(isRandomAccessRange!R) {
1824                 auto toMap = take(range, buf.length);
1825                 scope(success) popRange();
1826             } else {
1827                 auto toMap = dumpToFrom();
1828             }
1829
1830             buf = buf[0..min(buf.length, toMap.length)];
1831             pool.map!(functions)(
1832                     toMap,
1833                     blockSize,
1834                     buf
1835                 );
1836
1837             return buf;
1838         }
1839
1840         void submitBuf2() {
1841             // Hack to reuse the task object.
1842
1843             nextBufTask = typeof(nextBufTask).init;
1844             nextBufTask._args[0] = &fillBuf;
1845             nextBufTask._args[1] = buf2;
1846             pool.put(nextBufTask);
1847         }
1848
1849         void doBufSwap() {
1850             if(lastTaskWaited) {
1851                 // Then the range is empty.  Signal it here.
1852                 buf1 = null;
1853                 buf2 = null;
1854
1855                 static if(!isRandomAccessRange!R) {
1856                     from = null;
1857                 }
1858
1859                 return;
1860             }
1861
1862             buf2 = buf1;
1863             buf1 = nextBufTask.yieldWait();
1864             bufPos = 0;
1865
1866             if(range.empty) {
1867                 lastTaskWaited = true;
1868             } else {
1869                 submitBuf2();
1870             }
1871         }
1872
1873     public:
1874         ///
1875         MapType!(R, functions) front() @property {
1876             return buf1[bufPos];
1877         }
1878
1879         ///
1880         void popFront() {
1881             static if(hasLength!R) {
1882                 _length--;
1883             }
1884
1885             bufPos++;
1886             if(bufPos >= buf1.length) {
1887                 doBufSwap();
1888             }
1889         }
1890
1891         static if(std.range.isInfinite!R) {
1892             enum bool empty = false;
1893         } else {
1894
1895             ///
1896             bool empty() @property {
1897                 return buf1 is null;  // popFront() sets this when range is empty
1898             }
1899         }
1900     }
1901 }
1902
1903 /**
1904 Asynchronously buffers an expensive-to-iterate range using a background thread
1905 from a task pool.  For usage details see $(D TaskPool.asyncBuf).
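
Example (a sketch; the $(D filter) call stands in for any
expensive-to-iterate input range):
---
auto slowRange = filter!"a == a"(iota(1_000_002));

// A worker thread fills a buffer in the background while this thread
// consumes already-buffered elements.
foreach(elem; taskPool.asyncBuf(slowRange)) {
    // Use elem.
}
---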
1906 */
1907 final class AsyncBuf(R) if(isInputRange!R) {
1908     // This is a class because the task and the range both need to be on the
1909     // heap.
1910
1911     /// The element type of R.
1912     public alias ElementType!R E;  // Needs to be here b/c of forward ref bugs.
1913
1914 private:
1915     E[] buf1, buf2;
1916     R range;
1917     TaskPool pool;
1918     Task!(run, E[] delegate(E[]), E[]) nextBufTask;
1919     size_t bufPos;
1920     bool lastTaskWaited;
1921
1922     static if(hasLength!R) {
1923         size_t _length;
1924
1925         /// Available if hasLength!(R).
1926         public @property size_t length() const pure nothrow @safe {
1927             return _length;
1928         }
1929     }
1930
1931     this(R range, size_t bufSize, TaskPool pool) {
1932         buf1.length = bufSize;
1933         buf2.length = bufSize;
1934
1935         this.range = range;
1936         this.pool = pool;
1937
1938         static if(hasLength!R) {
1939             _length = range.length;
1940         }
1941
1942         fillBuf(buf1);
1943         submitBuf2();
1944     }
1945
1946     // Copies the next batch of elements from the source range into buf
1947     // and returns the filled slice.
1948     E[] fillBuf(E[] buf) {
1949         assert(buf !is null);
1950
1951         size_t i;
1952         for(; !range.empty && i < buf.length; range.popFront()) {
1953             buf[i++] = range.front;
1954         }
1955
1956         buf = buf[0..i];
1957         return buf;
1958     }
1959
1960     void submitBuf2() {
1961         // Hack to reuse the task object.
1962
1963         nextBufTask = typeof(nextBufTask).init;
1964         nextBufTask._args[0] = &fillBuf;
1965         nextBufTask._args[1] = buf2;
1966         pool.put(nextBufTask);
1967     }
1968
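    // Swap the consumer buffer (buf1) with the buffer the background task
    // just filled (buf2), then resubmit unless the source range is empty.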
1969     void doBufSwap() {
1970         if(lastTaskWaited) {
1971             // Then the range is empty.  Signal it here.
1972             buf1 = null;
1973             buf2 = null;
1974             return;
1975         }
1976
1977         buf2 = buf1;
1978         buf1 = nextBufTask.yieldWait();
1979         bufPos = 0;
1980
1981         if(range.empty) {
1982             lastTaskWaited = true;
1983         } else {
1984             submitBuf2();
1985         }
1986     }
1987
1988 public:
1989
1990     ///
1991     E front() @property {
1992         return buf1[bufPos];
1993     }
1994
1995     ///
1996     void popFront() {
1997         static if(hasLength!R) {
1998             _length--;
1999         }
2000
2001         bufPos++;
2002         if(bufPos >= buf1.length) {
2003             doBufSwap();
2004         }
2005     }
2006
2007     static if(std.range.isInfinite!R) {
2008         enum bool empty = false;
2009     } else {
2010
2011         ///
2012         bool empty() @property {
2013             return buf1 is null;  // popFront() sets this when range is empty
2014         }
2015     }
2016 }
2017
2018 /**
2019 Struct for creating worker-local storage.  Worker-local storage is basically
2020 thread-local storage that exists only for workers in a given pool, is
2021 allocated on the heap in a way that avoids false sharing,
2022 and doesn't necessarily have global scope within any
2023 thread.  It can be accessed from any worker thread in the pool that created
2024 it, and one thread outside this pool.  All threads outside the pool that created
2025 a given instance of worker-local storage share a single slot.
2026
2027 Since the underlying data for this struct is heap-allocated, this struct
2028 has reference semantics when passed around.
2029
2030 At a more concrete level, the main use cases for $(D WorkerLocal) are:
2031
2032 1.  Performing parallel reductions with an imperative, as opposed to functional,
2033 programming style.  In this case, it's useful to treat $(D WorkerLocal) as local
2034 to each thread for only the parallel portion of an algorithm.
2035 
2036 2.  Recycling temporary buffers across iterations of a parallel foreach loop (second example below).
2037
2038 Examples:
2039 ---
2040 auto pool = new TaskPool;
2041 auto sumParts = pool.createWorkerLocal!(uint)();
2042 foreach(i; pool.parallel(iota(someLargeNumber))) {
2043     // Do complicated stuff.
2044     sumParts.get += resultOfComplicatedStuff;
2045 }
2046
2047 writeln("Sum = ", reduce!"a + b"(sumParts.toRange));
2048 ---
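
A sketch of the second use case, recycling a temporary buffer (the loop
body is hypothetical):
---
auto scratch = pool.createWorkerLocal!(int[])();
foreach(i; pool.parallel(iota(someLargeNumber))) {
    if(scratch.get is null) scratch.get = new int[1024];

    // Use scratch.get as temporary space, without reallocating it on
    // every iteration.
}
---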
2049  */
2050 struct WorkerLocal(T) {
2051 private:
2052     TaskPool pool;
2053     size_t size;
2054
2055     static immutable size_t cacheLineSize;
2056     size_t elemSize;
2057     bool* stillThreadLocal;
2058
2059     shared static this() {
2060         size_t lineSize = 0;
2061         foreach(cachelevel; datacache) {
2062             if(cachelevel.lineSize > lineSize && cachelevel.lineSize < uint.max) {
2063                 lineSize = cachelevel.lineSize;
2064             }
2065         }
2066
2067         cacheLineSize = lineSize;
2068     }
2069
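    // Round num up to the next multiple of cacheLineSize (e.g. 72 -> 128
    // with 64-byte cache lines) so adjacent elements never share a line.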
2070     static size_t roundToLine(size_t num) pure nothrow {
2071         if(num % cacheLineSize == 0) {
2072             return num;
2073         } else {
2074             return ((num / cacheLineSize) + 1) * cacheLineSize;
2075         }
2076     }
2077
2078     void* data;
2079
2080     void initialize(TaskPool pool) {
2081         this.pool = pool;
2082         size = pool.size + 1;
2083         stillThreadLocal = new bool;
2084         *stillThreadLocal = true;
2085
2086         // Determines whether the GC should scan the array.
2087         auto blkInfo = (typeid(T).flags & 1) ?
2088                        cast(GC.BlkAttr) 0 :
2089                        GC.BlkAttr.NO_SCAN;
2090
2091         immutable nElem = pool.size + 1;
2092         elemSize = roundToLine(T.sizeof);
2093
2094         // The + 3 is to pad one full cache line worth of space on either side
2095         // of the data structure to make sure false sharing with completely
2096         // unrelated heap data is prevented, and to provide enough padding to
2097         // make sure that data is cache line-aligned.
2098         data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
2099
2100         // Cache line align data ptr.
2101         data = cast(void*) roundToLine(cast(size_t) data);
2102
2103         foreach(i; 0..nElem) {
2104             this.opIndex(i) = T.init;
2105         }
2106     }
2107
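    // Each slot occupies elemSize bytes (T.sizeof rounded up to a full
    // cache line), so slot i lives at data + elemSize * i.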
2108     ref T opIndex(size_t index) {
2109         assert(index < size, text(index, '\t', size));
2110         return *(cast(T*) (data + elemSize * index));
2111     }
2112
2113     void opIndexAssign(T val, size_t index) {
2114         assert(index < size);
2115         *(cast(T*) (data + elemSize * index)) = val;
2116     }
2117
2118 public:
2119     /**
2120     Get the current thread's instance.  Returns by reference even though
2121     ddoc refuses to say so.  Note that calling $(D get()) from any thread
2122     outside the pool that created this instance will return the
2123     same reference, so an instance of worker-local storage should only be
2124     accessed from one thread outside the pool that created it.  If this
2125     rule is violated, undefined behavior will result.
2126
2127     If assertions are enabled and $(D toRange()) has been called, then this
2128     WorkerLocal instance is no longer worker-local and an assertion
2129     failure will result when calling this method.  This is not checked
2130     when assertions are disabled for performance reasons.
2131      */
2132     ref T get() @property {
2133         assert(*stillThreadLocal,
2134                "Cannot call get() on this instance of WorkerLocal because it" ~
2135                " is no longer worker-local."
2136         );
2137         return opIndex(pool.workerIndex);
2138     }
2139
2140     /**
2141     Assign a value to the current thread's instance.  This function has
2142     the same caveats as its overload.
2143     */
2144     void get(T val) @property {
2145         assert(*stillThreadLocal,
2146                "Cannot call get() on this instance of WorkerLocal because it" ~
2147                " is no longer worker-local."
2148         );
2149
2150         opIndexAssign(val, pool.workerIndex);
2151     }
2152
2153     /**
2154     Returns a range view of the values for all threads, which can be used
2155     to read or combine the results of each thread after running the parallel
2156     part of your algorithm.  Do NOT use this method in the parallel portion
2157     of your algorithm.
2158
2159     Calling this function will also set a flag indicating
2160     that this struct is no longer thread-local, and attempting to use the
2161     get() method again will result in an assertion failure if assertions
2162     are enabled.
2163      */
2164     WorkerLocalRange!T toRange() @property {
2165         if(*stillThreadLocal) {
2166             *stillThreadLocal = false;
2167
2168             // Make absolutely sure results are visible to all threads.
2169             synchronized {}
2170         }
2171
2172         return WorkerLocalRange!(T)(this);
2173     }
2174 }
2175
2176 /**
2177 Range primitives for worker-local storage.  The purpose of this is to
2178 access results produced by each worker thread from a single thread once you
2179 are no longer using the worker-local storage from multiple threads.
2180 Do NOT use this struct in the parallel portion of your algorithm.
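
Example (a sketch, continuing from $(D WorkerLocal)'s example above):
---
foreach(threadSum; sumParts.toRange) {
    writeln("One worker's partial sum:  ", threadSum);
}
---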
2181  */
2182 struct WorkerLocalRange(T) {
2183 private:
2184     WorkerLocal!T workerLocal;
2185
2186     size_t _length;
2187     size_t beginOffset;
2188
2189     this(WorkerLocal!(T) wl) {
2190         this.workerLocal = wl;
2191         _length = wl.size;
2192     }
2193
2194 public:
2195     ///
2196     ref T front() @property {
2197         return this[0];
2198     }
2199
2200     ///
2201     ref T back() @property {
2202         return this[_length - 1];
2203     }
2204
2205     ///
2206     void popFront() {
2207         if(_length > 0) {
2208             beginOffset++;
2209             _length--;
2210         }
2211     }
2212
2213     ///
2214     void popBack() {
2215         if(_length > 0) {
2216             _length--;
2217         }
2218     }
2219
2220     ///
2221     typeof(this) save() @property {
2222         return this;
2223     }
2224
2225     ///
2226     ref T opIndex(size_t index) {
2227         assert(index < _length);
2228         return workerLocal[index + beginOffset];
2229     }
2230
2231     ///
2232     void opIndexAssign(T val, size_t index) {
2233         assert(index < _length);
2234         workerLocal[index + beginOffset] = val;
2235     }
2236
2237     ///
2238     typeof(this) opSlice(size_t lower, size_t upper) {
2239         assert(upper <= _length);
2240         auto newWl = this.workerLocal;
2241         newWl.data += lower * newWl.elemSize;
2242         newWl.size = upper - lower;
2243         return typeof(this)(newWl);
2244     }
2245
2246     ///
2247     bool empty() @property {
2248         return length == 0;
2249     }
2250
2251     ///
2252     size_t length() @property {
2253         return _length;
2254     }
2255 }
2256
2257 // Where the magic happens.  This mixin causes tasks to be submitted lazily to
2258 // the task pool.  Attempts are then made by the calling thread to steal
2259 // them.
2260 enum submitAndSteal = q{
2261
2262     // See documentation for BaseMixin.shouldSetDone.
2263     submitNextBatch.shouldSetDone = false;
2264
2265     // Submit first batch from this thread.
2266     submitJobs();
2267
2268     while( !atomicReadUbyte(doneSubmitting) ) {
2269         // Try to steal parallel foreach tasks.
2270         foreach(ref task; tasks) {
2271             pool.tryStealDelete( cast(AbstractTask*) &task);
2272         }
2273
2274         // All tasks in progress or done unless next
2275         // submission task started running.  Try to steal the submission task.
2276         pool.tryStealDelete(cast(AbstractTask*) &submitNextBatch);
2277     }
2278
2279     // Steal one last time, after they're all submitted.
2280     foreach(ref task; tasks) {
2281         pool.tryStealDelete( cast(AbstractTask*) &task);
2282     }
2283
2284
2285     foreach(ref task; tasks) {
2286         task.wait();
2287     }
2288 };
2289
2290 /*------Structs that implement opApply for parallel foreach.------------------*/
2291 template randLen(R) {
2292     enum randLen = isRandomAccessRange!R && hasLength!R;
2293 }
2294
2295 private enum string parallelApplyMixin = q{
2296     alias ParallelForeachTask!(R, typeof(dg)) PTask;
2297
2298     // Handle empty thread pool as special case.
2299     if(pool.size == 0) {
2300         int res = 0;
2301         size_t index = 0;
2302
2303         // The explicit ElementType!R in the foreach loops is necessary for
2304         // correct behavior when iterating over strings.
2305         static if(hasLvalueElements!(R)) {
2306             foreach(ref ElementType!R elem; range) {
2307                 static if(ParameterTypeTuple!(dg).length == 2) {
2308                     res = dg(index, elem);
2309                 } else {
2310                     res = dg(elem);
2311                 }
2312                 index++;
2313             }
2314         } else {
2315             foreach(ElementType!R elem; range) {
2316                 static if(ParameterTypeTuple!(dg).length == 2) {
2317                     res = dg(index, elem);
2318                 } else {
2319                     res = dg(elem);
2320                 }
2321                 index++;
2322             }
2323         }
2324         return res;
2325     }
2326
2327     PTask[] tasks = (cast(PTask*) alloca(pool.size * PTask.sizeof * 2))
2328                     [0..pool.size * 2];
2329     tasks[] = PTask.init;
2330     Task!(run, void delegate()) submitNextBatch;
2331
2332     static if(is(typeof(range.buf1)) && is(typeof(range.bufPos)) &&
2333     is(typeof(range.doBufSwap()))) {
2334         enum bool bufferTrick = true;
2335
2336         version(unittest) {
2337             pragma(msg, "Parallel Foreach Buffer Trick:  " ~ R.stringof);
2338         }
2339
2340     } else {
2341         enum bool bufferTrick = false;
2342     }
2343
2344
2345     static if(randLen!R) {
2346
2347         immutable size_t len = range.length;
2348         size_t curPos = 0;
2349
2350         void useTask(ref PTask task) {
2351             task.lowerBound = curPos;
2352             task.upperBound = min(len, curPos + blockSize);
2353             task.myRange = range;
2354             task.runMe = dg;
2355             task.pool = pool;
2356             curPos += blockSize;
2357
2358             pool.lock();
2359             atomicSetUbyte(task.taskStatus, TaskState.notStarted);
2360             pool.abstractPutNoSync(cast(AbstractTask*) &task);
2361             pool.unlock();
2362         }
2363
2364         void submitJobs() {
2365             // Search for slots to recycle.
2366             foreach(ref task; tasks) if(task.done) {
2367                 useTask(task);
2368                 if(curPos >= len) {
2369                     atomicSetUbyte(doneSubmitting, 1);
2370                     return;
2371                 }
2372             }
2373
2374             // Now that we've submitted all the worker tasks, submit
2375             // the next submission task.  Synchronizing on the pool
2376             // to prevent the stealing thread from deleting the job
2377             // before it's submitted.
2378             pool.lock();
2379             atomicSetUbyte(submitNextBatch.taskStatus, TaskState.notStarted);
2380             pool.abstractPutNoSync( cast(AbstractTask*) &submitNextBatch);
2381             pool.unlock();
2382         }
2383
2384     } else {
2385
2386         static if(bufferTrick) {
2387             blockSize = range.buf1.length;
2388         }
2389
2390         void useTask(ref PTask task) {
2391             task.runMe = dg;
2392             task.pool = pool;
2393
2394             static if(bufferTrick) {
2395                 // Elide copying by just swapping buffers.
2396                 task.elements.length = range.buf1.length;
2397                 swap(range.buf1, task.elements);
2398                 range._length -= (task.elements.length - range.bufPos);
2399                 range.doBufSwap();
2400
2401             } else {
2402                 size_t copyIndex = 0;
2403
2404                 if(task.elements.length == 0) {
2405                     task.elements.length = blockSize;
2406                 }
2407
2408                 for(; copyIndex < blockSize && !range.empty; copyIndex++) {
2409                     static if(hasLvalueElements!R) {
2410                         task.elements[copyIndex] = &range.front();
2411                     } else {
2412                         task.elements[copyIndex] = range.front;
2413                     }
2414                     range.popFront;
2415                 }
2416
2417                 // We only actually change the array size on the last task,
2418                 // when the range is empty.
2419                 task.elements = task.elements[0..copyIndex];
2420             }
2421
2422             pool.lock();
2423             task.startIndex = this.startIndex;
2424             this.startIndex += task.elements.length;
2425             atomicSetUbyte(task.taskStatus, TaskState.notStarted);
2426             pool.abstractPutNoSync(cast(AbstractTask*) &task);
2427             pool.unlock();
2428         }
2429
2430
2431         void submitJobs() {
2432             // Search for slots to recycle.
2433             foreach(ref task; tasks) if(task.done) {
2434                 useTask(task);
2435                 if(range.empty) {
2436                     atomicSetUbyte(doneSubmitting, 1);
2437                     return;
2438                 }
2439             }
2440
2441             // Now that we've submitted all the worker tasks, submit
2442             // the next submission task.  Synchronizing on the pool
2443             // to prevent the stealing thread from deleting the job
2444             // before it's submitted.
2445             pool.lock();
2446             atomicSetUbyte(submitNextBatch.taskStatus, TaskState.notStarted);
2447             pool.abstractPutNoSync( cast(AbstractTask*) &submitNextBatch);
2448             pool.unlock();
2449         }
2450
2451     }
2452     submitNextBatch = task(&submitJobs);
2453
2454     mixin(submitAndSteal);
2455
2456     return 0;
2457 };
2458
2459 private struct ParallelForeach(R) {
2460     TaskPool pool;
2461     R range;
2462     size_t blockSize;
2463     size_t startIndex;
2464     ubyte doneSubmitting;
2465
2466     alias ElementType!R E;
2467
2468     int opApply(scope int delegate(ref E) dg) {
2469         mixin(parallelApplyMixin);
2470     }
2471
2472     int opApply(scope int delegate(ref size_t, ref E) dg) {
2473         mixin(parallelApplyMixin);
2474     }
2475 }
2476
2477 version(unittest) {
2478     // This was the only way I could get nested maps to work.
2479     __gshared TaskPool poolInstance;
2480 }
2481
2482 // These test basic functionality but don't stress test for threading bugs.
2483 // These are the tests that should be run every time Phobos is compiled.
2484 unittest {
2485     poolInstance = new TaskPool(2);
2486     scope(exit) poolInstance.stop;
2487
2488     static void refFun(ref uint num) {
2489         num++;
2490     }
2491
2492     uint x;
2493     auto t = task!refFun(x);
2494     poolInstance.put(t);
2495     t.yieldWait();
2496     assert(t.args[0] == 1);
2497
2498     auto t2 = task(&refFun, x);
2499     poolInstance.put(t2);
2500     t2.yieldWait();
2501     assert(t2.args[0] == 1);
2502
2503     // Test ref return.
2504     uint toInc = 0;
2505     static ref T makeRef(T)(ref T num) {
2506         return num;
2507     }
2508
2509     auto t3 = task!makeRef(toInc);
2510     taskPool.put(t3);
2511     assert(t3.args[0] == 0);
2512     t3.spinWait++;
2513     assert(t3.args[0] == 1);
2514
2515     static void testSafe() @safe {
2516         static int bump(int num) {
2517             return num + 1;
2518         }
2519
2520         auto safePool = new TaskPool(0);
2521         auto t = task(&bump, 1);
2522         safePool.put(t);
2523         assert(t.yieldWait == 2);
2524         safePool.stop;
2525     }
2526
2527     auto arr = [1,2,3,4,5];
2528     auto appNums = appender!(uint[])();
2529     auto appNums2 = appender!(uint[])();
2530     foreach(i, ref elem; poolInstance.parallel(arr)) {
2531         elem++;
2532         synchronized {
2533             appNums.put(cast(uint) i + 2);
2534             appNums2.put(elem);
2535         }
2536     }
2537
2538     uint[] nums = appNums.data, nums2 = appNums2.data;
2539     sort!"a.at!0 < b.at!0"(zip(nums, nums2));
2540     assert(nums == [2,3,4,5,6]);
2541     assert(nums2 == nums);
2542     assert(arr == nums);
2543
2544     // Test parallel foreach with non-random access range.
2545     nums = null;
2546     nums2 = null;
2547     auto range = filter!"a != 666"([0, 1, 2, 3, 4]);
2548     foreach(i, elem; poolInstance.parallel(range)) {
2549         synchronized {
2550             nums ~= cast(uint) i;
2551             nums2 ~= cast(uint) i;
2552         }
2553     }
2554
2555     sort!"a.at!0 < b.at!0"(zip(nums, nums2));
2556     assert(nums == nums2);
2557     assert(nums == [0,1,2,3,4]);
2558
2559
2560     assert(poolInstance.map!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
2561     assert(poolInstance.map!("a * a", "-a")([1,2,3]) ==
2562         [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
2563
2564     auto tupleBuf = new Tuple!(int, int)[3];
2565     poolInstance.map!("a * a", "-a")([1,2,3], tupleBuf);
2566     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
2567     poolInstance.map!("a * a", "-a")([1,2,3], 5, tupleBuf);
2568     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]);
2569
2570     auto buf = new int[5];
2571     poolInstance.map!"a * a"([1,2,3,4,5], buf);
2572     assert(buf == [1,4,9,16,25]);
2573     poolInstance.map!"a * a"([1,2,3,4,5], 4, buf);
2574     assert(buf == [1,4,9,16,25]);
2575
2576
2577     assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10);
2578     assert(poolInstance.reduce!"a + b"(5.0, [1,2,3,4]) == 15);
2579     assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4));
2580     assert(poolInstance.reduce!("a + b", "a * b")(tuple(5, 2), [1,2,3,4]) ==
2581         tuple(15, 48));
2582
2583     // Test worker-local storage.
2584     auto wl = poolInstance.createWorkerLocal(0);
2585     foreach(i; poolInstance.parallel(iota(1000), 1)) {
2586         wl.get = wl.get + i;
2587     }
2588
2589     auto wlRange = wl.toRange;
2590     auto parallelSum = poolInstance.reduce!"a + b"(wlRange);
2591     assert(parallelSum == 499500);
2592     assert(wlRange[0..1][0] == wlRange[0]);
2593     assert(wlRange[1..2][0] == wlRange[1]);
2594
2595     // Test default pool stuff.
2596     assert(taskPool.size == core.cpuid.coresPerCPU - 1);
2597
2598     nums = null;
2599     foreach(i; parallel(iota(1000))) {
2600         synchronized {
2601             nums ~= i;
2602         }
2603     }
2604     sort(nums);
2605     assert(equal(nums, iota(1000)));
2606
2607     assert(equal(
2608         poolInstance.lazyMap!"a * a"(iota(30_000_001), 10_000, 1000),
2609         std.algorithm.map!"a * a"(iota(30_000_001))
2610     ));
2611
2612     // The filter is to kill random access and test the non-random access
2613     // branch.
2614     assert(equal(
2615         poolInstance.lazyMap!"a * a"(
2616             filter!"a == a"(iota(30_000_001)
2617         ), 10_000, 1000),
2618         std.algorithm.map!"a * a"(iota(30_000_001))
2619     ));
2620
2621     assert(
2622         reduce!"a + b"(0UL,
2623             poolInstance.lazyMap!"a * a"(iota(3_000_001), 10_000)
2624         ) ==
2625         reduce!"a + b"(0UL,
2626             std.algorithm.map!"a * a"(iota(3_000_001))
2627         )
2628     );
2629
2630     assert(equal(
2631         iota(1_000_002),
2632         poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002)))
2633     ));
2634
2635     // Test LazyMap/AsyncBuf chaining.
2636     auto lmchain = poolInstance.lazyMap!"a * a"(
2637         poolInstance.lazyMap!sqrt(
2638             poolInstance.asyncBuf(
2639                 iota(3_000_000)
2640             )
2641         )
2642     );
2643
2644     foreach(i, elem; parallel(lmchain)) {
2645         assert(approxEqual(elem, i));
2646     }
2647
2648     auto myTask = task!(std.math.abs)(-1);
2649     taskPool.put(myTask);
2650     assert(myTask.spinWait == 1);
2651
2652     // Test that threads not belonging to a pool receive an index of 0
2653     // when workerIndex is queried w.r.t. that pool.  The only way to do
2654     // this is non-deterministically.
2655     foreach(i; parallel(iota(1000), 1)) {
2656         assert(poolInstance.workerIndex == 0);
2657     }
2658
2659     foreach(i; poolInstance.parallel(iota(1000), 1)) {
2660         assert(taskPool.workerIndex == 0);
2661     }
2662 }
2663
2664 version = parallelismStressTest;
2665
2666 // These are more like stress tests than real unit tests.  They print out
2667 // tons of stuff and should not be run every time make unittest is run.
2668 version(parallelismStressTest) {
2669     // These unittests are intended to also function as an example of how to
2670     // use this module.
2671     unittest {
2672         size_t attempt;
2673         for(; attempt < 10; attempt++)
2674         foreach(poolSize; [0, 4]) {
2675
2676             // Create a TaskPool object with the default number of threads.
2677             poolInstance = new TaskPool(poolSize);
2678
2679             // Create some data to work on.
2680             uint[] numbers = new uint[1_000];
2681
2682             // Fill in this array in parallel, using default block size.
2683             // Note:  Be careful when writing to adjacent elements of an array from
2684             // different threads, as this can cause word tearing bugs when
2685             // the elements aren't properly aligned or aren't the machine's native
2686             // word size.  In this case, though, we're ok.
2687             foreach(i; poolInstance.parallel( iota(0, numbers.length)) ) {
2688                 numbers[i] = cast(uint) i;
2689             }
2690
2691             // Make sure it works.
2692             foreach(i; 0..numbers.length) {
2693                 assert(numbers[i] == i);
2694             }
2695
2696             stderr.writeln("Done creating nums.");
2697
2698             // Parallel foreach also works on non-random access ranges, albeit
2699             // less efficiently.
2700             auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
2701             foreach(num; poolInstance.parallel(myNumbers)) {
2702                 assert(num % 7 > 0 && num < 1000);
2703             }
2704             stderr.writeln("Done modulus test.");
2705
2706             // Use parallel map to calculate the square of each element in numbers,
2707             // and make sure it's right.
2708             uint[] squares = poolInstance.map!"a * a"(numbers, 100);
2709             assert(squares.length == numbers.length);
2710             foreach(i, number; numbers) {
2711                 assert(squares[i] == number * number);
2712             }
2713             stderr.writeln("Done squares.");
2714
2715             // Sum up the array in parallel with the current thread.
2716             auto sumFuture = poolInstance.task!( reduce!"a + b" )(numbers);
2717
2718             // Go off and do other stuff while that future executes:
2719             // Find the sum of squares of numbers.
2720             ulong sumSquares = 0;
2721             foreach(elem; numbers) {
2722                 sumSquares += elem * elem;
2723             }
2724
2725             // Ask for our result.  If the pool has not yet started working on
2726             // this task, spinWait() automatically steals it and executes it in this
2727             // thread.
2728             uint mySum = sumFuture.spinWait();
2729             assert(mySum == 999 * 1000 / 2);
2730
2731             // We could have also computed this sum in parallel using parallel
2732             // reduce.
2733             auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
2734             assert(mySum == mySumParallel);
2735             stderr.writeln("Done sums.");
2736
2737             // Execute an anonymous delegate as a task.
2738             auto myTask = task({
2739                 synchronized writeln("Our lives are parallel...Our lives are parallel.");
2740             });
2741             poolInstance.put(myTask);
2742
2743             // Parallel foreach loops can also be nested, and can have an index
2744             // variable attached to the foreach loop.
2745             auto nestedOuter = "abcd";
2746             auto nestedInner =  iota(0, 10, 2);
2747
2748             foreach(i, letter; poolInstance.parallel(nestedOuter, 1)) {
2749                 foreach(j, number; poolInstance.parallel(nestedInner, 1)) {
2750                     synchronized writeln
2751                         (i, ": ", letter, "  ", j, ": ", number);
2752                 }
2753             }
2754
2755             // Block until all jobs are finished and then shut down the thread pool.
2756             poolInstance.join();
2757         }
2758
2759         assert(attempt == 10);
2760         writeln("Press enter to go to next round of unittests.");
2761         readln();
2762     }
2763
2764     // These unittests are intended more for actual testing and not so much
2765     // as examples.
2766     unittest {
2767         foreach(attempt; 0..10)
2768         foreach(poolSize; [0, 4]) {
2769             poolInstance = new TaskPool(poolSize);
2770
2771             // Test indexing.
2772             stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
2773             assert(poolInstance.workerIndex() == 0);
2774
2775             // Test worker-local storage.
2776             auto workerLocal = poolInstance.createWorkerLocal!(uint)(1);
2777             foreach(i; poolInstance.parallel(iota(0U, 1_000_000))) {
2778                 workerLocal.get++;
2779             }
2780             assert(reduce!"a + b"(workerLocal.toRange) ==
2781                 1_000_000 + poolInstance.size + 1);
2782
2783             // Make sure work is reasonably balanced among threads.  This test is
2784             // non-deterministic and is more of a sanity check than something that
2785             // has an absolute pass/fail.
2786             uint[void*] nJobsByThread;
2787             foreach(thread; poolInstance.pool) {
2788                 nJobsByThread[cast(void*) thread] = 0;
2789             }
2790             nJobsByThread[ cast(void*) Thread.getThis] = 0;
2791
2792             foreach(i; poolInstance.parallel( iota(0, 1_000_000), 100 )) {
2793                 atomicIncUint( nJobsByThread[ cast(void*) Thread.getThis() ]);
2794             }
2795
2796             stderr.writeln("\nCurrent (stealing) thread is:  ",
2797                 cast(void*) Thread.getThis());
2798             stderr.writeln("Workload distribution:  ");
2799             foreach(k, v; nJobsByThread) {
2800                 stderr.writeln(k, '\t', v);
2801             }
2802
2803             // Test whether map can be nested.
2804             real[][] matrix = new real[][](1000, 1000);
2805             foreach(i; poolInstance.parallel( iota(0, matrix.length) )) {
2806                 foreach(j; poolInstance.parallel( iota(0, matrix[0].length) )) {
2807                     matrix[i][j] = i * j;
2808                 }
2809             }
2810
2811             // Get around weird bugs having to do w/ sqrt being an intrinsic:
2812             static real mySqrt(real num) {
2813                 return sqrt(num);
2814             }
2815
2816             static real[] parallelSqrt(real[] nums) {
2817                 return poolInstance.map!mySqrt(nums);
2818             }
2819
2820             real[][] sqrtMatrix = poolInstance.map!parallelSqrt(matrix);
2821
2822             foreach(i, row; sqrtMatrix) {
2823                 foreach(j, elem; row) {
2824                     real shouldBe = sqrt( cast(real) i * j);
2825                     assert(approxEqual(shouldBe, elem));
2826                     sqrtMatrix[i][j] = shouldBe;
2827                 }
2828             }
2829
2830             auto saySuccess = task({
2831                 stderr.writeln(
2832                     "Success doing matrix stuff that involves nested pool use.");
2833             });
2834             poolInstance.put(saySuccess);
2835             saySuccess.workWait();
2836
2837             // A more thorough test of map and reduce:  Find the sum of the
2838             // square roots of the matrix.
2839
2840             static real parallelSum(real[] input) {
2841                 return poolInstance.reduce!"a + b"(input);
2842             }
2843
2844             auto sumSqrt = poolInstance.reduce!"a + b"(
2845                 poolInstance.map!parallelSum(
2846                     sqrtMatrix
2847                 )
2848             );
2849
2850             assert(approxEqual(sumSqrt, 4.437e8));
2851             stderr.writeln("Done sum of square roots.");
2852
2853             // Test whether tasks work with function pointers.
2854             auto nanTask = poolInstance.task(&isNaN, 1.0L);
2855             assert(nanTask.spinWait == false);
2856
2857             if(poolInstance.size > 0) {
2858                 // Test work waiting.
2859                 static void uselessFun() {
2860                     foreach(i; 0..1_000_000) {}
2861                 }
2862
2863                 auto uselessTasks = new typeof(task(&uselessFun))[1000];
2864                 foreach(ref uselessTask; uselessTasks) {
2865                     uselessTask = task(&uselessFun);
2866                 }
2867                 foreach(ref uselessTask; uselessTasks) {
2868                     poolInstance.put(uselessTask);
2869                 }
2870                 foreach(ref uselessTask; uselessTasks) {
2871                     uselessTask.workWait();
2872                 }
2873             }
2874
2875             // Test the case of non-random access + ref returns.
2876             int[] nums = [1,2,3,4,5];
2877             static struct RemoveRandom {
2878                 int[] arr;
2879
2880                 ref int front() { return arr.front; }
2881                 void popFront() { arr.popFront(); }
2882                 bool empty() { return arr.empty; }
2883             }
2884
2885             auto refRange = RemoveRandom(nums);
2886             foreach(ref elem; poolInstance.parallel(refRange)) {
2887                 elem++;
2888             }
2889             assert(nums == [2,3,4,5,6]);
2890             stderr.writeln("Nums:  ", nums);
2891
2892             poolInstance.stop();
2893         }
2894     }
2895 }
2896
2897 void main() {}