Can create_supervisor / create_react_agent use a checkpointer and store, for across-thread memory?

Does anyone know whether GraphConfigPydantic / RunnableConfig makes it possible to create new agents that use persistent memory? I.e. where do I compile the graph for graph.compile(checkpointer=checkpointer, store=in_memory_store)?

It looks like my OAP agents are created using create_supervisor / create_react_agent, where I can't see the graph.compile method.

Overview

# ... Define the graph ...

# Compile the graph with the checkpointer and store
graph = graph.compile(checkpointer=checkpointer, store=in_memory_store)

Hi @johnda98

I am not sure if this is what you are asking for:

from typing import Annotated, TypedDict
from uuid import uuid4

from langchain_core.runnables import RunnableConfig
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import create_react_agent
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

class State(TypedDict):
    messages: Annotated[list, add_messages]

agent = create_react_agent(model=..., tools=...)

def agent_node(state: State, config: RunnableConfig, *, store: BaseStore):
    user_id = config["configurable"]["user_id"]
    # Example: read recent memories (namespaced by user)
    # memories = store.search((user_id, "memories"), query=state["messages"][-1].content, limit=3)
    # ... use memories ...
    result = agent.invoke(state, config=config)
    # Example: write a memory
    # store.put((user_id, "memories"), str(uuid4()), {"memory": "..."})
    return result

builder = StateGraph(State)
builder.add_node("agent", agent_node)
builder.add_edge(START, "agent")
builder.add_edge("agent", END)

graph = builder.compile(checkpointer=InMemorySaver(), store=InMemoryStore())

# Per-run/thread state (conversation state) persists via checkpointer:
config = {"configurable": {"thread_id": "t-1", "user_id": "u-1"}}
  • You can use SqliteSaver/PostgresSaver/ShallowPostgresSaver instead of InMemorySaver and a persistent store implementation for durability.
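To make the (user_id, "memories") namespacing above concrete, here is a dependency-free toy stand-in for the store access pattern. This is not LangGraph's BaseStore (real stores also support semantic search and durability); it only illustrates how keys are scoped per user, which is why memories survive across threads:

```python
from uuid import uuid4

class ToyStore:
    """Toy namespaced key-value memory, illustrating the access pattern only."""

    def __init__(self):
        self._data = {}  # maps (namespace, key) -> value

    def put(self, namespace, key, value):
        self._data[(namespace, key)] = value

    def search(self, namespace, limit=3):
        # A real store can rank by semantic similarity; this just filters by namespace.
        items = [v for (ns, _), v in self._data.items() if ns == namespace]
        return items[:limit]

store = ToyStore()
store.put(("u-1", "memories"), str(uuid4()), {"memory": "prefers metric units"})
store.put(("u-1", "memories"), str(uuid4()), {"memory": "owns a 2019 Golf"})
store.put(("u-2", "memories"), str(uuid4()), {"memory": "different user"})

# Only u-1's memories come back, regardless of which thread_id is running:
print(store.search(("u-1", "memories")))
```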

If you are composing multiple prebuilt agents (including a supervisor)

  • Add each create_react_agent(...) runnable as a node in your orchestrating StateGraph.

  • Compile that orchestrating graph with checkpointer=... and store=... once; all nested agents run inside this persistence context.


Thanks Pawel.. not 100% sure.. digesting now..

Yes, here is the LangGraph method Overview

that is used by the LangChain team's OAP platform. I deploy to localhost LangGraph for both the agent and the sub-agent; here is the sub-agent .py. So I'd weave the checkpointer and store into the call that creates the sub-agent (LangGraph)? I think the LangGraph agent and sub-agent graph code is out of my reach?

cheers buddy for your help on this :slight_smile:

hi @johnda98

hmmm actually I am not familiar with the platform - OAP. I might need to play around with that :slight_smile:

If you have any further questions and some code snippets to share, just drop them in here :slight_smile:

This is where the create_react_agent call is made, but I'm not sure how I can make that graph use the persistent memory, i.e. compile with checkpointer and store. Moreover, where do I add the code that will enable the runnable agent to store memories and do semantic or regular retrievals?

Is it possible to take a look at LangChain's OAP repo here? It may give you more clues than I can.

I'm not sure if it's possible to configure create_react_agent for this long-term persistent memory capability.

thanks in advance for your respected opinions :slight_smile:
here it is oap-langgraph-tools-agent/tools_agent/agent.py at main · langchain-ai/oap-langgraph-tools-agent · GitHub

hi @johnda98

the agent definition there:

async def graph(config: RunnableConfig):
    cfg = GraphConfigPydantic(**config.get("configurable", {}))
    tools = []

    supabase_token = config.get("configurable", {}).get("x-supabase-access-token")
    if cfg.rag and cfg.rag.rag_url and cfg.rag.collections and supabase_token:
        for collection in cfg.rag.collections:
            rag_tool = await create_rag_tool(
                cfg.rag.rag_url, collection, supabase_token
            )
            tools.append(rag_tool)

    if cfg.mcp_config and cfg.mcp_config.auth_required:
        mcp_tokens = await fetch_tokens(config)
    else:
        mcp_tokens = None
    if (
        cfg.mcp_config
        and cfg.mcp_config.url
        and cfg.mcp_config.tools
        and (mcp_tokens or not cfg.mcp_config.auth_required)
    ):
        server_url = cfg.mcp_config.url.rstrip("/") + "/mcp"

        tool_names_to_find = set(cfg.mcp_config.tools)
        fetched_mcp_tools_list: list[StructuredTool] = []
        names_of_tools_added = set()

        # If the tokens are not None, then we need to add the authorization header. otherwise make headers None
        headers = (
            mcp_tokens is not None
            and {"Authorization": f"Bearer {mcp_tokens['access_token']}"}
            or None
        )
        try:
            async with streamablehttp_client(server_url, headers=headers) as streams:
                read_stream, write_stream, _ = streams
                async with ClientSession(read_stream, write_stream) as session:
                    await session.initialize()

                    page_cursor = None

                    while True:
                        tool_list_page = await session.list_tools(cursor=page_cursor)

                        if not tool_list_page or not tool_list_page.tools:
                            break

                        for mcp_tool in tool_list_page.tools:
                            if not tool_names_to_find or (
                                mcp_tool.name in tool_names_to_find
                                and mcp_tool.name not in names_of_tools_added
                            ):
                                langchain_tool = create_langchain_mcp_tool(
                                    mcp_tool, mcp_server_url=server_url, headers=headers
                                )
                                fetched_mcp_tools_list.append(
                                    wrap_mcp_authenticate_tool(langchain_tool)
                                )
                                if tool_names_to_find:
                                    names_of_tools_added.add(mcp_tool.name)

                        page_cursor = tool_list_page.nextCursor

                        if not page_cursor:
                            break
                        if tool_names_to_find and len(names_of_tools_added) == len(
                            tool_names_to_find
                        ):
                            break

                    tools.extend(fetched_mcp_tools_list)
        except Exception as e:
            print(f"Failed to fetch MCP tools: {e}")
            pass

    model = init_chat_model(
        cfg.model_name,
        temperature=cfg.temperature,
        max_tokens=cfg.max_tokens,
        api_key=get_api_key_for_model(cfg.model_name, config) or "No token found"
    )

    return create_react_agent(
        prompt=cfg.system_prompt + UNEDITABLE_SYSTEM_PROMPT,
        model=model,
        tools=tools,
        config_schema=GraphConfigPydantic,
    )
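One side note on the headers = (... and ... or ...) expression in the snippet above: it leans on Python's short-circuiting and/or idiom. It happens to be safe here because a dict with at least one key is always truthy, but a plain conditional says the same thing without that subtlety:

```python
def make_headers(mcp_tokens):
    # Equivalent to: mcp_tokens is not None and {...} or None
    # (the and/or idiom would silently return None if the middle value
    # could ever be falsy; a non-empty dict never is, so it works here)
    if mcp_tokens is not None:
        return {"Authorization": f"Bearer {mcp_tokens['access_token']}"}
    return None

print(make_headers({"access_token": "abc"}))  # {'Authorization': 'Bearer abc'}
print(make_headers(None))                     # None
```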

The graph function returns the result of the create_react_agent invocation. Two things to note: graph is an async function, so it has to be awaited, and create_react_agent already returns a compiled graph, so you don't call .compile() on it. Instead, pass the persistence objects straight into create_react_agent, which accepts them directly:

agent = await graph(config)  # already compiled

# inside graph(), attach persistence when creating the agent:
# return create_react_agent(
#     ...,
#     checkpointer=InMemorySaver(),
#     store=InMemoryStore(),
# )

Remember:

  • You can use SqliteSaver/PostgresSaver/ShallowPostgresSaver instead of InMemorySaver and a persistent store implementation for durability.

Hi Pawel.. you rock ! :slight_smile:

how does this look ?

from typing import Annotated, List, Optional, TypedDict

from langchain.chat_models import init_chat_model
from langchain_core.messages import BaseMessage
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import StructuredTool
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import create_react_agent
from langgraph.store.memory import InMemoryStore
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from pydantic import BaseModel, Field

from vehicle_technician.utils.token import fetch_tokens
from vehicle_technician.utils.tools import (
    create_langchain_mcp_tool,
    create_rag_tool,
    wrap_mcp_authenticate_tool,
)

class AgentState(TypedDict):
    """The state of the agent."""

    # itemgetter("messages") is not a valid reducer; use add_messages
    messages: Annotated[list[BaseMessage], add_messages]

UNEDITABLE_SYSTEM_PROMPT = (
    "\nIf the tool throws an error requiring authentication, provide the user "
    "with a Markdown link to the authentication page and prompt them to authenticate."
)

DEFAULT_SYSTEM_PROMPT = (
    "You are a helpful assistant that has access to a variety of tools, including a RAG tool "
    "for accessing the user's document collections on the vehicle details that they own. "
    "Your focus is on the vehicle details that the supervising agent passed to you, and on the "
    "<Engine/Transmission/Brakes/Electrical - remove this for each new Technician, leaving only ONE speciality> "
    "systems of this specific vehicle. You can use any of your search tools and web search tools, "
    "focusing on forums and social media, YouTube video content searches, owners' clubs for this "
    "vehicle type, or any discussion forums on the transmission for this vehicle. You also have "
    "shopping tools if you need to search for parts and pricing."
)

class RagConfig(BaseModel):
    rag_url: Optional[str] = None
    """The URL of the RAG server"""

    collections: Optional[List[str]] = None
    """The collections to use for RAG"""

class MCPConfig(BaseModel):
    url: Optional[str] = Field(
        default=None,
        optional=True,
    )
    """The URL of the MCP server"""

    tools: Optional[List[str]] = Field(
        default=None,
        optional=True,
    )
    """The tools to make available to the LLM"""

    auth_required: Optional[bool] = Field(
        default=False,
        optional=True,
    )
    """Whether the MCP server requires authentication"""

class GraphConfigPydantic(BaseModel):
    model_name: Optional[str] = Field(
        default="openai:gpt-4o",
        metadata={
            "x_oap_ui_config": {
                "type": "select",
                "default": "openai:gpt-4o",
                "description": "The model to use in all generations",
                "options": [
                    {
                        "label": "Claude 3.7 Sonnet",
                        "value": "anthropic:claude-3-7-sonnet-latest",
                    },
                    {
                        "label": "Claude 3.5 Sonnet",
                        "value": "anthropic:claude-3-5-sonnet-latest",
                    },
                    {"label": "GPT 4o", "value": "openai:gpt-4o"},
                    {"label": "GPT 4o mini", "value": "openai:gpt-4o-mini"},
                    {"label": "GPT 4.1", "value": "openai:gpt-4.1"},
                ],
            }
        },
    )
    temperature: Optional[float] = Field(
        default=0.7,
        metadata={
            "x_oap_ui_config": {
                "type": "slider",
                "default": 0.7,
                "min": 0,
                "max": 2,
                "step": 0.1,
                "description": "Controls randomness (0 = deterministic, 2 = creative)",
            }
        },
    )
    max_tokens: Optional[int] = Field(
        default=4000,
        metadata={
            "x_oap_ui_config": {
                "type": "number",
                "default": 4000,
                "min": 1,
                "description": "The maximum number of tokens to generate",
            }
        },
    )
    system_prompt: Optional[str] = Field(
        default=DEFAULT_SYSTEM_PROMPT,
        metadata={
            "x_oap_ui_config": {
                "type": "textarea",
                "placeholder": "Enter a prompt for this Technician...",
                "description": f"<Delete ALL the text in the brackets below EXCEPT the Vehicle Subsystem (e.g. Brakes) that you want this Tech to focus on>. Your prompt below will always be included at the end of the system prompt:\n---{UNEDITABLE_SYSTEM_PROMPT}\n---",
                "default": DEFAULT_SYSTEM_PROMPT,
            }
        },
    )
    mcp_config: Optional[MCPConfig] = Field(
        default=None,
        optional=True,
        metadata={
            "x_oap_ui_config": {
                "type": "mcp",
                # Here is where you would set the default tools.
                # "default": {
                #     "tools": ["Math_Divide", "Math_Mod"]
                # }
            }
        },
    )
    rag: Optional[RagConfig] = Field(
        default=None,
        optional=True,
        metadata={
            "x_oap_ui_config": {
                "type": "rag",
                # Here is where you would set the default collection. Use collection IDs
                # "default": {
                #     "collections": [
                #         "fd4fac19-886c-4ac8-8a59-fff37d2b847f",
                #         "659abb76-fdeb-428a-ac8f-03b111183e25",
                #     ]
                # },
            }
        },
    )

async def graph(config: RunnableConfig):
    cfg = GraphConfigPydantic(**config.get("configurable", {}))
    tools = []

    supabase_token = config.get("configurable", {}).get("x-supabase-access-token")
    if cfg.rag and cfg.rag.rag_url and cfg.rag.collections and supabase_token:
        for collection in cfg.rag.collections:
            rag_tool = await create_rag_tool(
                cfg.rag.rag_url, collection, supabase_token
            )
            tools.append(rag_tool)

    if cfg.mcp_config and cfg.mcp_config.auth_required:
        mcp_tokens = await fetch_tokens(config)
    else:
        mcp_tokens = None
    if (
        cfg.mcp_config
        and cfg.mcp_config.url
        and cfg.mcp_config.tools
        and (mcp_tokens or not cfg.mcp_config.auth_required)
    ):
        server_url = cfg.mcp_config.url.rstrip("/") + "/mcp"

        tool_names_to_find = set(cfg.mcp_config.tools)
        fetched_mcp_tools_list: list[StructuredTool] = []
        names_of_tools_added = set()

        # If the tokens are not None, add the Authorization header; otherwise use no headers
        headers = (
            mcp_tokens is not None
            and {"Authorization": f"Bearer {mcp_tokens['access_token']}"}
            or None
        )
        try:
            async with streamablehttp_client(server_url, headers=headers) as streams:
                read_stream, write_stream, _ = streams
                async with ClientSession(read_stream, write_stream) as session:
                    await session.initialize()

                    page_cursor = None

                    while True:
                        tool_list_page = await session.list_tools(cursor=page_cursor)

                        if not tool_list_page or not tool_list_page.tools:
                            break

                        for mcp_tool in tool_list_page.tools:
                            if not tool_names_to_find or (
                                mcp_tool.name in tool_names_to_find
                                and mcp_tool.name not in names_of_tools_added
                            ):
                                langchain_tool = create_langchain_mcp_tool(
                                    mcp_tool, mcp_server_url=server_url, headers=headers
                                )
                                fetched_mcp_tools_list.append(
                                    wrap_mcp_authenticate_tool(langchain_tool)
                                )
                                if tool_names_to_find:
                                    names_of_tools_added.add(mcp_tool.name)

                        page_cursor = tool_list_page.nextCursor

                        if not page_cursor:
                            break
                        if tool_names_to_find and len(names_of_tools_added) == len(
                            tool_names_to_find
                        ):
                            break

                    tools.extend(fetched_mcp_tools_list)
        except Exception as e:
            print(f"Failed to fetch MCP tools: {e}")

    model = init_chat_model(
        cfg.model_name,
        temperature=cfg.temperature,
        max_tokens=cfg.max_tokens,
    )

    # 1. Create the ReAct agent runnable
    agent_runnable = create_react_agent(
        prompt=cfg.system_prompt + UNEDITABLE_SYSTEM_PROMPT,
        model=model,
        tools=tools,
        config_schema=GraphConfigPydantic,
    )

    # 2. Build the graph
    builder = StateGraph(AgentState)

    # 3. Define the nodes
    builder.add_node("agent", agent_runnable)

    # 4. Define the edges
    builder.add_edge(START, "agent")
    builder.add_edge("agent", END)

    # 5. Compile the graph with memory
    return builder.compile(checkpointer=InMemorySaver(), store=InMemoryStore())

I think I am biting off more than I can chew, trying to modify LangChain's OAP so that its agent framework has a long-term, across-session memory feature.

I think I need to step back a bit and keep learning more of the OAP repo and the LangGraph SDK.

1 Like