How do I build a custom subagent that is itself a LangGraph graph with its own schema?

does it need to inherit from AgentState?
should its schema share at least one field with the main orchestrator agent's?

can I use the UI in a way where the initial input is not a BaseMessage type?

hi @mohamedsheded

yes, you can build a custom subagent as its own LangGraph graph with its own schema.

But there are two different cases, and the rules differ:

  1. pure LangGraph parent/subgraph composition
  2. Deep Agents CompiledSubAgent integration

1) Does custom subagent state need to inherit from AgentState?

It depends on how you build that subagent:

  • If you build the subagent with langchain.agents.create_agent(...):
    • Your custom schema should extend AgentState (TypedDict-based)
    • This is explicitly documented in LangChain docs and reference
  • If you build the subagent as a raw StateGraph(...) manually:
    • It does not have to inherit AgentState
    • It can be any TypedDict schema you define

So: inheriting AgentState is required for the create_agent path, not for every possible custom StateGraph.

2) Should schema share at least one field with main orchestrator?

Again, depends on composition style:

  • In LangGraph, direct subgraph-as-node wiring:
    • Parent/subgraph should share state keys for automatic channel passing
  • In LangGraph, subgraph called inside a wrapper node:
    • No shared key is required; you map input/output manually
  • In Deep Agents CompiledSubAgent:
    • Your runnable must return state containing a messages key
    • Deep Agents middleware validates this and raises an error if missing

So in Deep Agents, the practical minimum common contract is the messages channel for returning results.

3) Can UI start with something that is not BaseMessage?

Yes, with nuance:

  • For create_agent / Deep Agents execution, state must include messages
  • But those messages can be passed as dicts ({"role": "...", "content": "..."}), not necessarily instantiated BaseMessage objects
  • If your UI has a custom payload shape, you can keep that on the UI side and convert to LangChain message format at the agent boundary

So the UI does not need to construct BaseMessage classes directly, but the backend agent interface still expects a messages state entry.
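As a sketch, that boundary conversion can be a single function (the "text" payload shape here is a hypothetical UI contract, not a real one):

```python
def ui_payload_to_messages(payload: dict) -> list:
    """Convert a custom UI payload into LangChain-style message dicts.

    The "text" field is an assumed payload shape for illustration only.
    """
    text = payload.get("text", "")
    return [{"role": "user", "content": text}]

# At the agent boundary (sketch):
#   agent.invoke({"messages": ui_payload_to_messages(payload)})
```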


If you want a custom Deep Agents subagent with private schema, a safe pattern is:

  • Build a custom graph with your private fields.
  • Include messages in that graph state.
  • Return final result in the last message.
  • Wrap it as CompiledSubAgent and pass to create_deep_agent(...).

That keeps your subagent autonomous while still satisfying Deep Agents orchestration requirements.

hi @pawel-twardziak

thank you very much for that information
I've built a wrapper over CompiledSubAgent

"""
Matching subagent wrapper for integrating CampaignWorkflow with deep agents.

This module provides a CompiledSubAgent that wraps the campaign matching workflow,
bridging the state schema differences between DeepMarketingAgentState and
CampaignWorkflowState.
"""

from __future__ import annotations

import logging
from typing import Annotated, Any, Dict, List, Optional, TypedDict

from langchain_core.messages import AIMessage
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph, END, START

from src.domain.campaign.state import CampaignBriefStructured
from src.services.campaign.workflow import CampaignWorkflow

logger = logging.getLogger(__name__)


class MatchingSubagentState(TypedDict):
    """State schema for the matching subagent wrapper.
    
    This state bridges the parent agent (DeepMarketingAgentState) with the
    CampaignWorkflow. The messages key is required by CompiledSubAgent.
    """
    
    messages: Annotated[List, add_messages]
    camp_brief_structured: Optional[CampaignBriefStructured]
    matching_results: Optional[Dict[str, Any]]


def _run_matching_workflow(state: MatchingSubagentState) -> Dict[str, Any]:
    """
    Execute the campaign matching workflow and return results.
    
    This node:
    1. Reads camp_brief_structured from parent state
    2. Instantiates CampaignWorkflow (connects to Qdrant via env vars)
    3. Runs the matching workflow
    4. Returns results in matching_results and a summary message
    
    Args:
        state: The current subagent state
        
    Returns:
        Dictionary with matching_results and messages updates
    """
    brief = state.get("camp_brief_structured")
    
    if brief is None:
        error_msg = "camp_brief_structured is not set in state. Cannot run matching workflow."
        logger.error(error_msg)
        return {
            "matching_results": {"success": False, "error": error_msg},
            "messages": [AIMessage(content=error_msg)]
        }
    
    # Workflow.run() expects a dict; state may have Pydantic model or dict (e.g. from LangSmith)
    brief_payload = brief if isinstance(brief, dict) else brief.model_dump()
    
    try:
        logger.info("Initializing CampaignWorkflow for matching subagent")
        workflow = CampaignWorkflow()
        
        logger.info("Running campaign matching workflow")
        results = workflow.run(brief_payload)
        
        if not results.get("success", False):
            error_msg = f"Matching workflow failed: {results.get('error', 'Unknown error')}"
            logger.error(error_msg)
            return {
                "matching_results": results,
                "messages": [AIMessage(content=error_msg)]
            }
        
        total_creators = results.get("total_creators_found", 0)
        creator_ids = results.get("creator_ids", [])
        processing_time = results.get("processing_time", 0)
        
        summary = (
            f"Creator matching completed successfully. "
            f"Found {total_creators} matching creators in {processing_time:.2f}s. "
            f"Returned {len(creator_ids)} creator IDs."
        )
        
        logger.info(summary)
        
        return {
            "matching_results": results,
            "messages": [AIMessage(content=summary)]
        }
        
    except Exception as e:
        error_msg = f"Matching workflow execution failed: {str(e)}"
        logger.exception(error_msg)
        return {
            "matching_results": {"success": False, "error": str(e)},
            "messages": [AIMessage(content=error_msg)]
        }


def _build_matching_wrapper_graph():
    """Build the matching subagent wrapper graph.
    
    This is a minimal graph with a single node that adapts between the
    parent agent's state schema and the CampaignWorkflow.
    
    Returns:
        Compiled StateGraph ready for use as a CompiledSubAgent runnable
    """
    graph = StateGraph(MatchingSubagentState)
    
    graph.add_node("run_matching", _run_matching_workflow)
    
    graph.add_edge(START, "run_matching")
    graph.add_edge("run_matching", END)
    
    return graph.compile()


def build_matching_subagent() -> Dict[str, Any]:
    """Build the matching subagent as a CompiledSubAgent.
    
    This subagent wraps the CampaignWorkflow and can be invoked by the
    deep marketing agent via task(subagent_type="matching-sys-subagent").
    
    The subagent expects:
    - camp_brief_structured to be present in the parent state
    
    The subagent returns:
    - matching_results: Dict with creator_ids, creators, total_creators_found, etc.
    - A summary message describing the matching results
    
    Returns:
        CompiledSubAgent dict with name, description, and runnable
    """
    return {
        "name": "matching-sys-subagent",
        "description": (
            "Runs the creator matching workflow using the finalized campaign brief. "
            "Searches the Qdrant vector database for creators matching the campaign "
            "requirements (platforms, niches, audience sizes, locations, etc.). "
            "Returns creator IDs and matching scores. Use this after conflict "
            "resolution is complete and camp_brief_structured is finalized."
        ),
        "runnable": _build_matching_wrapper_graph(),
    }

this is then passed to the main create_deep_agent as a subagent

"""
Subagent definitions for the deep marketing agent.

Each subagent is a dict compatible with ``SubAgentMiddleware``'s ``subagents``
parameter: ``name``, ``description``, ``system_prompt``, ``tools``.

The orchestrator delegates to these via the built-in ``task`` tool.
Domain state (strategy_inputs, detected_conflicts, etc.) is shared through
typed state fields, not filesystem files.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List

from src.services.deep_marketing_agent.state import MarketingStateMiddleware
from src.services.deep_marketing_agent.tools import (
    detect_conflicts_deterministic,
    resolve_all_conflicts,
)
from src.services.deep_marketing_agent.matching_subagent import build_matching_subagent

_PROMPT_DIR = Path(__file__).resolve().parent / "prompts"


def _load_prompt(filename: str) -> str:
    path = _PROMPT_DIR / filename
    if not path.is_file():
        raise FileNotFoundError(f"Prompt not found: {path}")
    return path.read_text(encoding="utf-8").strip()


def build_subagents() -> List[Dict[str, Any]]:
    """Return the subagent definitions used by the orchestrator.

    Returns
    -------
    list[dict]
        ``[conflict_detector, resolver, matching_subagent]``
    """
    detector_prompt = _load_prompt("detector.txt")
    resolver_prompt = _load_prompt("resolver.txt")

    conflict_detector: Dict[str, Any] = {
        "name": "conflict-detector",
        "description": (
            "Runs the deterministic rules engine on current strategy_inputs "
            "from shared state. Call to detect conflicts before resolving."
        ),
        "system_prompt": detector_prompt,
        "tools": [detect_conflicts_deterministic],
        "middleware": [MarketingStateMiddleware()],
    }

    resolver: Dict[str, Any] = {
        "name": "resolver",
        "description": (
            "Applies all currently detected conflicts by calling "
            "resolve_all_conflicts. Reads detected_conflicts and "
            "strategy_inputs from shared state. Use after the detector has run."
        ),
        "system_prompt": resolver_prompt,
        "tools": [resolve_all_conflicts],
        "middleware": [MarketingStateMiddleware()],
    }
    
    matching_subagent = build_matching_subagent()

    return [conflict_detector, resolver, matching_subagent]

then the final orchestrator is

def build_deep_marketing_agent(
    *,
    model: str | BaseChatModel | None = None,
    checkpointer: Optional[Checkpointer] = None,
) -> CompiledStateGraph:
    """Build the deep marketing agent graph.

    Uses ``create_deep_agent`` with ``MarketingStateMiddleware`` so the
    domain state fields are merged alongside the built-in middleware fields.

    Parameters
    ----------
    model
        LLM in ``"provider:model"`` format or a ``BaseChatModel`` instance.
        Defaults to ``"openai:gpt-4o-mini-2024-07-18"``.
    checkpointer
        Optional LangGraph checkpointer for multi-turn persistence.
    """
    system_prompt = _load_prompt("orchestrator.txt")

    return create_deep_agent(
        model=model or "openai:gpt-4o-mini-2024-07-18",
        tools=[
            reset_detected_conflicts,
            get_conflict_status,
            update_camp_brief_field,
            sync_brief_from_strategy,
        ],
        subagents=build_subagents(),
        system_prompt=system_prompt,
        middleware=[MarketingStateMiddleware()],
        checkpointer=checkpointer,
    )

is there any best practice for this?

2- for the UI
what i meant by ui is deep agent ui repo

invocation starts with a message
but my graph requires an initial state of a Pydantic model type

is there a way I can use that Pydantic model?
result = agent.invoke({"camp_brief_structured": sample_brief})

pydantic model is

class CampaignBriefStructured(BaseModel):
    """Structured representation of extracted campaign brief data."""

    model_config = ConfigDict(extra='forbid')

    # Basic campaign information
    description: Optional[str] = None
    title: str
    startDate: str
    endDate: str
    budget: float
    currency: str
    specialRequirements: Optional[str] = None
    objectives: List[str]
    contentTypes: List[str]

    # Target fields
    targetLocations: List[str]
    targetGender: List[str]

    minFollowers: int
    maxFollowers: int
    hashtags: List[str] = Field(default_factory=list)

    totalCreators: int

and finally thanks for the help

Hi @mohamedsheded ,

Short answers to both:


:one: Best practice for your CompiledSubAgent wrapper

Your structure is solid :+1: — clean state bridge + minimal graph.

Best practices:

  • :white_check_mark: Keep subagent graphs thin (adapter only, no heavy logic)

  • :white_check_mark: Keep domain workflow (CampaignWorkflow) independent of LangGraph

  • :white_check_mark: Only pass JSON-serializable state between parent ↔ subagent

  • :white_check_mark: Avoid instantiating heavy resources per call (e.g. DB clients) — reuse if possible

  • :white_check_mark: Treat subagents as capability modules, not mini-orchestrators

Most important:
Your subagent should adapt state — not redefine business logic.

You’re doing that correctly.


:two: Deep Agent UI + Pydantic Initial State

Deep Agent UI assumes conversation-first input:

agent.invoke({"messages": [...]})

But your graph requires:

{"camp_brief_structured": CampaignBriefStructured(...)}

Correct way

Convert your Pydantic model to dict and merge into state:

result = agent.invoke({
    "messages": [{"role": "user", "content": "Start matching"}],
    "camp_brief_structured": sample_brief.model_dump()
})

Deep Agent requires messages key — so you must include it.


Alternative

Add a small preprocessing node that:

  • Reads first message

  • Parses into CampaignBriefStructured

  • Injects into state

This keeps UI message-driven and avoids manual state injection.
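A rough sketch of such a preprocessing node, assuming the first message carries the brief as JSON (an assumption about your UI) and using a trimmed-down stand-in for CampaignBriefStructured:

```python
import json

from pydantic import BaseModel


class CampaignBrief(BaseModel):
    """Trimmed-down stand-in for CampaignBriefStructured (illustration only)."""
    title: str
    budget: float


def seed_brief_from_first_message(state: dict) -> dict:
    """Preprocessing node: parse the first user message into the brief."""
    if state.get("camp_brief_structured"):
        return {}  # already seeded, nothing to do
    first = state["messages"][0]
    content = first["content"] if isinstance(first, dict) else first.content
    brief = CampaignBrief.model_validate(json.loads(content))
    # Keep shared state JSON-safe: store a dict, not the model instance.
    return {"camp_brief_structured": brief.model_dump(mode="json")}
```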


Recommendation

For Deep Agent UI compatibility:

:check_mark: Always include "messages"
:check_mark: Convert Pydantic → dict using .model_dump()
:check_mark: Keep domain state typed but serialized in graph


You’re architecting this properly — just make sure everything in state is serializable and Deep Agent always sees a messages field.

hi @mohamedsheded

imho your approach is good and aligned with Deep Agents architecture. A CompiledSubAgent wrapper is the right pattern for plugging domain workflows into the task tool.

You may want to consider some of these practices:

Keep the wrapper as an adapter, not business logic

Your wrapper should only do:

  • input validation + coercion
  • call domain workflow
  • normalize outputs for parent state + one final message

Keep heavy logic in CampaignWorkflow (which you already do)

Always return a non-empty messages list

Deep Agents middleware requires messages in CompiledSubAgent result and extracts the last message as tool output back to parent. If missing/empty, orchestration breaks

Ensure parent schema includes returned keys

You return matching_results, so the parent/orchestrator state schema should declare it (via the middleware state schema) to keep updates valid and observable

Store JSON-friendly state values

Use plain dicts in shared state at orchestration boundaries:

  • camp_brief_structured: dict
  • matching_results: dict

You can validate/rebuild Pydantic inside node code:

  • CampaignBriefStructured.model_validate(state["camp_brief_structured"])
  • then model_dump(mode="json") before persistence/network hops

This avoids serialization/UI boundary surprises

Make error output machine-usable

Great that you already return:

  • {"success": False, "error": ...} in matching_results
  • a human-readable message

Keep this contract stable so orchestrator/tooling can branch on success

Add timeout/retry guard around external calls

CampaignWorkflow() + vector DB calls can fail transiently. Add bounded retry/timeout near the wrapper boundary so failures are deterministic and fast to debug
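A minimal bounded-retry helper you could place around workflow.run(...) — illustrative only; tune the exception types, attempt count, and backoff to your transport:

```python
import time


def run_with_retry(fn, *, attempts=3, base_delay=0.5):
    """Bounded retry with exponential backoff for transient failures
    (e.g. vector DB calls). A sketch, not a drop-in for CampaignWorkflow."""
    last_exc = None
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:  # narrow to transient error types in real code
            last_exc = exc
            if attempt < attempts - 1:
                time.sleep(base_delay * (2 ** attempt))
    raise last_exc
```

Usage would look like `results = run_with_retry(lambda: workflow.run(brief_payload))` inside the wrapper node.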

Consider compile-time reuse

If wrapper graph is rebuilt often, memoize/instantiate once (if safe) to reduce compile/init overhead

About Deep Agents UI + initial Pydantic state

  • With create_deep_agent / create_agent, messages is required
  • So agent.invoke({"camp_brief_structured": sample_brief}) alone is not the normal entry path

Use either:

server-side direct invoke

result = agent.invoke(
    {
        "messages": [{"role": "user", "content": "Run matching on this brief."}],
        "camp_brief_structured": sample_brief.model_dump(mode="json"),
    }
)

or Deep Agents UI thread bootstrap

Deep Agents UI is chat-first and submits messages on send.
If you need preloaded structured state:

  • Update thread state first (camp_brief_structured as dict)
  • Then send a trigger message ("Run matching now")

The UI repo already uses thread state updates (example: setting files) through client.threads.updateState(...), so this is a valid extension pattern for custom fields.

or custom API adapter

Expose a backend endpoint:

  • accepts your Pydantic payload from UI form
  • writes state + injects a bootstrap message
  • then starts stream

This keeps UI UX structured while respecting message-driven agent runtime


schema nuance (Pydantic vs TypedDict)

  • create_agent custom state schema should be a TypedDict extending AgentState
  • Pydantic/dataclass are not the recommended state-schema path there
  • but using Pydantic objects inside node logic for validation is still a great practice

Keep orchestration state contract TypedDict/JSON-friendly, use Pydantic for validation at boundaries

For cross-layer stability (Deep Agents UI, LangGraph server, checkpoints), you could:
  • change the shared state field type to a dict-like payload (camp_brief_structured_payload)
  • validate into CampaignBriefStructured inside _run_matching_workflow
  • persist only JSON-safe dicts back to state
  • enter via messages + pre-seeded state (or a custom adapter route)

This gives you strict validation and robust orchestration compatibility


Hello @pawel-twardziak @Bitcot_Kaushal
thank you so much for this precious information

I'll check the best practices and see how to take your points into consideration

also, for the UI problem, I'll try the "enter via messages + pre-seeded state (or custom adapter route)" approach

my questions here are:
1- how can I use the pre-seeded state? does it have to be entered manually for each campaign? (note that I have an API which gives me this camp_brief_structured)

2- for the production UI, what is the best approach?
is there a way to use the Deep Agents UI in production as a chatbot interface?
for deploying the deep agent, I previously deployed my LangGraph workflows as standalone APIs
but in this case it must be an interactive chatbot, any help?

finally thank you very much for that help

Hi @mohamedsheded ,

:one: Pre-seeded state (production way)

Don’t enter it manually.

Instead, on your backend:

  • Call your campaign API

  • Update thread state with the JSON payload

  • Then send a trigger message like "Run matching"

Example flow:

Fetch campaign → update_state(...) → send message → stream result

Keep state JSON-safe (dict), validate into Pydantic inside your node.


:two: Deep Agent UI in production?

Use it for:

  • Dev

  • Testing

  • Internal tools

For real production:

:backhand_index_pointing_right: Deploy the deep agent on LangGraph Cloud
:backhand_index_pointing_right: Build your own chat UI
:backhand_index_pointing_right: Call the streaming API from your frontend

Deep Agent is still message-driven — you just control the UI.

hi @mohamedsheded

the main shift is to treat camp_brief_structured as thread state you seed from the backend, and keep chat turns message-driven

Pre-seeded state without manual entry (API-driven)

You do not need to enter state manually per campaign in the UI.

Use this flow:

  1. User picks a campaign in your app
  2. Backend fetches the campaign brief from your existing API
  3. Backend creates (or reuses) a LangGraph thread for that campaign
  4. Backend writes camp_brief_structured into thread state
  5. Backend starts a run with a normal trigger message ("Run matching now")

This fits LangGraph’s state API directly (threads.updateState(values=...)) and keeps Deep Agent runtime happy with message-based execution.

Example:

const client = new Client({
  apiUrl: process.env.LANGGRAPH_API_URL!,
  apiKey: process.env.LANGGRAPH_API_KEY!,
});

const thread = await client.threads.create({
  metadata: { campaign_id: campaignId },
});

await client.threads.updateState(thread.thread_id, {
  values: { camp_brief_structured: campaignBriefFromApi }, // JSON/dict
});

const stream = client.runs.stream(thread.thread_id, assistantId, {
  input: {
    messages: [{ role: "user", content: "Run matching now." }],
  },
});

So camp_brief_structured survives and is available to your wrapper node.
Also, make sure your agent state schema includes that field (a custom schema extending the AgentState contract).

Why this aligns with Deep Agents internals

  • CompiledSubAgent requires subagent state/results to include messages so parent receives tool output
  • create_deep_agent usage is message-based
  • create_agent custom state_schema is expected as TypedDict extending AgentState, so keep orchestration schema in TypedDict and use Pydantic for boundary validation

Can Deep Agents UI be used in production?

Yes, it can be deployed as a chatbot UI, but treat it as a starter/reference UI and harden it:

  • it already connects to a configurable deployment URL + assistant ID
  • It already demonstrates thread state mutation (updateState) for files, so you can add the same pattern for camp_brief_structured
  • It is message-first (submit({ messages: [...] })), which is correct for Deep agents

So as a starter it’s fine, but I would go with something custom-built for a specific use case with dedicated UI/UX

Potential production architecture

  • keep 1 thread per campaign (or per user+campaign) to avoid re-seeding every turn
  • Add a backend adapter endpoint (BFF) that:
    • validates auth/tenant ownership
    • fetches campaign brief from your API
    • seeds thread state
    • starts/streams runs
  • Do not expose privileged API keys directly in browser config
  • keep shared state JSON-safe (model_dump(mode="json") shape), then re-validate with Pydantic inside node/workflow logic