Is there a proper way to define graphs/nodes whether async/sync depending on the provider? AWS Bedrock does not support async but I would like to define on graph irrespective of the model provider. Is this possible or is anyone else dealing with this?
LangGraph can handle mixed sync/async providers in the same graph. Define all nodes as async def (the safe default) and use ainvoke() LangGraph will automatically handle the differences. However, be aware that Bedrock doesn’t support async natively, so llm.ainvoke() uses LangChain’s thread executor which can block under high load and has different error handling compared to truly async providers like OpenAI. The consistent async interface works across providers, but Bedrock calls will still block the event loop internally due to the thread wrapper.
1 Like
This is all I had to do to get it to work with my current setup
def _create_bedrock_model_sync(self, temperature: float):
"""Synchronous ChatBedrock creation - will be wrapped in asyncio.to_thread"""
# Create a properly configured boto3 client first to ensure region is set correctly
try:
import boto3
session = boto3.Session()
client_kwargs = {
'service_name': 'bedrock-runtime',
'region_name': self.region
}
if self.base_url:
client_kwargs['endpoint_url'] = self.base_url
if self.api_key:
client_kwargs['aws_access_key_id'] = self.api_key
bedrock_client = session.client(**client_kwargs)
except Exception as e:
logger.error(f"Failed to create boto3 client: {e}")
bedrock_client = None
# Set up ChatBedrock kwargs
kwargs = {
"model_id": self.model_id,
"beta_use_converse_api": True,
"provider": "anthropic",
"model_kwargs": {
"temperature": temperature,
"max_tokens_to_sample": 4096,
"anthropic_version": "bedrock-2023-05-31"
}
}
# Use explicit client if available, otherwise fall back to region_name
if bedrock_client:
kwargs["client"] = bedrock_client
else:
kwargs["region_name"] = self.region
if self.base_url:
kwargs["endpoint_url"] = self.base_url
if self.api_key:
kwargs["aws_access_key_id"] = self.api_key
try:
from langchain_aws import ChatBedrock
return ChatBedrock(**kwargs)
except Exception as e:
logger.error(f"Failed to create ChatBedrock: {e}")
raise
async def _get_model(self, temperature: float):
"""Async wrapper for ChatBedrock creation to avoid blocking the event loop"""
import asyncio
# Check cache first to avoid recreating models
cache_key = f"{self.model_id}_{temperature}_{self.region}"
if cache_key in self._model_cache:
return self._model_cache[cache_key]
# Run the synchronous ChatBedrock creation in a separate thread
model = await asyncio.to_thread(self._create_bedrock_model_sync, temperature)
how would failures be handled for such async threads?