
Ticket #576 (closed enhancement: fixed)

Opened 1 year ago

Last modified 6 months ago

Thread Job Queuer Class

Reported by: Nietsnie
Assigned to: kris
Priority: normal
Milestone: 0.99.6
Component: Tango
Version: 0.99 RC3 Xammy
Keywords:
Cc:

Description

Here's an example of a Thread Pool class. It's not fully documented, nor is it in Tango-specific coding style. However, if there's interest, I'll do the final touch-ups.

Attachments

pool.d (3.8 kB) - added by Nietsnie on 08/16/07 16:10:38.
Example of a Thread Pool class.
ThreadPool.d (11.2 kB) - added by Nietsnie on 11/08/07 16:39:24.
ThreadPool2.d (5.1 kB) - added by anders0 on 01/10/08 11:11:46.
ThreadPool.2.d (5.1 kB) - added by anders0 on 03/15/08 07:22:30.
ThreadPool.3.d (5.1 kB) - added by anders0 on 03/16/08 07:23:47.
ThreadPool.4.d (7.1 kB) - added by anders0 on 03/16/08 19:23:39.
ThreadPool_new.d (9.0 kB) - added by anders0 on 03/21/08 10:55:06.
Let's hope this is the last one needed :)

Change History

08/16/07 16:10:38 changed by Nietsnie

  • attachment pool.d added.

Example of a Thread Pool class.

08/28/07 02:17:41 changed by kris

  • owner changed from kris to sean.

would you take a gander at this please, sean?

08/28/07 11:33:48 changed by sean

  • status changed from new to assigned.

Looks pretty nice. Don't worry about cleanup though--I'll want to give the impl a once-over if it's used regardless. I have a question about functionality however:

What should the pool do if all threads are busy? Right now, start() blocks until a thread becomes available. An alternative would be to enqueue the job until a thread is available to handle it. Would this be preferable? What I've actually done in the past is provide two methods: queue() and execute(). queue() will enqueue the task if all threads are busy and execute() will start a new thread if necessary to ensure that the task is processed immediately. Perhaps some combination of these behaviors should be provided?

And as an aside, threads now have the ability to set stack size. I added the option a few weeks ago :-) It's the second "sz" argument in the thread ctor.

08/28/07 12:08:12 changed by Nietsnie

Basically in the past (with C) this module would either block until a worker was available, or you could set it to non-blocking and try again later.

With this version, it only blocks. However, adding a "queue" system is a good point, though it may be less desirable if you're being inundated with jobs (creating an ever-larger queue).

As for the execute point, we do a lot of caching per thread. I didn't add it to this version, but we used to keep a hash that could be used to store data and was passed to each thread. Due to some platform issues, we couldn't use TLS, so that might be less useful here. Adding an execute would basically need to recreate this data for a temporary run thread. For other purposes, however, having an execute probably makes sense.

Regardless, if this module is useful for you, by all means use it. If you need me to add the queue/execute features let me know and I'll get that in for you.

08/28/07 12:41:11 changed by kris

FWIW, my gut feel is to keep it simple? Block on call, with a non-blocking 'failure' option, seems like a reasonable way to go. Good point about the TLS stuff, which would tend to suggest that the threads themselves should potentially be instantiated via a delegate -- so that Thread subclasses could be provided instead of just the basic Thread?

2 cents

08/28/07 15:54:45 changed by Nietsnie

I just went over the source again.

It currently has the non-blocking method, and just returns false if it couldn't start the thread.

As for TLS, it would be relatively simple to add a HashMap or void * to the PooledThread class that gets passed to the delegate.

08/28/07 22:01:25 changed by sean

Hm... so say you're being inundated with jobs. Is the purpose of the blocking approach to throttle processing? I'm unclear about when it would be preferable to block on the thread pool as opposed to handing the job off with the knowledge that it will be processed eventually. Also, what do you normally do with the non-blocking approach when the pool is busy?

08/29/07 03:25:04 changed by kris

another way to ask the same question might be this: suppose you have a queue to feed the thread-pool ... what should happen if the queue fills up?

I see nietsnie's current design as something with a queue size of zero :)

08/29/07 14:26:18 changed by Nietsnie

Honestly, I don't use the non-blocking functionality. As for the queue part.. you either have an infinite queue size, or you limit it. If you limit it, what happens when you reach that limit, do you block? If you block, then Kris' point is a good one.

09/07/07 13:26:37 changed by sean

I was thinking about this a bit this morning and I decided there's no point in having an arbitrarily length-limited queue. Either the user wants to have a job taken care of "eventually" or he wants to throttle the producer speed based on the consumer rate. In the latter case, there is effectively no difference between having no queue and having a queue with a limit of N. Either way, if the producer is faster than the consumer, the queue will fill and the producer speed will begin throttling on consumer speed. Therefore, I've decided on the following behavior:

// enqueue the task and return
void append( void delegate() dg );

// assign the task to a worker thread -- block until one is available
void assign( void delegate() dg );

// if a worker thread is available, assign the task and return true.
// if a worker thread is not available, return false immediately.
bool tryAssign( void delegate() dg );

The actual implementation for all cases will be a producer/consumer model backed by a queue. The difference comes in what happens before a task is enqueued:

  • 'append' enqueues immediately, notifies one worker, and returns.
  • 'assign' blocks until a 'freeWorkerCount' value is nonzero then enqueues, notifies, and returns.
  • 'tryAssign' returns false if the 'freeWorkerCount' is zero; otherwise it behaves like 'assign'.

Using the producer/consumer model in this way increases the chance that a worker thread will immediately pick up a task and continue running rather than blocking for the remaining portion of its timeslice, and I think the approach described above will keep the implementation fairly simple.
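For readers following the design, the three entry points might be sketched roughly as below. This is only an illustration under stated assumptions: it uses `core.thread` and `core.sync` from present-day D's runtime rather than the Tango modules of the time, the names `SketchPool`, `workerFree`, `enqueue`, and `finish` are hypothetical, and the "free worker" test (idle workers outnumber queued jobs) is one possible reading of 'freeWorkerCount', not necessarily what was implemented.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

alias Job = void delegate();

// Hypothetical sketch; not the class that was committed to Tango.
final class SketchPool
{
    private Mutex m;
    private Condition jobReady;   // a job was enqueued or shutdown began
    private Condition workerIdle; // a worker started waiting for work
    private Job[] queue;
    private size_t idle;          // workers currently waiting for a job
    private bool done;
    private Thread[] workers;

    this(size_t workerCount)
    {
        m = new Mutex;
        jobReady = new Condition(m);
        workerIdle = new Condition(m);
        foreach (i; 0 .. workerCount)
        {
            workers ~= new Thread(&run);
            workers[$ - 1].start();
        }
    }

    // True if an idle worker is not already claimed by a queued job.
    // Call only while holding m.
    private bool workerFree() { return idle > queue.length; }

    private void enqueue(Job dg) { queue ~= dg; jobReady.notify(); }

    // Enqueue immediately, notify one worker, and return.
    void append(Job dg)
    {
        synchronized (m) { enqueue(dg); }
    }

    // Block until a worker is free, then enqueue and notify.
    void assign(Job dg)
    {
        synchronized (m)
        {
            while (!workerFree())
                workerIdle.wait();
            enqueue(dg);
        }
    }

    // Return false at once if no worker is free; otherwise act like assign.
    bool tryAssign(Job dg)
    {
        synchronized (m)
        {
            if (!workerFree())
                return false;
            enqueue(dg);
        }
        return true;
    }

    // Let queued jobs drain, then stop and join the workers.
    void finish()
    {
        synchronized (m) { done = true; jobReady.notifyAll(); }
        foreach (t; workers)
            t.join();
    }

    private void run()
    {
        for (;;)
        {
            Job job;
            synchronized (m)
            {
                ++idle;
                workerIdle.notifyAll();
                while (queue.length == 0 && !done)
                    jobReady.wait();
                --idle;
                if (queue.length == 0)
                    return; // shutting down and nothing left to run
                job = queue[0];
                queue = queue[1 .. $];
            }
            job(); // run outside the lock; an uncaught exception ends this worker
        }
    }
}
```

All three calls share the same queue; they differ only in the check made before enqueueing, which is the point of the design described above.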

Please let me know if you have any problems with this approach or suggestions for how to improve it. I'll likely be writing the code over the next few days.

09/07/07 16:28:02 changed by Nietsnie

Looks fine to me.

09/07/07 17:59:05 changed by kris

Yeah, me too. How is the queue size configured?

09/07/07 18:14:03 changed by sean

You mean on construction? I guess a parameter could be passed to indicate the initial number of empty spaces in the queue, if pre-allocation is important. For implementation, I was just going to use a dynamic array and double the array's size when full.

09/07/07 19:01:01 changed by kris

ah, right. I think there has to be an exceptional condition attached, rather than simply permitting uncontrolled queue growth?

If the developer expects the queue to never hold more than 100 entries, they can set an exception limit at (say) 300. That way, at least an exception would be generated where uncontrolled growth occurred ?

09/08/07 10:07:45 changed by sean

Yeah that seems reasonable. Or at least some means of finding out how many jobs the queue holds.
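The growth limit and the job count mentioned here could look something like the following sketch. It is an assumption-laden illustration, not code from any attachment: `GuardedQueue`, `QueueOverflowException`, `put`, `take`, and `pending` are hypothetical names, array growth is left to D's built-in append rather than explicit doubling, and the struct is not thread-safe on its own (a pool would call it while holding its mutex).

```d
// Hypothetical exception raised when the queue grows past its limit.
class QueueOverflowException : Exception
{
    this(string msg, string file = __FILE__, size_t line = __LINE__)
    {
        super(msg, file, line);
    }
}

// A FIFO queue that throws instead of growing without bound.
struct GuardedQueue(T)
{
    private T[] items;
    private size_t limit;

    this(size_t limit) { this.limit = limit; }

    // Number of jobs currently waiting in the queue.
    size_t pending() const { return items.length; }

    void put(T item)
    {
        if (items.length >= limit)
            throw new QueueOverflowException("job queue exceeded its growth limit");
        items ~= item; // the built-in append grows the array as needed
    }

    // Remove the oldest item; returns false if the queue is empty.
    bool take(out T item)
    {
        if (items.length == 0)
            return false;
        item = items[0];
        items = items[1 .. $];
        return true;
    }
}
```

With a limit of 300 and an expected peak of 100 entries, as in the example above, the exception would fire only when the producer has badly outrun the consumers.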

09/30/07 19:11:09 changed by kris

  • milestone set to 0.99.3 RC6.

10/01/07 11:00:43 changed by sean

I hit a bit of a roadblock with this class. The actual implementation is pretty straightforward, but shutting the worker threads down in a reasonable manner is kind of a mess. I need to give this a bit more thought before committing anything.

10/15/07 19:08:07 changed by Nietsnie

Set them all to detached and let the OS kill em! :)

Also, with the implementation, I found a small race condition if you init the library, and then .close it shortly after. (The state isn't set and mutex locked on the underlying thread before the notification is sent).

I'll upload a more current version if you care about it.

10/15/07 21:01:18 changed by kris

I'm interested, Nietsnie: please upload a new one :)

As far as shutdown goes, I suggest the class expose a mechanism for doing exactly this (overridden/intercepted by the developer as necessary). The default impl could, for example, give the queue a specific amount of time after setting a class shutdown-flag or whatever.

Basically, the developer ought to be able to control the shutdown, and be responsible for deciding how the threads involved will interact with a queue request for them to quit. A specific implementation could pass the ShutdownHandler (or whatever) as a ctor argument, such that the worker threads have a fixed point of reference (can't be changed after construction). That ctor argument could perhaps be null for default behavior.

2c

10/16/07 14:04:23 changed by Nietsnie

Uploaded.

I moved everything to use isDaemon = true so that you can just exit the software.. and changed the close functionality to reflect that (it no longer uses joins).

10/16/07 14:58:32 changed by kris

cool, but does close() still wait for everything to halt? If that's the case, I think you could perhaps mark the pool as dead, and leave the daemons running until the app exits? Those daemons would thus be treated like any other, at that time.

Isn't there some phrase about "excising one's inner daemons"?

(follow-up: ↓ 23 ) 10/16/07 16:40:58 changed by Nietsnie

Sure, perhaps make it an optional boolean flag?

10/16/07 19:35:59 changed by sean

What I'm going to do is set them all to daemon threads but still wait for all pending jobs to complete before app exit. I'll likely also add some means of indicating that pending jobs should be ignored, perhaps by expecting the user to call a purge() op in a module dtor.

(in reply to: ↑ 21 ) 11/05/07 12:22:40 changed by kris

Replying to Nietsnie:

Sure, perhaps make it an optional boolean flag?

Yeah, that sounds fine. Can you update the unittest so that it doesn't use tetra.utils? Also, can we use a factory-function to create instances of PooledThread, so that users can derive from it and add their own additional attributes (or cached state) during pool creation? I'd like to get this in the next release, Nietsnie, so if you can address this in the next couple of days that would be great. Sorry about the short notice :)

(Sean: I'm adopting this in order to relieve your workload)

11/05/07 12:22:56 changed by kris

  • owner changed from sean to kris.
  • status changed from assigned to new.

11/05/07 12:23:10 changed by kris

  • status changed from new to assigned.

11/07/07 16:19:49 changed by Nietsnie

Yup.

I actually have an updated version of this that's templated, with variable arguments. Let me get that up here, and then let me know how you'd like me to change it and I'll make it happen.

11/07/07 18:16:59 changed by kris

thank you

11/08/07 16:39:24 changed by Nietsnie

  • attachment ThreadPool.d added.

11/08/07 16:41:47 changed by Nietsnie

Added new one that's a tad different (it's templated). Also made it more tango-esque (spaces instead of tabs, debug (UnitTest) etc..).

By the way, the "tetra.util.Test" is just this: http://www.dsource.org/projects/tango.scrapple/browser/trunk/tango/scrapple/util/Test.d

11/10/07 18:48:40 changed by kris

  • milestone changed from 0.99.3 to 0.99.4.

12/17/07 15:35:25 changed by kris

  • milestone changed from 0.99.4 to 0.99.5.

ok, this will be in the next release ;)

01/10/08 11:11:20 changed by anders0

I don't know if someone has done an updated version of this already, but I've made one that uses plain threads and assign, tryAssign and append as suggested by Sean. Oh, and also no linear searches for available threads.

Currently it uses a stack instead of a queue and there's no example usage, but I would be happy to update it, if you want it.

01/10/08 11:11:46 changed by anders0

  • attachment ThreadPool2.d added.

(follow-up: ↓ 35 ) 01/10/08 11:15:31 changed by anders0

A usage example:

void main(char[][] args)
{
    void hashJob(char[] file) {
        try {
            long n = Integer.parse(file);
            Stdout.formatln("fib({}) = {}", n, fib(n));
        }
        // If we don't catch exceptions the thread-pool will still work, but the
        // job will fail silently
        catch (Exception ex) {
            Stdout.formatln("Exception: {}", ex.msg);
        }
    }

    // Create new thread pool with one worker thread per file given
    auto thread_pool = new ThreadPool!(char[])(args.length - 1);
    foreach (file; args[1 .. args.length])
        thread_pool.assign(&hashJob, file);
    thread_pool.finish();
}

01/10/08 21:26:21 changed by kris

very clean, Anders0

02/14/08 17:20:10 changed by svanleent

While mentioning a free pool, perhaps it is best to keep the needle of the current thread and add one to it (or make it 0 if there are no available threads anymore). Another option would be to add working threads to another stack and only retrieve idle threads from that stack if there are no idle threads anymore on the first stack. This enhances scanning quite a bit. Example:

stack 1    | stack 2
-----------+----------- init
idle 1     |
idle 2     |
idle 3     |
-----------+----------- get a thread
idle 2     | run 1
idle 3     |
-----------+----------- get a thread
idle 3     | run 1
           | run 2
-----------+----------- thread 1 stops
idle 3     | idle 1
           | run 2
-----------+----------- get a thread
           | idle 1
           | run 2
           | run 3
-----------+----------- thread 3 stops
           | idle 1
           | run 2
           | idle 3
-----------+----------- get a thread
?          | idle 1
           | run 2
           | idle 3
-------MOVE BACK-------
idle 1     | run 2
idle 3     |
-------CONTINUE0-------
idle 3     | run 2
           | run 1

etc.
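One way to read the two-stack diagram above is sketched here. It is a guess at the intent rather than code from the ticket: `IdleTracker`, `acquire`, `release`, and `moveBack` are hypothetical names, thread ids are 0-based (the diagram counts from 1), and the struct is not thread-safe by itself, so a pool would use it under its own lock.

```d
// Tracks which worker ids can be handed out without scanning every worker
// on each request. "ready" plays the role of stack 1 and "handed" of stack 2.
struct IdleTracker
{
    private size_t[] ready;   // ids known to be idle, handed out front first
    private size_t[] handed;  // ids given out earlier; some may be idle again
    private bool[] running;   // running[id] is true while id executes a job

    this(size_t workerCount)
    {
        running = new bool[workerCount];
        foreach (id; 0 .. workerCount)
            ready ~= id;
    }

    // Hand out an idle worker id; returns false if every worker is busy.
    bool acquire(out size_t id)
    {
        if (ready.length == 0)
            moveBack();
        if (ready.length == 0)
            return false;
        id = ready[0];
        ready = ready[1 .. $];
        handed ~= id;
        running[id] = true;
        return true;
    }

    // Called when worker id finishes its job; it stays in "handed" for now.
    void release(size_t id) { running[id] = false; }

    // The "MOVE BACK" step: return finished workers from stack 2 to stack 1.
    private void moveBack()
    {
        size_t[] stillRunning;
        foreach (id; handed)
        {
            if (running[id])
                stillRunning ~= id;
            else
                ready ~= id;
        }
        handed = stillRunning;
    }
}
```

The scan in `moveBack` is still linear, but it runs only when the first stack is empty instead of on every request.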

(in reply to: ↑ 32 ; follow-up: ↓ 37 ) 02/23/08 23:13:19 changed by kris

Replying to anders0:

A usage example:

void main(char[][] args)
{
    void hashJob(char[] file) {
        try {
            long n = Integer.parse(file);
            Stdout.formatln("fib({}) = {}", n, fib(n));
        }
        // If we don't catch exceptions the thread-pool will still work, but the
        // job will fail silently
        catch (Exception ex) {
            Stdout.formatln("Exception: {}", ex.msg);
        }
    }

    // Create new thread pool with one worker thread per file given
    auto thread_pool = new ThreadPool!(char[])(args.length - 1);
    foreach (file; args[1 .. args.length])
        thread_pool.assign(&hashJob, file);
    thread_pool.finish();
}

We can't get this to work ... it fails in the Condition.wait() function. Did you have this running before?

03/04/08 04:20:36 changed by Jim Panic

  • milestone changed from 0.99.5 to 0.99.6.

(in reply to: ↑ 35 ; follow-up: ↓ 38 ) 03/15/08 07:22:09 changed by anders0

Replying to kris:

We can't get this to work ... it fails in the Condition.wait() function. Did you have this running before?

A last-minute change screwed it up i think :(

Line 31 has 'done.store(true);' which is obviously wrong as the pool isn't done right after construction :)

Also found another bug, i will upload a fixed file (just 2 lines changed)

03/15/08 07:22:30 changed by anders0

  • attachment ThreadPool.2.d added.

(in reply to: ↑ 37 ; follow-up: ↓ 39 ) 03/15/08 18:57:04 changed by kris

Replying to anders0:

A last-minute change screwed it up i think :( Line 31 has 'done.store(true);' which is obviously wrong as the pool isn't done right after construction :) Also found another bug, i will upload a fixed file

Cool, thanks. Though, it still just hangs. What O/S are you testing this on?

(in reply to: ↑ 38 ) 03/16/08 07:23:15 changed by anders0

Replying to kris:

Replying to anders0:

A last-minute change screwed it up i think :( Line 31 has 'done.store(true);' which is obviously wrong as the pool isn't done right after construction :) Also found another bug, i will upload a fixed file

Cool, thanks. Though, it still just hangs. What O/S are you testing this on?

I will update the file once again, this time only append was correct. I guess i should have some unittests, but now the test code works with assign, tryAssign and append (on my computer at least.. single core, ubuntu 32 bit)

03/16/08 07:23:47 changed by anders0

  • attachment ThreadPool.3.d added.

(follow-up: ↓ 41 ) 03/16/08 14:31:55 changed by kris

Nice ... that version works on Win32 also, using the example you gave earlier. Would you mind adding some further documentation about how to use the pool and so on?

(in reply to: ↑ 40 ) 03/16/08 18:48:07 changed by anders0

No, i wouldn't mind. How about the usage? Anything that needs improving, or are you just going to add that yourselves?

03/16/08 19:17:52 changed by kris

Looks pretty good to me :)

We might change the stack to a queue, or perhaps not. Generally looks good though (IMO)

03/16/08 19:23:39 changed by anders0

  • attachment ThreadPool.4.d added.

03/16/08 19:25:28 changed by anders0

Great, here you go :)

Line 240 has a Thread.yield() that i'm pretty sure is unnecessary but it should be tested on an actual multi-core processor.

03/17/08 11:32:20 changed by Nietsnie

I'll do some testing later today on my quad-core.

(follow-up: ↓ 46 ) 03/20/08 14:04:45 changed by Nietsnie

At first glance, everything seems well, passed my basic tests. However, I ran about 100 iterations of the unittests which are included in the other ThreadPool, and managed to get a deadlock.

(in reply to: ↑ 45 ) 03/21/08 09:59:35 changed by anders0

Replying to Nietsnie:

At first glance, everything seems well, passed my basic tests. However, I ran about 100 iterations of the unittests which are included in the other ThreadPool, and managed to get a deadlock.

I'm trying to find the bug, but i can't really find anything wrong even though i can reproduce the deadlock. I will keep debugging, it's bound to pop up sooner or later :)

03/21/08 10:53:51 changed by anders0

Well i fixed it just a few minutes after posting that -- I'm not sure why i need locks around notify in some places and not others though.

But here is the diff, and the new file will be attached.

--- ThreadPool_old.d    2008-03-17 00:49:54.000000000 +0100
+++ ThreadPool_new.d    2008-03-21 15:52:42.000000000 +0100
@@ -149,7 +149,9 @@
     void shutdown()
     {
         done.store(true);
+        m.lock();
         q.length = 0;
+        m.unlock();
         poolActivity.notifyAll();
         foreach (thread; pool)
             thread.join();
@@ -240,11 +242,14 @@
             active_jobs.decrement();

             // Tell the pool that we are done with something
+            m.lock();
             workerActivity.notify();
-            Thread.yield();
+            m.unlock();
         }
         // Tell the pool that we are now done
+        m.lock();
         workerActivity.notify();
+        m.unlock();
     }
 }

03/21/08 10:55:06 changed by anders0

  • attachment ThreadPool_new.d added.

Let's hope this is the last one needed :)

(follow-up: ↓ 49 ) 03/21/08 17:27:06 changed by sean

Two comments. First, since condvars have to be enclosed in a mutex lock anyway, I'm not sure there's a need for the Atomic stuff. Just be sure you're holding the locks long enough, etc (I didn't inspect the code closely enough to be sure that it's all fine). Also, Mutex can be used with the synchronized statement like so:

Mutex m = new Mutex;
Condition c = new Condition( m );

synchronized( m )
{
    c.wait(); // atomically unlocks m and waits, then relocks m before returning
}

This should simplify the code in that ThreadPool vs. using lock and unlock manually.
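A slightly fuller sketch of the same idea follows, assuming the `core.sync` and `core.thread` modules of present-day D's runtime rather than the Tango modules of the time. `JobCounter` and its methods are hypothetical names. The counter is a plain field guarded by the mutex, the waiter re-checks its condition in a loop, and the notifier signals while holding the lock so a wakeup cannot slip in between the waiter's check and its call to wait().

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

// Counts jobs in flight and lets a caller wait until none remain.
final class JobCounter
{
    private Mutex m;
    private Condition allDone;
    private size_t active; // guarded by m, so no atomics are needed

    this()
    {
        m = new Mutex;
        allDone = new Condition(m);
    }

    void begin()
    {
        synchronized (m) { ++active; }
    }

    void end()
    {
        synchronized (m)
        {
            --active;
            if (active == 0)
                allDone.notifyAll(); // signalled with m held
        }
    }

    size_t activeJobs()
    {
        synchronized (m) { return active; }
    }

    // Block until every begin() has been matched by an end().
    void waitUntilIdle()
    {
        synchronized (m)
        {
            while (active != 0) // loop guards against early or spurious wakeups
                allDone.wait();
        }
    }
}
```

Because every read and write of `active` happens inside `synchronized (m)`, the waiter either sees zero and skips the wait, or is already waiting when the final end() signals.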

(in reply to: ↑ 48 ) 03/25/08 18:12:56 changed by Nietsnie

Replying to sean:

Two comments. First, since condvars have to be enclosed in a mutex lock anyway, I'm not sure there's a need for the Atomic stuff. Just be sure you're holding the locks long enough, etc (I didn't inspect the code closely enough to be sure that it's all fine).

Yea, I haven't taken the time to follow the atomic locking and conditional variables to make sure things make sense either. I just ran it through some testing. I'll try the latest one on my quad-core later today and see how it goes.

I suppose doing a quick code-review and making sure everything makes sense might be a good idea to do as well.

04/14/08 13:21:26 changed by larsivi

Any updates on this? How is the version in SVN compared to what is discussed above? I want this ticket closed.

04/18/08 00:55:31 changed by kris

  • status changed from assigned to closed.
  • resolution set to fixed.

has been in tango.core for a little while, and seems to be coping well. I think it's "good enough" at this point in time, so other tickets can be opened if there are issues with it?

Big thank-you to Anders0