Interrupt does not work correctly in LangGraph

Hello, I am learning LangGraph and have run into a problem with `interrupt`. My code is below. The first interrupt works correctly, but the second does not. The workflow does enter the `get_input` node, but it exits at the `interrupt` line without any error. Does anyone have any ideas? Thanks.

from langgraph.graph import StateGraph, START, END 
from typing import TypedDict
from random import randint
import time 
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
from langgraph.config import get_stream_writer
import asyncio

class MyState(TypedDict):
    """State carried between the nodes of the demo graph.

    TypedDict fields cannot declare default values: an assignment in the
    class body only creates a class attribute and never populates dict
    instances, so no default is declared here.  ``is_good`` is written by
    the ``stop`` node before ``branch`` reads it.
    """

    # Current number flowing through the graph.
    data: int
    # The user's answer to the approval question asked by ``stop``.
    is_good: bool

class Test:
    """Small demo graph with two interrupt points.

    Flow: ``node_rand`` -> ``node_add`` -> ``stop``; from ``stop`` the
    ``branch`` router goes either to ``END`` or to ``get_input`` (which then
    goes to ``END``).  The compiled graph uses an in-memory checkpointer and
    both ``run`` and ``resume`` address the same thread id, "thread-1".
    """

    def __init__(self):
        builder = StateGraph(MyState)

        # Register the nodes in pipeline order.
        for name, handler in (
            ("node_rand", self.node_rand),
            ("node_add", self.node_add),
            ("stop", self.stop),
            ("get_input", self.get_input),
        ):
            builder.add_node(name, handler)

        # Linear part of the pipeline.
        for source, target in (
            (START, "node_rand"),
            ("node_rand", "node_add"),
            ("node_add", "stop"),
        ):
            builder.add_edge(source, target)

        # After "stop", `branch` picks one of the two mapped destinations.
        routes = {END: END, "get_input": "get_input"}
        builder.add_conditional_edges("stop", self.branch, routes)
        builder.add_edge("get_input", END)

        self.graph = builder.compile(checkpointer=MemorySaver())

    def node_rand(self, state:MyState):
        """Emit one custom stream message and seed ``data`` with a random int in [0, 10]."""
        writer = get_stream_writer()
        writer("This is a custom message. ")
        return {"data": randint(0, 10)}

    def node_add(self, state:MyState):
        """Return an update that increments ``data`` by one."""
        incremented = state["data"] + 1
        return {"data": incremented}

    def stop(self, state:MyState):
        """Raise an interrupt carrying the current value; store the reply in ``is_good``."""
        question = {
            "value": state["data"],
            "message": "Are you satisfied with this value ?",
        }
        answer = interrupt(question)
        return {"is_good": answer}

    def branch(self, state:MyState):
        """Route to ``END`` when ``is_good`` is truthy, otherwise to ``get_input``."""
        if state["is_good"]:
            return END
        return "get_input"

    def get_input(self, state:MyState):
        """Raise an interrupt asking for a replacement value; store the reply in ``data``."""
        new_value = interrupt("Input a new value. ")
        return {"data": new_value}

    async def run(self):
        """Start the graph with empty input and re-yield every streamed chunk.

        Streams in both "values" and "custom" modes on thread "thread-1".
        """
        thread_config = {"configurable": {"thread_id": "thread-1"}}
        stream = self.graph.astream(
            {},
            stream_mode=["values", "custom"],
            config=thread_config,
            version="v2",
        )
        async for part in stream:
            yield part

    def resume(self, data):
        """Resume thread "thread-1" with ``data`` via a synchronous ``invoke``.

        The value returned by ``invoke`` is not captured, so this method
        returns ``None`` and nothing produced by the resumed run is passed
        back to the caller.
        """
        thread_config = {"configurable": {"thread_id": "thread-1"}}
        command = Command(resume=data)
        self.graph.invoke(command, config=thread_config, version="v2")

async def run_test():
    """Drive the demo: print every chunk from ``Test.run`` and answer interrupts.

    When a "values" chunk carries interrupts, the first interrupt's value
    decides the prompt: a dict payload gets a yes/no question, anything else
    gets a request for an integer.  The answer is passed to ``Test.resume``
    from inside the loop over the original stream.
    """
    demo = Test()
    async for part in demo.run():
        print("chunk ==>", part)

        # Only "values" chunks with a non-empty interrupt list need an answer.
        if not (part['type'] == 'values' and part['interrupts']):
            continue

        payload = part['interrupts'][0].value
        if type(payload) is not dict:
            demo.resume(int(input("Input a int: ")))
            continue

        reply = input("Input yes or no: ")
        demo.resume(reply == 'yes')
if __name__ == "__main__":
    # Start the interactive demo only when executed as a script, so that
    # importing this module does not block on input().
    asyncio.run(run_test())



Hello @MSJavaScript, welcome to LangChain Community!!!

Your first interrupt works because you handle it from the initial `astream`. The second interrupt happens during `invoke(Command(resume=...))`, but you never stream or inspect that run: your `async for` is still tied to the first `astream`, which already ended at the first pause.

Fix: treat every resume as a new `astream`/`ainvoke` call with `Command(resume=...)` (looping until there are no more interrupts), and use `ainvoke`/`astream` consistently instead of calling the synchronous `invoke` inside async code.

If interrupt() in get_input returns without pausing, a resume value was already applied (often from resuming while the first stream hadn’t fully finished). Drain the stream first, then resume on a new call with the same thread_id.

Please see below the updated code.

import asyncio
from random import randint
from typing import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer
from langgraph.graph import END, START, StateGraph
from langgraph.types import Command, interrupt


# State schema for the demo graph (both keys are required, as in the
# class-based form): `data` is the number flowing through the graph and
# `is_good` is the user's answer to the approval question.
MyState = TypedDict(
    "MyState",
    {
        "data": int,
        "is_good": bool,
    },
)


class Test:
    """Demo graph: random value, increment, approval interrupt, optional override.

    Flow: ``node_rand`` -> ``node_add`` -> ``stop``; the ``branch`` router then
    ends the run or detours through ``get_input`` before ending.  The graph is
    compiled with an in-memory checkpointer and exposed as ``self.graph``.
    """

    def __init__(self):
        builder = StateGraph(MyState)

        # Register the nodes in pipeline order.
        for name, handler in (
            ("node_rand", self.node_rand),
            ("node_add", self.node_add),
            ("stop", self.stop),
            ("get_input", self.get_input),
        ):
            builder.add_node(name, handler)

        # Linear part of the pipeline.
        for source, target in (
            (START, "node_rand"),
            ("node_rand", "node_add"),
            ("node_add", "stop"),
        ):
            builder.add_edge(source, target)

        # After "stop", `branch` picks one of the two mapped destinations.
        routes = {END: END, "get_input": "get_input"}
        builder.add_conditional_edges("stop", self.branch, routes)
        builder.add_edge("get_input", END)

        self.graph = builder.compile(checkpointer=InMemorySaver())

    def node_rand(self, state: MyState):
        """Emit one custom stream message and seed ``data`` with a random int in [0, 10]."""
        writer = get_stream_writer()
        writer("This is a custom message.")
        return {"data": randint(0, 10)}

    def node_add(self, state: MyState):
        """Return an update that increments ``data`` by one."""
        incremented = state["data"] + 1
        return {"data": incremented}

    def stop(self, state: MyState):
        """Raise an interrupt carrying the current value; store the reply in ``is_good``."""
        question = {
            "value": state["data"],
            "message": "Are you satisfied with this value?",
        }
        answer = interrupt(question)
        return {"is_good": answer}

    def branch(self, state: MyState):
        """Route to ``END`` when ``is_good`` is truthy, otherwise to ``get_input``."""
        if state["is_good"]:
            return END
        return "get_input"

    def get_input(self, state: MyState):
        """Raise an interrupt asking for a replacement value; store the reply in ``data``."""
        new_value = interrupt("Input a new value.")
        return {"data": new_value}


def _prompt_resume_value(payload):
    """Ask the user for the value that answers one interrupt payload.

    A dict payload is the yes/no approval question, answered with a bool.
    Any other payload is treated as a request for an integer; the prompt is
    repeated until the input parses, so a typo does not crash the session.
    """
    if isinstance(payload, dict):
        ans = input("Input yes or no: ")
        return ans.strip().lower() == "yes"

    while True:
        try:
            return int(input("Input an int: "))
        except ValueError:
            print("Please enter a whole number.")


async def run_test():
    """Run the demo graph, answering each interrupt with a fresh ``astream`` call.

    Every pass streams the graph to completion on thread "thread-1", printing
    each chunk and remembering any interrupts reported by a "values" chunk.
    Only after the stream has been fully consumed is the user prompted; the
    answer is wrapped in ``Command(resume=...)`` and used as the input of the
    next pass.  The loop ends when a pass reports no interrupt.
    """
    test = Test()
    config = {"configurable": {"thread_id": "thread-1"}}
    graph_input: dict | Command = {}

    while True:
        pending = []  # interrupts reported during this pass, if any

        # Consume the whole stream instead of breaking out of it: the async
        # generator then finishes on its own before the same thread is
        # resumed, and the blocking input() call happens outside the stream.
        async for chunk in test.graph.astream(
            graph_input,
            config=config,
            stream_mode=["values", "custom"],
            version="v2",
        ):
            print("chunk ==>", chunk)

            if chunk["type"] == "values" and chunk.get("interrupts"):
                pending = chunk["interrupts"]

        if not pending:
            print("done")
            break

        # Important: the next pass is a NEW astream with Command(resume=...).
        graph_input = Command(resume=_prompt_resume_value(pending[0].value))


if __name__ == "__main__":
    # Script entry point: build the coroutine, then hand it to a new event loop.
    demo = run_test()
    asyncio.run(demo)

I hope this helps, let me know if there are more questions about the interrupt or about its functionality.

Doc Reference: Interrupts - Docs by LangChain.

Thanks, it’s very helpful