Why doesn’t add reducer append properly to my state list in Command(update), even when I always pass a list?

I’m using a state variable like this:

workflow_logs: Optional[Annotated[list[str], add]]

At each step in my flow, I return a Command like this:

return Command(
    update={"workflow_logs": ["🔍 Step completed"]},
    goto="next_stage",
)

Or sometimes:

return Command(
    update={"workflow_logs": workflow_logs},
    goto="next_stage",
)

Where workflow_logs is always a list

But when I inspect the state (e.g. in tracing), I always see only the last update, not a cumulative list. It’s as if the add reducer is ignored and the list is overwritten every time.

:white_check_mark: Expected behavior:

I want workflow_logs to grow over time with each new message appended automatically, thanks to the add reducer.

:cross_mark: Actual behavior:

Only the most recent list is in state — all previous logs are lost.

:magnifying_glass_tilted_left: Extra info:

  • The value I pass is always a list ([“something”] or workflow_logs which is a list).

  • If I manage the list manually in Python, it works — but I thought the reducer should handle accumulation for me.

  • Using the add reducer as per docs.

What am I missing here? Is add reducer only meant for single values? Or does it not handle list merges the way I assumed?

Thanks in advance!

This is expected to work. Would you mind pasting a reproducible example?

Here is my code (some examples):

class State(AgentState):

    workflow_logs: Optional[Annotated[list[str], add]]

My one of tools:
@tool
def update_pdf(
    tool_call_id: Annotated[str, InjectedToolCallId],
    populated_form_fields: Annotated[
        list[dict], InjectedState("populated_form_fields")
    ],
    field: str,
    label: str,
    value: str,
    type: str,
):
    """

    """

SOME CODE ......

    log_message = (
        f"Field '{label}' (internal name: '{field}', type: {type}) "
        f"updated with value: '{value}'. Total populated fields: {len(populated_form_fields)}."
    )

    return Command(
        update={
            "populated_form_fields": populated_form_fields,
            "updated_form_field": {},
            "messages": [ToolMessage(message_content, tool_call_id=tool_call_id)],
            "workflow_logs": [log_message],
        },
    )
GRAPH.PY

builder = StateGraph(State, config_schema=CombinedConfiguration)
pdf_processor = PDFProcessor()


async def process_pdf_node(
    state: State,
) -> Command[Literal["rag_agent", "router", "error"]]:
    return await pdf_processor.process(state)


builder.add_node("doc2store", doc2store)
builder.add_node("conditional_entry_point", conditional_entry_point)
builder.add_node("router", router_node)
builder.add_node("error", validator_node)
builder.add_node("check_updated_form_fields", form_field_validation_agent)
builder.add_node("rag_agent", rag_node)
builder.add_node("pdf_agent", pdf_node)
builder.add_node("memory2state", memory2state)
builder.add_node("upload_pdf", upload_pdf_node)
builder.add_node("extract_context", extract_context_node)
builder.add_node("ocr_process", ocr_process_node)
builder.add_node("structure_ocr", process_pdf_node)


builder.add_conditional_edges(
    START,
    route_based_on_additional_docs,
    {
        "doc2store": "doc2store",
        "conditional_entry_point": "conditional_entry_point",
    },
)

builder.add_edge("doc2store", "conditional_entry_point")

builder.add_conditional_edges(
    "router",
    route_decision,
    {"pdf_agent": "pdf_agent", "rag_agent": "rag_agent"},
)

builder.add_edge("pdf_agent", END)
builder.add_edge("rag_agent", END)
builder.add_edge("error", END)
graph = builder.compile(
    interrupt_before=[],
    interrupt_after=[],
)
graph.name = "phill"

memory2state node:

async def memory2state(
    state: State,
) -> Command[Literal["rag_agent", "router", "error"]]:
    """
    Pulls data from store into the LangGraph state and routes forward.
    Always returns detailed workflow_logs for tracing.
    """
    try:
        store = get_store()
        form_id = state.get("form_id")
        user_id = state.get("user_id")

        cet = pytz.timezone("Europe/Belgrade")
        current_time = datetime.datetime.now(cet)
        if not state.get("workflow_logs"):
            workflow_logs = []
        workflow_logs = state.get("workflow_logs")

        if not form_id:
            raise ValueError("memory2state failed: Missing `form_id` in state")
        if not user_id:
            raise ValueError("memory2state failed: Missing `user_id` in state")

        fill_mode_disable_state = state.get("fill_mode_disable", False)
        populated_form_fields = state.get("populated_form_fields", [])

        if isinstance(populated_form_fields, str):
            try:
                populated_form_fields = json.loads(populated_form_fields)
            except json.JSONDecodeError as e:
                raise ValueError(
                    f"memory2state failed: Invalid JSON in `populated_form_fields` → {e}"
                )

        welcome_message = await store.aget((user_id, form_id), "welcome_message")
        desired_language = await store.aget((user_id, form_id), "desired_language")
        creator_instructions = await store.aget(
            (user_id, form_id), "creator_instructions"
        )

        # Validate retrieved data
        if welcome_message and not isinstance(welcome_message.value, str):
            raise TypeError("memory2state failed: `welcome_message` must be a string")

        if desired_language is None:
            desired_language_value = "English"
        elif not isinstance(desired_language.value, str):
            raise TypeError("memory2state failed: `desired_language` must be a string")
        else:
            desired_language_value = desired_language.value

        fill_mode_disable = False
        # Ako ga nema u state onda idemo vidjet ima li ga u memoriji vec psotavljeno
        if not fill_mode_disable_state:
            fill_mode_disable_result = await store.aget(
                (user_id, form_id), "fill_mode_disable"
            )
            if fill_mode_disable_result:
                fill_mode_disable = fill_mode_disable_result.value
        else:
            fill_mode_disable = fill_mode_disable_state

        updates = {
            "initialization_flag": True,
            "additional_knowledge": [],
            "populated_form_fields": populated_form_fields,
            "welcome_message": welcome_message.value if welcome_message else "",
            "creator_instructions": (
                creator_instructions.value["creator_instructions"]
                if creator_instructions and creator_instructions.value
                else ""
            ),
            "pdf_bytes": "",
            "form_fields": [],
            "fill_mode_disable": fill_mode_disable,
            "desired_language": desired_language_value,
        }

        workflow_logs.append(
            f"memory2state: Successfully reloaded data from memory at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}. "
            f"State keys: {list(updates.keys())}"
        )

        if fill_mode_disable:
            return Command(
                goto="rag_agent",
                update={
                    **updates,
                    "rag_agent_flag": True,
                    "workflow_logs": workflow_logs,
                },
            )

        return Command(
            goto="router", update={**updates, "workflow_logs": workflow_logs}
        )

    except Exception as e:
        log.exception("Unexpected error in memory2state")
        cet = pytz.timezone("Europe/Belgrade")
        current_time = datetime.datetime.now(cet)
        error_log = f"memory2state failed with error at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}: {str(e)}"
        return Command(
            update={
                "error_messages": [str(e)],
                "workflow_logs": [error_log],
            },
            goto="error",
        )
Routing_fucntions node:
async def fill_mode_disable(state: State) -> Literal["rag_agent", "router"]:
    if state.get("fill_mode_disable"):
        return "rag_agent"
    else:
        return "router"


async def route_based_on_additional_docs(
    state: State,
) -> Literal["doc2store", "conditional_entry_point"]:
    pdfs = state.get("additional_knowledge")
    return "doc2store" if pdfs else "conditional_entry_point"


log = logging.getLogger(__name__)


async def conditional_entry_point(
    state: State,
) -> Command[
    Literal[
        "router",
        "memory2state",
        "upload_pdf",
        "rag_agent",
        "check_updated_form_fields",
        "error",
        "__end__",
    ]
]:
    try:
        store = get_store()
        cet = pytz.timezone("Europe/Belgrade")
        current_time = datetime.datetime.now(cet)
        logs = []

        # FLAGS
        initialization_flag = state.get("initialization_flag", False)
        rag_agent_flag = state.get("rag_agent_flag", False)

        # PDF FORM VARIABLES
        form_id = state.get("form_id")
        user_id = state.get("user_id")
        doc_name = state.get("form_name")

        updated_form_field = state.get("updated_form_field")
        pdf_bytes = state.get("pdf_bytes")

        additional_documentation = None
        doc_name_store = f"Additional_knowladge_{doc_name}" if doc_name else None
        if user_id and doc_name_store:
            try:
                additional_documentation = await store.aget(
                    (user_id, doc_name_store), "_0_"
                )
            except Exception as e:
                log.warning(f"Could not retrieve additional_documentation: {e}")
                exception_log = format_log(
                    state,
                    current_time,
                    error=str(e),
                    node_name="conditional_entry_point",
                )
                logs.append(exception_log)

        merged_entry = None
        if user_id and form_id:
            try:
                merged_entry = await store.aget(
                    (user_id, form_id), "merged_fields_data"
                )
            except Exception as e:
                exception_log = format_log(
                    state,
                    current_time,
                    error=str(e),
                    node_name="conditional_entry_point",
                )
                logs.append(exception_log)

        # Routing logic with Command and state update
        if updated_form_field:
            if not logs:
                logs.append(
                    f"Router decided to go to 'check_updated_form_fields' at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}."
                )
            populated_from_fields = state.get("populated_form_fields")
            if populated_from_fields is None:
                # populated_from_fields na [] ako ne psotoji tako da nam u toolu u injected state arg ne izbaci error u slcuaju da je None
                return Command(
                    update={"workflow_logs": logs, "populated_form_fields": []},
                    goto="check_updated_form_fields",
                )
            return Command(
                update={"workflow_logs": logs},
                goto="check_updated_form_fields",
            )
        if rag_agent_flag:
            if not logs:
                logs.append(
                    f"Router decided to go to 'rag_agent' at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}."
                )
            return Command(update={"workflow_logs": logs}, goto="rag_agent")
        if initialization_flag:
            if not logs:
                logs.append(
                    f"Router decided to go to 'router' at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}."
                )
            return Command(update={"workflow_logs": logs}, goto="router")
        if merged_entry:
            if not logs:
                logs.append(
                    f"Router decided to go to 'memory2state' at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}."
                )
            return Command(update={"workflow_logs": logs}, goto="memory2state")
        if pdf_bytes:
            if not logs:
                logs.append(
                    f"Router decided to go to 'upload_pdf' at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}."
                )
            return Command(update={"workflow_logs": logs}, goto="upload_pdf")
        if additional_documentation:
            if not logs:
                logs.append(
                    f"Router decided to go to 'rag_agent' at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}."
                )
            return Command(update={"workflow_logs": logs}, goto="rag_agent")

        if not logs:
            logs.append(
                f"Memory brancher: No conditions matched, defaulting to '__end__' at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}."
            )

        log.info("Memory brancher: No conditions matched, defaulting to '__end__'.")
        return Command(update={"workflow_logs": logs}, goto="__end__")

    except Exception as e:
        log.exception("Unexpected error in conditional_entry_point")
        cet = pytz.timezone("Europe/Belgrade")
        current_time = datetime.datetime.now(cet)
        error_log = format_log(
            state,
            current_time,
            error=str(e),
            node_name="conditional_entry_point",
        )
        return Command(update={"workflow_logs": [error_log]}, goto="error")


async def doc2store(state: State):
    """
    Ako user upload-a additional knowledge, ovaj node ce biti pozvan.
    Odmah na pocetku sprema PDF dokumente u LangGraph cloud postgres bazu kao vektorsku bazu podataka.
    Takoder u returnu je nuzno vratiti additional_knowledge kao prazan niz da se izbjegne infinitive loop.
    """
    store = get_store()
    user_id = state.get("user_id")
    doc_name = state.get("form_name", "Doc without name")
    if "." in doc_name:
        doc_name = doc_name.replace(".", "_")
    doc_name_store = f"rag_docs_{doc_name}"
    additional_docs = state.get("additional_knowledge", [])

    workflow_logs = []

    for i, pdf_data in enumerate(additional_docs):
        if isinstance(pdf_data, str):
            try:
                pdf_bytes = base64.b64decode(pdf_data)
            except Exception as e:
                error_msg = f"Failed to base64 decode PDF data for doc {i}: {e}"
                log.error(error_msg)
                workflow_logs.append(error_msg)
                continue
        elif isinstance(pdf_data, bytes):
            pdf_bytes = pdf_data
        else:
            error_msg = f"Unexpected type for PDF data for doc {i}: {type(pdf_data)}"
            log.error(error_msg)
            workflow_logs.append(error_msg)
            continue

        text = ""
        try:
            with tempfile.NamedTemporaryFile(delete=True, mode="wb+") as temp:
                temp.write(pdf_bytes)
                temp.flush()
                temp.seek(0)
                reader = PdfReader(temp)
                for page in reader.pages:
                    text += page.extract_text() or ""
        except Exception as e:
            error_msg = f"Failed to extract text from PDF doc {i}: {e}"
            log.error(error_msg)
            workflow_logs.append(error_msg)
            continue

        try:
            await store.aput((user_id, doc_name_store), f"_{i}_", {"text": text})
            success_msg = f"Successfully stored document {i} for user {user_id} inside vector database at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}."
            workflow_logs.append(success_msg)
        except Exception as e:
            error_msg = f"Failed to store document {i} for user {user_id}: {e}"
            log.error(error_msg)
            workflow_logs.append(error_msg)

    if not workflow_logs:
        workflow_logs.append(
            f"No documents to store for user {user_id} at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}."
        )

    return {
        "workflow_logs": workflow_logs,
        "additional_knowledge": [],
    }


async def helper_branch_node(state: State) -> State:
    """
    Ovaj node je pomocni nodi koji zapravno nista konkrento neradi
    NBe mijenja state vec samo je krositen u smislu pomoci kako bi se helper_branch_node mogao korsitit kao ulaz za naš conditional brancher
    """
    cet = pytz.timezone("Europe/Belgrade")
    current_time = datetime.datetime.now(cet)
    node_name = "helper_branch_node"
    log = format_log(state, current_time, error="", node_name=node_name)
    return {"workflow_logs": [log]}


@eyurtsev Any update here? I am getting the same thing where I can’t get updates and goto in the Command function to work. Goto works find but my state does not update. It stems from having a list of Sends in goto and update. when I have one send, my state gets updated just fine.

@Petar @eyurtsev

I’ve not dove into the internals of how langgraph is referencing the state variables at different times, but I’m wondering if this stackoverflow is of any use to you:

@Petar have you tried passing extend to the annotator instead of add?

No i ll try this stackoverflow solution.Thanks!

@Petar
Have you maybe tried changing the state?
Maybe something like this:

workflow_logs: Annotated[Optional[list[str]], add]

I think the reason the reducer doesn’t work is because the state expects the reducer to be in an Annotation, however yours is in an Optional. I maybe wrong though.