Multi-tenant / per-user checkpoint querying with AsyncPostgresSaver

Hi @Shahar19

I think checkpoint_ns is for internal purposes - a hierarchical identifier of where you are in the graph/subgraph tree. Each subgraph or nested task gets its own namespace, so its checkpoints don’t collide with others in the same thread.

It is also used for time‑travel and subgraph state retrieval, for streaming and debugging metadata, and for handling nested graphs and scratchpads.

So, in almost all cases you should treat checkpoint_ns as internal and not hand‑craft or mutate it yourself. The runtime builds and manages it for you - you usually just propagate whatever config it gives you back when resuming/time‑traveling.
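
For example, resuming or time‑traveling is mostly a matter of handing back the config LangGraph produced. A minimal sketch, assuming graph is an already‑compiled graph with a checkpointer attached and thread_id identifies the conversation:

config = {"configurable": {"thread_id": thread_id}}

# Each StateSnapshot carries its own config (thread_id, checkpoint_ns, checkpoint_id)
history = [snap async for snap in graph.aget_state_history(config)]
earlier = history[2]   # pick some earlier checkpoint

# Resume / fork from that checkpoint by passing its config back verbatim,
# never by hand-building checkpoint_ns or checkpoint_id
result = await graph.ainvoke(None, config=earlier.config)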

What I would go for is:

  • Keep AsyncPostgresSaver’s schema as‑is and treat it as an internal state store.
  • Make thread_id your conversation ID and ensure it is globally unique (e.g. UUID/ULID).
  • Model tenants/users outside the checkpointer:
    • a separate conversations table keyed by thread_id with tenant_id and user_id columns + RLS on that table.
    • optional/secondary: also store tenant_id / user_id in the metadata JSONB column that the Postgres checkpointer already persists, and index it if you need to query on it.
  • Do not overload checkpoint_ns or encode user into thread_id for multi‑tenancy; use them only as secondary dimensions if you really need to.
  • Leave adelete_thread’s behavior (delete by thread_id only) alone; if you need finer‑grained deletion by namespace, add a separate helper instead of changing that method’s contract.

This approach aligns with how the Postgres checkpointer is implemented and used in the LangGraph docs and tests, while giving you clean SQL and RLS control.

Practical pattern

1. Schema

Create an application‑level table that owns the tenant/user relationship and conversation metadata, and use its primary key as thread_id:

CREATE TABLE conversations (
  id           uuid PRIMARY KEY,        -- this will be your thread_id
  tenant_id    uuid NOT NULL,
  user_id      uuid NOT NULL,
  title        text,
  created_at   timestamptz NOT NULL DEFAULT now(),
  updated_at   timestamptz NOT NULL DEFAULT now()
);

Then treat checkpoints.thread_id as a foreign key to conversations.id conceptually, even if you don’t add an actual FK constraint (you can, but the checkpointer stores thread_id as text, so the types have to line up, and you’ll have to maintain the constraint yourself since the library doesn’t create it).

When you invoke the graph:

  • Use thread_id = conversations.id in config["configurable"] (see the sketch below).
  • Keep checkpoint_ns for its intended use (graph namespace / branch), usually "" for a single graph per conversation.

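A minimal sketch of that wiring, assuming builder is your StateGraph builder, DB_URI is your connection string, and create_conversation is a hypothetical helper that inserts a row into conversations and returns its id:

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
    await checkpointer.setup()                             # creates the checkpointer's own tables
    graph = builder.compile(checkpointer=checkpointer)

    conversation_id = await create_conversation(tenant_id, user_id)   # row in conversations

    config = {
        "configurable": {
            "thread_id": str(conversation_id),             # conversations.id as thread_id
            # no checkpoint_ns here - the default "" is what you want for a single graph
        }
    }
    result = await graph.ainvoke(input_state, config=config)  # input_state: whatever your graph expects
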
You can optionally also duplicate tenant_id and user_id into metadata when writing checkpoints, e.g. using get_serializable_checkpoint_metadata(config, metadata) (which aput calls) plus custom code that injects those IDs into the metadata before it is saved.
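
If you want that duplication to happen automatically, one option (a sketch, not the only way) is a thin subclass that copies the IDs from the run config into the metadata before delegating to the stock aput:

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

class TenantAwarePostgresSaver(AsyncPostgresSaver):
    """Copy tenant_id / user_id from config["configurable"] into checkpoint metadata."""

    async def aput(self, config, checkpoint, metadata, new_versions):
        configurable = config.get("configurable", {})
        for key in ("tenant_id", "user_id"):
            if key in configurable:
                metadata = {**metadata, key: configurable[key]}
        return await super().aput(config, checkpoint, metadata, new_versions)

You would then pass tenant_id and user_id in config["configurable"] alongside thread_id, and can add an index on metadata->>'tenant_id' if you ever need to query checkpoints by tenant directly.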

2. RLS

Put your primary RLS logic on conversations, for example:

ALTER TABLE conversations ENABLE ROW LEVEL SECURITY;

CREATE POLICY conversations_rls ON conversations
USING (
  tenant_id = current_setting('app.tenant_id')::uuid
  AND user_id = current_setting('app.user_id')::uuid
);

Then either:

  • Only ever access checkpoints via authorized thread_id values from conversations in your FastAPI layer (simplest); or
  • Add RLS on checkpoints that joins to conversations:
ALTER TABLE checkpoints ENABLE ROW LEVEL SECURITY;

CREATE POLICY checkpoints_rls ON checkpoints
USING (
  EXISTS (
    SELECT 1
    FROM conversations c
    WHERE c.id = checkpoints.thread_id
      AND c.tenant_id = current_setting('app.tenant_id')::uuid
      AND c.user_id   = current_setting('app.user_id')::uuid
  )
);

That way, row ownership lives in your own schema, not in the checkpointer internals, and you can easily query “all conversations for a user”, “all conversations for a tenant”, etc., without fighting the checkpointer’s shape.
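
For the policies to see the right values, each request has to set app.tenant_id / app.user_id on the connection it uses, scoped to the current transaction. A sketch with psycopg, where pool is your application's own pool (separate from the checkpointer's) and the IDs come from your FastAPI auth dependency:

from psycopg_pool import AsyncConnectionPool

pool = AsyncConnectionPool(DB_URI)

async def list_conversations(tenant_id, user_id):
    async with pool.connection() as conn:
        async with conn.transaction():
            # is_local = true -> the settings only live for this transaction
            await conn.execute(
                "SELECT set_config('app.tenant_id', %s, true), "
                "set_config('app.user_id', %s, true)",
                (str(tenant_id), str(user_id)),
            )
            cur = await conn.execute(
                "SELECT id, title, updated_at FROM conversations ORDER BY updated_at DESC"
            )
            return await cur.fetchall()

Keep in mind that RLS is bypassed for the table owner and superusers, so the app should connect as a separate, non-owner role.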

3. How this gives you your three operations

  • List all users’ conversations (latest checkpoint per thread per user)

    • Get the list of conversations from your conversations table (it is already tenant/user‑scoped by RLS).
    • For each conversation, either:
      • Use graph.aget_state(...) / checkpointer.aget_tuple(...) with the thread_id and default checkpoint_ns to fetch the current state (see the sketch after this list), or
      • Run a custom SQL query on checkpoints if you need DB‑side aggregation.
  • Fetch a specific user’s conversation (by user + thread_id)

    • First select from conversations with WHERE id = :thread_id AND tenant_id = :tenant AND user_id = :user (enforced again by RLS).
    • Then call graph.get_state / aget_tuple for that thread_id.
  • Delete a specific user’s conversation history

    • Delete from conversations (optionally with ON DELETE CASCADE if you add an FK).
    • Or call await checkpointer.adelete_thread(thread_id) from your FastAPI handler and then delete from conversations (or mark it as archived).
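
Putting the three operations together in FastAPI-ish pseudocode (a sketch under the assumptions above: list_conversations is the helper from the RLS section, graph / checkpointer are the compiled graph and AsyncPostgresSaver, and assert_owns_conversation / delete_conversation_row are hypothetical helpers over the conversations table):

async def list_user_conversations(tenant_id, user_id):
    rows = await list_conversations(tenant_id, user_id)            # RLS-scoped SELECT on conversations
    out = []
    for conv_id, title, _updated in rows:
        config = {"configurable": {"thread_id": str(conv_id)}}
        snapshot = await graph.aget_state(config)                  # latest checkpoint for that thread
        out.append({"id": conv_id, "title": title, "state": snapshot.values})
    return out

async def get_user_conversation(tenant_id, user_id, thread_id):
    await assert_owns_conversation(tenant_id, user_id, thread_id)  # SELECT against conversations (RLS again)
    config = {"configurable": {"thread_id": str(thread_id)}}
    return await graph.aget_state(config)

async def delete_user_conversation(tenant_id, user_id, thread_id):
    await assert_owns_conversation(tenant_id, user_id, thread_id)
    await checkpointer.adelete_thread(str(thread_id))              # removes the thread's checkpoints and writes
    await delete_conversation_row(tenant_id, user_id, thread_id)   # or mark it archived instead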

This keeps LangGraph’s checkpointer as a pure state store, and your own schema as the source of truth for tenants/users and conversations.
