Stopping endpoint for deep agents

Hi, I am using FastAPI and LangGraph deep agents in my application. I want to implement a stop button in the UI that stops the agent's execution, and I need guidance on how to implement it. I am directly using the create_deep_agent abstraction.

Hi @chinu0609

Maybe something like this:

Make your graph resumable with a checkpointer + breakpoints, then drive it step by step from FastAPI.

The Stop button just means: “after the next safe checkpoint, don’t resume this thread any further”.

1. Create a resumable deep agent

import sqlite3

from deepagents import create_deep_agent
from langgraph.checkpoint.sqlite import SqliteSaver  # or Postgres, etc.

# SqliteSaver wraps a sqlite3 connection; check_same_thread=False because
# FastAPI may touch the checkpointer from different threads.
checkpointer = SqliteSaver(sqlite3.connect("deep_agent.sqlite", check_same_thread=False))

agent = create_deep_agent(
    tools=my_tools,
    system_prompt="You are a helpful assistant.",
    checkpointer=checkpointer,  # critical for stop/resume
)

def config_for(thread_id: str) -> dict:
    return {"configurable": {"thread_id": thread_id}}

Any request for a given conversation uses the same thread_id.
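For illustration, assuming the `config_for` helper above, every call for one conversation passes the same config shape, so the checkpointer appends to one shared state history:

```python
def config_for(thread_id: str) -> dict:
    # Same helper as above: the thread_id keys the checkpointer's history.
    return {"configurable": {"thread_id": thread_id}}

# The first message and every later resume for conversation "abc" use this:
cfg = config_for("abc")
print(cfg)  # {'configurable': {'thread_id': 'abc'}}
```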

2. Run with breakpoints so you can stop safely

Use interrupt_after="*" with astream (or stream) so the graph automatically pauses after each node.
Your driver loop decides whether to resume or stop after each pause.

from typing import Any, AsyncIterator

from .deep_agent import agent, config_for

stop_flags: dict[str, bool] = {}  # keyed by thread_id

async def run_until_stopped_or_done(
    thread_id: str,
    first_input: dict,
) -> AsyncIterator[Any]:
    """
    Yields streamed chunks for this turn.
    Stops after:
      - the graph finishes, or
      - stop_flags[thread_id] becomes True
    """
    cfg = config_for(thread_id)
    next_input: Any = first_input  # first run: the user message; later: None to resume

    while True:
        # Run until END or an interrupt-after-* breakpoint
        async for chunk in agent.astream(
            next_input,
            config=cfg,
            stream_mode="values",
            interrupt_after="*",  # pause after every node
        ):
            yield chunk

        # Check whether the graph is done (no next nodes scheduled)
        state = await agent.aget_state(cfg)
        if not state.next:
            break

        # If the user pressed Stop while this step was running, exit here;
        # the thread stays paused at the checkpoint and can resume later.
        if stop_flags.get(thread_id):
            break

        # Otherwise, resume from the static breakpoint: passing None as the
        # input continues from the last checkpoint. (Command(resume=...) is
        # for dynamic interrupt() calls, not interrupt_after breakpoints.)
        next_input = None

Key points:

  • You never cancel the task mid‑node. Each loop iteration ends at a clean checkpoint.
  • State is persisted by the checkpointer, so you can resume later from the last completed node.
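The cooperative-stop semantics can be illustrated without LangGraph at all. This is a self-contained asyncio sketch (fake "nodes" stand in for graph nodes, and the names are made up for the demo) showing that a stop request only takes effect at the next checkpoint, never mid-node:

```python
import asyncio

stop_flags: dict[str, bool] = {}

async def fake_node(name: str) -> str:
    # Stands in for one LangGraph node; it always runs to completion.
    await asyncio.sleep(0.01)
    return name

async def run_thread(thread_id: str, nodes: list[str]) -> list[str]:
    """Run nodes one checkpoint at a time, honoring the stop flag only
    *between* nodes (the analogue of interrupt_after='*')."""
    completed: list[str] = []
    for node in nodes:
        completed.append(await fake_node(node))  # in-flight node finishes
        if stop_flags.get(thread_id):            # checked only at checkpoints
            break
    return completed

# Stop is requested before the run starts; the first node still completes,
# and the remaining nodes never run.
stop_flags["t1"] = True
result = asyncio.run(run_thread("t1", ["plan", "search", "write"]))
print(result)  # ['plan']
```

The same shape applies to the real driver loop: the agent streams until a checkpoint, then the flag decides whether to resume.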

3. Wire it into FastAPI + a Stop endpoint

Example with WebSockets for a chat UI; SSE is conceptually identical.

from fastapi import FastAPI, WebSocket
from pydantic import BaseModel
from .runner import run_until_stopped_or_done, stop_flags

app = FastAPI()

class ClientMessage(BaseModel):
    message: str

@app.websocket("/ws/{thread_id}")
async def chat_ws(ws: WebSocket, thread_id: str):
    await ws.accept()
    while True:
        data = await ws.receive_json()
        msg = ClientMessage(**data)

        # Clear any previous stop request for this thread
        stop_flags[thread_id] = False

        first_input = {
            "messages": [{"role": "user", "content": msg.message}]
        }

        async for chunk in run_until_stopped_or_done(thread_id, first_input):
            # Forward streamed output to the UI. Note: raw chunks contain
            # LangChain message objects, so serialize them to plain
            # dicts/strings before sending as JSON.
            await ws.send_json(chunk)

        # At this point we either:
        # - finished the run, or
        # - hit Stop and paused at a checkpoint

@app.post("/stop/{thread_id}")
async def stop(thread_id: str):
    # Called by your UI Stop button
    stop_flags[thread_id] = True
    return {"status": "stopping"}

Frontend sketch:

  • Each conversation has a thread_id
  • When user sends a message:
    • open/keep a WebSocket to /ws/{thread_id} and send {"message": "..."}
    • render streamed chunks
  • When user clicks Stop:
    • POST /stop/{thread_id}
    • The backend will finish the current node, then see stop_flags[thread_id] and not resume further

Later, to continue the conversation, you just send another user message with the same thread_id; the deep agent resumes from the last checkpointed state.

4. About “hard kill” (if you really need it)

You can keep a map of asyncio.Tasks for each thread and call task.cancel() when /stop is hit, but:

  • You may lose the in‑flight node’s state (exact problem described in your interrupt_and_resume_at_any_time_impl.md).
  • You can’t reliably resume from that exact point.

The breakpoint pattern above is the LangGraph‑native, safe approach and is what I’d recommend for deep agents.

Minimal “hard kill” sketch with asyncio.Task.cancel()

Important: this cancels mid‑node and can lose in‑flight state; use only if you truly want a hard kill.

import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from deepagents import create_deep_agent

app = FastAPI()

agent = create_deep_agent(tools=[], system_prompt="You are a helpful assistant.")

class RunRequest(BaseModel):
    message: str

# Track one background task per thread/conversation
running_tasks: dict[str, asyncio.Task] = {}


@app.post("/run/{thread_id}")
async def start_run(thread_id: str, req: RunRequest):
    # prevent overlapping runs for same thread_id
    if thread_id in running_tasks and not running_tasks[thread_id].done():
        raise HTTPException(status_code=409, detail="Run already in progress")

    async def worker():
        try:
            # You can also use astream / astream_events here
            async for event in agent.astream_events(
                {"messages": [{"role": "user", "content": req.message}]},
                config={"configurable": {"thread_id": thread_id}},
            ):
                # TODO: forward event to client (WebSocket, queue, etc.)
                ...
        except asyncio.CancelledError:
            # Hard kill happened; current node may be partially executed.
            # Do any cleanup/logging you need here.
            raise

    task = asyncio.create_task(worker())
    running_tasks[thread_id] = task
    return {"status": "started"}


@app.post("/stop/{thread_id}")
async def stop_run(thread_id: str):
    task = running_tasks.get(thread_id)
    if task is None or task.done():
        return {"status": "no-active-task"}

    # HARD KILL: cancel the background task immediately
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    return {"status": "cancelled"}

This is the core pattern: start = create and store an asyncio.Task that runs the deep agent; stop = look up the task by thread_id and call task.cancel() to kill it.
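Stripped of FastAPI and the agent, the start/stop mechanics reduce to a registry of asyncio.Tasks. A self-contained sketch (the infinite loop stands in for iterating astream_events):

```python
import asyncio

running_tasks: dict[str, asyncio.Task] = {}
events: list[str] = []

async def long_running(thread_id: str) -> None:
    try:
        while True:                      # stands in for astream_events(...)
            await asyncio.sleep(0.01)
            events.append("chunk")
    except asyncio.CancelledError:
        events.append("cancelled")       # a "node" may be half-done here
        raise

async def start(thread_id: str) -> None:
    running_tasks[thread_id] = asyncio.create_task(long_running(thread_id))

async def stop(thread_id: str) -> str:
    task = running_tasks.get(thread_id)
    if task is None or task.done():
        return "no-active-task"
    task.cancel()                        # HARD KILL, mid-"node"
    try:
        await task
    except asyncio.CancelledError:
        pass
    return "cancelled"

async def main() -> str:
    await start("t1")
    await asyncio.sleep(0.03)            # a few chunks stream out
    return await stop("t1")

status = asyncio.run(main())
print(status)  # cancelled
```

The CancelledError handler is the only place to do cleanup or logging for the aborted node; after it, that thread's in-flight state is gone.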


If you share how you’re currently calling create_deep_agent in FastAPI (sync vs async, invoke vs stream), I can adapt this to your exact code with minimal changes.


I will try it, thank you.


I forgot about a simple cleanup to prevent a memory leak:

@app.post("/run/{thread_id}")
async def start_run(thread_id: str, req: RunRequest):
    if thread_id in running_tasks and not running_tasks[thread_id].done():
        raise HTTPException(status_code=409, detail="Run already in progress")

    async def worker():
        try:
            async for event in agent.astream_events(
                {"messages": [{"role": "user", "content": req.message}]},
                config={"configurable": {"thread_id": thread_id}},
            ):
                ...
        except asyncio.CancelledError:
            raise
        finally:
            # Clean up if this task is still the active one for this thread_id
            current = running_tasks.get(thread_id)
            if current is asyncio.current_task():
                running_tasks.pop(thread_id, None)

    task = asyncio.create_task(worker())
    running_tasks[thread_id] = task
    return {"status": "started"}
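The `current is asyncio.current_task()` guard matters when a new run starts while an old one is being torn down; without it, the dying task's `finally` would evict the new task from the registry. A self-contained sketch of that race:

```python
import asyncio

running_tasks: dict[str, asyncio.Task] = {}

async def worker(thread_id: str) -> None:
    try:
        await asyncio.sleep(10)  # stands in for streaming the agent
    finally:
        # Only clean up if we are still the registered task for this thread
        if running_tasks.get(thread_id) is asyncio.current_task():
            running_tasks.pop(thread_id, None)

async def main() -> bool:
    old = asyncio.create_task(worker("t1"))
    running_tasks["t1"] = old
    await asyncio.sleep(0)            # let 'old' start

    new = asyncio.create_task(worker("t1"))
    running_tasks["t1"] = new         # a new run replaces the old entry

    old.cancel()                      # old task dies; its finally runs
    try:
        await old
    except asyncio.CancelledError:
        pass

    still_registered = running_tasks.get("t1") is new
    new.cancel()                      # tidy up the demo task
    try:
        await new
    except asyncio.CancelledError:
        pass
    return still_registered

print(asyncio.run(main()))  # True: the guard kept the new task registered
```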

Hi @pawel-twardziak

The example is very helpful, but I have a question.

If the agent is running several tools one after another and I want to stop execution (because I don't want those tools to be executed), is it possible to prevent the remaining tools from running and still continue the conversation without executing them?

PS: Happy New Year!

@pawel-twardziak how would you implement a generic cancellation workflow for any kind of agent to achieve a UX similar to popular chatbots (the user may hit Stop mid-LLM-generation, see the incomplete message, and still be able to continue the conversation)? Is that even currently possible with LangGraph Cloud, where you have little control over how your graph is invoked?