Saving User message manually for AsyncPostgreSaver checkpointer

Hey

I’m using AsyncPostgreSaver checkpointer and I want to save only new user message without invoking the graph. Checkpoints for this thread_id already exist in checkpoints, checkpoint_blobs, checkpoint_writes tables in PostgreSQL. I just want to append a new message to last checkpoint with “aput” function.

The code below doesn’t work. I’m getting checkpoint with no new message when executing aget_tuple after saving a new checkpoint.

How to do it correctly?

Code:

import asyncio
import uuid
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langchain_core.messages import HumanMessage

async def main():
    config = {“configurable”: {“thread_id”: “28f6b69f-ad07-4937-9e65-fa6afa53f3f5”}}
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        #await checkpointer.setup()  
        state = await checkpointer.aget_tuple(config)
        checkpoint = state.checkpoint
        config = state.config
        print("state:", state)
        checkpoint["channel_values"]["messages"].append(HumanMessage(content="test", id=str(uuid.uuid4())))

        await checkpointer.aput(
            config=config,
            checkpoint=checkpoint,
            metadata={},
            new_versions={}
        )

        state = await checkpointer.aget_tuple(config)
        print("state:", state)

if __name__=="__main__":
    result = asyncio.run(main())

Hi @J3zus

it is usually not a good practise to mutate checkpoint directly. I’d rather go with the dedicated API, such as aput_writes to append a message to the messages channel.

Checkpointers persist a snapshot plus per-channel writes. Appending to a channel should be expressed as a write so it’s recorded in checkpoint_writes and merged by reducers.

async def main():
    config = {"configurable": {"thread_id": "28f6b69f-ad07-4937-9e65-fa6afa53f3f5"}}

    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        # First-time only: await checkpointer.setup()
        state = await checkpointer.aget_tuple(config)
        if state is None:
            raise RuntimeError("No existing checkpoint for this thread_id")

        # Append a user message via per-channel writes
        await checkpointer.aput_writes(
            config=state.config,  # should reference the target checkpoint_id
            writes=[("messages", [HumanMessage(content="test", id=str(uuid.uuid4()))])],
            task_id=str(uuid.uuid4()),
            task_path="", # keep it empty or "manual" to distinguish graph-external updates
        )

        # Note: aget_tuple returns the snapshot. Per-channel writes are stored separately
        # and merged when creating a new checkpoint (e.g., on next graph step).
        updated = await checkpointer.aget_tuple(config)
        print("updated:", updated)

Why your snippet didn’t show the new message

  • Using aput with new_versions={} causes non-primitive channel values (like a list of messages) to be popped into blob_values but not persisted, so you read back a checkpoint without the new message.
  • Mutating checkpoint["channel_values"]["messages"] and passing that dict to aput does not register a channel write. The saver expects per-channel deltas in writes so they can be stored in checkpoint_writes and merged by reducers on read.

References (official docs and source)

@pawel-twardziak

I am trying something like:

            latest_checkpoint = await checkpointer.aget_tuple(config)
            await checkpointer.adelete_thread(config["configurable"]["thread_id"])
            if latest_checkpoint:

                LOGGER.info(f"Channel versions: {latest_checkpoint.checkpoint.get("channel_versions", {})}")
                await checkpointer.aput(
                    latest_checkpoint.config,
                    latest_checkpoint.checkpoint,
                    latest_checkpoint.metadata,
                    latest_checkpoint.checkpoint.get("channel_versions", {}))
            return response
    except Exception as e:
        LOGGER.error(f"Error invoking agent: {e}")
        raise e

but this is also not working, channel versions are not empty dict.