Persisting HITL payloads

Hello!

I’m trying to persist HITL payloads in the AIMessage additional_kwargs. To do so, I send() a channel update with the updated message to include it in a checkpoint save.

It works when I set durability="exit", but I then lose progressive persistence. With either “async” (the default) or “sync”, I get unreliable results: most of the time, the last message in the checkpoint won’t contain the extra payload when I resume the conversation. From what I can tell, “async” only saves when the next node runs. If a user reloads the conversation before approving/rejecting the HITL prompt, the state won’t be saved, right?

Is there a way to combine the benefits of “exit” and “async”? Or is my approach to retain pending interrupts fundamentally wrong?

Note: I’ve had a look at these related threads, but couldn’t find exactly what I’m looking for:

I’m attaching a minimal, self-contained test case. Replace durability="exit" with something else to make it fail.

from __future__ import annotations

from typing import Annotated, TypedDict

import pytest
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from langgraph._internal._constants import CONFIG_KEY_SEND
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_config
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.types import Command, interrupt


class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]


@pytest.mark.anyio
async def test_human_hitl_interrupt_checkpoint_behavior():
    """Minimal repro: persist AIMessage additional_kwargs before interrupt and after resume."""
    checkpointer = InMemorySaver()
    node_name = "pause_node"
    persisted_payload = "payload"

    def node(state: State) -> None:
        # Persist metadata onto the latest AI message before pausing.
        latest_ai_message = next(
            (msg for msg in reversed(state["messages"]) if isinstance(msg, AIMessage)),
            None,
        )
        assert latest_ai_message is not None

        latest_ai_message.additional_kwargs["hitl_payload"] = persisted_payload

        configurable = get_config().get("configurable", {})
        send = configurable.get(CONFIG_KEY_SEND)
        assert callable(send)
        send([("messages", [latest_ai_message])])
        interrupt("pause")
        return None

    graph = (
        StateGraph(State)
        .add_node(node_name, node)
        .add_edge(START, node_name)
        .compile(checkpointer=checkpointer)
    )

    config = RunnableConfig(
        configurable={"thread_id": "test-thread-interrupt-repro", "checkpoint_ns": ""}
    )

    result = graph.invoke(
        {
            "messages": [
                HumanMessage(content="Do something"),
                AIMessage(content="Request approval", id="assistant-1"),
            ]
        },
        config,
        durability="exit",
    )
    assert "__interrupt__" in result

    latest_before_resume = await anext(checkpointer.alist(config=config, limit=1), None)
    assert latest_before_resume is not None

    channel_values_before_resume = latest_before_resume.checkpoint.get(
        "channel_values", {}
    )
    messages_before_resume = channel_values_before_resume.get("messages", [])
    persisted_ai_before_resume = next(
        (msg for msg in reversed(messages_before_resume) if isinstance(msg, AIMessage)),
        None,
    )
    assert persisted_ai_before_resume is not None
    assert persisted_ai_before_resume.id == "assistant-1"
    assert (
        persisted_ai_before_resume.additional_kwargs.get("hitl_payload")
        == persisted_payload
    )

    graph.invoke(Command(resume="approved"), config, durability="exit")

    latest_after_resume = await anext(checkpointer.alist(config=config, limit=1), None)
    assert latest_after_resume is not None

    channel_values_after_resume = latest_after_resume.checkpoint.get(
        "channel_values", {}
    )
    messages_after_resume = channel_values_after_resume.get("messages", [])
    persisted_ai_after_resume = next(
        (msg for msg in reversed(messages_after_resume) if isinstance(msg, AIMessage)),
        None,
    )
    assert persisted_ai_after_resume is not None
    assert persisted_ai_after_resume.id == "assistant-1"
    assert (
        persisted_ai_after_resume.additional_kwargs.get("hitl_payload")
        == persisted_payload
    )

Hi @warrenseine ,

This actually makes sense :grinning_face_with_smiling_eyes: you’re not doing anything “wrong.” You’ve just run into how durability timing really works under the hood.

With async (default), the checkpoint doesn’t flush immediately. It usually waits for the next node execution. So if you:

send(...)
interrupt(...)

and the user refreshes before anything else runs… yeah, that update might never get saved. That’s why it feels unreliable.

exit works because it forces a save when the graph exits (including on interrupt). So it’s consistent — but you lose progressive persistence.

So you’re basically trying to get:

“Save this right now… but still behave async afterward.”

Totally reasonable ask — but that combo doesn’t really exist today.


The cleaner fix

Instead of mutating AIMessage.additional_kwargs, I’d honestly suggest storing the pending HITL data directly in graph state:

class State(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]  # unchanged from your repro
    pending_hitl: dict | None

That way:

  • It’s first-class state

  • It checkpoints naturally

  • No dependency on send

  • No weird timing edge cases

Much less fragile.
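A minimal sketch of that shape (the field and node names here are hypothetical, not a prescribed API):

```python
from typing import Optional, TypedDict


class HITLState(TypedDict):
    # messages: Annotated[list[AnyMessage], add_messages]  # unchanged from the repro
    pending_hitl: Optional[dict]


def request_approval(state: HITLState) -> dict:
    # A partial state update returned by a node that runs to completion
    # is checkpointed like any other channel value.
    return {"pending_hitl": {"prompt": "Approve?", "message_id": "assistant-1"}}


update = request_approval({"pending_hitl": None})
print(update["pending_hitl"]["prompt"])  # → Approve?
```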


If you really want to keep it on the message

Then you’ll need either:

  • durability="exit" at interrupt boundaries
    or

  • A small extra node before interrupt to force a checkpoint step

Otherwise async can always drop that last write if nothing else runs.


So no — your approach isn’t fundamentally wrong. It’s just brushing up against how checkpoint flushing works. For something like HITL (which must survive reloads), I’d go with explicit state. Much safer long term.

Hi @warrenseine if it’s solved, a huge favor - please mark this post as Solved for the others, so that they can get benefits from it :rocket:

Thanks Kaushal. This is also the answer I got from Gemini.

Unfortunately, this doesn’t work at all. There doesn’t seem to be a difference between saving in a message’s additional_kwargs and saving in another state key. The problem is that send() is asynchronous and unreliable.

I managed to mitigate the issue with a delayed state read in the original code, but we shouldn’t have to poll the graph to know if a state change was applied.

I think this is a design issue in LangGraph. “exit” shouldn’t be a durability option; it should be a separate boolean that additionally triggers persistence on graph exit, independent of the durability mode. I can see it was actually implemented that way before: feat: Implement durability mode argument by nfcampos · Pull Request #5432 · langchain-ai/langgraph · GitHub. I’m not familiar enough with LangGraph internals to recommend anything, but it smells like a regression to me.

Hi @warrenseine

AFAIK you are right that moving data to a different state key doesn’t help. The issue isn’t where the data lives, it’s when writes are persisted relative to interrupt().

It’s not that send() is async and unreliable. It’s that writes from an interrupted node are discarded by design.

When interrupt() raises GraphInterrupt inside your node, the runner’s commit() method deliberately creates a new writes list containing only INTERRUPT and RESUME channel data:

# langgraph/pregel/_runner.py, lines 436-442
if isinstance(exception, GraphInterrupt):
    if exception.args[0]:
        writes = [(INTERRUPT, exception.args[0])]
        if resumes := [w for w in task.writes if w[0] == RESUME]:
            writes.extend(resumes)
        self.put_writes()(task.id, writes)  # ← only INTERRUPT + RESUME

Everything your node wrote via CONFIG_KEY_SEND before the interrupt() call is still in task.writes, but commit() ignores it. Then put_writes() (in langgraph/pregel/_loop.py) replaces all prior pending writes for that task:

# langgraph/pregel/_loop.py, lines 322-326
self.checkpoint_pending_writes = [
    w for w in self.checkpoint_pending_writes if w[0] != task_id
]

This happens regardless of durability mode - it’s not a timing issue, it’s a filtering issue. The writes never reach the checkpointer because they’re stripped out before any persistence logic runs.

IMHO this is intentional because of how interrupt() works: when you resume with Command(resume=...), the entire node re-executes from scratch (source). On the second run, interrupt() returns the resume value instead of raising. So any writes the node made before interrupt() would be duplicated if they were persisted - the node will produce them again on re-execution.

That’s why LangGraph discards them: the node didn’t complete, so its writes are treated as uncommitted.

You referenced PR #5432 and suggested "exit" should be a separate boolean. Looking at the implementation, durability="exit" does behave differently at the _suppress_interrupt() boundary (in langgraph/pregel/_loop.py) - it calls _put_checkpoint() + _put_pending_writes() when the graph exits. However, even with "exit", the writes were already filtered in commit(), so the checkpoint doesn’t actually contain your send() data. If it appeared to work in your test, it’s likely a timing artifact of InMemorySaver rather than the data actually being persisted.

The fix isn’t about durability modes - it’s about restructuring your graph so that the state update is committed by a completed node, not one that gets interrupted mid-execution:

Option A - split the node:

from langchain_core.messages import AIMessage, HumanMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.types import interrupt

# State as defined in the repro above

def prepare_hitl(state: State) -> dict:
    """Runs to completion → writes are committed in after_tick()."""
    last_msg = state["messages"][-1]
    updated = AIMessage(
        content=last_msg.content,
        id=last_msg.id,
        additional_kwargs={**last_msg.additional_kwargs, "hitl_payload": "payload"},
    )
    return {"messages": [updated]}

def pause_node(state: State) -> dict:
    """Separate node for the interrupt."""
    answer = interrupt("Review and approve/reject")
    return {"messages": [HumanMessage(content=answer)]}

builder = StateGraph(State)
builder.add_node("prepare_hitl", prepare_hitl)
builder.add_node("pause_node", pause_node)
builder.add_edge(START, "prepare_hitl")
builder.add_edge("prepare_hitl", "pause_node")
builder.add_edge("pause_node", END)
graph = builder.compile(checkpointer=InMemorySaver())

prepare_hitl completes normally → after_tick() calls apply_writes(), and _put_checkpoint() persists the updated message. Then pause_node runs and interrupts. The message update is already safely checkpointed at this point, with any durability mode.

Option B - use interrupt_before on the next node:

graph = builder.compile(
    checkpointer=InMemorySaver(),
    interrupt_before=["pause_node"],
)

With interrupt_before, the checkpoint is saved at the end of after_tick() with all completed writes, and then the interrupt fires in the next tick() before the node starts. Same result - your data is persisted before the interrupt.

Option C - use update_state() while paused:

If you need to attach metadata after the interrupt fires (e.g., from the client side):

snapshot = graph.get_state(config)
graph.update_state(config, {"messages": [updated_message]})
graph.invoke(Command(resume="approved"), config)

Thank you Pawel. Indeed, I misused “unreliable”. More like “not a good fit here”.

I do see some flakiness in a live environment, meaning that send() occasionally works from the interruption node. Code seems pretty deterministic though, so I’m not sure what causes it. In any case, whether it’s flaky or deterministic, it won’t solve my persistence issue.

Regarding the proposed options:

  • Option A sounds ideal, but I’m in a deep agent context so adding intermediate nodes is not trivial
  • Option B looks like an API variation of A
  • Option C works, but it basically means persisting outside of LangGraph checkpoints (be it on the client or a separate database)

I guess C is the only reliable option, so I’ll probably store the HITL payload in a database indexed by message IDs.
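A sketch of that fallback, with an in-memory dict standing in for the database (all names hypothetical):

```python
from typing import Optional

# message_id -> HITL payload; replace with a real table keyed the same way
hitl_store: dict[str, dict] = {}


def save_hitl_payload(message_id: str, payload: dict) -> None:
    hitl_store[message_id] = payload


def load_hitl_payload(message_id: str) -> Optional[dict]:
    return hitl_store.get(message_id)


# On interrupt: persist the payload outside LangGraph, keyed by the AIMessage id.
save_hitl_payload("assistant-1", {"hitl_payload": "payload"})
# On reload: look it up by the message id regardless of checkpoint timing.
print(load_hitl_payload("assistant-1"))  # → {'hitl_payload': 'payload'}
```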