From 86f45af9ccf460ab4b5aec94f63a85f9e7adcd8e Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Mon, 21 Jul 2025 09:52:51 +0200 Subject: [PATCH 01/11] feat: Stream responses for openai node --- packages/core/src/utils/gen-ai-attributes.ts | 9 +- packages/core/src/utils/openai/constants.ts | 1 + packages/core/src/utils/openai/index.ts | 85 ++--- packages/core/src/utils/openai/streaming.ts | 136 +++++++ packages/core/src/utils/openai/types.ts | 106 ++++++ packages/core/src/utils/openai/utils.ts | 111 +++++- .../core/test/lib/utils/openai-utils.test.ts | 67 ++++ .../tracing/openai/openai.test.ts | 352 ++++++++++++++++++ 8 files changed, 802 insertions(+), 65 deletions(-) create mode 100644 packages/core/src/utils/openai/streaming.ts create mode 100644 packages/node/test/integrations/tracing/openai/openai.test.ts diff --git a/packages/core/src/utils/gen-ai-attributes.ts b/packages/core/src/utils/gen-ai-attributes.ts index cf8a073a4313..9a0fd5e28e7a 100644 --- a/packages/core/src/utils/gen-ai-attributes.ts +++ b/packages/core/src/utils/gen-ai-attributes.ts @@ -127,15 +127,20 @@ export const OPENAI_RESPONSE_MODEL_ATTRIBUTE = 'openai.response.model'; export const OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE = 'openai.response.timestamp'; /** - * The number of completion tokens used (OpenAI specific) + * The number of completion tokens used */ export const OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE = 'openai.usage.completion_tokens'; /** - * The number of prompt tokens used (OpenAI specific) + * The number of prompt tokens used */ export const OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE = 'openai.usage.prompt_tokens'; +/** + * Whether the response is a stream response + */ +export const OPENAI_RESPONSE_STREAM_ATTRIBUTE = 'openai.response.stream'; + // ============================================================================= // OPENAI OPERATIONS // ============================================================================= diff --git a/packages/core/src/utils/openai/constants.ts b/packages/core/src/utils/openai/constants.ts index e552616cc1db..f818c9e4c48f 100644 --- a/packages/core/src/utils/openai/constants.ts +++ b/packages/core/src/utils/openai/constants.ts @@ -3,3 +3,4 @@ export const OPENAI_INTEGRATION_NAME = 'OpenAI'; // https://platform.openai.com/docs/quickstart?api-mode=responses // https://platform.openai.com/docs/quickstart?api-mode=chat export const INSTRUMENTED_METHODS = ['responses.create', 'chat.completions.create'] as const; +export const RESPONSE_EVENT_TYPES = ['response.created', 'response.in_progress', 'response.failed', 'response.completed', 'response.incomplete', 'response.queued', 'response.output_text.delta'] as const; diff --git a/packages/core/src/utils/openai/index.ts b/packages/core/src/utils/openai/index.ts index 2b5fdbef9c11..bf3fd1416787 100644 --- a/packages/core/src/utils/openai/index.ts +++ b/packages/core/src/utils/openai/index.ts @@ -8,24 +8,18 @@ import { GEN_AI_REQUEST_MESSAGES_ATTRIBUTE, GEN_AI_REQUEST_MODEL_ATTRIBUTE, GEN_AI_REQUEST_PRESENCE_PENALTY_ATTRIBUTE, + GEN_AI_REQUEST_STREAM_ATTRIBUTE, GEN_AI_REQUEST_TEMPERATURE_ATTRIBUTE, GEN_AI_REQUEST_TOP_P_ATTRIBUTE, GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, - GEN_AI_RESPONSE_ID_ATTRIBUTE, - GEN_AI_RESPONSE_MODEL_ATTRIBUTE, GEN_AI_RESPONSE_TEXT_ATTRIBUTE, GEN_AI_SYSTEM_ATTRIBUTE, - GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE, - GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE, - GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, - OPENAI_RESPONSE_ID_ATTRIBUTE, - OPENAI_RESPONSE_MODEL_ATTRIBUTE, - OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE, - 
OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE, - OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE, + OPENAI_RESPONSE_STREAM_ATTRIBUTE, } from '../gen-ai-attributes'; import { OPENAI_INTEGRATION_NAME } from './constants'; +import { instrumentStream } from './streaming'; import type { + ChatCompletionChunk, InstrumentedMethod, OpenAiChatCompletionObject, OpenAiClient, @@ -33,6 +27,8 @@ import type { OpenAiOptions, OpenAiResponse, OpenAIResponseObject, + OpenAIStream, + ResponseStreamingEvent, } from './types'; import { buildMethodPath, @@ -40,6 +36,9 @@ import { getSpanOperation, isChatCompletionResponse, isResponsesApiResponse, + isStream, + setCommonResponseAttributes, + setTokenUsageAttributes, shouldInstrument, } from './utils'; @@ -61,6 +60,7 @@ function extractRequestAttributes(args: unknown[], methodPath: string): Record): void if ('input' in params) { span.setAttributes({ [GEN_AI_REQUEST_MESSAGES_ATTRIBUTE]: JSON.stringify(params.input) }); } + if ('stream' in params) { + span.setAttributes({ [OPENAI_RESPONSE_STREAM_ATTRIBUTE]: Boolean(params.stream) }); + } } function getOptionsFromIntegration(): OpenAiOptions { @@ -239,7 +191,16 @@ function instrumentMethod( } const result = await originalMethod.apply(context, args); - // TODO: Add streaming support + + if (isStream(result)) { + return instrumentStream( + result as OpenAIStream, + span, + finalOptions.recordOutputs ?? false, + ) as unknown as R; + } + + // Handle non-streaming responses addResponseAttributes(span, result, finalOptions.recordOutputs); return result; } catch (error) { diff --git a/packages/core/src/utils/openai/streaming.ts b/packages/core/src/utils/openai/streaming.ts new file mode 100644 index 000000000000..dbefe9dc9a82 --- /dev/null +++ b/packages/core/src/utils/openai/streaming.ts @@ -0,0 +1,136 @@ +import { captureException } from '../../exports'; +import type { Span } from '../../types-hoist/span'; +import { GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, GEN_AI_RESPONSE_TEXT_ATTRIBUTE } from '../gen-ai-attributes'; +import { RESPONSE_EVENT_TYPES } from './constants'; +import type { OpenAIResponseObject } from './types'; +import { type ChatCompletionChunk, type ResponseStreamingEvent } from './types'; +import { + isChatCompletionChunk, + isResponsesApiStreamEvent, + setCommonResponseAttributes, + setTokenUsageAttributes, +} from './utils'; + +interface StreamingState { + eventTypes: string[]; + responseTexts: string[]; + finishReasons: string[]; + responseId?: string; + responseModel?: string; + responseTimestamp?: number; + promptTokens?: number; + completionTokens?: number; + totalTokens?: number; +} + +function processChatCompletionChunk(chunk: ChatCompletionChunk, state: StreamingState, recordOutputs: boolean): void { + state.responseId = chunk.id ?? state.responseId; + state.responseModel = chunk.model ?? state.responseModel; + state.responseTimestamp = chunk.created ?? state.responseTimestamp; + + if (chunk.usage) { + state.promptTokens = chunk.usage.prompt_tokens; + state.completionTokens = chunk.usage.completion_tokens; + state.totalTokens = chunk.usage.total_tokens; + } + + for (const choice of chunk.choices ?? 
[]) { + if (recordOutputs && choice.delta?.content) { + state.responseTexts.push(choice.delta.content); + } + if (choice.finish_reason) { + state.finishReasons.push(choice.finish_reason); + } + } +} + +function processResponsesApiEvent( + streamEvent: ResponseStreamingEvent | unknown | Error, + state: StreamingState, + recordOutputs: boolean, +): void { + if (!(streamEvent && typeof streamEvent === 'object')) { + state.eventTypes.push('unknown:non-object'); + return; + } + if (streamEvent instanceof Error) { + captureException(streamEvent); + return; + } + + if (!('type' in streamEvent)) return; + const event = streamEvent as ResponseStreamingEvent; + + if (!RESPONSE_EVENT_TYPES.includes(event.type)) { + state.eventTypes.push(event.type); + return; + } + + if (recordOutputs && event.type === 'response.output_text.delta' && 'delta' in event && event.delta) { + state.responseTexts.push(event.delta); + return; + } + + const { response } = event as { response: OpenAIResponseObject }; + state.responseId = response.id ?? state.responseId; + state.responseModel = response.model ?? state.responseModel; + state.responseTimestamp = response.created_at ?? state.responseTimestamp; + + if (response.usage) { + state.promptTokens = response.usage.input_tokens; + state.completionTokens = response.usage.output_tokens; + state.totalTokens = response.usage.total_tokens; + } + + if (response.status) { + state.finishReasons.push(response.status); + } + + if (recordOutputs && response.output_text) { + state.responseTexts.push(response.output_text); + } +} +/** + * Instrument a stream of OpenAI events + * @param stream - The stream of events to instrument + * @param span - The span to add attributes to + * @param recordOutputs - Whether to record outputs + * @returns A generator that yields the events + */ +export async function* instrumentStream( + stream: AsyncIterable, + span: Span, + recordOutputs: boolean, +): AsyncGenerator { + const state: StreamingState = { + eventTypes: [], + responseTexts: [], + finishReasons: [], + }; + + try { + for await (const event of stream) { + if (isChatCompletionChunk(event)) { + processChatCompletionChunk(event as ChatCompletionChunk, state, recordOutputs); + } else if (isResponsesApiStreamEvent(event)) { + processResponsesApiEvent(event as ResponseStreamingEvent, state, recordOutputs); + } + yield event; + } + } finally { + setCommonResponseAttributes(span, state.responseId, state.responseModel, state.responseTimestamp); + setTokenUsageAttributes(span, state.promptTokens, state.completionTokens, state.totalTokens); + + if (state.finishReasons.length) { + span.setAttributes({ + [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), + }); + } + + if (recordOutputs && state.responseTexts.length) { + span.setAttributes({ + [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: JSON.stringify(state.responseTexts), + }); + } + } +} diff --git a/packages/core/src/utils/openai/types.ts b/packages/core/src/utils/openai/types.ts index c9a3870a959e..a5854868fbe2 100644 --- a/packages/core/src/utils/openai/types.ts +++ b/packages/core/src/utils/openai/types.ts @@ -132,6 +132,112 @@ export interface OpenAIResponseObject { export type OpenAiResponse = OpenAiChatCompletionObject | OpenAIResponseObject; +/** + * Streaming event types for the Responses API + * @see https://platform.openai.com/docs/api-reference/responses-streaming + * @see https://platform.openai.com/docs/guides/streaming-responses#read-the-responses for common events + */ +export type ResponseStreamingEvent = + | 
ResponseCreatedEvent + | ResponseInProgressEvent + | ResponseFailedEvent + | ResponseCompletedEvent + | ResponseIncompleteEvent + | ResponseQueuedEvent + | ResponseOutputTextDeltaEvent; + +interface ResponseCreatedEvent { + type: 'response.created'; + response: OpenAIResponseObject; + sequence_number: number; +} + +interface ResponseInProgressEvent { + type: 'response.in_progress'; + response: OpenAIResponseObject; + sequence_number: number; +} + +interface ResponseOutputTextDeltaEvent { + content_index: number; + delta: string; + item_id: string; + logprobs: object; + output_index: number; + sequence_number: number; + type: 'response.output_text.delta'; +} + +interface ResponseFailedEvent { + type: 'response.failed'; + response: OpenAIResponseObject; + sequence_number: number; +} + +interface ResponseIncompleteEvent { + type: 'response.incomplete'; + response: OpenAIResponseObject; + sequence_number: number; +} + +interface ResponseCompletedEvent { + type: 'response.completed'; + response: OpenAIResponseObject; + sequence_number: number; +} +interface ResponseQueuedEvent { + type: 'response.queued'; + response: OpenAIResponseObject; + sequence_number: number; +} + +/** + * Chat Completion streaming chunk type + * @see https://platform.openai.com/docs/api-reference/chat-streaming/streaming + */ +export interface ChatCompletionChunk { + id: string; + object: 'chat.completion.chunk'; + created: number; + model: string; + system_fingerprint: string; + service_tier?: string; + choices: Array<{ + index: number; + delta: { + content: string | null; + role: string; + function_call?: object; + refusal?: string | null; + tool_calls?: Array; + }; + logprobs?: unknown | null; + finish_reason?: string | null; + }>; + usage?: { + prompt_tokens: number; + completion_tokens: number; + total_tokens: number; + completion_tokens_details: { + accepted_prediction_tokens: number; + audio_tokens: number; + reasoning_tokens: number; + rejected_prediction_tokens: number; + }; + prompt_tokens_details: { + audio_tokens: number; + cached_tokens: number; + }; + }; +} + +/** + * Represents a stream of events from OpenAI APIs + */ +export interface OpenAIStream extends AsyncIterable { + [Symbol.asyncIterator](): AsyncIterator; +} + /** * OpenAI Integration interface for type safety */ diff --git a/packages/core/src/utils/openai/utils.ts b/packages/core/src/utils/openai/utils.ts index b7d5e12ecf62..a1b2074f9af0 100644 --- a/packages/core/src/utils/openai/utils.ts +++ b/packages/core/src/utils/openai/utils.ts @@ -1,4 +1,17 @@ -import { OPENAI_OPERATIONS } from '../gen-ai-attributes'; +import type { Span } from '../../types-hoist/span'; +import { + GEN_AI_RESPONSE_ID_ATTRIBUTE, + GEN_AI_RESPONSE_MODEL_ATTRIBUTE, + GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE, + GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE, + GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE, + OPENAI_OPERATIONS, + OPENAI_RESPONSE_ID_ATTRIBUTE, + OPENAI_RESPONSE_MODEL_ATTRIBUTE, + OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE, + OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE, + OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE, +} from '../gen-ai-attributes'; import { INSTRUMENTED_METHODS } from './constants'; import type { InstrumentedMethod, OpenAiChatCompletionObject, OpenAIResponseObject } from './types'; @@ -61,3 +74,99 @@ export function isResponsesApiResponse(response: unknown): response is OpenAIRes (response as Record).object === 'response' ); } + +/** + * Check if a result is a stream (AsyncIterable) + */ +export function isStream(result: unknown): result is AsyncIterable { + return ( + result !== null && 
+ typeof result === 'object' && + Symbol.asyncIterator in result && + typeof (result as Record)[Symbol.asyncIterator] === 'function' + ); +} + +/** + * Check if streaming event is from the Responses API + */ +export function isResponsesApiStreamEvent(event: unknown): boolean { + return ( + event !== null && + typeof event === 'object' && + 'type' in event && + typeof (event as Record).type === 'string' && + ((event as Record).type as string).startsWith('response.') + ); +} + +/** + * Check if streaming event is a chat completion chunk + */ +export function isChatCompletionChunk(event: unknown): boolean { + return ( + event !== null && + typeof event === 'object' && + 'object' in event && + (event as Record).object === 'chat.completion.chunk' + ); +} + +/** + * Set token usage attributes + * @param span - The span to add attributes to + * @param promptTokens - The number of prompt tokens + * @param completionTokens - The number of completion tokens + * @param totalTokens - The number of total tokens + */ +export function setTokenUsageAttributes( + span: Span, + promptTokens?: number, + completionTokens?: number, + totalTokens?: number, +): void { + if (promptTokens !== undefined) { + span.setAttributes({ + [OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE]: promptTokens, + [GEN_AI_USAGE_INPUT_TOKENS_ATTRIBUTE]: promptTokens, + }); + } + if (completionTokens !== undefined) { + span.setAttributes({ + [OPENAI_USAGE_COMPLETION_TOKENS_ATTRIBUTE]: completionTokens, + [GEN_AI_USAGE_OUTPUT_TOKENS_ATTRIBUTE]: completionTokens, + }); + } + if (totalTokens !== undefined) { + span.setAttributes({ + [GEN_AI_USAGE_TOTAL_TOKENS_ATTRIBUTE]: totalTokens, + }); + } +} + +/** + * Set common response attributes + * @param span - The span to add attributes to + * @param id - The response id + * @param model - The response model + * @param timestamp - The response timestamp + */ +export function setCommonResponseAttributes(span: Span, id?: string, model?: string, timestamp?: number): void { + if (id) { + span.setAttributes({ + [OPENAI_RESPONSE_ID_ATTRIBUTE]: id, + [GEN_AI_RESPONSE_ID_ATTRIBUTE]: id, + }); + } + if (model) { + span.setAttributes({ + [OPENAI_RESPONSE_MODEL_ATTRIBUTE]: model, + [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: model, + }); + } + if (timestamp) { + span.setAttributes({ + [OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE]: new Date(timestamp * 1000).toISOString(), + }); + } +} diff --git a/packages/core/test/lib/utils/openai-utils.test.ts b/packages/core/test/lib/utils/openai-utils.test.ts index bcff545627ed..3ffa865ad2bf 100644 --- a/packages/core/test/lib/utils/openai-utils.test.ts +++ b/packages/core/test/lib/utils/openai-utils.test.ts @@ -3,8 +3,11 @@ import { buildMethodPath, getOperationName, getSpanOperation, + isChatCompletionChunk, isChatCompletionResponse, isResponsesApiResponse, + isResponsesApiStreamEvent, + isStream, shouldInstrument, } from '../../../src/utils/openai/utils'; @@ -101,4 +104,68 @@ describe('openai-utils', () => { expect(isResponsesApiResponse({ object: null })).toBe(false); }); }); + + describe('isStream', () => { + it('should return true for AsyncIterable objects', () => { + const validStream = { + async *[Symbol.asyncIterator]() { + yield 'test'; + }, + }; + expect(isStream(validStream)).toBe(true); + }); + + it('should return false for non-stream objects', () => { + expect(isStream(null)).toBe(false); + expect(isStream(undefined)).toBe(false); + expect(isStream('string')).toBe(false); + expect(isStream(123)).toBe(false); + expect(isStream({})).toBe(false); + expect(isStream({ 
[Symbol.asyncIterator]: 'not-a-function' })).toBe(false); + expect(isStream([])).toBe(false); + }); + }); + + describe('isResponsesApiStreamEvent', () => { + it('should return true for valid responses API stream events', () => { + expect(isResponsesApiStreamEvent({ type: 'response.created' })).toBe(true); + expect(isResponsesApiStreamEvent({ type: 'response.in_progress' })).toBe(true); + expect(isResponsesApiStreamEvent({ type: 'response.completed' })).toBe(true); + expect(isResponsesApiStreamEvent({ type: 'response.failed' })).toBe(true); + expect(isResponsesApiStreamEvent({ type: 'response.output_text.delta' })).toBe(true); + }); + + it('should return false for non-response events', () => { + expect(isResponsesApiStreamEvent(null)).toBe(false); + expect(isResponsesApiStreamEvent(undefined)).toBe(false); + expect(isResponsesApiStreamEvent('string')).toBe(false); + expect(isResponsesApiStreamEvent(123)).toBe(false); + expect(isResponsesApiStreamEvent({})).toBe(false); + expect(isResponsesApiStreamEvent({ type: 'chat.completion' })).toBe(false); + expect(isResponsesApiStreamEvent({ type: null })).toBe(false); + expect(isResponsesApiStreamEvent({ type: 123 })).toBe(false); + }); + }); + + describe('isChatCompletionChunk', () => { + it('should return true for valid chat completion chunks', () => { + const validChunk = { + object: 'chat.completion.chunk', + id: 'chatcmpl-123', + model: 'gpt-4', + choices: [], + }; + expect(isChatCompletionChunk(validChunk)).toBe(true); + }); + + it('should return false for invalid chunks', () => { + expect(isChatCompletionChunk(null)).toBe(false); + expect(isChatCompletionChunk(undefined)).toBe(false); + expect(isChatCompletionChunk('string')).toBe(false); + expect(isChatCompletionChunk(123)).toBe(false); + expect(isChatCompletionChunk({})).toBe(false); + expect(isChatCompletionChunk({ object: 'chat.completion' })).toBe(false); + expect(isChatCompletionChunk({ object: null })).toBe(false); + }); + }); }); diff --git a/packages/node/test/integrations/tracing/openai/openai.test.ts b/packages/node/test/integrations/tracing/openai/openai.test.ts new file mode 100644 index 000000000000..8c8633fb8258 --- /dev/null +++ b/packages/node/test/integrations/tracing/openai/openai.test.ts @@ -0,0 +1,352 @@ +import type { OpenAiClient } from '@sentry/core'; +import { instrumentOpenAiClient } from '@sentry/core'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; + +// Mock OpenAI client structures +const mockStreamResponse = { + async *[Symbol.asyncIterator]() { + // Chat completion chunks + yield { + id: 'chatcmpl-123', + object: 'chat.completion.chunk', + created: 1677652288, + model: 'gpt-4', + choices: [ + { + index: 0, + delta: { content: 'Hello' }, + finish_reason: null, + }, + ], + }; + yield { + id: 'chatcmpl-123', + object: 'chat.completion.chunk', + created: 1677652288, + model: 'gpt-4', + choices: [ + { + index: 0, + delta: { content: ' world!' 
}, + finish_reason: null, + }, + ], + }; + yield { + id: 'chatcmpl-123', + object: 'chat.completion.chunk', + created: 1677652288, + model: 'gpt-4', + choices: [ + { + index: 0, + delta: {}, + finish_reason: 'stop', + }, + ], + usage: { + prompt_tokens: 10, + completion_tokens: 5, + total_tokens: 15, + }, + }; + }, +}; + +const mockResponsesApiStream = { + async *[Symbol.asyncIterator]() { + // Responses API events + yield { + type: 'response.created', + response: { + id: 'resp_123', + object: 'response', + created_at: 1677652288, + model: 'gpt-4', + status: 'in_progress', + }, + }; + yield { + type: 'response.output_text.delta', + delta: { text: 'Hello' }, + }; + yield { + type: 'response.output_text.delta', + delta: { text: ' world!' }, + }; + yield { + type: 'response.completed', + response: { + id: 'resp_123', + object: 'response', + created_at: 1677652288, + model: 'gpt-4', + status: 'completed', + usage: { + input_tokens: 10, + output_tokens: 5, + total_tokens: 15, + }, + }, + }; + }, +}; + +describe('OpenAI Streaming Integration', () => { + let mockClient: OpenAiClient; + let startSpanMock: any; + let setAttributesMock: any; + + beforeEach(() => { + setAttributesMock = vi.fn(); + startSpanMock = vi.fn((options, callback) => { + const span = { setAttributes: setAttributesMock }; + return callback(span); + }); + + // Mock Sentry's startSpan + vi.mock('@sentry/core', async () => { + const actual = await vi.importActual('@sentry/core'); + return { + ...actual, + startSpan: startSpanMock, + getCurrentScope: () => ({ + getClient: () => ({ + getIntegrationByName: () => ({ options: { recordInputs: true, recordOutputs: true } }), + getOptions: () => ({ sendDefaultPii: true }), + }), + }), + }; + }); + + // Create mock OpenAI client + mockClient = { + chat: { + completions: { + create: vi.fn(), + }, + }, + responses: { + create: vi.fn(), + }, + }; + }); + + describe('Chat Completion Streaming', () => { + it('should instrument streaming chat completions', async () => { + mockClient.chat.completions.create.mockResolvedValue(mockStreamResponse); + const instrumentedClient = instrumentOpenAiClient(mockClient); + + const params = { + model: 'gpt-4', + messages: [{ role: 'user', content: 'Say hello' }], + stream: true, + }; + + const stream = await instrumentedClient.chat.completions.create(params); + + // Consume the stream + const chunks = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + + expect(chunks).toHaveLength(3); + expect(startSpanMock).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'chat gpt-4', + op: 'gen_ai.chat', + attributes: expect.objectContaining({ + 'gen_ai.system': 'openai', + 'gen_ai.operation.name': 'chat', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.stream': true, + }), + }), + expect.any(Function), + ); + + // Check that attributes were set after streaming + expect(setAttributesMock).toHaveBeenCalledWith( + expect.objectContaining({ + 'openai.response.id': 'chatcmpl-123', + 'gen_ai.response.id': 'chatcmpl-123', + 'openai.response.model': 'gpt-4', + 'gen_ai.response.model': 'gpt-4', + }), + ); + + expect(setAttributesMock).toHaveBeenCalledWith( + expect.objectContaining({ + 'openai.usage.prompt_tokens': 10, + 'gen_ai.usage.input_tokens': 10, + 'openai.usage.completion_tokens': 5, + 'gen_ai.usage.output_tokens': 5, + 'gen_ai.usage.total_tokens': 15, + }), + ); + + expect(setAttributesMock).toHaveBeenCalledWith( + expect.objectContaining({ + 'gen_ai.response.finish_reasons': JSON.stringify(['stop']), + 'gen_ai.response.text': 
JSON.stringify(['Hello', ' world!']), + }), + ); + }); + }); + + describe('Responses API Streaming', () => { + it('should instrument streaming responses API', async () => { + mockClient.responses.create.mockResolvedValue(mockResponsesApiStream); + const instrumentedClient = instrumentOpenAiClient(mockClient); + + const params = { + model: 'gpt-4', + input: [{ role: 'user', content: 'Say hello' }], + stream: true, + }; + + const stream = await instrumentedClient.responses.create(params); + + // Consume the stream + const events = []; + for await (const event of stream) { + events.push(event); + } + + expect(events).toHaveLength(4); + expect(startSpanMock).toHaveBeenCalledWith( + expect.objectContaining({ + name: 'chat gpt-4', + op: 'gen_ai.chat', + attributes: expect.objectContaining({ + 'gen_ai.system': 'openai', + 'gen_ai.operation.name': 'chat', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.stream': true, + }), + }), + expect.any(Function), + ); + + // Check that attributes were set after streaming + expect(setAttributesMock).toHaveBeenCalledWith( + expect.objectContaining({ + 'openai.response.id': 'resp_123', + 'gen_ai.response.id': 'resp_123', + 'openai.response.model': 'gpt-4', + 'gen_ai.response.model': 'gpt-4', + }), + ); + + expect(setAttributesMock).toHaveBeenCalledWith( + expect.objectContaining({ + 'openai.usage.prompt_tokens': 10, + 'gen_ai.usage.input_tokens': 10, + 'openai.usage.completion_tokens': 5, + 'gen_ai.usage.output_tokens': 5, + 'gen_ai.usage.total_tokens': 15, + }), + ); + + expect(setAttributesMock).toHaveBeenCalledWith( + expect.objectContaining({ + 'gen_ai.response.finish_reasons': JSON.stringify(['completed']), + 'gen_ai.response.text': JSON.stringify(['Hello', ' world!']), + }), + ); + }); + }); + + describe('Non-streaming Responses', () => { + it('should still handle non-streaming responses', async () => { + const nonStreamResponse = { + id: 'chatcmpl-456', + object: 'chat.completion', + created: 1677652288, + model: 'gpt-4', + choices: [ + { + index: 0, + message: { role: 'assistant', content: 'Hello world!' 
}, + finish_reason: 'stop', + }, + ], + usage: { + prompt_tokens: 10, + completion_tokens: 5, + total_tokens: 15, + }, + }; + + mockClient.chat.completions.create.mockResolvedValue(nonStreamResponse); + const instrumentedClient = instrumentOpenAiClient(mockClient); + + const params = { + model: 'gpt-4', + messages: [{ role: 'user', content: 'Say hello' }], + stream: false, + }; + + const response = await instrumentedClient.chat.completions.create(params); + + expect(response).toEqual(nonStreamResponse); + expect(startSpanMock).toHaveBeenCalledWith( + expect.objectContaining({ + attributes: expect.objectContaining({ + 'gen_ai.request.stream': false, + }), + }), + expect.any(Function), + ); + }); + }); + + describe('Error Handling', () => { + it('should handle errors in streaming', async () => { + const errorStream = { + async *[Symbol.asyncIterator]() { + yield { + id: 'chatcmpl-789', + object: 'chat.completion.chunk', + created: 1677652288, + model: 'gpt-4', + choices: [ + { + index: 0, + delta: { content: 'Error' }, + finish_reason: null, + }, + ], + }; + throw new Error('Stream error'); + }, + }; + + mockClient.chat.completions.create.mockResolvedValue(errorStream); + const instrumentedClient = instrumentOpenAiClient(mockClient); + + const params = { + model: 'gpt-4', + messages: [{ role: 'user', content: 'Cause error' }], + stream: true, + }; + + const stream = await instrumentedClient.chat.completions.create(params); + + await expect(async () => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const chunk of stream) { + // Consume stream + } + }).rejects.toThrow('Stream error'); + + // Even with error, collected data should be saved + expect(setAttributesMock).toHaveBeenCalledWith( + expect.objectContaining({ + 'gen_ai.response.text': JSON.stringify(['Error']), + }), + ); + }); + }); +}); From b8bd91c23808a5007b955a15590852f6b8241a8d Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Mon, 21 Jul 2025 14:18:34 +0200 Subject: [PATCH 02/11] manually end the span --- packages/core/src/utils/openai/index.ts | 66 +++++++++++++-------- packages/core/src/utils/openai/streaming.ts | 10 +++- 2 files changed, 50 insertions(+), 26 deletions(-) diff --git a/packages/core/src/utils/openai/index.ts b/packages/core/src/utils/openai/index.ts index bf3fd1416787..a92de8258def 100644 --- a/packages/core/src/utils/openai/index.ts +++ b/packages/core/src/utils/openai/index.ts @@ -1,6 +1,6 @@ import { getCurrentScope } from '../../currentScopes'; import { captureException } from '../../exports'; -import { startSpan } from '../../tracing/trace'; +import { startSpan, startSpanManual } from '../../tracing/trace'; import type { Span, SpanAttributeValue } from '../../types-hoist/span'; import { GEN_AI_OPERATION_NAME_ATTRIBUTE, @@ -178,37 +178,55 @@ function instrumentMethod( const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown'; const operationName = getOperationName(methodPath); - return startSpan( - { - name: `${operationName} ${model}`, - op: getSpanOperation(methodPath), - attributes: requestAttributes as Record, - }, - async (span: Span) => { - try { - if (finalOptions.recordInputs && args[0] && typeof args[0] === 'object') { - addRequestAttributes(span, args[0] as Record); - } + const result = await originalMethod.apply(context, args); - const result = await originalMethod.apply(context, args); + if (isStream(result)) { + return startSpanManual( + { + name: `${operationName} ${model}`, + op: getSpanOperation(methodPath), + attributes: 
requestAttributes as Record, + }, + (span: Span, finish: () => void) => { + try { + if (finalOptions.recordInputs && args[0] && typeof args[0] === 'object') { + addRequestAttributes(span, args[0] as Record); + } - if (isStream(result)) { return instrumentStream( result as OpenAIStream, span, finalOptions.recordOutputs ?? false, + finish, ) as unknown as R; + } catch (error) { + captureException(error); + finish(); + throw error; } - - // Handle non-streaming responses - addResponseAttributes(span, result, finalOptions.recordOutputs); - return result; - } catch (error) { - captureException(error); - throw error; - } - }, - ); + }, + ); + } else { + return startSpan( + { + name: `${operationName} ${model}`, + op: getSpanOperation(methodPath), + attributes: requestAttributes as Record, + }, + async (span: Span) => { + try { + if (finalOptions.recordInputs && args[0] && typeof args[0] === 'object') { + addRequestAttributes(span, args[0] as Record); + } + addResponseAttributes(span, result, finalOptions.recordOutputs); + return result; + } catch (error) { + captureException(error); + throw error; + } + }, + ); + } }; } diff --git a/packages/core/src/utils/openai/streaming.ts b/packages/core/src/utils/openai/streaming.ts index dbefe9dc9a82..7e28f9389e4a 100644 --- a/packages/core/src/utils/openai/streaming.ts +++ b/packages/core/src/utils/openai/streaming.ts @@ -95,12 +95,14 @@ function processResponsesApiEvent( * @param stream - The stream of events to instrument * @param span - The span to add attributes to * @param recordOutputs - Whether to record outputs + * @param finishSpan - Optional function to finish the span manually * @returns A generator that yields the events */ export async function* instrumentStream( stream: AsyncIterable, span: Span, recordOutputs: boolean, + finishSpan?: () => void, ): AsyncGenerator { const state: StreamingState = { eventTypes: [], @@ -123,14 +125,18 @@ export async function* instrumentStream( if (state.finishReasons.length) { span.setAttributes({ - [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), + [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: state.finishReasons.join(','), }); } if (recordOutputs && state.responseTexts.length) { span.setAttributes({ - [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: JSON.stringify(state.responseTexts), + [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''), }); } } + + if (finishSpan) { + finishSpan(); + } } From 0a8ea4f8e560396325a2ff9f810b024f95a14bb5 Mon Sep 17 00:00:00 2001 From: Rola Abuhasna Date: Mon, 21 Jul 2025 16:16:40 +0200 Subject: [PATCH 03/11] Delete packages/node/test/integrations/tracing/openai/openai.test.ts --- .../tracing/openai/openai.test.ts | 352 ------------------ 1 file changed, 352 deletions(-) delete mode 100644 packages/node/test/integrations/tracing/openai/openai.test.ts diff --git a/packages/node/test/integrations/tracing/openai/openai.test.ts b/packages/node/test/integrations/tracing/openai/openai.test.ts deleted file mode 100644 index 8c8633fb8258..000000000000 --- a/packages/node/test/integrations/tracing/openai/openai.test.ts +++ /dev/null @@ -1,352 +0,0 @@ -import type { OpenAiClient } from '@sentry/core'; -import { instrumentOpenAiClient } from '@sentry/core'; -import { beforeEach, describe, expect, it, vi } from 'vitest'; - -// Mock OpenAI client structures -const mockStreamResponse = { - async *[Symbol.asyncIterator]() { - // Chat completion chunks - yield { - id: 'chatcmpl-123', - object: 'chat.completion.chunk', - created: 1677652288, - model: 'gpt-4', - 
choices: [ - { - index: 0, - delta: { content: 'Hello' }, - finish_reason: null, - }, - ], - }; - yield { - id: 'chatcmpl-123', - object: 'chat.completion.chunk', - created: 1677652288, - model: 'gpt-4', - choices: [ - { - index: 0, - delta: { content: ' world!' }, - finish_reason: null, - }, - ], - }; - yield { - id: 'chatcmpl-123', - object: 'chat.completion.chunk', - created: 1677652288, - model: 'gpt-4', - choices: [ - { - index: 0, - delta: {}, - finish_reason: 'stop', - }, - ], - usage: { - prompt_tokens: 10, - completion_tokens: 5, - total_tokens: 15, - }, - }; - }, -}; - -const mockResponsesApiStream = { - async *[Symbol.asyncIterator]() { - // Responses API events - yield { - type: 'response.created', - response: { - id: 'resp_123', - object: 'response', - created_at: 1677652288, - model: 'gpt-4', - status: 'in_progress', - }, - }; - yield { - type: 'response.output_text.delta', - delta: { text: 'Hello' }, - }; - yield { - type: 'response.output_text.delta', - delta: { text: ' world!' }, - }; - yield { - type: 'response.completed', - response: { - id: 'resp_123', - object: 'response', - created_at: 1677652288, - model: 'gpt-4', - status: 'completed', - usage: { - input_tokens: 10, - output_tokens: 5, - total_tokens: 15, - }, - }, - }; - }, -}; - -describe('OpenAI Streaming Integration', () => { - let mockClient: OpenAiClient; - let startSpanMock: any; - let setAttributesMock: any; - - beforeEach(() => { - setAttributesMock = vi.fn(); - startSpanMock = vi.fn((options, callback) => { - const span = { setAttributes: setAttributesMock }; - return callback(span); - }); - - // Mock Sentry's startSpan - vi.mock('@sentry/core', async () => { - const actual = await vi.importActual('@sentry/core'); - return { - ...actual, - startSpan: startSpanMock, - getCurrentScope: () => ({ - getClient: () => ({ - getIntegrationByName: () => ({ options: { recordInputs: true, recordOutputs: true } }), - getOptions: () => ({ sendDefaultPii: true }), - }), - }), - }; - }); - - // Create mock OpenAI client - mockClient = { - chat: { - completions: { - create: vi.fn(), - }, - }, - responses: { - create: vi.fn(), - }, - }; - }); - - describe('Chat Completion Streaming', () => { - it('should instrument streaming chat completions', async () => { - mockClient.chat.completions.create.mockResolvedValue(mockStreamResponse); - const instrumentedClient = instrumentOpenAiClient(mockClient); - - const params = { - model: 'gpt-4', - messages: [{ role: 'user', content: 'Say hello' }], - stream: true, - }; - - const stream = await instrumentedClient.chat.completions.create(params); - - // Consume the stream - const chunks = []; - for await (const chunk of stream) { - chunks.push(chunk); - } - - expect(chunks).toHaveLength(3); - expect(startSpanMock).toHaveBeenCalledWith( - expect.objectContaining({ - name: 'chat gpt-4', - op: 'gen_ai.chat', - attributes: expect.objectContaining({ - 'gen_ai.system': 'openai', - 'gen_ai.operation.name': 'chat', - 'gen_ai.request.model': 'gpt-4', - 'gen_ai.request.stream': true, - }), - }), - expect.any(Function), - ); - - // Check that attributes were set after streaming - expect(setAttributesMock).toHaveBeenCalledWith( - expect.objectContaining({ - 'openai.response.id': 'chatcmpl-123', - 'gen_ai.response.id': 'chatcmpl-123', - 'openai.response.model': 'gpt-4', - 'gen_ai.response.model': 'gpt-4', - }), - ); - - expect(setAttributesMock).toHaveBeenCalledWith( - expect.objectContaining({ - 'openai.usage.prompt_tokens': 10, - 'gen_ai.usage.input_tokens': 10, - 'openai.usage.completion_tokens': 
5, - 'gen_ai.usage.output_tokens': 5, - 'gen_ai.usage.total_tokens': 15, - }), - ); - - expect(setAttributesMock).toHaveBeenCalledWith( - expect.objectContaining({ - 'gen_ai.response.finish_reasons': JSON.stringify(['stop']), - 'gen_ai.response.text': JSON.stringify(['Hello', ' world!']), - }), - ); - }); - }); - - describe('Responses API Streaming', () => { - it('should instrument streaming responses API', async () => { - mockClient.responses.create.mockResolvedValue(mockResponsesApiStream); - const instrumentedClient = instrumentOpenAiClient(mockClient); - - const params = { - model: 'gpt-4', - input: [{ role: 'user', content: 'Say hello' }], - stream: true, - }; - - const stream = await instrumentedClient.responses.create(params); - - // Consume the stream - const events = []; - for await (const event of stream) { - events.push(event); - } - - expect(events).toHaveLength(4); - expect(startSpanMock).toHaveBeenCalledWith( - expect.objectContaining({ - name: 'chat gpt-4', - op: 'gen_ai.chat', - attributes: expect.objectContaining({ - 'gen_ai.system': 'openai', - 'gen_ai.operation.name': 'chat', - 'gen_ai.request.model': 'gpt-4', - 'gen_ai.request.stream': true, - }), - }), - expect.any(Function), - ); - - // Check that attributes were set after streaming - expect(setAttributesMock).toHaveBeenCalledWith( - expect.objectContaining({ - 'openai.response.id': 'resp_123', - 'gen_ai.response.id': 'resp_123', - 'openai.response.model': 'gpt-4', - 'gen_ai.response.model': 'gpt-4', - }), - ); - - expect(setAttributesMock).toHaveBeenCalledWith( - expect.objectContaining({ - 'openai.usage.prompt_tokens': 10, - 'gen_ai.usage.input_tokens': 10, - 'openai.usage.completion_tokens': 5, - 'gen_ai.usage.output_tokens': 5, - 'gen_ai.usage.total_tokens': 15, - }), - ); - - expect(setAttributesMock).toHaveBeenCalledWith( - expect.objectContaining({ - 'gen_ai.response.finish_reasons': JSON.stringify(['completed']), - 'gen_ai.response.text': JSON.stringify(['Hello', ' world!']), - }), - ); - }); - }); - - describe('Non-streaming Responses', () => { - it('should still handle non-streaming responses', async () => { - const nonStreamResponse = { - id: 'chatcmpl-456', - object: 'chat.completion', - created: 1677652288, - model: 'gpt-4', - choices: [ - { - index: 0, - message: { role: 'assistant', content: 'Hello world!' 
}, - finish_reason: 'stop', - }, - ], - usage: { - prompt_tokens: 10, - completion_tokens: 5, - total_tokens: 15, - }, - }; - - mockClient.chat.completions.create.mockResolvedValue(nonStreamResponse); - const instrumentedClient = instrumentOpenAiClient(mockClient); - - const params = { - model: 'gpt-4', - messages: [{ role: 'user', content: 'Say hello' }], - stream: false, - }; - - const response = await instrumentedClient.chat.completions.create(params); - - expect(response).toEqual(nonStreamResponse); - expect(startSpanMock).toHaveBeenCalledWith( - expect.objectContaining({ - attributes: expect.objectContaining({ - 'gen_ai.request.stream': false, - }), - }), - expect.any(Function), - ); - }); - }); - - describe('Error Handling', () => { - it('should handle errors in streaming', async () => { - const errorStream = { - async *[Symbol.asyncIterator]() { - yield { - id: 'chatcmpl-789', - object: 'chat.completion.chunk', - created: 1677652288, - model: 'gpt-4', - choices: [ - { - index: 0, - delta: { content: 'Error' }, - finish_reason: null, - }, - ], - }; - throw new Error('Stream error'); - }, - }; - - mockClient.chat.completions.create.mockResolvedValue(errorStream); - const instrumentedClient = instrumentOpenAiClient(mockClient); - - const params = { - model: 'gpt-4', - messages: [{ role: 'user', content: 'Cause error' }], - stream: true, - }; - - const stream = await instrumentedClient.chat.completions.create(params); - - await expect(async () => { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const chunk of stream) { - // Consume stream - } - }).rejects.toThrow('Stream error'); - - // Even with error, collected data should be saved - expect(setAttributesMock).toHaveBeenCalledWith( - expect.objectContaining({ - 'gen_ai.response.text': JSON.stringify(['Error']), - }), - ); - }); - }); -}); From bb6b91d4f14a401d0dd9c061ff309488682f178b Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Mon, 21 Jul 2025 16:18:18 +0200 Subject: [PATCH 04/11] more refactoring --- packages/core/src/utils/openai/index.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/core/src/utils/openai/index.ts b/packages/core/src/utils/openai/index.ts index a92de8258def..e7c6501d75b6 100644 --- a/packages/core/src/utils/openai/index.ts +++ b/packages/core/src/utils/openai/index.ts @@ -183,7 +183,7 @@ function instrumentMethod( if (isStream(result)) { return startSpanManual( { - name: `${operationName} ${model}`, + name: `${operationName} ${model} stream-response`, op: getSpanOperation(methodPath), attributes: requestAttributes as Record, }, @@ -199,6 +199,7 @@ function instrumentMethod( finalOptions.recordOutputs ?? 
false, finish, ) as unknown as R; + } catch (error) { captureException(error); finish(); @@ -207,6 +208,7 @@ function instrumentMethod( }, ); } else { + // Non-streaming responses return startSpan( { name: `${operationName} ${model}`, From d1ebaaab655a8c2fbd14d9151aaaaf203d13e618 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Wed, 23 Jul 2025 11:39:53 +0200 Subject: [PATCH 05/11] Add tests --- .../suites/tracing/openai/scenario.mjs | 212 ++++++++++++++++++ .../suites/tracing/openai/test.ts | 155 ++++++++++++- packages/core/src/utils/openai/constants.ts | 10 +- packages/core/src/utils/openai/index.ts | 19 +- packages/core/src/utils/openai/streaming.ts | 7 +- packages/core/src/utils/openai/utils.ts | 12 - .../core/test/lib/utils/openai-utils.test.ts | 22 -- 7 files changed, 386 insertions(+), 51 deletions(-) diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs b/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs index 3958517bea40..997f6ca47c6e 100644 --- a/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs @@ -18,6 +18,11 @@ class MockOpenAI { throw error; } + // If stream is requested, return an async generator + if (params.stream) { + return this._createChatCompletionStream(params); + } + return { id: 'chatcmpl-mock123', object: 'chat.completion', @@ -48,6 +53,11 @@ class MockOpenAI { create: async params => { await new Promise(resolve => setTimeout(resolve, 10)); + // If stream is requested, return an async generator + if (params.stream) { + return this._createResponsesApiStream(params); + } + return { id: 'resp_mock456', object: 'response', @@ -65,6 +75,163 @@ class MockOpenAI { }, }; } + + // Create a mock streaming response for chat completions + async *_createChatCompletionStream(params) { + // First chunk with basic info + yield { + id: 'chatcmpl-stream-123', + object: 'chat.completion.chunk', + created: 1677652300, + model: params.model, + system_fingerprint: 'fp_stream_123', + choices: [ + { + index: 0, + delta: { + role: 'assistant', + content: 'Hello', + }, + finish_reason: null, + }, + ], + }; + + // Second chunk with more content + yield { + id: 'chatcmpl-stream-123', + object: 'chat.completion.chunk', + created: 1677652300, + model: params.model, + system_fingerprint: 'fp_stream_123', + choices: [ + { + index: 0, + delta: { + content: ' from OpenAI streaming!', + }, + finish_reason: 'stop', + }, + ], + usage: { + prompt_tokens: 12, + completion_tokens: 18, + total_tokens: 30, + completion_tokens_details: { + accepted_prediction_tokens: 0, + audio_tokens: 0, + reasoning_tokens: 0, + rejected_prediction_tokens: 0, + }, + prompt_tokens_details: { + audio_tokens: 0, + cached_tokens: 0, + }, + }, + }; + } + + // Create a mock streaming response for responses API + async *_createResponsesApiStream(params) { + // Response created event + yield { + type: 'response.created', + response: { + id: 'resp_stream_456', + object: 'response', + created_at: 1677652310, + model: params.model, + status: 'in_progress', + error: null, + incomplete_details: null, + instructions: params.instructions, + max_output_tokens: 1000, + parallel_tool_calls: false, + previous_response_id: null, + reasoning: { + effort: null, + summary: null, + }, + store: false, + temperature: 0.7, + text: { + format: { + type: 'text', + }, + }, + tool_choice: 'auto', + tools: [], + top_p: 1.0, + truncation: 'disabled', + user: null, + metadata: {}, + output: [], + output_text: 
'', + usage: { + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + }, + }, + sequence_number: 1, + }; + + // Response in progress with output text delta + yield { + type: 'response.output_text.delta', + delta: 'Streaming response to: ', + sequence_number: 2, + }; + + yield { + type: 'response.output_text.delta', + delta: params.input, + sequence_number: 3, + }; + + // Response completed event + yield { + type: 'response.completed', + response: { + id: 'resp_stream_456', + object: 'response', + created_at: 1677652310, + model: params.model, + status: 'completed', + error: null, + incomplete_details: null, + instructions: params.instructions, + max_output_tokens: 1000, + parallel_tool_calls: false, + previous_response_id: null, + reasoning: { + effort: null, + summary: null, + }, + store: false, + temperature: 0.7, + text: { + format: { + type: 'text', + }, + }, + tool_choice: 'auto', + tools: [], + top_p: 1.0, + truncation: 'disabled', + user: null, + metadata: {}, + output: [], + output_text: `Streaming response to: ${params.input}`, + usage: { + input_tokens: 6, + output_tokens: 10, + total_tokens: 16, + }, + }, + sequence_number: 4, + }; + } } async function run() { @@ -102,6 +269,51 @@ async function run() { } catch { // Error is expected and handled } + + // Fourth test: chat completions streaming + const stream1 = await client.chat.completions.create({ + model: 'gpt-4', + messages: [ + { role: 'system', content: 'You are a helpful assistant.' }, + { role: 'user', content: 'Tell me about streaming' }, + ], + stream: true, + temperature: 0.8, + }); + + // Consume the stream to trigger span instrumentation + for await (const chunk of stream1) { + // Stream chunks are processed automatically by instrumentation + void chunk; // Prevent unused variable warning + } + + // Fifth test: responses API streaming + const stream2 = await client.responses.create({ + model: 'gpt-4', + input: 'Test streaming responses API', + instructions: 'You are a streaming assistant', + stream: true, + }); + + for await (const chunk of stream2) { + void chunk; + } + + // Sixth test: error handling in streaming context + try { + const errorStream = await client.chat.completions.create({ + model: 'error-model', + messages: [{ role: 'user', content: 'This will fail' }], + stream: true, + }); + + // Try to consume the stream (this should not execute) + for await (const chunk of errorStream) { + void chunk; + } + } catch { + // Error is expected and handled + } }); } diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts index ec6f97a6aa00..d6c1d4b6f628 100644 --- a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts @@ -72,6 +72,59 @@ describe('OpenAI integration', () => { origin: 'manual', status: 'unknown_error', }), + // Fourth span - chat completions streaming + expect.objectContaining({ + data: { + 'gen_ai.operation.name': 'chat', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + 'gen_ai.system': 'openai', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.temperature': 0.8, + 'gen_ai.request.stream': true, + 'gen_ai.response.model': 'gpt-4', + 'gen_ai.response.id': 'chatcmpl-stream-123', + 'gen_ai.response.finish_reasons': 'stop', + 'gen_ai.usage.input_tokens': 12, + 'gen_ai.usage.output_tokens': 18, + 'gen_ai.usage.total_tokens': 30, + 'openai.response.id': 'chatcmpl-stream-123', + 'openai.response.model': 'gpt-4', 
+ 'openai.response.timestamp': '2023-03-01T06:31:40.000Z', + 'openai.usage.completion_tokens': 18, + 'openai.usage.prompt_tokens': 12, + }, + description: 'chat gpt-4 stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'ok', + }), + // Fifth span - responses API streaming + expect.objectContaining({ + data: { + 'gen_ai.operation.name': 'chat', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + 'gen_ai.system': 'openai', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.stream': true, + 'gen_ai.response.model': 'gpt-4', + 'gen_ai.response.id': 'resp_stream_456', + 'gen_ai.response.finish_reasons': 'in_progress', + 'gen_ai.usage.input_tokens': 0, + 'gen_ai.usage.output_tokens': 0, + 'gen_ai.usage.total_tokens': 0, + 'openai.response.id': 'resp_stream_456', + 'openai.response.model': 'gpt-4', + 'openai.response.timestamp': '2023-03-01T06:31:50.000Z', + 'openai.usage.completion_tokens': 0, + 'openai.usage.prompt_tokens': 0, + }, + description: 'chat gpt-4 stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'ok', + }), ]), }; @@ -147,6 +200,82 @@ describe('OpenAI integration', () => { origin: 'manual', status: 'unknown_error', }), + // Fourth span - chat completions streaming with PII + expect.objectContaining({ + data: expect.objectContaining({ + 'gen_ai.operation.name': 'chat', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + 'gen_ai.system': 'openai', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.temperature': 0.8, + 'gen_ai.request.stream': true, + 'gen_ai.request.messages': expect.stringContaining('Tell me about streaming'), + 'gen_ai.response.text': 'Hello from OpenAI streaming!', + 'gen_ai.response.finish_reasons': 'stop', + 'gen_ai.response.id': 'chatcmpl-stream-123', + 'gen_ai.response.model': 'gpt-4', + 'gen_ai.usage.input_tokens': 12, + 'gen_ai.usage.output_tokens': 18, + 'gen_ai.usage.total_tokens': 30, + 'openai.response.id': 'chatcmpl-stream-123', + 'openai.response.model': 'gpt-4', + 'openai.response.stream': true, + 'openai.response.timestamp': '2023-03-01T06:31:40.000Z', + 'openai.usage.completion_tokens': 18, + 'openai.usage.prompt_tokens': 12, + }), + description: 'chat gpt-4 stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'ok', + }), + // Fifth span - responses API streaming with PII + expect.objectContaining({ + data: expect.objectContaining({ + 'gen_ai.operation.name': 'chat', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + 'gen_ai.system': 'openai', + 'gen_ai.request.model': 'gpt-4', + 'gen_ai.request.stream': true, + 'gen_ai.request.messages': '"Test streaming responses API"', + 'gen_ai.response.text': expect.stringContaining('Streaming response to: Test streaming responses API'), + 'gen_ai.response.finish_reasons': 'completed', + 'gen_ai.response.id': 'resp_stream_456', + 'gen_ai.response.model': 'gpt-4', + 'gen_ai.usage.input_tokens': 6, + 'gen_ai.usage.output_tokens': 10, + 'gen_ai.usage.total_tokens': 16, + 'openai.response.id': 'resp_stream_456', + 'openai.response.model': 'gpt-4', + 'openai.response.stream': true, + 'openai.response.timestamp': '2023-03-01T06:31:50.000Z', + 'openai.usage.completion_tokens': 10, + 'openai.usage.prompt_tokens': 6, + }), + description: 'chat gpt-4 stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'ok', + }), + // Sixth span - error handling in streaming context with PII + expect.objectContaining({ + data: { + 'gen_ai.operation.name': 'chat', + 'gen_ai.request.model': 'error-model', + 'gen_ai.request.stream': true, + 
'gen_ai.request.messages': expect.stringContaining('This will fail'), + 'gen_ai.system': 'openai', + 'openai.response.stream': true, + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + }, + description: 'chat error-model stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'ok', + }), ]), }; @@ -160,24 +289,44 @@ describe('OpenAI integration', () => { 'gen_ai.response.text': expect.any(String), // Should include response text when recordOutputs: true }), }), + // Check that custom options are respected for streaming + expect.objectContaining({ + data: expect.objectContaining({ + 'gen_ai.request.messages': expect.any(String), // Should include messages when recordInputs: true + 'gen_ai.response.text': expect.any(String), // Should include response text when recordOutputs: true + 'gen_ai.request.stream': true, // Should be marked as stream + }), + }), ]), }; createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument.mjs', (createRunner, test) => { test('creates openai related spans with sendDefaultPii: false', async () => { - await createRunner().expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_FALSE }).start().completed(); + await createRunner() + .ignore('event') + .expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_FALSE }) + .start() + .completed(); }); }); createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-pii.mjs', (createRunner, test) => { test('creates openai related spans with sendDefaultPii: true', async () => { - await createRunner().expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE }).start().completed(); + await createRunner() + .ignore('event') + .expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE }) + .start() + .completed(); }); }); createEsmAndCjsTests(__dirname, 'scenario.mjs', 'instrument-with-options.mjs', (createRunner, test) => { test('creates openai related spans with custom options', async () => { - await createRunner().expect({ transaction: EXPECTED_TRANSACTION_WITH_OPTIONS }).start().completed(); + await createRunner() + .ignore('event') + .expect({ transaction: EXPECTED_TRANSACTION_WITH_OPTIONS }) + .start() + .completed(); }); }); }); diff --git a/packages/core/src/utils/openai/constants.ts b/packages/core/src/utils/openai/constants.ts index f818c9e4c48f..462f007b0585 100644 --- a/packages/core/src/utils/openai/constants.ts +++ b/packages/core/src/utils/openai/constants.ts @@ -3,4 +3,12 @@ export const OPENAI_INTEGRATION_NAME = 'OpenAI'; // https://platform.openai.com/docs/quickstart?api-mode=responses // https://platform.openai.com/docs/quickstart?api-mode=chat export const INSTRUMENTED_METHODS = ['responses.create', 'chat.completions.create'] as const; -export const RESPONSE_EVENT_TYPES = ['response.created', 'response.in_progress', 'response.failed', 'response.completed', 'response.incomplete', 'response.queued', 'response.output_text.delta'] as const; +export const RESPONSE_EVENT_TYPES = [ + 'response.created', + 'response.in_progress', + 'response.failed', + 'response.completed', + 'response.incomplete', + 'response.queued', + 'response.output_text.delta', +] as const; diff --git a/packages/core/src/utils/openai/index.ts b/packages/core/src/utils/openai/index.ts index e7c6501d75b6..a7244f58bbdc 100644 --- a/packages/core/src/utils/openai/index.ts +++ b/packages/core/src/utils/openai/index.ts @@ -36,7 +36,6 @@ import { getSpanOperation, isChatCompletionResponse, isResponsesApiResponse, - isStream, setCommonResponseAttributes, setTokenUsageAttributes, shouldInstrument, @@ -178,37 +177,39 @@ 
function instrumentMethod( const model = (requestAttributes[GEN_AI_REQUEST_MODEL_ATTRIBUTE] as string) || 'unknown'; const operationName = getOperationName(methodPath); - const result = await originalMethod.apply(context, args); + const params = args[0] as Record | undefined; + const isStreamRequested = params && typeof params === 'object' && params.stream === true; - if (isStream(result)) { + if (isStreamRequested) { + // For streaming responses, use manual span management to properly handle the async generator lifecycle return startSpanManual( { name: `${operationName} ${model} stream-response`, op: getSpanOperation(methodPath), attributes: requestAttributes as Record, }, - (span: Span, finish: () => void) => { + async (span: Span) => { try { if (finalOptions.recordInputs && args[0] && typeof args[0] === 'object') { addRequestAttributes(span, args[0] as Record); } + const result = await originalMethod.apply(context, args); + return instrumentStream( result as OpenAIStream, span, finalOptions.recordOutputs ?? false, - finish, ) as unknown as R; - } catch (error) { captureException(error); - finish(); + span.end(); throw error; } }, ); } else { - // Non-streaming responses + // Non-streaming responses return startSpan( { name: `${operationName} ${model}`, @@ -220,6 +221,8 @@ function instrumentMethod( if (finalOptions.recordInputs && args[0] && typeof args[0] === 'object') { addRequestAttributes(span, args[0] as Record); } + + const result = await originalMethod.apply(context, args); addResponseAttributes(span, result, finalOptions.recordOutputs); return result; } catch (error) { diff --git a/packages/core/src/utils/openai/streaming.ts b/packages/core/src/utils/openai/streaming.ts index 7e28f9389e4a..c6f8f3cf0b74 100644 --- a/packages/core/src/utils/openai/streaming.ts +++ b/packages/core/src/utils/openai/streaming.ts @@ -102,7 +102,6 @@ export async function* instrumentStream( stream: AsyncIterable, span: Span, recordOutputs: boolean, - finishSpan?: () => void, ): AsyncGenerator { const state: StreamingState = { eventTypes: [], @@ -125,7 +124,7 @@ export async function* instrumentStream( if (state.finishReasons.length) { span.setAttributes({ - [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: state.finishReasons.join(','), + [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: state.finishReasons[state.finishReasons.length - 1], }); } @@ -134,9 +133,7 @@ export async function* instrumentStream( [GEN_AI_RESPONSE_TEXT_ATTRIBUTE]: state.responseTexts.join(''), }); } - } - if (finishSpan) { - finishSpan(); + span.end(); } } diff --git a/packages/core/src/utils/openai/utils.ts b/packages/core/src/utils/openai/utils.ts index a1b2074f9af0..0032f72d6b2c 100644 --- a/packages/core/src/utils/openai/utils.ts +++ b/packages/core/src/utils/openai/utils.ts @@ -75,18 +75,6 @@ export function isResponsesApiResponse(response: unknown): response is OpenAIRes ); } -/** - * Check if a result is a stream (AsyncIterable) - */ -export function isStream(result: unknown): result is AsyncIterable { - return ( - result !== null && - typeof result === 'object' && - Symbol.asyncIterator in result && - typeof (result as Record)[Symbol.asyncIterator] === 'function' - ); -} - /** * Check if streaming event is from the Responses API */ diff --git a/packages/core/test/lib/utils/openai-utils.test.ts b/packages/core/test/lib/utils/openai-utils.test.ts index 3ffa865ad2bf..76748c2fe473 100644 --- a/packages/core/test/lib/utils/openai-utils.test.ts +++ b/packages/core/test/lib/utils/openai-utils.test.ts @@ -7,7 +7,6 @@ import { 
isChatCompletionResponse, isResponsesApiResponse, isResponsesApiStreamEvent, - isStream, shouldInstrument, } from '../../../src/utils/openai/utils'; @@ -105,27 +104,6 @@ describe('openai-utils', () => { }); }); - describe('isStream', () => { - it('should return true for AsyncIterable objects', () => { - const validStream = { - async *[Symbol.asyncIterator]() { - yield 'test'; - }, - }; - expect(isStream(validStream)).toBe(true); - }); - - it('should return false for non-stream objects', () => { - expect(isStream(null)).toBe(false); - expect(isStream(undefined)).toBe(false); - expect(isStream('string')).toBe(false); - expect(isStream(123)).toBe(false); - expect(isStream({})).toBe(false); - expect(isStream({ [Symbol.asyncIterator]: 'not-a-function' })).toBe(false); - expect(isStream([])).toBe(false); - }); - }); - describe('isResponsesApiStreamEvent', () => { it('should return true for valid responses API stream events', () => { expect(isResponsesApiStreamEvent({ type: 'response.created' })).toBe(true); From e41d2ad58ae69f367cf52bbca08715ade2e006f9 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Wed, 23 Jul 2025 15:23:06 +0200 Subject: [PATCH 06/11] just bump bundle size a bit --- .size-limit.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.size-limit.js b/.size-limit.js index 13b963bacd8d..058f298c02f3 100644 --- a/.size-limit.js +++ b/.size-limit.js @@ -233,7 +233,7 @@ module.exports = [ import: createImport('init'), ignore: [...builtinModules, ...nodePrefixedBuiltinModules], gzip: true, - limit: '144 KB', + limit: '145 KB', }, { name: '@sentry/node - without tracing', From 4fba2477486bdbe1d3e4963503a5e7ce993ea429 Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Wed, 23 Jul 2025 15:38:01 +0200 Subject: [PATCH 07/11] Revert "try timeout sol" This reverts commit 16073049bd6fa83ead1294041bcad78d1e8b17a9. --- .../vercelai/scenario-express-error.mjs | 81 +++++++++++++++++ .../suites/tracing/vercelai/test.ts | 90 +++++++++++++++++++ 2 files changed, 171 insertions(+) create mode 100644 dev-packages/node-integration-tests/suites/tracing/vercelai/scenario-express-error.mjs diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/scenario-express-error.mjs b/dev-packages/node-integration-tests/suites/tracing/vercelai/scenario-express-error.mjs new file mode 100644 index 000000000000..896ceceba6f4 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/scenario-express-error.mjs @@ -0,0 +1,81 @@ +import * as Sentry from '@sentry/node'; +import { generateText, tool } from 'ai'; +import { MockLanguageModelV1 } from 'ai/test'; +import express from 'express'; +import { createServer } from 'http'; +import { z } from 'zod'; + +async function run() { + const app = express(); + + app.get('/api/chat', async (req, res) => { + try { + await generateText({ + model: new MockLanguageModelV1({ + doGenerate: async () => ({ + rawCall: { rawPrompt: null, rawSettings: {} }, + finishReason: 'stop', + usage: { promptTokens: 10, completionTokens: 20 }, + text: 'Processing your request...', + toolCalls: [ + { + toolCallType: 'function', + toolCallId: 'call-1', + toolName: 'calculateTool', + args: '{ "a": 1, "b": 2 }', + }, + ], + }), + }), + experimental_telemetry: { + functionId: 'Chat Assistant', + recordInputs: true, + recordOutputs: true, + isEnabled: true, + }, + tools: { + calculateTool: tool({ + description: 'Calculate the result of a math problem. 
Returns a number.', + parameters: z.object({ + a: z.number().describe('First number'), + b: z.number().describe('Second number'), + }), + type: 'function', + execute: async () => { + throw new Error('Calculation service unavailable'); + }, + }), + }, + maxSteps: 2, + system: 'You are a helpful chat assistant.', + prompt: 'What is 1 + 1?', + }); + + res.json({ success: true }); + } catch (error) { + res.status(500).json({ error: error.message }); + } + }); + + Sentry.setupExpressErrorHandler(app); + + const server = createServer(app); + + // Start server and make request + server.listen(0, () => { + const port = server.address()?.port; + // eslint-disable-next-line no-console + console.log(JSON.stringify({ port })); + + // Make the request that will trigger the error + fetch(`http://localhost:${port}/api/chat`) + .then(() => { + server.close(); + }) + .catch(() => { + server.close(); + }); + }); +} + +run(); diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts index f9b853aa4946..f58cf63cdd58 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts @@ -416,4 +416,94 @@ describe('Vercel AI integration', () => { await createRunner().expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE }).start().completed(); }); }); + + createEsmAndCjsTests(__dirname, 'scenario-error.mjs', 'instrument-with-pii.mjs', (createRunner, test) => { + test('Vercel AI errors should inherit parent trace context from manually created outer span', async () => { + let capturedTransaction: any; + let capturedEvent: any; + + const runner = createRunner() + .expect({ + transaction: (transaction: any) => { + capturedTransaction = transaction; + expect(transaction.transaction).toBe('outer span'); + }, + }) + .expect({ + event: (event: any) => { + capturedEvent = event; + + expect(event).toMatchObject({ + exception: { + values: expect.arrayContaining([ + expect.objectContaining({ + type: 'AI_ToolExecutionError', + value: 'Error executing tool calculateTool: Not implemented', + }), + ]), + }, + }); + }, + }) + .start(); + + await runner.completed(); + + const transactionTraceId = capturedTransaction?.contexts?.trace?.trace_id; + const errorTraceId = capturedEvent?.contexts?.trace?.trace_id; + + expect(transactionTraceId).toBeDefined(); + expect(errorTraceId).toBeDefined(); + expect(transactionTraceId).toMatch(/^[a-f0-9]{32}$/); + expect(errorTraceId).toMatch(/^[a-f0-9]{32}$/); + + expect(errorTraceId).toBe(transactionTraceId); + }); + }); + + createEsmAndCjsTests(__dirname, 'scenario-express-error.mjs', 'instrument-with-pii.mjs', (createRunner, test) => { + test('Vercel AI errors should inherit parent trace context from server HTTP request', async () => { + let capturedTransaction: any; + let capturedEvent: any; + + const runner = createRunner() + .withMockSentryServer() + .expect({ + transaction: (transaction: any) => { + capturedTransaction = transaction; + // Express creates a transaction like "GET /api/chat" + expect(transaction.transaction).toBe('GET /api/chat'); + }, + }) + .expect({ + event: (event: any) => { + capturedEvent = event; + + expect(event).toMatchObject({ + exception: { + values: expect.arrayContaining([ + expect.objectContaining({ + type: 'AI_ToolExecutionError', + value: 'Error executing tool calculateTool: Calculation service unavailable', + }), + ]), + }, + }); + }, + }) + .start(); + + await 
runner.completed(); + + const transactionTraceId = capturedTransaction?.contexts?.trace?.trace_id; + const errorTraceId = capturedEvent?.contexts?.trace?.trace_id; + + expect(transactionTraceId).toBeDefined(); + expect(errorTraceId).toBeDefined(); + expect(transactionTraceId).toMatch(/^[a-f0-9]{32}$/); + expect(errorTraceId).toMatch(/^[a-f0-9]{32}$/); + + expect(errorTraceId).toBe(transactionTraceId); + }); + }); }); From 305fd527a17688dc21b46ba9e857f19b2318cfad Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Fri, 25 Jul 2025 14:41:36 +0200 Subject: [PATCH 08/11] quick refactor --- .../suites/tracing/openai/scenario.mjs | 2 +- .../suites/tracing/openai/test.ts | 15 ++++++++------- packages/core/src/utils/openai/streaming.ts | 2 +- packages/core/src/utils/openai/utils.ts | 12 +++++++++--- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs b/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs index 997f6ca47c6e..b234182bad2c 100644 --- a/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs @@ -222,7 +222,7 @@ class MockOpenAI { user: null, metadata: {}, output: [], - output_text: `Streaming response to: ${params.input}`, + output_text: params.input, usage: { input_tokens: 6, output_tokens: 10, diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts index d6c1d4b6f628..f0f35b3b3086 100644 --- a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts @@ -84,7 +84,7 @@ describe('OpenAI integration', () => { 'gen_ai.request.stream': true, 'gen_ai.response.model': 'gpt-4', 'gen_ai.response.id': 'chatcmpl-stream-123', - 'gen_ai.response.finish_reasons': 'stop', + 'gen_ai.response.finish_reasons': '["stop"]', 'gen_ai.usage.input_tokens': 12, 'gen_ai.usage.output_tokens': 18, 'gen_ai.usage.total_tokens': 30, @@ -110,7 +110,7 @@ describe('OpenAI integration', () => { 'gen_ai.request.stream': true, 'gen_ai.response.model': 'gpt-4', 'gen_ai.response.id': 'resp_stream_456', - 'gen_ai.response.finish_reasons': 'in_progress', + 'gen_ai.response.finish_reasons': '["in_progress"]', 'gen_ai.usage.input_tokens': 0, 'gen_ai.usage.output_tokens': 0, 'gen_ai.usage.total_tokens': 0, @@ -210,9 +210,10 @@ describe('OpenAI integration', () => { 'gen_ai.request.model': 'gpt-4', 'gen_ai.request.temperature': 0.8, 'gen_ai.request.stream': true, - 'gen_ai.request.messages': expect.stringContaining('Tell me about streaming'), + 'gen_ai.request.messages': + '[{"role":"system","content":"You are a helpful assistant."},{"role":"user","content":"Tell me about streaming"}]', 'gen_ai.response.text': 'Hello from OpenAI streaming!', - 'gen_ai.response.finish_reasons': 'stop', + 'gen_ai.response.finish_reasons': '["stop"]', 'gen_ai.response.id': 'chatcmpl-stream-123', 'gen_ai.response.model': 'gpt-4', 'gen_ai.usage.input_tokens': 12, @@ -240,8 +241,8 @@ describe('OpenAI integration', () => { 'gen_ai.request.model': 'gpt-4', 'gen_ai.request.stream': true, 'gen_ai.request.messages': '"Test streaming responses API"', - 'gen_ai.response.text': expect.stringContaining('Streaming response to: Test streaming responses API'), - 'gen_ai.response.finish_reasons': 'completed', + 'gen_ai.response.text': 'Streaming response to: Test streaming responses APITest streaming 
responses API', + 'gen_ai.response.finish_reasons': '["in_progress","completed"]', 'gen_ai.response.id': 'resp_stream_456', 'gen_ai.response.model': 'gpt-4', 'gen_ai.usage.input_tokens': 6, @@ -265,7 +266,7 @@ describe('OpenAI integration', () => { 'gen_ai.operation.name': 'chat', 'gen_ai.request.model': 'error-model', 'gen_ai.request.stream': true, - 'gen_ai.request.messages': expect.stringContaining('This will fail'), + 'gen_ai.request.messages': '[{"role":"user","content":"This will fail"}]', 'gen_ai.system': 'openai', 'openai.response.stream': true, 'sentry.op': 'gen_ai.chat', diff --git a/packages/core/src/utils/openai/streaming.ts b/packages/core/src/utils/openai/streaming.ts index c6f8f3cf0b74..28e24d2cf034 100644 --- a/packages/core/src/utils/openai/streaming.ts +++ b/packages/core/src/utils/openai/streaming.ts @@ -124,7 +124,7 @@ export async function* instrumentStream( if (state.finishReasons.length) { span.setAttributes({ - [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: state.finishReasons[state.finishReasons.length - 1], + [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), }); } diff --git a/packages/core/src/utils/openai/utils.ts b/packages/core/src/utils/openai/utils.ts index 0032f72d6b2c..ea3119cfcfb7 100644 --- a/packages/core/src/utils/openai/utils.ts +++ b/packages/core/src/utils/openai/utils.ts @@ -13,7 +13,13 @@ import { OPENAI_USAGE_PROMPT_TOKENS_ATTRIBUTE, } from '../gen-ai-attributes'; import { INSTRUMENTED_METHODS } from './constants'; -import type { InstrumentedMethod, OpenAiChatCompletionObject, OpenAIResponseObject } from './types'; +import type { + ChatCompletionChunk, + InstrumentedMethod, + OpenAiChatCompletionObject, + OpenAIResponseObject, + ResponseStreamingEvent, +} from './types'; /** * Maps OpenAI method paths to Sentry operation names @@ -78,7 +84,7 @@ export function isResponsesApiResponse(response: unknown): response is OpenAIRes /** * Check if streaming event is from the Responses API */ -export function isResponsesApiStreamEvent(event: unknown): boolean { +export function isResponsesApiStreamEvent(event: unknown): event is ResponseStreamingEvent { return ( event !== null && typeof event === 'object' && @@ -91,7 +97,7 @@ export function isResponsesApiStreamEvent(event: unknown): boolean { /** * Check if streaming event is a chat completion chunk */ -export function isChatCompletionChunk(event: unknown): boolean { +export function isChatCompletionChunk(event: unknown): event is ChatCompletionChunk { return ( event !== null && typeof event === 'object' && From b4a43e283b5c19acec0c40f96d986be4106197a4 Mon Sep 17 00:00:00 2001 From: Rola Abuhasna Date: Fri, 25 Jul 2025 14:45:51 +0200 Subject: [PATCH 09/11] Delete dev-packages/node-integration-tests/suites/tracing/vercelai/scenario-express-error.mjs --- .../vercelai/scenario-express-error.mjs | 81 ------------------- 1 file changed, 81 deletions(-) delete mode 100644 dev-packages/node-integration-tests/suites/tracing/vercelai/scenario-express-error.mjs diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/scenario-express-error.mjs b/dev-packages/node-integration-tests/suites/tracing/vercelai/scenario-express-error.mjs deleted file mode 100644 index 896ceceba6f4..000000000000 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/scenario-express-error.mjs +++ /dev/null @@ -1,81 +0,0 @@ -import * as Sentry from '@sentry/node'; -import { generateText, tool } from 'ai'; -import { MockLanguageModelV1 } from 'ai/test'; -import express from 'express'; 
-import { createServer } from 'http'; -import { z } from 'zod'; - -async function run() { - const app = express(); - - app.get('/api/chat', async (req, res) => { - try { - await generateText({ - model: new MockLanguageModelV1({ - doGenerate: async () => ({ - rawCall: { rawPrompt: null, rawSettings: {} }, - finishReason: 'stop', - usage: { promptTokens: 10, completionTokens: 20 }, - text: 'Processing your request...', - toolCalls: [ - { - toolCallType: 'function', - toolCallId: 'call-1', - toolName: 'calculateTool', - args: '{ "a": 1, "b": 2 }', - }, - ], - }), - }), - experimental_telemetry: { - functionId: 'Chat Assistant', - recordInputs: true, - recordOutputs: true, - isEnabled: true, - }, - tools: { - calculateTool: tool({ - description: 'Calculate the result of a math problem. Returns a number.', - parameters: z.object({ - a: z.number().describe('First number'), - b: z.number().describe('Second number'), - }), - type: 'function', - execute: async () => { - throw new Error('Calculation service unavailable'); - }, - }), - }, - maxSteps: 2, - system: 'You are a helpful chat assistant.', - prompt: 'What is 1 + 1?', - }); - - res.json({ success: true }); - } catch (error) { - res.status(500).json({ error: error.message }); - } - }); - - Sentry.setupExpressErrorHandler(app); - - const server = createServer(app); - - // Start server and make request - server.listen(0, () => { - const port = server.address()?.port; - // eslint-disable-next-line no-console - console.log(JSON.stringify({ port })); - - // Make the request that will trigger the error - fetch(`http://localhost:${port}/api/chat`) - .then(() => { - server.close(); - }) - .catch(() => { - server.close(); - }); - }); -} - -run(); From ffcd2f7b8aeb2da77769e6240da8660815f7e35e Mon Sep 17 00:00:00 2001 From: Rola Abuhasna Date: Fri, 25 Jul 2025 14:47:43 +0200 Subject: [PATCH 10/11] Update test.ts --- .../suites/tracing/vercelai/test.ts | 90 ------------------- 1 file changed, 90 deletions(-) diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts index f58cf63cdd58..f9b853aa4946 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/test.ts @@ -416,94 +416,4 @@ describe('Vercel AI integration', () => { await createRunner().expect({ transaction: EXPECTED_TRANSACTION_DEFAULT_PII_TRUE }).start().completed(); }); }); - - createEsmAndCjsTests(__dirname, 'scenario-error.mjs', 'instrument-with-pii.mjs', (createRunner, test) => { - test('Vercel AI errors should inherit parent trace context from manually created outer span', async () => { - let capturedTransaction: any; - let capturedEvent: any; - - const runner = createRunner() - .expect({ - transaction: (transaction: any) => { - capturedTransaction = transaction; - expect(transaction.transaction).toBe('outer span'); - }, - }) - .expect({ - event: (event: any) => { - capturedEvent = event; - - expect(event).toMatchObject({ - exception: { - values: expect.arrayContaining([ - expect.objectContaining({ - type: 'AI_ToolExecutionError', - value: 'Error executing tool calculateTool: Not implemented', - }), - ]), - }, - }); - }, - }) - .start(); - - await runner.completed(); - - const transactionTraceId = capturedTransaction?.contexts?.trace?.trace_id; - const errorTraceId = capturedEvent?.contexts?.trace?.trace_id; - - expect(transactionTraceId).toBeDefined(); - expect(errorTraceId).toBeDefined(); - 
expect(transactionTraceId).toMatch(/^[a-f0-9]{32}$/); - expect(errorTraceId).toMatch(/^[a-f0-9]{32}$/); - - expect(errorTraceId).toBe(transactionTraceId); - }); - }); - - createEsmAndCjsTests(__dirname, 'scenario-express-error.mjs', 'instrument-with-pii.mjs', (createRunner, test) => { - test('Vercel AI errors should inherit parent trace context from server HTTP request', async () => { - let capturedTransaction: any; - let capturedEvent: any; - - const runner = createRunner() - .withMockSentryServer() - .expect({ - transaction: (transaction: any) => { - capturedTransaction = transaction; - // Express creates a transaction like "GET /api/chat" - expect(transaction.transaction).toBe('GET /api/chat'); - }, - }) - .expect({ - event: (event: any) => { - capturedEvent = event; - - expect(event).toMatchObject({ - exception: { - values: expect.arrayContaining([ - expect.objectContaining({ - type: 'AI_ToolExecutionError', - value: 'Error executing tool calculateTool: Calculation service unavailable', - }), - ]), - }, - }); - }, - }) - .start(); - - await runner.completed(); - - const transactionTraceId = capturedTransaction?.contexts?.trace?.trace_id; - const errorTraceId = capturedEvent?.contexts?.trace?.trace_id; - - expect(transactionTraceId).toBeDefined(); - expect(errorTraceId).toBeDefined(); - expect(transactionTraceId).toMatch(/^[a-f0-9]{32}$/); - expect(errorTraceId).toMatch(/^[a-f0-9]{32}$/); - - expect(errorTraceId).toBe(transactionTraceId); - }); - }); }); From 4dd32e8983be0e4fcb5954308b4b43d2c44b7eba Mon Sep 17 00:00:00 2001 From: RulaKhaled Date: Sun, 27 Jul 2025 22:03:29 +0200 Subject: [PATCH 11/11] refactor tests, and add more docs --- .../suites/tracing/openai/scenario.mjs | 6 +- .../suites/tracing/openai/test.ts | 36 +++-- packages/core/src/utils/openai/index.ts | 14 +- packages/core/src/utils/openai/streaming.ts | 123 +++++++++++++----- packages/core/src/utils/openai/utils.ts | 30 ++--- 5 files changed, 145 insertions(+), 64 deletions(-) diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs b/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs index b234182bad2c..faf554ede924 100644 --- a/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/openai/scenario.mjs @@ -61,11 +61,11 @@ class MockOpenAI { return { id: 'resp_mock456', object: 'response', - created: 1677652290, + created_at: 1677652290, model: params.model, input_text: params.input, output_text: `Response to: ${params.input}`, - finish_reason: 'stop', + status: 'completed', usage: { input_tokens: 5, output_tokens: 8, @@ -260,7 +260,7 @@ async function run() { instructions: 'You are a translator', }); - // Third test: error handling + // Third test: error handling in chat completions try { await client.chat.completions.create({ model: 'error-model', diff --git a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts index f0f35b3b3086..0a0d0418da14 100644 --- a/dev-packages/node-integration-tests/suites/tracing/openai/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/openai/test.ts @@ -45,11 +45,13 @@ describe('OpenAI integration', () => { 'gen_ai.request.model': 'gpt-3.5-turbo', 'gen_ai.response.model': 'gpt-3.5-turbo', 'gen_ai.response.id': 'resp_mock456', + 'gen_ai.response.finish_reasons': '["completed"]', 'gen_ai.usage.input_tokens': 5, 'gen_ai.usage.output_tokens': 8, 'gen_ai.usage.total_tokens': 13, 
'openai.response.id': 'resp_mock456', 'openai.response.model': 'gpt-3.5-turbo', + 'openai.response.timestamp': '2023-03-01T06:31:30.000Z', 'openai.usage.completion_tokens': 8, 'openai.usage.prompt_tokens': 5, }, @@ -90,6 +92,7 @@ describe('OpenAI integration', () => { 'gen_ai.usage.total_tokens': 30, 'openai.response.id': 'chatcmpl-stream-123', 'openai.response.model': 'gpt-4', + 'openai.response.stream': true, 'openai.response.timestamp': '2023-03-01T06:31:40.000Z', 'openai.usage.completion_tokens': 18, 'openai.usage.prompt_tokens': 12, @@ -110,21 +113,37 @@ describe('OpenAI integration', () => { 'gen_ai.request.stream': true, 'gen_ai.response.model': 'gpt-4', 'gen_ai.response.id': 'resp_stream_456', - 'gen_ai.response.finish_reasons': '["in_progress"]', - 'gen_ai.usage.input_tokens': 0, - 'gen_ai.usage.output_tokens': 0, - 'gen_ai.usage.total_tokens': 0, + 'gen_ai.response.finish_reasons': '["in_progress","completed"]', + 'gen_ai.usage.input_tokens': 6, + 'gen_ai.usage.output_tokens': 10, + 'gen_ai.usage.total_tokens': 16, 'openai.response.id': 'resp_stream_456', 'openai.response.model': 'gpt-4', + 'openai.response.stream': true, 'openai.response.timestamp': '2023-03-01T06:31:50.000Z', - 'openai.usage.completion_tokens': 0, - 'openai.usage.prompt_tokens': 0, + 'openai.usage.completion_tokens': 10, + 'openai.usage.prompt_tokens': 6, }, description: 'chat gpt-4 stream-response', op: 'gen_ai.chat', origin: 'manual', status: 'ok', }), + // Sixth span - error handling in streaming context + expect.objectContaining({ + data: { + 'gen_ai.operation.name': 'chat', + 'gen_ai.request.model': 'error-model', + 'gen_ai.request.stream': true, + 'gen_ai.system': 'openai', + 'sentry.op': 'gen_ai.chat', + 'sentry.origin': 'manual', + }, + description: 'chat error-model stream-response', + op: 'gen_ai.chat', + origin: 'manual', + status: 'internal_error', + }), ]), }; @@ -170,6 +189,7 @@ describe('OpenAI integration', () => { 'gen_ai.request.model': 'gpt-3.5-turbo', 'gen_ai.request.messages': '"Translate this to French: Hello"', 'gen_ai.response.text': 'Response to: Translate this to French: Hello', + 'gen_ai.response.finish_reasons': '["completed"]', 'gen_ai.response.model': 'gpt-3.5-turbo', 'gen_ai.response.id': 'resp_mock456', 'gen_ai.usage.input_tokens': 5, @@ -177,6 +197,7 @@ describe('OpenAI integration', () => { 'gen_ai.usage.total_tokens': 13, 'openai.response.id': 'resp_mock456', 'openai.response.model': 'gpt-3.5-turbo', + 'openai.response.timestamp': '2023-03-01T06:31:30.000Z', 'openai.usage.completion_tokens': 8, 'openai.usage.prompt_tokens': 5, }, @@ -268,14 +289,13 @@ describe('OpenAI integration', () => { 'gen_ai.request.stream': true, 'gen_ai.request.messages': '[{"role":"user","content":"This will fail"}]', 'gen_ai.system': 'openai', - 'openai.response.stream': true, 'sentry.op': 'gen_ai.chat', 'sentry.origin': 'manual', }, description: 'chat error-model stream-response', op: 'gen_ai.chat', origin: 'manual', - status: 'ok', + status: 'internal_error', }), ]), }; diff --git a/packages/core/src/utils/openai/index.ts b/packages/core/src/utils/openai/index.ts index 68bd8c30beea..bb8e4f983ee7 100644 --- a/packages/core/src/utils/openai/index.ts +++ b/packages/core/src/utils/openai/index.ts @@ -1,5 +1,6 @@ import { getCurrentScope } from '../../currentScopes'; import { captureException } from '../../exports'; +import { SPAN_STATUS_ERROR } from '../../tracing'; import { startSpan, startSpanManual } from '../../tracing/trace'; import type { Span, SpanAttributeValue } from '../../types-hoist/span'; 
import { @@ -14,7 +15,6 @@ import { GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, GEN_AI_RESPONSE_TEXT_ATTRIBUTE, GEN_AI_SYSTEM_ATTRIBUTE, - OPENAI_RESPONSE_STREAM_ATTRIBUTE, } from '../gen-ai-attributes'; import { OPENAI_INTEGRATION_NAME } from './constants'; import { instrumentStream } from './streaming'; @@ -143,9 +143,6 @@ function addRequestAttributes(span: Span, params: Record): void if ('input' in params) { span.setAttributes({ [GEN_AI_REQUEST_MESSAGES_ATTRIBUTE]: JSON.stringify(params.input) }); } - if ('stream' in params) { - span.setAttributes({ [OPENAI_RESPONSE_STREAM_ATTRIBUTE]: Boolean(params.stream) }); - } } function getOptionsFromIntegration(): OpenAiOptions { @@ -202,7 +199,14 @@ function instrumentMethod( finalOptions.recordOutputs ?? false, ) as unknown as R; } catch (error) { - captureException(error); + // For streaming requests that fail before stream creation, we still want to record + // them as streaming requests but end the span gracefully + span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); + captureException(error, { + mechanism: { + handled: false, + }, + }); span.end(); throw error; } diff --git a/packages/core/src/utils/openai/streaming.ts b/packages/core/src/utils/openai/streaming.ts index 28e24d2cf034..88d4c6adf893 100644 --- a/packages/core/src/utils/openai/streaming.ts +++ b/packages/core/src/utils/openai/streaming.ts @@ -1,6 +1,11 @@ import { captureException } from '../../exports'; +import { SPAN_STATUS_ERROR } from '../../tracing'; import type { Span } from '../../types-hoist/span'; -import { GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, GEN_AI_RESPONSE_TEXT_ATTRIBUTE } from '../gen-ai-attributes'; +import { + GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE, + GEN_AI_RESPONSE_TEXT_ATTRIBUTE, + OPENAI_RESPONSE_STREAM_ATTRIBUTE, +} from '../gen-ai-attributes'; import { RESPONSE_EVENT_TYPES } from './constants'; import type { OpenAIResponseObject } from './types'; import { type ChatCompletionChunk, type ResponseStreamingEvent } from './types'; @@ -11,24 +16,48 @@ import { setTokenUsageAttributes, } from './utils'; +/** + * State object used to accumulate information from a stream of OpenAI events/chunks. + */ interface StreamingState { + /** Types of events encountered in the stream. */ eventTypes: string[]; + /** Collected response text fragments (for output recording). */ responseTexts: string[]; + /** Reasons for finishing the response, as reported by the API. */ finishReasons: string[]; - responseId?: string; - responseModel?: string; - responseTimestamp?: number; - promptTokens?: number; - completionTokens?: number; - totalTokens?: number; + /** The response ID. */ + responseId: string; + /** The model name. */ + responseModel: string; + /** The timestamp of the response. */ + responseTimestamp: number; + /** Number of prompt/input tokens used. */ + promptTokens: number | undefined; + /** Number of completion/output tokens used. */ + completionTokens: number | undefined; + /** Total number of tokens used (prompt + completion). */ + totalTokens: number | undefined; } +/** + * Processes a single OpenAI ChatCompletionChunk event, updating the streaming state. + * + * @param chunk - The ChatCompletionChunk event to process. + * @param state - The current streaming state to update. + * @param recordOutputs - Whether to record output text fragments. + */ function processChatCompletionChunk(chunk: ChatCompletionChunk, state: StreamingState, recordOutputs: boolean): void { state.responseId = chunk.id ?? 
state.responseId; state.responseModel = chunk.model ?? state.responseModel; state.responseTimestamp = chunk.created ?? state.responseTimestamp; if (chunk.usage) { + // For stream responses, the input tokens remain constant across all events in the stream. + // Output tokens, however, are only finalized in the last event. + // Since we can't guarantee that the last event will include usage data or even be a typed event, + // we update the output token values on every event that includes them. + // This ensures that output token usage is always set, even if the final event lacks it. state.promptTokens = chunk.usage.prompt_tokens; state.completionTokens = chunk.usage.completion_tokens; state.totalTokens = chunk.usage.total_tokens; @@ -44,17 +73,31 @@ function processChatCompletionChunk(chunk: ChatCompletionChunk, state: Streaming } } +/** + * Processes a single OpenAI Responses API streaming event, updating the streaming state and span. + * + * @param streamEvent - The event to process (may be an error or unknown object). + * @param state - The current streaming state to update. + * @param recordOutputs - Whether to record output text fragments. + * @param span - The span to update with error status if needed. + */ function processResponsesApiEvent( streamEvent: ResponseStreamingEvent | unknown | Error, state: StreamingState, recordOutputs: boolean, + span: Span, ): void { if (!(streamEvent && typeof streamEvent === 'object')) { state.eventTypes.push('unknown:non-object'); return; } if (streamEvent instanceof Error) { - captureException(streamEvent); + span.setStatus({ code: SPAN_STATUS_ERROR, message: 'internal_error' }); + captureException(streamEvent, { + mechanism: { + handled: false, + }, + }); return; } @@ -71,32 +114,42 @@ function processResponsesApiEvent( return; } - const { response } = event as { response: OpenAIResponseObject }; - state.responseId = response.id ?? state.responseId; - state.responseModel = response.model ?? state.responseModel; - state.responseTimestamp = response.created_at ?? state.responseTimestamp; - - if (response.usage) { - state.promptTokens = response.usage.input_tokens; - state.completionTokens = response.usage.output_tokens; - state.totalTokens = response.usage.total_tokens; - } + if ('response' in event) { + const { response } = event as { response: OpenAIResponseObject }; + state.responseId = response.id ?? state.responseId; + state.responseModel = response.model ?? state.responseModel; + state.responseTimestamp = response.created_at ?? state.responseTimestamp; + + if (response.usage) { + // For stream responses, the input tokens remain constant across all events in the stream. + // Output tokens, however, are only finalized in the last event. + // Since we can't guarantee that the last event will include usage data or even be a typed event, + // we update the output token values on every event that includes them. + // This ensures that output token usage is always set, even if the final event lacks it. 
+ state.promptTokens = response.usage.input_tokens; + state.completionTokens = response.usage.output_tokens; + state.totalTokens = response.usage.total_tokens; + } - if (response.status) { - state.finishReasons.push(response.status); - } + if (response.status) { + state.finishReasons.push(response.status); + } - if (recordOutputs && response.output_text) { - state.responseTexts.push(response.output_text); + if (recordOutputs && response.output_text) { + state.responseTexts.push(response.output_text); + } } } + /** - * Instrument a stream of OpenAI events - * @param stream - The stream of events to instrument - * @param span - The span to add attributes to - * @param recordOutputs - Whether to record outputs - * @param finishSpan - Optional function to finish the span manually - * @returns A generator that yields the events + * Instruments a stream of OpenAI events, updating the provided span with relevant attributes and + * optionally recording output text. This function yields each event from the input stream as it is processed. + * + * @template T - The type of events in the stream. + * @param stream - The async iterable stream of events to instrument. + * @param span - The span to add attributes to and to finish at the end of the stream. + * @param recordOutputs - Whether to record output text fragments in the span. + * @returns An async generator yielding each event from the input stream. */ export async function* instrumentStream( stream: AsyncIterable, @@ -107,6 +160,12 @@ export async function* instrumentStream( eventTypes: [], responseTexts: [], finishReasons: [], + responseId: '', + responseModel: '', + responseTimestamp: 0, + promptTokens: undefined, + completionTokens: undefined, + totalTokens: undefined, }; try { @@ -114,7 +173,7 @@ export async function* instrumentStream( if (isChatCompletionChunk(event)) { processChatCompletionChunk(event as ChatCompletionChunk, state, recordOutputs); } else if (isResponsesApiStreamEvent(event)) { - processResponsesApiEvent(event as ResponseStreamingEvent, state, recordOutputs); + processResponsesApiEvent(event as ResponseStreamingEvent, state, recordOutputs, span); } yield event; } @@ -122,6 +181,10 @@ export async function* instrumentStream( setCommonResponseAttributes(span, state.responseId, state.responseModel, state.responseTimestamp); setTokenUsageAttributes(span, state.promptTokens, state.completionTokens, state.totalTokens); + span.setAttributes({ + [OPENAI_RESPONSE_STREAM_ATTRIBUTE]: true, + }); + if (state.finishReasons.length) { span.setAttributes({ [GEN_AI_RESPONSE_FINISH_REASONS_ATTRIBUTE]: JSON.stringify(state.finishReasons), diff --git a/packages/core/src/utils/openai/utils.ts b/packages/core/src/utils/openai/utils.ts index ea3119cfcfb7..66e727aff075 100644 --- a/packages/core/src/utils/openai/utils.ts +++ b/packages/core/src/utils/openai/utils.ts @@ -145,22 +145,16 @@ export function setTokenUsageAttributes( * @param model - The response model * @param timestamp - The response timestamp */ -export function setCommonResponseAttributes(span: Span, id?: string, model?: string, timestamp?: number): void { - if (id) { - span.setAttributes({ - [OPENAI_RESPONSE_ID_ATTRIBUTE]: id, - [GEN_AI_RESPONSE_ID_ATTRIBUTE]: id, - }); - } - if (model) { - span.setAttributes({ - [OPENAI_RESPONSE_MODEL_ATTRIBUTE]: model, - [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: model, - }); - } - if (timestamp) { - span.setAttributes({ - [OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE]: new Date(timestamp * 1000).toISOString(), - }); - } +export function 
setCommonResponseAttributes(span: Span, id: string, model: string, timestamp: number): void { + span.setAttributes({ + [OPENAI_RESPONSE_ID_ATTRIBUTE]: id, + [GEN_AI_RESPONSE_ID_ATTRIBUTE]: id, + }); + span.setAttributes({ + [OPENAI_RESPONSE_MODEL_ATTRIBUTE]: model, + [GEN_AI_RESPONSE_MODEL_ATTRIBUTE]: model, + }); + span.setAttributes({ + [OPENAI_RESPONSE_TIMESTAMP_ATTRIBUTE]: new Date(timestamp * 1000).toISOString(), + }); }
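
Note on the streaming pattern used throughout the diffs above: an async generator wraps the provider stream, re-yields every chunk untouched, accumulates response data as it goes, and does its span bookkeeping when iteration stops, so the span is closed whether the consumer drains the stream, breaks out early, or the stream throws. The sketch below illustrates that pattern in isolation. It is not the SDK code: MinimalSpan, ChunkLike, and the attribute keys spelled as string literals are illustrative stand-ins for the Span type and the attribute constants used in the patch, and the real instrumentStream may structure its cleanup differently.

// Illustrative sketch only; not the @sentry/core implementation.
interface MinimalSpan {
  setAttributes(attributes: Record<string, string | number | boolean>): void;
  end(): void;
}

interface ChunkLike {
  choices?: Array<{ delta?: { content?: string }; finish_reason?: string | null }>;
}

// Wraps any AsyncIterable of chunks, re-yielding each chunk unchanged while
// collecting response text and finish reasons, and guarantees the span is
// ended once iteration stops for any reason.
async function* wrapStream<T extends ChunkLike>(stream: AsyncIterable<T>, span: MinimalSpan): AsyncGenerator<T> {
  const responseTexts: string[] = [];
  const finishReasons: string[] = [];
  try {
    for await (const chunk of stream) {
      for (const choice of chunk.choices ?? []) {
        if (choice.delta?.content) {
          responseTexts.push(choice.delta.content);
        }
        if (choice.finish_reason) {
          finishReasons.push(choice.finish_reason);
        }
      }
      // The consumer receives the chunk exactly as the provider emitted it.
      yield chunk;
    }
  } finally {
    // Runs on normal completion, on early break/return by the consumer, and on errors.
    span.setAttributes({
      'gen_ai.response.text': responseTexts.join(''),
      'gen_ai.response.finish_reasons': JSON.stringify(finishReasons),
    });
    span.end();
  }
}

// Example consumer: stop iterating early; the generator's finally block still
// flushes the collected attributes and ends the span instead of leaking it.
async function demo(stream: AsyncIterable<ChunkLike>, span: MinimalSpan): Promise<void> {
  let seen = 0;
  for await (const chunk of wrapStream(stream, span)) {
    seen += chunk.choices?.length ?? 0;
    if (seen > 3) break;
  }
}

Doing the bookkeeping in finally rather than after the loop relies on a JavaScript generator guarantee: when a consumer exits a for await...of loop early with break or return, the generator's finally block still runs, which is why the span cannot be left open by a partially consumed stream.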