Harmony Response Format sometimes outputted when using gpt-oss-120b as an Agent

Hello everyone,

I have been working on creating a RAG agent using create_agent() and the gpt-oss-120b model hosted on Azure AI Foundry, and I am encountering a very strange issue which I have been unable to solve after weeks of trying.

Basically, when you ask the agent questions that require just a couple of tool calls, everything works fine. However, if you ask the agent a question that requires around 5+ tool calls, at some point the agent outputs a raw tool call in the OpenAI Harmony Response Format, which is then treated as the final answer and ends the agent graph prematurely. I am really confused about why this happens: the previous tool calls in the same agent invocation were all parsed fine and the tools were called successfully; the issue just randomly appears after many tool calls within one invocation.

Here is an example snippet of the issue:

================================ Human Message =================================

What updates have there been since v8.0?


================================== Ai Message ==================================
Tool Calls:
  retrieve_from_manual (call_5b4617e1c3c34e69a510dbe9)
 Call ID: call_5b4617e1c3c34e69a510dbe9
  Args:
    query: release notes 8.5

================================= Tool Message =================================
Name: retrieve_from_manual

Here are the relevant passages from the user manual (only use them if they are relevant to the user's question):

... (output truncated for simplicity)


================================== Ai Message ==================================
Tool Calls:
  retrieve_from_manual (call_5b4617e1c3c34e69a510dbe9)
 Call ID: call_5b4617e1c3c34e69a510dbe9
  Args:
    query: release notes 9.0



...



================================== Ai Message ==================================

<|start|>assistant<|channel|>analysis to=functions.retrieve_from_manual <|constrain|>json<|message|>{"query": "release notes 10.0"}<|call|>

I have been looking online to try to find anyone else encountering this issue, but couldn’t find anything describing the same behavior. I am assuming that this is a LangChain issue and not an issue with the model itself.

Here is my code. The original was more complex (memory management, more middleware, etc.), but I stripped it down to the basic elements of creating the agent to try to find the issue; it still occurs anyway:

Package versions:

langchain==1.1.3
langchain-azure-ai==1.0.4
langchain-classic==1.0.0
langchain-community==0.4.1
langchain-core==1.1.3
langchain-openai==1.1.1
langchain-text-splitters==1.0.0
langgraph==1.0.4
langgraph-checkpoint==2.1.1
langgraph-prebuilt==1.0.5
langgraph-sdk==0.2.9
qdrant-client==1.15.1
sentence-transformers==5.1.0

Code:

import os
import re
from pathlib import Path
from dotenv import load_dotenv
from dataclasses import dataclass
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
from langchain.tools import tool, ToolRuntime
from langchain_core.messages import HumanMessage
from langchain.agents import create_agent
from langchain.agents.middleware import dynamic_prompt, ModelRequest
from qdrant_client import QdrantClient
from fastembed import SparseTextEmbedding 
from sentence_transformers import SentenceTransformer
from qdrant_client import models
from qdrant_client.hybrid.fusion import distribution_based_score_fusion
from langchain_azure_ai.chat_models import AzureAIChatCompletionsModel


# Loading environment variables
load_dotenv()

# Initialize LLM for generation
llm = AzureAIChatCompletionsModel(
	endpoint=os.getenv("AZURE_LLM_ENDPOINT"),
	credential=os.getenv("AZURE_LLM_KEY"),
	model="gpt-oss-120b",
	max_tokens=4096,
	temperature=0.0,
	verbose=False,
	client_kwargs={"logging_enable": False},
	model_kwargs={"reasoning_effort": "high"},  # Only keep this uncommented for gpt-oss-120b
)

# Initialize vector DB
qdrant_client = QdrantClient(path="data/qdrant_vectordb")

# Initialize dense and sparse embedding models
dense_embedding_model = SentenceTransformer("Alibaba-NLP/gte-modernbert-base")
sparse_embedding_model = SparseTextEmbedding("Qdrant/bm25")

@dataclass
class Context:
	user_name: str
	language: str
	version: str

# --------------------------------------------------------------------------------------------------------------------------
# Tool definitions
# --------------------------------------------------------------------------------------------------------------------------

@tool
def retrieve_from_manual(
	query: str, 
	runtime: ToolRuntime[Context],
) -> str:
	"""
	Use this tool for searching the user manual to retrieve the passages relevant to the given query.

	Args:
		query: a short query string describing what you are searching for

	Returns:
		str: the relevant user manual passages
	"""
	if query == "":
		# No query -> no retrieval
		return "No query given."

	# Dense retrieval
	dense_results = qdrant_client.query_points(
		collection_name="user_manual_collection",
		query=dense_embedding_model.encode_query(query, normalize_embeddings=True),
		using="dense",
		limit=3000,
		with_payload=True,
	)
	dense_results = dense_results.points

	# Copy IDs and scores, since `dense_results` gets updated after DBSF + score conversion
	dense_ids_and_scores = [(doc.id, 1 - (2 - (doc.score*2))) for doc in dense_results[:101]]  # equivalent to 2*score - 1

	# Sparse retrieval
	sparse_results = qdrant_client.query_points(
		collection_name="user_manual_collection",
		query=models.SparseVector(**next(sparse_embedding_model.embed(query)).as_object()),
		using="sparse",
		limit=3000,
		with_payload=True,
	)
	sparse_results = sparse_results.points

	dense_and_sparse_results = [dense_results, sparse_results]

	# Hybrid search fusion
	hybrid_dbsf_results = distribution_based_score_fusion(dense_and_sparse_results, limit=3)

	# Store retrieved docs as (doc, dense_score)
	retrieved_docs = []
	for doc in hybrid_dbsf_results:
		for dense_id, dense_score in dense_ids_and_scores:
			if doc.id == dense_id:
				retrieved_docs.append((doc, dense_score))

	context = "Here are the relevant passages from the user manual (only use them if they are relevant to the user's question): \n\n"
	for i, (doc, score) in enumerate(retrieved_docs):
		title = doc.payload["metadata"].get("title", "Title N/A")  # Get title for logging
		url = doc.payload["metadata"].get("url", "URL N/A")
		page_id = doc.payload["metadata"].get("page_id", "Page ID N/A")
		is_full_page = doc.payload["metadata"].get("is_full_page", "is_full_page N/A")
		if isinstance(url, str) and runtime.context.language != "ar": 
			url = url.replace("/en/", f"/{runtime.context.language}/", 1)
		page_content = doc.payload["page_content"]

		context += "-"*100 + "\n"
		context += f"Passage {i+1}:\nPage Title: {title}\nPage URL: {url}\nPage ID: {page_id}\nis_full_page: {is_full_page}\nRelevance score: {score}\n"
		context += page_content + "\n\n\n"

	return context


def _parse_frontmatter(text: str) -> tuple[dict[str, str], str]:
	"""
	Parse pseudo-YAML frontmatter delimited by lines that equal '-_-_-_-'.
	Returns (metadata_dict, body_without_frontmatter).
	"""
	# Match:
	# - start of file
	# - delimiter line
	# - any content (frontmatter)
	# - delimiter line
	# - rest is body
	pat = re.compile(r"^\s*-_-_-_-\s*\n(.*?)\n\s*-_-_-_-\s*\n?", re.DOTALL)
	m = pat.match(text)
	if not m:
		return {}, text  # no frontmatter, return as-is

	raw_meta = m.group(1)
	body = text[m.end():]

	meta: dict[str, str] = {}
	for line in raw_meta.splitlines():
		line = line.strip()
		if not line or line.startswith("#"):
			continue
		# Basic "key: value" parse
		if ":" not in line:
			continue
		k, v = line.split(":", 1)
		k = k.strip()
		v = v.strip()
		# strip quotes if present
		if len(v) >= 2 and ((v[0] == v[-1] == "'") or (v[0] == v[-1] == '"')):
			v = v[1:-1].strip()
		# normalize null-ish
		if v.lower() in {"null", "none"}:
			v = ""
		meta[k] = v
	return meta, body


@tool
def get_full_page(page_id: str, runtime: ToolRuntime[Context],):
	"""
	Use this tool to return the full content of a specific page given the page ID. This can be used if the 
	retrieved passages from the `retrieve_from_manual` tool contain a passage from a relevant page but are 
	missing additional relevant content typically found in a different passage in the same page.

	Args:
		page_id (str): the page ID of the relevant page.
	
	Returns:
		str: the full content of the specified page, or a helpful message if not found.
	"""
	markdown_dir = Path("data/processed_data")
	max_body_chars = 125000  # safety character limit for very large files (applied to content body only)

	# Sanitize page_id to prevent path traversal / weird chars
	safe_id = re.sub(r"[^0-9A-Za-z_-]", "", str(page_id).strip())
	if not safe_id:
		return "No page_id provided (or it contained only invalid characters)."

	try:
		# 1) Exact prefix match: {page_id}_*.md
		prefix_matches = sorted(
			markdown_dir.rglob(f"{safe_id}_*.md"),
			key=lambda p: len(p.name)
		)
		candidates = prefix_matches

		# 2) Fallback: fuzzy match anywhere in the name
		if not candidates:
			fuzzy_matches = sorted(
				markdown_dir.rglob(f"*{safe_id}*.md"),
				key=lambda p: len(p.name)
			)
			candidates = fuzzy_matches

		if not candidates:
			return f"No Markdown file found for page_id '{safe_id}'."

		# Prefer shortest filename on the assumption it's the canonical page file
		target = candidates[0]

		# Read the page file (a plain synchronous read is fine in this simplified script)
		raw_text = target.read_text(encoding="utf-8")

		meta, body = _parse_frontmatter(raw_text)

		# Truncate BODY only (keep header intact)
		truncated = False
		if len(body) > max_body_chars:
			body = body[:max_body_chars] + "\n\n…(truncated)…"
			truncated = True


		# Assemble the final output format
		header_lines = [
			"Here is the full content of the page:",
			"",
			f"Page ID: {meta.get('page_id', safe_id)}",
			f"Page URL: {meta.get('url', 'N/A')}",
			f"Module: {meta.get('module', 'N/A')}",
			"",
		]
		full_page = "\n".join(header_lines) + body
		return full_page

	except Exception as e:
		return f"Failed to load page '{safe_id}': {e}"

# --------------------------------------------------------------------------------------------------------------------------
# Dynamic system prompt definition
# --------------------------------------------------------------------------------------------------------------------------

@dynamic_prompt
def dynamic_context_system_prompt(request: ModelRequest) -> str:
	"""Create the Agent's system prompt using dynamic context via @dynamic_prompt wrapper"""
	version = request.runtime.context.version 

	system_prompt = f"""
(*Big system prompt describing behavior and tool use*)
"""
	return system_prompt


# --------------------------------------------------------------------------------------------------------------------------
# Compiling LangChain Agent
# --------------------------------------------------------------------------------------------------------------------------

def build_agent():
	"""Compile the LangChain Agent and set the global `rag_agent`"""
	rag_agent = create_agent(
		model=llm,
		tools=[
			retrieve_from_manual,
			get_full_page,
		],
		middleware=[
			dynamic_context_system_prompt,
		],
		context_schema=Context,
		checkpointer=InMemorySaver(serde=JsonPlusSerializer(pickle_fallback=True)),
	)
	
	return rag_agent


if __name__ == "__main__":
	rag_agent = build_agent()

	# Runtime context for the agent
	context = {
		"user_name": "Adam", 
		"language": "en",
		"version": "11.0",
	}

	thread_id = 123

	print("\nStarting interactive demo. Type 'quit' to exit.\n")

	while True:
		user_input = input("\nUser: ").strip()
		if user_input.lower() in ("quit", "exit"):
			print("Exiting demo.")
			break

		user_msg = HumanMessage(content=user_input)
		state = {"messages": [user_msg]}

		print("\n--- STREAM OUTPUT ---")
		prev_len = 0
		for step in rag_agent.stream(input=state, config={"configurable": {"thread_id": thread_id}, "recursion_limit": 75}, context=context, stream_mode="values"):
			### Values stream_mode ###
			messages = step["messages"]

			# Only print when a new message was appended
			if len(messages) <= prev_len:
				continue
			prev_len = len(messages)

			last_message = messages[-1]
			last_message.pretty_print()

I don’t think this is an issue with the terminal printing, because I also have a version integrated into a FastAPI backend with a TypeScript frontend, and the same issue happens there.

If anyone is able to help that would be much appreciated!

Hello! I faced the same problem under the same conditions; I even had to move to custom graphs with structured output instead of using an agent with tools. I hope this problem gets fixed one day, or at least that someone figures out what’s wrong.


Hey Roman, thanks a lot for sharing! I felt like I was the only one facing this issue since I couldn’t find anything about it online, so it’s good to know I’m not alone. How exactly did you do it with structured output? Did you basically do some Harmony Format parsing there, or something simpler?

Hopefully they do indeed fix this bug soon!


I totally changed my approach and stepped away from using create_agent. Instead, I created a simple graph in LangGraph and passed the instructions and the available so-called “tools” inside the prompt message, not inside the “tools” key. Here is how it looked before (I’ll show it on the basic example from the LangChain docs):

@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

@tool
def get_weather(location: str) -> str:
    """Get weather information for a location."""
    return f"Weather in {location}: Sunny, 72°F"

agent = create_agent(model, tools=[search, get_weather])

Here LangChain takes care of transforming this into the proper JSON format for the API request, passes it to the LLM API, and the API parses it, processes it, does some magic under the hood, and returns a response to you. LangChain then takes the response, parses it, and so on.

The LLM decides whether it needs to call a tool based on the message history content and the tools array attached to the last message. That’s basic LLM (OpenAI-compatible, in particular) behaviour.

However, while using gpt-oss we run into problems, because gpt-oss somehow can’t handle tool calls properly. So I decided to imitate this behaviour myself. This approach is more complex and less elegant than plain create_agent usage, but… we have what we have :)

Here is the code; I hope you get the main idea:

from __future__ import annotations

import operator
from dataclasses import dataclass, field
from typing import Any, Optional, Literal, cast, Annotated

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command

from pydantic import BaseModel, Field
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, AnyMessage

# ✅ Your LLM (same pattern as in your example)
from agent.models import llm


# -----------------------------
# "Tools" as plain Python functions
# -----------------------------
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"


def get_weather(location: str) -> str:
    """Get weather information for a location."""
    return f"Weather in {location}: Sunny, 72°F"


# -----------------------------
# Structured outputs (Pydantic)
# -----------------------------
class ToolCall(BaseModel):
    """A unified schema describing a single tool invocation."""
    tool: Literal["search", "get_weather"] = Field(description="Which tool to call")
    args: dict[str, Any] = Field(description="Arguments for the tool")


class RouterOutput(BaseModel):
    """
    LLM routing decision:
    - either answer the user directly,
    - or request a tool call, then we continue the graph.
    """
    to: Literal["answer", "call_tool"] = Field(description="Next node: direct answer or tool call")
    response: str = Field(description="Text to the user or a tool-call instruction")
    tool_call: Optional[ToolCall] = Field(
        default=None,
        description="If to='call_tool' — the tool call payload (tool + args)",
    )


SYSTEM_PROMPT = f"""
You are an assistant that can:
1) Answer the user directly if no tool is needed.
2) If an external action is needed, choose EXACTLY ONE tool call from this list:
   - search(query: str)
   - get_weather(location: str)

Rules:
- If the user asks to find information, use 'search'.
- If the user asks about weather in a place, use 'get_weather'.
- If the request does not require tools, answer directly (to="answer").
- If you choose a tool, return to="call_tool" and fill tool_call (tool + args).
- Always return JSON strictly following this schema:
{RouterOutput.model_json_schema()}

Never return an empty response.
"""


# -----------------------------
# Graph state
# -----------------------------
@dataclass
class AgentState:
    """
    Minimal state:
    - conversation messages (aggregated by operator.add)
    - last_llm_response: what the router LLM returned in 'response'
    - last_tool_result: what the tool returned (string)
    """
    last_llm_response: str = ""
    last_tool_result: str = ""
    messages: Annotated[list[AnyMessage], operator.add] = field(default_factory=list)

    # Optional: dict-like helpers (matches the style from your sample)
    def __getitem__(self, key: str):
        return getattr(self, key)

    def __setitem__(self, key: str, value):
        setattr(self, key, value)

    def __contains__(self, key: str) -> bool:
        return hasattr(self, key)

    def get(self, key: str, default: Optional[Any] = None) -> Any:
        return getattr(self, key, default)


# -----------------------------
# Nodes
# -----------------------------
async def call_llm_node(state: AgentState) -> Command:
    """
    1) Ask the LLM for a structured routing decision (RouterOutput).
    2) Route via Command(goto=...).
    3) Persist router response + tool_call (if any) inside state.messages (optional)
       and in last_llm_response for later nodes.
    """
    router: RouterOutput = cast(
        RouterOutput,
        await llm.with_structured_output(RouterOutput).ainvoke(
            [
                SystemMessage(SYSTEM_PROMPT),
                *state["messages"],
            ]
        ),
    )

    # Guardrail: if the model routes to tool but forgot the payload — fallback to answer.
    if router.to == "call_tool" and router.tool_call is None:
        return Command(
            goto="answer",
            update={"last_llm_response": router.response},
        )

    # Note: we store the RouterOutput JSON-ish info in state via last_llm_response,
    # and for tool calls we also stash the serialized tool call as an AI message.
    updates: dict[str, Any] = {"last_llm_response": router.response}

    # We can optionally store the tool request in messages for traceability.
    if router.to == "call_tool" and router.tool_call is not None:
        updates["messages"] = [
            AIMessage(
                content=router.response,
                tool_calls=[
                    {
                        "id": "router_requested_tool",
                        "name": router.tool_call.tool,
                        "args": router.tool_call.args,
                    }
                ],
            )
        ]

    return Command(goto=router.to, update=updates)


async def call_tool_node(state: AgentState) -> Command:
    """
    Execute the requested tool NOT via LLM tool-calling, but as a regular graph node.
    We extract the tool call from the last AIMessage tool_calls payload if present.
    """
    # Find the most recent AIMessage that contains tool_calls.
    last_tool_call: Optional[dict[str, Any]] = None
    for msg in reversed(state["messages"]):
        if isinstance(msg, AIMessage) and msg.tool_calls:
            last_tool_call = msg.tool_calls[0]
            break

    if not last_tool_call:
        # If something went wrong, we fallback to a safe answer node.
        return Command(
            goto="answer_after_tool",
            update={"last_tool_result": "No tool call was found in the message history."},
        )

    tool_name = cast(str, last_tool_call.get("name"))
    tool_args = cast(dict[str, Any], last_tool_call.get("args", {}))

    # Dispatch to actual functions
    if tool_name == "search":
        query = cast(str, tool_args.get("query", ""))
        result = search(query=query)
    elif tool_name == "get_weather":
        location = cast(str, tool_args.get("location", ""))
        result = get_weather(location=location)
    else:
        result = f"Unknown tool: {tool_name}"

    return Command(
        goto="answer_after_tool",
        update={"last_tool_result": result},
    )


async def answer_node(state: AgentState) -> dict:
    """Direct answer to the user (no tool needed)."""
    return {
        "messages": [AIMessage(state["last_llm_response"])],
    }


async def answer_after_tool_node(state: AgentState) -> dict:
    """
    Final response after tool execution.
    Here we let the LLM turn the tool result into a user-facing answer with structured output.
    """
    class FinalAnswer(BaseModel):
        response: str = Field(description="Final user-facing response")

    prompt = HumanMessage(
        "Write a final answer for the user using the tool result.\n"
        f"Tool result: {state['last_tool_result']}\n"
        f"Return JSON following this schema: {FinalAnswer.model_json_schema()}"
    )

    final: FinalAnswer = cast(
        FinalAnswer,
        await llm.with_structured_output(FinalAnswer).ainvoke([prompt]),
    )

    # Optional: attach a "tool result" tool_call to mimic tool-call traces.
    return {
        "messages": [
            AIMessage(
                content=final.response,
                tool_calls=[
                    {
                        "id": "tool_result",
                        "name": "tool_result",
                        "args": {"result": state["last_tool_result"]},
                    }
                ],
            )
        ]
    }


# -----------------------------
# Build the graph
# -----------------------------
graph_builder = StateGraph(AgentState)

graph_builder.add_node("call_llm", call_llm_node)
graph_builder.add_node("call_tool", call_tool_node)
graph_builder.add_node("answer", answer_node)
graph_builder.add_node("answer_after_tool", answer_after_tool_node)

graph_builder.add_edge(START, "call_llm")
graph_builder.add_edge("answer", END)
graph_builder.add_edge("call_tool", "answer_after_tool")
graph_builder.add_edge("answer_after_tool", END)

agent_graph = graph_builder.compile()

The main idea is that we tell the LLM in the message itself what structure we want back, which tools it can use, what structure each tool has, and so on. After the LLM makes a decision, it sends us back a response in the requested format. We parse it, check whether it’s a tool call or not, then go to the corresponding graph node.
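That parse-and-dispatch step can be sketched with just the standard library. Everything below (the tool registry, the JSON shape) mirrors the RouterOutput schema from the code above, with a hard-coded string standing in for the real LLM response:

```python
import json

# Stand-in tools (same toy tools as in the example above).
def search(query: str) -> str:
    return f"Results for: {query}"

def get_weather(location: str) -> str:
    return f"Weather in {location}: Sunny, 72°F"

TOOLS = {"search": search, "get_weather": get_weather}

def dispatch(raw_llm_output: str) -> str:
    """Parse the model's JSON routing decision and either answer or run a tool."""
    decision = json.loads(raw_llm_output)
    if decision["to"] == "call_tool" and decision.get("tool_call"):
        call = decision["tool_call"]
        fn = TOOLS.get(call["tool"])
        if fn is None:
            return f"Unknown tool: {call['tool']}"
        return fn(**call["args"])
    return decision["response"]  # direct answer

# Simulated model output following the RouterOutput JSON schema:
raw = '{"to": "call_tool", "response": "", "tool_call": {"tool": "get_weather", "args": {"location": "Paris"}}}'
print(dispatch(raw))  # Weather in Paris: Sunny, 72°F
```

In the real graph the `dispatch` branch becomes the `Command(goto=...)` routing, but the JSON handling is the same.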

This approach can (and should) be improved and made more generic, but for now it solved my (our) problem and works well.

I hope it’s gonna help you!

P.S. To be honest, my second big chunk of code above is a combination of the first chunk and existing code from my project, passed through ChatGPT with a prompt along the lines of “make this code work the following way”. But I looked through it, and it looks legit :)


Hi Roman,

Thanks a lot for sharing the code for your workaround! In the meantime, I also figured out what the issue is and found a workaround which actually allows me to keep using create_agent. The issue is that the gpt-oss-120b model sometimes decides to output a tool call in the analysis channel instead of the commentary channel, which it is not supposed to do. The analysis channel is typically where the model’s CoT reasoning is output, but it is also where the calls to gpt-oss-120b’s built-in tools (such as the web search tool) are output. All other tool calls are supposed to go in the commentary channel.

I believe that LangChain’s model wrappers do not currently handle the built-in tool calls of OpenAI models correctly, which is why the raw Harmony Response Format is output and stops the run. To fix this on my side, I made a custom version of the AzureAIChatCompletionsModel wrapper which handles this case and creates a proper tool call out of the output (and also includes the model’s CoT reasoning so that you can access it). Here is my custom wrapper:

"""
Custom Azure AI Inference Chat Models API for the gpt-oss-120b model.

This is a copy of the existing langchain_azure_ai.chat_models.AzureAIChatCompletionsModel
with the following changes:
- Including the gpt-oss-120b model's reasoning content in the additional_kwargs of 
any AIMessage, AIMessageChunk, or inference message, where there are a few line changes/additions highlighted 
using a comment that says: # ADDED REASONING CONTENT FROM GPT-OSS-120B

- A fallback for tool calling using the gpt-oss-120b model, as sometimes the model outputs a response with the
raw Harmony Response Format during tool calling, where the model calls that tool in the analysis channel instead 
of the commentary channel. This case is not handled by the Azure or LangChain Harmony parser, so we add the helper
function `_extract_harmony_tool_call` and a small code block in `from_inference_message` to handle this.
"""

from .logging_utils import log_with_ctx

import re
import uuid
import json
import logging
from operator import itemgetter
from typing import (
    Any,
    AsyncIterator,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Literal,
    Optional,
    Sequence,
    Type,
    Union,
    cast,
)

from azure.ai.inference import ChatCompletionsClient
from azure.ai.inference.aio import ChatCompletionsClient as ChatCompletionsClientAsync
from azure.ai.inference.models import (
    ChatCompletions,
    ChatRequestMessage,
    ChatResponseMessage,
    JsonSchemaFormat,
    StreamingChatCompletionsUpdate,
)
from azure.core.credentials import AzureKeyCredential
from azure.core.exceptions import HttpResponseError
from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models import LanguageModelInput
from langchain_core.language_models.chat_models import BaseChatModel, ChatGeneration
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    BaseMessageChunk,
    ChatMessage,
    ChatMessageChunk,
    FunctionMessageChunk,
    HumanMessage,
    HumanMessageChunk,
    InvalidToolCall,
    SystemMessage,
    SystemMessageChunk,
    ToolCall,
    ToolCallChunk,
    ToolMessage,
    ToolMessageChunk,
)
from langchain_core.messages.tool import tool_call_chunk
from langchain_core.messages.tool import tool_call as create_tool_call
from langchain_core.output_parsers import JsonOutputParser, PydanticOutputParser
from langchain_core.output_parsers.openai_tools import (
    make_invalid_tool_call,
    parse_tool_call,
)
from langchain_core.outputs import ChatGenerationChunk, ChatResult
from langchain_core.runnables import Runnable, RunnableMap, RunnablePassthrough
from langchain_core.tools import BaseTool
from langchain_core.utils.function_calling import convert_to_openai_tool
from langchain_core.utils.pydantic import is_basemodel_subclass
from pydantic import BaseModel, Field, PrivateAttr, model_validator

from langchain_azure_ai._api.base import experimental
from langchain_azure_ai._resources import ModelInferenceService

logger = logging.getLogger(__name__)


def _extract_harmony_tool_call(text: str):
    """
    Returns (tool_name, args_dict) if text looks like a Harmony tool call, else None.
    Designed to be tolerant of extra tokens around it.
    """
    HARMONY_PREFIX = "to=functions."
    MSG_MARK = "<|message|>"
    CALL_MARK = "<|call|>"

    if HARMONY_PREFIX not in text or MSG_MARK not in text or CALL_MARK not in text:
        return None

    # tool name: take substring after "to=functions." up to whitespace or '<'
    start = text.find(HARMONY_PREFIX)
    if start == -1:
        return None
    start += len(HARMONY_PREFIX)

    end = len(text)
    for sep in [" ", "<", "\n", "\r", "\t"]:
        i = text.find(sep, start)
        if i != -1:
            end = min(end, i)
    tool_name = text[start:end].strip()
    if not tool_name:
        return None

    # args: between <|message|> and <|call|>
    msg_i = text.find(MSG_MARK)
    call_i = text.rfind(CALL_MARK)
    if msg_i == -1 or call_i == -1 or call_i <= msg_i:
        return None

    raw_args = text[msg_i + len(MSG_MARK):call_i].strip()
    try:
        args = json.loads(raw_args) if raw_args else {}
    except json.JSONDecodeError:
        return None

    return tool_name, args
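# To sanity-check the extraction, the same marker logic can be condensed into a
# standalone snippet (a minimal re-implementation, so it runs on its own) and
# applied to the exact blob from my first post:

```python
import json
import re

def extract_harmony_tool_call(text: str):
    """Minimal version of the extractor above: pull the tool name after
    'to=functions.' and the JSON args between <|message|> and <|call|>."""
    m = re.search(
        r"to=functions\.(?P<name>[A-Za-z0-9_.-]+).*?<\|message\|>(?P<args>.*?)<\|call\|>",
        text,
        re.DOTALL,
    )
    if not m:
        return None
    try:
        return m.group("name"), json.loads(m.group("args") or "{}")
    except json.JSONDecodeError:
        return None

# The raw blob from the first post in this thread:
blob = ('<|start|>assistant<|channel|>analysis to=functions.retrieve_from_manual '
        '<|constrain|>json<|message|>{"query": "release notes 10.0"}<|call|>')
print(extract_harmony_tool_call(blob))
# ('retrieve_from_manual', {'query': 'release notes 10.0'})
```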

def _maybe_convert_harmony_to_toolcall(ai_msg, create_tool_call_fn):
    # ai_msg is a LangChain AIMessage
    if ai_msg.tool_calls:
        return ai_msg

    extracted = _extract_harmony_tool_call(ai_msg.content or "")
    if not extracted:
        return ai_msg

    tool_name, args = extracted
    tool_call_id = str(uuid.uuid4())

    tc = create_tool_call_fn(name=tool_name, args=args, id=tool_call_id)
    # Strip the Harmony blob out of visible content
    return ai_msg.__class__(
        id=ai_msg.id,
        content="",
        additional_kwargs=dict(ai_msg.additional_kwargs),
        tool_calls=[tc],
        invalid_tool_calls=list(getattr(ai_msg, "invalid_tool_calls", [])),
    )


def to_inference_message(
    messages: List[BaseMessage],
) -> List[ChatRequestMessage]:
    """Converts a sequence of `BaseMessage` to `ChatRequestMessage`.

    Args:
        messages (Sequence[BaseMessage]): The messages to convert.

    Returns:
        List[ChatRequestMessage]: The converted messages.
    """
    new_messages = []
    for m in messages:
        message_dict: Dict[str, Any] = {}
        if isinstance(m, ChatMessage):
            message_dict = {
                "role": m.type,
                "content": m.content,
            }
        elif isinstance(m, HumanMessage):
            message_dict = {
                "role": "user",
                "content": m.content,
            }
        elif isinstance(m, AIMessage):
            message_dict = {
                "role": "assistant",
                "content": m.content,
            }
            tool_calls = []
            if m.tool_calls:
                for tool_call in m.tool_calls:
                    tool_calls.append(_format_tool_call_for_azure_inference(tool_call))
            elif "tool_calls" in m.additional_kwargs:
                for tc in m.additional_kwargs["tool_calls"]:
                    chunk = {
                        "function": {
                            "name": tc["function"]["name"],
                            "arguments": tc["function"]["arguments"],
                        }
                    }
                    if _id := tc.get("id"):
                        chunk["id"] = _id
                    tool_calls.append(chunk)
            else:
                pass
            if tool_calls:
                message_dict["tool_calls"] = tool_calls
            if "reasoning_content" in m.additional_kwargs:  # ADDED REASONING CONTENT FROM GPT-OSS-120B
                message_dict["reasoning_content"] = m.additional_kwargs["reasoning_content"]

        elif isinstance(m, SystemMessage):
            message_dict = {
                "role": "system",
                "content": m.content,
            }
        elif isinstance(m, ToolMessage):
            message_dict = {
                "role": "tool",
                "content": m.content,
                "name": m.name,
                "tool_call_id": m.tool_call_id,
            }
        new_messages.append(ChatRequestMessage(message_dict))
    return new_messages


def from_inference_message(message: ChatResponseMessage) -> BaseMessage:
    """Convert an inference message dict to generic message."""
    if message.role == "user":
        return HumanMessage(content=message.content)
    elif message.role == "assistant":
        tool_calls: List[Dict[str, Any]] = []
        invalid_tool_calls: List[InvalidToolCall] = []
        additional_kwargs: Dict = {}
        
        # Normal case: structured tool calls returned by the API
        if message.tool_calls:
            for tool_call in message.tool_calls:
                try:
                    raw_tool_call = parse_tool_call(tool_call.as_dict(), return_id=True)
                    if raw_tool_call:
                        tool_calls.append(raw_tool_call)
                except json.JSONDecodeError as e:
                    invalid_tool_calls.append(
                        make_invalid_tool_call(tool_call.as_dict(), str(e))
                    )
        
        # Fallback: Harmony tool call returned as raw text in the content
        if not tool_calls and not invalid_tool_calls:
            content_text = message.content or ""

            # Match one or more Harmony tool calls in the text
            # - tool name after `to=functions.`
            # - JSON arguments between `<|message|>` and `<|call|>`
            pattern = re.compile(
                r"to=functions\.(?P<name>[A-Za-z0-9_.-]+)\s*.*?<\|message\|>(?P<args>.*?)<\|call\|>",
                re.DOTALL,
            )

            matches = list(pattern.finditer(content_text))
            if matches:
                # If we successfully parse Harmony tool calls, we generally want to
                # hide the raw Harmony blob from the visible assistant content.

                for m in matches:
                    tool_name = m.group("name").strip()
                    raw_args = (m.group("args") or "").strip()
                    tc_id = str(uuid.uuid4())

                    # Build a raw OpenAI-style tool_call dict so we can reuse LangChain helpers
                    raw_tool_call = {
                        "id": tc_id,
                        "type": "function",
                        "function": {"name": tool_name, "arguments": raw_args},
                    }

                    try:
                        args_obj = json.loads(raw_args) if raw_args else {}
                        tool_calls.append(
                            create_tool_call(name=tool_name, args=args_obj, id=tc_id)
                        )
                    except json.JSONDecodeError as e:
                        invalid_tool_calls.append(make_invalid_tool_call(raw_tool_call, str(e)))

                log_with_ctx(logging.WARNING, "llm.harmony_tool_call_fixed", raw_content=content_text)  # Log that a Harmony tool call was fixed

                # If we parsed Harmony tool calls successfully, clear the content
                # entirely so the raw blob is not surfaced as the final answer
                if tool_calls:
                    message.content = ""

        if audio := message.get("audio"):
            additional_kwargs.update(audio=audio)

        reasoning_content = message.get("reasoning_content", "")  # ADDED REASONING CONTENT FROM GPT-OSS-120B
        additional_kwargs["reasoning_content"] = reasoning_content

        return AIMessage(
            id=message.get("id"),
            content=message.content or "",
            additional_kwargs=additional_kwargs,
            tool_calls=tool_calls,
            invalid_tool_calls=invalid_tool_calls,
        )
    elif message.role == "system":
        return SystemMessage(content=message.content)
    elif message.role == "tool":  # Bug fix?
        additional_kwargs = {}
        if tool_name := message.get("name"):
            additional_kwargs["name"] = tool_name
        return ToolMessage(
            content=message.content,
            tool_call_id=cast(str, message.get("tool_call_id")),
            additional_kwargs=additional_kwargs,
            name=tool_name,
            id=message.get("id"),
        )
    else:
        return ChatMessage(content=message.content, role=message.role)


def _convert_streaming_result_to_message_chunk(
    chunk: StreamingChatCompletionsUpdate,
    default_class: Type[BaseMessageChunk],
) -> Iterable[ChatGenerationChunk]:
    token_usage = chunk.get("usage", {})
    for res in chunk["choices"]:
        finish_reason = res.get("finish_reason")
        message = _convert_delta_to_message_chunk(res.delta, default_class)
        if token_usage and isinstance(message, AIMessage):
            message.usage_metadata = {
                "input_tokens": token_usage.get("prompt_tokens", 0),
                "output_tokens": token_usage.get("completion_tokens", 0),
                "total_tokens": token_usage.get("total_tokens", 0),
            }
        gen = ChatGenerationChunk(
            message=message,
            generation_info={"finish_reason": finish_reason},
        )
        yield gen


def _convert_delta_to_message_chunk(
    _dict: Any, default_class: Type[BaseMessageChunk]
) -> BaseMessageChunk:
    """Convert a delta response to a message chunk."""
    id = _dict.get("id", None)
    role = _dict.role
    content = _dict.content or ""
    additional_kwargs: Dict = {}

    reasoning_content = _dict.get("reasoning_content", "")  # ADDED REASONING CONTENT FROM GPT-OSS-120B
    additional_kwargs["reasoning_content"] = reasoning_content

    tool_call_chunks: List[ToolCallChunk] = []
    if raw_tool_calls := _dict.get("tool_calls"):
        try:
            tool_call_chunks = [
                tool_call_chunk(
                    name=rtc["function"].get("name"),
                    args=rtc["function"].get("arguments"),
                    id=rtc.get("id"),
                    index=rtc["index"],
                )
                for rtc in raw_tool_calls
            ]
        except KeyError:
            pass

    if role == "user" or default_class == HumanMessageChunk:
        return HumanMessageChunk(content=content)
    elif role == "assistant" or default_class == AIMessageChunk:
        return AIMessageChunk(
            id=id,
            content=content,
            additional_kwargs=additional_kwargs,
            tool_call_chunks=tool_call_chunks,
        )
    elif role == "system" or default_class == SystemMessageChunk:
        return SystemMessageChunk(content=content)
    elif role == "function" or default_class == FunctionMessageChunk:
        return FunctionMessageChunk(content=content, name=_dict.name)
    elif role == "tool" or default_class == ToolMessageChunk:
        return ToolMessageChunk(
            content=content, tool_call_id=_dict["tool_call_id"], id=id
        )
    elif role or default_class == ChatMessageChunk:
        return ChatMessageChunk(content=content, role=role)
    else:
        return default_class(content=content)  # type: ignore[call-arg]


def _format_tool_call_for_azure_inference(tool_call: ToolCall) -> dict:
    """Format Langchain ToolCall to dict expected by Azure AI Inference."""
    result: Dict[str, Any] = {
        "function": {
            "name": tool_call["name"],
            "arguments": json.dumps(tool_call["args"]),
        },
        "type": "function",
    }
    if _id := tool_call.get("id"):
        result["id"] = _id

    return result


class GptOssAzureAIChatCompletionsModel(BaseChatModel, ModelInferenceService):
    """Azure AI Chat Completions Model.
... (rest of the file is unchanged)

I assume something similar can be done for other LLM providers (and therefore other LangChain LLM wrappers), as I put this together with ChatGPT's help after explaining the issue to it. I hope this helps you and lets you go back to using create_agent! :slight_smile:
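For anyone who wants to sanity-check the fallback before wiring it into their own wrapper, here is a minimal, self-contained sketch of just the regex extraction step. The `HARMONY_PATTERN` and `extract_harmony_tool_calls` names are my own, and the sample blob is copied from the failure shown at the top of this thread; the parsing logic mirrors the fallback in `from_inference_message` above:

```python
import json
import re
import uuid

# Same pattern as in the fallback above: grab the tool name after
# `to=functions.` and the JSON arguments between `<|message|>` and `<|call|>`.
HARMONY_PATTERN = re.compile(
    r"to=functions\.(?P<name>[A-Za-z0-9_.-]+)\s*.*?<\|message\|>(?P<args>.*?)<\|call\|>",
    re.DOTALL,
)


def extract_harmony_tool_calls(content_text: str) -> list:
    """Parse raw Harmony-formatted tool calls out of assistant text content."""
    tool_calls = []
    for m in HARMONY_PATTERN.finditer(content_text or ""):
        raw_args = (m.group("args") or "").strip()
        tool_calls.append(
            {
                "id": str(uuid.uuid4()),
                "name": m.group("name").strip(),
                # Invalid JSON would raise here; the full workaround catches
                # json.JSONDecodeError and records an InvalidToolCall instead.
                "args": json.loads(raw_args) if raw_args else {},
            }
        )
    return tool_calls


# Example blob, copied from the failure mode shown at the top of this thread:
blob = (
    "<|start|>assistant<|channel|>analysis to=functions.retrieve_from_manual "
    '<|constrain|>json<|message|>{"query": "release notes 10.0"}<|call|>'
)
calls = extract_harmony_tool_calls(blob)
print(calls[0]["name"])  # retrieve_from_manual
print(calls[0]["args"])  # {'query': 'release notes 10.0'}
```

Running this on the raw blob from my original post correctly recovers the tool name and arguments, which is exactly what the fallback then hands to LangChain's tool-call helpers.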


You must be more experienced at prompting ChatGPT than me, because I tried so many times to figure out what was going on with ChatGPT's help, and it did not help me at all :rofl:

Thank you so much for sharing your solution. It's a bit sad that we have to implement such big code workarounds for something that should work properly out of the box :slight_smile:

It looks much easier to just take another model and not suffer with this stuff :grin:

Anyway, I hope our solutions will help somebody someday who faces the same problems.


Hahaha it was a mix of doing manual research and then asking ChatGPT to analyze some of the related LangChain package files :joy:

Yeah, quite annoying that we have to come up with these workarounds. Actually, I saw the release notes for the LangChain 1.2 update, and they mention fixes for some things, including built-in tools for OpenAI models, which I would assume should contain the fix for this issue. But I am too worried to update my LangChain version in case my workaround stops working :joy: Might be worth a look and a try for anyone else who is facing this issue and hasn't done a workaround yet.


Hi @adam, hi all,

I’m facing the same issue too.

Just wanted to mention that I’m already on version 1.2 and this is not resolved.


Hi @kanios,

Thank you for sharing this; you saved me the hassle of trying it out and reverting back. Pretty annoying to hear there wasn't a fix for this. I am assuming the built-in tools of the non-open-source OpenAI models are the ones that got updated, but not the gpt-oss models.

I really hope someone from LangChain would have a look at this issue soon

Nah nah nah, it's not an issue with LangChain, it's a problem in the gpt-oss model itself. LangChain is only a wrapper for using it, remember? So I think the problem is much deeper, on the model layer rather than on the LangChain side or whatever. As an experiment, we should try to reproduce the same problem using any other agentic framework. I bet it's going to be the same result.