How to use human-in-the-loop with stream_mode="messages"

Goal

I'm using Python and want to stream LLM output token by token (stream_mode="messages") while also using human-in-the-loop.

Environment

langchain                 0.3.27
langchain-core            0.3.76
langchain-openai          0.3.33
langchain-text-splitters  0.3.11
langgraph                 0.6.7
langgraph-api             0.4.26
langgraph-checkpoint      2.1.1
langgraph-cli             0.4.2
langgraph-prebuilt        0.6.4
langgraph-runtime-inmem   0.14.0
langgraph-sdk             0.2.9
langsmith                 0.4.30

Existing Attempts

With stream_mode="updates" I implemented streaming of node results by following the relevant documentation, and human-in-the-loop works correctly. The relevant code is below.

from __future__ import annotations

from typing import Annotated

from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.types import INTERRUPT
from langgraph.constants import START, END
from langgraph.graph import StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt, Command
from pydantic import BaseModel


# --------------------tool-------------------------
@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt(query) 
    return human_response


tools = [human_assistance]

# -----------------------llm----------------------
llm = init_chat_model("Qwen/Qwen3-30B-A3B", model_provider="openai", base_url="http://127.0.0.1:22222/v1", api_key="sk-password")

llm_with_tools = llm.bind_tools(tools)

# ---------------state-----------------------
class State(BaseModel):
    messages: Annotated[list, add_messages]


# ----------------node------------------
def chat_bot(state: State):
    return {"messages": llm_with_tools.invoke(state.messages)}


tool_node = ToolNode(tools)


def route_tools(state: State) -> str:
    messages = state.messages if state.messages else []
    if messages:
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tool_node"
    return END


# ----------------graph----------------
graph = (
    StateGraph(State)
    .add_node("chat_bot", chat_bot)
    .add_node("tool_node", tool_node)

    .add_edge(START, "chat_bot")
    .add_conditional_edges(
        "chat_bot",
        route_tools,
        {"tool_node": "tool_node", END: END},
    )
    .add_edge("tool_node", "chat_bot")

    .compile(name="New Graph", checkpointer=InMemorySaver())
)



def stream_graph_updates(user_input: str, thread_id: str):
    for event in graph.stream(
            {"messages": [{"role": "user", "content": user_input}]},
            config={"configurable": {"thread_id": thread_id}},
            stream_mode="updates"
    ):
        assert len(event) == 1
        if INTERRUPT in event:
            user_decision = input("interrupt: " + event[INTERRUPT][0].value + ": ")
            stream_graph_updates(user_decision, thread_id)
        else:
            update = list(event.values())[-1]
            msgs = update["messages"]
            # chat_bot returns a single message; tool_node returns a list of messages
            for msg in (msgs if isinstance(msgs, list) else [msgs]):
                msg.pretty_print()


def main():
    while True:
        try:
            thread_id = input("thread_id: ")
            user_input = input("user: ")
            if user_input.lower() in ["quit", "exit", "q"]:
                print("Goodbye!")
                break

            stream_graph_updates(user_input, thread_id)
        except Exception as e:
            print(e)


if "__main__" == __name__:
    main()

After running, I entered the following (thread_id, then the user message):

1
This is a test; please call the human_assistance function exactly once, any random argument is fine.

The results (screenshot omitted) show the interrupt being triggered and handled as expected.

Issues with the above

stream_mode="updates" does not stream the LLM's output token by token; that requires stream_mode="messages".

Issues with stream_mode="messages"

With stream_mode="messages" the interrupt is never surfaced: the stream yields only token chunks, so the run pauses at interrupt() without giving me any chance to respond.

The code is as follows

from __future__ import annotations

from typing import Annotated

from langchain.chat_models import init_chat_model
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.types import INTERRUPT
from langgraph.constants import START, END
from langgraph.graph import StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt
from pydantic import BaseModel


# --------------------tool-------------------------
@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt(query)
    return human_response


tools = [human_assistance]

# -----------------------llm----------------------
llm = init_chat_model("Qwen/Qwen3-30B-A3B", model_provider="openai", base_url="http://127.0.0.1:22222/v1", api_key="sk-password")

llm_with_tools = llm.bind_tools(tools)


# ---------------state-----------------------
class State(BaseModel):
    messages: Annotated[list, add_messages]


# ----------------node------------------
def chat_bot(state: State):
    return {"messages": llm_with_tools.invoke(state.messages)}


tool_node = ToolNode(tools)


def route_tools(state: State) -> str:
    messages = state.messages if state.messages else []
    if messages:
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tool_node"
    return END


# ----------------graph----------------
graph = (
    StateGraph(State)
    .add_node("chat_bot", chat_bot)
    .add_node("tool_node", tool_node)

    .add_edge(START, "chat_bot")
    .add_conditional_edges(
        "chat_bot",
        route_tools,
        {"tool_node": "tool_node", END: END},
    )
    .add_edge("tool_node", "chat_bot")

    .compile(name="New Graph", checkpointer=InMemorySaver())
)


def stream_graph_updates(user_input: str, thread_id: str):
    # for event in graph.stream(
    #         {"messages": [{"role": "user", "content": user_input}]},
    #         config={"configurable": {"thread_id": thread_id}},
    #         stream_mode="updates"
    # ):
    #     assert len(event) == 1
    #     if INTERRUPT in event:
    #         user_decision = input("interrupt: " + event[INTERRUPT][0].value + ": ")
    #         stream_graph_updates(user_decision, thread_id)
    #     else:
    #         state_dict = list(event.values())[-1]
    #         state_dict['messages'].pretty_print()

    for token, metadata in graph.stream(
            {"messages": [{"role": "user", "content": user_input}]},
            config={"configurable": {"thread_id": thread_id}},
            stream_mode="messages"
    ):
        print(token.content, end="", flush=True)  # print each token chunk as it arrives
    print()


def main():
    while True:
        try:
            thread_id = input("thread_id: ")
            user_input = input("user: ")
            if user_input.lower() in ["quit", "exit", "q"]:
                print("Goodbye!")
                break

            stream_graph_updates(user_input, thread_id)
        except Exception as e:
            print(e)


if "__main__" == __name__:
    main()

The results are shown below; clearly there is no interrupt step:

thread_id: 1
user: This is a test; please call the human_assistance function exactly once, any random argument is fine.
<think>
OK, the user says this is a test and wants me to call the human_assistance function exactly once, with an arbitrary argument. First I need to confirm what the function is for: according to its description it requests help from a human, and its parameter is a string called query. The user is probably checking whether I can trigger a tool call correctly, or whether the argument is passed properly.

Next, I have to make sure I call the function exactly once, no more and no less. The user specifically stressed "only once", so I must not call it repeatedly. The argument should be random but still valid, so I can generate a random query string, for example a few random words or a short sentence.

The user did not specify what the argument should contain, so any reasonable random string will do, e.g. "random query parameter" or "test query 123". I need to make sure the argument is passed as a JSON object with the key "query" and a string value, that the tool call is wrapped in the expected XML tags, and that the JSON is valid.

The user may also want to check whether my response matches expectations, so after the call I might need to produce a fake response, but according to the instructions I only need to emit the tool-call part here. I don't need to actually execute the function, just emit the call, so generating the correct JSON structure for the tool call is the key.

Finally, I checked whether there are other tools, but the provided tool list only contains human_assistance, so I just call that function and don't need to consider anything else. I also need to make sure the arguments contain no syntax errors, e.g. quotes are closed and commas are correct.
</think>


thread_id: 
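
Even though nothing is printed, the thread is in fact paused at the interrupt; the "messages" stream just never carries the interrupt payload. A minimal diagnostic sketch (assuming the graph and thread_id from the code above) that inspects the thread state after the stream ends:

config = {"configurable": {"thread_id": thread_id}}
snapshot = graph.get_state(config)
if snapshot.next:  # a non-empty tuple means the run is paused, not finished
    print("Paused before node(s):", snapshot.next)
    for task in snapshot.tasks:
        for intr in task.interrupts:
            print("Pending interrupt value:", intr.value)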

How can I get the LLM to output token by token while interrupts still work correctly?

Hi @zjt148680

Have you tried this approach:

def stream_graph(user_input: str, thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}

    # First invocation uses the user's message. Resumes use Command(...)
    pending_input = {"messages": [{"role": "user", "content": user_input}]}

    while True:
        interrupted = False

        for mode, chunk in graph.stream(
            pending_input,
            config=config,
            stream_mode=["messages", "updates"],  # tokens + interrupts
        ):
            if mode == "messages":
                token, meta = chunk
                # Only print real token content (function-call chunks can be None)
                if token.content and meta.get("langgraph_node") == "chat_bot":
                    print(token.content, end="", flush=True)

            elif mode == "updates":
                event = chunk
                # Interrupts are surfaced via updates/debug
                if isinstance(event, dict) and INTERRUPT in event:
                    print()  # newline before prompting
                    prompt = event[INTERRUPT][0].value
                    human = input(f"interrupt: {prompt}: ")
                    # Resume the same run using Command(resume=...)
                    pending_input = Command(resume=human)
                    interrupted = True
                    break  # exit inner for-loop and resume

        if not interrupted:
            print()  # finish the line of streamed tokens
            return
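
The key point is that stream_mode accepts a list of modes: graph.stream() then yields (mode, chunk) tuples, so token chunks arrive on the "messages" channel while the __interrupt__ payload still arrives on the "updates" channel, and Command(resume=...) resumes the same paused run on the next graph.stream() call. Note this also needs from langgraph.types import Command, which your first script already has.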

@zjt148680

I made this (both messages and updates are streamable):

import os

from dotenv import load_dotenv

from typing import Annotated

from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.types import INTERRUPT
from langgraph.constants import START, END
from langgraph.graph import StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.config import get_stream_writer
from langgraph.types import interrupt, Command
from pydantic import BaseModel

load_dotenv()

# --------------------tool-------------------------
@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    # Emit a preview of the interrupt prompt to the client via custom stream
    try:
        writer = get_stream_writer()
        # You can emit structured data; here we send a simple dict payload
        writer({"type": "interrupt_preview", "prompt": query})
    except Exception:
        # get_stream_writer is only available within a LangGraph execution context
        # If not available, silently skip emitting custom data
        pass

    human_response = interrupt(query) 
    return human_response


tools = [human_assistance]

# -----------------------llm----------------------
llm = init_chat_model(
    "claude-3-7-sonnet-latest",
    model_provider="anthropic",
    api_key=os.getenv("ANTHROPIC_API_KEY", ""),
)

llm_with_tools = llm.bind_tools(tools)

# ---------------state-----------------------
class State(BaseModel):
    messages: Annotated[list, add_messages]


# ----------------node------------------
def chat_bot(state: State):
    print("Chat history:")
    for msg in state.messages:
        role = getattr(msg, "role", None) or getattr(msg, "type", None) or "unknown"
        content = getattr(msg, "content", None) or str(msg)
        print(f"  [{role}] {content}")
    ai_msg = llm_with_tools.invoke(state.messages)
    # IMPORTANT: return new message(s); do NOT mutate state (append returns None)
    return {"messages": [ai_msg]}


tool_node = ToolNode(tools)


def route_tools(state: State) -> str:
    messages = state.messages if state.messages else []
    if messages:
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tool_node"
    return END

graph = (
    StateGraph(State)
    .add_node("chat_bot", chat_bot)
    .add_node("tool_node", tool_node)

    .add_edge(START, "chat_bot")
    .add_conditional_edges(
        "chat_bot",
        route_tools,
        {"tool_node": "tool_node", END: END},
    )
    .add_edge("tool_node", "chat_bot")

    .compile(name="New Graph", checkpointer=InMemorySaver())
)

def stream_graph(user_input: str, thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}

    # First invocation uses the user's message. Resumes use Command(...)
    pending_input = {"messages": [{"role": "user", "content": user_input}]}

    while True:
        interrupted = False

        for mode, chunk in graph.stream(
            pending_input,
            config=config,
            stream_mode=["messages", "updates", "custom"],  # tokens + interrupts + custom tool events
        ):
            # print(f"\n--- Stream mode: {mode} ---")

            if mode == "messages":
                token, meta = chunk
                # Only print real token content (function-call chunks can be None)
                if token.content and meta.get("langgraph_node") == "chat_bot":
                    content = token.content
                    if isinstance(content, str):
                        text = content
                    elif isinstance(content, list):
                        # Extract text blocks for providers that return content blocks
                        parts = []
                        for block in content:
                            if isinstance(block, dict) and block.get("type") == "text":
                                parts.append(block.get("text", ""))
                        text = "".join(parts)
                    else:
                        text = str(content)
                    if text:
                        print(text, end="", flush=True)

            elif mode == "custom":
                # Print any custom tool emissions (e.g., interrupt previews)
                try:
                    print(chunk.prompt, flush=True)
                except Exception:
                    pass

            elif mode == "updates":
                event = chunk
                # Interrupts are surfaced via updates/debug
                if isinstance(event, dict) and INTERRUPT in event:
                    print()  # newline before prompting
                    # The prompt text was already printed via the "custom" stream above
                    human = input("interrupt: ")
                    # Resume the same run using Command(resume=...)
                    pending_input = Command(resume=human)
                    interrupted = True
                    break  # exit inner for-loop and resume

        if not interrupted:
            print()  # finish the line of streamed tokens
            return


def main():
    while True:
        try:
            thread_id = input("thread_id: ")
            user_input = input("user: ")
            if user_input.lower() in ["quit", "exit", "q"]:
                print("Goodbye!")
                break

            stream_graph(user_input, thread_id)
        except Exception as e:
            print(e)


if "__main__" == __name__:
    main()
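
The get_stream_writer() / "custom" channel part is optional: it just lets the tool push a preview of the interrupt prompt to the client before interrupt() pauses the run. If you drop it, the loop behaves like the earlier snippet and you only see the plain "interrupt: " prompt.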

It works, thanks @pawel-twardziak!
