Here is my code (some examples):
class State(AgentState):
    # Log lines emitted by graph nodes and tools; `add` is attached as
    # Annotated metadata for the list type.
    # NOTE(review): the Annotated[...] is nested inside Optional[...]. LangGraph
    # appears to look for the reducer on the outermost Annotated, so `add` may
    # not be applied in this form — confirm, and if so consider
    # Annotated[list[str], add]. Nodes that return the full log list
    # (e.g. memory2state) would then need to return only new entries.
    workflow_logs: Optional[Annotated[list[str], add]]
One of my tools:
# Tool that reports a single form-field update back to the graph state.
# The visible part builds a log line from the field arguments and returns a
# Command that updates `populated_form_fields`, clears `updated_form_field`,
# appends a ToolMessage for this tool call, and adds one `workflow_logs` entry.
# NOTE(review): with @tool the docstring is normally used as the tool
# description; it is empty here (possibly elided in this excerpt) — confirm the
# real tool has one.
@tool
def update_pdf(
    # Injected by the framework; used to tie the ToolMessage to this call.
    tool_call_id: Annotated[str, InjectedToolCallId],
    # Injected from graph state key "populated_form_fields".
    populated_form_fields: Annotated[
        list[dict], InjectedState("populated_form_fields")
    ],
    field: str,
    label: str,
    value: str,
    # NOTE: shadows the builtin `type` inside this function; kept because the
    # parameter name is part of the tool's argument schema.
    type: str,
):
    """
    """
    # Excerpt placeholder kept verbatim; `message_content` used below is
    # presumably defined in this elided section — TODO confirm.
    SOME CODE ......
    log_message = (
        f"Field '{label}' (internal name: '{field}', type: {type}) "
        f"updated with value: '{value}'. Total populated fields: {len(populated_form_fields)}."
    )
    return Command(
        update={
            "populated_form_fields": populated_form_fields,
            # Reset so the pending single-field update is not processed again
            # (conditional_entry_point routes on this key being truthy).
            "updated_form_field": {},
            "messages": [ToolMessage(message_content, tool_call_id=tool_call_id)],
            "workflow_logs": [log_message],
        },
    )
GRAPH.PY
# Graph builder over the State schema, with CombinedConfiguration as its config schema.
builder = StateGraph(State, config_schema=CombinedConfiguration)
# Single module-level processor instance; process_pdf_node delegates to it.
pdf_processor = PDFProcessor()
async def process_pdf_node(
    state: State,
) -> Command[Literal["rag_agent", "router", "error"]]:
    """Delegate to the module-level ``pdf_processor`` and return its result unchanged."""
    outcome = await pdf_processor.process(state)
    return outcome
# --- Node registration -------------------------------------------------------
# Each node is registered under the name used by edges and Command(goto=...).
builder.add_node("doc2store", doc2store)
builder.add_node("conditional_entry_point", conditional_entry_point)
builder.add_node("router", router_node)
builder.add_node("error", validator_node)
builder.add_node("check_updated_form_fields", form_field_validation_agent)
builder.add_node("rag_agent", rag_node)
builder.add_node("pdf_agent", pdf_node)
builder.add_node("memory2state", memory2state)
builder.add_node("upload_pdf", upload_pdf_node)
builder.add_node("extract_context", extract_context_node)
builder.add_node("ocr_process", ocr_process_node)
builder.add_node("structure_ocr", process_pdf_node)

# --- Entry -------------------------------------------------------------------
# route_based_on_additional_docs picks doc2store when `additional_knowledge`
# is truthy, otherwise conditional_entry_point.
builder.add_conditional_edges(
    START,
    route_based_on_additional_docs,
    {
        "doc2store": "doc2store",
        "conditional_entry_point": "conditional_entry_point",
    },
)
# After storing documents, continue with the normal entry routing.
builder.add_edge("doc2store", "conditional_entry_point")

# --- Router ------------------------------------------------------------------
# route_decision chooses between the two agents.
builder.add_conditional_edges(
    "router",
    route_decision,
    {"pdf_agent": "pdf_agent", "rag_agent": "rag_agent"},
)

# --- Terminal edges ----------------------------------------------------------
builder.add_edge("pdf_agent", END)
builder.add_edge("rag_agent", END)
builder.add_edge("error", END)
# NOTE: no static outgoing edges are declared here for the remaining nodes;
# conditional_entry_point and memory2state route via Command(goto=...).
# The other nodes presumably do the same — verify in their definitions.

# Compile with no interrupts configured.
graph = builder.compile(
    interrupt_before=[],
    interrupt_after=[],
)
graph.name = "phill"
memory2state node:
async def memory2state(
    state: State,
) -> Command[Literal["rag_agent", "router", "error"]]:
    """
    Pull per-form data from the store into the graph state and route onward.

    Reads ``welcome_message``, ``desired_language`` and
    ``creator_instructions`` from the store namespace ``(user_id, form_id)``.
    ``fill_mode_disable`` is taken from state when truthy there, otherwise from
    the same store namespace (default ``False``).

    The returned ``Command`` writes those values into state, sets
    ``initialization_flag`` to ``True`` and resets ``additional_knowledge``,
    ``pdf_bytes`` and ``form_fields`` to empty values.

    Routing:
        * ``rag_agent`` when fill mode is disabled (also sets ``rag_agent_flag``),
        * ``router`` otherwise,
        * ``error`` when anything raises; the message is written to
          ``error_messages`` and ``workflow_logs``.

    Always returns ``workflow_logs`` for tracing.
    """
    try:
        store = get_store()
        form_id = state.get("form_id")
        user_id = state.get("user_id")
        cet = pytz.timezone("Europe/Belgrade")
        current_time = datetime.datetime.now(cet)

        # Start from a copy of the existing logs, or an empty list when the key
        # is missing/None. The copy also avoids mutating the list held by state.
        # NOTE(review): the full list (previous entries plus the new one) is
        # returned below, as before. If the `workflow_logs` channel applies an
        # additive reducer, this duplicates earlier entries — confirm.
        workflow_logs = list(state.get("workflow_logs") or [])

        if not form_id:
            raise ValueError("memory2state failed: Missing `form_id` in state")
        if not user_id:
            raise ValueError("memory2state failed: Missing `user_id` in state")

        fill_mode_disable_state = state.get("fill_mode_disable", False)

        # `populated_form_fields` may arrive as a JSON string; decode it.
        populated_form_fields = state.get("populated_form_fields", [])
        if isinstance(populated_form_fields, str):
            try:
                populated_form_fields = json.loads(populated_form_fields)
            except json.JSONDecodeError as e:
                raise ValueError(
                    f"memory2state failed: Invalid JSON in `populated_form_fields` → {e}"
                ) from e

        welcome_message = await store.aget((user_id, form_id), "welcome_message")
        desired_language = await store.aget((user_id, form_id), "desired_language")
        creator_instructions = await store.aget(
            (user_id, form_id), "creator_instructions"
        )

        # Validate retrieved data
        if welcome_message and not isinstance(welcome_message.value, str):
            raise TypeError("memory2state failed: `welcome_message` must be a string")

        if desired_language is None:
            # Nothing stored for this form: fall back to English.
            desired_language_value = "English"
        elif not isinstance(desired_language.value, str):
            raise TypeError("memory2state failed: `desired_language` must be a string")
        else:
            desired_language_value = desired_language.value

        fill_mode_disable = False
        # If the flag is not set in state, check whether it was already stored.
        if not fill_mode_disable_state:
            fill_mode_disable_result = await store.aget(
                (user_id, form_id), "fill_mode_disable"
            )
            if fill_mode_disable_result:
                fill_mode_disable = fill_mode_disable_result.value
        else:
            fill_mode_disable = fill_mode_disable_state

        updates = {
            "initialization_flag": True,
            "additional_knowledge": [],
            "populated_form_fields": populated_form_fields,
            "welcome_message": welcome_message.value if welcome_message else "",
            "creator_instructions": (
                creator_instructions.value["creator_instructions"]
                if creator_instructions and creator_instructions.value
                else ""
            ),
            "pdf_bytes": "",
            "form_fields": [],
            "fill_mode_disable": fill_mode_disable,
            "desired_language": desired_language_value,
        }

        workflow_logs.append(
            f"memory2state: Successfully reloaded data from memory at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}. "
            f"State keys: {list(updates.keys())}"
        )

        if fill_mode_disable:
            return Command(
                goto="rag_agent",
                update={
                    **updates,
                    "rag_agent_flag": True,
                    "workflow_logs": workflow_logs,
                },
            )
        return Command(
            goto="router", update={**updates, "workflow_logs": workflow_logs}
        )
    except Exception as e:
        # Node boundary: log the traceback and hand the failure to the error node.
        log.exception("Unexpected error in memory2state")
        cet = pytz.timezone("Europe/Belgrade")
        current_time = datetime.datetime.now(cet)
        error_log = f"memory2state failed with error at {current_time.strftime('%Y-%m-%d %H:%M:%S CET')}: {str(e)}"
        return Command(
            update={
                "error_messages": [str(e)],
                "workflow_logs": [error_log],
            },
            goto="error",
        )
Routing_functions node:
async def fill_mode_disable(state: State) -> Literal["rag_agent", "router"]:
    """Return ``"rag_agent"`` when ``fill_mode_disable`` is truthy in state, else ``"router"``."""
    return "rag_agent" if state.get("fill_mode_disable") else "router"
async def route_based_on_additional_docs(
    state: State,
) -> Literal["doc2store", "conditional_entry_point"]:
    """Pick the entry branch.

    Returns ``"doc2store"`` when ``additional_knowledge`` is truthy in state,
    otherwise ``"conditional_entry_point"``.
    """
    if state.get("additional_knowledge"):
        return "doc2store"
    return "conditional_entry_point"
# Module-level logger named after this module; used by the nodes below.
log = logging.getLogger(__name__)
async def conditional_entry_point(
    state: State,
) -> Command[
    Literal[
        "router",
        "memory2state",
        "upload_pdf",
        "rag_agent",
        "check_updated_form_fields",
        "error",
        "__end__",
    ]
]:
    """
    Decide where a run continues, based on state flags and store contents.

    The first matching rule wins, checked in this order:

    1. ``updated_form_field`` is truthy      -> ``check_updated_form_fields``
    2. ``rag_agent_flag`` is truthy          -> ``rag_agent``
    3. ``initialization_flag`` is truthy     -> ``router``
    4. ``merged_fields_data`` exists in the store under ``(user_id, form_id)``
                                             -> ``memory2state``
    5. ``pdf_bytes`` is truthy               -> ``upload_pdf``
    6. an ``_0_`` entry exists in the store under
       ``(user_id, "Additional_knowladge_<form_name>")`` -> ``rag_agent``
    7. otherwise                             -> ``__end__``

    Every returned ``Command`` carries a ``workflow_logs`` update. If a store
    lookup failed, those failure logs are returned and no routing message is
    added; otherwise a single routing message is returned. Any unexpected
    exception routes to ``error``.
    """
    try:
        store = get_store()
        belgrade = pytz.timezone("Europe/Belgrade")
        current_time = datetime.datetime.now(belgrade)
        logs = []

        # Flags
        initialization_flag = state.get("initialization_flag", False)
        rag_agent_flag = state.get("rag_agent_flag", False)

        # PDF form variables
        form_id = state.get("form_id")
        user_id = state.get("user_id")
        doc_name = state.get("form_name")
        updated_form_field = state.get("updated_form_field")
        pdf_bytes = state.get("pdf_bytes")

        def record_lookup_failure(exc):
            # Store failures are non-fatal: keep a formatted log and move on.
            logs.append(
                format_log(
                    state,
                    current_time,
                    error=str(exc),
                    node_name="conditional_entry_point",
                )
            )

        def routed(message):
            # The routing message is only recorded when nothing else was logged.
            if not logs:
                logs.append(message)
            return logs

        # NOTE(review): doc2store in this file writes under the
        # "rag_docs_<name>" namespace (dots replaced by underscores), not
        # "Additional_knowladge_<name>" — confirm which writer this lookup is
        # meant to match.
        additional_documentation = None
        if user_id and doc_name:
            try:
                additional_documentation = await store.aget(
                    (user_id, f"Additional_knowladge_{doc_name}"), "_0_"
                )
            except Exception as e:
                log.warning(f"Could not retrieve additional_documentation: {e}")
                record_lookup_failure(e)

        merged_entry = None
        if user_id and form_id:
            try:
                merged_entry = await store.aget(
                    (user_id, form_id), "merged_fields_data"
                )
            except Exception as e:
                record_lookup_failure(e)

        stamp = current_time.strftime('%Y-%m-%d %H:%M:%S CET')

        # Rule 1: a pending single-field update takes priority over everything.
        if updated_form_field:
            update = {
                "workflow_logs": routed(
                    f"Router decided to go to 'check_updated_form_fields' at {stamp}."
                )
            }
            if state.get("populated_form_fields") is None:
                # Default to [] so tools that receive this key via injected
                # state do not fail on None.
                update["populated_form_fields"] = []
            return Command(update=update, goto="check_updated_form_fields")

        # Rules 2-6: ordered (condition, target) pairs; first truthy one wins.
        ordered_rules = (
            (rag_agent_flag, "rag_agent"),
            (initialization_flag, "router"),
            (merged_entry, "memory2state"),
            (pdf_bytes, "upload_pdf"),
            (additional_documentation, "rag_agent"),
        )
        for condition, target in ordered_rules:
            if condition:
                return Command(
                    update={
                        "workflow_logs": routed(
                            f"Router decided to go to '{target}' at {stamp}."
                        )
                    },
                    goto=target,
                )

        # Rule 7: nothing matched.
        routed(
            f"Memory brancher: No conditions matched, defaulting to '__end__' at {stamp}."
        )
        log.info("Memory brancher: No conditions matched, defaulting to '__end__'.")
        return Command(update={"workflow_logs": logs}, goto="__end__")

    except Exception as e:
        log.exception("Unexpected error in conditional_entry_point")
        failure_time = datetime.datetime.now(pytz.timezone("Europe/Belgrade"))
        error_log = format_log(
            state,
            failure_time,
            error=str(e),
            node_name="conditional_entry_point",
        )
        return Command(update={"workflow_logs": [error_log]}, goto="error")
async def doc2store(state: State):
    """
    Store uploaded additional-knowledge PDFs as extracted text.

    Called when the user uploads additional knowledge. Each item of
    ``additional_knowledge`` (base64 string or raw bytes) is decoded, its text
    is extracted page by page, and the result is written to the store under
    namespace ``(user_id, "rag_docs_<form_name>")`` with key ``_<index>_``.
    Dots in the form name are replaced by underscores. A document that fails
    at any stage is logged and skipped; the rest are still processed.

    The original author notes that the store is a LangGraph Cloud Postgres
    database used as a vector store — not verifiable from this function.

    Returns a state update with the collected ``workflow_logs`` and
    ``additional_knowledge`` reset to ``[]``. The reset matters because the
    entry router sends runs here whenever ``additional_knowledge`` is truthy;
    per the original note it prevents an infinite loop.
    """
    store = get_store()
    user_id = state.get("user_id")

    # A .get() default does not cover a key that is present with value None,
    # so handle None explicitly before doing string operations.
    doc_name = state.get("form_name")
    if doc_name is None:
        doc_name = "Doc without name"
    doc_name = doc_name.replace(".", "_")
    doc_name_store = f"rag_docs_{doc_name}"

    # Same reasoning: None (or any falsy value) means "nothing to store".
    additional_docs = state.get("additional_knowledge") or []
    workflow_logs = []

    for i, pdf_data in enumerate(additional_docs):
        # 1) Normalise the payload to raw bytes.
        if isinstance(pdf_data, str):
            try:
                pdf_bytes = base64.b64decode(pdf_data)
            except Exception as e:
                error_msg = f"Failed to base64 decode PDF data for doc {i}: {e}"
                log.error(error_msg)
                workflow_logs.append(error_msg)
                continue
        elif isinstance(pdf_data, bytes):
            pdf_bytes = pdf_data
        else:
            error_msg = f"Unexpected type for PDF data for doc {i}: {type(pdf_data)}"
            log.error(error_msg)
            workflow_logs.append(error_msg)
            continue

        # 2) Extract text from every page; pages without text contribute "".
        try:
            with tempfile.NamedTemporaryFile(delete=True, mode="wb+") as temp:
                temp.write(pdf_bytes)
                temp.flush()
                temp.seek(0)
                reader = PdfReader(temp)
                text = "".join(page.extract_text() or "" for page in reader.pages)
        except Exception as e:
            error_msg = f"Failed to extract text from PDF doc {i}: {e}"
            log.error(error_msg)
            workflow_logs.append(error_msg)
            continue

        # 3) Persist; the key keeps the document's original index.
        try:
            await store.aput((user_id, doc_name_store), f"_{i}_", {"text": text})
            success_msg = f"Successfully stored document {i} for user {user_id} inside vector database at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}."
            workflow_logs.append(success_msg)
        except Exception as e:
            error_msg = f"Failed to store document {i} for user {user_id}: {e}"
            log.error(error_msg)
            workflow_logs.append(error_msg)

    if not workflow_logs:
        workflow_logs.append(
            f"No documents to store for user {user_id} at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}."
        )

    return {
        "workflow_logs": workflow_logs,
        "additional_knowledge": [],
    }
async def helper_branch_node(state: State) -> State:
    """
    Pass-through node whose only state update is one ``workflow_logs`` entry.

    Per the original author's note, it does no real work and exists so that a
    conditional branch can be attached to it as an entry point.
    """
    belgrade_now = datetime.datetime.now(pytz.timezone("Europe/Belgrade"))
    entry = format_log(
        state, belgrade_now, error="", node_name="helper_branch_node"
    )
    return {"workflow_logs": [entry]}