Hi @Bitcot_Kaushal
For reference, here is the simplified code structure I’m using:
- Guardrail Subgraph
```python
from typing import TypedDict

from langchain_openai import ChatOpenAI
from langfuse import observe  # Langfuse v3; on v2 it's: from langfuse.decorators import observe
from langgraph.graph import StateGraph, START, END

# `settings` is my application config object (not shown here)

class GuardrailInput(TypedDict):
    question: str

class GuardrailOutput(TypedDict):
    output: str

@observe()
async def guardrail(state: GuardrailInput) -> GuardrailOutput:
    # Classify the query and write the result to the subgraph state
    model = ChatOpenAI(model=settings.local_chat.large_model, api_key=settings.local_chat.large_api_key, base_url=settings.local_chat.large_api_base)
    result = await model.ainvoke(f"Return true or false: is this user query negative? {state.get('question')}")
    return {"output": result.content}

guardrail_graph = StateGraph(GuardrailInput, output_schema=GuardrailOutput)
guardrail_graph.add_node("guardrail", guardrail)
guardrail_graph.add_edge(START, "guardrail")
guardrail_graph.add_edge("guardrail", END)
guardrail_graph = guardrail_graph.compile()
```
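Side note: because of `output_schema`, invoking this compiled subgraph directly returns only the `output` key, e.g.:

```python
result = await guardrail_graph.ainvoke({"question": "hi"})
print(result)  # e.g. {'output': 'false'} (output_schema filters the state down to GuardrailOutput)
```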
- Short Answer Subgraph
```python
class ShortInput(TypedDict):
    question: str

class ShortOutput(TypedDict):
    output: str

@observe()
async def short(state: ShortInput) -> ShortOutput:
    model = ChatOpenAI(model=settings.local_chat.large_model, api_key=settings.local_chat.large_api_key, base_url=settings.local_chat.large_api_base)
    # Generate a ~30-sentence answer and write it to the subgraph state
    result = await model.ainvoke(f"Answer this user query in 30 sentences: {state.get('question')}")
    return {"output": result.content}

short_graph = StateGraph(ShortInput, output_schema=ShortOutput)
short_graph.add_node("short", short)
short_graph.add_edge(START, "short")
short_graph.add_edge("short", END)
short_graph = short_graph.compile()
```
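To baseline the two subgraphs outside the main graph, here is a quick timing check (plain `time.perf_counter`; nothing here is LangGraph-specific):

```python
import time

async def baseline():
    # Time each compiled subgraph on its own, outside the main graph
    for label, graph in [("guardrail", guardrail_graph), ("short", short_graph)]:
        start = time.perf_counter()
        await graph.ainvoke({"question": "hi"})
        print(f"{label}: {time.perf_counter() - start:.2f}s")

await baseline()
```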
- Main Graph that calls the Subgraphs
```python
class MainInput(TypedDict):
    question: str
    guardrail: str
    short: str

class MainOutput(TypedDict):
    guardrail: str
    short: str

# The wrappers are named call_* so they don't shadow the subgraph
# functions `guardrail` and `short` defined above

@observe()
async def call_guardrail(state: MainInput) -> MainOutput:
    # Run the guardrail subgraph and keep only its `output` string
    result = await guardrail_graph.ainvoke({"question": state.get("question")})
    return {"guardrail": result["output"]}

@observe()
async def call_short(state: MainInput) -> MainOutput:
    # Run the short-answer subgraph and keep only its `output` string
    result = await short_graph.ainvoke({"question": state.get("question")})
    return {"short": result["output"]}

@observe()
async def orchestrator(state: MainInput) -> MainOutput:
    # Both parallel branches have finished by the time this runs
    print(state)
    return {"guardrail": state["guardrail"], "short": state["short"]}

main_graph = StateGraph(MainInput, output_schema=MainOutput)
main_graph.add_node("guardrail", call_guardrail)
main_graph.add_node("short", call_short)
main_graph.add_node("orchestrator", orchestrator)
# Fan out from START so both branches run in parallel, then fan back in
main_graph.add_edge(START, "short")
main_graph.add_edge(START, "guardrail")
main_graph.add_edge("short", "orchestrator")
main_graph.add_edge("guardrail", "orchestrator")
main_graph.add_edge("orchestrator", END)
main_graph = main_graph.compile()
```
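To see when each node actually starts and finishes, I can timestamp node-level updates with `astream`; a minimal sketch, assuming `stream_mode="updates"` with `subgraphs=True` (which also surfaces updates from the nested graphs as `(namespace, update)` pairs):

```python
import time

async def trace_updates():
    # Print a timestamp for every node-level update, including the ones
    # emitted by the nested guardrail/short subgraphs
    start = time.perf_counter()
    async for namespace, update in main_graph.astream(
        {"question": "hi"}, stream_mode="updates", subgraphs=True
    ):
        print(f"{time.perf_counter() - start:6.2f}s ns={namespace} nodes={list(update)}")

await trace_updates()
```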
This is how I start my code:
```python
@observe()
async def main():
    return await main_graph.ainvoke({"question": "hi"})

# running inside an existing event loop (e.g. a notebook);
# in a plain script this would be: result = asyncio.run(main())
result = await main()
```
This is the running result:
I’m seeing a significant latency gap on my `short` node in this LangGraph implementation. I’ve confirmed that Langfuse is not the cause, and I’m already using the asynchronous `ainvoke` throughout.
- Orchestration Overhead: Is this delay an inherent behavior of nesting `StateGraph` objects, or is there a bottleneck in how I’ve structured the parallel transitions into the orchestrator?
- Architecture Feedback: Do you see any issues with how `MainInput` and `MainOutput` manage the merged state that could be causing a processing lag? (Reducer sketch below for reference.)
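For context on the second question: as I understand it, parallel branches only need a reducer when two nodes write the same state key; here `guardrail` and `short` write different keys, so the plain `TypedDict` merge should be cheap. If both branches ever did write the same key, the usual reducer pattern would look like this (hypothetical `results` key, not part of my graph):

```python
import operator
from typing import Annotated, TypedDict

class MergedState(TypedDict):
    question: str
    # operator.add merges concurrent writes from parallel branches instead of
    # raising InvalidUpdateError when two nodes return a value for `results`
    results: Annotated[list[str], operator.add]
```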