```python
import asyncio
from asyncio import Lock
from typing import Dict

from aiohttp import web
import socketio
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI

is_loaded = load_dotenv()
if not is_loaded:
    print("Can't load .env file")
    exit(1)  # exit with a non-zero code to signal failure

sio = socketio.AsyncServer(cors_allowed_origins='*')
app = web.Application()
sio.attach(app)

# One lock per connected client, so a client can't run overlapping requests.
processing_locks: Dict[str, Lock] = {}

model = ChatOpenAI(
    model="gpt-4o-mini",
    temperature=0.1,
    max_tokens=1000,
    timeout=30
)

agent = create_agent(
    model=model,
    # system_prompt="You are a helpful assistant. Be concise and accurate."
    system_prompt="You are a chat bot."
)


@sio.event
def connect(sid, environ):
    print("connect ", sid)
    processing_locks[sid] = Lock()


@sio.event
def disconnect(sid):
    print('disconnect ', sid)
    if sid in processing_locks:
        del processing_locks[sid]
        print(f"🗑️ Cleaned up lock for {sid}")


@sio.on('chat_message')
async def handle_chat_message(sid, data):
    user = data.get('user', 'Unknown')
    message = data.get('message', '')
    print(f"💬 Message from {user} ({sid}): {message}")

    if processing_locks[sid].locked():
        print(f"⚠️ Lock held for {sid}. Skipping message to prevent concurrency.")
        await sio.emit(
            'chat_message',
            {'user': 'System',
             'message': 'The Assistant is currently busy processing a previous request. Please wait.'},
            room=sid
        )
        return

    async with processing_locks[sid]:
        try:
            await sio.emit('processing_status', {'status': 'processing'}, room=sid)
            # await asyncio.sleep(5)

            # Without a checkpointer on the agent, this config does nothing for memory.
            config = {"configurable": {"session_id": sid}}

            # agent.invoke is synchronous, so run it off the event loop.
            agent_response = await asyncio.to_thread(
                agent.invoke,
                {"messages": [{"role": "user", "content": message}]},
                config=config
            )
            # The last message in the returned state is the assistant's reply;
            # the first one is the user's own message.
            response_text = agent_response["messages"][-1].content
            assistant_user = "Assistant"
            print(f"🤖 Assistant response to {user} ({sid}): {response_text}")
            await sio.emit(
                'chat_message',
                {'user': assistant_user, 'message': response_text},
                room=sid
            )
        except Exception as e:
            error_message = f"Error processing message with agent: {e}"
            print(f"🚨 {error_message}")
            await sio.emit(
                'chat_message',
                {'user': 'System Error', 'message': 'Sorry, the AI agent is offline or encountered an error.'},
                room=sid
            )
        finally:
            await sio.emit('processing_status', {'status': 'idle'}, room=sid)


if __name__ == '__main__':
    web.run_app(app, host='0.0.0.0', port=8080)
```
I want to keep the conversation history for three reasons:

- To monitor usage for business purposes.
- To record all messages exchanged.
- To save the experience for the future: to track what the user's problem was and how it was resolved.
I don't understand whether I should use LangGraph or just use plain LangChain.
In the new version, the `langchain` package only has:

- `langchain.agents`
- `langchain.tools`
- `langchain.messages`
- `langchain.chat_models`
- `langchain.embeddings`
- `langchain.rate_limiters`
It no longer has `ConversationBufferMemory`, and I couldn't find a good example in the documentation.
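From what I can tell, the new `create_agent` is built on LangGraph, so I've been experimenting with a LangGraph checkpointer as the replacement for `ConversationBufferMemory`. This is only a sketch, and I'm assuming that `create_agent` accepts a `checkpointer` argument and that conversations are keyed by `thread_id`:

```python
# My experiment: create_agent plus a LangGraph InMemorySaver.
# Assumption: create_agent accepts a `checkpointer` and the conversation
# is selected by the `thread_id` in the config.
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver

model = ChatOpenAI(model="gpt-4o-mini")
agent = create_agent(
    model=model,
    system_prompt="You are a chat bot.",
    checkpointer=InMemorySaver(),  # in-memory only; history is lost on restart
)

# Reusing the same thread_id should continue the same conversation.
config = {"configurable": {"thread_id": "session-123"}}
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Hello!"}]},
    config=config,
)
print(result["messages"][-1].content)
```

Even if this works, I still don't see how to get the history out as plain dicts.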
If I want a regular chat where messages are stored properly in a structure like `[{}]`, so that I can later retrieve or track them, or eventually connect them to MongoDB, how should I do it?
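Concretely, this is the shape I want to end up with; the helper names and the MongoDB part are just illustrative:

```python
# The per-session structure I want to maintain (sid -> list of dicts),
# plus a hypothetical MongoDB save. All names here are illustrative.
from pymongo import MongoClient

conversations: dict[str, list[dict]] = {}

def record_message(sid: str, role: str, content: str) -> None:
    conversations.setdefault(sid, []).append({"role": role, "content": content})

def persist_session(sid: str) -> None:
    client = MongoClient("mongodb://localhost:27017")  # assuming a local instance
    client["chatapp"]["conversations"].update_one(
        {"session_id": sid},
        {"$set": {"messages": conversations.get(sid, [])}},
        upsert=True,
    )
```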
I used to use the official OpenAI library, and it was straightforward: I could just put the messages in a list and store the same messages in the database.
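For reference, this is the pattern I mean; `history` is plain dicts, so the same object could go straight into the database:

```python
# What I did before: keep the history as a plain list of dicts and
# send the whole thing to the API on every turn.
from openai import OpenAI

client = OpenAI()
history = [{"role": "system", "content": "You are a chat bot."}]

def chat(user_message: str) -> str:
    history.append({"role": "user", "content": user_message})
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=history,
    )
    reply = response.choices[0].message.content
    history.append({"role": "assistant", "content": reply})
    return reply
```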
But with LangChain, I got confused!
Also, I couldn't find any examples on GitHub that are compatible with the latest changes.