root/trunk/infrastructure/st/stackthread.d

Revision 40, 54.6 kB (checked in by KirkMcDonald, 2 years ago)

Meta-programming library replaced; large re-write. StackThreads updated to 0.3.2. Iteration updates.

Line 
1 /******************************************************
2  * StackThreads are userland, cooperative, lightweight
3  * threads. StackThreads are very efficient, requiring
4  * much less time per context switch than real threads.
5  * They also require far fewer resources than real
6  * threads, which allows many more StackThreads to exist
7  * simultaneously. In addition, StackThreads do not
8  * require explicit synchronization since they are
9  * non-preemptive.  There is no requirement that code
10  * be reentrant.
11  *
12  * This module implements the stack thread system on top
13  * of the context layer.
14  *
15  * Version: 0.3
16  * Date: July 4, 2006
17  * Authors:
18  *  Mikola Lysenko, mclysenk@mtu.edu
19  * License: Use/copy/modify freely, just give credit.
20  * Copyright: Public domain.
21  *
22  * Bugs:
23  *  Not thread safe.  May be changed in future versions,
24  *  however this will require a radical refactoring.
25  *
26  * History:
27  *  v0.7 - Switched timing resolution to milliseconds.
28  *
29  *  v0.6 - Removed timing functions from st_yield/st_throwYield
30  *
31  *  v0.5 - Added st_throwYield and MAX/MIN_THREAD_PRIORITY
32  *
33  *  v0.4 - Unittests finished-ready for an initial release.
34  *
35  *  v0.3 - Changed name back to StackThread and added
36  *      linux support.  Context switching is now handled
37  *      in the stackcontext module, and much simpler to
38  *      port.
39  *
40  *  v0.2 - Changed name to QThread, fixed many issues.
41  * 
42  *  v0.1 - Initial stack thread system. Very buggy.
43  *
44  ******************************************************/
45 module st.stackthread;
46
47 //Module imports
48 private import
49     st.stackcontext,
50     std.stdio,
51     std.string;
52
/// The priority of a stack thread determines its order in
/// the scheduler.  Higher priority threads go first.
alias int priority_t;

/// The default priority for a stack thread is 0.
const priority_t DEFAULT_STACKTHREAD_PRIORITY = 0;

/// Maximum thread priority: the largest value priority_t can hold.
const priority_t MAX_STACKTHREAD_PRIORITY = priority_t.max;

/// Minimum thread priority: the smallest value priority_t can hold.
/// Spelled with the .min property rather than the hex literal
/// 0x80000000, which is an unsigned literal and only becomes
/// negative through an implicit narrowing conversion.
const priority_t MIN_STACKTHREAD_PRIORITY = priority_t.min;
65
/// Life-cycle states of a stack thread.  The declaration
/// order matters: StackThread.toString indexes its table of
/// state names with the numeric value of the state.
enum THREAD_STATE
{
    READY,      /// Thread is ready to run
    RUNNING,    /// Thread is currently running
    DEAD,       /// Thread has terminated
    SUSPENDED,  /// Thread is suspended
}

/// States of the scheduler itself.
enum SCHEDULER_STATE
{
    READY,      /// Scheduler is ready to run a thread
    RUNNING,    /// Scheduler is running a timeslice
}

//Module-private scheduler bookkeeping
private
{
    //Priority queues for the time slices
    STPriorityQueue active_slice;
    STPriorityQueue next_slice;

    //What the scheduler is doing right now
    SCHEDULER_STATE sched_state;

    //Start time of the time slice
    ulong sched_t0;

    //Currently active stack thread
    StackThread sched_st;
}

version(Win32)
{
    private
    {
        //Win32 performance counter frequency query and its result
        extern(Windows) int QueryPerformanceFrequency(ulong *);
        ulong sched_perf_freq;
    }
}
100
101
102 //Initialize the scheduler
static this()
{
    //No thread is active and no time slice has been started yet
    sched_st = null;
    sched_t0 = ulong.max;   //all bits set, same value as assigning -1
    sched_state = SCHEDULER_STATE.READY;

    //Fresh queues for the time slices
    next_slice = new STPriorityQueue();
    active_slice = new STPriorityQueue();

    version(Win32)
    {
        //Ask Windows for the performance counter frequency once, up front
        QueryPerformanceFrequency(&sched_perf_freq);
    }
}
114
115
116 /******************************************************
117  * StackThreadExceptions are generated whenever the
118  * stack threads are incorrectly invoked.  Trying to
119  * run a time slice while a time slice is in progress
120  * will result in a StackThreadException.
121  ******************************************************/
class StackThreadException : Exception
{
    /**
     * Creates an exception that carries only a message.
     *
     * Params:
     *  msg = Description of what went wrong.
     */
    this(char[] msg)
    {
        super(msg);
    }

    /**
     * Creates an exception whose message is prefixed with
     * the string form of the offending stack thread.
     *
     * Params:
     *  st = The stack thread the error relates to.
     *  msg = Description of what went wrong.
     */
    this(StackThread st, char[] msg)
    {
        //Yields "<thread>: <message>"
        super(st.toString() ~ ": " ~ msg);
    }
}
134
135
136
137 /******************************************************
138  * StackThreads are much like regular threads except
139  * they are cooperatively scheduled.  A user may switch
140  * between StackThreads using st_yield.
141  ******************************************************/
class StackThread
{
    /**
     * Creates a new stack thread and adds it to the
     * scheduler.
     *
     * Params:
     *  dg = The delegate we are invoking
     *  priority = The priority of the stack thread.
     *  stack_size = The size of the stack for the stack
     *  thread.
     */
    public this
    (
        void delegate() dg,
        priority_t priority = DEFAULT_STACKTHREAD_PRIORITY,
        size_t stack_size = DEFAULT_STACK_SIZE
    )
    {
        this.m_delegate = dg;
        //Use the stack size the caller asked for; previously
        //DEFAULT_STACK_SIZE was passed and the parameter ignored.
        this.context = new StackContext(&m_proc, stack_size);
        this.m_priority = priority;

        //Schedule the thread
        st_schedule(this);

        debug (StackThread) writefln("Created thread, %s", toString);
    }

    /**
     * Creates a new stack thread and adds it to the
     * scheduler, using a function pointer.
     *
     * Params:
     *  fn = The function pointer that the stack thread
     *  invokes.
     *  priority = The priority of the stack thread.
     *  stack_size = The size of the stack for the stack
     *  thread.
     */
    public this
    (
        void function() fn,
        priority_t priority = DEFAULT_STACKTHREAD_PRIORITY,
        size_t stack_size = DEFAULT_STACK_SIZE
    )
    {
        //Plain functions are invoked through the delegator
        //member so that run() only has to deal with delegates.
        this.m_delegate = &delegator;
        this.m_function = fn;
        //Use the stack size the caller asked for; previously
        //DEFAULT_STACK_SIZE was passed and the parameter ignored.
        this.context = new StackContext(&m_proc, stack_size);
        this.m_priority = priority;

        //Schedule the thread
        st_schedule(this);

        debug (StackThread) writefln("Created thread, %s", toString);
    }

    /**
     * Converts the thread to a string.
     *
     * Returns: A string representing the stack thread.
     */
    public char[] toString()
    {
        debug(PQueue)
        {
            //Heap debugging: show this node and its heap links
            return format("ST[t:%8x,p:%8x,l:%8x,r:%8x]",
                cast(void*)this,
                cast(void*)parent,
                cast(void*)left,
                cast(void*)right);
        }
        else
        {
            //Indexed by the numeric value of THREAD_STATE
            static char[][] state_names =
            [
                "RDY",
                "RUN",
                "XXX",
                "PAU",
            ];

            //horrid hack for getting the address of a delegate
            union hack
            {
                struct dele
                {
                    void * frame;
                    void * fptr;
                }

                dele d;
                void delegate () dg;
            }
            hack h;
            if(m_function !is null)
                h.d.fptr = cast(void*) m_function;
            else if(m_delegate !is null)
                h.dg = m_delegate;
            else
                h.dg = &run;

            return format(
                "Thread[pr=%d,st=%s,fn=%8x]",
                priority,
                state_names[cast(uint)state],
                h.d.fptr);
        }
    }

    invariant
    {
        assert(context);

        //The thread state must agree with the context state
        switch(state)
        {
            case THREAD_STATE.READY:
                assert(context.ready);
            break;

            case THREAD_STATE.RUNNING:
                assert(context.running);
            break;

            case THREAD_STATE.DEAD:
                assert(!context.running);
            break;

            case THREAD_STATE.SUSPENDED:
                assert(context.ready);
            break;

            default: assert(false);
        }

        //Heap links must be symmetric
        if(left !is null)
        {
            assert(left.parent is this);
        }

        if(right !is null)
        {
            assert(right.parent is this);
        }
    }

    /**
     * Removes this stack thread from the scheduler. The
     * thread will not be run until it is added back to
     * the scheduler.
     *
     * Throws: StackThreadException if the thread is dead
     *  or already paused.
     */
    public final void pause()
    {
        debug (StackThread) writefln("Pausing %s", toString);

        switch(state)
        {
            case THREAD_STATE.READY:
                st_deschedule(this);
                state = THREAD_STATE.SUSPENDED;
            break;

            case THREAD_STATE.RUNNING:
                transition(THREAD_STATE.SUSPENDED);
            break;

            case THREAD_STATE.DEAD:
                throw new StackThreadException(this, "Cannot pause a dead thread");

            case THREAD_STATE.SUSPENDED:
                throw new StackThreadException(this, "Cannot pause a paused thread");

            default: assert(false);
        }
    }

    /**
     * Adds the stack thread back to the scheduler. It
     * will resume running with its priority & state
     * intact.
     *
     * Throws: StackThreadException if the thread is not
     *  suspended.
     */
    public final void resume()
    {
        debug (StackThread) writefln("Resuming %s", toString);

        //Can only resume paused threads
        if(state != THREAD_STATE.SUSPENDED)
        {
            throw new StackThreadException(this, "Thread is not suspended");
        }

        //Set state to ready and schedule
        state = THREAD_STATE.READY;
        st_schedule(this);
    }

    /**
     * Kills this stack thread in a violent manner.  The
     * thread does not get a chance to end itself or clean
     * anything up, it is descheduled and all GC references
     * are released.
     *
     * Throws: StackThreadException if the thread is
     *  already dead.
     */
    public final void kill()
    {
        debug (StackThread) writefln("Killing %s", toString);

        switch(state)
        {
            case THREAD_STATE.READY:
                //Kill thread and remove from scheduler
                st_deschedule(this);
                state = THREAD_STATE.DEAD;
                context.kill();
            break;

            case THREAD_STATE.RUNNING:
                //Transition to dead
                transition(THREAD_STATE.DEAD);
            break;

            case THREAD_STATE.DEAD:
                throw new StackThreadException(this, "Cannot kill already dead threads");

            case THREAD_STATE.SUSPENDED:
                //We need to kill the stack, no need to touch scheduler
                state = THREAD_STATE.DEAD;
                context.kill();
            break;

            default: assert(false);
        }
    }

    /**
     * Waits to join with this thread.  If the given amount
     * of milliseconds expires before the thread is dead,
     * then we return automatically.
     *
     * Params:
     *  ms = The maximum amount of time the thread is
     *  allowed to wait. The special value -1 implies that
     *  the join will wait indefinitely.
     *
     * Returns:
     *  The amount of milliseconds the thread was actually
     *  waiting.
     *
     * Throws: StackThreadException if no timeslice is in
     *  progress, or if this thread is running, dead or
     *  paused.
     */
    public final ulong join(ulong ms = -1)
    {
        debug (StackThread) writefln("Joining %s", toString);

        //Make sure we are in a timeslice
        if(sched_state != SCHEDULER_STATE.RUNNING)
        {
            throw new StackThreadException(this, "Cannot join unless a timeslice is currently in progress");
        }

        //And make sure we are joining with a valid thread
        switch(state)
        {
            case THREAD_STATE.READY:
                break;

            case THREAD_STATE.RUNNING:
                throw new StackThreadException(this, "A thread cannot join with itself!");

            case THREAD_STATE.DEAD:
                throw new StackThreadException(this, "Cannot join with a dead thread");

            case THREAD_STATE.SUSPENDED:
                throw new StackThreadException(this, "Cannot join with a paused thread");

            default: assert(false);
        }

        //Do busy waiting until the thread dies or the
        //timer runs out.
        ulong start_time = getSysMillis();

        //-1 converts to ulong.max and means "no deadline".  A
        //finite timeout so large that start_time + ms would
        //wrap around is treated the same way; otherwise the
        //wrapped deadline would make join return immediately.
        ulong timeout =
            (ms == ulong.max || ms > ulong.max - start_time)
                ? ulong.max
                : start_time + ms;

        while(
            state != THREAD_STATE.DEAD &&
            timeout > getSysMillis())
        {
            StackContext.yield();
        }

        return getSysMillis() - start_time;
    }

    /**
     * Restarts the thread's execution from the very
     * beginning.  Suspended and dead threads are not
     * resumed, but upon resuming, they will restart.
     */
    public final void restart()
    {
        debug (StackThread) writefln("Restarting %s", toString);

        //Each state needs to be handled carefully
        switch(state)
        {
            case THREAD_STATE.READY:
                //If we are ready,
                context.restart();
            break;

            case THREAD_STATE.RUNNING:
                //Reset the thread.
                transition(THREAD_STATE.READY);
            break;

            case THREAD_STATE.DEAD:
                //Dead threads become suspended
                context.restart();
                state = THREAD_STATE.SUSPENDED;
            break;

            case THREAD_STATE.SUSPENDED:
                //Suspended threads stay suspended
                context.restart();
            break;

            default: assert(false);
        }
    }

    /**
     * Grabs the thread's priority.  Intended for use
     * as a property.
     *
     * Returns: The stack thread's priority.
     */
    public final priority_t priority()
    {
        return m_priority;
    }

    /**
     * Sets the stack thread's priority.  Used to either
     * reschedule or reset the thread.  Changes do not
     * take effect until the next round of scheduling.
     *
     * Params:
     *  p = The new priority for the thread
     *
     * Returns:
     *  The new priority for the thread.
     */
    public final priority_t priority(priority_t p)
    {
        //Update priority.  A ready thread is re-inserted into
        //next_slice so the queue sees the new priority.
        if(sched_state == SCHEDULER_STATE.READY &&
            state == THREAD_STATE.READY)
        {
            next_slice.remove(this);
            m_priority = p;
            next_slice.add(this);
        }

        return m_priority = p;
    }

    /**
     * Returns: The state of this thread.
     */
    public final THREAD_STATE getState()
    {
        return state;
    }

    /**
     * Returns: True if the thread is ready to run.
     */
    public final bool ready()
    {
        return state == THREAD_STATE.READY;
    }

    /**
     * Returns: True if the thread is currently running.
     */
    public final bool running()
    {
        return state == THREAD_STATE.RUNNING;
    }

    /**
     * Returns: True if the thread is dead.
     */
    public final bool dead()
    {
        return state == THREAD_STATE.DEAD;
    }

    /**
     * Returns: True if the thread is not dead.
     */
    public final bool alive()
    {
        return state != THREAD_STATE.DEAD;
    }

    /**
     * Returns: True if the thread is paused.
     */
    public final bool paused()
    {
        return state == THREAD_STATE.SUSPENDED;
    }

    /**
     * Creates a stack thread without a function pointer
     * or delegate.  Used when a user overrides the stack
     * thread class.
     *
     * Params:
     *  priority = The priority of the stack thread.
     *  stack_size = The size of the stack for the stack
     *  thread.
     */
    protected this
    (
        priority_t priority = DEFAULT_STACKTHREAD_PRIORITY,
        size_t stack_size = DEFAULT_STACK_SIZE
    )
    {
        this.context = new StackContext(&m_proc, stack_size);
        this.m_priority = priority;

        //Schedule the thread
        st_schedule(this);

        debug (StackThread) writefln("Created thread, %s", toString);
    }

    /**
     * Run the stack thread.  This method may be overloaded
     * by classes which inherit from stack thread, as an
     * alternative to passing delegates.
     *
     * Throws: Anything.
     */
    protected void run()
    {
        m_delegate();
    }

    // Heap information
    private StackThread parent = null;
    private StackThread left = null;
    private StackThread right = null;

    // The thread's priority
    private priority_t m_priority;

    // The state of the thread
    private THREAD_STATE state;

    // The thread's context
    private StackContext context;

    //Delegate handler
    private void function() m_function;
    private void delegate() m_delegate;
    private void delegator() { m_function(); }

    //My procedure: the entry point handed to the StackContext.
    //Runs the thread body and marks the thread dead when it
    //leaves, whether normally or through an exception.
    private final void m_proc()
    {
        try
        {
            debug (StackThread) writefln("Starting %s", toString);
            run();
        }
        catch(Object o)
        {
            debug (StackThread) writefln("Got a %s exception from %s", o.toString, toString);
            throw o;
        }
        finally
        {
            debug (StackThread) writefln("Finished %s", toString);
            state = THREAD_STATE.DEAD;
        }
    }

    /**
     * Used to change the state of a running thread
     * gracefully
     */
    private final void transition(THREAD_STATE next_state)
    {
        state = next_state;
        StackContext.yield();
    }
}
635
636
637
638 /******************************************************
639  * The STPriorityQueue is used by the scheduler to
640  * order the objects in the stack threads.  For the
641  * moment, the implementation is binary heap, but future
642  * versions might use a binomial heap for performance
643  * improvements.
644  ******************************************************/
645 private class STPriorityQueue
646 {
647 public:
648    
649     /**
650      * Add a stack thread to the queue.
651      *
652      * Params:
653      *  st = The thread we are adding.
654      */
655     void add(StackThread st)
656     in
657     {
658         assert(st !is null);
659         assert(st);
660         assert(st.parent is null);
661         assert(st.left is null);
662         assert(st.right is null);
663     }
664     body
665     {
666         size++;
667        
668         //Handle trivial case
669         if(head is null)
670         {
671             head = st;
672             return;
673         }
674        
675         //First, insert st
676         StackThread tmp = head;
677         int pos;
678         for(pos = size; pos>3; pos>>>=1)
679         {
680             assert(tmp);
681             tmp = (pos & 1) ? tmp.right : tmp.left;
682         }
683        
684         assert(tmp !is null);
685         assert(tmp);
686        
687         if(pos&1)
688         {
689             assert(tmp.left !is null);
690             assert(tmp.right is null);
691             tmp.right = st;
692         }
693         else
694         {
695             assert(tmp.left is null);
696             assert(tmp.right is null);
697             tmp.left = st;
698         }
699         st.parent = tmp;
700        
701         assert(tmp);
702         assert(st);
703        
704         //Fixup the stack and we're good.
705         bubble_up(st);
706     }
707    
708     /**
709      * Remove a stack thread.
710      *
711      * Params:
712      *  st = The stack thread we are removing.
713      */
714     void remove(StackThread st)
715     in
716     {
717         assert(st);
718         assert(hasThread(st));
719     }
720     out
721     {
722         assert(st);
723         assert(st.left is null);
724         assert(st.right is null);
725         assert(st.parent is null);
726     }
727     body
728     {
729         //Handle trivial case
730         if(size == 1)
731         {
732             assert(st is head);
733            
734             --size;
735            
736             st.parent =
737             st.left =
738             st.right =
739             head = null;
740            
741             return;
742         }
743        
744         //Cycle to the bottom of the heap
745         StackThread tmp = head;
746         int pos;
747         for(pos = size; pos>3; pos>>>=1)
748         {
749             assert(tmp);
750             tmp = (pos & 1) ? tmp.right : tmp.left;
751         }
752         tmp = (pos & 1) ? tmp.right : tmp.left;
753        
754        
755         assert(tmp !is null);
756         assert(tmp.left is null);
757         assert(tmp.right is null);
758        
759         //Remove tmp
760         if(tmp.parent.left is tmp)
761         {
762             tmp.parent.left = null;
763         }
764         else
765         {
766             assert(tmp.parent.right is tmp);
767             tmp.parent.right = null;
768