Architectural Help For Production Chat App

What I have been trying to do is build a production-level chat app using LangChain on the backend.
I am using the `checkpointer` provided by LangChain as the only source of truth for the chat (apart from the thread-related information). But I am not sure if having the checkpointer as the only source of truth is the best thing to do.

Or should I have a separate table that also stores `Messages` and `Artifacts`? It would be really helpful if someone could shed some light on this topic.

I know the answer depends, but as per the implementation of `checkpointer`, it's really difficult to get the messages when there are interrupts involved. And when I use the `checkpointer` along with custom storage (Messages, Artifacts tables), would that not be duplication of data?

I am using the `PostgresCheckpointer`.

Here are the problems I am facing when using the checkpointer only:

  • Difficulty fetching all the messages when there is some kind of interrupt: `app.getState()` only gives me messages up to the second-last turn; the latest turn, where the message was sent, gets left out, so I have to nest into and fetch from `state.tasks`.
  • For long threads I use compaction, so it becomes difficult to manage the actual UI state (all the messages) versus the active messages (the ones the agent gets).

I have been thinking of migrating to an architecture where I keep `Messages` and the checkpointer snapshot separately, such that the messages for the UI state are kept in this Messages table and all the AI-related state stays in the checkpointer. I just want to discuss what I need to do!

Thanks for your support ! : )

hi @ENMA

Actually, this is a really common architectural challenge once you move from prototyping to production with LangGraph. Let me address each of your concerns.

1. Why getState() Misses the Latest Turn During Interrupts

This is not a bug - it’s how checkpointers are designed, but it’s definitely confusing. Here’s what’s happening under the hood.

When you call app.getState(config) without specifying a checkpoint_id, LangGraph sets applyPendingWrites = true internally:

// Source: langgraph-core/src/pregel/index.ts (line 985)
const snapshot = await this._prepareStateSnapshot({
  config: mergedConfig,
  saved,
  subgraphCheckpointer: options?.subgraphs ? checkpointer : undefined,
  applyPendingWrites: !config.configurable?.checkpoint_id,
});

Even with applyPendingWrites = true, here’s the catch: when _prepareStateSnapshot() processes pending writes, it skips special channels like ERROR, INTERRUPT, and SCHEDULED:

// Source: langgraph-core/src/pregel/index.ts (lines 870-880)
for (const [taskId, channel, value] of saved.pendingWrites) {
  if ([ERROR, INTERRUPT, SCHEDULED].includes(channel)) {
    continue;
  }
  if (!(taskId in nextTaskById)) {
    continue;
  }
  nextTaskById[taskId].writes.push([String(channel), value]);
}

Additionally, the tasksWithWrites() function in debug.ts explicitly returns result: undefined when there are interrupts for a task:

// Source: langgraph-core/src/pregel/debug.ts (line 250)
const result = (() => {
  if (error || interrupts.length || !pendingWrites.length) return undefined;
  // ...
})();

This means that during an interrupt, the checkpoint represents the state before the interrupted node executed. The writes from the interrupted node are stored as pendingWrites but may not be fully merged into state.values.

Accessing state.tasks is the intended way to get the latest writes during an interrupt. Each task in state.tasks carries its own interrupts and partial results. To reconstruct the full message history during an interrupt, you’d do something like:

const state = await app.getState({ configurable: { thread_id } });

// Messages from committed checkpoints (up to second-last turn)
const committedMessages = state.values.messages;

// Latest messages from the interrupted task
for (const task of state.tasks) {
  if (task.interrupts?.length) {
    // task.result may contain the latest AI message with tool calls
    // task.interrupts contains the interrupt values
  }
}

This is documented in the LangGraph Persistence docs:

“A checkpoint is a snapshot of the graph state saved at each super-step.”

The interrupt happens mid-super-step, so the checkpoint captures state before that step. The in-flight writes live in pendingWrites / state.tasks.
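One practical consequence: you can classify a thread's status mechanically from the snapshot. Here is a small illustrative helper (the shapes below are simplified stand-ins for `StateSnapshot`, not the real types) that looks at `next` and the per-task `interrupts`:

```typescript
// Simplified stand-ins for the relevant parts of a LangGraph StateSnapshot.
type TaskLike = { interrupts?: unknown[] };
type SnapshotLike = { next: string[]; tasks: TaskLike[] };

// A thread is "interrupted" when some task carries interrupt payloads,
// "pending" when nodes are still scheduled, and "idle" otherwise.
function threadStatus(s: SnapshotLike): "idle" | "interrupted" | "pending" {
  if (s.tasks.some((t) => (t.interrupts?.length ?? 0) > 0)) return "interrupted";
  return s.next.length > 0 ? "pending" : "idle";
}
```

In practice you would call something like this on the result of `app.getState(config)` to decide whether the UI should render a pending-approval state before digging into `state.tasks` for the in-flight messages.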

Your proposal to split storage into:

  • messages table → UI-facing, complete conversation history
  • checkpointer → agent execution state (active context, tool calls, pending actions)

…is a valid and production-proven pattern.
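As a concrete sketch of what the write path could look like under that split (everything here is illustrative: an in-memory array stands in for the real Postgres Messages table, and `runAgent` stands in for invoking the compiled graph, which would checkpoint internally via the Postgres checkpointer):

```typescript
// Hypothetical dual-write turn loop: the application writes the Messages
// table, while the graph runtime writes the checkpointer on its own.

type Row = { threadId: string; role: "user" | "ai"; content: string };

const messagesTable: Row[] = []; // stand-in for the real Messages table

// Stand-in for app.invoke(...); a real call would run the graph and let the
// checkpointer persist execution state at every super-step.
async function runAgent(threadId: string, input: string): Promise<string> {
  return `echo: ${input}`;
}

async function sendMessage(threadId: string, content: string): Promise<void> {
  // 1. Record the user message immediately - the UI reads from this table,
  //    so it never has to reconstruct history from checkpoints.
  messagesTable.push({ threadId, role: "user", content });

  // 2. Run the graph; the checkpointer handles resumability internally.
  const reply = await runAgent(threadId, content);

  // 3. Flush the final AI message once the turn completes (or once an
  //    interrupt is resolved) - this is the only extra write the split costs.
  messagesTable.push({ threadId, role: "ai", content: reply });
}
```

On an interrupt, step 3 simply doesn't happen yet; the in-flight turn stays in the checkpointer until the interrupt is resolved, and only then gets flushed to the table.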

The checkpointer is designed for agent execution state, not UI state

From the official docs:

“Checkpointers create snapshots of graph state at each super-step… enabling human-in-the-loop workflows, conversational memory, time travel debugging, and fault-tolerant execution.”

It’s optimized for resumability and fault tolerance - not for being a general-purpose message store. The data model (checkpoints → blobs → writes) is structured around execution steps, not around messages-as-a-resource.

So…

It’s not really “data duplication” - it’s separation of concerns

Think of it this way:

  • your messages table is the system of record for what the user sees. It stores the complete, ordered conversation history in a query-friendly format.
  • the checkpointer is the execution engine’s scratchpad. It stores whatever the agent needs to resume: channel values, pending writes, version vectors, etc.

They serve different access patterns:

| Concern | Messages table | Checkpointer |
| --- | --- | --- |
| Query pattern | “Get all messages for thread X” | “Resume execution from step N” |
| Lifecycle | Lives forever | Can be pruned/compacted |
| Schema | Simple: `id`, `thread_id`, `role`, `content`, `metadata`, `created_at` | Complex: blobs, writes, version vectors |
| Consumers | UI, search, analytics, export | Agent runtime only |
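The read side of this split then stays trivial: the UI merges the persisted history with any in-flight tail from an active run, deduplicating by id so a message already flushed to the table isn't shown twice. A sketch (types and names are illustrative, not from any real API):

```typescript
type UiMessage = {
  id: string;
  role: "user" | "ai" | "artifact";
  content: string;
};

// Merge persisted history (Messages table) with the in-flight tail pulled
// from the checkpointer snapshot / state.tasks during an active run.
function mergeForUi(persisted: UiMessage[], inFlight: UiMessage[]): UiMessage[] {
  const seen = new Set(persisted.map((m) => m.id));
  const tail = inFlight.filter((m) => !seen.has(m.id));
  return [...persisted, ...tail];
}
```

Because the table is the system of record, the interrupt-handling code shrinks to "compute the tail"; everything else reads straight from Postgres.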

@ENMA could you share your code? It would be easier to provide an accurate solution.

Thanks a lot @pawel-twardziak

"use server";

import { ChatMessage } from "@langchain/core/messages";
import { type StateSnapshot } from "@langchain/langgraph";

import { getAllweoneAgent } from "@/ai/agents/allweone/graph";
import { type ALLWEONEAgentState } from "@/ai/agents/allweone/types";
import {
  type ArtifactUnion,
  type TravelPlannerArtifact,
} from "@/hooks/chat/types/chat";
import { normalizeRagProcessingStatus } from "@/lib/rag/processing-status";
import { parseTravelPlannerItinerary } from "@/lib/ai/uiMessageParts";
import { safeJsonParse } from "@/lib/utils";
import { toHydratedUiMessages } from "@/server/ai/chatMessages";
import { auth } from "@/server/auth";
import { resolveChatAccess } from "@/server/chat/access";
import { db } from "@/server/db";
import { normalizeShareEmail } from "@/server/share/utils";

export async function getMessages(chatId: string) {
  const session = await auth();
  const userId = session?.user.id ?? null;
  const userEmail = session?.user.email
    ? normalizeShareEmail(session.user.email)
    : null;

  const app = await getAllweoneAgent(); // Langgraph agent 
 
  // Fetch chat with all needed data in a single query
  const chat = await db.chat.findUnique({
    where: {
      id: chatId,
    },
    include: {
      files: {
        include: {
          rag: {
            include: {
              fileAsset: {
                select: {
                  id: true,
                  url: true,
                  userId: true,
                },
              },
            },
          },
        },
      },
    },
  });

  if (!chat) {
    throw new Error("Unauthorized: Chat not found or access denied");
  }

  const isOwner = Boolean(userId && chat.userId === userId);
  const access = !isOwner
    ? await resolveChatAccess(chatId, {
        userId,
        userEmail,
      })
    : null;

  if (!isOwner) {
    if (!access?.canRead) {
      throw new Error("Unauthorized: Chat not found or access denied");
    }
  }

  const canPreviewRagInUi = (file: (typeof chat.files)[number]) => {
    if (userId && file.rag.fileAsset?.userId === userId) {
      return true;
    }

    return Boolean(access?.hasDirectShareAccess);
  };

  // Fetch agent state (no need to re-fetch chat from DB)
  const state = await app
    .getState(
      {
        configurable: {
          thread_id: chatId,
        },
      },
      { subgraphs: true },
    )
    .catch(() => null);

  let allMessages = state
    ? [
        ...((state.values as ALLWEONEAgentState)?.messages ?? []),
        ...((state.values as ALLWEONEAgentState)?.activeMessages ?? []),
      ].filter(
        (message) => message.type !== "system" && message.type !== "developer",
      )
    : [];

  if (state?.tasks && state.tasks.length > 0) {
    // Find tasks with interrupts
    const taskWithInterrupt = state.tasks.find(
      (task) => task.interrupts && task.interrupts.length > 0,
    );
    if (taskWithInterrupt) {
      const newState = (
        taskWithInterrupt.state as { tasks: { state: StateSnapshot }[] }
      ).tasks[0]?.state;

      if (newState) {
        allMessages = [
          ...newState.values.messages,
          ...newState.values.activeMessages,
        ].filter(
          (message) =>
            message.type !== "system" && message.type !== "developer",
        );
      }
    }
  }

  const uniqueMessagesMap = new Map();
  allMessages.forEach((message) => {
    const messageId = message.id;
    const messageKey = messageId || `${message.content}-${message.getType()}`;

    if (!uniqueMessagesMap.has(messageKey)) {
      uniqueMessagesMap.set(messageKey, message);
    }
  });

  const uniqueMessages = Array.from(uniqueMessagesMap.values());

  // Process messages to convert AI messages with response_metadata.from to artifacts
  const processedMessages: typeof uniqueMessages = [];
  const artifacts: ArtifactUnion[] = [];

  const totalChatFileCount = chat.files.length;
  let previewableFileCount = 0;

  if (chat?.files) {
    chat.files.forEach((file) => {
      if (!canPreviewRagInUi(file)) {
        return;
      }

      previewableFileCount += 1;
      artifacts.push({
        type: "pdf",
        data: {
          fileName: file.rag.fileName ?? "File",
          fileUrl: file.rag.fileAsset?.url ?? file.rag.fileUrl,
          processingStatus: normalizeRagProcessingStatus(
            file.rag.processingStatus,
          ),
          pageCount: file.rag.pageCount ?? undefined,
          chatId: chat.id,
        },
        id: file.rag.id,
        title: file.rag.fileName ?? "File",
        isComplete: file.rag.processingStatus === "COMPLETE",
        createdAt: file.rag.createdAt.getTime(),
        updatedAt: file.rag.updatedAt.getTime(),
      });
    });
  }

  uniqueMessages.forEach((message) => {
    // Check if this is an AI message with response_metadata containing "from"
    if (
      message.getType() === "ai" &&
      message.response_metadata &&
      typeof message.response_metadata === "object" &&
      "from" in message.response_metadata &&
      typeof message.response_metadata.from === "string"
    ) {
      if (message.response_metadata.isRouter) {
        processedMessages.push(
          new ChatMessage({
            role: "router",
            content: message.content,
            name: "router",
          }),
        );
        return;
      }

      if (message.response_metadata.from === "travelPlanner") {
        processedMessages.push(message);
        const artifact: TravelPlannerArtifact = {
          type: "travelPlanner",
          title: "Trip Planner",
          id: message.additional_kwargs.artifactId,
          isComplete: true,
          createdAt: Date.now(),
          updatedAt: Date.now(),
          data: {
            itinerary: parseTravelPlannerItinerary(message.content as string),
            mapMarkers: [],
          },
        };

        artifacts.push(artifact);
        return;
      }

      const artifactType = message.response_metadata.from;
      const additionalData = message.response_metadata.data ?? {};
      // Convert to ChatMessage with artifact role
      const artifactMessage = new ChatMessage({
        role: "artifact",
        content: JSON.stringify({
          ...additionalData,
        }),
        name: artifactType,
        additional_kwargs: message.additional_kwargs || {},
        response_metadata: message.response_metadata || {},
        id: message.id,
      });

      // id was already set in the constructor above to match the original message
      processedMessages.push(artifactMessage);

      // Try to parse the content as an artifact and add to artifacts array
      const messageContent = message.content as string;
      const parsedContent = safeJsonParse(messageContent);

      if (parsedContent) {
        const artifact: ArtifactUnion = {
          type: artifactType,
          title: additionalData.title ?? artifactType,
          id: message.id,
          isComplete: true,
          createdAt: Date.now(),
          updatedAt: Date.now(),
          data: {
            content: parsedContent,
            ...additionalData,
          },
        };

        artifacts.push(artifact);
      }
    } else {
      // Keep other messages as they are
      processedMessages.push(message);
    }
  });

  // Extract suggestion data from the agent state
  const suggestion = state
    ? (state.values as ALLWEONEAgentState).suggestion
    : undefined;

  return {
    messages: toHydratedUiMessages(processedMessages),
    chatName: chat?.name || "Untitled Chat",
    dbiId: chat?.dbiId ?? null,
    suggestion,
    artifacts,
    viewer: {
      isAuthenticated: Boolean(userId),
      isOwner,
      canEdit: isOwner || Boolean(access?.canEdit),
      canManageShare: isOwner || Boolean(access?.canManageShare),
      canClone: Boolean(
        userId &&
          !isOwner &&
          (access?.canRead ?? false) &&
          !(access?.canEdit ?? false),
      ),
      agentOnlyFileCount: Math.max(totalChatFileCount - previewableFileCount, 0),
    },
  };
}

It's a huge mess of code, but this is what I am using right now to get all the messages for a particular thread.

  • The thread is stored in the Chat table.
  • I have authentication and access-validation code.
  • I have interrupts for some tools.
  • I have files that can be viewed as artifacts for a particular chat.
  • I have specific agents whose responses need to be displayed in an artifact (the travel agent in this case).

This code is a mess and I need to clean it up. ; )

@ENMA lemme look at it

Okay, Please do : )

Alright, today 🙂 the weekend just ended hehe

FYI I’m still on it…

No problem : )