By Ramon Leon - 26 October 2007 under Smalltalk
Concurrency is always a source of problems in complex systems and one of the coolest patterns I've seen for simplifying it is Futures. I thought I'd explore the idea today and hack up a quick and simple implementation of a dynamic Future proxy.
The basic idea is to take a block of code, schedule it on another thread and return a dynamic proxy that if accessed, blocks until the value returns. This lets useful work continue on the main thread until you access the value.
A nice way to break up a big task concurrently might be to #collect: all the futures for a bunch of work processes you have, say fetching rates for a bunch of hotels that require calls to outside systems that may or may not return quickly, and then aggregate the results at the end.
Here's the complete implementation, it's quite simple but seems to work pretty well while playing around in a workspace and makes concurrency seem less of a beast.
First the class, a subclass of ProtoObject since we're building a proxy...
ProtoObject subclass: #SFuture instanceVariableNames: 'futureValue error lock' classVariableNames: '' poolDictionaries: '' category: 'OnSmalltalk'
Then a #value: write accessor which eagerly kicks off the process, sets up, and clears the lock after fetching the future value.
value: aBlock lock := Semaphore new. [ [ futureValue := aBlock value ] on: Error do: [ :err | error := err ] ensure: [ lock signal ] ] forkBackground
Now a #value read accessor that blocks if the lock still exists, re-throws any error that may have happened on the worker thread in the context of the main thread, and finally returns the future value.
value lock isSignaled ifFalse: [ lock wait ]. error ifNotNil: [ error privHandlerContext: thisContext; signal ]. ^ futureValue
A quick testing method for checking if the future has finished executing (useful for doing what work you can with the results that have returned).
hasValue ^ lock isSignaled
And the all important #doesNotUnderstand: override that intercepts any message sent to the proxy and sends it to the future value, causing the thread to block until the result is finished computing.
doesNotUnderstand: aMessage ^ self value perform: aMessage selector withArguments: aMessage arguments
Finally, a single extension method to BlockContext to make using the future more natural and ensuring to call fixTemps so I can collect future values in a loop with the assumption that the block will act like a proper closure.
For Squeak (older ones without the new compiler)...
BlockContext>>future ^ SFuture new value: self fixTemps
And for Pharo...
BlockClosure>>future ^ SFuture new value: self fixTemps
Now we can ask any block for its future value and just pretend we have it. Executing some test code in a workspace...
value1 := [200 timesRepeat:[Transcript show: '.']. 6] future. value2 := [200 timesRepeat:[Transcript show: '+']. 6] future. Transcript show: 'other work'. Transcript show: (value1 + value2).
Reveals the string 'other work', a long string of interspersed periods and pluses, and finally 12, the result of adding the value returned by each future. In all, a pretty nice way to handle concurrency, I'll have to see where I can simplify some code with the use of Futures, I can already think of a few.