Typing of streamed chunks?

Hey folks,
I get a terrible developer experience whenever using `agent.stream`, as chunks are totally untyped and made of relatively complex nested dictionary, with advanced scenarios such as handling HITL.
Do you have tips for a better typing? Do you write your own classes for each chunk type or perhaps reuse types from LangChain or a third-party lib?

Here is a sample code to illustrate:

for chunk in agent.stream(command, config=config):
            # Not typed
            if "__interrupt__" in chunk:
                RUN_LOOP = True
                # Not typed either
                interrupt_val = chunk["__interrupt__"][0].value
                 decisions = []
                # Not typed access
                for action in interrupt_val["action_requests"]:
                    # NOTE: zipping could work but not sure if order is guaranteed
                    review_config = next(rc for rc in
                                         # Deep and not typed...
                                         interrupt_val["review_configs"] if rc["action_name"] == action["name"])
                    response = Prompt.ask(
                        f"Interrupt received for action {action['name']} with config {review_config}.", choices=review_config["choices"])
                    decision = {"type": response}
                    # Default possible responses not typed
                    if response == "edit": 
                         # Handling edition : not typed ':)
                        decision["edited_action"] = console.input(
                            "Proposed edit?")
                    decisions.append(decision)
                # Resume is an untyped dict
                command = Command(resume={"decisions": decisions})

just released version=”v2” if you want all typed chunks!!

Mar-ve-lous :slight_smile: Will give it a shot ASAP thank you.

1 Like