How to update graph state while preserving interrupts?

Hey folks! Is there a way to use aupdate_state in LangGraph without potentially blowing away state like human-in-the-loop interrupts? We’re doing a fairly simple update where we mutate messages as part of a migration:

aupdate_state(values={"messages": messages})

We’re noticing that when we do this, things like interrupts are blown away. The workarounds we’ve come up with are all pretty ugly and involve us reaching into checkpoint/checkpoint_blobs/checkpoint_write tables which very much feels like an anti-pattern.

Hi @mike-elvex

could you share some code? Working example (with the issue) would be great :slight_smile:

I quickly put together something like this:

import asyncio
import uuid
from operator import add
from typing import Annotated

from dotenv import load_dotenv
from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command
from langchain.chat_models import init_chat_model

load_dotenv()

# 1) State with reducer: messages are MERGED (not overwritten)
class State(TypedDict):
    messages: Annotated[list[dict], add]


# 2) Anthropic model (Claude 3.7 Sonnet)
llm = init_chat_model(
    "anthropic:claude-3-7-sonnet-latest",
    temperature=0,
)


# 3) Graph nodes
def generate_reply(state: State) -> State:
    user_text = "Write a short response in English (1 sentence)."
    ai = llm.invoke(user_text)
    return {
        "messages": [{"role": "assistant", "content": ai.content}]
    }

def human_review(state: State) -> State:
    # We pause execution and ask the human for a decision/edit
    value = interrupt({
        "prompt": "Edit/accept the assistant's response.",
        "assistant_reply": state["messages"][-1]["content"],
        "hint": "Return a string text (e.g., 'OK' or an edited version)."
    })
    # After resuming, the returned 'value' goes into the state
    # The 'add' reducer will append the message, not overwrite
    return {
        "messages": [{"role": "user", "content": str(value)}]
    }


# 4) Build the graph
builder = StateGraph(State)
builder.add_node("generate_reply", generate_reply)
builder.add_node("human_review", human_review)
builder.add_edge(START, "generate_reply")
builder.add_edge("generate_reply", "human_review")
builder.add_edge("human_review", END)

checkpointer = InMemorySaver()  # required persistence for interrupt
graph = builder.compile(checkpointer=checkpointer)

async def main():
    # 5) Run until the interrupt
    config = {"configurable": {"thread_id": str(uuid.uuid4())}}
    result = await graph.ainvoke({"messages": []}, config=config)
    # The paused state contains information about the interrupt
    print("Paused at interrupt:", result.get("__interrupt__"))

    # 6) AT THIS POINT, update state asynchronously (aupdate_state),
    #    so that we DO NOT disrupt the interrupt. Key points:
    #    - use reducers (messages: add), so nothing is overwritten
    #    - update the checkpoint selected by snapshot.config (fork)
    snapshot = graph.get_state(config)
    print("Before aupdate_state, messages:", snapshot.values.get("messages", []))

    # append a migration note to messages (without disturbing the interrupt)
    new_config = await graph.aupdate_state(
        snapshot.config,
        values={"messages": [{"role": "system", "content": "[migration] message normalization"}]},
    )

    # 7) Resume with Command(resume=...), the interrupt process remains consistent
    resumed = await graph.ainvoke(
        Command(resume="OK"),  # or e.g., "This is edited content"
        config=new_config       # resume from the new checkpoint after aupdate_state
    )

    print("\nAfter resuming:")
    for i, m in enumerate(resumed["messages"], 1):
        print(f"{i}. {m['role']}: {m['content']}")


if __name__ == "__main__":
    asyncio.run(main())

Seems like it does the job.

Editing state via time-travel rather than raw replacement should also work

imho LangGraph’s persistence is designed to be manipulated via the APIs. Direct DB edits to checkpoint tables risk corrupting lineage and losing interrupts.

Hi @pawel-twardziak

Thanks for your response. Ive had a look at your code snippets, and while similar to what we current do, one important detail is that we are attempting to do a replacement of the messages. Let me give you more context.

As Mike mentioned we are doing a migration, because the signature of a key tool has changed slightly.

So we created a migration script that will read our current messages for conversations, and modify the messages.

The high level operations we are doing on the messages is the following:

  • Modify tool call args for that specific tool
  • Find the relevant tool result and modify the result content

The rest of the messages are left as is, and the messages order and parent-child relationships do not change.

In our script we are doing the following:

Where lc_messages is the full messages history that has been selectively modified. We use the AsyncShallowPostgresSaver and our reducer is also simple.

async with AsyncShallowPostgresSaver.from_session(session) as checkpointer:
    config = await ctx.get_config(thread_id=thread_id)
    graph_builder = await ctx.get_react_agent()
    graph = graph_builder.compile(checkpointer=checkpointer)
    snapshot = await graph.aget_state(config)
    await graph.aupdate_state(
        config=snapshot.config,
        values={"messages": lc_messages},
    )


class ElvexGraphState(T.TypedDict):
    messages: T.Annotated[T.Sequence[AnyMessage], add_messages]


What we notice is that after we run these lines, the messages do get overwritten in the state correctly, but the interrupts and tasks that existed are blown away. There is a new checkpoint ID.

We want to do this operation and preserve the existing checkpoint metadata that exists.

Hi @tharshan-elvex

thanks for your reply. Let me investigate a bit more and I will be back with some further answers.

1 Like

Hi @tharshan-elvex

do you actually have to overwrite the messages in the state? Is this a must?

aupdate_state/update_state methods always create a brand new checkpoint with the overwritten state which means all the checkpoint metadata (tasks, interrupts) are gone.

I am afraid it is not possible to finalize the migration with your approach.

I would rather go with one of the approaches below (quick shots).

Having this:

def migrate_messages_to_v2(messages: list[dict]) -> list[dict]:
    migrated = []
    for m in messages:
        m2 = dict(m)

        # 1) Tool call migration (example)
        # e.g., assistant.tool_calls / m["type"] == "tool_call" / m["name"] == "old_tool"
        if m2.get("type") == "tool_call" and m2.get("name") == "old_tool":
            args = dict(m2.get("args", {}))
            # ... mapping old -> new args ...
            # args["new_param"] = args.pop("old_param")
            m2["name"] = "new_tool"
            m2["args"] = args

        # 2) Tool result migration (example)
        if m2.get("type") == "tool_result" and m2.get("name") in ("old_tool", "new_tool"):
            res = m2.get("content")
            # ... change result format ...
            # res = migrate_result(res)
            m2["name"] = "new_tool"
            m2["content"] = res

        migrated.append(m2)
    return migrated

Pattern A (keep the pause): fork before the pause, replace messages, replay to the same interrupt

  • When: you want to keep “standing” at the pause after migration.

  • How it works: fork from the parent of the current checkpoint (right before the pause), run aupdate_state with full messages, then ainvoke(None) to hit the same interrupt again.

snapshot = await graph.aget_state(config)         # checkpoint with pause (interrupt)
parent_cfg = snapshot.parent_config                # checkpoint right before the pause

# optionally preserve routing: last writer from metadata["writes"]
last_writer = None
writes = (snapshot.metadata or {}).get("writes")
if isinstance(writes, dict) and writes:
    last_writer = next(reversed(writes))

migrated = migrate_messages_to_v2(snapshot.values["messages"])

new_cfg = await graph.aupdate_state(
    config=parent_cfg,
    values={"messages": migrated},   # full replacement
    as_node=last_writer              # preserve original next steps, if clear
)

paused_again = await graph.ainvoke(None, config=new_cfg)  # you’ll hit the same interrupt again
  • Effect: a new checkpoint_id (immutability), but functionally you are still paused, with messages already updated.

Pattern B (migration on resume): resume + update

  • When: you want to exit the pause and continue, and “inject” the migration atomically at resume time.

  • How it works: from the paused checkpoint, call ainvoke(Command(resume=…, update={“messages”: migrated})).

snapshot = await graph.aget_state(config)

migrated = migrate_messages_to_v2(snapshot.values["messages"])

resumed = await graph.ainvoke(
    Command(
        resume="OK",                    # or a map of multiple interrupts: resume={id: val, ...}
        update={"messages": migrated}   # full replacement at resume
    ),
    config=config
)
  • Parallel pause variant: use resume={interrupt_id: value, …} (map), still with update={“messages”: migrated}.

Pattern C (time‑travel): choose an earlier checkpoint, replace, replay forward

  • When: you want to update state “back in time” (e.g., before the node that uses the tool), then replay the run to the current point/pause.

  • How it works: pick a checkpoint_id from history, aupdate_state there, then ainvoke(None).

history = list(await graph.aget_state_history(config))
# choose a checkpoint BEFORE the tool is used or BEFORE the pause
base = history[-2]  # example: an older checkpoint; pick based on your logic

migrated = migrate_messages_to_v2(base.values["messages"])

new_cfg = await graph.aupdate_state(
    config=base.config,                # fork at the selected point in history
    values={"messages": migrated},
    # optionally as_node like in A to preserve routing
)

# replay to the target point (e.g., you’ll hit the interrupt again)
state = await graph.ainvoke(None, config=new_cfg)

Pattern D: migrate to a new thread

# -------- Pattern D: migrate into a NEW THREAD --------
async def migrate_to_new_thread_with_interrupt_parity(
    old_thread_id: str,
    *,
    entry_node: str = "generate_reply",
) -> None:
    # 1) Read latest state from the OLD thread (paused or not)
    old_cfg = {"configurable": {"thread_id": old_thread_id}}
    old_snapshot = await graph.aget_state(old_cfg)
    old_messages = list(old_snapshot.values.get("messages", []))

    # (optional) capture old interrupts for mapping/reporting
    old_interrupts = tuple(getattr(old_snapshot, "interrupts", ()) or ())

    # 2) Migrate messages to new tool signature
    migrated = migrate_messages_to_v2(old_messages)

    # 3) Create a NEW thread and seed migrated state
    new_cfg = {"configurable": {"thread_id": str(uuid.uuid4())}}

    # Seed via update_state so we don't run any node yet.
    # We set as_node to the graph entry node so the next step is well-defined.
    await graph.aupdate_state(
        config=new_cfg,
        values={"messages": migrated},
        as_node=entry_node,
    )

    # 4) Proceed: run until next interrupt (or completion)
    # If your graph triggers the same dynamic interrupt condition, you'll pause again.
    paused_or_done = await graph.ainvoke(None, config=new_cfg)

    print("NEW THREAD ID:", new_cfg["configurable"]["thread_id"])
    print("Result/Interrupt:", paused_or_done.get(INTERRUPT) or paused_or_done)


async def demo_end_to_end():
    # Simulate an OLD thread that reaches an interrupt
    old_cfg = {"configurable": {"thread_id": old_thread_id)}}
    paused = await graph.ainvoke({"messages": []}, config=old_cfg)
    print("OLD THREAD paused at:", paused.get(INTERRUPT))

    # Migrate that OLD thread to a NEW thread (Pattern D)
    await migrate_to_new_thread_with_interrupt_parity(old_cfg["configurable"]["thread_id"])

if __name__ == "__main__":
    asyncio.run(demo_end_to_end())

Then I would simply map each legacy threadId to its migration equivalent for using the new tool signatures.


  • “Migration of tasks/interrupts”: you don’t transfer metadata 1:1; it is reconstructed on the new fork during re‑play (A/C) or naturally consumed at resume (B). This aligns with checkpoint immutability.

  • Concurrency: avoid concurrent writes to the same thread_id (otherwise INVALID_CONCURRENT_GRAPH_UPDATE).

  • If the messages reducer appends, but you need a hard replacement: perform aupdate_state at the correct point (A/C) so the “new” state is the base, or use a reducer with replace semantics (if possible in your definition).

All of this lets you automatically update tool calls and results (new signature) and preserve the semantics of pauses/interrupts without touching checkpointer tables.

Thanks for your reply @pawel-twardziak. I tried your approaches so far, but with our current langchain setup - I had some issues. One important thing to note with our setup is that we only keep one checkpoint (the latest one), and it’s overwritten each time.

With Pattern A - our snapshot.parent_config is always None.

Pattern B - This seems to actually call out to the LLM with the ainvoke. Is that expected? So every conversation we migrate will require an LLM invocation?

Pattern C - We only have one checkpoint, so no older checkpoint to choose from.

Pattern D - This sound like it can work - but I had an issue implementing this. What is the entry_node, and what is the correct value? In your code it’s default to “generate_reply”. How is the old_interrupts value meant to be used?

Hi @tharshan-elvex

thanks for your feedback! Let me follow up on your reply tomorrow or Friday.

One additional thing to note here, so Pattern D I gave it a try. However, in this pattern, we create a new thread_id. In our current setup, we only have the latest checkpoint and use a thread ID that looks like this lvx_conversation_{conversation_id}

conversation_id is a known value in our codebase, so we construct the thread_id with the format above, when we use get_config.

In pattern ID, we create a new thread_id using a new UUID. So we loose track of the checkpoint in relationship to the conversation.

Ive also noticed that when I run it with a new UUID, the metadata from the previous snapshot is lost.

Hi @pawel-twardziak any suggestions on how to proceed based on my feedback?

hi @tharshan-elvex not yet, I still need to make some time to go and code through the real case.
But something crossed my mind - have you experminented with aput_writes Checkpointing ?

Is the the thread ID pattern lvx_conversation_{conversation_id} a must?

How have you coded the last checkpoint preservation?

Hello @pawel-twardziak , we did use aput_writes at one point, but we had issues because LangGraph used this method for intermediate states. From our testing, aput_writes seemed like it was intended for intermediate states while a graph is executing to support resumption upon failure or interrupts. In testing, it seemed possible for errors during execution lead to bad states between LangGraph checkpointer tables and our message v2 table. aput is more aligned with our intention of trying to synchronize writes to the messages.

Here is our checkpointer class that overwrites aput method. It uses AsyncShallowPostgresSaver. However, do not modify the checkpointing process in any material way that affects LC.

class AsyncConversationSaver(AsyncShallowPostgresSaver):
    @T.override
    async def aput(
        self,
        config: schemas.RunnableConfig,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: ChannelVersions,
    ) -> RunnableConfig:
        # We call the super method, to run aput.
        next_cfg = await super().aput(config, checkpoint, metadata, new_versions)

        # After checkpointing is saved, we write to our conversation and message tables.

        return next_cfg

Since we have only one checkpoint, we thought tying a Thread ID to the conversation ID would make it easier to identify. Is there something wrong with this approach*?*

hi @tharshan-elvex

imo there is nothing wrong with this apprach. But that is agreat input for me at the same time! Thank you.

One more question about that comment:

# After checkpointing is saved, we write to our conversation and message tables.

Does this mean you maintain your own custom tables for the conversations and messages?

Does this mean you maintain your own custom tables for the conversations and messages?

That’s correct we do. We do this as it contains relationships and metadata that’s useful for our application.

That’s ok. However, it seems not relevant to the migration process.
I am playing around now, let’s see where it goes…

1 Like

hi @tharshan-elvex

here is the results:

having this agent invocation:

import os

from dotenv import load_dotenv
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.checkpoint.postgres import PostgresSaver, ShallowPostgresSaver

from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition, ToolNode

load_dotenv()

def add(a: int, b: int) -> int:
    """Adds a and b.

    Args:
        a: first int
        b: second int
    """
    return a + b

def add_v2(a: int, b: int, subtract: bool) -> int:
    """Adds a and b.

    Args:
        a: first int
        b: second int
        subtract: bool
    """
    return a + b if subtract == False else b - a

def multiply(a: int, b: int) -> int:
    """Multiplies a and b.

    Args:
        a: first int
        b: second int
    """
    return a * b

def divide(a: int, b: int) -> float:
    """Divide a and b.

    Args:
        a: first int
        b: second int
    """
    return a / b

tools = [add, multiply, divide]

# Define LLM with bound tools
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model_name='claude-3-7-sonnet-latest', timeout=60000, stop=None)
llm_with_tools = llm.bind_tools(tools)

# System message
sys_msg = SystemMessage(content="You are a helpful assistant tasked with writing performing arithmetic on a set of inputs.")

# Node
def assistant(state: MessagesState):
   return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
    # If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
    tools_condition,
)
builder.add_edge("tools", "assistant")

with ShallowPostgresSaver.from_conn_string(os.getenv("POSTGRES_URI")) as memory:
    config = {"configurable": {"thread_id": "unique_id"}}
    # memory.setup()
    checkpoint = memory.get(config)

    # Compile graph
    graph = builder.compile(checkpointer=memory)

    graph.invoke({"messages":[HumanMessage(content="multiply by 2")]}, config)
    graph.invoke({"messages":[HumanMessage(content="6")]}, config)
    graph.invoke({"messages":[HumanMessage(content="add 5")]}, config)
    graph.invoke({"messages":[HumanMessage(content="divide by 2")]}, config)

    snapshot = graph.get_state(config)

    for msg in snapshot.values.get("messages"):
        msg.pretty_print()

I can see this printout:

================================ Human Message =================================

multiply by 2
================================== Ai Message ==================================

I'd be happy to help you multiply by 2, but I need to know what number you want to multiply. Could you please provide the number that you want to multiply by 2?
================================ Human Message =================================

6
================================== Ai Message ==================================

[{'text': "I'll multiply 6 by 2 for you.", 'type': 'text'}, {'id': 'toolu_01VPaD8JBaTncisctVXBb7Gj', 'input': {'a': 6, 'b': 2}, 'name': 'multiply', 'type': 'tool_use'}]
Tool Calls:
  multiply (toolu_01VPaD8JBaTncisctVXBb7Gj)
 Call ID: toolu_01VPaD8JBaTncisctVXBb7Gj
  Args:
    a: 6
    b: 2
================================= Tool Message =================================
Name: multiply

12
================================== Ai Message ==================================

The result of multiplying 6 by 2 is 12.
================================ Human Message =================================

add 5
================================== Ai Message ==================================

[{'text': "I'll add 5 to the previous result of 12.", 'type': 'text'}, {'id': 'toolu_012AVvxdMs5ixYUunQSEeN5B', 'input': {'a': 12, 'b': 5}, 'name': 'add', 'type': 'tool_use'}]
Tool Calls:
  add (toolu_012AVvxdMs5ixYUunQSEeN5B)
 Call ID: toolu_012AVvxdMs5ixYUunQSEeN5B
  Args:
    a: 12
    b: 5
================================= Tool Message =================================
Name: add

17
================================== Ai Message ==================================

The result of adding 5 to 12 is 17.
================================ Human Message =================================

divide by 2
================================== Ai Message ==================================

[{'text': "I'll divide the previous result of 17 by 2.", 'type': 'text'}, {'id': 'toolu_017xDHJViuLCXLHvRp3pqCCb', 'input': {'a': 17, 'b': 2}, 'name': 'divide', 'type': 'tool_use'}]
Tool Calls:
  divide (toolu_017xDHJViuLCXLHvRp3pqCCb)
 Call ID: toolu_017xDHJViuLCXLHvRp3pqCCb
  Args:
    a: 17
    b: 2
================================= Tool Message =================================
Name: divide

8.5
================================== Ai Message ==================================

The result of dividing 17 by 2 is 8.5.

Now I am running this migration script (tool add → tool add_v2):

"""
Full checkpoint migration script: Rewrite tool calls from 'add' to 'add_v2'
"""

import os
from dotenv import load_dotenv
from langchain_core.messages import AIMessage, ToolMessage
from langgraph.checkpoint.postgres import ShallowPostgresSaver
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.prebuilt import tools_condition, ToolNode
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import SystemMessage

load_dotenv()

# Import BOTH old and new tools
def add(a: int, b: int) -> int:
    """Adds a and b."""
    return a + b

def add_v2(a: int, b: int, subtract: bool = False) -> int:
    """Adds a and b, or subtracts if subtract=True."""
    return a + b if not subtract else b - a

def multiply(a: int, b: int) -> int:
    """Multiplies a and b."""
    return a * b

def divide(a: int, b: int) -> float:
    """Divide a and b."""
    return a / b

# Build graph with NEW tools
tools = [add_v2, multiply, divide]
llm = ChatAnthropic(model_name='claude-3-7-sonnet-latest', timeout=60000, stop=None)
llm_with_tools = llm.bind_tools(tools)
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic.")

def assistant(state: MessagesState):
    return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges("assistant", tools_condition)
builder.add_edge("tools", "assistant")

with ShallowPostgresSaver.from_conn_string(os.getenv("POSTGRES_URI")) as memory:
    memory.setup()

    config = {"configurable": {"thread_id": "unique_id"}}
    graph = builder.compile(checkpointer=memory)

    # Step 1: Get current state
    state = graph.get_state(config)

    print("=== BEFORE MIGRATION ===")
    print(f"Total messages: {len(state.values['messages'])}\n")

    # Step 2: Transform messages - rewrite tool calls from 'add' to 'add_v2'
    updated_messages = []

    for msg in state.values["messages"]:
        if isinstance(msg, AIMessage) and hasattr(msg, 'tool_calls') and msg.tool_calls:
            # Check if this message contains 'add' tool calls
            modified = False
            new_tool_calls = []
            new_content = msg.content

            for tool_call in msg.tool_calls:
                if tool_call['name'] == 'add':
                    # Rewrite to add_v2 with new signature
                    new_tool_call = tool_call.copy()
                    new_tool_call['name'] = 'add_v2'
                    new_tool_call['args']['subtract'] = False  # Add new parameter
                    new_tool_calls.append(new_tool_call)
                    modified = True
                    print(f"✓ Migrating AIMessage tool_call: add → add_v2 (id: {msg.id})")
                else:
                    new_tool_calls.append(tool_call)

            if modified:
                # Create new AIMessage with same ID to overwrite
                # Update content if it mentions the old tool
                if isinstance(new_content, str):
                    new_content = new_content.replace("add tool", "add_v2 tool")
                elif isinstance(new_content, list):
                    new_content = [
                        {**item, 'name': 'add_v2'} if isinstance(item, dict) and item.get('name') == 'add' else item
                        for item in new_content
                    ]

                new_msg = AIMessage(
                    content=new_content,
                    id=msg.id,  # ← CRITICAL: Same ID = overwrite existing message
                    tool_calls=new_tool_calls,
                    additional_kwargs=msg.additional_kwargs,
                    response_metadata=msg.response_metadata,
                    usage_metadata=getattr(msg, 'usage_metadata', None)
                )
                updated_messages.append(new_msg)
            else:
                updated_messages.append(msg)

        elif isinstance(msg, ToolMessage) and msg.name == 'add':
            # Rewrite ToolMessage to reference add_v2
            print(f"✓ Migrating ToolMessage: add → add_v2 (id: {msg.id})")
            new_msg = ToolMessage(
                content=msg.content,
                id=msg.id,  # ← CRITICAL: Same ID = overwrite existing message
                tool_call_id=msg.tool_call_id,
                name='add_v2',  # Updated tool name
                additional_kwargs=msg.additional_kwargs,
            )
            updated_messages.append(new_msg)
        else:
            updated_messages.append(msg)

    # Step 3: Update state with transformed messages
    # Key insight from docs: add_messages reducer overwrites messages with same ID
    print("\n=== APPLYING MIGRATION ===")
    new_config = graph.update_state(config, {"messages": updated_messages})

    print(f"New checkpoint created: {new_config['configurable']['checkpoint_id']}")

    # Step 4: Verify migration
    print("\n=== AFTER MIGRATION ===")
    new_state = graph.get_state(config)

    print(f"Total messages: {len(new_state.values['messages'])}")

    # Check for any remaining 'add' references
    add_count = 0
    add_v2_count = 0

    for msg in new_state.values["messages"]:
        if isinstance(msg, AIMessage) and hasattr(msg, 'tool_calls') and msg.tool_calls:
            for tc in msg.tool_calls:
                if tc['name'] == 'add':
                    add_count += 1
                elif tc['name'] == 'add_v2':
                    add_v2_count += 1
        elif isinstance(msg, ToolMessage):
            if msg.name == 'add':
                add_count += 1
            elif msg.name == 'add_v2':
                add_v2_count += 1

    print(f"\nTool call counts:")
    print(f"  'add' references: {add_count}")
    print(f"  'add_v2' references: {add_v2_count}")

    if add_count == 0:
        print("\n✓ Migration successful! All 'add' tool calls converted to 'add_v2'")
    else:
        print(f"\n⚠ Warning: {add_count} 'add' references still remain")

    print("\n=== SAMPLE MESSAGES ===")
    for i, msg in enumerate(new_state.values["messages"]):
        if isinstance(msg, (AIMessage, ToolMessage)):
            print(f"\nMessage {i}:")
            msg.pretty_print()

and getting this printout:

=== BEFORE MIGRATION ===
Total messages: 14

✓ Migrating AIMessage tool_call: add → add_v2 (id: run--1038a9e8-72e4-4b1c-ac17-2806a3b09382-0)
✓ Migrating ToolMessage: add → add_v2 (id: e192120a-6fb1-40fa-add5-1baa52116eba)

=== APPLYING MIGRATION ===
New checkpoint created: 1f0a2f55-5ec6-610c-8011-b43cd9795e4a

=== AFTER MIGRATION ===
Total messages: 14

Tool call counts:
  'add' references: 0
  'add_v2' references: 2

✓ Migration successful! All 'add' tool calls converted to 'add_v2'

=== SAMPLE MESSAGES ===

Message 1:
================================== Ai Message ==================================

I'd be happy to help you multiply by 2, but I need to know what number you want to multiply. Could you please provide the number that you want to multiply by 2?

Message 3:
================================== Ai Message ==================================

[{'text': "I'll multiply 6 by 2 for you.", 'type': 'text'}, {'id': 'toolu_01VPaD8JBaTncisctVXBb7Gj', 'input': {'a': 6, 'b': 2}, 'name': 'multiply', 'type': 'tool_use'}]
Tool Calls:
  multiply (toolu_01VPaD8JBaTncisctVXBb7Gj)
 Call ID: toolu_01VPaD8JBaTncisctVXBb7Gj
  Args:
    a: 6
    b: 2

Message 4:
================================= Tool Message =================================
Name: multiply

12

Message 5:
================================== Ai Message ==================================

The result of multiplying 6 by 2 is 12.

Message 7:
================================== Ai Message ==================================

[{'text': "I'll add 5 to the previous result of 12.", 'type': 'text'}, {'id': 'toolu_012AVvxdMs5ixYUunQSEeN5B', 'input': {'a': 12, 'b': 5}, 'name': 'add_v2', 'type': 'tool_use'}]
Tool Calls:
  add_v2 (toolu_012AVvxdMs5ixYUunQSEeN5B)
 Call ID: toolu_012AVvxdMs5ixYUunQSEeN5B
  Args:
    a: 12
    b: 5
    subtract: False

Message 8:
================================= Tool Message =================================
Name: add_v2

17

Message 9:
================================== Ai Message ==================================

The result of adding 5 to 12 is 17.

Message 11:
================================== Ai Message ==================================

[{'text': "I'll divide the previous result of 17 by 2.", 'type': 'text'}, {'id': 'toolu_017xDHJViuLCXLHvRp3pqCCb', 'input': {'a': 17, 'b': 2}, 'name': 'divide', 'type': 'tool_use'}]
Tool Calls:
  divide (toolu_017xDHJViuLCXLHvRp3pqCCb)
 Call ID: toolu_017xDHJViuLCXLHvRp3pqCCb
  Args:
    a: 17
    b: 2

Message 12:
================================= Tool Message =================================
Name: divide

8.5

Message 13:
================================== Ai Message ==================================

The result of dividing 17 by 2 is 8.5.

So now, running this:

import os

from dotenv import load_dotenv
from langchain_core.messages import SystemMessage, HumanMessage
from langgraph.checkpoint.postgres import PostgresSaver, ShallowPostgresSaver

from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition, ToolNode

load_dotenv()

def add(a: int, b: int) -> int:
    """Adds a and b.

    Args:
        a: first int
        b: second int
    """
    return a + b

def add_v2(a: int, b: int, subtract: bool) -> int:
    """Adds a and b.

    Args:
        a: first int
        b: second int
        subtract: bool
    """
    return a + b if subtract == False else b - a

def multiply(a: int, b: int) -> int:
    """Multiplies a and b.

    Args:
        a: first int
        b: second int
    """
    return a * b

def divide(a: int, b: int) -> float:
    """Divide a and b.

    Args:
        a: first int
        b: second int
    """
    return a / b

tools = [add_v2, multiply, divide]

# Define LLM with bound tools
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model_name='claude-3-7-sonnet-latest', timeout=60000, stop=None)
llm_with_tools = llm.bind_tools(tools)

# System message
sys_msg = SystemMessage(content="You are a helpful assistant tasked with writing performing arithmetic on a set of inputs.")

# Node
def assistant(state: MessagesState):
   return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

# Build graph
builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
    "assistant",
    # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
    # If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
    tools_condition,
)
builder.add_edge("tools", "assistant")


with ShallowPostgresSaver.from_conn_string(os.getenv("POSTGRES_URI")) as memory:
    config = {"configurable": {"thread_id": "unique_id"}}
    checkpoint = memory.get(config)

    # Compile graph
    graph = builder.compile(checkpointer=memory)

    answer = graph.invoke(None, config)
    for msg in answer["messages"]:
        msg.pretty_print()

I can see this:

================================ Human Message =================================

multiply by 2
================================== Ai Message ==================================

I'd be happy to help you multiply by 2, but I need to know what number you want to multiply. Could you please provide the number that you want to multiply by 2?
================================ Human Message =================================

6
================================== Ai Message ==================================

[{'text': "I'll multiply 6 by 2 for you.", 'type': 'text'}, {'id': 'toolu_01VPaD8JBaTncisctVXBb7Gj', 'input': {'a': 6, 'b': 2}, 'name': 'multiply', 'type': 'tool_use'}]
Tool Calls:
  multiply (toolu_01VPaD8JBaTncisctVXBb7Gj)
 Call ID: toolu_01VPaD8JBaTncisctVXBb7Gj
  Args:
    a: 6
    b: 2
================================= Tool Message =================================
Name: multiply

12
================================== Ai Message ==================================

The result of multiplying 6 by 2 is 12.
================================ Human Message =================================

add 5
================================== Ai Message ==================================

[{'text': "I'll add 5 to the previous result of 12.", 'type': 'text'}, {'id': 'toolu_012AVvxdMs5ixYUunQSEeN5B', 'input': {'a': 12, 'b': 5}, 'name': 'add_v2', 'type': 'tool_use'}]
Tool Calls:
  add_v2 (toolu_012AVvxdMs5ixYUunQSEeN5B)
 Call ID: toolu_012AVvxdMs5ixYUunQSEeN5B
  Args:
    a: 12
    b: 5
    subtract: False
================================= Tool Message =================================
Name: add_v2

17
================================== Ai Message ==================================

The result of adding 5 to 12 is 17.
================================ Human Message =================================

divide by 2
================================== Ai Message ==================================

[{'text': "I'll divide the previous result of 17 by 2.", 'type': 'text'}, {'id': 'toolu_017xDHJViuLCXLHvRp3pqCCb', 'input': {'a': 17, 'b': 2}, 'name': 'divide', 'type': 'tool_use'}]
Tool Calls:
  divide (toolu_017xDHJViuLCXLHvRp3pqCCb)
 Call ID: toolu_017xDHJViuLCXLHvRp3pqCCb
  Args:
    a: 17
    b: 2
================================= Tool Message =================================
Name: divide

8.5
================================== Ai Message ==================================

The result of dividing 17 by 2 is 8.5.

@tharshan-elvex is this something that could help figure out the migration?


Small update - to migrate the input field too:

            for tool_call in msg.tool_calls:
                if tool_call['name'] == 'add':
                    # Rewrite to add_v2 with new signature
                    new_tool_call = tool_call.copy()
                    new_tool_call['name'] = 'add_v2'

                    # CRITICAL: Add 'subtract' parameter to args/input
                    if 'args' not in new_tool_call:
                        new_tool_call['args'] = {}
                    new_tool_call['args']['subtract'] = False

                    # Also update 'input' field if it exists
                    if 'input' in new_tool_call:
                        new_tool_call['input']['subtract'] = False

                    new_tool_calls.append(new_tool_call)
                    modified = True
                    print(f"✓ Migrating AIMessage tool_call: add → add_v2 (id: {msg.id})")
                    print(f"  Updated args: {new_tool_call.get('args', new_tool_call.get('input'))}")
                else:
                    new_tool_calls.append(tool_call)

Hey @pawel-twardziak Is there a finding here specific to preserving the interrupt state?

Right now we have a working migration script, that we have validated. The state that gets migrated and written is correct for the the v2 of the tool.

However, to bring back the initial focus of the issue at hand: the messages do get overwritten in the state correctly, but the interrupts and tasks that existed are blown away. There is a new checkpoint ID.

You had posted a few alternative patterns to try migrate the state and keep the interrupts, however they relied on having time-travel enables with previous checkpooints.

With Pattern A - our snapshot.parent_config is always None.

Pattern B - This seems to actually call out to the LLM with the ainvoke. Is that expected? So every conversation we migrate will require an LLM invocation?

Pattern C - We only have one checkpoint, so no older checkpoint to choose from.

Pattern D - Cannot use this since we have a consistent thread_id.

hi @tharshan-elvex yes, you are right, I lost track

I will follow up tomorrow, already too tired today

1 Like