Simple File-Based Distributed Job Queue in Smalltalk

There's a certain elegance to simple solutions. When faced with a distributed processing challenge, my first instinct wasn't to reach for Kafka, RabbitMQ, or some other enterprise message broker - all carrying their own complexity taxes. Instead, I built a file-based job queue in Smalltalk that's been quietly powering my production systems for years.

The Problem

Distributed work processing is a common need. You have jobs that:

  • Need to survive image restarts
  • Should process asynchronously
  • Might need to run on separate machines
  • Shouldn't be lost if something crashes

The Solution: Files and Rename

The core insight is that the filesystem already solves most of these problems. Files persist, they can be accessed from multiple machines via NFS, and most importantly, the rename operation is atomic even across NFS mounts.

The entire system is built around a few key classes:

Object subclass: #SFileQueue
    instanceVariableNames: 'queueName'
    classVariableNames: 'FileQueue'
    poolDictionaries: ''
    category: 'MultiProcessFileQueue'

With just a few methods, we get a distributed processing system:

deQueue
    | dir name workingName |
    dir := self queueDirectory.
    name := self nextJobNameFrom: dir.
    name ifNil: [ ^ nil ].
    workingName := name copyReplaceAll: self jobExtension with: self workingExtension.
    [ dir primRename: (dir fullNameFor: name) asVmPathName to: (dir fullNameFor: workingName) asVmPathName ]
        on: Error
        do: [ :error | 
            "rename is atomic, if a rename failed, someone else got that file, recurse and try again"
            ^ self deQueue ].
    ^ [ self deserializerFromFile: (dir fullNameFor: workingName) ] ensure: [ dir deleteFileNamed: workingName ]

The critical piece here is using the primitive file rename operation (primRename:to:). By going directly to the primitive that wraps the POSIX rename system call, we ensure true atomicity across NFS without any extra file existence checks that could create race conditions.
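
To make the claim step explicit, here is a minimal sketch of the rename-as-lock idea in isolation. It is not part of the original code: tryClaim:as:in: is a hypothetical helper name, but the primRename:to:, fullNameFor:, and asVmPathName calls are the same ones deQueue uses above.

tryClaim: aJobName as: aWorkingName in: aDirectory
    "Answer true if this worker won the atomic rename and now owns the job,
    false if another worker renamed the file first."
    ^ [ aDirectory
            primRename: (aDirectory fullNameFor: aJobName) asVmPathName
            to: (aDirectory fullNameFor: aWorkingName) asVmPathName.
        true ]
        on: Error
        do: [ :error | false ]

Because there is no separate existence check before the rename, two workers racing for the same .job file can both attempt the claim and exactly one of them gets true back.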

Command Pattern

Jobs themselves are just serialized command objects:

Object subclass: #SMultiProcessCommand
    instanceVariableNames: 'returnAddress hasAnswered expireOn startOn'
    classVariableNames: ''
    poolDictionaries: ''
    category: 'MultiProcessFileQueue'

Subclasses override execute to do the actual work. Want to add a new job type? Just create a new subclass and implement execute.

execute
    self subclassResponsibility
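
For illustration, a concrete job type might look something like this. ResizeImageCommand, its instance variables, and the resizeImageAt:toWidth: helper are not part of the original code; they are just a sketch of the pattern:

SMultiProcessCommand subclass: #ResizeImageCommand
    instanceVariableNames: 'imagePath targetWidth'
    classVariableNames: ''
    poolDictionaries: ''
    category: 'MultiProcessFileQueue-Examples'

execute
    "Runs on whichever queue server dequeues this job."
    ^ self resizeImageAt: imagePath toWidth: targetWidth

The queue never needs to know about the new class; deserializing the job file yields a command it can simply send execute to.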

Service Startup and Runtime

The queue service runs as a system process that's registered with the Smalltalk image for automatic startup and shutdown:

initialize
    "self initialize"
    SmalltalkImage current addToStartUpList: self.
    SmalltalkImage current addToShutDownList: self

On system startup, it checks an environment variable to decide if this image should be a queue server:

startUp
    (OSProcess thisOSProcess environment at: 'QUEUE_SERVER' ifAbsent: 'false') asLowercase = 'true' or: [ ^ self ].
    self startDefaultQueue

When started, it creates a background Smalltalk process with high priority:

startDefaultQueue
    | queue |
    queue := self default.
    queue queueName notifyLogWith: 'Starting Queue in:'.
    FileQueue := [ [ [ self workFileQueue: queue ] ignoreErrors ] repeat ] newProcess.
    FileQueue
        priority: Processor systemBackgroundPriority + 1;
        name: self name;
        resume

The queue directory name can be configured with an environment variable:

default
    ^ self named: (OSProcess thisOSProcess environment at: 'QUEUE_DIRECTORY' ifAbsent: 'commands')

Background Processing

The queue gets processed in a background process:

workFileQueue: aQueue
    | delay |
    delay := 10 milliSeconds asDelay.
    Smalltalk at: #ThreadPool ifPresent: [ :pool | [ pool isBackedUp ] whileTrue: [ delay wait ] ].
    aQueue deQueue
        ifNil: [ delay wait ]
        ifNotNilDo: [ :value | 
            | block |
            block := [ [ value execute ] on: Error do: [ :error | error notifyLog ] ensure: [ value returnToSenderOn: aQueue ] ].
            Smalltalk at: #ThreadPool ifPresent: [ :pool | block queueWork ].
            Smalltalk at: #ThreadPool ifAbsent: [ block forkBackgroundNamed: 'file queue worker' ] ]

This method is particularly clever - it checks whether a ThreadPool class is present in the image and, if so, uses it for efficient work processing; otherwise it falls back to basic forking. It also waits while the ThreadPool is backed up, providing rudimentary back-pressure.

Job Selection Strategy

Instead of always taking the first job, it takes a random job from the oldest few, reducing contention:

nextJobNameFrom: aDir
    | jobs |
    "grab a random one of the top few newest jobs in the queue to reduce contention for the top of the queue"
    ^ [ 
    jobs := (aDir entries asPipe)
        select: [ :e | e name endsWith: self jobExtension ];
        sorted: [ :a :b | a creationTime <= b creationTime ];
        yourself.
    jobs size > self topFew
        ifTrue: [ jobs := jobs first: self topFew ].
    jobs ifEmpty: [ nil ] ifNotEmpty: [ jobs atRandom name ] ] on: Error do: [ :err | nil ]

This approach helps prevent multiple servers from continually colliding when trying to grab the next job.

Enqueueing Work

Adding jobs to the queue is straightforward:

enQueue: anSMultiProcessCommand
    [ self serialize: anSMultiProcessCommand toFile: (self queueDirectory fullNameFor: self uniqueName , self jobExtension) ]
        on: Error
        do: [ :error | error notifyLog ]

It serializes the command object to a file with a unique name and the .job extension.
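
Client code can then drop work onto the queue with a single expression. The sketch below assumes the hypothetical ResizeImageCommand from earlier, setter messages for its instance variables, and that the queue is reachable via SFileQueue default:

SFileQueue default enQueue:
    (ResizeImageCommand new
        imagePath: '/photos/example.png';
        targetWidth: 640;
        yourself)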

Request/Response Pattern

The queue also provides bidirectional communication. Command objects can return values to callers through a separate results directory:

returnToSenderOn: aQueue
    aQueue set: returnAddress value: self

Setting a result is simply serializing to the answer directory:

set: aKey value: anObject
    self serialize: anObject toFile: (self answerDirectory fullNameFor: aKey)

The caller can fetch the response and automatically clean up the result file:

get: aKey
    | dir |
    dir := self answerDirectory.
    (dir fileExists: aKey)
        ifFalse: [ ^ nil ].
    ^ [ 
    [ self deserializerFromFile: (dir fullNameFor: aKey) ]
        ifError: [ :error | 
            SysLog devLog: error.
            nil ] ] ensure: [ dir deleteFileNamed: aKey ]

Commands check for their answers with timeouts:

tryAnswerOn: aQueue
    hasAnswered
        ifTrue: [ ^ self ].
    DateAndTime now > expireOn
        ifTrue: [ self handleAnswer: nil ]
        ifFalse: [ (aQueue get: returnAddress) ifNotNilDo: [ :answer | self handleAnswer: answer ] ]
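
Putting the pieces together, the caller's side of a round trip might look roughly like this. The sketch assumes an accessor for hasAnswered and that handleAnswer: records the result and flips that flag; neither is shown in the original code, and ResizeImageCommand is the hypothetical job type from earlier:

| queue command |
queue := SFileQueue default.
command := ResizeImageCommand new.
queue enQueue: command.
"Poll until an answer arrives or the command gives up via its expireOn deadline."
[ command hasAnswered ] whileFalse: [
    command tryAnswerOn: queue.
    250 milliSeconds asDelay wait ]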

Scalability Through NFS

With NFS mounts, this system transparently handles distributed processing across multiple Pharo/Smalltalk VMs on different machines. No additional code needed - it just works.

The Unix Philosophy in Action

This implementation follows the Unix philosophy: write programs that do one thing and do it well. The file queue does just that - reliable job distribution with minimal complexity.

It's not flashy, doesn't have a marketing site, and won't get you a $100M valuation. But it works, it's simple, and you can understand the entire implementation in a few minutes. Build your own tools that you know inside and out; it's not that hard.

That's what the Smalltalk way is all about - solving real problems with elegant, comprehensible code.

Comments

tim Rowledge 25 days ago

Nice work!

Trampek 23 days ago

I don't think this works. I never used Smalltalk and may have misread the code, but it seems like you're renaming the file from file.jobExtension to file.workingExtension. This is indeed atomic, but if the job processor dies immediately afterwards, then you end up with a file that isn't picked up by anyone else in the nextJobNameFrom method.

You need to introduce self-healing mechanisms. But then you realize, it's much harder than just the atomic rename, as you can't even know whether the job processor is dead or if it's just slow.

Ramon Leon 23 days ago

It does work, been doing it for years; and you don't have to nail every little edge case that's possible in the real world, I'd just have a cron job rename those back to .job if over a certain age. If you're that concerned about edge cases, use a real persistent queue like the ones mentioned at the beginning; the whole point is the guarantees aren't worth the effort and you can pragmatically get away with vastly simpler stuff. I've processed hundreds of millions of jobs through this to "multi-process" these old pharo images across many cores and many computers, and it works great.

Rabarj 15 days ago

Hi Ramon,

Nice article, this approach makes sense for lots of projects and probably scales to surprising limits given today's hardware.

Just curious, what Smalltalk implementation(s) and libraries are you currently using professionally and for personal projects?
