Inter Communication between agents using create_agent as a supervisor and as a tools

Context not shared across sub-agents despite same thread_id

Setup

A supervisor agent created with create_agent
Multiple sub-agents created with create_agent (patient_agent, appt_agent, book_appointment)

You expose each sub-agent to the supervisor as a tool, and you pass the same thread_id (from runtime.config["configurable"]["thread_id"]) to each sub-agent.

Behavior Goal

User asks for patient info → patient agent may ask for patient id / mobile / name+dob → user provides it → patient agent fetches and responds.

Later in the same thread, user asks for appointment details → appointment agent should reuse the patient identifier already provided, and not ask again.

Main Issue

Context is not shared across sub-agents, even though thread id is the same.

patient_agent = patient_information_retrieval_agent(
    patient_info_tools, language_id, checkpointer
)

appt_agent = appointment_management_agent(
    appointment_tools, language_id, checkpointer
)

book_appointment = book_appointment_agent(
    book_appointment_tools, language_id, checkpointer
)

# ✅ Tools that call the sub-agent instances (closures)

@tool(return_direct=True)
async def fetch_patient_information_tool(request: str, runtime: ToolRuntime) -> str:
    """
    Use this tool to retrieve patient information such as name, mobile number,
    date of birth, or address.

    This tool handles ONLY patient personal information queries. Call this when
    the user asks about their personal details or provides a patient ID to retrieve
    information.

    Args:
        request: The user's request for patient information
    """
    root_thread_id = runtime.config["configurable"]["thread_id"]
    logger.info(f"Root Thread ID: {root_thread_id}")

    patient_info_config = {
        "configurable": {"thread_id": root_thread_id},
        # "metadata": {"sub_agent": "patient_info", "tool_name": "get_patient_info_tool"}
    }

    logger.info(f"Using patient_info_config: {patient_info_config}")

    result = await patient_agent.ainvoke(
        {"messages": [{"role": "user", "content": request}]},
        config=patient_info_config
    )

    response = extract_last_ai_message(result)
    logger.info(f"✅ get_patient_info_tool returning: {response[:200]}…")
    return response

@tool(return_direct=True)
async def manage_appointment_tool(request: str, runtime: ToolRuntime) -> str:
    """
    Use this tool to manage appointments including viewing, booking, canceling,
    sending appointment details via message or whatsapp, or sending project location
    via message or whatsapp.

    This tool handles ALL appointment-management tasks. Call this when the user asks
    about appointments details, sending appointment details via message or whatsapp,
    sending project location via message or whatsapp, or wants to cancel or confirm an
    appointment.

    Args:
        request: The user's appointment-related request
    """
    root_thread_id = runtime.config["configurable"]["thread_id"]

    appointment_management_config = {
        "configurable": {"thread_id": root_thread_id},
    }

    result = await appt_agent.ainvoke(
        {"messages": [{"role": "user", "content": request}]},
        config=appointment_management_config
    )

    response = extract_last_ai_message(result)
    logger.info(f"✅ manage_appointment_tool returning: {response[:200]}…")
    return response

@tool(return_direct=True)
async def book_appointment_tool(request: str, runtime: ToolRuntime) -> str:
    """
    Use this tool to book new appointments including selecting projects, clinics,
    doctors, and time slots.

    This tool handles ALL new appointment booking tasks. Call this when the user
    wants to book an appointment.

    Args:
        request: The user's booking-related request
    """
    root_thread_id = runtime.config["configurable"]["thread_id"]

    book_appointment_config = {
        "configurable": {"thread_id": root_thread_id},
    }

    result = await book_appointment.ainvoke(
        {"messages": [{"role": "user", "content": request}]},
        config=book_appointment_config
    )

    response = extract_last_ai_message(result)
    logger.info(f"✅ book_appointment_tool returning: {response[:200]}…")
    return response

# Log registered tools
logger.info(
    "📋 Registering supervisor tools: "
    "fetch_patient_information_tool, manage_appointment_tool, book_appointment_tool"
)

supervisor = create_agent(
    model=model,
    system_prompt=supervisor_prompt,
    tools=[fetch_patient_information_tool, manage_appointment_tool, book_appointment_tool],
    checkpointer=checkpointer
)

return supervisor, mcp, checkpointer_context

def patient_information_retrieval_agent(tools, language_id, checkpointer=None):
    """
    Create a patient information retrieval agent.
    """
    logger.info(f"Creating patient information retrieval agent with checkpointer: {checkpointer}")

    prompt_folder = "english_prompts" if language_id == 2 else "arabic_prompts"
    prompt_path = os.path.join(
        os.path.dirname(__file__),
        "..", "prompts", prompt_folder,
        "patient_information_retrieval_prompt.yml"
    )

    logger.info(f"Loading patient information retrieval prompt with language id: {language_id}")

    with open(prompt_path, "r", encoding="utf-8") as f:
        prompts = yaml.safe_load(f)

    return create_agent(
        model=model,
        tools=tools,
        system_prompt=prompts["patient_information_retrieval_prompt"].format(
            LANGUAGE_ID=language_id
        ),
        name="patient_information_retrieval_agent",
        checkpointer=checkpointer
    )

hi @razaullah

could you please reformat your post and use code block? Right now it’s quite hard to read it or copy your code.

Can you look into it now.

Yes. It’s even worse now :smiley: now everything inside code block - only relevant code/text/json etc.
Draw insporation from there please → Stopping endpoint for deep agents - #2 by pawel-twardziak

1 Like

Hi @razaullah , so the issue isn’t the thread_id but that each graph keeps state in its own checkpoint namespace. Give all agents the same checkpointer instance and the same checkpoint_ns, and pass that config on every call (including the supervisor). Then they share state and the appointment agent will reuse the patient identifier learned earlier.

from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()  # swap in your real backend in prod
SHARED_NS = "patient_and_appointment"

def _shared_config(thread_id: str):
    """Use the same thread_id + checkpoint_ns for every agent call."""
    return {"configurable": {"thread_id": thread_id, "checkpoint_ns": SHARED_NS}}

I assume that your create agent functions look more or less as follows:

def patient_information_retrieval_agent(tools, language_id, checkpointer=None):
    return create_agent(
        model=model,
        tools=tools,
        system_prompt="...",  # your prompt here
        name="patient_information_retrieval_agent",
        checkpointer=checkpointer,
    )

# (same for appointment_management_agent and book_appointment_agent)

Now you need to build sub-agents with the same checpointer:

patient_agent = patient_information_retrieval_agent(patient_info_tools, language_id, checkpointer)
appt_agent = appointment_management_agent(appointment_tools, language_id, checkpointer)
book_agent = book_appointment_agent(book_appointment_tools, language_id, checkpointer)

Wrap the tools so every call uses the shared config:

@tool(return_direct=True)
async def fetch_patient_information_tool(request: str, runtime: Any) -> str:
    """Your docstring ..."""
    thread_id = runtime.config["configurable"]["thread_id"]
    result = await patient_agent.ainvoke(
        {"messages": [{"role": "user", "content": request}]},
        config=_shared_config(thread_id),
    )
    return extract_last_ai_message(result)


@tool(return_direct=True)
async def manage_appointment_tool(request: str, runtime: Any) -> str:
    """Your docstring ..."""
    thread_id = runtime.config["configurable"]["thread_id"]
    result = await appt_agent.ainvoke(
        {"messages": [{"role": "user", "content": request}]},
        config=_shared_config(thread_id),
    )
    return extract_last_ai_message(result)


@tool(return_direct=True)
async def book_appointment_tool(request: str, runtime: Any) -> str:
    """Your docstring ..."""
    thread_id = runtime.config["configurable"]["thread_id"]
    result = await book_agent.ainvoke(
        {"messages": [{"role": "user", "content": request}]},
        config=_shared_config(thread_id),
    )
    return extract_last_ai_message(result)

Supervisor also uses the same checkpointer:

def build_supervisor():
    supervisor = create_agent(
        model=model,
        system_prompt=supervisor_prompt,
        tools=[
            fetch_patient_information_tool,
            manage_appointment_tool,
            book_appointment_tool,
        ],
        checkpointer=checkpointer,
        name="supervisor",
    )
    return supervisor

Example run (note the shared config):

supervisor = build_supervisor()
thread_id = "abc123"
supervisor.run(
    {"messages": [{"role": "user", "content": "I want to see my patient info"}]},
    config=_shared_config(thread_id),
)

Now all agents read/write the same checkpoint bucket (thread_id + checkpoint_ns), so the appointment agent can reuse the patient identifier instead of asking again.


Btw, as @pawel-twardziak suggested, please follow his recommendation on how to properly write posts in the future :slight_smile:

1 Like

Thank you very much for the suggestions for the code formatting, I will keep it in mind next time. I tried checkpoint_ns but it is not storing the values in the database. when I run the agent, checkpoint_ns values are not storing in the database.

1 Like

@razaullah The namespace flag only works when a real checkpointer is wired in on both the agents and the supervisor. In the snippet that I provided I used MemorySaver as an exmaple, so nothing ever hits the DB and checkpoint_ns has no effect. Swap in your database-backed saver and pass the shared configurable block on every call.

Here is an example with SqliteSaver:

from langgraph.checkpoint.sqlite import SqliteSaver

# one shared checkpointer for every agent + supervisor
checkpointer = SqliteSaver.from_conn_string("sqlite:///./checkpoints.db")
checkpointer.create_schema()  # run once at startup

SHARED_NS = "patient_and_appointment"

def _shared_config(thread_id: str):
    return {"configurable": {"thread_id": thread_id, "checkpoint_ns": SHARED_NS}}

patient_agent = patient_information_retrieval_agent(
    patient_info_tools, language_id, checkpointer
)
appt_agent = appointment_management_agent(appointment_tools, language_id, checkpointer)
book_agent = book_appointment_agent(book_appointment_tools, language_id, checkpointer)

def build_supervisor():
    return create_agent(
        model=model,
        system_prompt=supervisor_prompt,
        tools=[
            fetch_patient_information_tool,
            manage_appointment_tool,
            book_appointment_tool,
        ],
        checkpointer=checkpointer,  # same instance
        name="supervisor",
    )

# when running
supervisor = build_supervisor()
supervisor.invoke(
    {"messages": [{"role": "user", "content": "…"}]},
    config=_shared_config("abc123"),
)

For proper database checkpointer you can look up here: Persistence - Docs by LangChain

If you’re using Postgres/Redis/etc., instantiate the matching saver there instead of SqliteSaver, but keep the same pattern: one shared saver, same checkpoint_ns, and always pass _shared_config (including inside tool calls). That will make the checkpoints land in your database under the chosen namespace.

1 Like

Thank you @razaullah let me know, when it’s formatted correctly :slight_smile:

One of my sub-agent is defined as follows;

async def patient_information_retrieval_agent(tools, language_id, checkpointer=None):

    """
    Create a patient information retrieval agent.
    """
    prompt_folder = 'english_prompts' if language_id == 2 else 'arabic_prompts'
    prompt_path = os.path.join(os.path.dirname(__file__), '..', 'prompts', prompt_folder, 'patient_information_retrieval_prompt.yml')

    with open(prompt_path, 'r', encoding='utf-8') as f:
        prompts = yaml.safe_load(f)
    return create_agent(
        model=model,
        tools=tools,        system_prompt=prompts['patient_information_retrieval_prompt'].format(LANGUAGE_ID=language_id),
        name="patient_information_retrieval_agent", 
        checkpointer=checkpointer
    )

The tool I defined which uses the above sub-agent is as follows

@tool(return_direct=True)
async def fetch_patient_information_tool(request: str, runtime: ToolRuntime) -> str:

"""Use this tool to retrieve patient information such as name, mobile number, date of birth, or address. 

This tool handles ONLY patient personal information queries. Call this when the user asks about their personal details or provides a patient ID to retrieve information.
            Args:
                request: The user's request for patient information
            """
            root_thread_id = runtime.config["configurable"]["thread_id"]
            patient_info_config = {
                "configurable": {"thread_id": root_thread_id}
            }
            result = await patient_agent.ainvoke(
                {"messages": [{"role": "user", "content": request}]},
                config=patient_info_config
            )
            response = extract_last_ai_message(result)
            return response

My Supervisor is defined as follows:

supervisor = create_agent(
            model=model,
            system_prompt=supervisor_prompt,
            tools=[fetch_patient_information_tool, manage_appointment_tool, book_appointment_tool],
            checkpointer=checkpointer,
            name="supervisor_agent",
        )

I am passing the same checkpointer to all the agents, but the when the sub_agents executes the tools, those tools data are not persisted in the database. As I have added @tool(return_direct=True) to my tools, I don’t know if it is due to this. When I return the sub_agents response to the supervisor it is modifying the response but I need the response to be the same as of the sub_agent return response.
@simon.budziak

@razaullah The missing piece is the namespace on every call. Each graph/agent gets its own default checkpoint_ns (graph id), so your supervisor and sub-agents are writing to different namespaces even though they share the same checkpointer instance. return_direct=True doesn’t block persistence.

The solution is to use one checkpointer and one namespace everywhere:

from langgraph.checkpoint.sqlite import SqliteSaver  # or your saver

SHARED_NS = "patient_and_appointment"
checkpointer = SqliteSaver.from_conn_string("sqlite:///./checkpoints.db")
checkpointer.create_schema()  # run once at startup

def shared_config(thread_id: str):
    return {"configurable": {"thread_id": thread_id, "checkpoint_ns": SHARED_NS}}

# agent factory (can be sync btw)
def patient_information_retrieval_agent(tools, language_id, checkpointer=None):
    ...
    return create_agent(
        model=model,
        tools=tools,
        system_prompt=prompts["patient_information_retrieval_prompt"].format(
            LANGUAGE_ID=language_id
        ),
        name="patient_information_retrieval_agent",
        checkpointer=checkpointer,
    )

# make sure to pass it when creating an agent
patient_agent = patient_information_retrieval_agent(..., checkpointer=checkpointer)

@tool(return_direct=True)
async def fetch_patient_information_tool(request: str, runtime: ToolRuntime) -> str:
    thread_id = runtime.config["configurable"]["thread_id"]
    cfg = shared_config(thread_id)
    result = await patient_agent.ainvoke(
        {"messages": [{"role": "user", "content": request}]},
        config=cfg,
    ) # here, in your case, you are still using patient_info_config that is defined inside the agent, you should use a config that is shared
    return extract_last_ai_message(result)

supervisor = create_agent(
    model=model,
    system_prompt=supervisor_prompt,
    tools=[fetch_patient_information_tool, manage_appointment_tool, book_appointment_tool],
    checkpointer=checkpointer,
    name="supervisor_agent",
)

# invoke supervisor with the same config
supervisor.invoke(..., config=shared_config("root-thread-id"))

So to sum up:

  • Every nested call (tools → sub-agents) must pass checkpoint_ns + thread_id in SHARED configurable.
  • Keep using the same checkpointer instance for all agents/supervisor; make sure the saver has its schema created as mine in the above example.
  • return_direct=True only controls how the supervisor replies; to avoid supervisor rewrites, add a line to its prompt like “When a tool returns a result, forward it verbatim as the final answer.”

If you have further questions, please let me know.

@simon.budziak thanks for the suggestions, I tried that approach but the checkpoint_ns values are not inserted into the database, that is still empty string.

@razaullah could you please share your code, I will try to run it locally.

Dear @simon.budziak , my tools are defined with a local internet connection that uses firewall and is not accessible from outside that network. Can you try a general setup and look if checkpoint_ns values are inserted correctly in a multiagent setup using create_agent as a tools in the supervisor.

Hi @razaullah huge favor again :slight_smile: please, format the main message. It’s not only for us guys, but also for the others so they can make use of this post, especially when it’s solved. So please, format the main message. Thanks in advance :slight_smile:

1 Like

@pawel-twardziak Sure, I will format that. Thanks

1 Like

Thank you so much @simon.budziak I’ve successfully been able to pass the memory in sub-agent using “shared_config” – really appreciate your help and guidance. Your advice was spot on! I’ll definitely reach out again if I run into another problem. Thanks again! .

@pawel-twardziak I have formatted the code. kindly check, also Thank you so much for faster inputs..

2 Likes

Thanks @razaullah, it’s formatted correctly now :slight_smile:
The issue is solved! Awesome :heart: Now, @razaullah please mark the post as Solved picking the right answer from @simon.budziak - so the others can follow the solution if having the same problem. Thanks and post anytime you face problems with LangChain/LangGraph :slight_smile:

1 Like

@pawel-twardziak @simon.budziak Unfortunately after completely testing, I am getting the same issue, that context is not storing when I am returning the response from the tool’s directly (like sub_agents). Although I have used the following code in my app. When I return the response to the supervisor, then I am able to persist the memory but the agent is modifying the response.

SHARED_NS = "patient_and_appointment"
def shared_config(thread_id: str):
    config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": SHARED_NS}}
    logger.info(f"Using shared_config: {config}")
    return config

@razaullah could you verify those 4 things:

  1. One shared checkpointer object is passed to supervisor AND every sub-agent at creation (don’t re-create per request).
  2. Every invoke, including those inside tools, uses both the thread_id and the same checkpoint_ns.
  3. Supervisor itself is created with that same checkpointer and invoked with the shared config.
  4. When calling a sub-agent from a tool, extend the incoming runtime.config instead of rebuilding it, so you don’t drop fields:
SHARED_NS = "patient_and_appointment"

def shared_config(runtime):
    cfg = runtime.config.get("configurable", {}) if runtime else {}
    return {"configurable": {**cfg, "checkpoint_ns": SHARED_NS}}

@tool(return_direct=True)
async def fetch_patient_information_tool(request: str, runtime):
    result = await patient_agent.ainvoke(
        {"messages": [{"role": "user", "content": request}]},
        config=shared_config(runtime),
    )
    return extract_last_ai_message(result)

What you also can do to make sure the config and checkpointer are passed correctly is to log the config and id(checkpointer) at each invoke to confirm you see the same thread_id, checkpoint_ns, and checkpointer object everywhere

Let me know if you checked all of it and the issue still persist.

I am trying to share my full code as follows:

async def build_supervisor(language_id: int = 2):
    """
    Build supervisor once per session.
    Returns: supervisor, mcp_client, checkpointer_context
    """
    logger.info("Building supervisor with language_id=%d", language_id)

    # Load & format prompt
    supervisor_prompt = load_supervisor_prompt(language_id)

    # MCP
    mcp = MCPClient()
    checkpointer, checkpointer_context = await init_checkpointer()

    try:
        mcp_session = await mcp.connect_via_http()
        tools = await load_mcp_tools(mcp_session)

        patient_info_tools = [
            tool for tool in tools
            if tool.name in ["mssql_get_patient_info", "mssql_get_patient_by_name_dob"]
        ]
        appointment_tools = [
            tool for tool in tools
            if tool.name in ["mssql_get_upcoming_appointment", "mssql_get_patient_by_name_dob", "mssql_get_patient_info",
                             "mssql_get_Projects_from_Location", "mssql_confirm_appointment", "mssql_cancel_appointment",
                             "api_send_ProjectLocationSms", "api_send_ProjectLocationWhatsapp", "api_send_AppointmentWhatsapp", 
                             "api_send_AppointmentSms"]
        ]
        book_appointment_tools = [
            tool for tool in tools
            if tool.name in ["mssql_get_patient_info", "mssql_get_patient_by_name_dob","mssql_get_Projects_from_Location", "mssql_get_clinics_for_project", "mssql_get_TopFive_availableDoctors_with_slots_byDate",
                             "mssql_get_TopFive_nearestClinic_have_doctorSlots", "api_book_Appointment"]
                        ]

        # ✅ IMPORTANT: build agent INSTANCES here (these have invoke/ainvoke)
        patient_agent = await patient_information_retrieval_agent(patient_info_tools, language_id, checkpointer)
        appt_agent = await appointment_management_agent(appointment_tools, language_id, checkpointer)
        book_appointment = await book_appointment_agent(book_appointment_tools, language_id, checkpointer)

        @tool(return_direct=True)
        async def fetch_patient_information_tool(request: str, runtime: ToolRuntime) -> str:
            """Use this tool to retrieve patient information such as name, mobile number, date of birth, or address. 
            This tool handles ONLY patient personal information queries. Call this when the user asks about their 
            personal details or provides a patient ID to retrieve information.
            
            Args:
                request: The user's request for patient information
            """
            logger.info(f"🔧 get_patient_info_tool called with request: {request}")
            # history_text = messages_to_text(runtime.state["messages"])
            # logger.info(f"History text for get_patient_info_tool: {history_text}")
            root_thread_id = runtime.config["configurable"]["thread_id"]
            logger.info(f"Root Thread ID: {root_thread_id}")
            result = await patient_agent.ainvoke(
                {"messages": [{"role": "user", "content": request}]},
                # config=shared_config(root_thread_id)
                config=shared_config(runtime)
            )
            
           response = extract_last_ai_message(result)
           return response

My patient information retrieval agent is defined as follows:

from langchain.agents import create_agent
import os
import yaml
from utils.llm_model import model   
from utils.logger import get_logger

logger = get_logger(__name__)

async def patient_information_retrieval_agent(tools, language_id, checkpointer=None):
    """
    Create a patient information retrieval agent.
    """
    logger.info(f"Creating patient information retrieval agent with checkpointer: {checkpointer}")
    prompt_folder = 'english_prompts' if language_id == 2 else 'arabic_prompts'
    prompt_path = os.path.join(os.path.dirname(__file__), '..', 'prompts', prompt_folder, 'patient_information_retrieval_prompt.yml')
    logger.info(f"Loading patient information retrieval prompt from with language id: {language_id}")
    
    with open(prompt_path, 'r', encoding='utf-8') as f:
        prompts = yaml.safe_load(f)

    return create_agent(
        model=model,
        tools=tools,
        system_prompt=prompts['patient_information_retrieval_prompt'].format(LANGUAGE_ID=language_id),
        name="patient_information_retrieval_agent", 
        checkpointer=checkpointer
    )

I am calling the build_supervisor as follows:

async def chat_loop():
    session_id = "1223"
    language_id = 2
    
    # Build supervisor once and reuse it
    supervisor = None
    mcp = None
    checkpointer_context = None
    
    try:
        supervisor, mcp, checkpointer_context = await build_supervisor(language_id=language_id)
        
        while True:
            try:
                user_prompt = input("User Prompt is: ").strip()
            except (EOFError, KeyboardInterrupt):
                print("\nGoodbye!")
                break

            if not user_prompt:
                continue
            if user_prompt.lower() in ("/exit", "exit", "quit", "q"):
                print("Goodbye!")
                break
            
            # Show conversation history
            # if user_prompt.lower() in ("/history", "history"):
            #     await get_conversation_history(supervisor, session_id)
            #     continue

            try:
                config = {"configurable": {"thread_id": session_id, "checkpoint_ns": SHARED_NS}}
                # config = shared_config(session_id)
                print(f"Config being used: {config}")
                messages = [{"role": "user", "content": user_prompt}]
                result = await supervisor.ainvoke({"messages": messages}, config=config)
                logger.info(f"🤖 Supervisor agent invoked successfully.{result}")
                # reply = extract_last_tool_message(result)
                reply = extract_last_ai_message(result)
                print(f"AI Response: {reply}")
                # return reply
            except Exception as e:
                logger.exception("Error in process_text: %s", e)
                print("AI: (error; check logs)")
                continue
    finally:
        # Cleanup
        if mcp:
            await mcp.close()
            logger.info("MCP connection closed.")
        if checkpointer_context:
            try:
                await checkpointer_context.__aexit__(None, None, None)
                logger.info("PostgreSQL checkpointer context closed.")
            except Exception as e:
                logger.error(f"Error closing checkpointer context: {e}")
 
async def main() -> None:
    await chat_loop()

Currently I am running it from the terminal.

if __name__ == "__main__":
    # Application run command
    # python -m app.agents.updated_process_trascribe_text_agen
    # Fix Windows event loop policy for AsyncPostgresSaver (must be before asyncio.run)
    if os.name == 'nt':  # Windows
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    
    asyncio.run(main())

My shared configs are defined as follows:

SHARED_NS = "patient_and_appointment"

def shared_config(runtime):
    cfg = runtime.config.get("configurable", {}) if runtime else {}
    return {"configurable": {**cfg, "checkpoint_ns": SHARED_NS}}

I think it will now make sense. @simon.budziak