I am using LangSmith's traceable wrapper around a complex agent that makes several different AI calls, so that they are grouped together, and I use thread_id to merge agent responses in the UI. I need to teach the outer traceable how to process the final response so that it appears properly in the LangSmith UI.
I tried using process_outputs and reading the content out of the stream, but that blocks the result of traceable and therefore blocks my endpoint until the stream ends, which defeats the purpose of streaming.
Do I need to reimplement the aggregator function above, or is there an easier built-in way to handle this?
Yes, we should have examples for this. If you are directly returning e.g. a final streamText response, can you try this?
If you are returning a stream directly from traceable, then yes you will need to re-implement an aggregator function. If you have an example of what you’re currently trying, I can advise further.
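For reference, an aggregator is just a function that folds the list of streamed chunks into one final output for the trace. Here is a minimal sketch, assuming a made-up chunk shape: TextChunk, AggregatedOutput and aggregateChunks are illustrative names, not part of LangSmith or the AI SDK, and your real chunks will look different. As far as I know, a function like this is what you would pass as the aggregator option of traceable when the wrapped function yields chunks.

```typescript
// Hypothetical chunk shape for illustration only; real stream chunks will differ.
type TextChunk =
  | { type: "text-delta"; textDelta: string }
  | { type: "finish"; finishReason: string };

interface AggregatedOutput {
  text: string;
  finishReason?: string;
}

// Fold every chunk seen during streaming into one final output object.
function aggregateChunks(chunks: TextChunk[]): AggregatedOutput {
  const output: AggregatedOutput = { text: "" };
  for (const chunk of chunks) {
    if (chunk.type === "text-delta") {
      // Concatenate text deltas in arrival order.
      output.text += chunk.textDelta;
    } else {
      // Record why the stream ended.
      output.finishReason = chunk.finishReason;
    }
  }
  return output;
}

const example = aggregateChunks([
  { type: "text-delta", textDelta: "Hello, " },
  { type: "text-delta", textDelta: "world" },
  { type: "finish", finishReason: "stop" },
]);
console.log(example);
```

The aggregator only runs once the stream has been fully consumed, so it does not delay the chunks that are sent to the caller.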
@jacoblee93 the issue with using await in processOutputs is that it blocks the result of the traceable, although it does ultimately work. I ended up doing this:
let traceablePromise: Promise<{
  result: StreamTextResult<Record<string, Tool>, never> | undefined;
}>;
const dataStreamResponsePromise: Promise<Response> = new Promise(
  (resolve) => {
    traceablePromise = traceable(
      // messages are passed in as an argument so langsmith knows what the input is
      async (messages: Message[]) => {
        let result: StreamTextResult<Record<string, Tool>, never> | undefined;
        const dataStream = createDataStreamResponse({
          execute: async (dataStream) => {
            result = streamText({ ... });
            result.mergeIntoDataStream(dataStream);
          },
        });
        resolve(dataStream);
        return { result };
      },
      {
        processOutputs: async ({
          result,
        }: {
          result?: StreamTextResult<Record<string, Tool>, never>;
        }) => {
          const messages = (await result?.response)?.messages;
          return { messages };
        },
      },
    )(args.messages);
  },
);
// wait until the response is complete and for the traces to finish before shutting down the runtime
async function waitForTracesToFinish() {
  const { result } = await traceablePromise;
  await result?.response;
  await client.awaitPendingTraceBatches();
}
waitUntil(waitForTracesToFinish());
return dataStreamResponsePromise;
This way I can return the data stream response (the stream to the client) before the traceable resolves, and still have the traceable process the output correctly.
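For anyone who wants the same effect without the promise juggling, a different, library-agnostic option is to split the stream with the standard ReadableStream.tee(): one branch goes to the client immediately and the other is drained in the background to build the final output. This is only a sketch under assumptions, not what the code above does: splitForTracing and fakeModelStream are hypothetical helpers, and the stream carries plain strings rather than AI SDK chunks.

```typescript
// Split a stream so one branch can be returned to the client right away while
// the other is drained in the background to build the final traced output.
function splitForTracing(source: ReadableStream<string>): {
  clientStream: ReadableStream<string>;
  finalText: Promise<string>;
} {
  const [clientStream, traceStream] = source.tee();
  const finalText = (async () => {
    let text = "";
    const reader = traceStream.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      text += value;
    }
    return text;
  })();
  return { clientStream, finalText };
}

// Hypothetical stand-in for a model stream, used only for this example.
function fakeModelStream(parts: string[]): ReadableStream<string> {
  return new ReadableStream<string>({
    start(controller) {
      for (const part of parts) controller.enqueue(part);
      controller.close();
    },
  });
}

// Helper for the example: read a whole stream into an array of chunks.
async function readAll(stream: ReadableStream<string>): Promise<string[]> {
  const chunks: string[] = [];
  const reader = stream.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
}
```

The finalText promise is the piece you would await in an output-processing hook and hand to something like waitUntil, while clientStream is returned to the caller without waiting. Note that tee() buffers chunks for whichever branch is read more slowly, so memory use grows if the client falls far behind.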