Skip to content

Commit beec5b8

Browse files
authored
fix(llamaindex): support streaming (#142)
1 parent f2708ee commit beec5b8

File tree

6 files changed

+812
-40
lines changed

6 files changed

+812
-40
lines changed

packages/instrumentation-llamaindex/recordings/Test-LlamaIndex-instrumentation_1988279490/should-add-span-for-all-instrumented-methods_2883459899/recording.har

+19-19
Large diffs are not rendered by default.

packages/instrumentation-llamaindex/recordings/Test-LlamaIndex-instrumentation_1988279490/should-build-proper-trace-on-streaming-query-engine_2069720152/recording.har

+667
Large diffs are not rendered by default.

packages/instrumentation-llamaindex/src/custom-llm-instrumentation.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type * as llamaindex from "llamaindex";
44
import {
55
Tracer,
66
Span,
7+
Context,
78
SpanKind,
89
SpanStatusCode,
910
trace,
@@ -14,7 +15,7 @@ import { safeExecuteInTheMiddle } from "@opentelemetry/instrumentation";
1415
import { SpanAttributes } from "@traceloop/ai-semantic-conventions";
1516

1617
import { LlamaIndexInstrumentationConfig } from "./types";
17-
import { shouldSendPrompts, generatorWrapper } from "./utils";
18+
import { shouldSendPrompts, llmGeneratorWrapper } from "./utils";
1819

1920
type LLM = llamaindex.LLM;
2021

@@ -85,6 +86,7 @@ export class CustomLLMInstrumentation {
8586
result = plugin.handleStreamingResponse(
8687
result,
8788
span,
89+
execContext,
8890
this.metadata,
8991
);
9092
} else {
@@ -139,6 +141,7 @@ export class CustomLLMInstrumentation {
139141
handleStreamingResponse<T extends AsyncResponseType>(
140142
result: T,
141143
span: Span,
144+
execContext: Context,
142145
metadata: llamaindex.LLMMetadata,
143146
): T {
144147
span.setAttribute(SpanAttributes.LLM_RESPONSE_MODEL, metadata.model);
@@ -148,7 +151,7 @@ export class CustomLLMInstrumentation {
148151
return result;
149152
}
150153

151-
return generatorWrapper(result, (message) => {
154+
return llmGeneratorWrapper(result, execContext, (message) => {
152155
span.setAttribute(`${SpanAttributes.LLM_COMPLETIONS}.0.content`, message);
153156
span.setStatus({ code: SpanStatusCode.OK });
154157
span.end();

packages/instrumentation-llamaindex/src/utils.ts

+61-17
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as llamaindex from "llamaindex";
33
import { trace, context, Tracer, SpanStatusCode } from "@opentelemetry/api";
44
import { LlamaIndexInstrumentationConfig } from "./types";
55
import { safeExecuteInTheMiddle } from "@opentelemetry/instrumentation";
6+
import { Context } from "@opentelemetry/api";
67
import {
78
TraceloopSpanKindValues,
89
SpanAttributes,
@@ -21,14 +22,46 @@ export const shouldSendPrompts = (config: LlamaIndexInstrumentationConfig) => {
2122
return config.traceContent !== undefined ? config.traceContent : true;
2223
};
2324

25+
// Adapted from https://github.com/open-telemetry/opentelemetry-js/issues/2951#issuecomment-1214587378
26+
export function bindAsyncGenerator<T = unknown, TReturn = any, TNext = unknown>(
27+
ctx: Context,
28+
generator: AsyncGenerator<T, TReturn, TNext>,
29+
): AsyncGenerator<T, TReturn, TNext> {
30+
return {
31+
next: context.bind(ctx, generator.next.bind(generator)),
32+
return: context.bind(ctx, generator.return.bind(generator)),
33+
throw: context.bind(ctx, generator.throw.bind(generator)),
34+
35+
[Symbol.asyncIterator]() {
36+
return bindAsyncGenerator(ctx, generator[Symbol.asyncIterator]());
37+
},
38+
};
39+
}
40+
2441
export async function* generatorWrapper(
42+
streamingResult: AsyncGenerator,
43+
ctx: Context,
44+
fn: () => void,
45+
) {
46+
for await (const chunk of bindAsyncGenerator(ctx, streamingResult)) {
47+
yield chunk;
48+
}
49+
fn();
50+
}
51+
52+
export async function* llmGeneratorWrapper(
2553
streamingResult:
2654
| AsyncIterable<llamaindex.ChatResponseChunk>
2755
| AsyncIterable<llamaindex.CompletionResponse>,
56+
ctx: Context,
2857
fn: (message: string) => void,
2958
) {
3059
let message = "";
31-
for await (const messageChunk of streamingResult) {
60+
61+
for await (const messageChunk of bindAsyncGenerator(
62+
ctx,
63+
streamingResult as AsyncGenerator,
64+
)) {
3265
if ((messageChunk as llamaindex.ChatResponseChunk).delta) {
3366
message += (messageChunk as llamaindex.ChatResponseChunk).delta;
3467
}
@@ -50,6 +83,9 @@ export function genericWrapper(
5083
// eslint-disable-next-line @typescript-eslint/ban-types
5184
return (original: Function) => {
5285
return function method(this: any, ...args: unknown[]) {
86+
const params = args[0];
87+
const streaming = params && (params as any).stream;
88+
5389
const name = `${lodash.snakeCase(className)}.${lodash.snakeCase(methodName)}`;
5490
const span = tracer().startSpan(`${name}`, {}, context.active());
5591
span.setAttribute(SpanAttributes.TRACELOOP_SPAN_KIND, kind);
@@ -98,25 +134,33 @@ export function genericWrapper(
98134
const wrappedPromise = execPromise
99135
.then((result: any) => {
100136
return new Promise((resolve) => {
101-
span.setStatus({ code: SpanStatusCode.OK });
137+
if (streaming) {
138+
result = generatorWrapper(result, execContext, () => {
139+
span.setStatus({ code: SpanStatusCode.OK });
140+
span.end();
141+
});
142+
resolve(result);
143+
} else {
144+
span.setStatus({ code: SpanStatusCode.OK });
102145

103-
try {
104-
if (shouldSendPrompts) {
105-
if (result instanceof Map) {
106-
span.setAttribute(
107-
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
108-
JSON.stringify(Array.from(result.entries())),
109-
);
110-
} else {
111-
span.setAttribute(
112-
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
113-
JSON.stringify(result),
114-
);
146+
try {
147+
if (shouldSendPrompts) {
148+
if (result instanceof Map) {
149+
span.setAttribute(
150+
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
151+
JSON.stringify(Array.from(result.entries())),
152+
);
153+
} else {
154+
span.setAttribute(
155+
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
156+
JSON.stringify(result),
157+
);
158+
}
115159
}
160+
} finally {
161+
span.end();
162+
resolve(result);
116163
}
117-
} finally {
118-
span.end();
119-
resolve(result);
120164
}
121165
});
122166
})

packages/instrumentation-llamaindex/test/instrumentation.test.ts

+54
Original file line numberDiff line numberDiff line change
@@ -200,4 +200,58 @@ describe("Test LlamaIndex instrumentation", async function () {
200200
result.response,
201201
);
202202
}).timeout(60000);
203+
204+
it("should build proper trace on streaming query engine", async () => {
205+
const directoryReader = new llamaindex.SimpleDirectoryReader();
206+
const documents = await directoryReader.loadData({ directoryPath: "test" });
207+
const embedModel = new llamaindex.OpenAIEmbedding();
208+
const vectorStore = new llamaindex.SimpleVectorStore();
209+
210+
const serviceContext = llamaindex.serviceContextFromDefaults({
211+
embedModel,
212+
});
213+
const storageContext = await llamaindex.storageContextFromDefaults({
214+
vectorStore,
215+
});
216+
217+
const index = await llamaindex.VectorStoreIndex.fromDocuments(documents, {
218+
storageContext,
219+
serviceContext,
220+
});
221+
222+
const queryEngine = index.asQueryEngine();
223+
224+
const result = await queryEngine.query({
225+
query: "Where was albert einstein born?",
226+
stream: true,
227+
});
228+
229+
for await (const res of result) {
230+
assert.ok(res);
231+
}
232+
233+
const spans = memoryExporter.getFinishedSpans();
234+
235+
// TODO: Need to figure out why this doesn't get logged
236+
// assert.ok(spanNames.includes("get_query_embedding.task"));
237+
238+
const retrieverQueryEngineSpan = spans.find(
239+
(span) => span.name === "retriever_query_engine.query",
240+
);
241+
const synthesizeSpan = spans.find(
242+
(span) => span.name === "response_synthesizer.synthesize",
243+
);
244+
const openAIChatSpan = spans.find(
245+
(span) => span.name === "llamaindex.open_ai.chat",
246+
);
247+
248+
assert.strictEqual(
249+
synthesizeSpan?.parentSpanId,
250+
retrieverQueryEngineSpan?.spanContext().spanId,
251+
);
252+
assert.strictEqual(
253+
openAIChatSpan?.parentSpanId,
254+
synthesizeSpan?.spanContext().spanId,
255+
);
256+
}).timeout(60000);
203257
});

packages/sample-app/src/sample_llamaindex.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class SampleLlamaIndex {
3636

3737
const res = await queryEngine.query({
3838
query: "What did the author do growing up?",
39+
stream: true,
3940
});
4041
return res;
4142
}
@@ -45,7 +46,10 @@ traceloop.withAssociationProperties(
4546
{ user_id: "12345", chat_id: "789" },
4647
async () => {
4748
const sampleLlamaIndex = new SampleLlamaIndex();
48-
const result = await sampleLlamaIndex.query();
49-
console.log(result.response);
49+
const res = await sampleLlamaIndex.query();
50+
for await (const result of res) {
51+
process.stdout.write(result.response);
52+
}
53+
//console.log(result.response);
5054
},
5155
);

0 commit comments

Comments
 (0)