Subgraph checkpointing

I dug deeper into this, and the result differs a bit from what I had assumed for a long time. Maybe something changed over time.

Consider this code:

import os

from dotenv import load_dotenv
from typing_extensions import TypedDict

from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command

# Call python-dotenv before reading the environment below, so values from a
# local .env file (if one exists) are visible to os.getenv.
load_dotenv()

# Postgres connection string; None when the variable is unset, in which case
# main() uses a local SQLite checkpointer instead.
POSTGRES_URI = os.getenv("POSTGRES_URI")

class SubgraphState(TypedDict):
    """State schema of the subgraph."""

    # Text sent to the model; written by subgraph_interrupt from the value
    # returned by interrupt().
    prompt: str
    # Content of the model's reply; written by subgraph_llm.
    answer: str


class ParentState(TypedDict):
    """State schema of the parent graph."""

    # The subgraph's "answer" value (empty string when the subgraph returned
    # none); written by run_subgraph_node.
    response: str


def subgraph_interrupt(state: SubgraphState) -> SubgraphState:
    """Pause the subgraph to ask the user a question.

    Whatever ``interrupt`` returns (the value the run is resumed with) is
    stored under the ``prompt`` key of the subgraph state.
    """
    return {"prompt": interrupt("What is your name?:")}


def subgraph_llm(state: SubgraphState) -> SubgraphState:
    """Send the stored prompt to the chat model and record its reply.

    The model name is read from the ``OPENAI_MODEL`` environment variable,
    defaulting to ``gpt-4o-mini``. The reply's ``content`` is stored under
    the ``answer`` key of the subgraph state.
    """
    model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
    llm = ChatOpenAI(model=model_name)
    reply = llm.invoke(state["prompt"])
    return {"answer": reply.content}


# Subgraph wiring: START -> sub_interrupt -> sub_llm -> END.
sub_builder = StateGraph(SubgraphState)
sub_builder.add_node("sub_interrupt", subgraph_interrupt)
sub_builder.add_node("sub_llm", subgraph_llm)
sub_builder.add_edge(START, "sub_interrupt")
sub_builder.add_edge("sub_interrupt", "sub_llm")
sub_builder.add_edge("sub_llm", END)
# Note: compile() is called with no checkpointer argument.
subgraph = sub_builder.compile()


def run_subgraph_node(state: ParentState, config: RunnableConfig) -> ParentState:
    """Run the subgraph from inside a parent-graph node.

    The subgraph is invoked with an empty input and, on purpose, with no
    explicit config: the point of this example is to show what the subgraph
    picks up when nothing is passed to it.

    Args:
        state: Current parent-graph state (not read here).
        config: Runnable config handed to the node (not read here; kept so
            the node signature stays the same).

    Returns:
        A state update whose ``response`` is the subgraph's ``answer``, or an
        empty string when the subgraph result has no ``answer`` key.
    """
    sub_result = subgraph.invoke({})
    return {"response": sub_result.get("answer", "")}


# Parent graph wiring: START -> run_subgraph -> END.
# The builder is compiled later, inside main(), where the checkpointer exists.
parent_builder = StateGraph(ParentState)
parent_builder.add_node("run_subgraph", run_subgraph_node)
parent_builder.add_edge(START, "run_subgraph")
parent_builder.add_edge("run_subgraph", END)


def _interrupt_prompt(pending_interrupt):
    """Return the text to show the user for one pending interrupt.

    Uses the interrupt's ``value`` (falling back to ``message``), and a
    generic prompt when neither is set or both are empty.
    """
    prompt = getattr(pending_interrupt, "value", None) or getattr(
        pending_interrupt, "message", None
    )
    return prompt or "Provide resume value:"


def main() -> None:
    """Run the parent graph interactively, resuming it until no interrupt is pending.

    Asks for a thread id, compiles the parent graph with a Postgres
    checkpointer when ``POSTGRES_URI`` is set (a local SQLite file otherwise),
    invokes the graph, and then keeps prompting the user for resume values
    while the graph state reports tasks with pending interrupts.
    """
    thread_id = input("Thread id (default: thread-1): ").strip() or "thread-1"
    config = {"configurable": {"thread_id": thread_id}}

    # Pick the saver first so the `with` statement stays readable.
    if POSTGRES_URI:
        saver_cm = PostgresSaver.from_conn_string(POSTGRES_URI)
    else:
        saver_cm = SqliteSaver.from_conn_string(
            "subgraph_interrupt_postgres_sketch.sqlite"
        )

    with saver_cm as checkpointer:
        checkpointer.setup()
        parent_graph = parent_builder.compile(checkpointer=checkpointer)

        result = parent_graph.invoke({}, config)
        while True:
            state = parent_graph.get_state(config, subgraphs=True)
            pending = [
                task for task in state.tasks if getattr(task, "interrupts", None)
            ]
            if not pending:
                print("Final result with no interrupt:", result)
                break

            print("Pending tasks number:", len(pending))

            interrupts = [item for task in pending for item in task.interrupts]
            if len(interrupts) > 1 and all(
                getattr(item, "id", None) for item in interrupts
            ):
                # With several pending interrupts, Command(resume=...) takes a
                # mapping keyed by *interrupt* id (not task id), one entry per
                # interrupt, so no answer overwrites another.
                resume_value = {}
                for item in interrupts:
                    resume_value[item.id] = input(f"{_interrupt_prompt(item)} ")
            else:
                # Single interrupt (or ids unavailable): resume with one value.
                resume_value = input(f"{_interrupt_prompt(interrupts[0])} ")

            result = parent_graph.invoke(Command(resume=resume_value), config)

    print("Final result:", result)


# Run the interactive demo only when executed as a script, not on import.
if __name__ == "__main__":
    main()

The subgraph is compiled (at module level) with no checkpointer, and run_subgraph_node invokes it with no config:

sub_builder = StateGraph(SubgraphState)
sub_builder.add_node("sub_interrupt", subgraph_interrupt)
sub_builder.add_node("sub_llm", subgraph_llm)
sub_builder.add_edge(START, "sub_interrupt")
sub_builder.add_edge("sub_interrupt", "sub_llm")
sub_builder.add_edge("sub_llm", END)
subgraph = sub_builder.compile()

def run_subgraph_node(state: ParentState, config: RunnableConfig) -> ParentState:
    # ...
    sub_result = subgraph.invoke({})
    return {"response": sub_result.get("answer", "")}

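But the parent graph is compiled with a Postgres checkpointer (simplified from main(): dsn stands for the POSTGRES_URI value, and main() falls back to SQLite when it is unset):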

with PostgresSaver.from_conn_string(dsn) as checkpointer:
    checkpointer.setup()
    parent_graph = parent_builder.compile(checkpointer=checkpointer)

    result = parent_graph.invoke({}, config)

LangGraph explicitly documents that checkpointer=None means “may inherit parent checkpointer when used as a subgraph”:

If `None`, it may inherit the parent graph's checkpointer when used as a subgraph.

If `False`, it will not use or inherit any checkpointer.

The two hidden mechanisms that make the inheritance happen

1) LangGraph injects the parent checkpointer + a task namespace into the node’s runnable config

When the parent pregel loop prepares the executable task for node run_subgraph, it computes a task checkpoint namespace and patches the node’s runnable config with:

  • __pregel_checkpointer = the parent checkpointer
  • checkpoint_ns = "run_subgraph:<task_id>"
checkpoint_ns = f"{parent_ns}{NS_SEP}{name}" if parent_ns else name
# ...
task_checkpoint_ns = f"{checkpoint_ns}{NS_END}{task_id}"
# ...
configurable={
    # ...
    CONFIG_KEY_CHECKPOINTER: (
        checkpointer
        or configurable.get(CONFIG_KEY_CHECKPOINTER)
    ),
    # ...
    CONFIG_KEY_CHECKPOINT_NS: task_checkpoint_ns,
    # ...
},

Those keys are:

  • CONFIG_KEY_CHECKPOINTER = "__pregel_checkpointer"
  • CONFIG_KEY_CHECKPOINT_NS = "checkpoint_ns"
  • NS_END = ":" (this is exactly the delimiter you see in run_subgraph:<task_id>)
CONFIG_KEY_CHECKPOINTER = sys.intern("__pregel_checkpointer")
CONFIG_KEY_CHECKPOINT_NS = sys.intern("checkpoint_ns")
NS_SEP = sys.intern("|")
NS_END = sys.intern(":")
CONF = cast(Literal["configurable"], sys.intern("configurable"))

2) LangChain “Runnable config propagation” (ContextVar) makes subgraph.invoke({}) inherit that config anyway

When LangGraph executes run_subgraph_node, it executes it inside a LangChain Runnable context that sets a context variable (var_child_runnable_config) so downstream calls inherit config without explicit passing:

child_config = patch_config(config, callbacks=run_manager.get_child())
with set_config_context(child_config) as context:
    output = cast(
        "Output",
        context.run(
            call_func_with_variable_args,
            func,
            input_,
            config,
            run_manager,
            **kwargs,
        ),
    )

And set_config_context ultimately does:

var_child_runnable_config: ContextVar[RunnableConfig | None] = ContextVar(
    "child_runnable_config", default=None
)
# ...
config_token = var_child_runnable_config.set(config)

Then, when your subgraph.invoke({}) starts, the subgraph’s Pregel.stream() merges configs using LangGraph’s ensure_config, which pulls from that context var:

if var_config := var_child_runnable_config.get():
    empty.update(
        { ... for k, v in var_config.items() if _is_not_empty(v) }
    )

And Pregel.stream() calls it even if you pass no config:

config = ensure_config(self.config, config)

The exact “why it writes to Postgres” decision in LangGraph

Inside _defaults(), a graph chooses its checkpointer like this:

if self.checkpointer is False:
    checkpointer: BaseCheckpointSaver | None = None
elif CONFIG_KEY_CHECKPOINTER in config.get(CONF, {}):
    checkpointer = config[CONF][CONFIG_KEY_CHECKPOINTER]
# ...
else:
    checkpointer = self.checkpointer

So our subgraph (compiled with checkpointer=None) sees config["configurable"]["__pregel_checkpointer"] (inherited from the parent task context) and uses it → it’s the same PostgresSaver.


Wrapping up

There is no need to pass a checkpointer to a subgraph: unless it is compiled with checkpointer=False, it automatically inherits the checkpointer from the parent task/graph. This works in both cases: a subgraph added as a node and a subgraph invoked inside a node.