I’m trying to identify nodes that trigger human interrupts using the callback. The goal is to get data like wait times, metadata about the interrupt, and the human response. It looks like no events get emitted through the callback when a node invokes interrupt(). Is my understanding correct?
hi @atc
Yes - your understanding is basically correct. In LangGraph.js, interrupt() pauses execution by throwing a special GraphInterrupt internally, which the runtime catches to checkpoint and return control to the caller, so you generally won’t see a dedicated “interrupt” event emitted via LangChain callbacks at the moment the node pauses. The interrupt() API reference explicitly calls out the “throws GraphInterrupt” behavior.
Doc: Interrupts - Docs by LangChain
How to do telemetry instead:
- Detect an interrupt: check for `result.__interrupt__` (interrupt payloads surface under the `__interrupt__` key of the invoke result)
- Identify which node caused it: fetch the state snapshot and inspect StateSnapshot.tasks — when dynamically interrupted, tasks include additional interrupt data; each task includes a name and interrupts. The interrupt value is also available as task.interrupts[].value
- Wait time: record a timestamp when you first observe the interrupt (and/or use StateSnapshot.createdAt), keyed by (thread_id, checkpoint_id, task.id). When you later resume, compute t(resume) - t(interrupt)
- Human response: the response is whatever you pass via new Command({ resume: … }) (and it becomes the return value of interrupt() inside the node). Log that resume payload at the API boundary where you resume, or write it into state after resuming.
A simple example:
/**
* Telemetry for LangGraph.js dynamic interrupts (human-in-the-loop).
*
* What this demo shows:
* - Detect an interrupt: check `result.__interrupt__`
* - Identify which node caused it: `await graph.getState(config)` => `snapshot.tasks[*].interrupts`
* - Wait time: measure from interrupt detection to resume submission
* - Human response: capture CLI input and pass it via `new Command({ resume })`
*
* Setup:
* - Create a `.env` in repo root with:
* - OPENAI_API_KEY=...
* - POSTGRES_URI=postgresql://user:pass@host:5432/dbname
* - (optional) OPENAI_MODEL=gpt-4o-mini
* - (optional) PG_SCHEMA=public
*
* Install (if needed):
* npm i @langchain/langgraph-checkpoint-postgres
*
* Run:
* node src/telemetry_interrupts_openai_postgres.js
*/
import dotenv from "dotenv";
dotenv.config();
import { stdin as input, stdout as output } from "node:process";
import { createInterface } from "node:readline/promises";
import { ChatOpenAI } from "@langchain/openai";
import { HumanMessage, SystemMessage } from "@langchain/core/messages";
import { Annotation, Command, END, START, StateGraph, interrupt } from "@langchain/langgraph";
import { PostgresSaver } from "@langchain/langgraph-checkpoint-postgres";
/**
 * Read an environment variable, failing fast when it is absent or empty.
 *
 * @param {string} name - Environment variable to look up.
 * @returns {string} The variable's (non-empty) value.
 * @throws {Error} If the variable is unset or empty.
 */
function requireEnv(name) {
  const found = process.env[name];
  if (found) return found;
  throw new Error(`Missing required env var: ${name}`);
}
/**
 * Current wall-clock time as an ISO-8601 UTC timestamp string.
 *
 * @returns {string} e.g. "2024-01-31T12:34:56.789Z"
 */
function nowIso() {
  const now = new Date();
  return now.toISOString();
}
/**
 * Interpret a free-form yes/no answer.
 *
 * @param {*} s - Raw answer (anything string-coercible; null/undefined allowed).
 * @returns {boolean|null} true for yes-like, false for no-like, null if unrecognized.
 */
function parseYesNo(s) {
  const normalized = String(s ?? "").trim().toLowerCase();
  const YES = new Set(["y", "yes", "true", "1"]);
  const NO = new Set(["n", "no", "false", "0"]);
  if (YES.has(normalized)) return true;
  if (NO.has(normalized)) return false;
  return null;
}
/**
 * Select the tasks in a state snapshot that are paused on an interrupt.
 *
 * @param {*} snapshot - A StateSnapshot-like object (may be null/undefined).
 * @returns {Array} Tasks whose `interrupts` is a non-empty array; [] otherwise.
 */
function findInterruptingTasks(snapshot) {
  const tasks = snapshot?.tasks;
  if (!Array.isArray(tasks)) return [];
  return tasks.filter(
    (task) => Array.isArray(task?.interrupts) && task.interrupts.length > 0
  );
}
/**
 * End-to-end telemetry demo for LangGraph.js dynamic interrupts:
 *  1. Build and run a draft -> human-approval -> finalize/reject graph until
 *     the approval node calls interrupt().
 *  2. Detect the interrupt via `result.__interrupt__`, identify the
 *     interrupting task(s) from the state snapshot, and measure wait time.
 *  3. Capture the human's y/n answer from stdin and resume the graph with
 *     `new Command({ resume })`.
 *
 * Reads config from env: OPENAI_API_KEY and POSTGRES_URI (required);
 * OPENAI_MODEL, PG_SCHEMA, THREAD_ID, REQUEST (optional).
 *
 * Fixes over the naive version:
 * - The Postgres checkpointer pool is released in a `finally` block so the
 *   process can exit cleanly instead of hanging on open connections.
 * - The readline interface is closed even if reading the answer throws.
 *
 * @throws {Error} If a required env var is missing or the human response is
 *   not a recognizable yes/no.
 */
async function main() {
  const OPENAI_API_KEY = requireEnv("OPENAI_API_KEY");
  const POSTGRES_URI = requireEnv("POSTGRES_URI");
  const modelName = process.env.OPENAI_MODEL || "gpt-4o-mini";
  const schema = process.env.PG_SCHEMA || "public";
  const threadId = process.env.THREAD_ID || `telemetry-demo-${Date.now()}`;
  const config = { configurable: { thread_id: threadId } };

  const checkpointer = PostgresSaver.fromConnString(POSTGRES_URI, { schema });
  await checkpointer.setup();

  try {
    const llm = new ChatOpenAI({
      apiKey: OPENAI_API_KEY,
      model: modelName,
      temperature: 0.2,
    });

    const State = Annotation.Root({
      request: Annotation({ default: () => "" }),
      draft: Annotation({ default: () => "" }),
      approved: Annotation({ default: () => null }), // boolean | null
      final: Annotation({ default: () => "" }),
    });

    // Real LLM call (OpenAI) to generate content before the interrupt.
    const draftNode = async (state) => {
      const res = await llm.invoke([
        new SystemMessage(
          "You write concise, professional emails. Output ONLY the email body (no subject line)."
        ),
        new HumanMessage(
          `Write an email draft for this request:\n\n${state.request}\n\nKeep it under 120 words.`
        ),
      ]);
      const draft = typeof res.content === "string" ? res.content : JSON.stringify(res.content);
      return { draft };
    };

    // No LLM calls here: this node is re-run from the top on resume, so it
    // must stay cheap and side-effect free up to the interrupt() call.
    const approvalNode = async (state) => {
      const decision = interrupt({
        type: "approval",
        question: "Approve sending this email draft? (y/n)",
        draft: state.draft,
        metadata: {
          created_at: nowIso(),
          thread_id: threadId,
        },
      });
      return { approved: Boolean(decision) };
    };

    // Second real LLM call, only happens if approved.
    const finalizeNode = async (state) => {
      const res = await llm.invoke([
        new SystemMessage(
          "You polish emails for clarity and tone. Output ONLY the email body (no subject line)."
        ),
        new HumanMessage(
          `Improve this email draft (keep it under 120 words):\n\n${state.draft}`
        ),
      ]);
      const final = typeof res.content === "string" ? res.content : JSON.stringify(res.content);
      return { final };
    };

    const rejectNode = async (state) => {
      return { final: "[REJECTED] Human did not approve sending this email." };
    };

    // Node IDs must NOT conflict with state channel names (e.g. "draft").
    const builder = new StateGraph(State)
      .addNode("make_draft", draftNode)
      .addNode("human_approval", approvalNode)
      .addNode("finalize", finalizeNode)
      .addNode("reject", rejectNode)
      .addEdge(START, "make_draft")
      .addEdge("make_draft", "human_approval")
      .addConditionalEdges("human_approval", (s) => (s.approved ? "finalize" : "reject"))
      .addEdge("finalize", END)
      .addEdge("reject", END);

    const graph = builder.compile({ checkpointer });

    const inputRequest =
      process.env.REQUEST ||
      "Ask Finance for approval to spend $1,200 on a conference ticket and travel.";

    // -------- Step 1: Run until we hit an interrupt --------
    const runStart = Date.now();
    const result = await graph.invoke({ request: inputRequest }, config);

    // Detect the interrupt: payloads surface under `__interrupt__`.
    if (!result?.__interrupt__?.length) {
      console.log("Graph completed without an interrupt.");
      console.log(JSON.stringify({ result }, null, 2));
      return;
    }
    const interruptedAt = Date.now();
    const interruptPayloads = result.__interrupt__;

    // Identify which node caused it from the checkpointed state snapshot.
    const snapshot = await graph.getState(config);
    const interruptingTasks = findInterruptingTasks(snapshot);

    console.log("\n=== INTERRUPTED ===");
    console.log("thread_id:", threadId);
    console.log("interrupt payload(s):", JSON.stringify(interruptPayloads, null, 2));
    console.log(
      "interrupting task(s):",
      JSON.stringify(
        interruptingTasks.map((t) => ({
          id: t.id,
          name: t.name,
          interrupts: t.interrupts?.map((i) => i?.value ?? i) ?? [],
        })),
        null,
        2
      )
    );

    // -------- Step 2: Collect “human” input + measure wait time --------
    const rl = createInterface({ input, output });
    let answer;
    try {
      answer = await rl.question("\nApprove? (y/n): ");
    } finally {
      // Always release stdin, even if the prompt is aborted/throws.
      rl.close();
    }
    const parsed = parseYesNo(answer);
    if (parsed === null) throw new Error(`Invalid response: "${answer}" (expected y/n)`);

    const resumedAt = Date.now();
    const waitMs = resumedAt - interruptedAt;

    // Human response captured here, alongside the interrupt telemetry.
    const telemetry = {
      thread_id: threadId,
      interrupt_detected_at_ms: interruptedAt,
      resume_submitted_at_ms: resumedAt,
      wait_ms: waitMs,
      human_response: parsed,
      interrupt_payloads: interruptPayloads,
      interrupting_tasks: interruptingTasks.map((t) => ({
        id: t.id,
        name: t.name,
        interrupts: t.interrupts ?? [],
      })),
      // Helpful for correlating with persistence/checkpoints:
      latest_checkpoint: snapshot?.config?.configurable ?? null,
      total_runtime_ms_before_interrupt: interruptedAt - runStart,
    };
    console.log("\n=== TELEMETRY ===");
    console.log(JSON.stringify(telemetry, null, 2));

    // -------- Step 3: Resume the graph --------
    // The resume payload becomes the return value of interrupt() in the node.
    const final = await graph.invoke(new Command({ resume: parsed }), config);
    console.log("\n=== FINAL STATE ===");
    console.log(JSON.stringify(final, null, 2));
  } finally {
    // Release the Postgres pool so the process can exit; called optionally
    // in case the installed checkpointer version does not expose end().
    await checkpointer.end?.();
  }
}
// Entry point: top-level await (ESM); any error from main() propagates out of the module.
await main();