Feature Request: support concurrency-safe store.put operations

While using the langgraph store to manage short-term and long-term memory (through langmem), one problem we encountered is that in a distributed environment (especially with background memory-management processes), concurrent workloads may each perform "get_store_record → run_logic → update_store_record", leading to race conditions and potential loss of data/context.

A potential way to make store operations concurrency safe is to introduce an "optimistic locking" mechanism: the store.put operation would only update a row if the row's current value matches the caller's expectation, and would return the number of rows updated. 1 indicates success; 0 indicates a concurrent conflict (some other process updated the same row while this process was running), in which case the current process has to retry.
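The get → run_logic → conditional-put retry loop this enables can be sketched against a toy in-memory store. This is illustrative only: langgraph's BaseStore.put has no expected_value parameter today, so the class below simulates the proposed compare-and-swap semantics rather than using the real API.

```python
import threading

class InMemoryStore:
    """Toy stand-in for a store whose put supports an optimistic expected_value check."""

    _MISSING = object()  # sentinel: caller passed no expectation -> unconditional put

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def get(self, key):
        with self._lock:
            return self._data.get(key)

    def put(self, key, value, expected_value=_MISSING):
        """Return True on success, False when the optimistic check fails."""
        with self._lock:
            if expected_value is not self._MISSING and self._data.get(key) != expected_value:
                return False  # concurrent conflict: stored value no longer matches
            self._data[key] = value
            return True

def update_with_retry(store, key, update_fn, max_retries=5):
    """get -> run_logic -> conditional put, retrying on concurrent conflicts."""
    for _ in range(max_retries):
        current = store.get(key)
        new = update_fn(current)
        if store.put(key, new, expected_value=current):
            return new
    raise RuntimeError("too many concurrent conflicts")

store = InMemoryStore()
store.put("memories", ["a"])
update_with_retry(store, "memories", lambda v: v + ["b"])
print(store.get("memories"))  # ['a', 'b']
```

The key point is that the conflict check and the write happen atomically in the store, so the caller never overwrites a value it has not seen.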

To be more explicit, if we can:

  1. Expose an additional field expected_value on the store.put/aput interface (see the Storage reference in the LangGraph API docs)
  2. Update the SQL PUT-ops query in langgraph/libs/checkpoint-postgres/langgraph/store/postgres/base.py (commit 2d3121a17cb070d980450b404299dacdcd60a6bd of langchain-ai/langgraph) to something like the below:
INSERT INTO store (prefix, key, value, created_at, updated_at, expires_at, ttl_minutes)
VALUES (:prefix, :key, :new_value, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, :expires_at, :ttl_minutes)
ON CONFLICT (prefix, key) DO UPDATE
SET value = EXCLUDED.value,
    updated_at = CURRENT_TIMESTAMP,
    expires_at = EXCLUDED.expires_at,
    ttl_minutes = EXCLUDED.ttl_minutes
-- optimistic check: only update if current matches what the caller expects
WHERE store.value = :expected_value
RETURNING
  prefix, key, value,
  (xmax = 0)  AS was_inserted,
  (xmax <> 0) AS was_updated;

(This query may need to be further converted into a batch style, with corresponding changes to the query-dedup logic, and the embedding table should only be updated for non-conflicting records.)
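The single-row upsert-with-WHERE pattern can be exercised end to end in SQLite, which shares the ON CONFLICT ... DO UPDATE ... WHERE syntax (minus the Postgres-specific xmax columns). Here the driver's rowcount plays the role of the "number of rows updated" return value; the table schema is trimmed to the columns relevant to the check:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE store (prefix TEXT, key TEXT, value TEXT, PRIMARY KEY (prefix, key))"
)

UPSERT = """
INSERT INTO store (prefix, key, value) VALUES (?, ?, ?)
ON CONFLICT (prefix, key) DO UPDATE
SET value = excluded.value
-- optimistic check: only overwrite when the stored value matches the expectation
WHERE store.value = ?
"""

def put(prefix, key, new_value, expected_value):
    """Return 1 on insert/update, 0 when the optimistic check blocked the update."""
    cur = conn.execute(UPSERT, (prefix, key, new_value, expected_value))
    conn.commit()
    return cur.rowcount

print(put("mem", "k1", "v1", None))     # 1: fresh insert (WHERE not evaluated)
print(put("mem", "k1", "v2", "v1"))     # 1: expectation held, row updated
print(put("mem", "k1", "v3", "stale"))  # 0: concurrent conflict, row untouched
```

The caller treats a return of 0 as the signal to re-read the record and retry, exactly as in the retry-loop sketch above.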

Thanks!

A potential way towards a batched query with optimistic locking (error-prone – suggested by AI):

WITH incoming(prefix, key, new_value, expires_at, ttl_minutes, expected_value) AS (
  VALUES
  -- Your Python builds this:
  -- {values_str}
),
updated AS (
  UPDATE store s
  SET value = i.new_value,
      updated_at = CURRENT_TIMESTAMP,
      expires_at = i.expires_at,
      ttl_minutes = i.ttl_minutes
  FROM incoming i
  WHERE s.prefix = i.prefix
    AND s.key = i.key
    AND s.value = i.expected_value     -- optimistic check
  RETURNING s.prefix, s.key, s.value, 'updated'::text AS action
),
inserted AS (
  INSERT INTO store (prefix, key, value, created_at, updated_at, expires_at, ttl_minutes)
  SELECT i.prefix, i.key, i.new_value,
         CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, i.expires_at, i.ttl_minutes
  FROM incoming i
  LEFT JOIN store s
    ON s.prefix = i.prefix AND s.key = i.key
  WHERE s.prefix IS NULL               -- only truly missing rows
  -- guard against a row inserted concurrently after this statement's snapshot,
  -- which would otherwise raise a unique violation; it surfaces as a conflict instead
  ON CONFLICT (prefix, key) DO NOTHING
  RETURNING prefix, key, value, 'inserted'::text AS action
)
SELECT *
FROM (
  SELECT * FROM updated
  UNION ALL
  SELECT * FROM inserted
) d;
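With this batched shape, conflicts are reported by omission: a row that exists but fails the optimistic check appears in neither CTE, so the caller must diff the returned rows against the submitted batch. A sketch of that bookkeeping, assuming the final SELECT yields (prefix, key, value, action) tuples (the column layout of the query above):

```python
def partition_batch_results(incoming, returned_rows):
    """Split a batched put into updated / inserted / conflicted (prefix, key) sets.

    incoming: iterable of (prefix, key) tuples submitted in the batch
    returned_rows: (prefix, key, value, action) tuples from the query's final SELECT
    """
    by_action = {"updated": set(), "inserted": set()}
    for prefix, key, _value, action in returned_rows:
        by_action[action].add((prefix, key))
    returned = by_action["updated"] | by_action["inserted"]
    # anything the query did not return failed the optimistic check -> retry it
    conflicted = {pk for pk in incoming if pk not in returned}
    return by_action["updated"], by_action["inserted"], conflicted

upd, ins, conflicts = partition_batch_results(
    [("mem", "a"), ("mem", "b"), ("mem", "c")],
    [("mem", "a", "v", "updated"), ("mem", "b", "v", "inserted")],
)
print(conflicts)  # {('mem', 'c')}
```

Only the updated and inserted sets would then feed the embedding-table update, while the conflicted set is re-read and retried.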