Zahir
Perhaps I shall conclude by wearing away the Zahir simply through thinking of it again and again.
Around Christmas, out of a mix of curiosity, need for an improved photo-encoding backend, and reasons known to a few coworkers, I decided to build a workflow engine. The main insights that I started from were that:
- Jobs should be able to spawn and await other jobs; that is, the workflow graph should be dynamic so that workflows would be as expressive as other jobs
- A collection of concepts like signals, concurrency-limiting, and scheduling are ultimately a common construct; await a world-state, and do things accordingly.
- Workflow state could be communicated through eventing, and some events could receive responses from the workflow state-machine (like Await(job))
I built it, though it took a few hundred commits. The engine was expressive enough, simple to write workflows on, and functioned sufficiently well for me to build workflows like my photo-management system. I recently decided to rewrite it from scratch.
What Was Wrong?
While I liked these core concepts (dynamically evolving graph of jobs, distributed across processes, awaiting dependencies), I felt the implementation was heavyweight and suspected I was missing a few additional theoretical underpinnings in the design. There were some bugs (in particular, rerunning crashed workflows was slow), but my main issues were more with how the implementation turned out.
- Jobs and dependencies were encoded as complex classes that had arcane methods of composing functions around them, and rescheduling in retries (maybe a thousand lines of code)
- We had a notion of a job-registry; a storage interface for the job queue, results, errors. It was in practice a god-class, with dozens of methods that intermixed storage and scheduling concerns
- An absurdly complex core state-machine (about eighteen states) which partly reimplemented basic Python features like try-except.
- It took me many days to figure out how to implement a retry decorator for jobs.
After some thought, I figured out that the underlying cause of these issues was that I built Zahir "wide", not "deep". That is, I did not find atomic primitives I could stack together into more complex structures. I instead defined large interfaces on each class, mixing responsibilities like serialisation, rescheduling, queuing, and everything else into one layer. In the rewrite, I repeated a trick that's been successful for me in the past: borrowing complex ideas from functional programming, simplifying them and translating them into something I can comprehend, and composing small simple pieces into a final product.
(Re)design
I read about Algebraic Effects midway through implementing v1. I did not fully understand the concept (and given that I only implemented a limited version, probably still don't), but approximated effects as:
- events with handler functions, analogous to event-driven programming
- events that can receive responses from their handlers, like with request-response servers
- handlers that request effects themselves while handling an effect
- separating out the action (e.g. GetDB) from the implementation; an FP analogue of interface implementation
- if you squint, continuation-passing style a la NodeJS's callback-hell days
These approximations in aggregate are probably still painting an incomplete picture, but it was sufficiently clear for me to build with them. I realised that response = yield Await(job) was a half-unwitting use of effects that my engine had one overspecialised code-path to handle. Zahir was already an event-driven system where jobs were generators, so I saw an easy way of factoring out this piece. I saw that Zahir could be composed of four stacked layers:
- Layer zero: effects and handlers. A generator yields an effect, a handler performs it. This would be a simple eval-loop with some attention paid to ensuring we could compose handlers together (i.e. we could emit effects from a core generator and have it handled later by some separate telemetry handlers, vs mixing all handlers together). This adds a "language feature" to Python for us to express the workflow engine in terms of.
- Layer one: concurrency. We need to spawn processes and message between them. In this case I borrowed from a second toolkit, Erlang. This would separate out IPC concerns from workflow execution.
- Layer two: workflow execution. We'd follow the structure from Zahir; spawn an overseer process and worker processes. They'd poll for work, and handle job-suspension in the case of dealing with EAwait. Jobs would run, await dependencies, spawn more jobs. Jobs would just take inputs and return outputs; there'd be little additional ceremony when implementing workflows and in exchange our jobs would be distributed across processes with telemetry wired in
- Layer three: telemetry. Zahir would emit information (from functions composed onto handlers), and an aggregator combinator library would perform analytics like ETA of completion downstream of the workflow itself.
So, I set out building the chain of libraries to implement all this.
Layer zero: Orbis
Effects would be embedded in generators, so we could write composable programs that decouple "what" from "how".
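    # Handler: the "how" behind EFetch.
    def fetch(url: str):
        return request(url, headers={...})

    # Program: yields effects describing "what" it wants done.
    def main():
        url = "https://en.wikipedia.org/wiki/Slovenia"
        for attempt in range(3):
            try:
                res = yield EFetch(url)
                return res
            except Exception:
                # Catch Exception rather than everything, so closing the
                # generator (GeneratorExit) is not swallowed by the retry.
                yield ESleep(1000)
        raise Exception("Slovenia not found.")

    res = complete(main(), {
        "fetch": fetch,
        "sleep": sleep
    })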
The library reduces to a short function _drive which maintains a generator stack and:
- Starts with the main program's generator
- Advances the generators, looks up a handler when an effect is yielded, and pushes that handler's generator on the stack
- Yields unhandled effects outwards (so we can compose handler layers)
- Passes exceptions and return values along the "call stack"
It defines a small eval-loop on which we can build "request-response" style generator yields. A pretty small library overall.
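The steps above can be sketched in a few dozen lines. This is a minimal illustration of the technique, not Orbis's actual code: the names Effect, drive and complete, and the choice to look handlers up by a string tag, are assumptions made for the example.

```python
import inspect
from dataclasses import dataclass


@dataclass
class Effect:
    """Hypothetical effect: a tag used for handler lookup, plus arguments."""
    tag: str
    args: tuple = ()


def drive(program, handlers):
    """Run `program`, resolving effects through `handlers`.

    `drive` is itself a generator: effects with no handler are yielded
    outwards, so another `drive` (or `complete`) can handle them.
    """
    stack = [program]                    # the generator "call stack"
    send_value, throw_exc = None, None
    while stack:
        gen = stack[-1]
        try:
            if throw_exc is not None:
                exc, throw_exc = throw_exc, None
                effect = gen.throw(exc)  # pass an exception up the stack
            else:
                effect = gen.send(send_value)
        except StopIteration as done:    # generator returned: pop, pass value on
            stack.pop()
            send_value = done.value
            continue
        except Exception as exc:         # generator raised: pop, rethrow in caller
            stack.pop()
            throw_exc = exc
            continue
        handler = handlers.get(effect.tag)
        if handler is None:              # unhandled: let an outer layer answer
            try:
                send_value = yield effect
            except Exception as exc:
                throw_exc = exc
            continue
        try:
            result = handler(*effect.args)
        except Exception as exc:
            throw_exc = exc
            continue
        if inspect.isgenerator(result):  # handler requests effects of its own
            stack.append(result)
            send_value = None
        else:
            send_value = result
    if throw_exc is not None:
        raise throw_exc
    return send_value


def complete(program, handlers):
    """Run to completion; any effect left unhandled is an error."""
    driver = drive(program, handlers)
    try:
        effect = next(driver)
    except StopIteration as done:
        return done.value
    driver.close()
    raise LookupError(f"unhandled effect: {effect.tag}")


def fetch_with_log(url):
    # A handler that requests an effect while handling one.
    yield Effect("log", (f"fetching {url}",))
    return f"<html for {url}>"


def main():
    page = yield Effect("fetch", ("https://example.org",))
    return page.upper()


logs = []
result = complete(main(), {"fetch": fetch_with_log, "log": logs.append})
```

Because drive yields what it cannot handle, layers nest: complete(drive(main(), {"fetch": fetch_with_log}), {"log": logs.append}) gives the same result, with the logging handler living in a separate outer layer.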
Layer one: Tertius
Tertius was broader and trickier to get right (race conditions). The fundamentals are:
- Implement genserver and Erlang-style process management
- Hide process-management complexity using effects + handlers
- Use zeromq to handle IPC
The core loop is short enough to share. It takes a few specialised handlers, and defines a loop for receiving messages, updating state, responding. It's deceptively simple, though, since we have to actually implement handlers this time.
    def _gen_server_loop[StateT](
        init: InitHandler[StateT],
        handle_cast: CastHandler[StateT] | None,
        handle_call: CallHandler[StateT],
        handle_info: InfoHandler[StateT] | None,
        *args: Any,
    ) -> ServerGen:
        state = yield from init(*args)
        while True:
            envelope = yield EReceive()
            if envelope is None:
                raise RuntimeError("EReceive yielded None - broker sent no envelope")
            match envelope.body:
                case CastMsg(body=body):
                    if handle_cast is not None:
                        state = yield from handle_cast(state, body)
                case CallMsg(ref=ref, body=body):
                    state, reply = yield from handle_call(state, body)
                    yield ESend(envelope.sender, ReplyMsg(ref=ref, body=reply))
                case _:
                    if handle_info is not None:
                        state = yield from handle_info(state, envelope.body)
Tertius is effects-driven; ESpawn, ESend, EReceive in particular manage process setup and intercommunication over sockets. We relay messages through a broker router, and can receive control messages on another broker thread to perform actions like spawning processes. The programs supported by tertius, as with orbis, are generators that yield effects. Practically speaking this is abstracted away behind gen_server for most uses; we have servers that follow an actor-pattern style of messaging, with the possibility of composing further effects on top.
I ran into a few hanging tests while building it, so I built fuzz-tests to try to simulate a wide array of messaging patterns in order to catch race conditions. They've dug up plenty, especially relating to shutdown coordination.
Layer two: Zahir
I wanted to ditch all of the boilerplate and implementation-detail complexity from Zahir v1. So, I removed concepts like dynamic scope population (just give me a dictionary of functions), job and dependency classes (now, only generators), and the distinction between jobs and generators itself. Telemetry was externalised from the logic; we compose telemetry functions onto handlers rather than bake in additional eventing. I ditched the eighteen-state machine for tracking running / idle. It led to much simpler workflow design with no real loss in expressive power; ultimately I just wanted to be able to schedule jobs, distribute them across processes, and keep track of execution.
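As a rough illustration of composing telemetry onto handlers, the sketch below wraps plain-function handlers so each call reports an event. The names with_telemetry, resize and the event dictionary shape are all hypothetical, and generator-based handlers (which yield effects of their own) would need a wrapper that also delegates with yield from.

```python
import time


def with_telemetry(name, handler, emit):
    """Wrap `handler` so every call reports an event through `emit`."""
    def wrapped(*args, **kwargs):
        started = time.perf_counter()
        try:
            result = handler(*args, **kwargs)
        except Exception as exc:
            emit({"handler": name, "ok": False, "error": repr(exc)})
            raise
        emit({"handler": name, "ok": True,
              "seconds": time.perf_counter() - started})
        return result
    return wrapped


def resize(path, width):
    # Hypothetical handler; stands in for real image work.
    return f"{path}@{width}"


events = []
handlers = {"resize": resize}
# The workflow logic is untouched; only the handler table changes.
instrumented = {
    name: with_telemetry(name, fn, events.append)
    for name, fn in handlers.items()
}
thumb = instrumented["resize"]("cat.jpg", 320)
```

The handler itself knows nothing about telemetry, which is the point: the same dictionary of functions can be run bare in tests and instrumented in production.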
The strict factoring of any interesting interprocess logic into effects and handler functions helped keep the core simple; that constrained everything into a series of single-purpose events vs mixed-purpose megaclasses. It also made it easy (though out of scope) to re-add the old SQLite storage-backend; I'd just need to swap that set of handlers. And, additional functionality could be layered outwards based on emitted effects, leading to the final layer.
Layer three: Bookman
Zahir emits telemetry during execution but I wanted to decouple metric aggregation into its own layer. This part is still in development, but it's somewhat lifted from algebird and structures aggregations as a combiner monoid and ways of inserting into and extracting from that type.
- insert: event -> monoid: lift the event into the monoid so we can combine it
- combine: monoid -> monoid -> monoid: a way of combining two state objects
- empty: monoid: an empty state object
- extract: monoid -> returnType: extract into a final result
It's worked reasonably well so far; many things can be represented in this way, and complex aggregates can be stitched together. But, there's a lot of work needed for this concept to really prove itself. Managing progress bar statistics is not a very complex application of this structure & the associated combinators.
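A minimal sketch of the insert / combine / empty / extract structure follows. It is not Bookman's API: the Aggregator class, the both combinator, and the "started" / "finished" event strings are assumptions chosen to keep the example small.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Any, Callable, Iterable


@dataclass(frozen=True)
class Aggregator:
    insert: Callable[[Any], Any]        # event -> monoid
    combine: Callable[[Any, Any], Any]  # monoid -> monoid -> monoid
    empty: Any                          # identity element for combine
    extract: Callable[[Any], Any]       # monoid -> final result

    def fold(self, events: Iterable[Any]) -> Any:
        """Lift each event and combine them into one state object."""
        return reduce(self.combine, map(self.insert, events), self.empty)

    def run(self, events: Iterable[Any]) -> Any:
        return self.extract(self.fold(events))


def both(a: Aggregator, b: Aggregator) -> Aggregator:
    """Stitch two aggregators into one that tracks a pair of states."""
    return Aggregator(
        insert=lambda ev: (a.insert(ev), b.insert(ev)),
        combine=lambda x, y: (a.combine(x[0], y[0]), b.combine(x[1], y[1])),
        empty=(a.empty, b.empty),
        extract=lambda s: (a.extract(s[0]), b.extract(s[1])),
    )


# State is a (started, finished) pair; the result is the fraction finished.
progress = Aggregator(
    insert=lambda ev: (int(ev == "started"), int(ev == "finished")),
    combine=lambda x, y: (x[0] + y[0], x[1] + y[1]),
    empty=(0, 0),
    extract=lambda s: s[1] / s[0] if s[0] else 0.0,
)

# Counts every event seen.
count = Aggregator(
    insert=lambda ev: 1,
    combine=lambda x, y: x + y,
    empty=0,
    extract=lambda n: n,
)

events = ["started", "started", "finished", "started", "started", "finished"]
fraction_done = progress.run(events)
summary = both(progress, count).run(events)
```

Because combine is associative and empty is its identity, partial states folded separately (say, one per worker) can be merged afterwards and extracted once, giving the same answer as a single pass over all events.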
Results
Ultimately, the same as before. The layered architecture worked out as planned; separating each layer out made them more testable and easier to reason about individually, though with the usual rounds of debugging stuck tests or missing events. I can upload my photos efficiently without OOMing my laptop, and with a progress bar that tells fewer lies than the old one.

At the moment, Zahir is more of a concurrent runtime than a true workflow engine. I'm still trying to design a persistence mechanism that's more general-purpose than pretending jobs are functionally pure or implementing EGetState / ESetState and delegating that task to workers themselves. But, perhaps that's missing the moral of the compositional approach where we layer responsibilities rather than broaden them. In any case, I need to think of it again and again (Zahir really was the right name for this project...)
Takeaway Points
- Complex systems are better built through compositional layering of individual responsibilities than by defining broad catch-all interfaces
- Algebraic effects are surprisingly practical to work with, and encourage well-factored code
- Concurrent process management remains painful to implement, but it's possible to push most of that pain into the framework layer rather than workflows themselves.
- Functional-programming produces better results for me than object-oriented programming since it constrains programs in positive ways