I’ve been implementing a human-in-the-loop flow using LangGraph. It works fine when I use:
await graph.inoke
await graph.stream
However, when I switch to:
await graph.streamEvents()
the human approval logic (e.g., interruption/resume, Command({ resume: true })
, etc.) does not seem to trigger or resume properly.
I have debug the checkpointer. Look like the checkpoint didnt save anything when I use the await graph.streamEvents()
This is my example coding
const humanNode = async (state: typeof this.CustomAnnotation.State) => {
console.log("------------------ humanNode ----------------")
console.log("state.some_text", state.some_text)
const isApproved = interrupt({
question: "Is this correct?",
// Surface the output that should be
// reviewed and approved by the human.
llm_output: state.some_text,
})
console.log("isApproved", isApproved)
if (isApproved) {
return new Command({ goto: "some_node" })
} else {
return new Command({ goto: "another_node" })
}
}
const workflow = new StateGraph(this.createCustomAnnotation()))
.addNode("human", humanNode)
.addEdge(START, "human")
.addEdge("human", END)
const graph = workflow.compile({
checkpointer: this.memory,
})
const threadConfig = {
version: "v2" as const,
configurable: { thread_id: "some_id" },
encoding: "text/event-stream" as const,
stream_mode: "messages" as const,
}
const result = await graph.streamEvents(
{messages: [new HumanMessage("testing")]},
threadConfig
)
console.log("result", this.memory)
const result1 = await graph.streamEvents(
new Command({ resume: true }),
threadConfig
)
- The server didn’t reload, so local storage should still be intact. This works as expected with both
invoke
andstream
methods, but not withstreamEvents
.
Would appreciate any insights