Forking a thread in Smalltalk is easy: wrap something in a block and send it fork. It's so easy that you can become fork-happy and get yourself into trouble by launching too many processes. About six months ago, my excessive background forking in a Seaside web app finally started hurting. I'd have images that seemed to lock up for no reason using 100% CPU, and they'd get killed by monitoring processes, causing lost sessions. There was a reason: the process scheduler in Squeak/Pharo just isn't built to handle a crazy number of threads, and everything will slow to a crawl if you launch too many.
I had a search result page in Seaside that launched about 10 background threads for every page render. The page would then poll for the results of those computations, collect any results found, and AJAX them into the page. Each one needs to run in its own thread because any one of them may hang and take upwards of 30 seconds to finish its work, even though the average time is under a second. I don't want all the results stalled waiting for the one slow result, so it made sense to give each its own thread. This worked for quite a while with nothing but simple forking, but eventually the load rose to the point that I needed a thread pool to limit the number of threads actually doing the work to something reasonable. So, let's write a thread pool.
First, we'll need a unit of work to put on a thread, similar to a block or a future: something we can return right away when an item is queued, which can be checked for a result or used as a future result. We'll start by declaring a worker class with the instance variables we'll need: a block for the actual work, an expiration time to know whether the work still needs to be done, a value to cache the result so the work is done only once, a lock (a Semaphore) to block a calling thread that treats the worker as a future value, and an error to store any exception raised so it can be re-thrown on the calling thread.
Object subclass: #ThreadWorker
instanceVariableNames: 'block expires value lock error'
classVariableNames: ''
poolDictionaries: ''
category: 'ThreadPool'
I'll also want a couple of constructors: one that takes just a block, and one that takes a block and an expiration time. For my app, if I don't have results within a certain amount of time, I just don't care anymore, so I'd rather have the work item expire and skip the work.
ThreadWorker class>>on: aBlock
^ self new
block: aBlock;
yourself
ThreadWorker class>>on: aBlock expires: aTime
^ self new
block: aBlock;
expires: aTime;
yourself
On the instance side, let's initialize the instance and set up the accessors the constructors above need.
ThreadWorker>>initialize
super initialize.
lock := Semaphore new
ThreadWorker>>block: aBlock
block := aBlock
ThreadWorker>>expires: aTime
expires := aTime
Since this is for use in a thread pool, I'll want a non-blocking method that forces evaluation of the work, so the pool thread running it never waits on anything. If the work hasn't expired, evaluate the block and store any error, then signal the Semaphore so any waiting clients are unblocked.
ThreadWorker>>evaluate
"a nil expires (a worker created with on:) means the work never expires"
(expires isNil or: [ DateAndTime now < expires ]) ifTrue:
[ [ value := block value ]
on: Error
do: [ :err | error := err ] ].
lock signal
I'll also want a possibly blocking value method for retrieving the result of the work. If you call it right away, it acts like a future and blocks the caller until a pool thread has processed the item using the evaluate method above.
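ThreadWorker>>value
"wait for evaluate, then re-signal so hasValue stays true
and any other callers waiting on this worker are released too"
lock isSignaled ifFalse: [ lock wait. lock signal ].
"rethrow any error from worker thread on calling thread"
error ifNotNil:
[ error
privHandlerContext: thisContext;
signal ].
^ value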
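A minimal workspace sketch of the future behavior, assuming the ThreadWorker methods so far are loaded. A plain forked process stands in for a pool thread here (in the real pool, a worker calls evaluate), and value blocks on the Semaphore until that process signals it.

```smalltalk
| worker result |
worker := ThreadWorker on: [ 3 + 4 ] expires: DateAndTime now + 30 seconds.
"stand-in for a pool thread: evaluate the work in a separate process"
[ worker evaluate ] fork.
"blocks until the forked evaluate signals the Semaphore"
result := worker value.
result "7"
```

The caller never touches the Semaphore directly; whether the work finished before or after value was sent, the caller gets the same answer.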
But if you want to poll for a result, we'll need a method to see if the work has been done yet. We can do this by checking the state of the Semaphore; the worker has a value only after the Semaphore has been signaled.
ThreadWorker>>hasValue
^ lock isSignaled
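To see the polling side and the expiry path together, here is a small sketch, again assuming the ThreadWorker class above is loaded and calling evaluate directly where a pool thread normally would. Note that an expired worker still signals its Semaphore, so pollers see hasValue answer true and value answer nil instead of waiting forever.

```smalltalk
| fresh stale |
fresh := ThreadWorker on: [ 3 + 4 ] expires: DateAndTime now + 30 seconds.
stale := ThreadWorker on: [ 3 + 4 ] expires: DateAndTime now - 1 second.
fresh hasValue. "false: nothing has evaluated it yet"
"a pool thread would normally send evaluate; here we do it by hand"
fresh evaluate.
stale evaluate.
fresh hasValue. "true"
stale hasValue. "true: signaled even though the work was skipped"
fresh value. "7, answered immediately without blocking"
stale value. "nil: the block never ran because the worker had expired"
```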
That's all we need for the worker. Now we need a queue to make use of it, so we'll declare the class with the necessary class variables, initialize them to reasonable defaults, and add some accessors to adjust the pool sizes. Since a thread pool is by nature something you generally want only one of (there are always exceptions, but I prefer simplicity), we'll rely on Smalltalk itself to ensure there's only one pool by making all of the pool methods class methods, so the ThreadPool class itself is the one and only pool. I'll use a shared queue to handle the locking details and ensure the workers share the pool of work safely.
Object subclass: #ThreadPool
instanceVariableNames: ''
classVariableNames: 'MaxPoolSize MinPoolSize
PoolManager QueueWorkers WorkQueue'
poolDictionaries: ''
category: 'ThreadPool'
ThreadPool class>>initialize
"self initialize"
WorkQueue := SharedQueue2 new.
QueueWorkers := OrderedCollection new.
MinPoolSize := 5.
MaxPoolSize := 15.
Smalltalk addToStartUpList: self
ThreadPool class>>maxPoolSize: aSize
MaxPoolSize := aSize
ThreadPool class>>minPoolSize: aSize
MinPoolSize := aSize
Once you have a pool, you need to manage how many threads are actually in it and have it adapt to the workload. There are two questions to ask: given the current workload, are there too few threads, or too many? Let's answer those questions.
ThreadPool class>>isPoolTooBig
^ QueueWorkers size > MinPoolSize
and: [ WorkQueue size < QueueWorkers size ]
ThreadPool class>>isPoolTooSmall
^ QueueWorkers size < MinPoolSize
or: [ WorkQueue size > QueueWorkers size
and: [ QueueWorkers size < MaxPoolSize ] ]
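To see how the two predicates divide up the possibilities, here is a sketch that copies their logic into plain blocks over hypothetical worker and queue counts, using the default MinPoolSize of 5 and MaxPoolSize of 15. The block names are illustrative only and not part of the class.

```smalltalk
| min max tooBig tooSmall |
min := 5.
max := 15.
"same logic as isPoolTooBig / isPoolTooSmall, with the counts passed in"
tooBig := [ :workers :queued | workers > min and: [ queued < workers ] ].
tooSmall := [ :workers :queued |
	workers < min or: [ queued > workers and: [ workers < max ] ] ].
tooSmall value: 8 value: 20. "true: backlog exceeds workers, room to grow"
tooBig value: 8 value: 3. "true: more workers than queued items, above the minimum"
tooBig value: 8 value: 8. "false"
tooSmall value: 8 value: 8. "false: steady state, leave the pool alone"
tooSmall value: 15 value: 40. "false: already at MaxPoolSize"
tooBig value: 5 value: 0. "false: never shrinks below MinPoolSize"
```

Because shrinking requires fewer queued items than workers and growing (above the minimum) requires more, the two answers can never both be true, and a queue exactly as long as the worker list leaves the pool unchanged.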
We also need a method for a worker to grab a queued work item and evaluate it. We never want this to error out and kill a worker thread; ThreadWorker>>evaluate already traps any error so it can be re-thrown on the queuing thread, but just to be safe, we'll wrap it anyway.
ThreadPool class>>processQueueElement
[ WorkQueue next evaluate ]
on: Error
do: [ ]
Now that workers have something to do, we need to be able to start and stop worker threads to increase or decrease the working thread count. Once a worker is started, we want it to simply work forever; the shared queue will block the worker when there's no work to do. We also want the worker threads running at background priority so they don't take priority over foreground work like serving HTTP requests.
ThreadPool class>>startWorker
QueueWorkers add: ([ [ self processQueueElement ] repeat ]
forkAt: Processor systemBackgroundPriority
named: 'pool worker')
To kill a worker, we'll just queue a job that kills the active process, which will be whichever worker picks up the job. This is a simple way to ensure we never kill a worker in the middle of something important. It requires actually using the queue, so first a couple of quick methods to queue a job, plus some extensions on BlockClosure/BlockContext to make using the queue as simple as forking.
ThreadPool class>>queueWorkItem: aBlock expiresAt: aTimestamp
| worker |
worker := ThreadWorker on: aBlock expires: aTimestamp.
WorkQueue nextPut: worker.
^ worker
ThreadPool class>>queueWorkItem: aBlock expiresAt: aTimestamp
session: aSession
| worker |
"a special method for Seaside2.8 so the worker threads
still have access to the current session"
worker := ThreadWorker
on:
[ WACurrentSession
use: aSession
during: aBlock ]
expires: aTimestamp.
WorkQueue nextPut: worker.
^ worker
BlockClosure>>queueWorkAndExpireIn: aDuration
^ ThreadPool
queueWorkItem: self
expiresAt: DateAndTime now + aDuration
BlockClosure>>queueWorkAndExpireIn: aDuration session: aSession
"a special method for Seaside2.8 so the worker threads
still have access to the current session"
^ ThreadPool
queueWorkItem: self
expiresAt: DateAndTime now + aDuration
session: aSession
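Putting the worker and the queue together, here is a sketch of the poll-and-collect pattern from the search page, assuming the pool has been started (for example with ThreadPool startUp). The squaring blocks and staggered delays are placeholders for real searches. Each poll harvests only the items whose hasValue is true, so one slow item doesn't hold up the rest.

```smalltalk
| workers finished |
"queue ten placeholder jobs of increasing duration"
workers := (1 to: 10) collect: [ :i |
	[ (Delay forMilliseconds: 100 * i) wait.
	i * i ] queueWorkAndExpireIn: 25 seconds ].
"on each poll (an AJAX callback in Seaside), collect whatever has finished;
value does not block here because hasValue already answered true"
finished := (workers select: [ :each | each hasValue ])
	collect: [ :each | each value ].
```

Later polls pick up the rest. An item that expired answers nil, and an item whose block raised an error re-signals that error from value, so a real poller may want to guard each value send.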
And now we're able to queue a job to kill a thread, making sure to double-check at time of actual execution that the pool is still too big and the thread still needs to die.
ThreadPool class>>killWorker
"just queue a task that kills the activeProcess,
which will be the worker that picks it up"
[ self isPoolTooBig ifTrue:
[ (QueueWorkers remove: Processor activeProcess) terminate ] ]
queueWorkAndExpireIn: 10 minutes
Of course, something has to decide when to grow the pool and when to shrink it, and it needs a method to do so.
ThreadPool class>>adjustThreadPoolSize
"starting up processes too fast is dangerous
and wasteful, ensure a reasonable delay"
1 second asDelay wait.
self isPoolTooSmall
ifTrue: [ self startWorker ]
ifFalse: [ self isPoolTooBig ifTrue: [ self killWorker ] ]
We need to ensure the thread pool is always up and running and that something is managing it, so we'll hook the system startUp routine: kick off the minimum number of workers and start a single manager process that continually adjusts the pool size to match the workload.
ThreadPool class>>startUp
"self startUp"
self shutDown.
MinPoolSize timesRepeat: [ self startWorker ].
PoolManager := [ [ self adjustThreadPoolSize ] repeat ]
forkAt: Processor systemBackgroundPriority
named: 'pool manager'
And a shutDown method to clean everything up. Since startUp calls it first, every time the image starts up we're starting from a clean slate.
ThreadPool class>>shutDown
"self shutDown"
WorkQueue := SharedQueue2 new.
PoolManager ifNotNil: [ PoolManager terminate ].
QueueWorkers do: [ :each | each terminate ].
QueueWorkers removeAll.
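One consequence of the code above: the class-side initialize only registers ThreadPool on the startup list, so a freshly loaded pool has no workers until the image restarts. Here is a sketch for tuning and starting it from a workspace, plus a rough check of how many workers are alive, counting processes by the 'pool worker' name that startWorker gives them.

```smalltalk
"tune the bounds first so startUp launches MinPoolSize workers;
the manager reads both values again on every pass"
ThreadPool
	minPoolSize: 2;
	maxPoolSize: 30.
"start the pool now instead of waiting for the next image start"
ThreadPool startUp.
"rough count of live worker processes"
(Process allInstances select: [ :each |
	each name = 'pool worker' and: [ each isTerminated not ] ]) size.
```

Keep in mind that startUp calls shutDown, which replaces the work queue, so anything still queued at that moment is dropped and its callers will never see a value.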
And that's it: a simple thread pool that uses a shared queue to do all the dirty work of dealing with concurrency. I now queue excessively without suffering the punishment that comes with forking excessively. Now, rather than...
[ self someTaskToDo ] fork
I just do...
[ self someTaskToDo ] queueWorkAndExpireIn: 25 seconds
Or in Seaside...
[ self someTaskToDo ] queueWorkAndExpireIn: 25 seconds session: self session
And my app is running like a champ again, no more hanging images due to forking like a drunken sailor.
UPDATE: For the source, see the ThreadPool package on SqueakSource.