
A Simple Thread Pool for Smalltalk

Forking a thread in Smalltalk is easy: wrap something in a block and call fork. It's so easy that you can become fork happy and get yourself into trouble by launching too many processes. About 6 months ago, my excessive background forking in a Seaside web app finally started hurting; I'd have images that seemed to lock up for no reason using 100% CPU, and they'd get killed by monitoring processes, causing lost sessions. There was a reason: the process scheduler in Squeak/Pharo just isn't built to handle a crazy number of threads, and everything will slow to a crawl if you launch too many.

I had a search result page in Seaside that launched about 10 background threads for every page render and then the page would poll for the results of those computations, collect up any results found, and AJAX them into the page. Each one needs to run in its own thread because any one of them may hang up and take upwards of 30 seconds to finish its work even though the average time would be under a second. I don't want all the results being stalled waiting for the one slow result, so it made sense to have each on its own thread. This worked for quite a while with nothing but simple forking, but eventually, the load rose to the point that I needed a thread pool so I could limit the number of threads actually doing the work to a reasonable amount. So, let's write a thread pool.

First we'll need a unit of work to put on the thread, similar to a block or a future: something we can return right away when an item is queued, which can be checked for a result or used as a future result. We'll start by declaring a worker class with a few instance variables I know I'll need: a block for the actual work to be done, an expiration time to know if the work still needs to be done, a value cache to avoid doing the work more than once, a lock to block a calling thread treating the worker as a future value, and an error to store any exception from a failure so it can be re-thrown on the calling thread.

Object subclass: #ThreadWorker
    instanceVariableNames: 'block expires value lock error'
    classVariableNames: ''
    poolDictionaries: ''
    category: 'ThreadPool'

I'll also want a few constructors for creating them, one that just takes a block, and one that takes a block and an expiration time. For my app, if I don't have results within a certain amount of time, I just don't care anymore, so I'd rather have the work item expire and skip the work.

ThreadWorker class>>on: aBlock
    ^ self new
        block: aBlock;
        yourself 

ThreadWorker class>>on: aBlock expires: aTime
    ^ self new
        block: aBlock;
        expires: aTime;
        yourself

On the instance side let's initialize the instance and set up the necessary accessors for the constructors above.

ThreadWorker>>initialize
    super initialize.
    lock := Semaphore new 

ThreadWorker>>block: aBlock
    block := aBlock 

ThreadWorker>>expires: aTime
    expires := aTime

Now, since this is for use in a thread pool, I'll want a non-blocking method of forcing evaluation of the work so the thread worker isn't blocked. So if the work hasn't expired, evaluate the block and store any errors, then signal the Semaphore so any waiting clients are unblocked.

ThreadWorker>>evaluate
    "a worker created without an expiration never expires"
    (expires isNil or: [ DateAndTime now < expires ]) ifTrue: 
        [ [ value := block value ] 
            on: Error
            do: [ :err | error := err ] ].
    lock signal

I'll also want a possibly blocking value method for retrieving the results of the work. If you call this right away, then it'll act like a future and block the caller until the queue has had time to process it using the evaluate method above.

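ThreadWorker>>value
    "wait consumes the signal, so re-signal to keep hasValue and later calls to value working"
    lock isSignaled ifFalse: [ lock wait. lock signal ].
    "rethrow any error from worker thread on calling thread"
    error ifNotNil: 
        [ error
            privHandlerContext: thisContext;
            signal ].
    ^ value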

But if you want to poll for a result, we'll need a method to see if the work has been done yet. We can do this by checking the state of the Semaphore; the worker has a value only after the Semaphore has been signaled.

ThreadWorker>>hasValue
    ^ lock isSignaled
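
To see the future-like behavior in isolation, here's a minimal workspace sketch that uses a worker directly, without any pool. The plain fork standing in for a pool thread and the trivial 3 + 4 block are just assumptions for illustration.

```smalltalk
| worker result |
"the expiration is set explicitly, 10 seconds from now"
worker := ThreadWorker 
    on: [ 3 + 4 ]
    expires: DateAndTime now + 10 seconds.
"a plain fork stands in for a pool thread here"
[ worker evaluate ] fork.
"blocks until evaluate signals the lock, then answers 7"
result := worker value
```

Had the block raised an error instead, the call to value is where it would resurface, on the calling thread rather than the forked one.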

That's all we need for the worker. Now we need a queue to make use of it, so we'll declare the class with the necessary class variables, initialize them to some reasonable defaults, and add accessors to adjust the pool sizes. Since a thread pool is generally something you only want one of (there are always exceptions, but I prefer simplicity), we'll rely on Smalltalk itself to ensure there's only one pool by making all of the pool methods class methods, so the ThreadPool class is the only instance. I'll use a shared queue to handle the details of locking so the workers share the pool of work safely.

Object subclass: #ThreadPool
    instanceVariableNames: ''
    classVariableNames: 'MaxPoolSize MinPoolSize PoolManager QueueWorkers WorkQueue'
    poolDictionaries: ''
    category: 'ThreadPool'

ThreadPool class>>initialize
    "self initialize"
    WorkQueue := SharedQueue2 new.
    QueueWorkers := OrderedCollection new.
    MinPoolSize := 5.
    MaxPoolSize := 15.
    Smalltalk addToStartUpList: self 

ThreadPool class>>maxPoolSize: aSize
    MaxPoolSize := aSize 

ThreadPool class>>minPoolSize: aSize
    MinPoolSize := aSize

Once you have a pool, you need to manage how many threads are actually in it and have it adjust to adapt to the workload. There are two main questions we need to ask ourselves to do this: are there enough threads or are there too many threads given the current workload. Let's answer those questions.

ThreadPool class>>isPoolTooBig
    ^ QueueWorkers size > MinPoolSize 
        and: [ WorkQueue size < QueueWorkers size ]

ThreadPool class>>isPoolTooSmall
    ^ QueueWorkers size < MinPoolSize 
        or: [ WorkQueue size > QueueWorkers size 
            and: [ QueueWorkers size < MaxPoolSize ] ]
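
To make the two rules concrete, here's a small workspace sketch that restates them as blocks over plain numbers, using the default limits of 5 and 15 from the initializer above. The block names and sample numbers are mine, purely for illustration.

```smalltalk
| min max tooBig tooSmall |
min := 5.
max := 15.
"same logic as isPoolTooBig and isPoolTooSmall, with the sizes passed in"
tooBig := [ :workers :queued | 
    workers > min and: [ queued < workers ] ].
tooSmall := [ :workers :queued | 
    workers < min or: [ queued > workers and: [ workers < max ] ] ].

tooSmall value: 5 value: 8.    "true, backlog exceeds workers and there's room to grow"
tooSmall value: 15 value: 40.  "false, already at the maximum"
tooBig value: 7 value: 3.      "true, above the minimum with more workers than work"
tooBig value: 5 value: 0.      "false, already at the minimum"
```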

We also need a method for a worker to grab a queued work item and work it. We don't ever want this to error out and kill a worker thread; that shouldn't happen, since the work item traps any error and re-throws it on the queuing thread. But just to be safe, we'll wrap it.

ThreadPool class>>processQueueElement
    [ WorkQueue next evaluate ] 
        on: Error
        do: [  ]

Now that workers have something to do, we'll need to be able to start and stop worker threads in order to increase or decrease the working thread count. Once a worker is started, we'll want it to simply work forever and the shared queue will handle blocking the workers when there's no work to do. We'll also want the worker threads running in the background so they aren't taking priority over foreground work like serving HTTP requests.

ThreadPool class>>startWorker
    QueueWorkers add: ([ [ self processQueueElement ] repeat ] 
            forkAt: Processor systemBackgroundPriority
            named: 'pool worker')

To kill a worker, we'll just queue a job to kill the active process, which will be whichever worker picks up the job. This is a simple way to ensure we don't kill a worker that is doing something important. This requires actually using the queue, so here are a couple of quick methods to queue a job and some extensions on BlockClosure/BlockContext to make using the queue as simple as forking.

ThreadPool class>>queueWorkItem: aBlock expiresAt: aTimestamp 
    | worker |
    worker := ThreadWorker on: aBlock expires: aTimestamp.
    WorkQueue nextPut: worker.
    ^ worker 

ThreadPool class>>queueWorkItem: aBlock expiresAt: aTimestamp session: aSession 
    | worker |
    "a special method for Seaside2.8 so the worker threads still have access to the current session"
    worker := ThreadWorker 
        on: 
            [ WACurrentSession 
                use: aSession
                during: aBlock ]
        expires: aTimestamp.
    WorkQueue nextPut: worker.
    ^ worker 

BlockClosure>>queueWorkAndExpireIn: aDuration
    ^ ThreadPool 
        queueWorkItem: self
        expiresAt: DateAndTime now + aDuration

BlockClosure>>queueWorkAndExpireIn: aDuration session: aSession 
    "a special method for Seaside2.8 so the worker threads still have access to the current session"
    ^ ThreadPool 
        queueWorkItem: self
        expiresAt: DateAndTime now + aDuration
        session: aSession

And now we're able to queue a job to kill a thread, making sure to double check at time of actual execution that the pool is still too big and the thread still needs to die.

ThreadPool class>>killWorker
    "just queue a task that kills the activeProcess, which will be the worker that picks it up"

    [ self isPoolTooBig ifTrue: [ (QueueWorkers remove: Processor activeProcess) terminate ] ] 
        queueWorkAndExpireIn: 10 minutes

Of course, something has to decide when to increase the size of the pool and when to decrease it, and it needs a method to do so.

ThreadPool class>>adjustThreadPoolSize
    "starting up processes too fast is dangerous and wasteful, ensure a reasonable delay"
    1 second asDelay wait.
    self isPoolTooSmall 
        ifTrue: [ self startWorker ]
        ifFalse: [ self isPoolTooBig ifTrue: [ self killWorker ] ]

We need to ensure the thread pool is always up and running, and that something is managing it, so we'll hook the system startUp routine and kick off the minimum number of workers and start a single manager process to continually adjust the pool size to match the workload.

ThreadPool class>>startUp
    "self startUp"
    self shutDown.
    MinPoolSize timesRepeat: [ self startWorker ].
    PoolManager := [ [ self adjustThreadPoolSize ] repeat ] 
        forkAt: Processor systemBackgroundPriority
        named: 'pool manager'

And clean up everything on shutdown so every time the image starts up we're starting from a clean slate.

ThreadPool class>>shutDown
    "self shutDown"
    WorkQueue := SharedQueue2 new.
    PoolManager ifNotNil: [ PoolManager terminate ].
    QueueWorkers do: [ :each | each terminate ].
    QueueWorkers removeAll.
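
If you've just filed the code in and don't want to wait for the next image start, something like the following workspace snippet should get the pool going. It only uses the class methods defined above; the pool sizes are arbitrary example numbers, not recommendations.

```smalltalk
"one-time setup: create the queue and register for image startup"
ThreadPool initialize.
"optional tuning; these sizes are arbitrary examples"
ThreadPool minPoolSize: 3.
ThreadPool maxPoolSize: 10.
"start the workers and the manager now rather than on the next image start"
ThreadPool startUp
```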

And that's it, a simple thread pool using a shared queue to do all the dirty work of dealing with concurrency. I now queue excessively without suffering the punishment entailed by forking excessively. Now rather than...

[ self someTaskToDo ] fork

I just do...

[ self someTaskToDo ] queueWorkAndExpireIn: 25 seconds

Or in Seaside...

[ self someTaskToDo ] queueWorkAndExpireIn: 25 seconds session: self session
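
For a polling page like the search results described at the start, the workers answered by the queue can be kept around and checked on each poll. Here's a rough sketch of that idea; the squaring blocks are placeholders for real work, and the variable names are just for illustration.

```smalltalk
| jobs finished results |
"queue ten placeholder jobs, keeping the ThreadWorker each call answers"
jobs := (1 to: 10) collect: [ :i | 
    [ i * i ] queueWorkAndExpireIn: 25 seconds ].

"later, on each poll: pick out the jobs that have been processed so far"
finished := jobs select: [ :each | each hasValue ].
"value won't block for these; it answers nil for a job that expired before running"
results := finished collect: [ :each | each value ]
```

Keep in mind that value re-raises any error the job hit, so a real page would want to wrap that last step in an error handler.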

And my app is running like a champ again, no more hanging images due to forking like a drunken sailor.

Dynamic Web Development with Seaside PDF Available for Purchase

Reposted from Lukas Renggli's blog:

The PDF version of the book Dynamic Web Development with Seaside is available to download now.

At the end of the payment process (PayPal) you will be redirected to the download area where you are able to get the latest builds of the PDF version of the book. If you bookmark the page you will be able to download fixes and extra chapters as we integrate them into the online version. By buying the PDF version you support our hard work on the book.

We wish to thank the European Smalltalk User Group, inceptive.be, Cincom Smalltalk and GemStone Smalltalk for generously sponsoring this book. We are looking for additional sponsors. If you are interested, please contact us. If you are a publisher and interested in publishing this material, please let us know.

So please, support the Seaside community and buy the book, I know I will.

SandstoneDb GOODS adaptor

SandstoneDb was written mostly as a Rails-ish API for a simple object database for use in small office and prototype applications (plus I needed a db for this blog). Which object database wasn't really important to me at the time; it was the API that I wanted, so I made the actual object store backing it pluggable and initially wrote two different store adaptors for it. The first was a memory store, little more than a dictionary of dictionaries, against which I wrote all the unit tests. The second was a Prevayler-style file-based store that used SmartRefStream serialization and loaded everything from disk on startup; this provided crash-proof Squeak images that wouldn't lose data.

I figured that eventually, for fun, I might get around to writing adaptors for some of the other object database back-ends that are in use: GOODS and OmniBase. I never really got around to it; however, Nico Schwarz has written a GOODS adaptor for SandstoneDb. This will let you hook up multiple Squeak images to a single store and should scale better than the file store that SandstoneDb defaults to.

Go check it out and let him know what you think of it. This is just the kind of project that'll help programmers new to Seaside get going and get accustomed to using an object database rather than a relational one. It looks like his first blog post as well, so swing by and leave a comment to encourage more posts, we need more bloggers spreading the word!

On Twitter

OK, so I'm finally going to try out this Twitter thing. I still don't see why everyone is so obsessed about it but what the heck, they are, so maybe it is cool. Maybe some micro blogging will get me back in the mood to do some real blogging. If any of you guys are twitterers, come follow me so I have someone to tweet to.

Started working on a GLASS project, so maybe I'll tweet about that, and eventually blog about it as well (so far it fracking rocks).

Stateless Sitemap in Seaside

Originally I generated the sitemap for onsmalltalk as a file on disk and let Apache serve it up. There's nothing wrong with this approach, but it'd be cooler to have Seaside generate and render it on demand, and it serves as a good excuse to talk about serving up content statelessly in Seaside.

Seaside is a session based web framework, but there's nothing really session specific about a sitemap and I really don't want a new session created when a request for a sitemap is made. There's a lot of overhead in doing that and sometimes you just want to serve up stuff statelessly. When a request comes in, the application mounted on the base URL handles the request by plucking the session id out of the cookie or the URL and either creates a new session or finds the existing one needed to handle the current request. Once found, the request is pumped through the current session which runs through a similar procedure looking for a continuation to invoke.

Since I want to avoid all that and just handle the request at the application level I'll override #handleRequest: in my custom WAApplication subclass, check the URL of the current request and either render the sitemap and end the request by immediately returning a response, or allow processing to continue normally into the session lookup done by the call to super.

handleRequest: aRequest 
    "serve the sitemap statelessly; everything else goes through normal session handling"
    (aRequest url endsWith: '/sitemap.xml') ifTrue: 
        [ ^WAResponse new
              beXML;
              cacheFor: 1 hour;
              contents: (self siteMapFrom: aRequest) asString readStream;
              yourself ].
    ^super handleRequest: aRequest

siteMapFrom: aRequest 
    ^ (SBSiteMapGenerator blogRoot: ('http://{1}/' format: {  (aRequest host)  })) 
        generateFromItems: {  (SBPost new)  } , (SBBlog onSmalltalk publicPosts) , SBTag findAll.

If you have things in your application that can be done statelessly, this is a good place to hook into the framework and take care of that stuff at the application level. Sometimes you don't need all that fancy Seaside stuff and you just want to work directly with HTTP requests and responses.

Two small methods and the sitemap is now generated dynamically and statelessly, directly from Seaside, removing the need to manually regenerate the sitemap file on disk as I had previously been doing.

Oh, one small extension that I've put on WAResponse and use occasionally...

cacheFor: aDuration 
    self headerAt: 'Expires' put: (TimeStamp now + aDuration) httpFormat