How to handle AWS Bedrock and OpenAI model providers with the same graph

Is there a proper way to define graphs/nodes as async or sync depending on the provider? AWS Bedrock does not support async, but I would like to define one graph irrespective of the model provider. Is this possible, or is anyone else dealing with this?

LangGraph can handle mixed sync/async providers in the same graph. Define all nodes as async def (the safe default) and use ainvoke(); LangGraph will handle the differences automatically. However, be aware that Bedrock doesn't support async natively, so llm.ainvoke() falls back to LangChain's thread executor, which can stall under high load and has different error-handling behavior compared to truly async providers like OpenAI. The consistent async interface works across providers, but each Bedrock call still ties up a worker thread behind the scenes rather than yielding like a genuinely async client.
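As a minimal sketch of that pattern (the model names, state keys, and one-node graph are illustrative; the only assumptions are langgraph's StateGraph API and the shared ainvoke() interface on both chat models):

    from typing import TypedDict
    from langgraph.graph import StateGraph, START, END
    from langchain_openai import ChatOpenAI
    # from langchain_aws import ChatBedrock  # drop-in replacement, same node code

    llm = ChatOpenAI(model="gpt-4o-mini")
    # llm = ChatBedrock(model_id="anthropic.claude-3-haiku-20240307-v1:0")

    class State(TypedDict):
        question: str
        answer: str

    async def call_model(state: State) -> State:
        # ainvoke() is native async for OpenAI; for Bedrock it runs the
        # sync boto3 call in LangChain's thread executor
        result = await llm.ainvoke(state["question"])
        return {"question": state["question"], "answer": result.content}

    builder = StateGraph(State)
    builder.add_node("model", call_model)
    builder.add_edge(START, "model")
    builder.add_edge("model", END)
    graph = builder.compile()
    # await graph.ainvoke({"question": "hello", "answer": ""})

The node body never branches on the provider; swapping llm is the only change.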


This is all I had to do to get it to work with my current setup:

    def _create_bedrock_model_sync(self, temperature: float):
        """Synchronous ChatBedrock creation - will be wrapped in asyncio.to_thread"""
        # Create a properly configured boto3 client first to ensure region is set correctly
        try:
            import boto3
            session = boto3.Session()
            client_kwargs = {
                'service_name': 'bedrock-runtime',
                'region_name': self.region
            }
            
            if self.base_url:
                client_kwargs['endpoint_url'] = self.base_url
                
            if self.api_key:
                client_kwargs['aws_access_key_id'] = self.api_key
            
            bedrock_client = session.client(**client_kwargs)
            
        except Exception as e:
            logger.error(f"Failed to create boto3 client: {e}")
            bedrock_client = None

        # Set up ChatBedrock kwargs
        kwargs = {
            "model_id": self.model_id,
            "beta_use_converse_api": True,
            "provider": "anthropic",
            "model_kwargs": {
                "temperature": temperature,
                "max_tokens_to_sample": 4096,
                "anthropic_version": "bedrock-2023-05-31"
            }
        }
        
        # Use explicit client if available, otherwise fall back to region_name
        if bedrock_client:
            kwargs["client"] = bedrock_client
        else:
            kwargs["region_name"] = self.region
            if self.base_url:
                kwargs["endpoint_url"] = self.base_url
            if self.api_key:
                kwargs["aws_access_key_id"] = self.api_key
                
        try:
            from langchain_aws import ChatBedrock
            return ChatBedrock(**kwargs)
            
        except Exception as e:
            logger.error(f"Failed to create ChatBedrock: {e}")
            raise
    
    async def _get_model(self, temperature: float):
        """Async wrapper for ChatBedrock creation to avoid blocking the event loop"""
        import asyncio
        
        # Check cache first to avoid recreating models
        cache_key = f"{self.model_id}_{temperature}_{self.region}"
        if cache_key in self._model_cache:
            return self._model_cache[cache_key]
        
        # Run the synchronous ChatBedrock creation in a separate thread
        model = await asyncio.to_thread(self._create_bedrock_model_sync, temperature)

        # Cache and return so subsequent calls skip the thread hop
        self._model_cache[cache_key] = model
        return model

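A node can then fetch the model and call it without blocking the event loop. A rough usage sketch (the node and state key names are illustrative, not from my setup above):

    async def call_bedrock(self, state: dict) -> dict:
        # Model creation happens off the event loop; ainvoke() then runs
        # the Bedrock request through LangChain's thread executor
        model = await self._get_model(temperature=0.2)
        result = await model.ainvoke(state["question"])
        return {**state, "answer": result.content}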
How would failures be handled for such async threads?
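For what it's worth: exceptions raised inside asyncio.to_thread() propagate to the awaiting coroutine, so a plain try/except around the await catches them, and asyncio.wait_for() adds a timeout. One caveat is that a timeout or cancellation only stops the waiting; the underlying thread keeps running to completion. A sketch along those lines (the _get_model_safe name and 30-second timeout are made up for illustration):

    import asyncio
    import logging

    logger = logging.getLogger(__name__)

    async def _get_model_safe(self, temperature: float):
        """Hypothetical variant of _get_model with explicit failure handling."""
        try:
            # Exceptions raised in the worker thread are re-raised here,
            # exactly as if the call had run locally
            return await asyncio.wait_for(
                asyncio.to_thread(self._create_bedrock_model_sync, temperature),
                timeout=30,
            )
        except asyncio.TimeoutError:
            # The timeout stops the waiting, not the thread itself;
            # work started via to_thread() cannot be interrupted
            logger.error("Timed out creating ChatBedrock client")
            raise
        except Exception as e:
            logger.error(f"ChatBedrock creation failed in worker thread: {e}")
            raise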