Forking a thread in Smalltalk is easy: wrap something in a block and call fork. It's so easy that you can become fork happy and get yourself into trouble by launching too many processes. About 6 months ago, my excessive background forking in a Seaside web app finally started hurting. I'd have images that seemed to lock up for no reason at 100% CPU, and they'd get killed by monitoring processes, causing lost sessions. There was a reason: the process scheduler in Squeak/Pharo just isn't built to handle a crazy number of threads, and everything slows to a crawl if you launch too many.
I had a search result page in Seaside that launched about 10 background threads for every page render. The page would then poll for the results of those computations, collect any results found, and AJAX them into the page. Each one needs to run in its own thread because any one of them may hang and take upwards of 30 seconds to finish its work, even though the average time is under a second. I don't want all the results stalled waiting on the one slow result, so it made sense to give each its own thread. This worked for quite a while with nothing but simple forking, but eventually the load rose to the point that I needed a thread pool to limit the number of threads actually doing the work to something reasonable. So, let's write a thread pool.
First we'll need a unit of work to put on the thread, similar to a block or a future: something we can return right away when an item is queued, which can be checked for a result or used as a future result. We'll start by declaring a worker class with the instance variables I know I'll need: a block for the actual work to be done, an expiration time to know whether the work still needs to be done, a value cache to avoid doing the work more than once, a lock to block a calling thread that treats the worker as a future value, and an error slot to store any exception so it can be re-thrown on the calling thread.
Object subclass: #ThreadWorker
    instanceVariableNames: 'block expires value lock error'
    classVariableNames: ''
    poolDictionaries: ''
    category: 'ThreadPool'
I'll also want a couple of constructors for creating them: one that just takes a block, and one that takes a block and an expiration time. For my app, if I don't have results within a certain amount of time, I just don't care anymore, so I'd rather have the work item expire and skip the work.
ThreadWorker class>>on: aBlock
    ^ self new
        block: aBlock;
        yourself

ThreadWorker class>>on: aBlock expires: aTime
    ^ self new
        block: aBlock;
        expires: aTime;
        yourself
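The expiration passed to on:expires: is just a DateAndTime, so "expired" is an ordinary comparison against DateAndTime now. A minimal workspace sketch, assuming a current Pharo image where Number>>seconds and Number>>second answer a Duration:

```smalltalk
| expires stale |
"an expiration 25 seconds in the future, the way queueWorkAndExpireIn: computes one"
expires := DateAndTime now + 25 seconds.
"an expiration that has already passed"
stale := DateAndTime now - 1 second.
DateAndTime now < expires. "true: the work is still wanted"
DateAndTime now < stale.   "false: nobody cares about this result anymore"
```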
On the instance side, let's initialize the instance and set up the accessors the constructors above need.
ThreadWorker>>initialize
    super initialize.
    lock := Semaphore new

ThreadWorker>>block: aBlock
    block := aBlock

ThreadWorker>>expires: aTime
    expires := aTime
Now, since this is for use in a thread pool, I'll want a non-blocking method that forces evaluation of the work, so the pool's worker thread is never left waiting. If the work hasn't expired, evaluate the block and store any error, then signal the Semaphore so any waiting clients are unblocked.
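ThreadWorker>>evaluate
    "workers created with on: have no expiration, so treat a nil expires as never expiring"
    (expires isNil or: [ DateAndTime now < expires ]) ifTrue: [
        [ value := block value ]
            on: Error
            do: [ :err | error := err ] ].
    lock signal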
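evaluate never lets an exception escape onto the worker thread; it parks it in the error instance variable so value can deal with it later. The same capture pattern in isolation, using a division by zero as a stand-in failure (any Error subclass behaves the same way):

```smalltalk
| value error |
[ value := 10 / 0 ]
    on: Error
    do: [ :err | error := err ].
value.       "nil: the assignment never happened"
error class. "ZeroDivide, kept for the caller to deal with"
```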
I'll also want a possibly blocking value method for retrieving the results of the work. If you call this right away, then it'll act like a future and block the caller until the queue has had time to process it using the evaluate method above.
ThreadWorker>>value
    lock isSignaled ifFalse: [
        lock wait.
        "wait consumes the signal, so put it back for hasValue and any other waiters"
        lock signal ].
    "rethrow any error from worker thread on calling thread"
    error ifNotNil: [
        error
            privHandlerContext: thisContext;
            signal ].
    ^ value
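Stripped of the pool, the future behavior of value is a Semaphore handshake between two processes: the producer stores a result and signals, and the consumer waits. A minimal sketch, with a plain fork standing in for a pool worker:

```smalltalk
| lock result |
lock := Semaphore new.
"the producer: compute, store the result, then signal"
[ result := 6 * 7. lock signal ] fork.
"the consumer: blocks here until the producer has signaled"
lock wait.
result. "42"
```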
But if you want to poll for a result, we'll need a method to see if the work has been done yet. We can do this by checking the state of the Semaphore; the worker has a value only after the Semaphore has been signaled.
ThreadWorker>>hasValue
    ^ lock isSignaled
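Polling with isSignaled is what lets a page check for results on each AJAX request without ever blocking. The workspace sketch below polls a Semaphore until a forked process signals it. It also shows a Semaphore subtlety worth knowing: wait consumes the pending signal, so isSignaled answers false again afterwards.

```smalltalk
| lock result |
lock := Semaphore new.
[ (Delay forMilliseconds: 50) wait. result := 42. lock signal ] fork.
"poll, sleeping between checks so the forked process gets a chance to run"
[ lock isSignaled ] whileFalse: [ (Delay forMilliseconds: 10) wait ].
result.          "42"
lock isSignaled. "true"
lock wait.       "returns at once, but uses up the signal"
lock isSignaled. "false"
```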
That's all we need for the worker. Now we need a queue to make use of it. We'll declare the class with the class variables it needs, initialize them to reasonable defaults, and add accessors to adjust the pool sizes. Since a thread pool is, by nature, something you generally only want one of (there are always exceptions, but I prefer simplicity), we'll rely on Smalltalk itself to ensure there's only one pool by making all of the pool methods class methods, so the ThreadPool class itself is the only instance. I'll use a shared queue to handle the locking details so the workers share the pool of work safely.
Object subclass: #ThreadPool
    instanceVariableNames: ''
    classVariableNames: 'MaxPoolSize MinPoolSize PoolManager QueueWorkers WorkQueue'
    poolDictionaries: ''
    category: 'ThreadPool'

ThreadPool class>>initialize
    "self initialize"
    WorkQueue := SharedQueue2 new.
    QueueWorkers := OrderedCollection new.
    MinPoolSize := 5.
    MaxPoolSize := 15.
    Smalltalk addToStartUpList: self

ThreadPool class>>maxPoolSize: aSize
    MaxPoolSize := aSize

ThreadPool class>>minPoolSize: aSize
    MinPoolSize := aSize
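The shared queue is what keeps the pool simple: next blocks a reader while the queue is empty and nextPut: wakes it, so idle workers cost nothing. A small producer/consumer sketch, assuming a current Pharo image, where SharedQueue fills the role that SharedQueue2 (the Squeak class used above) plays:

```smalltalk
| queue results consumer |
queue := SharedQueue new.
results := OrderedCollection new.
"a single consumer that blocks inside next whenever the queue is empty"
consumer := [ [ results add: queue next ] repeat ]
    forkAt: Processor userBackgroundPriority.
1 to: 3 do: [ :i | queue nextPut: i ].
"give the lower-priority consumer time to drain the queue"
(Delay forMilliseconds: 100) wait.
consumer terminate.
results asArray. "#(1 2 3)"
```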
Once you have a pool, you need to manage how many threads are actually in it and have it adapt to the workload. That comes down to two questions: given the current workload, are there too few threads, or are there too many? Let's answer those questions.
ThreadPool class>>isPoolTooBig
    ^ QueueWorkers size > MinPoolSize
        and: [ WorkQueue size < QueueWorkers size ]

ThreadPool class>>isPoolTooSmall
    ^ QueueWorkers size < MinPoolSize
        or: [ WorkQueue size > QueueWorkers size
            and: [ QueueWorkers size < MaxPoolSize ] ]
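To see how the two predicates behave, here they are rewritten as plain blocks over a worker count and a queue length, using the default sizes from initialize. The sample numbers are hypothetical, chosen only to hit each branch:

```smalltalk
| minPoolSize maxPoolSize isTooSmall isTooBig |
"the default sizes from ThreadPool class>>initialize"
minPoolSize := 5.
maxPoolSize := 15.
"the two predicates as blocks over (current workers, queued items)"
isTooSmall := [ :workers :queued |
    workers < minPoolSize
        or: [ queued > workers and: [ workers < maxPoolSize ] ] ].
isTooBig := [ :workers :queued |
    workers > minPoolSize and: [ queued < workers ] ].
isTooSmall value: 5 value: 12.  "true: the backlog outnumbers the workers"
isTooSmall value: 15 value: 40. "false: already at the maximum"
isTooBig value: 8 value: 2.     "true: idle workers above the minimum"
isTooBig value: 5 value: 0.     "false: never shrinks below the minimum"
```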
We also need a method for a worker to grab a queued work item and work it. We never want this to error out and kill a worker thread; the ThreadWorker already traps any error so it can be re-thrown on the queuing thread. But just to be safe, we'll wrap it anyway.
ThreadPool class>>processQueueElement
    "ThreadWorker>>evaluate already traps errors; this guard only keeps the worker loop alive"
    [ WorkQueue next evaluate ]
        on: Error
        do: [ ]
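The empty handler means a failing item is simply dropped and the worker's repeat loop moves on to the next one. The same effect on a plain loop, with a division by zero as the failing item:

```smalltalk
| processed |
processed := OrderedCollection new.
"the middle item fails; the empty handler swallows it and the loop carries on"
#(1 0 2) do: [ :each |
    [ processed add: 10 / each ] on: Error do: [ ] ].
processed asArray. "#(10 5)"
```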
Now that workers have something to do, we'll need to be able to start and stop worker threads in order to increase or decrease the working thread count. Once a worker is started, we'll want it to simply work forever and the shared queue will handle blocking the workers when there's no work to do. We'll also want the worker threads running in the background so they aren't taking priority over foreground work like serving HTTP requests.
ThreadPool class>>startWorker
    QueueWorkers add: ([ [ self processQueueElement ] repeat ]
        forkAt: Processor systemBackgroundPriority
        named: 'pool worker')
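Naming the processes makes them easy to spot in the process browser, and systemBackgroundPriority sits below userSchedulingPriority, the priority workspace code normally runs at, so workers only run when nothing more important is runnable. A quick check in a workspace, assuming a current Pharo image:

```smalltalk
| worker |
worker := [ (Delay forSeconds: 5) wait ]
    forkAt: Processor systemBackgroundPriority
    named: 'pool worker'.
worker name.                                        "'pool worker'"
worker priority < Processor userSchedulingPriority. "true"
"don't leave the demo process lying around"
worker terminate.
```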
To kill a worker, we'll just queue a job that kills the active process, which will be whichever worker picks up the job. This is a simple way to ensure we don't kill a worker in the middle of something important. It requires actually using the queue, so here are a couple of quick methods to queue a job, plus some extensions on BlockClosure/BlockContext to make queuing as simple as forking.
ThreadPool class>>queueWorkItem: aBlock expiresAt: aTimestamp
    | worker |
    worker := ThreadWorker on: aBlock expires: aTimestamp.
    WorkQueue nextPut: worker.
    ^ worker

ThreadPool class>>queueWorkItem: aBlock expiresAt: aTimestamp session: aSession
    "a special method for Seaside 2.8 so the worker threads still have access to the current session"
    | worker |
    worker := ThreadWorker
        on: [ WACurrentSession use: aSession during: aBlock ]
        expires: aTimestamp.
    WorkQueue nextPut: worker.
    ^ worker

BlockClosure>>queueWorkAndExpireIn: aDuration
    ^ ThreadPool queueWorkItem: self expiresAt: DateAndTime now + aDuration

BlockClosure>>queueWorkAndExpireIn: aDuration session: aSession
    "a special method for Seaside 2.8 so the worker threads still have access to the current session"
    ^ ThreadPool
        queueWorkItem: self
        expiresAt: DateAndTime now + aDuration
        session: aSession
And now we're able to queue a job to kill a thread, making sure to double check at time of actual execution that the pool is still too big and the thread still needs to die.
ThreadPool class>>killWorker
    "just queue a task that kills the active process, which will be the worker that picks it up"
    [ self isPoolTooBig ifTrue: [
        (QueueWorkers remove: Processor activeProcess) terminate ] ]
            queueWorkAndExpireIn: 10 minutes
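The trick works because a queued block runs on whichever worker dequeues it, so Processor activeProcess inside the block is that worker. A self-contained "poison pill" sketch with two stand-in workers and a plain SharedQueue (the names here are illustrative, not part of the pool):

```smalltalk
| queue workers results |
queue := SharedQueue new.
workers := OrderedCollection new.
results := OrderedCollection new.
"two stand-in workers that just evaluate whatever block they take off the queue"
2 timesRepeat: [
    workers add: ([ [ queue next value ] repeat ]
        forkAt: Processor userBackgroundPriority
        named: 'demo worker') ].
"a poison-pill job: whichever worker takes it removes itself and terminates"
queue nextPut: [ (workers remove: Processor activeProcess) terminate ].
queue nextPut: [ results add: #stillWorking ].
"let the lower-priority workers run"
(Delay forMilliseconds: 100) wait.
workers size.    "1"
results asArray. "#(#stillWorking)"
"clean up the surviving worker"
workers do: [ :each | each terminate ].
```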
Of course, something has to decide when to grow the pool and when to shrink it, and it needs a method to do so.
ThreadPool class>>adjustThreadPoolSize
    "starting up processes too fast is dangerous and wasteful, ensure a reasonable delay"
    1 second asDelay wait.
    self isPoolTooSmall
        ifTrue: [ self startWorker ]
        ifFalse: [ self isPoolTooBig ifTrue: [ self killWorker ] ]
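Because the manager takes at most one step per second, the pool ramps up and down gradually instead of jumping straight to MaxPoolSize. The sketch below replays the two predicates over a hypothetical sequence of queue lengths, one entry per manager tick. It simplifies by letting a kill take effect immediately, whereas the real pool routes it through the queue:

```smalltalk
| minPoolSize maxPoolSize workers history |
minPoolSize := 5.
maxPoolSize := 15.
workers := minPoolSize.
history := OrderedCollection new.
"hypothetical queue lengths seen on six consecutive manager ticks"
#(20 20 20 3 3 3) do: [ :queued |
    (workers < minPoolSize or: [ queued > workers and: [ workers < maxPoolSize ] ])
        ifTrue: [ workers := workers + 1 ]
        ifFalse: [
            (workers > minPoolSize and: [ queued < workers ])
                ifTrue: [ workers := workers - 1 ] ].
    history add: workers ].
history asArray. "#(6 7 8 7 6 5)"
```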
We need to ensure the thread pool is always up and running, and that something is managing it, so we'll hook the system startUp routine and kick off the minimum number of workers and start a single manager process to continually adjust the pool size to match the workload.
ThreadPool class>>startUp
    "self startUp"
    self shutDown.
    MinPoolSize timesRepeat: [ self startWorker ].
    PoolManager := [ [ self adjustThreadPoolSize ] repeat ]
        forkAt: Processor systemBackgroundPriority
        named: 'pool manager'
And clean up everything on shutdown. Since startUp calls shutDown first thing, every time the image starts up we're starting from a clean slate.
ThreadPool class>>shutDown
    "self shutDown"
    WorkQueue := SharedQueue2 new.
    PoolManager ifNotNil: [ PoolManager terminate ].
    QueueWorkers do: [ :each | each terminate ].
    QueueWorkers removeAll
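What shutDown relies on is that terminate stops a background process for good, even one that is sleeping at the time. A workspace check with counting stand-in workers (illustrative only, not the pool's own workers):

```smalltalk
| ticks snapshot workers |
ticks := 0.
workers := OrderedCollection new.
"three stand-in workers that count and nap forever"
3 timesRepeat: [
    workers add: ([ [ ticks := ticks + 1. (Delay forMilliseconds: 10) wait ] repeat ]
        forkAt: Processor userBackgroundPriority
        named: 'demo worker') ].
(Delay forMilliseconds: 100) wait.
"what shutDown does: terminate every worker and forget about them"
workers do: [ :each | each terminate ].
workers removeAll.
snapshot := ticks.
(Delay forMilliseconds: 100) wait.
ticks > 0.        "true: the workers did run"
ticks = snapshot. "true: nothing has run since they were terminated"
```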
And that's it, a simple thread pool using a shared queue to do all the dirty work of dealing with concurrency. I now queue excessively without suffering the punishment entailed by forking excessively. Now rather than...
[ self someTaskToDo ] fork
I just do...
[ self someTaskToDo ] queueWorkAndExpireIn: 25 seconds
Or in Seaside...
[ self someTaskToDo ] queueWorkAndExpireIn: 25 seconds session: self session
And my app is running like a champ again: no more hanging images from forking like a drunken sailor.
UPDATE: For the source, see the ThreadPool package on SqueakSource.