root/trunk/tools/tools/threads.d

Revision 843, 15.8 kB (checked in by FeepingCreature, 1 year ago)

Accumulated stuff over the last few years

Line 
1 module tools.threads;
2
3 import tools.page_queue, tools.base, tools.compat;
4 import tools.behave_as;
5
// Module-wide monitor objects, created once when the module is initialised.
// GenSync is the monitor SyncObj locks on; ThreadSync is not referenced
// anywhere else in this module.
Object ThreadSync;
Object GenSync;

static this() {
  // New comes from the tools library; presumably it allocates a fresh
  // instance into its argument -- TODO confirm.
  New(ThreadSync);
  New(GenSync);
}
8
// Smallest possible view of a platform mutex. Lock delegates the actual
// blocking to one of these.
interface LockImpl {
  // Acquire the underlying system lock.
  void sys_lock();
  // Release the underlying system lock.
  void sys_unlock();
}
12
// Lock that remembers its owning thread and a nesting count on top of a
// LockImpl: the owner may call lock() again without touching the system lock,
// and the system lock is released once unlock() calls balance out.
class Lock {
  LockImpl syslock;
  // Chooses the platform implementation at compile time.
  this() {
    version(Windows) syslock = new Win32Lock;
    else syslock = new PosixLock;
  }
  import tools.log;
  // A Lock without a system lock is unusable: log it, then crash on purpose
  // by writing through a null pointer.
  invariant() {
    if (!syslock) {
      logln("Lock violation!");
      *(cast(int*) null) = 0;
    }
  }
  Thread owner;
  int count;
  // removed final, due to GDC bug. TODO: restore
  // Acquire the lock for the calling thread, or bump the nesting count when
  // the calling thread already owns it.
  // Throws: Exception when the bookkeeping is inconsistent right after the
  // system lock was obtained.
  void lock() {
    auto self = Thread.getThis();
    if (owner is self) {
      // nested acquisition by the current owner
      count ++;
      return;
    }
    syslock.sys_lock();
    if (owner) throw new Exception("We have the lock but there's a previous owner - wtf?");
    if (count) throw new Exception("Lock newly acquired but nesting count set - wtf?");
    owner = self;
    count ++;
  }
  // Undo one lock() call; the system lock is released when the nesting count
  // drops to zero.
  // Throws: Exception when the caller is not the owner or the count is
  // already zero.
  void unlock() {
    auto self = Thread.getThis();
    if (owner !is self) throw new Exception("Trying to unlock a lock that you don't hold - wtf?");
    if (!count) throw new Exception("Trying to unlock a lock more times than you locked it - wtf?");
    count --;
    if (count) return; // still held by an outer lock() call
    owner = null;
    syslock.sys_unlock();
  }
  // Run the delegates in order while holding the lock.
  void Synchronized(void delegate()[] dgs...) {
    lock();
    scope(exit) unlock();
    foreach (dg; dgs) dg();
  }
  // Give up one level of the lock, run the delegates in order, then take the
  // lock back even if a delegate throws.
  void Unsynchronized(void delegate()[] dgs...) {
    unlock();
    scope(exit) lock();
    foreach (dg; dgs) dg();
  }
}
40
// Busy-waiting stand-in for a condition variable, built on a single flag.
final class CondVar {
  bool blocking;
  // HAAAAAAAAAAAAX
  // Raise the flag, optionally release `unlock`, spin through slowyield()
  // until signal() lowers the flag, then re-acquire `unlock` if one was given.
  // NOTE(review): the flag is shared by every waiter on this object.
  void wait(Lock unlock = null) {
    blocking = true;
    if (unlock) unlock.unlock();
    while (blocking) {
      slowyield();
    }
    if (unlock) unlock.lock();
  }
  // Lower the flag so that a spinning wait() can return.
  void signal() {
    blocking = false;
  }
}
47
48 version(Windows) {
49   struct crits {
50     void* DebugInfo;
51     int LockCount, RecursionCount;
52     void* OwningThread, LockSemaphore;
53     uint SpinCount;
54   }
55   extern(Windows) {
56     void InitializeCriticalSection(crits*);
57     void EnterCriticalSection(crits*);
58     void LeaveCriticalSection(crits*);
59     void DeleteCriticalSection(crits*);
60   }
61   class Win32Lock : LockImpl {
62     crits crit;
63     this() { InitializeCriticalSection(&crit); }
64     ~this() { DeleteCriticalSection(&crit); }
65     override void sys_lock() { EnterCriticalSection(&crit); }
66     override void sys_unlock() { LeaveCriticalSection(&crit); }
67   }
68   extern(Windows) {
69     void* CreateSemaphoreA(void*, int, int, char* name = null); alias CreateSemaphoreA CreateSemaphore;
70     bool CloseHandle (void*);
71     const INFTE = uint.max; // not INFINITE: overlaps with math
72     uint WaitForSingleObject(void*, uint);
73     bool ReleaseSemaphore (void*, int, int*);
74   }
75   final class Semaphore {
76     const WAIT_TIMEOUT = 0x0000_0102, WAIT_ABANDONED = 0x0000_0080;
77     private void* handle;
78     this(int count=0) { handle=CreateSemaphore (null, count, int.max); }
79     ~this() { CloseHandle (handle); }
80     bool try_acquire() {
81       auto res = WaitForSingleObject (handle, 0);
82       if (res == WAIT_TIMEOUT) return false;
83       if (res == WAIT_ABANDONED) throw new Exception("State violation: thread abandoned semaphore!");
84       return true;
85     }
86     void acquire() {
87       auto res = WaitForSingleObject (handle, INFTE);
88       if (res == WAIT_TIMEOUT) throw new Exception("Reached end of time. Aborting.");
89       if (res == WAIT_ABANDONED) throw new Exception("State violation: thread abandoned semaphore!");
90     }
91     void release() {
92       auto res = ReleaseSemaphore (handle, 1, null);
93       if (!res) throw new Exception("Cannot release semaphore - win32 error!");
94     }
95   }
96 } else {
97   extern(C) char* strerror(int);
98   class PosixLock : LockImpl {
99     pthread_mutex_t mutex;
100     this() {
101       if (auto res = pthread_mutex_init(&mutex, null))
102         throw new Exception(Format("Cannot create Mutex - ", res));
103     }
104     ~this() { pthread_mutex_destroy(&mutex); }
105     override void sys_lock() {
106       if (auto res = pthread_mutex_lock(&mutex))
107         throw new Exception(Format("Cannot lock mutex - ", res));
108     }
109     override void sys_unlock() {
110       if (auto res = pthread_mutex_unlock(&mutex))
111         throw new Exception(Format("Cannot unlock mutex - ", res));
112     }
113   }
114   extern(C) {
115     version(freebsd) alias void* pthread_cond_t;
116     else struct pthread_cond_t { ubyte[48] data; }
117     int pthread_cond_init(pthread_cond_t*, void* attr);
118     int pthread_cond_destroy(pthread_cond_t*);
119     int pthread_cond_signal(pthread_cond_t*);
120     int pthread_cond_broadcast(pthread_cond_t*);
121     int pthread_cond_wait(pthread_cond_t*, pthread_mutex_t* mutex);
122   }
123   // Something is broken with condvar
124   // use the default impl instead
125   // removed
126   static if (!is(typeof(EAGAIN))) const EAGAIN = 11;
127   // Yay, a GDC bug >_< TODO: remove on latter versions
128   /*final */class Semaphore {
129     private sem_t handle;
130     this(int count=0) { if (sem_init(&handle, false, count) == -1) throw new Exception("sem_init failed!"); }
131     ~this() { sem_destroy(&handle); }
132     template sem_fn(string S) {
133       const string sem_fn = "
134         static if (!is(typeof(EINVAL))) const EINVAL = 22;
135         typeof("~S~") res;
136         do {
137           res = "~S~";
138           if (res == -1) {
139             auto errno = getErrno();
140             if (errno == EINTR) continue;
141             if (errno == EINVAL) throw new Exception(\""~S~" failed: not a semaphore!\");
142             if (errno == EAGAIN) break; // sem_trywait
143             throw new Exception(Format(\""~S~": unknown error: \", strerror(errno), \"!\"));
144           }
145           break;
146         } while (true);
147       ";
148     }
149     void acquire() { mixin(sem_fn!("sem_wait(&handle)")); }
150     void release() { mixin(sem_fn!("sem_post(&handle)")); }
151     bool try_acquire() {
152       mixin(sem_fn!("sem_trywait(&handle)"));
153       return !res;
154     }
155   }
156 }
157
// Debug guard that remembers the first thread to call checkSame() and throws
// when a later call breaks the expected thread affinity.
class ThreadBlock {
  private Thread which;
  private char[] mesg="!";
  // msg, when non-empty, is appended to every failure message.
  this(char[] msg="") {
    if (msg.length) mesg=": "~msg;
  }
  // The first call records the calling thread; later calls throw unless they
  // come from that same thread.
  void checkSame() {
    if (!which) {
      which = Thread.getThis();
      return;
    }
    if (which!=Thread.getThis) throw new Exception("ThreadBlock failed (checkSame)"~mesg);
  }
  // Throws when the calling thread is the recorded one; does nothing while no
  // thread has been recorded.
  void checkDifferent() {
    if (!which) return;
    if (which==Thread.getThis) throw new Exception("ThreadBlock failed (checkDifferent)"~mesg);
  }
}
169
// Guard over `count` slots where each slot must always be entered by the same
// thread and no thread may enter more than one slot.
class DifferentThreadsBlock(int count) {
  private { ThreadBlock[count] block; char[] mesg; }
  // Creates one ThreadBlock per slot. A non-empty mesg is prefixed with the
  // slot number so failures identify the slot.
  this(char[] mesg) {
    this.mesg=mesg;
    // The slot number is the loop index; the original referred to `which`,
    // which only exists as a parameter of opIn.
    foreach (id, ref bl; block)
      New(bl, mesg.length?"Thread "~.toString(id)~": "~mesg:"");
  }
  // Registers or verifies the calling thread for slot `which`.
  // Throws: Exception for an out-of-range slot, or (from ThreadBlock) when
  // the thread/slot pairing is violated.
  void opIn(int which) {
    if ((which<0)||(which>=count)) throw new Exception("Trying to block on invalid thread in DifferentThreadsBlock!");
    synchronized(this) {
      block[which].checkSame;
      foreach (id, b; block) if (id!=which) b.checkDifferent;
    }
  }
  // Calls resume on every thread recorded so far.
  void resume() { foreach (b; block) if (b.which) b.which.resume; }
}
182
183 import tools.log;
// Message queue between threads: a PageQueue holds the payloads and a
// Semaphore counts them. SinglePut / SingleGet state that only one thread
// ever puts / gets; that side then skips synchronized(this) and is checked
// with a ThreadBlock instead.
class MessageMultiChannel(T, bool SingleGet, bool SinglePut) {
  private {
    PageQueue!(T) mesgs;
    bool[Thread] waiting_threads;
  }
  static if (SinglePut) ThreadBlock put_block;
  static if (SingleGet) ThreadBlock get_block;
  Semaphore sem;
  this() {
    static if (SinglePut) New (put_block, "MessageMultiChannel_put");
    static if (SingleGet) New (get_block, "MessageMultiChannel_get");
    New(sem);
    New(mesgs);
  }
  // Length reported by the underlying queue.
  int messages() {
    return mesgs.length;
  }
  // Whether the underlying queue reports having content.
  bool active() {
    return mesgs.has;
  }
  // Appends t to the queue, then releases one semaphore permit.
  void put(T t) {
    static if (SingleGet) get_block.checkDifferent;
    static if (SinglePut) {
      put_block.checkSame;
      mesgs.push(t);
    } else {
      synchronized(this) mesgs.push(t);
    }
    sem.release;
  }
  // Non-blocking receive.
  // Returns: true and fills t when a permit could be taken, false otherwise.
  bool try_get(out T t) {
    static if (SinglePut) put_block.checkDifferent;
    static if (SingleGet) get_block.checkSame;
    if (!sem.try_acquire) return false;
    assert (mesgs.has);
    static if (SingleGet) t = mesgs.pop;
    else synchronized(this) t = mesgs.pop;
    return true;
  }
  // Blocking receive: waits for a permit, then pops one message.
  T get() {
    static if (SinglePut) put_block.checkDifferent;
    static if (SingleGet) get_block.checkSame;
    sem.acquire;
    static if (SingleGet) {
      assert (mesgs.has);
      return mesgs.pop;
    } else {
      synchronized(this) {
        if (!mesgs.has) {
          // A permit without a message means the bookkeeping is broken:
          // log it and crash on purpose through a null write.
          logln("Semaphore acquired but no message available!");
          int* ip; *ip = 0;
        }
        return mesgs.pop;
      }
    }
  }
}
244
// Shorthand for a MessageMultiChannel with both SingleGet and SinglePut set.
template MessageChannel(T) {
  alias MessageMultiChannel!(T, true, true) MessageChannel;
}
246
// Storage slot keyed by symbol: every distinct alias argument instantiates
// its own `obj`.
template _SyncObj(alias A) {
  Object obj;
}

// Returns the Object stored for symbol A, creating it on first use. The slot
// is tested once without a lock and once more inside synchronized(GenSync)
// before it is filled.
// ctReplace is a tools helper; presumably it substitutes every `·` in the
// first string with the third argument at compile time -- TODO confirm.
Object SyncObj(alias A)() {
  mixin(ctReplace(`
    if (!·)
      synchronized(GenSync)
        if (!·)
          New(·);
    return ·;
  `, `·`, `_SyncObj!(A).obj`));
}
257
// Set once at module start: true when the address of a local in the probe
// delegate is below that delegate's context pointer.
bool StackGrowsDown;

static this() {
  // selfcall is a tools helper; presumably it invokes the delegate with a
  // reference to itself so that dg.ptr is the enclosing frame -- TODO confirm.
  selfcall((void delegate() dg) {
    int a;
    StackGrowsDown = &a < dg.ptr;
  })();
}
262
// Per-thread record: the address range [low, high] attributed to the thread's
// stack, plus a table of slots indexed by TLS key.
struct ThreadInfo {
  void* low, high;
  // Fixed end of the range: `high` when the stack grows down, otherwise `low`.
  void* base() {
    return StackGrowsDown ? high : low;
  }
  // Setter for the fixed end; returns the stored value.
  void* base(void* nv) {
    if (StackGrowsDown) high = nv;
    else low = nv;
    return nv;
  }
  // Moving end of the range: `low` when the stack grows down, otherwise `high`.
  void* tip() {
    return StackGrowsDown ? low : high;
  }
  // Setter for the moving end; returns the stored value.
  void* tip(void* nv) {
    if (StackGrowsDown) low = nv;
    else high = nv;
    return nv;
  }
  void *[] list;
  // Stores p in slot id, growing the table when id is past its end.
  void setEntry(int id, void* p) {
    if (id >= list.length) list.length = id + 1;
    list[id] = p;
  }
  // Reads slot id. The table is grown first when id is past its end, so a
  // slot that was never set reads as null.
  void* getEntry(int id) {
    // logln("Get ", id, " from ", cast(void*) this);
    if (id >= list.length) list.length = id + 1;
    return list[id];
  }
  // `p in info`: true when p lies inside [low, high], both ends included.
  bool opIn_r(void* p) {
    // logln(p, " in ", low, " .. ", high);
    if (p < low) return false;
    return p <= high;
  }
}
296
// Table of known threads, searched by lookupThread. New entries are added by
// building a fresh array rather than growing this one in place.
ThreadInfo[] threads; // immutable .. kind of.

// True on the configurations (Tango, Win32) for which only the stub
// stack-range functions are compiled.
version(Tango) const NoGoodThread = true;
else version(Win32) const NoGoodThread = true;
else const NoGoodThread = false;
302 static if (NoGoodThread) {
303   // TODO: unbreak
304   pragma(msg, "Multithreading TLS is unimplemented and buggy! ");
305   void* getBottom(Thread thr) { return cast(void*) -1; }
306   void* getTop(Thread thr) { return cast(void*) 1; }
307   Stuple!(void*, void*) getStackRange(Thread thr) { return stuple(getBottom(thr), getTop(thr)); }
308 } else {
309   import std.thread : Thread;
310   import std.c.unix.unix;
311   extern(C) void* __libc_stack_end;
312   struct rlimit {
313     int rlim_cur, rlim_max;
314     string toString() { return Format("cur ", rlim_cur, " max ", rlim_max); }
315   }
316   const RLIMIT_STACK = 3;
317   extern(C) int getrlimit(int, rlimit*);
318   Stuple!(void*, void*) getStackRange(Thread thr) {
319     if (thr.stackBottom) return stuple(thr.stackBottom, thr.stackTop);
320     void check(string s, int i) {
321       if (i != 0) {
322         logln("! ", s, ": ", i);
323         fail;
324       }
325     }
326     pthread_attr_t attr;
327     if (pthread_getattr_np(pthread_self(), &attr) != 0) {
328       // fallback
329       // assume we're the main thread. (!)
330       auto maps = cast(string) std.file.read("/proc/self/maps");
331       foreach (line; maps.split("\n")) {
332         if (line.find("[stack]") != -1) {
333           auto words = line.between("", " ").split("-");
334           uint hex2int(string s) {
335             uint res;
336             foreach (ch; s) {
337               if (ch >= '0' && ch <= '9') res = res * 16 + ch - '0';
338               else if (ch >= 'a' && ch <= 'f') res = res * 16 + ch - 'a' + 10;
339               else if (ch >= 'A' && ch <= 'F') res = res * 16 + ch - 'A' + 10;
340               else fail;
341             }
342             return res;
343           }
344           return stuple(cast(void*) hex2int(words[1]), cast(void*) hex2int(words[0]));
345         }
346       }
347     }
348     void* bottom; size_t size;
349     check("pthread_attr_getstack", pthread_attr_getstack(&attr, &bottom, &size));
350     pthread_attr_destroy(&attr);
351     return stuple(bottom + size, bottom);
352   }
353 }
354
// Finds the ThreadInfo whose stack range contains the caller's stack, using
// the address of a local variable as the probe.
// Params:
//   doAdd = when false, return null instead of creating a missing entry.
// Returns: pointer into `threads` for the calling thread, or null when the
// thread is unknown and doAdd is false.
// Throws: Exception when no stack bottom can be determined, or when the
// bottom reported for this thread lies inside another thread's range.
// NOTE(review): the search and the range corrections in the foreach below
// run without a lock; only the insertion of a new entry is synchronized.
ThreadInfo* lookupThread(bool doAdd = true) {
  int local;
  auto lp = &local; // pointer of a local variable
  if (threads.length == 1 && lp in threads[0]) return &threads[0]; // trivial case
  // binary search the thread list
  auto low = 0, high = threads.length;
  while (low != high) {
    // remember, high is +1 to emulate slice semantics
    auto pivot = (low + high - 1) / 2;
    if (high < low) fail;
    if (lp in threads[pivot]) {
      return &threads[pivot];
    }
    if (lp < threads[pivot].low) high = pivot;
    if (lp > threads[pivot].high) low = pivot + 1;
  }
  // logln("Add/extend thread. ");
  // thread not found. Query GC for thread info and look up again
  auto thr = Thread.getThis();
  void* bottom, top;
  ptuple(bottom, top) = getStackRange(thr);
  if (!top) top = bottom;
  if (!bottom) throw new Exception("Need a reliable thread bottom");
  foreach (ref thread; threads) {
    if (thread.base == bottom) {
      if (lp in thread) {
        // Must have missed that one?
        // Alternately, it was added in-between. Meh. Works for me.
        return &thread;
      }
      // Known thread whose recorded range is too short: extend its tip.
      if (top /notin/ thread)
        thread.tip = top; // correct it
      if (lp /notin/ thread)
        thread.tip = lp;
      return &thread;
    }
    if (bottom in thread) {
      throw new Exception(Format(bottom, " in ", thread, ": fuck-up"));
    }
  }
  if (!doAdd) return null;
  // Entire new thread entry required
  synchronized {
    // Look again now that the lock is held.
    if (auto res = lookupThread(false)) return res;
    ThreadInfo nthread;
    nthread.tip = top;
    nthread.base = bottom;
    if (lp /notin/ nthread)
      nthread.tip = lp;
    // Recompute the insertion point from the current table. `low` was
    // derived before the lock was taken, so another thread may have inserted
    // an entry since then; reusing it could break the ordering the binary
    // search relies on. This uses the same ordering as the search: entries
    // lying entirely below the probe come first.
    size_t pos = 0;
    while (pos < threads.length && lp > threads[pos].high) pos ++;
    threads = threads[0 .. pos] ~ nthread ~ threads[pos .. $]; // reallocate!
    return &threads[pos];
  }
}
409
// Next never-used TLS key.
int tls_max_key;
// Keys handed back by TLS.dealloc and available for reuse.
int[] freelist;

// Hands out a TLS key under the SyncObj!(tls_max_key) monitor: a key from the
// free list when one is available, otherwise the next fresh key.
int getKey() {
  synchronized(SyncObj!(tls_max_key)) {
    if (!freelist.length) return tls_max_key ++;
    return freelist.take();
  }
}
419
// Thread-local storage slot for a T. Each thread's value is stored in its
// ThreadInfo table under this object's key and is created on first access by
// `maker`. Class-like types are held directly, everything else through a
// pointer.
template TLS(T) {
  final class TLS {
    static if (is(typeof(T.classinfo))) alias T Ref;
    else alias T* Ref;
    // Produces the initial value for a thread that has none yet.
    Ref delegate() maker;
    int key;
    // Every value created through ptr(), over all threads.
    private Ref[] values;
    // Calls dg with each value created so far.
    void each(void delegate(T) dg) {
      foreach (value; values) {
        static if (is(typeof(T.classinfo))) dg(value);
        else dg(*value);
      }
    }
    // m, when given, becomes the maker. Otherwise a default maker is picked:
    // `new T` for constructible Object types, null for other class-like
    // types, and a freshly allocated T for everything else.
    this(typeof(maker) m = null) {
      key = getKey();
      if (m) {
        maker = m;
        return;
      }
      static if (is(typeof(T.classinfo))) {
        static if (is(T: Object)) {
          static if (is(typeof(new T))) maker = { return new T; };
          else assert(false);
        } else {
          maker = { return cast(T) null; };
        }
      } else {
        maker = { return &(new Stuple!(T))._0; };
      }
    }
    // Convenience constructors: the maker builds a default value and passes
    // it to i before handing it out.
    static if (is(typeof(T.classinfo))) {
      this(void delegate(T) i) {
        this(i /apply/ (typeof(i) setup) {
          static if (is(T: Object)) {
            static if (is(typeof(new T))) {
              auto t = new T;
            } else {
              T t;
              assert(false);
            }
          } else {
            T t;
          }
          setup(t);
          return t;
        });
      }
    } else {
      this(void delegate(ref T) i) {
        this(i /apply/ (typeof(i) setup) {
          auto t = &(new Stuple!(T))._0;
          setup(*t);
          return t;
        });
      }
    }
    // Stores r as the calling thread's value.
    void set(Ref r) {
      lookupThread().setEntry(key, cast(void*) r);
    }
    // The calling thread's value, or null when none was created yet.
    Ref check() {
      return cast(Ref) lookupThread().getEntry(key);
    }
    // The calling thread's value, created through maker on first use. A null
    // result from maker is returned as is and not stored.
    Ref ptr() {
      auto res = cast(Ref) lookupThread().getEntry(key);
      if (res) return res;
      res = maker();
      if (!res) return res;
      synchronized(this) {
        values ~= res;
      }
      set(res);
      return res;
    }
    static if (canEmulate!(T)) {
      PointerProxy!(T) opCall() {
        PointerProxy!(T) res = void;
        res.ptr = ptr();
        return res;
      }
    } else alias ptr opCall;
    // The calling thread's value, dereferenced for non-class types.
    T value() {
      static if (is(typeof(T.classinfo))) return ptr();
      else return *ptr();
    }
    // Puts this object's key on the free list for reuse by a later TLS.
    // NOTE(review): the per-thread entries stored under the key are not
    // cleared here.
    void dealloc() {
      synchronized(SyncObj!(tls_max_key)) {
        freelist ~= key;
      }
    }
  }
}
503
version(DS) { }
else {
  // Wraps a T together with a lazily created Lock. PropertyForward is a
  // tools helper; presumably it forwards T's operations through `access`
  // with the given code string run first -- TODO confirm.
  struct Sync(T) {
    static assert(canEmulate!(T), "Cannot emulate "~T.stringof~": sync not possible!");
    T value;
    Lock lock;
    // Plain getter and setter used as the forwarding target.
    T access() {
      return value;
    }
    T access(T t) {
      return value = t;
    }
    mixin(PropertyForward!("access",
      "if (!lock) volatile synchronized if (!lock) lock = new Lock; lock.lock(); scope(exit) lock.unlock();"
    ));
  }
}
515
// Per-thread integer id, assigned on a thread's first access from a counter
// that is incremented inside a synchronized statement.
static TLS!(int) ThreadID;
// Number of ids handed out so far.
static int numThreads;

static this() {
  New(ThreadID, (ref int i) {
    synchronized {
      i = numThreads ++;
    }
  });
}
Note: See TracBrowser for help on using the browser.