Goal
I'm using Python and want to stream the LLM output token by token (i.e., stream_mode="messages") while also using human-in-the-loop.
Environment
langchain 0.3.27
langchain-core 0.3.76
langchain-openai 0.3.33
langchain-text-splitters 0.3.11
langgraph 0.6.7
langgraph-api 0.4.26
langgraph-checkpoint 2.1.1
langgraph-cli 0.4.2
langgraph-prebuilt 0.6.4
langgraph-runtime-inmem 0.14.0
langgraph-sdk 0.2.9
langsmith 0.4.30
Existing Attempts
With stream_mode="updates", following the relevant documentation, I implemented streaming of node results and also successfully integrated human-in-the-loop. The relevant code is below.
from __future__ import annotations
from typing import Annotated

from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.types import INTERRUPT
from langgraph.constants import START, END
from langgraph.graph import StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt, Command
from pydantic import BaseModel

# -------------------- tool -------------------------
@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt(query)
    return human_response

tools = [human_assistance]

# ----------------------- llm ----------------------
llm = init_chat_model(
    "Qwen/Qwen3-30B-A3B",
    model_provider="openai",
    base_url="http://127.0.0.1:22222/v1",
    api_key="sk-password",
)
llm_with_tools = llm.bind_tools(tools)

# --------------- state -----------------------
class State(BaseModel):
    messages: Annotated[list, add_messages]

# ---------------- node ------------------
def chat_bot(state: State):
    return {"messages": llm_with_tools.invoke(state.messages)}

tool_node = ToolNode(tools)

def route_tools(state: State) -> str:
    messages = state.messages if state.messages else []
    if messages:
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tool_node"
    return END

# ---------------- graph ----------------
graph = (
    StateGraph(State)
    .add_node("chat_bot", chat_bot)
    .add_node("tool_node", tool_node)
    .add_edge(START, "chat_bot")
    .add_conditional_edges(
        "chat_bot",
        route_tools,
        {"tool_node": "tool_node", END: END},
    )
    .add_edge("tool_node", "chat_bot")
    .compile(name="New Graph", checkpointer=InMemorySaver())
)

def stream_graph_updates(user_input: str, thread_id: str):
    for event in graph.stream(
        {"messages": [{"role": "user", "content": user_input}]},
        config={"configurable": {"thread_id": thread_id}},
        stream_mode="updates",
    ):
        assert len(event) == 1
        if INTERRUPT in event:
            user_decision = input("interrupt: " + event[INTERRUPT][0].value + ": ")
            stream_graph_updates(user_decision, thread_id)
        else:
            state_dict = list(event.values())[-1]
            state_dict["messages"].pretty_print()

def main():
    while True:
        try:
            thread_id = input("thread_id: ")
            user_input = input("user: ")
            if user_input.lower() in ["quit", "exit", "q"]:
                print("Goodbye!")
                break
            stream_graph_updates(user_input, thread_id)
        except Exception as e:
            print(e)

if __name__ == "__main__":
    main()
After running, I enter the following:
1
This is a test. I need you to call the human_assistance function exactly once; the arguments can be arbitrary.
The results are as follows.
Issues with the above
With stream_mode="updates", the LLM does not stream its output token by token; that requires stream_mode="messages" (a short sketch of the two modes follows).
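A minimal sketch of the difference between the two modes as I understand them from the streaming docs (here inputs and config are placeholders for the same message dict and thread config used in the code above):

# stream_mode="updates": one event per node step, e.g. {"chat_bot": {...}};
# an interrupt shows up as its own {"__interrupt__": (...)} event
for event in graph.stream(inputs, config=config, stream_mode="updates"):
    print(event)

# stream_mode="messages": (message_chunk, metadata) tuples, one per LLM token
for chunk, metadata in graph.stream(inputs, config=config, stream_mode="messages"):
    print(chunk.content, end="", flush=True)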
Issues with stream_mode="messages"
With this mode, the interrupt is no longer triggered properly.
The code is as follows
from __future__ import annotations
from typing import Annotated

from langchain.chat_models import init_chat_model
from langchain_core.tools import tool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.types import INTERRUPT
from langgraph.constants import START, END
from langgraph.graph import StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt
from pydantic import BaseModel

# -------------------- tool -------------------------
@tool
def human_assistance(query: str) -> str:
    """Request assistance from a human."""
    human_response = interrupt(query)
    return human_response

tools = [human_assistance]

# ----------------------- llm ----------------------
llm = init_chat_model(
    "Qwen/Qwen3-30B-A3B",
    model_provider="openai",
    base_url="http://127.0.0.1:22222/v1",
    api_key="sk-password",
)
llm_with_tools = llm.bind_tools(tools)

# --------------- state -----------------------
class State(BaseModel):
    messages: Annotated[list, add_messages]

# ---------------- node ------------------
def chat_bot(state: State):
    return {"messages": llm_with_tools.invoke(state.messages)}

tool_node = ToolNode(tools)

def route_tools(state: State) -> str:
    messages = state.messages if state.messages else []
    if messages:
        ai_message = messages[-1]
    else:
        raise ValueError(f"No messages found in input state to tool_edge: {state}")
    if hasattr(ai_message, "tool_calls") and len(ai_message.tool_calls) > 0:
        return "tool_node"
    return END

# ---------------- graph ----------------
graph = (
    StateGraph(State)
    .add_node("chat_bot", chat_bot)
    .add_node("tool_node", tool_node)
    .add_edge(START, "chat_bot")
    .add_conditional_edges(
        "chat_bot",
        route_tools,
        {"tool_node": "tool_node", END: END},
    )
    .add_edge("tool_node", "chat_bot")
    .compile(name="New Graph", checkpointer=InMemorySaver())
)

def stream_graph_updates(user_input: str, thread_id: str):
    # (the stream_mode="updates" loop from the first version was replaced by the loop below)
    for token, metadata in graph.stream(
        {"messages": [{"role": "user", "content": user_input}]},
        config={"configurable": {"thread_id": thread_id}},
        stream_mode="messages",
    ):
        print(token.content, end="")  # print each token as it arrives
    print()

def main():
    while True:
        try:
            thread_id = input("thread_id: ")
            user_input = input("user: ")
            if user_input.lower() in ["quit", "exit", "q"]:
                print("Goodbye!")
                break
            stream_graph_updates(user_input, thread_id)
        except Exception as e:
            print(e)

if __name__ == "__main__":
    main()
The results are as follows; clearly, the interrupt step never occurs in this run.
thread_id: 1
user: This is a test. I need you to call the human_assistance function exactly once; the arguments can be arbitrary.
<think>
Okay, the user wants to run a test and needs me to call the human_assistance function once, with an arbitrary argument. First, I need to confirm what this function does; according to its description, it requests help from a human, and its parameter is a string-typed query. The user is probably checking whether I can trigger a tool call correctly, or whether the arguments are passed properly.
Next, I must make sure to call this function exactly once, no more and no less. The user specifically emphasized "only once", so I have to be careful not to call it repeatedly. The argument should be random but still valid, so I may need to generate a random query string, for example a combination of random words or sentences.
The user did not specify the exact content of the argument, so any reasonable random string will do, such as "random query parameter" or "test query 123". I need to make sure the argument is passed correctly in the JSON object, with the key "query" and a string value, and that the tool call is properly formatted, wrapped in XML tags with valid JSON.
Also, the user may want to check whether my response matches expectations, so after the call I might need to produce a fake response, but according to the instructions I only need to produce the tool-call part. The function does not actually need to be executed; this is just a simulated call, so generating the correct tool-call JSON structure is the key.
Finally, I check whether any other tool might apply, but according to the provided tool list only human_assistance is available, so I can simply call that function without considering other options, making sure the arguments have no syntax errors such as unclosed quotes or misplaced commas.
</think>
thread_id:
How can I get the LLM to stream its output token by token while interrupts still work correctly?
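In pseudo-form, what I am hoping to end up with is something like the sketch below. The combined stream_mode list, the "__interrupt__" key in the updates payload, and resuming via Command(resume=...) are my assumptions based on the streaming and human-in-the-loop docs, not something I have verified end to end:

from langgraph.types import Command

def stream_with_interrupts(graph_input, thread_id: str):
    config = {"configurable": {"thread_id": thread_id}}
    pending_interrupt = None
    # assumption: with a list of modes, each item yielded is a (mode, payload) tuple
    for mode, payload in graph.stream(
        graph_input,
        config=config,
        stream_mode=["messages", "updates"],
    ):
        if mode == "messages":
            token, metadata = payload
            print(token.content, end="", flush=True)  # token-by-token output
        elif mode == "updates" and "__interrupt__" in payload:
            pending_interrupt = payload["__interrupt__"][0]
    print()
    if pending_interrupt is not None:
        # resume the interrupted run with the human's answer
        user_decision = input("interrupt: " + str(pending_interrupt.value) + ": ")
        stream_with_interrupts(Command(resume=user_decision), thread_id)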
