Simple File-Based Distributed Job Queue in Smalltalk
By Ramon Leon - 27 March 2025 under Linux, Smalltalk
There's a certain elegance to simple solutions. When faced with a distributed processing challenge, my first instinct wasn't to reach for Kafka, RabbitMQ, or some other enterprise message broker - all carrying their own complexity taxes. Instead, I built a file-based job queue in Smalltalk that's been quietly powering my production systems for years.
The Problem
Distributed work processing is a common need. You have jobs that:
- Need to survive image restarts
- Should process asynchronously
- Might need to run on separate machines
- Shouldn't be lost if something crashes
The Solution: Files and Rename
The core insight is that the filesystem already solves most of these problems. Files persist, they can be accessed from multiple machines via NFS, and most importantly, the rename operation is atomic even across NFS mounts.
The entire system is built around a few key classes:
Object subclass: #SFileQueue
	instanceVariableNames: 'queueName'
	classVariableNames: 'FileQueue'
	poolDictionaries: ''
	category: 'MultiProcessFileQueue'
With just a few methods, we get a distributed processing system:
deQueue
	| dir name workingName |
	dir := self queueDirectory.
	name := self nextJobNameFrom: dir.
	name ifNil: [ ^ nil ].
	workingName := name copyReplaceAll: self jobExtension with: self workingExtension.
	[ dir
		primRename: (dir fullNameFor: name) asVmPathName
		to: (dir fullNameFor: workingName) asVmPathName ]
		on: Error
		do: [ :error | "rename is atomic, if a rename failed, someone else got that file, recurse and try again"
			^ self deQueue ].
	^ [ self deserializerFromFile: (dir fullNameFor: workingName) ]
		ensure: [ dir deleteFileNamed: workingName ]
The critical piece here is using the primitive file rename operation (primRename:to:). By going directly to the primitive that wraps the POSIX rename system call, we ensure true atomicity across NFS without any extra file existence checks that could create race conditions.
Command Pattern
Jobs themselves are just serialized command objects:
Object subclass: #SMultiProcessCommand
	instanceVariableNames: 'returnAddress hasAnswered expireOn startOn'
	classVariableNames: ''
	poolDictionaries: ''
	category: 'MultiProcessFileQueue'
Subclasses override execute to do the actual work. Want to add a new job type? Just create a new subclass and implement execute.
execute
	self subclassResponsibility
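As a sketch of what a concrete job could look like, here's a hypothetical subclass - the class name, the recipient variable, and its setter are made up for illustration; only the SMultiProcessCommand superclass and the execute hook come from the real system:

SMultiProcessCommand subclass: #SGreetingCommand
	instanceVariableNames: 'recipient'
	classVariableNames: ''
	poolDictionaries: ''
	category: 'MultiProcessFileQueue'

recipient: aString
	recipient := aString

execute
	"the actual work - anything the worker image can do"
	Transcript show: 'Hello, ' , recipient; cr

Because the command carries its own state (recipient here), whatever data the job needs travels with it in the serialized file.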
Service Startup and Runtime
The queue service runs as a system process that's registered with the Smalltalk image for automatic startup and shutdown:
initialize
	"self initialize"
	SmalltalkImage current addToStartUpList: self.
	SmalltalkImage current addToShutDownList: self
On system startup, it checks an environment variable to decide if this image should be a queue server:
startUp
	(OSProcess thisOSProcess environment at: 'QUEUE_SERVER' ifAbsent: 'false') asLowercase = 'true'
		or: [ ^ self ].
	self startDefaultQueue
When started, it creates a background Smalltalk process with high priority:
startDefaultQueue
	| queue |
	queue := self default.
	queue queueName notifyLogWith: 'Starting Queue in:'.
	FileQueue := [ [ [ self workFileQueue: queue ] ignoreErrors ] repeat ] newProcess.
	FileQueue
		priority: Processor systemBackgroundPriority + 1;
		name: self name;
		resume
The queue directory name can be configured with an environment variable:
default
	^ self named: (OSProcess thisOSProcess environment at: 'QUEUE_DIRECTORY' ifAbsent: 'commands')
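So an image only becomes a queue server when it's launched with QUEUE_SERVER=true in its environment, and it watches the directory named by QUEUE_DIRECTORY (falling back to 'commands'). For experimenting without a restart, a workspace snippet along these lines should spin up the same worker by hand - assuming, as the startup registration suggests, that these are class-side methods on SFileQueue:

"see how this image would answer the startUp check"
OSProcess thisOSProcess environment at: 'QUEUE_SERVER' ifAbsent: 'false'.

"start the background worker directly, bypassing the startUp hook"
SFileQueue startDefaultQueue.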
Background Processing
The queue gets processed in a background process:
workFileQueue: aQueue
	| delay |
	delay := 10 milliSeconds asDelay.
	Smalltalk at: #ThreadPool ifPresent: [ :pool | [ pool isBackedUp ] whileTrue: [ delay wait ] ].
	aQueue deQueue
		ifNil: [ delay wait ]
		ifNotNilDo: [ :value |
			| block |
			block := [ [ value execute ]
				on: Error do: [ :error | error notifyLog ]
				ensure: [ value returnToSenderOn: aQueue ] ].
			Smalltalk at: #ThreadPool ifPresent: [ :pool | block queueWork ].
			Smalltalk at: #ThreadPool ifAbsent: [ block forkBackgroundNamed: 'file queue worker' ] ]
This method is particularly clever - it checks whether a ThreadPool class is present in the image and, if so, hands jobs to the pool for efficient processing; otherwise it falls back to basic forking. It also waits while the ThreadPool is backed up, providing rudimentary back-pressure.
Job Selection Strategy
Instead of always taking the first job, it takes a random job from the oldest few, reducing contention:
nextJobNameFrom: aDir
	| jobs |
	"grab a random one of the oldest few jobs in the queue to reduce contention for the top of the queue"
	^ [ jobs := (aDir entries asPipe)
			select: [ :e | e name endsWith: self jobExtension ];
			sorted: [ :a :b | a creationTime <= b creationTime ];
			yourself.
		jobs size > self topFew ifTrue: [ jobs := jobs first: self topFew ].
		jobs ifEmpty: [ nil ] ifNotEmpty: [ jobs atRandom name ] ]
		on: Error do: [ :err | nil ]
This approach helps prevent multiple servers from continually colliding when trying to grab the next job.
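To put a rough number on it: if topFew were 5, two idle servers polling at the same moment would each pick uniformly from the same five oldest jobs and collide only about one time in five, whereas always grabbing the single oldest job would make them collide every time, with the loser paying a failed rename and a retry.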
Enqueueing Work
Adding jobs to the queue is straightforward:
enQueue: anSMultiProcessCommand
	[ self
		serialize: anSMultiProcessCommand
		toFile: (self queueDirectory fullNameFor: self uniqueName , self jobExtension) ]
		on: Error do: [ :error | error notifyLog ]
It serializes the command object to a file with a unique name and the .job extension.
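Using the hypothetical SGreetingCommand from earlier, handing work to the queue from any image that can see the queue directory is a one-liner:

"serialize a job into the queue directory; any server watching it may pick it up"
SFileQueue default enQueue: (SGreetingCommand new recipient: 'world'; yourself)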
Request/Response Pattern
The queue also provides bidirectional communication. Command objects can return values to callers through a separate results directory:
returnToSenderOn: aQueue
	aQueue set: returnAddress value: self
Setting a result is simply serializing to the answer directory:
set: aKey value: anObject
	self serialize: anObject toFile: (self answerDirectory fullNameFor: aKey)
The caller can fetch the response and automatically clean up the result file:
get: aKey
	| dir |
	dir := self answerDirectory.
	(dir fileExists: aKey) ifFalse: [ ^ nil ].
	^ [ [ self deserializerFromFile: (dir fullNameFor: aKey) ]
		ifError: [ :error | SysLog devLog: error. nil ] ]
		ensure: [ dir deleteFileNamed: aKey ]
Commands check for their answers with timeouts:
tryAnswerOn: aQueue
	hasAnswered ifTrue: [ ^ self ].
	DateAndTime now > expireOn
		ifTrue: [ self handleAnswer: nil ]
		ifFalse: [ (aQueue get: returnAddress) ifNotNilDo: [ :answer | self handleAnswer: answer ] ]
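On the calling side, the flow is to hold on to the command you enqueued and poll it until it has an answer or its expireOn deadline passes. The sketch below assumes a hasAnswered accessor, that handleAnswer: marks the command as answered, and that the command gets its returnAddress when created or enqueued (so the caller's copy and the serialized copy share the same key) - none of which is shown above, so treat it as an illustration of the flow rather than the exact API:

| queue command |
queue := SFileQueue default.
command := SGreetingCommand new recipient: 'world'; yourself.
queue enQueue: command.
"poll for the answer file; tryAnswerOn: gives up once expireOn has passed"
[ command hasAnswered ] whileFalse: [
	(Delay forMilliseconds: 100) wait.
	command tryAnswerOn: queue ]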
Scalability Through NFS
With NFS mounts, this system transparently handles distributed processing across multiple Pharo/Smalltalk VMs on different machines. No additional code needed - it just works.
The Unix Philosophy in Action
This implementation follows the Unix philosophy: write programs that do one thing and do it well. The file queue does just that - reliable job distribution with minimal complexity.
It's not flashy, doesn't have a marketing site, and won't get you a $100M valuation. But it works, it's simple, and you can understand the entire implementation in a few minutes. Build your own tools that you know inside and out; it's not that hard.
That's what the Smalltalk way is all about - solving real problems with elegant, comprehensible code.