I’m using AsyncPostgreSaver checkpointer and I want to save only new user message without invoking the graph. Checkpoints for this thread_id already exist in checkpoints, checkpoint_blobs, checkpoint_writes tables in PostgreSQL. I just want to append a new message to last checkpoint with “aput” function.
The code below doesn’t work. I’m getting checkpoint with no new message when executing aget_tuple after saving a new checkpoint.
How to do it correctly?
Code:
import asyncio
import uuid
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langchain_core.messages import HumanMessage
async def main():
config = {“configurable”: {“thread_id”: “28f6b69f-ad07-4937-9e65-fa6afa53f3f5”}}
async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
#await checkpointer.setup()
state = await checkpointer.aget_tuple(config)
checkpoint = state.checkpoint
config = state.config
print("state:", state)
checkpoint["channel_values"]["messages"].append(HumanMessage(content="test", id=str(uuid.uuid4())))
await checkpointer.aput(
config=config,
checkpoint=checkpoint,
metadata={},
new_versions={}
)
state = await checkpointer.aget_tuple(config)
print("state:", state)
if __name__=="__main__":
result = asyncio.run(main())
it is usually not a good practise to mutate checkpoint directly. I’d rather go with the dedicated API, such as aput_writes to append a message to the messages channel.
Checkpointers persist a snapshot plus per-channel writes. Appending to a channel should be expressed as a write so it’s recorded in checkpoint_writes and merged by reducers.
async def main():
config = {"configurable": {"thread_id": "28f6b69f-ad07-4937-9e65-fa6afa53f3f5"}}
async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
# First-time only: await checkpointer.setup()
state = await checkpointer.aget_tuple(config)
if state is None:
raise RuntimeError("No existing checkpoint for this thread_id")
# Append a user message via per-channel writes
await checkpointer.aput_writes(
config=state.config, # should reference the target checkpoint_id
writes=[("messages", [HumanMessage(content="test", id=str(uuid.uuid4()))])],
task_id=str(uuid.uuid4()),
task_path="", # keep it empty or "manual" to distinguish graph-external updates
)
# Note: aget_tuple returns the snapshot. Per-channel writes are stored separately
# and merged when creating a new checkpoint (e.g., on next graph step).
updated = await checkpointer.aget_tuple(config)
print("updated:", updated)
Why your snippet didn’t show the new message
Using aput with new_versions={} causes non-primitive channel values (like a list of messages) to be popped into blob_values but not persisted, so you read back a checkpoint without the new message.
Mutating checkpoint["channel_values"]["messages"] and passing that dict to aput does not register a channel write. The saver expects per-channel deltas in writes so they can be stored in checkpoint_writes and merged by reducers on read.