Below is the code I'm currently running.
I don't see any entries in the Postgres database.
import atexit
import os

from deepagents import create_deep_agent
from deepagents.backends import CompositeBackend, StateBackend, StoreBackend
from langgraph.store.postgres import PostgresStore
# Postgres connection string for the store, read from the environment.
DB_URI = os.getenv("DATABASE_URL")
if not DB_URI:
    # Fail at import time: nothing below can work without a connection string.
    _missing_db_uri_hint = (
        "Please set DATABASE_URL env var to your Postgres connection string, "
        "for example: postgresql://user:password@localhost:5432/deepagents_test"
    )
    raise RuntimeError(_missing_db_uri_hint)
# NOTE:
# In langgraph>=1.0, PostgresStore.from_conn_string is a context manager
# factory. We enter it once at module import time and reuse the store for the
# rest of the process.
_store_ctx = PostgresStore.from_conn_string(DB_URI)
STORE: PostgresStore = _store_ctx.__enter__()  # type: ignore[assignment]
# The context is entered by hand, so nothing would otherwise call __exit__.
# Register it to run at interpreter shutdown so the store's resources are
# released instead of being abandoned. Registered before setup() so the
# cleanup still happens if setup() raises.
atexit.register(_store_ctx.__exit__, None, None, None)
STORE.setup()  # Idempotent; safe to call multiple times
def make_store() -> PostgresStore:
    """Hand back the shared store that was created when this module loaded.

    Nothing new is constructed here: every caller receives the same
    module-level ``STORE`` object.
    """
    shared_store = STORE
    return shared_store
def make_backend(runtime):
    """Build the filesystem backend that splits paths by prefix.

    Paths under ``/memories/`` are served by a ``StoreBackend`` (the
    long-term memory kept in the Postgres store); every other path falls
    through to a ``StateBackend`` (per-thread and not persisted).

    Args:
        runtime: Passed unchanged to both backend constructors.

    Returns:
        A ``CompositeBackend`` wired with the two backends above.
    """
    ephemeral_files = StateBackend(runtime)
    memory_files = StoreBackend(runtime)
    prefix_routes = {"/memories/": memory_files}
    return CompositeBackend(default=ephemeral_files, routes=prefix_routes)
def create_agent():
    """Assemble the deep agent used by the memory test.

    The agent is given the store returned by ``make_store()`` and uses
    ``make_backend`` as its backend factory.

    Returns:
        Whatever ``create_deep_agent`` returns for this configuration.
    """
    prompt_text = (
        "You are a simple test agent. "
        "Use the virtual filesystem tools. "
        "Anything in /memories/ is persistent long-term memory backed by Postgres."
    )
    return create_deep_agent(
        store=make_store(),
        backend=make_backend,  # type: ignore[arg-type]  # pyright: make_backend matches BackendFactory at runtime
        system_prompt=prompt_text,
    )
def _message_text(message):
    """Return the content of *message*, given either a dict or a message object."""
    if isinstance(message, dict):
        return message["content"]
    # LangChain message objects expose their payload as an attribute and do
    # not support item access.
    return message.content


def _ask(agent, thread_id, prompt):
    """Send *prompt* as one user message on *thread_id*; return the final reply's content."""
    result = agent.invoke(
        {"messages": [{"role": "user", "content": prompt}]},
        # The thread id is run configuration, not graph state, so it goes in
        # config["configurable"] rather than in the input payload.
        config={"configurable": {"thread_id": thread_id}},
    )
    return _message_text(result["messages"][-1])


def run_memory_test() -> None:
    """Write a memory file in one thread, then try to read it from another.

    The first invocation asks the agent to create
    ``/memories/user_profile.txt``; the second, under a different thread id,
    asks it to read that file back. The agent's final message from each run
    is printed so the result can be checked by eye.
    """
    agent = create_agent()

    # First run: write a memory file in thread_1
    print("\n=== First run: write memory in thread_1 ===")
    reply_1 = _ask(
        agent,
        "thread_1",
        "Create a file at /memories/user_profile.txt that states "
        "'My favorite color is blue.' Then confirm that you saved it.",
    )
    print("Agent final message (thread_1):")
    print(reply_1)

    # Second run: new thread_2, try to read the persistent memory
    print("\n=== Second run: read memory from thread_2 ===")
    reply_2 = _ask(
        agent,
        "thread_2",
        "Read /memories/user_profile.txt and tell me what my "
        "favorite color is. If you cannot read it, say so.",
    )
    print("Agent final message (thread_2):")
    print(reply_2)

    print(
        "\nIf the second response correctly says that your favorite color is blue, "
        "then the Postgres-backed long-term memory is working."
    )
# Run the memory test only when executed as a script. Importing this module
# still performs the module-level store initialisation, but does not invoke
# the agent.
if __name__ == "__main__":
    run_memory_test()