
Smalltalk Concurrency, Playing With Futures

Concurrency is always a source of problems in complex systems, and one of the coolest patterns I've seen for simplifying it is Futures. I thought I'd explore the idea today and hack up a quick and simple implementation of a dynamic Future proxy.

The basic idea is to take a block of code, schedule it on another thread, and return a dynamic proxy that, when accessed, blocks until the value is available. This lets useful work continue on the main thread until you actually need the value.
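To make the mechanism concrete before any proxy is involved, here's a minimal workspace sketch of the same idea using nothing but a forked block and a Semaphore. The names result and done are just for illustration.

```smalltalk
| result done |
done := Semaphore new.

"Schedule the work on another process and signal when it has finished"
[ result := 6 * 7.
  done signal ] fork.

"... the main thread is free to do other work here ..."

"Block only at the point where the value is actually needed"
done wait.
result
```

The future wraps exactly this wait-then-read step, so the caller never has to touch the semaphore.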

A nice way to break up a big task concurrently might be to #collect: the futures for a bunch of work processes, say fetching rates for a bunch of hotels, where each fetch calls an outside system that may or may not return quickly, and then aggregate the results at the end.
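As a rough sketch of that pattern, assuming the SFuture class and the #future extension from this post are loaded: the hotel ids are made up, and a short Delay stands in for the slow outside call.

```smalltalk
| hotelIds rates total |
hotelIds := #(1 2 3).

"Kick off one future per hotel; the Delay is a stand-in for a slow remote call"
rates := hotelIds collect: [ :id |
    [ (Delay forMilliseconds: 100) wait.
      id * 100 ] future ].

"Aggregate at the end; #value blocks only for futures that are not ready yet"
total := rates inject: 0 into: [ :sum :rate | sum + rate value ].
total
```

Sending #value explicitly keeps the arithmetic on real numbers rather than relying on the proxy being accepted as an argument.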

Here's the complete implementation. It's quite simple, but it seems to work pretty well while playing around in a workspace, and it makes concurrency seem like less of a beast.

First the class, a subclass of ProtoObject since we're building a proxy...

ProtoObject subclass: #SFuture
    instanceVariableNames: 'futureValue error lock'
    classVariableNames: ''
    poolDictionaries: ''
    category: 'OnSmalltalk'

Then a #value: write accessor, which sets up the lock, eagerly kicks off the background process, and signals the lock once the future value has been computed (or an error has been caught).

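value: aBlock 
    "Run aBlock in a background process; the lock is signaled when it
     finishes, whether it answered a value or raised an error."
    lock := Semaphore new.

    [ [ [ futureValue := aBlock value ] 
        on: Error
        do: [ :err | error := err ] ]
            ensure: [ lock signal ] ] forkAt: Processor userBackgroundPriority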

Now a #value read accessor that blocks until the lock has been signaled, re-throws any error that may have happened on the worker thread in the context of the main thread, and finally returns the future value.

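value
    "Waiting consumes the signal, so signal again afterwards; otherwise a
     second call to #value would block forever and #hasValue would answer false."
    lock isSignaled ifFalse: [ lock wait. lock signal ].
    error ifNotNil: 
        [ error
            privHandlerContext: thisContext;
            signal ].
    ^ futureValue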

A quick testing method for checking if the future has finished executing (useful for doing what work you can with the results that have returned).

hasValue
    ^ lock isSignaled
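Here's a small sketch of how #hasValue might be used to work on whatever has already returned, again assuming the SFuture class and #future extension from this post. The delays are arbitrary, and which futures land in ready versus pending depends on timing.

```smalltalk
| futures ready pending total |
futures := #(1 2 3) collect: [ :n |
    [ (Delay forMilliseconds: n * 50) wait.
      n ] future ].

"Partition in a single pass so each future ends up in exactly one group"
ready := OrderedCollection new.
pending := OrderedCollection new.
futures do: [ :f |
    (f hasValue ifTrue: [ ready ] ifFalse: [ pending ]) add: f ].

"Reduce the finished ones first, then block on the rest; fine for a commutative operation like +"
total := (ready , pending) inject: 0 into: [ :sum :f | sum + f value ].
total
```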

And the all-important #doesNotUnderstand: override, which intercepts any message sent to the proxy and forwards it to the future value, causing the calling thread to block until the result has finished computing.

doesNotUnderstand: aMessage 
    ^ self value 
        perform: aMessage selector
        withArguments: aMessage arguments
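A quick sketch of the forwarding in action, assuming the #future extension shown later in this post; the delay is just there to make the block noticeably slow.

```smalltalk
| f answer |
f := [ (Delay forMilliseconds: 200) wait.
       5 ] future.

"SFuture doesn't understand #factorial, so the send lands in
 #doesNotUnderstand:, waits for the worker, and is forwarded to 5"
answer := f factorial.
answer
```

One caveat of any ProtoObject-based proxy: messages that ProtoObject itself implements, == for instance, are answered by the proxy and never reach the future value.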

Finally, a single extension method on BlockContext to make using the future more natural. It calls fixTemps so that I can collect future values in a loop and have each block act like a proper closure.

For Squeak (older ones without the new compiler)...

BlockContext>>future
    ^ SFuture new value: self fixTemps

And for Pharo...

BlockClosure>>future
    ^ SFuture new value: self fixTemps

Now we can ask any block for its future value and just pretend we have it. Executing some test code in a workspace...

value1 := [200 timesRepeat:[Transcript show: '.']. 6] future.
value2 := [200 timesRepeat:[Transcript show: '+']. 6] future.
Transcript show: 'other work'.
Transcript show: (value1 + value2).

This reveals the string 'other work', a long string of interspersed periods and pluses, and finally 12, the result of adding the values returned by the two futures. In all, a pretty nice way to handle concurrency. I'll have to see where I can simplify some code with the use of Futures; I can already think of a few places.

Comments (automatically disabled after 1 year)

Isaac Gouy 3325 days ago

In Cincom Smalltalk (aka VisualWorks) you'll find the functionality and supporting methods named

Promise

Isaac Gouy 3324 days ago

Also GNU Smalltalk

http://www.gnu.org/software/smalltalk/manual-base/html_node/Promise.html

Ramon Leon 3324 days ago

Well, I'm sure you know this but Promises aren't quite the same thing as futures, though they're often mixed up, nor are they always implemented as proxies. Squeak also doesn't have them built in, which is why I wanted to hack one up and try it out.

However, having a look at the VisualWorks implementation, I can see why they implemented Promise instead of Future since Futures can be implemented via a promise from a forked block. Interesting indeed.

Marcin Tustin 3323 days ago

Question: why not have a method to test to see if the value is ready? That should make reducing a collection of futures faster if the reduction operation is commutative, because you can reduce the available futures first.

Ramon Leon 3323 days ago

True, I'll add it.

Mark Miller 3323 days ago

Hi Ramon. This is very nice. I'm wondering about going the other way as well: an unthreaded object receiving messages from multiple threads. It sounds like there are various ways of dealing with this scenario: semaphores, streams, monitors, and SharedQueue. Is there a simple construct like futures that helps with this scenario? A lot of the examples I see endorse the idea of using semaphores and critical sections for this.

Ramon Leon 3323 days ago

I'm not sure I follow, this example was collecting data from multiple threads. Can you give me a more concrete example?

I was planning on doing another example of a Promise next which is like a future anyone can fulfill.

Mark Miller 3322 days ago

@Ramon:

We could use the blog example you had earlier in Seaside, where you store blog posts and comments to them in OrderedCollections. You eventually went to a database, but I remember you said you wanted to use it only as a backup, that you would keep data in memory along with the database.

What I remember is you had the "component thread" (in Seaside), I guess, add to the OrderedCollection, but this creates a problem of multiple threads potentially adding to the collection at the same time. I'm talking about a shared resource.

You could use a critical section for this, inside an #add method in the class where you kept the OrderedCollection, I guess:

Blog>>add: comment
    | sema |
    sema := Semaphore forMutualExclusion.
    sema critical: [comments add: comment]

There's also the issue of sharing data between threads. I've heard others say SharedQueue is the best way to handle that. I've tried it and it works. I was trying out some code to see if I could do screen recording in Squeak. I got a little something working. In one thread I captured forms (frames) of the whole screen and put them into a SharedQueue. Another thread on a lower priority listened on the SharedQueue, grabbed frames out, and saved them to individual files. Later I could play them back, using a different routine, to see how it worked out.

BTW, perhaps this is out of your area of expertise, but maybe somebody else knows. Where could I find specs on video file formats? I tried looking up MPEG-1. The results I got indicated that somebody owns the rights to it, and you have to pay to get the spec. Is that correct?

Ramon Leon 3322 days ago

Yea, I don't know a way around the use of #critical: sections, and that's all a SharedQueue does. If I needed it, I'd subclass OrderedCollection and override and wrap all state changing methods with a critical block creating a thread safe OrderedCollection rather than doing critical sections outside like you did above. They all need to share the same semaphore.

Don't know anything about video, sorry.

Mark Miller 3321 days ago

"If I needed it, I'd subclass OrderedCollection and override and wrap all state changing methods with a critical block creating a thread safe OrderedCollection rather than doing critical sections outside like you did above. They all need to share the same semaphore."

I agree. This sounds like a better approach. The thing is, what about the state changing methods in its base classes as well? Not everything that's useful about an OrderedCollection is in that class.

Something I've been exploring conceptually with another blogger just recently is the idea of a concurrent OOP, redefining the semantics of it so the whole system is concurrent by design, with an allowance for sequential execution. The link points to my comments about this. The comment I was responding to is instructive as well, though I'm sure you're familiar with the metaphor being discussed.

I don't have the first clue about how to actually implement it yet, but I was just throwing about some ideas. When I look at OOP as defined by Kay, it seems natural that just as there is concurrency on the internet, there can be concurrency between objects. It seems like this is going to be needed fairly soon, don't you think?

Mark Miller 3321 days ago

Re: concurrent OOP

The way I talked about it on the other forum was that all message sends could be concurrent, but a #continuation: message could be added to Object to allow sequential execution.

On second thought, there are other approaches that would probably be easier for people to deal with. Some other ideas knocking around my head are:

1) message receipt is concurrent, but message sending is blocking. We'd still need to keep the BlockClosure>>fork, and the futures/promise mechanism for concurrent execution of a block of code.

One of the points made by another participant in the discussion on the other forum was that in any program or system there's a need for some execution to be sequential (or blocking). Not everything can run in parallel.

2) Maybe lazy evaluation behavior would fix that. Just assume concurrent execution until there's a dependency, and then block on that dependency unless otherwise specified. This is like futures, but it would be an inherent part of how the system works.

Ramon Leon 3321 days ago

If you're not watching, you should check out the thread this post started in squeak-dev where they're discussing all of these potential options and weighing the actual implementation options. It's turned into a discussion on how to make the SqueakVM work with multiple core and cpu machines. It's way over my head, so I'm just watching and learning, but it's been a great discussion so far.

[...] into how cool futures can be when the technology uses a sufficient amount of introspection. The Smalltalk way is even more amazing (look at the very bottom of that article to see how easy it is to use). Very [...]
