- When using the Interrupts mechanism in LangGraph, how should the graph utilize
ainvoke? - When multiple interrupts exist, how to integrate (the Interrupts mechanism) with other frameworks (e.g., the FastAPI framework)? How can the backend obtain the result of the user’s click?
Below is the error I encountered after converting the official example to use asynchronous calls… I don’t quite understand why it raises a RuntimeError: Called get_config outside of a runnable context, even though I clearly passed the config.
from typing import Literal, Optional, TypedDict
import asyncio
from langgraph.checkpoint.memory import MemorySaver, InMemorySaver
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, interrupt
class ApprovalState(TypedDict):
action_details: str
status: Optional[Literal["pending", "approved", "rejected"]]
def approval_node(state: ApprovalState) -> Command[Literal["proceed", "cancel"]]:
# Expose details so the caller can render them in a UI
decision = interrupt({
"question": "Approve this action?",
"details": state["action_details"],
})
# Route to the appropriate node after resume
return Command(goto="proceed" if decision else "cancel")
def proceed_node(state: ApprovalState):
return {"status": "approved"}
def cancel_node(state: ApprovalState):
return {"status": "rejected"}
async def main():
builder = StateGraph(ApprovalState)
builder.add_node("approval", approval_node)
builder.add_node("proceed", proceed_node)
builder.add_node("cancel", cancel_node)
builder.add_edge(START, "approval")
builder.add_edge("proceed", END)
builder.add_edge("cancel", END)
# Use a more durable checkpointer in production
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "approval-123"}}
initial = await graph.ainvoke( # 改为ainvoke
{"action_details": "Transfer $500", "status": "pending"},
config=config,
)
print(initial["__interrupt__"]) # -> [Interrupt(value={'question': ..., 'details': ...})]
resumed = await graph.ainvoke(Command(resume=True), config=config) # 改为ainvoke
print(resumed["status"]) # -> "approved"
if __name__ == '__main__':
asyncio.run(main())
File “D:\anaconda3\envs\first-pro\lib\site-packages\langgraph\types.py”, line 500, in interrupt
conf = get_config()[“configurable”]
File “D:\anaconda3\envs\first-pro\lib\site-packages\langgraph\config.py”, line 29, in get_config
raise RuntimeError(“Called get_config outside of a runnable context”)
RuntimeError: Called get_config outside of a runnable context