I’ve implemented AsyncPostgresSaver as my checkpointer and connected it to my database. However, I keep getting the errors "Object of type HumanMessage is not JSON serializable"
and "Object of type AIMessage is not JSON serializable"
when I invoke the graph with graph.astream_events.
I fixed the HumanMessage error with:
# Wrap the user's text in a one-element HumanMessage list and pass it straight
# to `dumps`, so `messages` ends up holding the serialised form.
# NOTE(review): which `dumps` is in scope is not visible here; confirm that it
# accepts `ensure_ascii` and that the graph expects the serialised value.
messages = dumps([HumanMessage(content=content)], ensure_ascii=False)
But I still get the "Object of type AIMessage is not JSON serializable" error.
AsyncPostgresSaver implementation:
class Checkpointer:
    """Compile graphs with a checkpointer chosen from environment variables.

    Two environment variables are read:

    * ``IS_LANGGRAPH_STUDIO_SESSION`` -- when it equals ``"true"``
      (case-insensitive) the graph is compiled without a custom checkpointer.
    * ``DATABASE_URL`` -- connection string handed to
      ``AsyncPostgresSaver.from_conn_string`` otherwise.
    """

    # Context managers returned by ``from_conn_string`` that have been entered
    # but not yet exited. Keeping a reference here stops them from being
    # garbage-collected while the compiled graph still uses the saver
    # (finalizing the underlying async generator may close the connection),
    # and lets ``close_savers`` exit them on shutdown.
    _open_saver_contexts: list = []

    @staticmethod
    def is_langgraph_studio_session() -> bool:
        """Check if the application is running in LangGraph API environment.

        Returns:
            bool: True when ``IS_LANGGRAPH_STUDIO_SESSION`` is set to
            ``"true"`` (any letter case); False when it is unset or holds
            any other value.
        """
        return os.getenv("IS_LANGGRAPH_STUDIO_SESSION", "false").lower() == "true"

    @staticmethod
    def get_graph_with_memory_saver(graph: "StateGraph"):
        """Compile the graph with appropriate memory saver based on the environment.

        Args:
            graph (StateGraph): The graph to compile.

        Returns:
            The result of ``graph.compile()`` in a LangGraph Studio session,
            otherwise the result of ``graph.compile(checkpointer=saver)``
            where ``saver`` comes from ``AsyncPostgresSaver.from_conn_string``.

        Raises:
            RuntimeError: If ``DATABASE_URL`` is missing or empty, or if this
                synchronous helper is called while an event loop is already
                running in the current thread.
        """
        # Don't use custom checkpointer when running in LangGraph Studio
        if Checkpointer.is_langgraph_studio_session():
            return graph.compile()

        db = os.getenv("DATABASE_URL")
        if not db:
            raise RuntimeError("DATABASE_URL missing")

        # run_until_complete() cannot be nested inside a running loop; fail
        # early with an actionable message instead of an opaque one.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # no loop is running in this thread, so blocking is fine
        else:
            raise RuntimeError(
                "get_graph_with_memory_saver() was called from a running event "
                "loop; use 'async with AsyncPostgresSaver.from_conn_string(...)' "
                "there instead"
            )

        # NOTE(review): pickle_fallback=True presumably lets the serializer
        # fall back to pickle -- only point this at a database you trust.
        saver_cm = AsyncPostgresSaver.from_conn_string(
            db, serde=JsonPlusSerializer(pickle_fallback=True)
        )

        # get_event_loop() raises on newer Python versions (and after
        # asyncio.run() has finished) when the thread has no current loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Enter the async context manager by hand because this function is
        # synchronous. NOTE(review): the saver is presumably tied to `loop`,
        # so the compiled graph should be driven from that same loop.
        saver = loop.run_until_complete(saver_cm.__aenter__())
        Checkpointer._open_saver_contexts.append(saver_cm)
        return graph.compile(checkpointer=saver)

    @staticmethod
    async def close_savers() -> None:
        """Exit every saver context opened by ``get_graph_with_memory_saver``.

        Intended for application shutdown. NOTE(review): presumably this has
        to be awaited on the same event loop that opened the savers.
        """
        while Checkpointer._open_saver_contexts:
            saver_cm = Checkpointer._open_saver_contexts.pop()
            await saver_cm.__aexit__(None, None, None)
I’ve tried this implementation as well:
async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
Here are the tables in my database, which I created myself so that I don’t have to call .setup():
-- One row per integer value `v`; the primary key makes each value unique.
-- NOTE(review): presumably the saver's .setup() records applied schema
-- migration versions here. Because this table was created by hand it starts
-- empty -- confirm how the installed library version treats that.
CREATE TABLE checkpoint_migrations (
v INTEGER NOT NULL PRIMARY KEY
);
-- One row per (thread_id, checkpoint_ns, checkpoint_id).
-- `checkpoint` and `metadata` are JSONB columns, so whatever is written to
-- them must already be valid JSON; `metadata` defaults to an empty object.
-- `parent_checkpoint_id` and `type` are nullable.
-- NOTE(review): hand-written instead of being created by .setup(); verify it
-- matches the schema expected by the installed langgraph-checkpoint-postgres.
CREATE TABLE checkpoints (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
checkpoint_id TEXT NOT NULL,
parent_checkpoint_id TEXT,
type TEXT,
checkpoint JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id)
);
-- One row per (thread_id, checkpoint_ns, channel, version).
-- `blob` is raw bytes (BYTEA) and is the only nullable column; `type` is
-- required.
-- NOTE(review): presumably `type` names the encoding of `blob` and this is
-- where non-JSON channel values are stored -- confirm against the library.
CREATE TABLE checkpoint_blobs (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
channel TEXT NOT NULL,
version TEXT NOT NULL,
type TEXT NOT NULL,
blob BYTEA,
PRIMARY KEY (thread_id, checkpoint_ns, channel, version)
);
-- One row per (thread_id, checkpoint_ns, checkpoint_id, task_id, idx).
-- `blob` is required raw bytes (BYTEA); `type` is nullable.
-- NOTE(review): `task_path` is NOT NULL with no DEFAULT, so any INSERT that
-- omits it will be rejected. Check whether the schema created by the
-- library's own .setup() gives this column a default.
CREATE TABLE checkpoint_writes (
thread_id TEXT NOT NULL,
checkpoint_ns TEXT NOT NULL DEFAULT '',
checkpoint_id TEXT NOT NULL,
task_id TEXT NOT NULL,
task_path TEXT NOT NULL,
idx INTEGER NOT NULL,
channel TEXT NOT NULL,
type TEXT,
blob BYTEA NOT NULL,
PRIMARY KEY (thread_id, checkpoint_ns, checkpoint_id, task_id, idx)
);
I’m not sure why this error is happening. It does not happen with MemorySaver() as my checkpointer, so I think AsyncPostgresSaver is having trouble saving into my database.