Hey folks, I am trying to use a Redis checkpointer to store short-term message history, but I get the below error and I am not able to debug it. It seems like an underlying issue where the message conversion is not happening the correct way.
I am using the latest langgraph - 0.6.4
and langgraph-checkpoint-redis: 0.1.0
{"message": "Error in streaming request: Message dict must contain 'role' and 'content' keys, got {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'HumanMessage'], 'kwargs': {'content': 'hey', 'type': 'human', 'id': 'cae63b17-6103-420d-afec-9f9a0cf1ccf2'}}\nFor troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/MESSAGE_COERCION_FAILURE"}
Below is the code - Any help would be appreciated
class MarketingSupervisorState(MessagesState):
    """Graph state for the marketing supervisor workflow.

    Extends MessagesState (which carries the conversation message list)
    with routing metadata used by the orchestrator's conditional edges.
    """

    use_case_selected: Optional[str]  # The preselected use case ID
    space_id: Optional[str]  # Space ID for conversation context
class MarketingOrchestrator:
"""
Marketing Supervisor Agent using LangGraph's handoff-based multi-agent pattern.
This implementation uses the existing agent classes from the agents folder,
which already have proper handoff capabilities using LangGraph's Command primitives.
    Now includes short-term memory persistence using a Redis checkpointer.
"""
def __init__(self, llm: Optional[ChatOpenAI] = None, batch_interval: float = 4.0, min_content_threshold: int = 10):
    """Wire up the LLM, specialist agents, and the multi-agent graph builder.

    Args:
        llm: Chat model shared by every agent; a gpt-4o-mini client is
            created when none is supplied.
        batch_interval: Seconds between batched content flushes.
        min_content_threshold: Minimum content length that triggers a send.
    """
    self.llm = llm or ChatOpenAI(model="gpt-4o-mini")
    self.logger = Logger.get_logger(__name__)
    # Streaming batch tuning knobs (adjustable later via configure_batching).
    self.batch_interval = batch_interval
    self.min_content_threshold = min_content_threshold
    self.use_case_router = UseCaseRouter()
    # One instance per specialist; all share the same LLM client.
    self.supervisor_agent = MarketingSupervisorAgent(self.llm)
    self.linkedin_agent = LinkedInAgent(self.llm)
    self.content_strategy_agent = ContentStrategyAgent(self.llm)
    self.social_media_agent = SocialMediaAgent(self.llm)
    # Kept uncompiled so each request can compile with its own checkpointer.
    self.multi_agent_graph_builder = self._build_multi_agent_graph()
def configure_batching(self, batch_interval: Optional[float] = None, min_content_threshold: Optional[int] = None):
    """
    Configure batching parameters for content streaming.

    Args:
        batch_interval: Time interval in seconds for batching content.
            None leaves the current value unchanged.
        min_content_threshold: Minimum content length to trigger a batch
            send. None leaves the current value unchanged.
    """
    # Fix: the previous annotations (`float = None`) were implicit-Optional,
    # which PEP 484 deprecates; callers' behavior is unchanged.
    if batch_interval is not None:
        self.batch_interval = batch_interval
    if min_content_threshold is not None:
        self.min_content_threshold = min_content_threshold
def get_batching_config(self) -> dict:
    """Return the active batching parameters as a plain dict."""
    config = dict(
        batch_interval=self.batch_interval,
        min_content_threshold=self.min_content_threshold,
    )
    return config
def _build_multi_agent_graph(self):
    """Build the multi-agent graph with routing logic and checkpointer.

    Returns:
        The *uncompiled* StateGraph builder. The caller compiles it with a
        checkpointer per request (see stream_request), so the same wiring
        can be reused with different persistence backends.
    """
    def check_use_case_and_route(state: MarketingSupervisorState, runtime: Runtime[UserContext]) -> Literal[SUPERVISOR_AGENT, LINKEDIN_AGENT, CONTENT_STRATEGY_AGENT, SOCIAL_MEDIA_AGENT]:
        """
        Dedicated routing node that checks use_case_selected and routes accordingly.
        Now also uses runtime context for user personalization.
        Args:
            state: Current state containing use_case_selected
            runtime: Runtime context with user information
        Returns:
            Target agent based on use case or supervisor if none selected
        """
        # Access runtime context for user personalization
        if "use_case_selected" in state and state["use_case_selected"]:
            use_case = state["use_case_selected"]
            target_agent = AGENT_MAPPING.get(use_case)
            if target_agent:
                self.logger.info(f"Routing directly to {target_agent} for use case: {use_case}")
                return target_agent
            else:
                # Unknown id: log it, then fall through to the supervisor default.
                self.logger.warning(f"Unknown use case: {use_case}, routing to supervisor")
        # Default to supervisor for general routing
        # NOTE(review): this info line also fires for *unknown* use cases,
        # not only when none was preselected — slightly misleading in logs.
        self.logger.info("No use case preselected, routing to supervisor")
        return DEFAULT_ROUTING_AGENT
    # Build the multi-agent graph with dedicated routing
    builder = StateGraph(MarketingSupervisorState,context_schema=UserContext)
    # Add nodes - each agent handles its own state management
    builder.add_node(SUPERVISOR_AGENT, self.supervisor_agent.get_agent())
    builder.add_node(LINKEDIN_AGENT, self.linkedin_agent.get_agent())
    builder.add_node(CONTENT_STRATEGY_AGENT, self.content_strategy_agent.get_agent())
    builder.add_node(SOCIAL_MEDIA_AGENT, self.social_media_agent.get_agent())
    # Add conditional routing from START based on use case
    builder.add_conditional_edges(
        START,
        check_use_case_and_route,
        {
            SUPERVISOR_AGENT: SUPERVISOR_AGENT,
            LINKEDIN_AGENT: LINKEDIN_AGENT,
            CONTENT_STRATEGY_AGENT: CONTENT_STRATEGY_AGENT,
            SOCIAL_MEDIA_AGENT: SOCIAL_MEDIA_AGENT
        }
    )
    # Add routing from supervisor to specialized agents (for handoffs)
    builder.add_conditional_edges(
        SUPERVISOR_AGENT,
        lambda state: self._get_supervisor_routing(state),
        {
            LINKEDIN_AGENT: LINKEDIN_AGENT,
            CONTENT_STRATEGY_AGENT: CONTENT_STRATEGY_AGENT,
            SOCIAL_MEDIA_AGENT: SOCIAL_MEDIA_AGENT,
            "__end__": END
        }
    )
    # Add routing from specialized agents to END
    # Specialists terminate the turn here; handoffs between specialists are
    # presumably handled by the agents' own Command primitives — confirm.
    builder.add_edge(LINKEDIN_AGENT, END)
    builder.add_edge(CONTENT_STRATEGY_AGENT, END)
    builder.add_edge(SOCIAL_MEDIA_AGENT, END)
    return builder
def _get_supervisor_routing(self, state: MarketingSupervisorState) -> Literal[LINKEDIN_AGENT, CONTENT_STRATEGY_AGENT, SOCIAL_MEDIA_AGENT, "__end__"]:
    """
    Determine routing from supervisor agent based on state.

    Args:
        state: Current state containing messages and use case info
    Returns:
        Target agent or END if no further routing needed
    """
    selected = state.get("use_case_selected")
    if not selected:
        # Supervisor produced no handoff decision; finish the run.
        return "__end__"
    destination = AGENT_MAPPING.get(selected)
    if not destination:
        self.logger.warning(f"Unknown use case: {selected}, ending")
        return "__end__"
    self.logger.info(f"Routing to {destination} for use case: {selected}")
    return destination
async def _get_or_create_thread_id(self, space_id: Optional[str]) -> str:
"""
Get or create a thread ID for the given space.
This ensures conversation continuity across multiple interactions.
Args:
space_id: Optional space ID
Returns:
A unique thread ID for the conversation
"""
if not space_id:
# Generate a unique thread ID if no space_id is provided
return f"temp_thread_{uuid.uuid4().hex[:8]}"
# Use space_id directly as thread_id for simplicity
thread_id = space_id
return thread_id
async def stream_request(self, request: SupervisorRequest) -> AsyncGenerator[StreamingEvent, None]:
"""Stream process a request using the handoff-based multi-agent system with memory and time-based batching."""
try:
start_time = time.time()
last_progress_update = start_time
progress_update_interval = 5.0 # Send progress updates every 5 seconds
# Initialize utilities with time-based batching (4 second intervals)
message_manager = MessageGroupingManager(
batch_interval=self.batch_interval,
min_content_threshold=self.min_content_threshold
)
# Initialize message ID manager for conversation flow
message_id_manager = MessageIDManager()
initial_message_id = message_id_manager.start_conversation()
# Send start event with initial message ID
yield StartEvent(
message="Thinking..",
task_name="marketing_supervisor",
message_id=initial_message_id,
metadata=StreamingEventFormatter.clean_metadata(
use_case_id=request.use_case_id,
space_id=request.space_id
)
)
initial_state = MarketingSupervisorState(
messages=[
{
"role": "user",
"content": request.query
}
],
use_case_selected=request.use_case_id,
space_id=request.space_id
)
# Get or create thread ID for memory persistence
thread_id = await self._get_or_create_thread_id(request.space_id)
# Create runtime context with user information
user_data = await get_user_data(request.user_id)
runtime_context = user_data
# Use Redis checkpointer with context manager pattern
redis_url = sttgs.get("REDIS_URL", "redis://localhost:6379")
async with AsyncRedisSaver.from_conn_string(redis_url) as checkpointer:
await checkpointer.asetup()
# Compile graph once with this checkpointer (like the working example)
graph = self.multi_agent_graph_builder.compile(checkpointer=checkpointer)
# Stream the request and yield responses
async for item in graph.astream(
initial_state,
config={"configurable": {"thread_id": thread_id,"user_id":request.user_id}},
context=runtime_context,
subgraphs=True,
stream_mode=["messages","custom"]
):
event_id = str(uuid.uuid4())[:8] # Generate one UUID per event for internal tracking
# Handle different types of LangGraph events
if isinstance(item, tuple) and len(item) == 3:
node_name, event_type, event_data = item
# Extract agent name from node
agent_name = str(node_name[0]) if node_name else 'unknown'
if event_type == 'custom':
// not related