When using the LangGraph store to manage short-term and long-term memory (e.g., via langmem), one problem is that in a distributed environment (especially with background memory-management processes), concurrent workloads may each perform "get_store_record → run_logic → update_store_record". This read-modify-write pattern leads to race conditions and potential loss of data/context.
A potential way to make store operations concurrency-safe is to introduce an optimistic-locking mechanism: have the `store.put` operation update a row only if the row's current value matches the caller's expectation, and return the number of rows updated. A return of 1 indicates success; 0 indicates a concurrent conflict (some other process updated the same record while this one was running), in which case the current process has to retry.
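The rows-updated semantic described above can be sketched with a throwaway in-memory SQLite table standing in for the Postgres store (the table and column names here are illustrative only, not langgraph's actual schema):

```python
import sqlite3

# In-memory stand-in for the Postgres store table (illustrative schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE store (prefix TEXT, key TEXT, value TEXT, PRIMARY KEY (prefix, key))"
)
conn.execute("INSERT INTO store VALUES ('memories', 'u1', 'v1')")

def put_if_expected(prefix, key, new_value, expected_value):
    """Compare-and-swap: update only if the stored value matches what the
    caller last read. Returns the number of rows updated."""
    cur = conn.execute(
        "UPDATE store SET value = ? WHERE prefix = ? AND key = ? AND value = ?",
        (new_value, prefix, key, expected_value),
    )
    return cur.rowcount  # 1 = success, 0 = concurrent conflict -> caller retries

print(put_if_expected("memories", "u1", "v2", "v1"))  # 1: expectation held, updated
print(put_if_expected("memories", "u1", "v3", "v1"))  # 0: stale expectation, conflict
```

The caller treats a 0 return as a signal to re-read the record and retry its logic, exactly as described above.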
To be more explicit, we could:
- Expose an additional field `expected_value` on the `store.put`/`aput` interface (Storage (LangGraph) | LangChain Reference).
- Update the SQL PUT OPS query (`langgraph/libs/checkpoint-postgres/langgraph/store/postgres/base.py` at commit `2d3121a17cb070d980450b404299dacdcd60a6bd`) to something like:
```sql
INSERT INTO store (prefix, key, value, created_at, updated_at, expires_at, ttl_minutes)
VALUES (:prefix, :key, :new_value, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, :expires_at, :ttl_minutes)
ON CONFLICT (prefix, key) DO UPDATE
SET value = EXCLUDED.value,
    updated_at = CURRENT_TIMESTAMP,
    expires_at = EXCLUDED.expires_at,
    ttl_minutes = EXCLUDED.ttl_minutes
-- optimistic check: only update if the current value matches what the caller expects
WHERE store.value = :expected_value
RETURNING
    prefix, key, value,
    (xmax = 0) AS was_inserted,
    (xmax <> 0) AS was_updated;
```
(This query may further need to be converted to a batch style, the query-dedupe logic adjusted accordingly, and the embeddings table updated only for non-conflicting records.)
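From the caller's side, the retry loop around the proposed interface might look like the sketch below. Note that `expected_value`, `ConflictError`, and the `FakeStore` class are all hypothetical stand-ins to illustrate the shape of the change, not langgraph's current API:

```python
import random
import time

class ConflictError(Exception):
    """Raised when the store's current value no longer matches expected_value
    (the 0-rows-updated case in the SQL version)."""

class FakeStore:
    """In-memory stand-in implementing the proposed compare-and-swap put."""
    def __init__(self):
        self._data = {}

    def get(self, namespace, key):
        return self._data.get((namespace, key))

    def put(self, namespace, key, value, expected_value=None):
        current = self._data.get((namespace, key))
        if expected_value is not None and current != expected_value:
            raise ConflictError  # another writer won the race
        self._data[(namespace, key)] = value

def update_with_retry(store, namespace, key, run_logic, max_retries=5):
    """get_store_record -> run_logic -> update_store_record, retried with
    jittered backoff whenever a concurrent writer caused a conflict."""
    for attempt in range(max_retries):
        current = store.get(namespace, key)
        new = run_logic(current)
        try:
            store.put(namespace, key, new, expected_value=current)
            return new
        except ConflictError:
            time.sleep(random.uniform(0, 0.01 * 2 ** attempt))
    raise RuntimeError("gave up after repeated concurrent conflicts")

store = FakeStore()
store.put(("memories",), "u1", ["fact A"])
result = update_with_retry(
    store, ("memories",), "u1", lambda v: (v or []) + ["fact B"]
)
print(result)  # ['fact A', 'fact B']
```

With this shape, a background memory process that loses the race simply re-reads the latest record and re-runs its logic, instead of silently overwriting another writer's update.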
Thanks!