Hello, I am learning langgraph and encounter some problems about `interrupt`. Below are my codes. The first interrupt works correctly but the second does not work. The workflow indeed enters the “get_input” node but it exits on the `interrupts` line without any errors. Anyone has some ideas ? Thanks.
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
from random import randint
import time
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import MemorySaver
from langgraph.config import get_stream_writer
import asyncio
class MyState(TypedDict):
data: int
is_good: bool = True
class Test:
def __init__(self):
workflow = StateGraph(MyState)
workflow.add_node("node_rand", self.node_rand)
workflow.add_node("node_add", self.node_add)
workflow.add_node("stop", self.stop)
workflow.add_node("get_input", self.get_input)
workflow.add_edge(START, "node_rand")
workflow.add_edge("node_rand", "node_add")
workflow.add_edge("node_add", "stop")
workflow.add_conditional_edges("stop", self.branch, {END:END, "get_input": "get_input"})
workflow.add_edge("get_input", END)
self.graph = workflow.compile(checkpointer=MemorySaver())
def node_rand(self, state:MyState):
get_stream_writer()("This is a custom message. ")
return {"data": randint(0, 10)}
def node_add(self, state:MyState):
return {"data": state["data"] + 1}
def stop(self, state:MyState):
response = interrupt({
"value": state["data"],
"message": "Are you satisfied with this value ?"
})
return {"is_good": response}
def branch(self, state:MyState):
return END if state["is_good"] else "get_input"
def get_input(self, state:MyState):
# print("Enter get_input.")
response = interrupt("Input a new value. ")
return {"data": response}
async def run(self):
config = {"configurable": {"thread_id": "thread-1"}}
async for chunk in self.graph.astream({},
stream_mode=["values", "custom"],
config=config,
version="v2"):
yield chunk
def resume(self, data):
config = {"configurable": {"thread_id": "thread-1"}}
self.graph.invoke(Command(resume=data), config=config, version="v2")
async def run_test():
test = Test()
async for chunk in test.run():
print("chunk ==>", chunk)
if chunk['type'] == 'values' and chunk['interrupts']:
interrupts_value = chunk['interrupts'][0].value
if type(interrupts_value) is dict:
ans = input("Input yes or no: ")
test.resume(True if ans=='yes' else False)
else:
ans = int(input("Input a int: "))
test.resume(ans)
asyncio.run(run_test())