A Simple Thread Pool for Smalltalk

Forking a thread in Smalltalk is easy: wrap something in a block and call fork. It's so easy that you can easily become fork happy and get yourself into trouble by launching too many processes. About 6 months ago, my excessive background forking in a Seaside web app finally started hurting; I'd have images that seemed to lock up for no reason using 100% CPU, and they'd get killed by monitoring processes, causing lost sessions. There was a reason; the process scheduler in Squeak/Pharo just isn't built to handle a crazy number of threads, and everything will slow to a crawl if you launch too many.

I had a search result page in Seaside that launched about 10 background threads for every page render; the page would then poll for the results of those computations, collect up any results found, and AJAX them into the page. Each one needed to run in its own thread because any one of them might hang up and take upwards of 30 seconds to finish its work, even though the average time was under a second. I didn't want all the results stalled waiting for the one slow result, so it made sense to have each on its own thread. This worked for quite a while with nothing but simple forking, but eventually the load rose to the point that I needed a thread pool so I could limit the number of threads actually doing the work to a reasonable number. So, let's write a thread pool.

First we'll need a unit of work to put on the thread, similar to a block or a future: something we can return right away when an item is queued that can be checked for a result or used as a future result. We'll start by declaring a worker class with a few instance variables I know I'll need: a block for the actual work to be done, an expiration time to know if the work still needs doing, a value cache to avoid doing the work more than once, a lock to block a calling thread treating the worker as a future value, and an error to store any exception from a failure so it can be re-thrown on the calling thread.

Object subclass: #ThreadWorker
    instanceVariableNames: 'block expires value lock error'
    classVariableNames: ''
    poolDictionaries: ''
    category: 'ThreadPool'

I'll also want a few constructors for creating them, one that just takes a block, and one that takes a block and an expiration time. For my app, if I don't have results within a certain amount of time, I just don't care anymore, so I'd rather have the work item expire and skip the work.

ThreadWorker class>>on: aBlock
    ^ self new
        block: aBlock;
        yourself 

ThreadWorker class>>on: aBlock expires: aTime
    ^ self new
        block: aBlock;
        expires: aTime;
        yourself

On the instance side, let's initialize the instance and set up the necessary accessors for the constructors above.

ThreadWorker>>initialize
    super initialize.
    lock := Semaphore new 

ThreadWorker>>block: aBlock
    block := aBlock 

ThreadWorker>>expires: aTime
    expires := aTime

Now, since this is for use in a thread pool, I'll want a non-blocking method of forcing evaluation of the work so the thread worker isn't blocked. So if the work hasn't expired, evaluate the block and store any errors, then signal the Semaphore so any waiting clients are unblocked.

ThreadWorker>>evaluate
    "a worker created with on: has no expiration (nil) and always runs"
    (expires isNil or: [ DateAndTime now < expires ]) ifTrue: 
        [ [ value := block value ] 
            on: Error
            do: [ :err | error := err ] ].
    lock signal

I'll also want a possibly blocking value method for retrieving the results of the work. If you call this right away, then it'll act like a future and block the caller until the queue has had time to process it using the evaluate method above.

ThreadWorker>>value
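    "wait consumes the signal, so signal again to keep hasValue
    and any later callers of value from blocking"
    lock isSignaled ifFalse: [ lock wait. lock signal ].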
    "rethrow any error from worker thread on calling thread"
    error ifNotNil: 
        [ error
            privHandlerContext: thisContext;
            signal ].
    ^ value
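
As a quick workspace sketch of the future-style usage described above (the block and the 10 second expiry are made up for illustration, and a plain fork stands in for a pool thread):

```smalltalk
"Workspace sketch: a forked process stands in for a pool thread."
| worker result |
worker := ThreadWorker
    on: [ 3 + 4 ]
    expires: DateAndTime now + 10 seconds.

"evaluate runs on the forked process and signals the lock when done"
[ worker evaluate ] fork.

"value blocks this process until evaluate has signaled"
result := worker value.
result
```

Here result should be 7, since the block runs well inside its expiry window.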

But if you want to poll for a result, we'll need a method to see if the work has been done yet. We can do this by checking the state of the Semaphore; the worker has a value only after the Semaphore has been signaled.

ThreadWorker>>hasValue
    ^ lock isSignaled
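
A small workspace sketch of the polling style, calling evaluate directly in place of a pool thread (the block and expiry are illustrative only):

```smalltalk
"Workspace sketch: poll with hasValue instead of blocking on value."
| worker before after result |
worker := ThreadWorker
    on: [ 6 * 7 ]
    expires: DateAndTime now + 10 seconds.

before := worker hasValue.   "false, nothing has evaluated it yet"
worker evaluate.             "what a pool thread would do"
after := worker hasValue.    "true, the Semaphore has been signaled"
result := worker value.      "42, answers without blocking"
```

Note that an expired worker also reports hasValue as true once it has been picked up; its value is simply nil because the block was skipped.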

That's all we need for the worker. Now we need a queue to make use of it. We'll declare the class with some necessary variables and initialize them to some reasonable defaults, along with some accessors to adjust the pool sizes. Since a thread pool is generally something you only want one of (there are always exceptions, but I prefer simplicity), we'll just rely on Smalltalk itself to ensure there's only one pool by making all of the pool methods class methods, so the ThreadPool class itself is the only instance. I'll use a shared queue to handle the details of locking to ensure the workers share the pool of work safely.

Object subclass: #ThreadPool
    instanceVariableNames: ''
    classVariableNames: 'MaxPoolSize MinPoolSize 
        PoolManager QueueWorkers WorkQueue'
    poolDictionaries: ''
    category: 'ThreadPool'

ThreadPool class>>initialize
    "self initialize"
    WorkQueue := SharedQueue2 new.
    QueueWorkers := OrderedCollection new.
    MinPoolSize := 5.
    MaxPoolSize := 15.
    Smalltalk addToStartUpList: self 

ThreadPool class>>maxPoolSize: aSize
    MaxPoolSize := aSize 

ThreadPool class>>minPoolSize: aSize
    MinPoolSize := aSize
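
For example, the defaults of 5 and 15 could be tuned from a workspace like this (the numbers are arbitrary and only meant as a sketch; it assumes the class has already been initialized):

```smalltalk
"Workspace sketch: shrink the pool bounds; 2 and 8 are arbitrary values."
ThreadPool
    minPoolSize: 2;
    maxPoolSize: 8.
```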

Once you have a pool, you need to manage how many threads are actually in it and have it adapt to the workload. There are two main questions we need to ask ourselves to do this: are there enough threads, and are there too many threads, given the current workload? Let's answer those questions.

ThreadPool class>>isPoolTooBig
    ^ QueueWorkers size > MinPoolSize 
        and: [ WorkQueue size < QueueWorkers size ]

ThreadPool class>>isPoolTooSmall
    ^ QueueWorkers size < MinPoolSize 
        or: [ WorkQueue size > QueueWorkers size 
            and: [ QueueWorkers size < MaxPoolSize ] ]
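
To see how the two checks behave, here is a workspace sketch that restates the same conditions as blocks over plain numbers, using the default bounds of 5 and 15 (the block and variable names are mine, purely for illustration):

```smalltalk
"Workspace sketch: the same two conditions, with workers (w) and
 queued items (q) passed in as plain numbers."
| min max tooSmall tooBig |
min := 5.
max := 15.
tooSmall := [ :w :q | w < min or: [ q > w and: [ w < max ] ] ].
tooBig := [ :w :q | w > min and: [ q < w ] ].

tooSmall value: 5 value: 12.    "true: more work than workers, room to grow"
tooSmall value: 15 value: 20.   "false: already at the maximum"
tooBig value: 15 value: 3.      "true: above the minimum and mostly idle"
tooBig value: 5 value: 0.       "false: never shrinks below the minimum"
```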

We also need a method for a worker to grab a queued work item and work it. We don't ever want this to error out and kill a worker thread; that shouldn't happen, since the work item traps any error and re-throws it on the queuing thread. But just to be safe, we'll wrap it.

ThreadPool class>>processQueueElement
    [ WorkQueue next evaluate ] 
        on: Error
        do: [  ]

Now that workers have something to do, we'll need to be able to start and stop worker threads in order to increase or decrease the working thread count. Once a worker is started, we'll want it to simply work forever and the shared queue will handle blocking the workers when there's no work to do. We'll also want the worker threads running in the background so they aren't taking priority over foreground work like serving HTTP requests.

ThreadPool class>>startWorker
    QueueWorkers add: ([ [ self processQueueElement ] repeat ] 
            forkAt: Processor systemBackgroundPriority
            named: 'pool worker')

To kill a worker, we'll just queue a job to kill the active process, which will be whatever worker picks up the job. This is a simple way to ensure we don't kill a worker that is doing something important. This requires actually using the queue, so we need a couple of quick methods to queue a job, plus some extensions on BlockClosure/BlockContext to make using the queue as simple as forking.

ThreadPool class>>queueWorkItem: aBlock expiresAt: aTimestamp 
    | worker |
    worker := ThreadWorker on: aBlock expires: aTimestamp.
    WorkQueue nextPut: worker.
    ^ worker 

ThreadPool class>>queueWorkItem: aBlock expiresAt: aTimestamp 
    session: aSession 
    | worker |
    "a special method for Seaside2.8 so the worker threads 
    still have access to the current session"
    worker := ThreadWorker 
        on: 
            [ WACurrentSession 
                use: aSession
                during: aBlock ]
        expires: aTimestamp.
    WorkQueue nextPut: worker.
    ^ worker 

BlockClosure>>queueWorkAndExpireIn: aDuration
    ^ ThreadPool 
        queueWorkItem: self
        expiresAt: DateAndTime now + aDuration

BlockClosure>>queueWorkAndExpireIn: aDuration session: aSession 
    "a special method for Seaside2.8 so the worker threads 
     still have access to the current session"
    ^ ThreadPool 
        queueWorkItem: self
        expiresAt: DateAndTime now + aDuration
        session: aSession

And now we're able to queue a job to kill a thread, making sure to double check at time of actual execution that the pool is still too big and the thread still needs to die.

ThreadPool class>>killWorker
    "just queue a task that kills the activeProcess, 
    which will be the worker that picks it up"

    [ self isPoolTooBig ifTrue: 
       [ (QueueWorkers remove: Processor activeProcess) terminate ] ] 
        queueWorkAndExpireIn: 10 minutes

Of course, something has to decide when to increase the size of the pool and when to decrease it, and it needs a method to do so.

ThreadPool class>>adjustThreadPoolSize
    "starting up processes too fast is dangerous 
     and wasteful, ensure a reasonable delay"
    1 second asDelay wait.
    self isPoolTooSmall 
        ifTrue: [ self startWorker ]
        ifFalse: [ self isPoolTooBig ifTrue: [ self killWorker ] ]

We need to ensure the thread pool is always up and running, and that something is managing it, so we'll hook the system startUp routine and kick off the minimum number of workers and start a single manager process to continually adjust the pool size to match the workload.

ThreadPool class>>startUp
    "self startUp"
    self shutDown.
    MinPoolSize timesRepeat: [ self startWorker ].
    PoolManager := [ [ self adjustThreadPoolSize ] repeat ] 
        forkAt: Processor systemBackgroundPriority
        named: 'pool manager'

And clean up everything on shutdown so every time the image starts up we're starting from a clean slate.

ThreadPool class>>shutDown
    "self shutDown"
    WorkQueue := SharedQueue2 new.
    PoolManager ifNotNil: [ PoolManager terminate ].
    QueueWorkers do: [ :each | each terminate ].
    QueueWorkers removeAll.

And that's it, a simple thread pool using a shared queue to do all the dirty work of dealing with concurrency. I now queue excessively without suffering the punishment entailed by forking excessively. Now rather than...

[ self someTaskToDo ] fork

I just do...

[ self someTaskToDo ] queueWorkAndExpireIn: 25 seconds

Or in Seaside...

[ self someTaskToDo ] queueWorkAndExpireIn: 25 seconds session: self session
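
The search page scenario from the start of the post, where several jobs are queued and the page polls for whatever has finished, might look roughly like this. This is only a sketch: the squaring block stands in for real work, and it assumes the pool has been started.

```smalltalk
"Workspace sketch: queue ten jobs, then collect whatever is done so far."
| jobs finished results |
jobs := (1 to: 10) collect: [ :i |
    [ i * i ] queueWorkAndExpireIn: 25 seconds ].

"later, e.g. on each poll from the page"
finished := jobs select: [ :each | each hasValue ].
results := finished collect: [ :each | each value ].
```

Since only finished workers are asked for their value, the polling thread never blocks on a slow job.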

And my app is running like a champ again, no more hanging images due to forking like a drunken sailor.

UPDATE: For the source, see the ThreadPool package on SqueakSource.

Comments (automatically disabled after 1 year)

Vadim Tsushko 1359 days ago

Nice to see you again, Ramon. And with such a real good post, too ;)

Paul DeBruicker 1359 days ago

Should this:

ThreadPool class>>isPoolTooBig
    ^ QueueWorkers size > MinPoolSize 
        and: [ WorkQueue size < QueueWorkers size ]

be this:

ThreadPool class>>isPoolTooBig
    ^ QueueWorkers size > MaxPoolSize 
        and: [ WorkQueue size < QueueWorkers size ]

Ramon Leon 1359 days ago

No, it's correct. Read it as "if there are more than the minimum number of workers but there is less work to do than there are workers, the pool is too big."

isPoolTooSmall checks the max and prevents the pool from ever growing larger than it.

Michael van der Gulik 1358 days ago

Personally, I think it would be worth improving the scheduler in Squeak. Thread pools are a hack around an inefficient threading system.

Squeak's processes are very light-weight; they're just one object and comparable in overhead to a stack frame. If the scheduler was improved, you could fork like a drunken sailor who has no shortage of wenches.

Ramon Leon 1358 days ago

That's a big if. Thread pools are a hack, but they work and it solves my problem quickly with a library rather than a change to a core part of the image that I'd then have to try and get accepted. Plus I don't know enough about the process scheduler to even know why I can't fork a thousand threads, let alone fix it.

This was also something I did a while back; I just needed a topic for a post because I felt like writing something again. I just did a file out and started writing around the code.

Colin 1356 days ago

Great to see you back!

As a newcomer to smalltalk your blog is brilliant reading. Ever thought about writing a book in a similar vein?

Cheers

Ramon Leon 1356 days ago

Nah, a blog is the modern version of a self published book. Sitting down to write an actual book is a labor of love, a lot of work for very little return, especially in a market as small as Smalltalk programmers.

Tim Ziebart 1271 days ago

Ramon, is this ready to go for Seaside 3.0? I noticed a possible change needed in "queueWorkItem: aBlock expiresAt: aTimestamp session: aSession" from WACurrentSession to WACurrentRequestContext. If I do this I then get the error in the thread worker 'key not found'. This on the block closure on: do:. Are there other changes needed? I cannot seem to get this working on 3. Thank you.

Ramon Leon 1271 days ago

Two things, no, it's not ready for 3.0, but I can fix that today. Secondly, load the latest code from the ThreadPool package on SqueakSource. I'll get it updated for 3.0 later today or tonight. I'm still using 2.8 so I just haven't gotten around to it.

Tim Ziebart 1270 days ago

Thanks Ramon, I loaded this 2 days ago so, I assume it is the latest. Look forward to update.
