Thanks a lot @pawel-twardziak
"use server";
import { ChatMessage } from "@langchain/core/messages";
import { type StateSnapshot } from "@langchain/langgraph";
import { getAllweoneAgent } from "@/ai/agents/allweone/graph";
import { type ALLWEONEAgentState } from "@/ai/agents/allweone/types";
import {
type ArtifactUnion,
type TravelPlannerArtifact,
} from "@/hooks/chat/types/chat";
import { normalizeRagProcessingStatus } from "@/lib/rag/processing-status";
import { parseTravelPlannerItinerary } from "@/lib/ai/uiMessageParts";
import { safeJsonParse } from "@/lib/utils";
import { toHydratedUiMessages } from "@/server/ai/chatMessages";
import { auth } from "@/server/auth";
import { resolveChatAccess } from "@/server/chat/access";
import { db } from "@/server/db";
import { normalizeShareEmail } from "@/server/share/utils";
export async function getMessages(chatId: string) {
const session = await auth();
const userId = session?.user.id ?? null;
const userEmail = session?.user.email
? normalizeShareEmail(session.user.email)
: null;
const app = await getAllweoneAgent(); // Langgraph agent
// Fetch chat with all needed data in a single query
const chat = await db.chat.findUnique({
where: {
id: chatId,
},
include: {
files: {
include: {
rag: {
include: {
fileAsset: {
select: {
id: true,
url: true,
userId: true,
},
},
},
},
},
},
},
});
if (!chat) {
throw new Error("Unauthorized: Chat not found or access denied");
}
const isOwner = Boolean(userId && chat.userId === userId);
const access = !isOwner
? await resolveChatAccess(chatId, {
userId,
userEmail,
})
: null;
if (!isOwner) {
if (!access?.canRead) {
throw new Error("Unauthorized: Chat not found or access denied");
}
}
const canPreviewRagInUi = (file: (typeof chat.files)[number]) => {
if (userId && file.rag.fileAsset?.userId === userId) {
return true;
}
return Boolean(access?.hasDirectShareAccess);
};
// Fetch agent state (no need to re-fetch chat from DB)
const state = await app
.getState(
{
configurable: {
thread_id: chatId,
},
},
{ subgraphs: true },
)
.catch(() => null);
let allMessages = state
? [
...((state.values as ALLWEONEAgentState)?.messages ?? []),
...((state.values as ALLWEONEAgentState)?.activeMessages ?? []),
].filter(
(message) => message.type !== "system" && message.type !== "developer",
)
: [];
if (state?.tasks && state.tasks.length > 0) {
// Find tasks with interrupts
const taskWithInterrupt = state.tasks.find(
(task) => task.interrupts && task.interrupts.length > 0,
);
if (taskWithInterrupt) {
const newState = (
taskWithInterrupt.state as { tasks: { state: StateSnapshot }[] }
).tasks[0]?.state;
if (newState) {
allMessages = [
...newState.values.messages,
...newState.values.activeMessages,
].filter(
(message) =>
message.type !== "system" && message.type !== "developer",
);
}
}
}
const uniqueMessagesMap = new Map();
allMessages.forEach((message) => {
const messageId = message.id;
const messageKey = messageId || `${message.content}-${message.getType()}`;
if (!uniqueMessagesMap.has(messageKey)) {
uniqueMessagesMap.set(messageKey, message);
}
});
const uniqueMessages = Array.from(uniqueMessagesMap.values());
// Process messages to convert AI messages with response_metadata.from to artifacts
const processedMessages: typeof uniqueMessages = [];
const artifacts: ArtifactUnion[] = [];
const totalChatFileCount = chat.files.length;
let previewableFileCount = 0;
if (chat?.files) {
chat.files.forEach((file) => {
if (!canPreviewRagInUi(file)) {
return;
}
previewableFileCount += 1;
artifacts.push({
type: "pdf",
data: {
fileName: file.rag.fileName ?? "File",
fileUrl: file.rag.fileAsset?.url ?? file.rag.fileUrl,
processingStatus: normalizeRagProcessingStatus(
file.rag.processingStatus,
),
pageCount: file.rag.pageCount ?? undefined,
chatId: chat.id,
},
id: file.rag.id,
title: file.rag.fileName ?? "File",
isComplete: file.rag.processingStatus === "COMPLETE",
createdAt: file.rag.createdAt.getTime(),
updatedAt: file.rag.updatedAt.getTime(),
});
});
}
uniqueMessages.forEach((message) => {
// Check if this is an AI message with response_metadata containing "from"
if (
message.getType() === "ai" &&
message.response_metadata &&
typeof message.response_metadata === "object" &&
"from" in message.response_metadata &&
typeof message.response_metadata.from === "string"
) {
if (message.response_metadata.isRouter) {
processedMessages.push(
new ChatMessage({
role: "router",
content: message.content,
name: "router",
}),
);
return;
}
if (message.response_metadata.from === "travelPlanner") {
processedMessages.push(message);
const artifact: TravelPlannerArtifact = {
type: "travelPlanner",
title: "Trip Planner",
id: message.additional_kwargs.artifactId,
isComplete: true,
createdAt: Date.now(),
updatedAt: Date.now(),
data: {
itinerary: parseTravelPlannerItinerary(message.content as string),
mapMarkers: [],
},
};
artifacts.push(artifact);
return;
}
const artifactType = message.response_metadata.from;
const additionalData = message.response_metadata.data ?? {};
// Convert to ChatMessage with artifact role
const artifactMessage = new ChatMessage({
role: "artifact",
content: JSON.stringify({
...additionalData,
}),
name: artifactType,
additional_kwargs: message.additional_kwargs || {},
response_metadata: message.response_metadata || {},
id: message.id,
});
// Set the ID to match the original message
processedMessages.push(artifactMessage);
// Try to parse the content as an artifact and add to artifacts array
const messageContent = message.content as string;
const parsedContent = safeJsonParse(messageContent);
if (parsedContent) {
const artifact: ArtifactUnion = {
type: artifactType,
title: additionalData.title ?? artifactType,
id: message.id,
isComplete: true,
createdAt: Date.now(),
updatedAt: Date.now(),
data: {
content: parsedContent,
...additionalData,
},
};
artifacts.push(artifact);
}
} else {
// Keep other messages as they are
processedMessages.push(message);
}
});
// Extract suggestion data from the agent state
const suggestion = state
? (state.values as ALLWEONEAgentState).suggestion
: undefined;
return {
messages: toHydratedUiMessages(processedMessages),
chatName: chat?.name || "Untitled Chat",
dbiId: chat?.dbiId ?? null,
suggestion,
artifacts,
viewer: {
isAuthenticated: Boolean(userId),
isOwner,
canEdit: isOwner || Boolean(access?.canEdit),
canManageShare: isOwner || Boolean(access?.canManageShare),
canClone: Boolean(
userId &&
!isOwner &&
(access?.canRead ?? false) &&
!(access?.canEdit ?? false),
),
agentOnlyFileCount: Math.max(totalChatFileCount - previewableFileCount, 0),
},
};
}
Its a huge mess of a code. But here is the code. That i am using for getting all the message for a particular thread right now.
- Thread is stored in the Chat table.
- I have authentication and access validation code
- I have interrupts for some tools
- I have files that can be viewed as artifacts for a particular chat
- I have specific agent whose response need to be displayed in an artifact (travel agent in this case)
This code is a mess and I need to clean it up. ; )