Hi, I am using FastAPI and LangGraph deep agents in my application. I want to implement a Stop button in the UI that stops the agent's execution. I need guidance on how to implement this; I am directly using the create_deep_agent abstraction.
Hi @chinu0609
Maybe something like this:
Make your graph resumable with a checkpointer + breakpoints, then drive it step-by-step from FastAPI.
The Stop button then just means: "after the next safe checkpoint, don't resume this thread any further".
1. Create a resumable deep agent
```python
import sqlite3

from deepagents import create_deep_agent
from langgraph.checkpoint.sqlite import SqliteSaver  # or Postgres, etc.

# SqliteSaver takes a sqlite3 connection (not a path);
# check_same_thread=False because FastAPI may use it from multiple threads.
checkpointer = SqliteSaver(
    sqlite3.connect("deep_agent.sqlite", check_same_thread=False)
)

agent = create_deep_agent(
    tools=my_tools,
    system_prompt="You are a helpful assistant.",
    checkpointer=checkpointer,  # critical for stop/resume
)

def config_for(thread_id: str) -> dict:
    return {"configurable": {"thread_id": thread_id}}
```
Any request for a given conversation uses the same `thread_id`.
2. Run with breakpoints so you can stop safely
Use `interrupt_after="*"` with `astream` (or `stream`) so the graph automatically pauses after each node.
Your driver loop decides whether to resume or stop after each pause.
```python
from typing import Any, AsyncIterator

from langgraph.types import Command  # new-style resume; if your version uses None, see note below

from .deep_agent import agent, config_for

stop_flags: dict[str, bool] = {}  # keyed by thread_id

async def run_until_stopped_or_done(
    thread_id: str,
    first_input: dict,
) -> AsyncIterator[Any]:
    """
    Yields streamed chunks for this turn.

    Stops after:
    - the graph finishes, or
    - stop_flags[thread_id] becomes True.
    """
    cfg = config_for(thread_id)
    next_input: Any = first_input  # first: user message; later: Command(resume=...)

    while True:
        # Run until END or an interrupt-after-* breakpoint
        async for chunk in agent.astream(
            next_input,
            config=cfg,
            stream_mode="values",
            interrupt_after="*",  # pause after every node
        ):
            yield chunk

        # Check whether the graph is done (no next nodes)
        state = await agent.aget_state(cfg)
        if not state.next:  # or another termination check
            break

        # If the user pressed Stop while we were in this step, exit
        if stop_flags.get(thread_id):
            break

        # Otherwise, resume from the interrupt at the next node
        next_input = Command(resume=True)
        # If your LangGraph version uses the older API, use:
        # next_input = None
```
Key points:
- You never cancel the task mid‑node. Each loop iteration ends at a clean checkpoint.
- State is persisted by the checkpointer, so you can resume later from the last completed node.
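The cooperative-stop mechanics can be illustrated without LangGraph at all. In this pure-Python sketch, the node list, checkpoint index, and flag dict are stand-ins for the real graph, checkpointer, and `stop_flags`:

```python
# Stand-in for the driver loop above: a "graph" as a list of node functions,
# a saved checkpoint index, and a stop flag checked only at node boundaries.

def run(nodes, state, checkpoint, stop_flag):
    """Run nodes starting at checkpoint['idx']; pause between nodes if asked."""
    while checkpoint["idx"] < len(nodes):
        node = nodes[checkpoint["idx"]]
        state = node(state)            # a node always runs to completion
        checkpoint["idx"] += 1         # persist progress at the boundary
        if stop_flag.get("stop"):      # cooperative check between nodes
            return state, "paused"
    return state, "done"

nodes = [lambda s: s + ["a"], lambda s: s + ["b"], lambda s: s + ["c"]]

# First run: the user presses Stop while node "a" is executing.
flag = {"stop": True}
ckpt = {"idx": 0}
state, status = run(nodes, [], ckpt, flag)
# Node "a" still finished (no mid-node kill), then we paused at the checkpoint.
assert (state, status, ckpt["idx"]) == (["a"], "paused", 1)

# Later: the user sends a new message; clear the flag and resume from ckpt.
flag["stop"] = False
state, status = run(nodes, state, ckpt, flag)
assert (state, status) == (["a", "b", "c"], "done")
```

The key property is that stopping never interrupts a node; it only prevents the next one from starting, which is exactly what makes resuming safe.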
3. Wire it into FastAPI + a Stop endpoint
Example with WebSockets for a chat UI; SSE is conceptually identical.
```python
from fastapi import FastAPI, WebSocket
from pydantic import BaseModel

from .runner import run_until_stopped_or_done, stop_flags

app = FastAPI()

class ClientMessage(BaseModel):
    message: str

@app.websocket("/ws/{thread_id}")
async def chat_ws(ws: WebSocket, thread_id: str):
    await ws.accept()
    while True:
        data = await ws.receive_json()
        msg = ClientMessage(**data)

        # Clear any previous stop request for this thread
        stop_flags[thread_id] = False

        first_input = {
            "messages": [{"role": "user", "content": msg.message}]
        }

        async for chunk in run_until_stopped_or_done(thread_id, first_input):
            # Forward streamed output (tokens / node outputs) to the UI
            await ws.send_json(chunk)

        # At this point we either:
        # - finished the run, or
        # - hit Stop and paused at a checkpoint

@app.post("/stop/{thread_id}")
async def stop(thread_id: str):
    # Called by your UI's Stop button
    stop_flags[thread_id] = True
    return {"status": "stopping"}
```
Frontend sketch:
- Each conversation has a `thread_id`.
- When the user sends a message:
  - open/keep a WebSocket to `/ws/{thread_id}` and send `{"message": "..."}`
  - render the streamed chunks
- When the user clicks Stop:
  - `POST /stop/{thread_id}`
  - the backend will finish the current node, then see `stop_flags[thread_id]` and not resume further
Later, to continue the conversation, you just send another user message with the same thread_id; the deep agent resumes from the last checkpointed state.
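One subtlety worth making explicit: a stop flag, once set, must be cleared when the next user message arrives, otherwise the thread stays stopped forever. A tiny sketch of that lifecycle (same `stop_flags` dict as above):

```python
# Lifecycle of a stop flag across requests for one thread_id.
stop_flags: dict[str, bool] = {}

def on_stop(thread_id: str) -> None:
    # POST /stop/{thread_id}
    stop_flags[thread_id] = True

def on_new_message(thread_id: str) -> None:
    # Start of each turn in the WebSocket handler: clear stale stop requests
    stop_flags[thread_id] = False

def may_resume(thread_id: str) -> bool:
    # Checked between nodes by the driver loop
    return not stop_flags.get(thread_id, False)

on_stop("conv-1")
assert not may_resume("conv-1")   # paused at the next checkpoint
on_new_message("conv-1")
assert may_resume("conv-1")       # the new turn runs normally
```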
4. About “hard kill” (if you really need it)
You can keep a map of `asyncio.Task`s for each thread and call `task.cancel()` when `/stop` is hit, but:
- you may lose the in-flight node's state (the exact problem described in your interrupt_and_resume_at_any_time_impl.md), and
- you can't reliably resume from that exact point.
The breakpoint pattern above is the LangGraph‑native, safe approach and is what I’d recommend for deep agents.
Minimal “hard kill” sketch with asyncio.Task.cancel()
Important: this cancels mid‑node and can lose in‑flight state; use only if you truly want a hard kill.
```python
import asyncio

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from deepagents import create_deep_agent

app = FastAPI()
agent = create_deep_agent(tools=[], system_prompt="You are a helpful assistant.")

class RunRequest(BaseModel):
    message: str

# Track one background task per thread/conversation
running_tasks: dict[str, asyncio.Task] = {}

@app.post("/run/{thread_id}")
async def start_run(thread_id: str, req: RunRequest):
    # Prevent overlapping runs for the same thread_id
    if thread_id in running_tasks and not running_tasks[thread_id].done():
        raise HTTPException(status_code=409, detail="Run already in progress")

    async def worker():
        try:
            # You can also use astream / astream_events here
            async for event in agent.astream_events(
                {"messages": [{"role": "user", "content": req.message}]},
                config={"configurable": {"thread_id": thread_id}},
                version="v2",  # required on older langchain-core versions
            ):
                # TODO: forward event to the client (WebSocket, queue, etc.)
                ...
        except asyncio.CancelledError:
            # Hard kill happened; the current node may be partially executed.
            # Do any cleanup/logging you need here.
            raise

    task = asyncio.create_task(worker())
    running_tasks[thread_id] = task
    return {"status": "started"}

@app.post("/stop/{thread_id}")
async def stop_run(thread_id: str):
    task = running_tasks.get(thread_id)
    if task is None or task.done():
        return {"status": "no-active-task"}

    # HARD KILL: cancel the background task immediately
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return {"status": "cancelled"}
```
This is the core pattern: start = create and store an asyncio.Task that runs the deep agent; stop = look up the task by thread_id and call task.cancel() to kill it.
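For clarity, here is the cancellation mechanism in isolation (pure asyncio, no agent). Cancelling the task raises `CancelledError` inside the worker at its next `await` point, which is exactly why in-flight work can be lost:

```python
import asyncio

async def worker(log: list[str]) -> None:
    try:
        for i in range(5):
            log.append(f"step-{i}")
            await asyncio.sleep(0.01)  # cancellation lands at an await point
    except asyncio.CancelledError:
        log.append("cancelled")        # cleanup/logging hook
        raise                          # always re-raise after cleanup

async def main() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.015)         # let a couple of steps run
    task.cancel()                      # "hard kill", like /stop above
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log

log = asyncio.run(main())
# The worker stopped partway through; the later steps never ran.
assert log[-1] == "cancelled"
assert "step-4" not in log
```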
If you share how you’re currently calling create_deep_agent in FastAPI (sync vs async, invoke vs stream), I can adapt this to your exact code with minimal changes.
I will try it, thank you.
I forgot about a simple cleanup to prevent a memory leak:
```python
@app.post("/run/{thread_id}")
async def start_run(thread_id: str, req: RunRequest):
    if thread_id in running_tasks and not running_tasks[thread_id].done():
        raise HTTPException(status_code=409, detail="Run already in progress")

    async def worker():
        try:
            async for event in agent.astream_events(
                {"messages": [{"role": "user", "content": req.message}]},
                config={"configurable": {"thread_id": thread_id}},
            ):
                ...
        except asyncio.CancelledError:
            raise
        finally:
            # Clean up if this task is still the active one for this thread_id
            current = running_tasks.get(thread_id)
            if current is asyncio.current_task():
                running_tasks.pop(thread_id, None)

    task = asyncio.create_task(worker())
    running_tasks[thread_id] = task
    return {"status": "started"}
```
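The `finally` guard deserves a note: it compares against `asyncio.current_task()` so that a cancelled, already-replaced task cannot evict its successor from the registry. A stripped-down sketch of that behavior (pure asyncio, no agent):

```python
import asyncio

running: dict[str, asyncio.Task] = {}

async def worker(thread_id: str, delay: float) -> None:
    try:
        await asyncio.sleep(delay)  # stand-in for the agent run
    finally:
        # Only remove ourselves if we are still the registered task
        if running.get(thread_id) is asyncio.current_task():
            running.pop(thread_id, None)

async def main() -> bool:
    t1 = asyncio.create_task(worker("conv-1", 0.05))
    running["conv-1"] = t1
    await asyncio.sleep(0)        # let t1 start and reach its await point
    t1.cancel()                   # stop, then immediately start a new run
    t2 = asyncio.create_task(worker("conv-1", 0.01))
    running["conv-1"] = t2
    try:
        await t1
    except asyncio.CancelledError:
        pass
    # t1's finally ran, but it did NOT evict the replacement task:
    ok = running.get("conv-1") is t2
    await t2                      # t2's finally removes itself normally
    return ok

assert asyncio.run(main()) is True
assert "conv-1" not in running    # the registry is empty again
```

Without the `is asyncio.current_task()` check, the dying old task would pop the new task's registry entry, and a later `/stop` for that thread would find nothing to cancel.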
The example is very helpful, but I have a question.
If the agent is running several tools one after another and I press Stop (because I don't want those tools to be executed), is it possible to prevent the remaining tools from running and then continue the conversation without ever executing them?
PS: Happy New Year!