Self-healing computation: building resilient financial computational services

May 10, 2025

Introduction

Designing a system to compute financial metrics often involves modeling each metric as a node within a computational graph. Each node corresponds to a specific financial measure, such as returns or market exposures. Whenever a node’s data changes or becomes invalid, the entire downstream portion of the graph must be recomputed to maintain consistency and correctness. The complexity of this challenge increases substantially when scaling up to support thousands of accounts, each with independent computational graphs that require concurrent processing and robust fault handling.
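Below is a minimal sketch of that rule in plain Python. It assumes an in-memory dependency map, and the node names (including market_exposure) are illustrative. downstream() collects every node reachable from a changed input. recompute_order() then keeps only those nodes from a full topological order produced by the standard library's graphlib.TopologicalSorter (Python 3.9+), so each affected node comes after its inputs and untouched branches are skipped.

```python
from collections import deque
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Illustrative graph: node -> the nodes it depends on.
DEPS: Dict[str, Set[str]] = {
    "capital_gains": set(),
    "income": set(),
    "market_exposure": set(),
    "return": {"capital_gains", "income"},
    "tax_cost": {"return"},
}


def downstream(deps: Dict[str, Set[str]], changed: str) -> Set[str]:
    """Return `changed` plus every node that transitively depends on it."""
    dependents: Dict[str, Set[str]] = {n: set() for n in deps}
    for node, parents in deps.items():
        for p in parents:
            dependents[p].add(node)
    seen, queue = {changed}, deque([changed])
    while queue:  # breadth-first walk over dependents
        for child in dependents[queue.popleft()]:
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


def recompute_order(deps: Dict[str, Set[str]], changed: str) -> List[str]:
    """Affected nodes only, each placed after all of its inputs."""
    affected = downstream(deps, changed)
    return [n for n in TopologicalSorter(deps).static_order() if n in affected]


print(recompute_order(DEPS, "capital_gains"))  # ['capital_gains', 'return', 'tax_cost']
```

When capital_gains changes, income and market_exposure are left alone. Only the three affected nodes are recomputed.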

Choosing the right technology for this problem matters. Key requirements include strong concurrency handling, effective fault isolation to prevent cascading failures, accurate state tracking, and precise dependency management. Several technologies stand out as viable candidates: Elixir, Scala with Akka, Clojure, and Python with Airflow.

Python with Airflow initially seems attractive. Airflow provides mature DAG orchestration with built-in scheduling, monitoring, and task retries. Python brings an extensive ecosystem of finance and data-science libraries. Together they make rapid prototyping easy.

On closer examination, though, Airflow excels at orchestrating high-level workflows and scheduled task dependencies but becomes rigid and heavy for fine-grained, per-account graph updates. Incremental, stateful recomputation of individual nodes and rapid propagation of "dirty" states across thousands of accounts do not fit Airflow's batch-oriented model. In that model, every task instance passes through the scheduler and is tracked in the metadata database. That per-task overhead, combined with the difficulty of isolating failures at the level of a single account, makes Airflow impractical for granular, near-real-time updates at the required scale.

For detailed, per-account, near-real-time dependency management, lightweight actor-based or process-oriented platforms such as Elixir, Scala with Akka, or Clojure are a better fit. They support fine-grained concurrency, robust fault isolation, and incremental state management, which are exactly the areas where Airflow is limited.
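As a rough illustration of per-account isolation, here is an asyncio sketch of the actor idea in Python. The names account_actor and run_accounts, and the message format, are hypothetical. Each account gets its own task, its own mailbox (an asyncio.Queue), and private state. A malformed message crashes only that account's task, and asyncio.gather(..., return_exceptions=True) collects the failure alongside the other accounts' results. Unlike BEAM processes or Akka actors, these tasks share one thread and one heap, so the sketch isolates failures, not memory or CPU.

```python
import asyncio
from typing import Dict, Union


async def account_actor(account_id: str, inbox: asyncio.Queue) -> float:
    """Own one account's running total; an exception ends only this task."""
    total = 0.0  # private state: no other task can see or mutate it
    while (msg := await inbox.get()) is not None:  # None = end of stream
        total += float(msg)  # raises ValueError on a malformed message
    return total


async def run_accounts(messages: Dict[str, list]) -> Dict[str, Union[float, BaseException]]:
    tasks = {}
    for account_id, msgs in messages.items():
        inbox: asyncio.Queue = asyncio.Queue()
        for m in msgs + [None]:
            inbox.put_nowait(m)
        tasks[account_id] = asyncio.create_task(account_actor(account_id, inbox))
    # Collect each account's result or exception instead of raising on the
    # first failure.
    results = await asyncio.gather(*tasks.values(), return_exceptions=True)
    return dict(zip(tasks.keys(), results))


outcome = asyncio.run(run_accounts({"acct_A": [1, "bad", 2], "acct_B": [10, 20]}))
print(outcome)
```

Here acct_B still finishes with 30.0, while acct_A's entry holds the ValueError raised by the "bad" message.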

Python implementation

Let's implement a simple version of our computational graph using Redis as the backing store. The graph structure lives in Python: each node's compute function, its dependencies, and its dependents. Redis holds the per-account state, storing a value, a dirty flag, and a version for every node-account pair. Marking a node dirty propagates the flag to everything downstream. compute() then walks the graph in topological order and recomputes only dirty nodes whose inputs are already clean. This basic implementation can be extended with more sophisticated features such as caching, batching, and error handling.

import uuid
import redis
from collections import defaultdict, deque
from typing import Callable, Dict, Set, Any

# decode_responses=True makes redis-py return str instead of bytes
r = redis.Redis(host='localhost', port=6379, decode_responses=True)


class RedisBackedGraph:
    def __init__(self):
        self.nodes: Dict[str, Callable] = {}
        self.dependencies: Dict[str, Set[str]] = {}
        self.dependents: Dict[str, Set[str]] = defaultdict(set)

    def add_node(self, name: str, compute_fn: Callable, deps: Set[str]):
        self.nodes[name] = compute_fn
        self.dependencies[name] = deps
        for d in deps:
            self.dependents[d].add(name)

    def mark_dirty(self, node_name: str, account_id: str):
        queue = deque([node_name])
        visited = set()

        while queue:
            node = queue.popleft()
            if (node, account_id) in visited:
                continue
            visited.add((node, account_id))
            r.set(f"dirty:{node}:{account_id}", "1")
            for child in self.dependents.get(node, []):
                queue.append(child)

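    def is_dirty(self, node: str, account_id: str) -> bool:
        # A node never computed for this account has no stored value, so it
        # counts as dirty; otherwise compute() would call get_value() on it
        # and float(None) would raise.
        if r.get(f"dirty:{node}:{account_id}") == "1":
            return True
        return not r.exists(f"value:{node}:{account_id}")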

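    def set_value(self, node: str, account_id: str, value: Any):
        # Transactional pipeline (MULTI/EXEC): value, version, and dirty flag
        # change together, so readers never see a new value with an old version.
        with r.pipeline() as pipe:
            pipe.set(f"value:{node}:{account_id}", value)
            pipe.set(f"version:{node}:{account_id}", str(uuid.uuid4()))
            pipe.delete(f"dirty:{node}:{account_id}")
            pipe.execute()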

    def get_value(self, node: str, account_id: str) -> Any:
        return float(r.get(f"value:{node}:{account_id}"))

    def compute(self, account_id: str):
        for node in self.topo_sort():
            if not self.is_dirty(node, account_id):
                continue
            deps = self.dependencies[node]
            if any(self.is_dirty(dep, account_id) for dep in deps):
                continue  # ensure dependencies are clean

            inputs = {dep: self.get_value(dep, account_id) for dep in deps}
            val = self.nodes[node](inputs)
            self.set_value(node, account_id, val)

    def topo_sort(self):
        indegree = {k: 0 for k in self.nodes}
        for k, deps in self.dependencies.items():
            for d in deps:
                indegree[k] += 1

        queue = deque([k for k, v in indegree.items() if v == 0])
        order = []

        while queue:
            node = queue.popleft()
            order.append(node)
            for dep in self.dependents.get(node, []):
                indegree[dep] -= 1
                if indegree[dep] == 0:
                    queue.append(dep)
        return order


g = RedisBackedGraph()

# Add nodes
g.add_node("capital_gains", lambda _: 150.0, deps=set())
g.add_node("income", lambda _: 50.0, deps=set())
g.add_node("return", lambda d: d["capital_gains"] + d["income"], 
    deps={"capital_gains", "income"})
g.add_node("tax_cost", lambda d: d["return"] * 0.25, deps={"return"})

account_id = "acct_XYZ"

# Simulate capital gains update
g.mark_dirty("capital_gains", account_id)
g.compute(account_id)

# Check result
print("Return:", g.get_value("return", account_id))
print("Tax cost:", g.get_value("tax_cost", account_id))
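The error handling mentioned earlier is one way to make the title's self-healing idea concrete. If a node's computation raises, leave it dirty, record the error, and skip its dependents for this pass. The next compute() then retries it. The sketch below is a hypothetical single-account, in-memory stand-in for RedisBackedGraph, so it runs without a Redis server. InMemoryGraph, feed, and read_capital_gains are illustrative names, and it uses graphlib.TopologicalSorter for ordering instead of the hand-written topo_sort.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, Set


class InMemoryGraph:
    """Hypothetical single-account, dict-backed stand-in for RedisBackedGraph."""

    def __init__(self) -> None:
        self.fns: Dict[str, Callable[[Dict[str, Any]], Any]] = {}
        self.deps: Dict[str, Set[str]] = {}
        self.values: Dict[str, Any] = {}
        self.dirty: Set[str] = set()
        self.errors: Dict[str, str] = {}

    def add_node(self, name: str, fn: Callable, deps: Set[str]) -> None:
        self.fns[name], self.deps[name] = fn, set(deps)
        self.dirty.add(name)  # never computed yet

    def mark_dirty(self, name: str) -> None:
        stack, seen = [name], set()
        while stack:  # depth-first walk over dependents
            node = stack.pop()
            if node in seen:
                continue
            seen.add(node)
            self.dirty.add(node)
            stack.extend(n for n, ds in self.deps.items() if node in ds)

    def compute(self) -> None:
        for node in TopologicalSorter(self.deps).static_order():
            if node not in self.dirty or any(d in self.dirty for d in self.deps[node]):
                continue  # clean, or blocked by an upstream node that is still dirty
            try:
                inputs = {d: self.values[d] for d in self.deps[node]}
                self.values[node] = self.fns[node](inputs)
            except Exception as exc:
                self.errors[node] = repr(exc)  # stays dirty; retried next pass
                continue
            self.dirty.discard(node)
            self.errors.pop(node, None)


feed = {"capital_gains": None}  # simulated upstream source, initially down


def read_capital_gains(_: Dict[str, Any]) -> float:
    if feed["capital_gains"] is None:
        raise RuntimeError("capital gains feed unavailable")
    return feed["capital_gains"]


demo = InMemoryGraph()
demo.add_node("capital_gains", read_capital_gains, set())
demo.add_node("income", lambda _: 50.0, set())
demo.add_node("return", lambda d: d["capital_gains"] + d["income"], {"capital_gains", "income"})

demo.compute()
after_failure = sorted(demo.dirty)  # ['capital_gains', 'return']

feed["capital_gains"] = 150.0  # the feed recovers
demo.compute()
print(after_failure, demo.values["return"], demo.dirty)  # ['capital_gains', 'return'] 200.0 set()
```

After the first pass, income is clean but capital_gains and return stay dirty. Once the feed recovers, the second pass heals both without manual intervention.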

Another toy example, this time with a PostgreSQL backend. Per-account node state lives in a single table keyed by (account_id, node_name):

CREATE TABLE node_state (
    account_id TEXT,
    node_name TEXT,
    value DOUBLE PRECISION,
    version UUID,
    is_dirty BOOLEAN DEFAULT TRUE,
    PRIMARY KEY (account_id, node_name)
);

import uuid
import psycopg2
from collections import defaultdict, deque
from typing import Callable, Dict, Set, Any

conn = psycopg2.connect("dbname=yourdb user=youruser password=yourpass")
conn.autocommit = True  # each statement commits immediately


class PostgresBackedGraph:
    def __init__(self, conn):
        self.conn = conn
        self.nodes: Dict[str, Callable] = {}
        self.dependencies: Dict[str, Set[str]] = {}
        self.dependents: Dict[str, Set[str]] = defaultdict(set)

    def add_node(self, name: str, compute_fn: Callable, deps: Set[str]):
        self.nodes[name] = compute_fn
        self.dependencies[name] = deps
        for d in deps:
            self.dependents[d].add(name)

    def mark_dirty(self, node_name: str, account_id: str):
        visited = set()
        queue = deque([node_name])

        while queue:
            node = queue.popleft()
            if (node, account_id) in visited:
                continue
            visited.add((node, account_id))
            with self.conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO node_state (account_id, node_name, is_dirty)
                    VALUES (%s, %s, TRUE)
                    ON CONFLICT (account_id, node_name) DO UPDATE
                    SET is_dirty = TRUE
                """, (account_id, node))
            queue.extend(self.dependents.get(node, []))

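    def is_dirty(self, node: str, account_id: str) -> bool:
        with self.conn.cursor() as cur:
            cur.execute("""
                SELECT is_dirty FROM node_state
                WHERE account_id = %s AND node_name = %s
            """, (account_id, node))
            res = cur.fetchone()
            # No row means the node was never computed for this account: treat
            # it as dirty so compute() fills it in instead of reading a missing row.
            return res is None or bool(res[0])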

    def get_value(self, node: str, account_id: str) -> float:
        with self.conn.cursor() as cur:
            cur.execute("""
                SELECT value FROM node_state
                WHERE account_id = %s AND node_name = %s
            """, (account_id, node))
            return cur.fetchone()[0]

    def set_value(self, node: str, account_id: str, value: float):
        with self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO node_state (account_id, node_name, value, version, is_dirty)
                VALUES (%s, %s, %s, %s, FALSE)
                ON CONFLICT (account_id, node_name) DO UPDATE
                SET value = EXCLUDED.value, version = EXCLUDED.version, is_dirty = FALSE
            """, (account_id, node, value, str(uuid.uuid4())))

    def compute(self, account_id: str):
        for node in self.topo_sort():
            if not self.is_dirty(node, account_id):
                continue
            if any(self.is_dirty(dep, account_id) for dep in self.dependencies[node]):
                continue

            inputs = {dep: self.get_value(dep, account_id) 
                for dep in self.dependencies[node]}
            val = self.nodes[node](inputs)
            self.set_value(node, account_id, val)

    def topo_sort(self):
        indegree = {k: 0 for k in self.nodes}
        for k, deps in self.dependencies.items():
            for d in deps:
                indegree[k] += 1
        queue = deque([k for k, v in indegree.items() if v == 0])
        order = []
        while queue:
            node = queue.popleft()
            order.append(node)
            for child in self.dependents.get(node, []):
                indegree[child] -= 1
                if indegree[child] == 0:
                    queue.append(child)
        return order


g = PostgresBackedGraph(conn)

# Graph structure
g.add_node("capital_gains", lambda _: 200.0, deps=set())
g.add_node("income", lambda _: 80.0, deps=set())
g.add_node("return", lambda d: d["capital_gains"] + d["income"], 
    deps={"capital_gains", "income"})
g.add_node("tax_cost", lambda d: d["return"] * 0.3, deps={"return"})

account_id = "acct_456"
g.mark_dirty("capital_gains", account_id)
g.compute(account_id)

print("Return:", g.get_value("return", account_id))
print("Tax cost:", g.get_value("tax_cost", account_id))
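The Postgres version still marks dirty nodes one at a time, issuing one upsert per downstream node. If the edges were also stored in the database (here in a hypothetical node_edge table), a recursive CTE could mark the whole downstream set in a single statement. That is one form of the batching mentioned earlier. The sketch below uses Python's built-in sqlite3 module so it runs without a server. SQLite supports both WITH RECURSIVE and the same INSERT ... ON CONFLICT ... DO UPDATE upsert (version 3.24 or later). PostgreSQL supports both constructs as well, but a psycopg2 version with %s placeholders is untested here.

```python
import sqlite3

# In-memory SQLite stand-in for node_state, plus a hypothetical node_edge
# table that stores the graph's edges next to the state.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE node_state (
    account_id TEXT,
    node_name  TEXT,
    value      REAL,
    version    TEXT,
    is_dirty   INTEGER DEFAULT 1,
    PRIMARY KEY (account_id, node_name)
);
CREATE TABLE node_edge (
    parent TEXT,
    child  TEXT,
    PRIMARY KEY (parent, child)
);
""")
conn.executemany(
    "INSERT INTO node_edge (parent, child) VALUES (?, ?)",
    [("capital_gains", "return"), ("income", "return"), ("return", "tax_cost")],
)

# Collect every downstream node with a recursive CTE and upsert its dirty
# flag in one statement. UNION (not UNION ALL) drops duplicates, which also
# ends the recursion if the edge table ever contains a cycle. SQLite needs
# the "WHERE true" to parse INSERT ... SELECT ... ON CONFLICT unambiguously.
MARK_DIRTY_SQL = """
WITH RECURSIVE downstream(node_name) AS (
    SELECT ?
    UNION
    SELECT e.child FROM node_edge AS e
    JOIN downstream AS d ON e.parent = d.node_name
)
INSERT INTO node_state (account_id, node_name, is_dirty)
SELECT ?, node_name, 1 FROM downstream WHERE true
ON CONFLICT (account_id, node_name) DO UPDATE SET is_dirty = 1
"""


def mark_dirty(conn: sqlite3.Connection, node: str, account_id: str) -> None:
    with conn:  # commits on success, rolls back on error
        conn.execute(MARK_DIRTY_SQL, (node, account_id))


def dirty_nodes(conn: sqlite3.Connection, account_id: str) -> list:
    rows = conn.execute(
        "SELECT node_name FROM node_state WHERE account_id = ? AND is_dirty = 1"
        " ORDER BY node_name",
        (account_id,),
    )
    return [name for (name,) in rows]


mark_dirty(conn, "capital_gains", "acct_456")
print(dirty_nodes(conn, "acct_456"))  # ['capital_gains', 'return', 'tax_cost']
```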

Elixir and Clojure

Elixir is appealing because the BEAM virtual machine gives it native support for concurrency, fault tolerance, and lightweight processes. Its processes share no memory, and supervision trees restart them when they crash. That combination suits systems that must stay available continuously.

Scala with Akka is another strong candidate, offering a mature actor model, strong static typing, and close integration with JVM-based enterprise environments. These traits make it particularly suitable for large, complex applications.

Clojure offers functional-programming elegance, immutable data structures, and an interactive, REPL-driven development experience, which allows for expressive and maintainable graph computations.
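Supervision trees, as mentioned for Elixir, restart a crashed process instead of trying to repair it in place. Below is a very rough Python analogue of that restart policy for a single child. supervise and flaky_return_calc are hypothetical names. The sketch ignores most of what OTP supervisors provide: restart strategies such as :one_for_one across several children, restart intensity measured over a time window, and process isolation.

```python
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def supervise(run_child: Callable[[], T], max_restarts: int = 3) -> Tuple[T, int]:
    """Re-run a crashed child, up to max_restarts times.

    Returns (result, number_of_restarts). When the restart budget is used up,
    the last exception propagates, loosely like an OTP supervisor that exceeds
    its restart intensity and terminates itself.
    """
    restarts = 0
    while True:
        try:
            return run_child(), restarts  # each attempt calls run_child() anew
        except Exception:
            if restarts >= max_restarts:
                raise
            restarts += 1


# Hypothetical flaky computation: times out twice, then succeeds.
outcomes = iter([TimeoutError("feed timeout"), TimeoutError("feed timeout"), 0.07])


def flaky_return_calc() -> float:
    outcome = next(outcomes)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


result = supervise(flaky_return_calc)
print(result)  # (0.07, 2)
```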

Clojure example

(ns dag.core
  (:require [clojure.set :as set]))

(def nodes
  {:equity_start_value {:value 100000}
   :equity_end_value   {:value 112000}
   :equity_net_transfer {:value 5000}
   :equity_return_pre
   {:deps [:equity_start_value :equity_end_value :equity_net_transfer]
    :fn (fn [{:keys [equity_start_value equity_end_value equity_net_transfer]}]
          (/ (- equity_end_value equity_start_value equity_net_transfer)
             equity_start_value))}})

(defn topo-sort
  "Post-order DFS: each node is emitted only after all of its :deps."
  [graph]
  (letfn [(visit [[visited sorted] n]
            (if (visited n)
              [visited sorted]
              (let [deps (get-in graph [n :deps] [])
                    [visited' sorted'] (reduce visit [(conj visited n) sorted] deps)]
                [visited' (conj sorted' n)])))]
    (second (reduce visit [#{} []] (keys graph)))))

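(defn compute [graph]
  (reduce
   (fn [acc node]
     ;; Bind the node's :fn as compute-fn. Destructuring it as `fn` would
     ;; shadow clojure.core/fn, so the (fn [d] ...) form below would not compile.
     (let [{:keys [deps value] compute-fn :fn} (get graph node)]
       (if (some? value)
         (assoc-in acc [node :computed] value)
         (let [input (into {} (map (fn [d] [d (get-in acc [d :computed])]) deps))]
           (assoc-in acc [node :computed] (compute-fn input))))))
   graph
   (topo-sort graph)))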
(def computed (compute nodes))
(println "Equity Return Pre:" (get-in computed [:equity_return_pre :computed]))
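For comparison, the same four-node graph can be evaluated pull-style in Python, a different technique from the topological sort used in the Clojure version. Each node recursively requests its dependencies, and functools.cache (Python 3.9+) memoizes results so each node is computed at most once. fractions.Fraction keeps the arithmetic exact, like Clojure's ratios, so the result is 7/100 rather than a float. NODES and evaluate are illustrative names.

```python
from fractions import Fraction
from functools import cache

# Same four nodes as the Clojure map; Fraction keeps arithmetic exact.
NODES = {
    "equity_start_value": {"value": Fraction(100_000)},
    "equity_end_value": {"value": Fraction(112_000)},
    "equity_net_transfer": {"value": Fraction(5_000)},
    "equity_return_pre": {
        "deps": ["equity_start_value", "equity_end_value", "equity_net_transfer"],
        "fn": lambda start, end, transfer: (end - start - transfer) / start,
    },
}


@cache  # memoize: each node is evaluated at most once per cache lifetime
def evaluate(name: str) -> Fraction:
    spec = NODES[name]
    if "value" in spec:
        return spec["value"]
    # Pull inputs on demand; recursion replaces the explicit topological sort.
    return spec["fn"](*(evaluate(dep) for dep in spec["deps"]))


print("Equity Return Pre:", evaluate("equity_return_pre"))  # Equity Return Pre: 7/100
```

The trade-off is invalidation. When any input changes, evaluate.cache_clear() discards every cached node, which is far coarser than dirty-marking only the downstream nodes. Deep graphs would also run into Python's recursion limit.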

Elixir example

defmodule GraphNode do
  use GenServer

  def start_link({name, compute_fn, deps}) do
    opts = [name: via_tuple(name)]
    GenServer.start_link(__MODULE__, {name, compute_fn, deps}, opts)
  end

  def via_tuple(name) do
    {:via, Registry, {:node_registry, name}}
  end

  def mark_dirty(name) do
    GenServer.cast(via_tuple(name), :mark_dirty)
  end

  def get_value(name) do
    GenServer.call(via_tuple(name), :get_value)
  end

  # A synchronous call: when it returns, this node is clean and has already
  # read its dependencies, which keeps the demo in Graph.run/0 deterministic.
  def recompute(name) do
    GenServer.call(via_tuple(name), :recompute)
  end

  def init({name, compute_fn, deps}) do
    state = %{
      name: name,
      deps: deps,
      compute_fn: compute_fn,
      value: nil,
      version: nil,
      dirty: true
    }
    {:ok, state}
  end

  # Flags only this node; dependents are not notified.
  def handle_cast(:mark_dirty, state) do
    {:noreply, %{state | dirty: true}}
  end

  def handle_call(:get_value, _from, state) do
    {:reply, {state.value, state.version}, state}
  end

  # Clean nodes reply with their cached value.
  def handle_call(:recompute, _from, %{dirty: false} = state) do
    {:reply, state.value, state}
  end

  # Dirty nodes read each dependency's current value, so dependencies
  # must be recomputed first.
  def handle_call(:recompute, _from, state) do
    dep_vals = Enum.map(state.deps, &get_dep_value/1)
    new_value = apply(state.compute_fn, dep_vals)
    new_state = %{state | value: new_value, version: UUID.uuid4(), dirty: false}
    {:reply, new_value, new_state}
  end

  defp get_dep_value(dep) do
    {val, _version} = GraphNode.get_value(dep)
    val
  end
end
defmodule GraphSupervisor do
  use Supervisor

  def start_link(_) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    children = [
      {Registry, keys: :unique, name: :node_registry}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
defmodule Graph do
  def build do
    GraphSupervisor.start_link(:ok)

    # Example:
    # return = (end - start - transfer) / start

    GraphNode.start_link({"start", fn -> 100_000 end, []})
    GraphNode.start_link({"end", fn -> 112_000 end, []})
    GraphNode.start_link({"transfer", fn -> 5_000 end, []})

    GraphNode.start_link({
      "return",
      fn start, end_, transfer -> (end_ - start - transfer) / start end,
      ["start", "end", "transfer"]
    })

    :ok
  end

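  def run do
    # Mark the target dirty, then recompute in dependency order (inputs first)
    # so "return" reads computed values instead of nil.
    GraphNode.mark_dirty("return")
    Enum.each(["start", "end", "transfer", "return"], &GraphNode.recompute/1)

    # get_value/1 returns {value, version}, not {:ok, value}.
    {value, _version} = GraphNode.get_value("return")
    IO.inspect(value, label: "Computed return")
  end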
end
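
# In mix.exs, inside the project's MixProject module. Include only one UUID
# package: :uuid and :elixir_uuid both define the same UUID module.
defp deps do
  [
    {:elixir_uuid, "~> 1.2"}
  ]
end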
# From the project directory, fetch dependencies and start an IEx session:
mix deps.get
iex -S mix
# Then, inside IEx:
Graph.build()
Graph.run()
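The Redis, Postgres, and Elixir versions all stamp each computed value with a fresh UUID version, but nothing reads those versions back. One way to use them is for each node to record the versions of the inputs it consumed. The node is then stale whenever any dependency's current version differs. The Python sketch below shows this with hypothetical helpers (NodeState, set_leaf, is_stale, recompute). Because staleness is derived by comparison rather than stored as a flag, it still holds if a dirty flag is lost, for example after a crash between writing an input and propagating the flag.

```python
import uuid
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class NodeState:
    value: Optional[float] = None
    version: Optional[str] = None
    # Version of each dependency at the moment `value` was computed.
    input_versions: Dict[str, str] = field(default_factory=dict)


def set_leaf(states: Dict[str, NodeState], name: str, value: float) -> None:
    """Write an input value under a brand-new version."""
    states[name] = NodeState(value=value, version=str(uuid.uuid4()))


def is_stale(states: Dict[str, NodeState], name: str, deps: List[str]) -> bool:
    node = states.get(name)
    if node is None or node.version is None:
        return True  # never computed
    return any(node.input_versions.get(d) != states[d].version for d in deps)


def recompute(states: Dict[str, NodeState], name: str, deps: List[str],
              fn: Callable[..., float]) -> None:
    states[name] = NodeState(
        value=fn(*(states[d].value for d in deps)),
        version=str(uuid.uuid4()),
        input_versions={d: states[d].version for d in deps},
    )


RETURN_DEPS = ["start", "end", "transfer"]
states: Dict[str, NodeState] = {}
set_leaf(states, "start", 100_000.0)
set_leaf(states, "end", 112_000.0)
set_leaf(states, "transfer", 5_000.0)
recompute(states, "return", RETURN_DEPS, lambda s, e, t: (e - s - t) / s)

fresh = is_stale(states, "return", RETURN_DEPS)  # False: inputs unchanged
set_leaf(states, "end", 113_000.0)               # new version for "end"
stale = is_stale(states, "return", RETURN_DEPS)  # True: "end" moved on
print(fresh, stale)  # False True
```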
