I dug deeper, and the result differs a bit from what I had long assumed. The behavior may have changed between versions.
Consider this code:
import os
from dotenv import load_dotenv
from typing_extensions import TypedDict
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import interrupt, Command
load_dotenv()
POSTGRES_URI = os.getenv("POSTGRES_URI")
class SubgraphState(TypedDict):
    """State schema shared by the subgraph's nodes."""

    # Written by the `sub_interrupt` node with the value returned by `interrupt`.
    prompt: str
    # Written by the `sub_llm` node with the model's response content.
    answer: str
class ParentState(TypedDict):
    """State schema of the parent graph."""

    # Written by the `run_subgraph` node with the subgraph's answer.
    response: str
def subgraph_interrupt(state: SubgraphState) -> SubgraphState:
    """Ask the user for their name and store the reply as ``prompt``.

    Args:
        state: Current subgraph state (not read here).

    Returns:
        A state update containing only ``prompt``, set to whatever
        ``interrupt`` returns.
    """
    # The value handed back by `interrupt` becomes the prompt for `sub_llm`.
    prompt = interrupt("What is your name?:")
    return {"prompt": prompt}
def subgraph_llm(state: SubgraphState) -> SubgraphState:
    """Send ``state["prompt"]`` to the chat model and store its reply.

    The model name comes from the ``OPENAI_MODEL`` environment variable and
    falls back to ``"gpt-4o-mini"``.

    Args:
        state: Subgraph state; ``prompt`` must already be set.

    Returns:
        A state update containing only ``answer`` (the response content).
    """
    model = ChatOpenAI(model=os.getenv("OPENAI_MODEL", "gpt-4o-mini"))
    response = model.invoke(state["prompt"])
    return {"answer": response.content}
# Subgraph wiring: START -> sub_interrupt -> sub_llm -> END.
sub_builder = StateGraph(SubgraphState)
sub_builder.add_node("sub_interrupt", subgraph_interrupt)
sub_builder.add_node("sub_llm", subgraph_llm)
sub_builder.add_edge(START, "sub_interrupt")
sub_builder.add_edge("sub_interrupt", "sub_llm")
sub_builder.add_edge("sub_llm", END)
# No checkpointer is passed to compile() here.
subgraph = sub_builder.compile()
def run_subgraph_node(state: ParentState, config: RunnableConfig) -> ParentState:
    """Run the subgraph from inside a parent node and surface its answer.

    Args:
        state: Current parent state (not read here).
        config: Runnable config handed to this node; intentionally not
            forwarded to the subgraph.

    Returns:
        A state update whose ``response`` is the subgraph's ``answer``, or
        ``""`` when the subgraph result has no ``answer`` key.
    """
    # Invoked with no explicit config on purpose: this demo shows that the
    # subgraph still picks up the parent's checkpointer from the ambient
    # runnable context.
    sub_result = subgraph.invoke({})
    return {"response": sub_result.get("answer", "")}
# Parent graph wiring: START -> run_subgraph -> END.
# Compilation happens in main(), where the checkpointer is created.
parent_builder = StateGraph(ParentState)
parent_builder.add_node("run_subgraph", run_subgraph_node)
parent_builder.add_edge(START, "run_subgraph")
parent_builder.add_edge("run_subgraph", END)
def _interrupt_prompt(pending_interrupt) -> str:
    """Return the text to show the user for one pending interrupt."""
    text = getattr(pending_interrupt, "value", None) or getattr(
        pending_interrupt, "message", None
    )
    return text or "Provide resume value:"


def _interrupt_key(pending_interrupt):
    """Return the id under which this interrupt's resume value is keyed."""
    # NOTE(review): newer LangGraph releases expose `id`, older ones
    # `interrupt_id` -- confirm against the installed version.
    return getattr(pending_interrupt, "id", None) or getattr(
        pending_interrupt, "interrupt_id", None
    )


def main() -> None:
    """Run the parent graph interactively, resuming until no interrupt is pending.

    Reads a thread id from stdin, opens a Postgres checkpointer when
    ``POSTGRES_URI`` is set (otherwise a local SQLite one), invokes the parent
    graph, and keeps prompting the user for resume values while the graph
    state reports tasks with pending interrupts.
    """
    thread_id = input("Thread id (default: thread-1): ").strip() or "thread-1"
    config = {"configurable": {"thread_id": thread_id}}

    saver_cm = (
        PostgresSaver.from_conn_string(POSTGRES_URI)
        if POSTGRES_URI
        else SqliteSaver.from_conn_string("subgraph_interrupt_postgres_sketch.sqlite")
    )
    with saver_cm as checkpointer:
        checkpointer.setup()
        parent_graph = parent_builder.compile(checkpointer=checkpointer)
        result = parent_graph.invoke({}, config)

        while True:
            state = parent_graph.get_state(config, subgraphs=True)
            pending = [task for task in state.tasks if getattr(task, "interrupts", None)]
            if not pending:
                print("Final result with no interrupt:", result)
                break
            print("Pending tasks number:", len(pending))

            open_interrupts = [
                pending_interrupt
                for task in pending
                for pending_interrupt in task.interrupts
            ]
            if len(open_interrupts) > 1:
                # Several interrupts are pending: answer each one individually,
                # keyed by the interrupt's own id (not the task id), so that no
                # answer overwrites another.
                resume_value = {
                    _interrupt_key(pending_interrupt): input(
                        f"{_interrupt_prompt(pending_interrupt)} "
                    )
                    for pending_interrupt in open_interrupts
                }
            else:
                resume_value = input(f"{_interrupt_prompt(open_interrupts[0])} ")
            result = parent_graph.invoke(Command(resume=resume_value), config)

        print("Final result:", result)
# Run the interactive demo only when executed as a script.
if __name__ == "__main__":
    main()
The subgraph is compiled with no checkpointer, and inside run_subgraph_node I invoke it with no config:
sub_builder = StateGraph(SubgraphState)
sub_builder.add_node("sub_interrupt", subgraph_interrupt)
sub_builder.add_node("sub_llm", subgraph_llm)
sub_builder.add_edge(START, "sub_interrupt")
sub_builder.add_edge("sub_interrupt", "sub_llm")
sub_builder.add_edge("sub_llm", END)
# No checkpointer is passed to compile() here.
subgraph = sub_builder.compile()
def run_subgraph_node(state: ParentState, config: RunnableConfig) -> ParentState:
    # ...
    # The subgraph is invoked with no config argument.
    sub_result = subgraph.invoke({})
    return {"response": sub_result.get("answer", "")}
But the parent graph is compiled with a Postgres checkpointer:
# The parent graph is the only place a checkpointer is passed explicitly.
with PostgresSaver.from_conn_string(POSTGRES_URI) as checkpointer:
    checkpointer.setup()
    parent_graph = parent_builder.compile(checkpointer=checkpointer)
    result = parent_graph.invoke({}, config)
LangGraph explicitly documents that checkpointer=None means “may inherit parent checkpointer when used as a subgraph”:
If `None`, it may inherit the parent graph's checkpointer when used as a subgraph.
If `False`, it will not use or inherit any checkpointer.
The two hidden mechanisms that make the inheritance happen
1) LangGraph injects the parent checkpointer + a task namespace into the node’s runnable config
When the parent pregel loop prepares the executable task for node run_subgraph, it computes a task checkpoint namespace and patches the node’s runnable config with:
__pregel_checkpointer = the parent checkpointer
checkpoint_ns = "run_subgraph:<task_id>"
checkpoint_ns = f"{parent_ns}{NS_SEP}{name}" if parent_ns else name
# ...
task_checkpoint_ns = f"{checkpoint_ns}{NS_END}{task_id}"
# ...
configurable={
# ...
CONFIG_KEY_CHECKPOINTER: (
checkpointer
or configurable.get(CONFIG_KEY_CHECKPOINTER)
),
# ...
CONFIG_KEY_CHECKPOINT_NS: task_checkpoint_ns,
# ...
},
Those keys are:
CONFIG_KEY_CHECKPOINTER = "__pregel_checkpointer"
CONFIG_KEY_CHECKPOINT_NS = "checkpoint_ns"
NS_END = ":" (this is exactly the delimiter you see in run_subgraph:<task_id>)
CONFIG_KEY_CHECKPOINTER = sys.intern("__pregel_checkpointer")
CONFIG_KEY_CHECKPOINT_NS = sys.intern("checkpoint_ns")
NS_SEP = sys.intern("|")
NS_END = sys.intern(":")
CONF = cast(Literal["configurable"], sys.intern("configurable"))
2) LangChain “Runnable config propagation” (ContextVar) makes subgraph.invoke({}) inherit that config anyway
When LangGraph executes run_subgraph_node, it executes it inside a LangChain Runnable context that sets a context variable (var_child_runnable_config) so downstream calls inherit config without explicit passing:
child_config = patch_config(config, callbacks=run_manager.get_child())
with set_config_context(child_config) as context:
output = cast(
"Output",
context.run(
call_func_with_variable_args,
func,
input_,
config,
run_manager,
**kwargs,
),
)
And set_config_context ultimately does:
var_child_runnable_config: ContextVar[RunnableConfig | None] = ContextVar(
"child_runnable_config", default=None
)
# ...
config_token = var_child_runnable_config.set(config)
Then, when your subgraph.invoke({}) starts, the subgraph’s Pregel.stream() merges configs using LangGraph’s ensure_config, which pulls from that context var:
if var_config := var_child_runnable_config.get():
empty.update(
{ ... for k, v in var_config.items() if _is_not_empty(v) }
)
And Pregel.stream() calls it even if you pass no config:
config = ensure_config(self.config, config)
The exact “why it writes to Postgres” decision in LangGraph
Inside _defaults(), a graph chooses its checkpointer like this:
if self.checkpointer is False:
checkpointer: BaseCheckpointSaver | None = None
elif CONFIG_KEY_CHECKPOINTER in config.get(CONF, {}):
checkpointer = config[CONF][CONFIG_KEY_CHECKPOINTER]
# ...
else:
checkpointer = self.checkpointer
So our subgraph (compiled with checkpointer=None) sees config["configurable"]["__pregel_checkpointer"] (inherited from the parent task context) and uses it → it’s the same PostgresSaver.
Wrapping up
There is no need to pass a checkpointer to any subgraph: it is automatically inherited from the parent task/graph, and this works in both cases — a subgraph added as a node and a subgraph invoked inside a node.