Zahir

Perhaps I shall conclude by wearing away the Zahir simply through thinking of it again and again.

Around Christmas, out of a mix of curiosity, need for an improved photo-encoding backend, and reasons known to a few coworkers, I decided to build a workflow engine. The main insights that I started from were that:

I built it, though it took a few hundred commits. The engine was expressive enough, simple to write workflows on, and functioned sufficiently well for me to build workflows like my photo-management system. I recently decided to rewrite it from scratch.

What Was Wrong?

While I liked these core concepts (dynamically evolving graph of jobs, distributed across processes, awaiting dependencies), I felt the implementation was heavyweight and suspected I was missing a few additional theoretical underpinnings in the design. There were some bugs (in particular, rerunning crashed workflows was slow), but my main issues were more with how the implementation turned out.

After some thought, I figured out that the underlying cause was that I built Zahir “wide”, not “deep”. That is, I did not find atomic primitives I could stack together into more complex structures. I instead defined large interfaces on each class, mixing responsibilities like serialisation, rescheduling, queuing, and everything else into one layer. In the rewrite, I repeated a trick that’s been successful for me in the past: borrowing complex ideas from functional programming, simplifying them and translating them into something I can comprehend, and composing small simple pieces into a final product.

(Re)design

I read about Algebraic Effects midway through implementing v1. I did not fully understand the concept (and given that I only implemented a limited version, probably still don’t), but approximated effects as:

These approximations in aggregate probably still paint an incomplete picture, but they were sufficiently clear for me to build with. I realised that response = yield Await(job) was a half-unwitting use of effects that my engine had one overspecialised code-path to handle. Zahir was already an event-driven system where jobs were generators, so I saw an easy way of factoring out this piece. I saw that Zahir could be composed of four stacked layers:

So, I set out building the chain of libraries to implement all this.

Layer zero: Orbis

Effects would be embedded in generators, so we could write composable programs that decouple “what” from “how”.

def fetch(url: str):
  return request(url, headers={...})

def main():
  url = "https://en.wikipedia.org/wiki/Slovenia"
  for attempt in range(3):
    try:
      res = yield EFetch(url)
      return res
    except Exception:  # a bare except would also catch GeneratorExit
      yield ESleep(1000)

  raise Exception("Slovenia not found.")

res = complete(main(), {
  "fetch": fetch,
  "sleep": sleep
})

The library reduces to a short function _drive which maintains a generator stack and:

It defines a small eval-loop on which we can build “request-response” style generator yields. A pretty small library overall.
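To make the eval-loop idea concrete, here is a minimal sketch of such a driver. It is not Orbis's actual _drive: the Effect class, its tag field, and the drive function are hypothetical names, and the sketch assumes a sub-program is run by yielding a generator, which gets pushed onto the stack.

```python
from dataclasses import dataclass
from types import GeneratorType
from typing import Any, Callable, Generator


@dataclass
class Effect:
    """Hypothetical effect: a tag naming the handler, plus its arguments."""
    tag: str
    args: tuple = ()


def drive(program: Generator, handlers: dict[str, Callable[..., Any]]) -> Any:
    """Run `program` to completion, answering each yielded effect."""
    stack = [program]          # the innermost running generator is last
    value, error = None, None  # what to send or throw into the top generator

    while True:
        gen = stack[-1]
        try:
            if error is not None:
                pending, error = error, None
                yielded = gen.throw(pending)
            else:
                yielded = gen.send(value)
        except StopIteration as done:
            stack.pop()
            if not stack:
                return done.value
            value = done.value  # hand the result back to the caller generator
            continue
        except Exception as exc:
            stack.pop()
            if not stack:
                raise
            error = exc         # propagate the failure into the caller
            continue

        if isinstance(yielded, GeneratorType):
            stack.append(yielded)  # run the sub-program first
            value = None
            continue
        try:
            value = handlers[yielded.tag](*yielded.args)
        except Exception as exc:
            error = exc         # let the program catch handler failures


# Usage: a retrying fetch whose handler fails twice, then succeeds.
calls = []


def flaky_fetch(url):
    calls.append(url)
    if len(calls) < 3:
        raise IOError("transient failure")
    return f"body of {url}"


def fetch_with_retry(url):
    for _ in range(3):
        try:
            return (yield Effect("fetch", (url,)))
        except IOError:
            yield Effect("sleep", (0,))
    raise RuntimeError("gave up")


def main():
    body = yield fetch_with_retry("https://example.org")
    return body.upper()


result = drive(main(), {"fetch": flaky_fetch, "sleep": lambda ms: None})
```

The program only names what it wants (a fetch, a sleep). Swapping the handler dictionary changes how those requests are met without touching main.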

Layer one: Tertius

Tertius was broader and trickier to get right (race conditions). The fundamentals are:

The core loop is short enough to share. It takes a few specialised handlers, and defines a loop for receiving messages, updating state, and responding. It’s deceptively simple, though, since we have to actually implement handlers this time.

def _gen_server_loop[StateT](
    init: InitHandler[StateT],
    handle_cast: CastHandler[StateT] | None,
    handle_call: CallHandler[StateT],
    handle_info: InfoHandler[StateT] | None,
    *args: Any,
) -> ServerGen:
    state = yield from init(*args)

    while True:
        envelope = yield EReceive()
        if envelope is None:
            raise RuntimeError("EReceive yielded None — broker sent no envelope")

        match envelope.body:
            case CastMsg(body=body):
                if handle_cast is not None:
                    state = yield from handle_cast(state, body)

            case CallMsg(ref=ref, body=body):
                state, reply = yield from handle_call(state, body)
                yield ESend(envelope.sender, ReplyMsg(ref=ref, body=reply))

            case _:
                if handle_info is not None:
                    state = yield from handle_info(state, envelope.body)

Tertius is effects-driven; ESpawn, ESend, and EReceive in particular manage process setup and intercommunication over sockets. We relay messages through a broker router, and can receive control messages on another broker thread to perform actions like spawning processes. The programs supported by Tertius, as with Orbis, are generators that yield effects. Practically speaking, this is abstracted away behind gen_server for most uses; we have servers that follow an actor-pattern style of messaging, with the possibility of composing further effects on top.
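As a rough illustration of what writing handlers for the loop above looks like, here is a counter server run entirely in-process. Everything in it is a stand-in: the dataclasses only mimic the names used in the loop (their fields are assumptions), counter_server is a trimmed copy of the loop without handle_info, and run_in_process is a hypothetical toy driver that replaces the broker and sockets with a queue.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any


# Stand-ins for the effect and message types; field names are assumptions.
@dataclass
class EReceive:
    pass

@dataclass
class ESend:
    to: str
    msg: Any

@dataclass
class CastMsg:
    body: Any

@dataclass
class CallMsg:
    ref: int
    body: Any

@dataclass
class ReplyMsg:
    ref: int
    body: Any

@dataclass
class Envelope:
    sender: str
    body: Any


# Handlers are generators so they could yield further effects if needed.
def counter_init(start):
    return start
    yield  # unreachable; only here to make this function a generator

def counter_cast(state, amount):
    return state + amount  # fire-and-forget: just update the state
    yield

def counter_call(state, _request):
    return state, state    # (new state, reply)
    yield


def counter_server(start):
    """Trimmed version of the loop above, wired to the counter handlers."""
    state = yield from counter_init(start)
    while True:
        envelope = yield EReceive()
        body = envelope.body
        if isinstance(body, CastMsg):
            state = yield from counter_cast(state, body.body)
        elif isinstance(body, CallMsg):
            state, reply = yield from counter_call(state, body.body)
            yield ESend(envelope.sender, ReplyMsg(ref=body.ref, body=reply))


def run_in_process(server, inbox):
    """Toy driver: feed queued envelopes to one server, collect its sends."""
    queue, outbox = deque(inbox), []
    effect = next(server)  # run until the first yielded effect
    while True:
        if isinstance(effect, EReceive):
            if not queue:
                server.close()  # nothing left to deliver
                return outbox
            effect = server.send(queue.popleft())
        elif isinstance(effect, ESend):
            outbox.append((effect.to, effect.msg))
            effect = server.send(None)
        else:
            raise TypeError(f"unhandled effect: {effect!r}")


sent = run_in_process(
    counter_server(0),
    [
        Envelope("client", CastMsg(2)),
        Envelope("client", CastMsg(3)),
        Envelope("client", CallMsg(ref=1, body="get")),
    ],
)
```

The two casts update the state silently, and only the call produces a reply addressed to the sender. None of the races this section mentions can show up in a single-process toy like this.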

I ran into a few hanging tests while building it, so I built fuzz-tests to try to simulate a wide array of messaging patterns in order to catch race conditions. They’ve dug up plenty, especially relating to shutdown coordination.

Layer two: Zahir

I wanted to ditch all of the boilerplate and implementation-detail complexity from Zahir v1. So, I removed concepts like dynamic scope population (just give me a dictionary of functions), job and dependency classes (now, only generators), and the distinction between jobs and generators itself. Telemetry was externalised from the logic; we compose telemetry functions onto handlers rather than bake in additional eventing. I ditched the eighteen-state machine for tracking running / idle. This led to much simpler workflow design with no real loss in expressive power; ultimately I just wanted to be able to schedule jobs, distribute them across processes, and keep track of execution.
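One way to picture composing telemetry onto handlers is a wrapper over the handler dictionary. This is a sketch under assumptions, not Zahir's code: with_telemetry, the emit callback, and the shape of the emitted event are all hypothetical.

```python
import time
from typing import Any, Callable

Handler = Callable[..., Any]


def with_telemetry(
    handlers: dict[str, Handler],
    emit: Callable[[dict], None],
) -> dict[str, Handler]:
    """Return handlers that behave identically but report each invocation."""

    def wrap(name: str, handler: Handler) -> Handler:
        def wrapped(*args: Any, **kwargs: Any) -> Any:
            started = time.perf_counter()
            ok = False
            try:
                result = handler(*args, **kwargs)
                ok = True
                return result
            finally:
                # Runs on success and on failure; exceptions still propagate.
                emit({
                    "effect": name,
                    "ok": ok,
                    "seconds": time.perf_counter() - started,
                })

        return wrapped

    return {name: wrap(name, handler) for name, handler in handlers.items()}


def always_fails():
    raise ValueError("boom")


events: list[dict] = []
handlers = with_telemetry(
    {"double": lambda x: 2 * x, "fail": always_fails},
    events.append,
)

doubled = handlers["double"](21)
try:
    handlers["fail"]()
except ValueError:
    pass  # the wrapper reported the failure and re-raised it
```

The handlers themselves stay unaware of telemetry, so the reporting can be removed or replaced without editing any of them.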

The strict factoring of any interesting interprocess logic into effects and handler functions helped keep the core simple; that constrained everything into a series of single-purpose events rather than mixed-purpose megaclasses. It also made it easy (though out of scope) to re-add the old SQLite storage-backend; I’d just need to swap that set of handlers. And, additional functionality could be layered outwards based on emitted effects, leading to the final layer.

Layer three: Bookman

Zahir emits telemetry during execution, but I wanted to decouple metric aggregation into its own layer. This part is still in development, but it’s somewhat lifted from Algebird: it structures each aggregation as a combiner monoid plus ways of inserting into and extracting from that type.

It’s worked reasonably well so far; many things can be represented in this way, and complex aggregates can be stitched together. But, there’s a lot of work needed for this concept to really prove itself. Managing progress bar statistics is not a very complex application of this structure & the associated combinators.
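The shape described above can be sketched in a few lines. This is not Bookman's API: Aggregator, both, and the event dictionaries with a "seconds" key are hypothetical, and the sketch only shows the general monoid-plus-insert-plus-extract pattern.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Any, Callable, Iterable


@dataclass(frozen=True)
class Aggregator:
    """A monoid (empty, combine) plus ways into and out of its carrier type."""
    empty: Any
    combine: Callable[[Any, Any], Any]  # associative, with `empty` as identity
    insert: Callable[[Any], Any]        # event -> accumulator
    extract: Callable[[Any], Any]       # accumulator -> reported value

    def run(self, events: Iterable[Any]) -> Any:
        accumulated = reduce(self.combine, map(self.insert, events), self.empty)
        return self.extract(accumulated)


def both(a: Aggregator, b: Aggregator) -> Aggregator:
    """Stitch two aggregators into one that computes both in a single pass."""
    return Aggregator(
        empty=(a.empty, b.empty),
        combine=lambda x, y: (a.combine(x[0], y[0]), b.combine(x[1], y[1])),
        insert=lambda event: (a.insert(event), b.insert(event)),
        extract=lambda acc: (a.extract(acc[0]), b.extract(acc[1])),
    )


# Count of events: integers under addition.
count = Aggregator(
    empty=0,
    combine=lambda x, y: x + y,
    insert=lambda event: 1,
    extract=lambda n: n,
)

# Mean duration: (total, count) pairs under pairwise addition.
mean_seconds = Aggregator(
    empty=(0.0, 0),
    combine=lambda x, y: (x[0] + y[0], x[1] + y[1]),
    insert=lambda event: (event["seconds"], 1),
    extract=lambda acc: acc[0] / acc[1] if acc[1] else 0.0,
)

events = [{"seconds": 1.0}, {"seconds": 3.0}]
summary = both(count, mean_seconds).run(events)
```

Because combine is associative, partial accumulators from different workers can be merged in any grouping, which is what makes the structure attractive for distributed telemetry.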

Results

Ultimately, the same as before. The layered architecture worked out as planned; separating each layer out made them more testable and easier to reason about individually, though with the usual rounds of debugging stuck tests or missing events. I can upload my photos efficiently without OOMing my laptop, and with a progress bar that tells fewer lies than the old one.

a progress bar

At the moment, Zahir is more of a concurrent runtime than a true workflow engine. I’m still trying to design a persistence mechanism that’s more general-purpose than pretending jobs are functionally pure or implementing EGetState / ESetState and delegating that task to workers themselves. But, perhaps that’s missing the moral of the compositional approach where we layer responsibilities rather than broaden them. In any case, I need to think of it again and again (Zahir really was the right name for this project…)

Takeaway Points