MESSAGE_COERCION_FAILURE Using Redis Checkpointer with LangGraph

Hey folks, I am trying to use a Redis checkpointer to store short-term message history, but I get the below error and I am not able to debug it. It seems like an underlying issue where the message conversion is not happening the correct way.

I am using the latest LangGraph - 0.6.4
and langgraph-checkpoint-redis: 0.1.0

{"message": "Error in streaming request: Message dict must contain 'role' and 'content' keys, got {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'HumanMessage'], 'kwargs': {'content': 'hey', 'type': 'human', 'id': 'cae63b17-6103-420d-afec-9f9a0cf1ccf2'}}\nFor troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/MESSAGE_COERCION_FAILURE"}

Below is the code - Any help would be appreciated

class MarketingSupervisorState(MessagesState):
    """Graph state: inherits `messages` from MessagesState, plus routing context."""

    use_case_selected: Optional[str]  # The preselected use case ID
    space_id: Optional[str]  # Space ID for conversation context


class MarketingOrchestrator:
    """
    Marketing Supervisor Agent using LangGraph's handoff-based multi-agent pattern.

    This implementation uses the existing agent classes from the agents folder,
    which already have proper handoff capabilities using LangGraph's Command primitives.
    Now includes short-term memory persistence using a Redis checkpointer
    (the graph is compiled with an AsyncRedisSaver in stream_request).
    """

    def __init__(self, llm: Optional[ChatOpenAI] = None, batch_interval: float = 4.0, min_content_threshold: int = 10):
        """
        Args:
            llm: Chat model shared by all agents; defaults to gpt-4o-mini.
            batch_interval: Seconds between streamed content batches.
            min_content_threshold: Minimum content length to trigger a batch send.
        """
        self.llm = llm or ChatOpenAI(model="gpt-4o-mini")
        self.logger = Logger.get_logger(__name__)
        self.use_case_router = UseCaseRouter()

        # Batching configuration
        self.batch_interval = batch_interval
        self.min_content_threshold = min_content_threshold

        # Initialize specialized agents
        self.supervisor_agent = MarketingSupervisorAgent(self.llm)
        self.linkedin_agent = LinkedInAgent(self.llm)
        self.content_strategy_agent = ContentStrategyAgent(self.llm)
        self.social_media_agent = SocialMediaAgent(self.llm)

        # Build the multi-agent graph builder (will be compiled with checkpointer)
        self.multi_agent_graph_builder = self._build_multi_agent_graph()
    

        
    
    def configure_batching(self, batch_interval: float = None, min_content_threshold: int = None):
        """
        Update batching parameters for content streaming.

        Each argument is applied only when it is not None, so either setting
        can be changed independently of the other.

        Args:
            batch_interval: Time interval in seconds for batching content
            min_content_threshold: Minimum content length to trigger a batch send
        """
        requested = {
            "batch_interval": batch_interval,
            "min_content_threshold": min_content_threshold,
        }
        for attr_name, new_value in requested.items():
            if new_value is not None:
                setattr(self, attr_name, new_value)
    
    def get_batching_config(self) -> dict:
        """Return the current batching configuration as a plain dict."""
        return dict(
            batch_interval=self.batch_interval,
            min_content_threshold=self.min_content_threshold,
        )
    
    def _build_multi_agent_graph(self):
        """
        Assemble the multi-agent StateGraph (uncompiled).

        The returned builder is compiled later with a checkpointer so that
        short-term memory can be attached per request.
        """

        def check_use_case_and_route(state: MarketingSupervisorState, runtime: Runtime[UserContext]) -> Literal[SUPERVISOR_AGENT, LINKEDIN_AGENT, CONTENT_STRATEGY_AGENT, SOCIAL_MEDIA_AGENT]:
            """
            Entry routing node: send the request straight to the agent mapped
            to the preselected use case, or fall back to the supervisor.

            Args:
                state: Current state containing use_case_selected
                runtime: Runtime context with user information

            Returns:
                Target agent based on use case or supervisor if none selected
            """
            use_case = state.get("use_case_selected")
            if use_case:
                target_agent = AGENT_MAPPING.get(use_case)
                if target_agent:
                    self.logger.info(f"Routing directly to {target_agent} for use case: {use_case}")
                    return target_agent
                self.logger.warning(f"Unknown use case: {use_case}, routing to supervisor")

            # Default to supervisor for general routing
            self.logger.info("No use case preselected, routing to supervisor")
            return DEFAULT_ROUTING_AGENT

        graph_builder = StateGraph(MarketingSupervisorState, context_schema=UserContext)

        # Register one node per agent; each agent manages its own state.
        agent_nodes = {
            SUPERVISOR_AGENT: self.supervisor_agent,
            LINKEDIN_AGENT: self.linkedin_agent,
            CONTENT_STRATEGY_AGENT: self.content_strategy_agent,
            SOCIAL_MEDIA_AGENT: self.social_media_agent,
        }
        for node_name, agent in agent_nodes.items():
            graph_builder.add_node(node_name, agent.get_agent())

        # Entry point: route by preselected use case (identity mapping over nodes).
        graph_builder.add_conditional_edges(
            START,
            check_use_case_and_route,
            {name: name for name in agent_nodes},
        )

        # Supervisor hands off to a specialized agent, or finishes the run.
        graph_builder.add_conditional_edges(
            SUPERVISOR_AGENT,
            self._get_supervisor_routing,
            {
                LINKEDIN_AGENT: LINKEDIN_AGENT,
                CONTENT_STRATEGY_AGENT: CONTENT_STRATEGY_AGENT,
                SOCIAL_MEDIA_AGENT: SOCIAL_MEDIA_AGENT,
                "__end__": END,
            },
        )

        # Specialized agents terminate the graph.
        for node_name in (LINKEDIN_AGENT, CONTENT_STRATEGY_AGENT, SOCIAL_MEDIA_AGENT):
            graph_builder.add_edge(node_name, END)

        return graph_builder
    
    def _get_supervisor_routing(self, state: MarketingSupervisorState) -> Literal[LINKEDIN_AGENT, CONTENT_STRATEGY_AGENT, SOCIAL_MEDIA_AGENT, "__end__"]:
        """
        Pick the next hop after the supervisor agent has run.

        Args:
            state: Current state containing messages and use case info

        Returns:
            Target agent name, or "__end__" if no further routing is needed
        """
        use_case = state.get("use_case_selected")
        if not use_case:
            # Supervisor made no handoff decision; stop here.
            return "__end__"

        target_agent = AGENT_MAPPING.get(use_case)
        if not target_agent:
            self.logger.warning(f"Unknown use case: {use_case}, ending")
            return "__end__"

        self.logger.info(f"Routing to {target_agent} for use case: {use_case}")
        return target_agent
    
    async def _get_or_create_thread_id(self, space_id: Optional[str]) -> str:
        """
        Resolve the conversation thread ID for a space.

        Using the space ID itself as the thread ID keeps conversation history
        continuous across interactions in the same space; a random temporary
        ID is minted when no space is supplied.

        Args:
            space_id: Optional space ID

        Returns:
            A unique thread ID for the conversation
        """
        if space_id:
            # The space ID doubles as the thread ID for simplicity.
            return space_id
        # No space context: mint a short throwaway thread ID.
        return f"temp_thread_{uuid.uuid4().hex[:8]}"
    

    

    async def stream_request(self, request: SupervisorRequest) -> AsyncGenerator[StreamingEvent, None]:
        """Stream process a request using the handoff-based multi-agent system with memory and time-based batching."""
        try:
            start_time = time.time()
            last_progress_update = start_time
            progress_update_interval = 5.0  # Send progress updates every 5 seconds
            
            # Initialize utilities with time-based batching (4 second intervals)
            message_manager = MessageGroupingManager(
                batch_interval=self.batch_interval, 
                min_content_threshold=self.min_content_threshold
            )
            
            # Initialize message ID manager for conversation flow
            message_id_manager = MessageIDManager()
            initial_message_id = message_id_manager.start_conversation()
            
            # Send start event with initial message ID
            yield StartEvent(
                message="Thinking..",
                task_name="marketing_supervisor",
                message_id=initial_message_id,
                metadata=StreamingEventFormatter.clean_metadata(
                    use_case_id=request.use_case_id,
                    space_id=request.space_id
                )
            )
            
            initial_state = MarketingSupervisorState(
                messages=[
                    {
                        "role": "user",
                        "content": request.query
                    }
                ],
                use_case_selected=request.use_case_id,
                space_id=request.space_id
            )
            
            # Get or create thread ID for memory persistence
            thread_id = await self._get_or_create_thread_id(request.space_id)
            
            # Create runtime context with user information
            user_data = await get_user_data(request.user_id)
            runtime_context = user_data
            
            # Use Redis checkpointer with context manager pattern
            redis_url = sttgs.get("REDIS_URL", "redis://localhost:6379")
            async with AsyncRedisSaver.from_conn_string(redis_url) as checkpointer:

                await checkpointer.asetup()
                # Compile graph once with this checkpointer (like the working example)
                graph = self.multi_agent_graph_builder.compile(checkpointer=checkpointer)
                
                                # Stream the request and yield responses
                async for item in graph.astream(
                    initial_state,
                    config={"configurable": {"thread_id": thread_id,"user_id":request.user_id}},
                    context=runtime_context,
                    subgraphs=True,
                    stream_mode=["messages","custom"]
                ):
                    event_id = str(uuid.uuid4())[:8]  # Generate one UUID per event for internal tracking
                    
                    # Handle different types of LangGraph events
                    if isinstance(item, tuple) and len(item) == 3:
                        node_name, event_type, event_data = item
                        
                        # Extract agent name from node
                        agent_name = str(node_name[0]) if node_name else 'unknown'
                        
                        if event_type == 'custom':
                           // not related 

In the absence of the complete code it cannot be said for sure, but I believe this error is because at some point you are trying to use create_react_agent and update the messages key in the state with an incorrect type.
See below minimal example:

from langgraph.graph import START, StateGraph
from typing_extensions import TypedDict
from typing import Annotated, Sequence
from langchain_core.messages import (
    BaseMessage,
    HumanMessage
)

# Placeholder: replace with your chat model instance (e.g. ChatOpenAI(...)).
llm = <your_model>

# NOTE(review): create_react_agent and the `add`/`multiply` tools are assumed
# to be imported/defined elsewhere in this snippet's environment.
agent = create_react_agent(
    model=llm,
    tools=[add, multiply],
    name="test_agent",
    prompt="You are a test agent. Always use one tool at a time."
)

class State(TypedDict):
    # `add_messages` is the reducer that coerces and merges message updates;
    # it is what raises MESSAGE_COERCION_FAILURE on malformed input (see traceback).
    messages: Annotated[Sequence[BaseMessage], add_messages]

def graph_node_1(state: State):
    # Check here messages is expected to be list
    global agent
    response = agent.invoke(state)
    print(f"response: {response}") # This is a dict
    # BUG (intentional, for illustration): `response` is the agent's full state
    # dict ({"messages": [...]}), not a list of messages, so the add_messages
    # reducer fails to coerce it — the corrected version appears further below.
    return Command(update={"messages": response}, goto="graph_node_2")

def graph_node_2(state: State):
    """Pass-through node used only to observe the state after node 1."""
    print("state in node 2")
    return state


# Wire the two nodes sequentially and invoke once to reproduce the error.
builder = StateGraph(State)
builder.add_node(graph_node_1)
builder.add_node(graph_node_2)
builder.add_edge(START, "graph_node_1")
builder.add_edge("graph_node_1", "graph_node_2")
graph = builder.compile(name="test_graph")

graph.invoke({"messages": [HumanMessage(content="Hello")]})

Error:

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langchain_core/messages/utils.py:330, in _convert_to_message(message)
    329 try:
--> 330     msg_type = msg_kwargs.pop("role")
    331 except KeyError:

KeyError: 'role'

During handling of the above exception, another exception occurred:

KeyError                                  Traceback (most recent call last)
File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langchain_core/messages/utils.py:332, in _convert_to_message(message)
    331 except KeyError:
--> 332     msg_type = msg_kwargs.pop("type")
    333 # None msg content is not allowed

KeyError: 'type'

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
Cell In[501], line 51
     48 builder.add_edge("graph_node_1", "graph_node_2")
     49 graph = builder.compile(name="test_graph")
---> 51 graph.invoke({"messages": [HumanMessage(content="Hello")]})

File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langgraph/pregel/main.py:3026, in Pregel.invoke(self, input, config, context, stream_mode, print_mode, output_keys, interrupt_before, interrupt_after, durability, **kwargs)
   3023 chunks: list[dict[str, Any] | Any] = []
   3024 interrupts: list[Interrupt] = []
-> 3026 for chunk in self.stream(
   3027     input,
   3028     config,
   3029     context=context,
   3030     stream_mode=["updates", "values"]
   3031     if stream_mode == "values"
   3032     else stream_mode,
   3033     print_mode=print_mode,
   3034     output_keys=output_keys,
   3035     interrupt_before=interrupt_before,
   3036     interrupt_after=interrupt_after,
   3037     durability=durability,
   3038     **kwargs,
   3039 ):
   3040     if stream_mode == "values":
   3041         if len(chunk) == 2:

File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langgraph/pregel/main.py:2657, in Pregel.stream(self, input, config, context, stream_mode, print_mode, output_keys, interrupt_before, interrupt_after, durability, subgraphs, debug, **kwargs)
   2647 for _ in runner.tick(
   2648     [t for t in loop.tasks.values() if not t.writes],
   2649     timeout=self.step_timeout,
   (...)   2652 ):
   2653     # emit output
   2654     yield from _output(
   2655         stream_mode, print_mode, subgraphs, stream.get, queue.Empty
   2656     )
-> 2657 loop.after_tick()
   2658 # wait for checkpoint
   2659 if durability_ == "sync":

File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langgraph/pregel/_loop.py:525, in PregelLoop.after_tick(self)
    523 writes = [w for t in self.tasks.values() for w in t.writes]
    524 # all tasks have finished
--> 525 self.updated_channels = apply_writes(
    526     self.checkpoint,
    527     self.channels,
    528     self.tasks.values(),
    529     self.checkpointer_get_next_version,
    530     self.trigger_to_nodes,
    531 )
    532 # produce values output
    533 if not self.updated_channels.isdisjoint(
    534     (self.output_keys,)
    535     if isinstance(self.output_keys, str)
    536     else self.output_keys
    537 ):

File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langgraph/pregel/_algo.py:298, in apply_writes(checkpoint, channels, tasks, get_next_version, trigger_to_nodes)
    296 for chan, vals in pending_writes_by_channel.items():
    297     if chan in channels:
--> 298         if channels[chan].update(vals) and next_version is not None:
    299             checkpoint["channel_versions"][chan] = next_version
    300             # unavailable channels can't trigger tasks, so don't add them

File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langgraph/channels/binop.py:93, in BinaryOperatorAggregate.update(self, values)
     91     values = values[1:]
     92 for value in values:
---> 93     self.value = self.operator(self.value, value)
     94 return True

File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langgraph/graph/message.py:47, in _add_messages_wrapper.<locals>._add_messages(left, right, **kwargs)
     43 def _add_messages(
     44     left: Messages | None = None, right: Messages | None = None, **kwargs: Any
     45 ) -> Messages | Callable[[Messages, Messages], Messages]:
     46     if left is not None and right is not None:
---> 47         return func(left, right, **kwargs)
     48     elif left is not None or right is not None:
     49         msg = (
     50             f"Must specify non-null arguments for both 'left' and 'right'. Only "
     51             f"received: '{'left' if left else 'right'}'."
     52         )

File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langgraph/graph/message.py:190, in add_messages(left, right, format)
    183 # coerce to message
    184 left = [
    185     message_chunk_to_message(cast(BaseMessageChunk, m))
    186     for m in convert_to_messages(left)
    187 ]
    188 right = [
    189     message_chunk_to_message(cast(BaseMessageChunk, m))
--> 190     for m in convert_to_messages(right)
    191 ]
    192 # assign missing ids
    193 for m in left:

File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langchain_core/messages/utils.py:368, in convert_to_messages(messages)
    366 if isinstance(messages, PromptValue):
    367     return messages.to_messages()
--> 368 return [_convert_to_message(m) for m in messages]

File /localdisk/workspace/langgraph-learning/.venv/lib64/python3.12/site-packages/langchain_core/messages/utils.py:340, in _convert_to_message(message)
    336         msg = f"Message dict must contain 'role' and 'content' keys, got {message}"
    337         msg = create_message(
    338             message=msg, error_code=ErrorCode.MESSAGE_COERCION_FAILURE
    339         )
--> 340         raise ValueError(msg) from e
    341     message_ = _create_message_from_message_type(
    342         msg_type, msg_content, **msg_kwargs
    343     )
    344 else:

ValueError: Message dict must contain 'role' and 'content' keys, got {'messages': [HumanMessage(content='Hello', additional_kwargs={}, response_metadata={}, id='ed5ebd5e-8591-40a9-a059-8fb40ead95fc'), AIMessage(content='Hello! How can I assist you today?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 28, 'prompt_tokens': 83, 'total_tokens': 111, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'o4-mini-2025-04-16', 'system_fingerprint': None, 'id': 'chatcmpl-CKMtP0FbAKJ1S1lBUhE80QzIb9iGQ', 'service_tier': None, 'prompt_filter_results': [{'prompt_index': 0, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}], 'finish_reason': 'stop', 'logprobs': None, 'content_filter_results': {'hate': {'filtered': False, 'severity': 'safe'}, 'self_harm': {'filtered': False, 'severity': 'safe'}, 'sexual': {'filtered': False, 'severity': 'safe'}, 'violence': {'filtered': False, 'severity': 'safe'}}}, name='test_agent', id='run--a9a33653-bc9c-4615-9480-25e2d9671301-0', usage_metadata={'input_tokens': 83, 'output_tokens': 28, 'total_tokens': 111, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/MESSAGE_COERCION_FAILURE

Shared example will work fine, if graph_node_1 is updated with

def graph_node_1(state: State):
    # Check here messages is expected to be list
    global agent
    response = agent.invoke(state)
    print(f"response: {response}") # This is a dict of message, get the messages key to update the state
    # Fix: pass the list under "messages" to the reducer, not the whole dict.
    return Command(update={"messages": response["messages"]}, goto="graph_node_2")
1 Like

Hi @AdityaVernekar

Your Redis checkpointer is restoring messages in the LangChain JSON (“lc”) shape instead of chat message shape. Use LangGraph’s JSON+ serializer for the checkpointer and ensure your state carries messages as either BaseMessage objects or {role, content} dicts. Clear any old thread state and re-run.

Symptom

You see:

Message dict must contain 'role' and 'content' keys, got {
  'lc': 1,
  'type': 'constructor',
  'id': ['langchain','schema','messages','HumanMessage'],
  'kwargs': {'content': 'hey','type': 'human','id': '...'}
}

That object is the LangChain JSON serialization of a HumanMessage (“lc” shape). Chat models in v0 expect either BaseMessage instances or dicts like {role: 'user', content: '...'}; they cannot consume the “lc” JSON payload.

Root cause

  • Messages persisted by the checkpointer are being reloaded as the LangChain JSON form instead of actual BaseMessages or {role, content} dicts.

  • This typically happens when the checkpointer is not using LangGraph’s JSON+ serializer, or when code somewhere stores message.to_dict() rather than the message itself.

Fix (LangGraph-v0-compatible)

  1. Initialize the Redis checkpointer with JSON+ serde and call asetup():
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
serde = JsonPlusSerializer()

async with AsyncRedisSaver.from_conn_string(redis_url, serde=serde) as checkpointer:
    await checkpointer.asetup()
    graph = builder.compile(checkpointer=checkpointer)
    # ... run astream / invoke with thread_id as shown below
  2. Ensure your state uses MessagesState and that you add messages as either dicts with role/content or BaseMessage instances (not their .to_dict()):
from typing import Optional
from typing_extensions import TypedDict
from langgraph.graph import MessagesState

class MarketingSupervisorState(MessagesState):
    use_case_selected: Optional[str]
    space_id: Optional[str]

# Good initial state
initial_state = MarketingSupervisorState(
    messages=[{"role": "user", "content": request.query}],
    use_case_selected=request.use_case_id,
    space_id=request.space_id,
)

# Alternatively (also good):
# from langchain_core.messages import HumanMessage
# initial_state = MarketingSupervisorState(
#     messages=[HumanMessage(content=request.query)],
#     use_case_selected=..., space_id=...
# )
  3. Pass a stable thread_id and clear legacy state before retesting:
  • If you previously wrote messages without JSON+ serde, wipe that thread or use a new thread_id to avoid mixing formats.
  • Always pass thread_id via config:
async for event in graph.astream(
    initial_state,
    config={"configurable": {"thread_id": thread_id, "user_id": request.user_id}},
    context=runtime_context,
    subgraphs=True,
    stream_mode=["messages", "custom"],
):
    ...
  4. Avoid producing “lc” JSON yourself
  • Do not call message.to_dict() when storing to state or emitting events.

  • Either pass BaseMessage objects or the simple {role, content} dicts.

Minimal working setup (excerpt)

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer

builder = StateGraph(MarketingSupervisorState, context_schema=UserContext)
# builder.add_node(...); builder.add_edge(...); builder.add_conditional_edges(...)

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
serde = JsonPlusSerializer()

async with AsyncRedisSaver.from_conn_string(redis_url, serde=serde) as checkpointer:
    await checkpointer.asetup()
    graph = builder.compile(checkpointer=checkpointer)

    initial_state = MarketingSupervisorState(
        messages=[{"role": "user", "content": "hey"}],
        use_case_selected=None,
        space_id=None,
    )

    async for _ in graph.astream(
        initial_state,
        config={"configurable": {"thread_id": "my-thread-1"}},
        subgraphs=True,
        stream_mode=["messages"],
    ):
        pass

Common pitfalls checklist

  • No JSON+ serde on the checkpointer: add serde=JsonPlusSerializer() and call asetup().

  • Storing .to_dict() outputs: store BaseMessage objects or {role, content} dicts instead.

  • Legacy persisted state: clear the thread or use a new thread_id after fixing serde.

  • Missing thread_id: always pass configurable.thread_id so short-term memory attaches to the right thread.

Once these are in place, the Redis checkpointer and MessagesState will round‑trip message history in the expected v0 chat format, eliminating the MESSAGE_COERCION_FAILURE.