"use strict";
var _Stream_client;
Object.defineProperty(exports, "__esModule", { value: true });
exports.Stream = void 0;
exports._iterSSEMessages = _iterSSEMessages;
const tslib_1 = require("../internal/tslib.js");
const error_1 = require("./error.js");
const shims_1 = require("../internal/shims.js");
const line_1 = require("../internal/decoders/line.js");
const shims_2 = require("../internal/shims.js");
const errors_1 = require("../internal/errors.js");
const bytes_1 = require("../internal/utils/bytes.js");
const log_1 = require("../internal/utils/log.js");
const error_2 = require("./error.js");
/**
 * Async-iterable wrapper around a generator factory plus the controller
 * that is aborted when iteration stops early.
 */
class Stream {
    /**
     * @param iterator   Zero-argument function returning an async iterator.
     * @param controller Object exposing `abort()`; stored as `this.controller`.
     * @param client     Optional client, kept in a private field and forwarded
     *                   to the streams produced by `tee()`.
     */
    constructor(iterator, controller, client) {
        this.iterator = iterator;
        _Stream_client.set(this, void 0);
        this.controller = controller;
        tslib_1.__classPrivateFieldSet(this, _Stream_client, client, "f");
    }
    /**
     * Builds a Stream that yields the JSON-decoded payload of every SSE
     * message read from `response`.
     *
     * - Iteration stops producing values once a message whose data starts
     *   with `[DONE]` is seen.
     * - Messages whose event is null or does not start with `thread.` are
     *   yielded as the parsed data; a truthy `data.error` raises `APIError`.
     * - Messages whose event starts with `thread.` are yielded as
     *   `{ event, data }`.
     *
     * @throws OpenAIError if the returned stream is iterated a second time.
     * @throws SyntaxError (from `JSON.parse`) when a message is not valid
     *         JSON; the offending data and raw chunk are logged first.
     */
    static fromSSEResponse(response, controller, client) {
        let consumed = false;
        const logger = client ? (0, log_1.loggerFor)(client) : console;
        // Shared by both event branches so parse failures are always reported
        // through the same logger.
        const parseData = (sse) => {
            try {
                return JSON.parse(sse.data);
            }
            catch (e) {
                logger.error(`Could not parse message into JSON:`, sse.data);
                logger.error(`From chunk:`, sse.raw);
                throw e;
            }
        };
        async function* iterator() {
            if (consumed) {
                throw new error_1.OpenAIError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
            }
            consumed = true;
            let done = false;
            try {
                for await (const sse of _iterSSEMessages(response, controller)) {
                    // Keep draining the response after `[DONE]` without yielding.
                    if (done)
                        continue;
                    if (sse.data.startsWith('[DONE]')) {
                        done = true;
                        continue;
                    }
                    if (sse.event === null || !sse.event.startsWith('thread.')) {
                        const data = parseData(sse);
                        if (data && data.error) {
                            throw new error_2.APIError(undefined, data.error, undefined, response.headers);
                        }
                        yield data;
                    }
                    else {
                        const data = parseData(sse);
                        // TODO: Is this where the error should be thrown?
                        // NOTE(review): this branch is only entered when the event
                        // starts with 'thread.', so the comparison below cannot be
                        // true here; an 'error' event takes the branch above.
                        if (sse.event === 'error') {
                            throw new error_2.APIError(undefined, data.error, data.message, undefined);
                        }
                        yield { event: sse.event, data: data };
                    }
                }
                done = true;
            }
            catch (e) {
                // If the user calls `stream.controller.abort()`, we should exit without throwing.
                if ((0, errors_1.isAbortError)(e))
                    return;
                throw e;
            }
            finally {
                // If the user `break`s, abort the ongoing request.
                if (!done)
                    controller.abort();
            }
        }
        return new Stream(iterator, controller, client);
    }
    /**
     * Generates a Stream from a newline-separated ReadableStream
     * where each item is a JSON value.
     *
     * Empty lines are skipped; every other line is passed to `JSON.parse`.
     *
     * @throws OpenAIError if the returned stream is iterated a second time.
     */
    static fromReadableStream(readableStream, controller, client) {
        let consumed = false;
        // Decodes the byte stream into lines, including whatever the decoder
        // still buffers once the underlying stream ends.
        async function* iterLines() {
            const lineDecoder = new line_1.LineDecoder();
            const iter = (0, shims_2.ReadableStreamToAsyncIterable)(readableStream);
            for await (const chunk of iter) {
                for (const line of lineDecoder.decode(chunk)) {
                    yield line;
                }
            }
            for (const line of lineDecoder.flush()) {
                yield line;
            }
        }
        async function* iterator() {
            if (consumed) {
                throw new error_1.OpenAIError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
            }
            consumed = true;
            let done = false;
            try {
                for await (const line of iterLines()) {
                    if (done)
                        continue;
                    if (line)
                        yield JSON.parse(line);
                }
                done = true;
            }
            catch (e) {
                // If the user calls `stream.controller.abort()`, we should exit without throwing.
                if ((0, errors_1.isAbortError)(e))
                    return;
                throw e;
            }
            finally {
                // If the user `break`s, abort the ongoing request.
                if (!done)
                    controller.abort();
            }
        }
        return new Stream(iterator, controller, client);
    }
    /** Starts iteration by invoking the stored iterator factory. */
    [(_Stream_client = new WeakMap(), Symbol.asyncIterator)]() {
        return this.iterator();
    }
    /**
     * Splits the stream into two streams which can be
     * independently read from at different speeds.
     *
     * @returns A two-element array of Streams sharing this stream's controller
     *          and client.
     */
    tee() {
        const left = [];
        const right = [];
        const iterator = this.iterator();
        const teeIterator = (queue) => {
            return {
                next: () => {
                    // Pull from the source only when this side has nothing
                    // buffered, and hand the same pending result to both sides.
                    if (queue.length === 0) {
                        const result = iterator.next();
                        left.push(result);
                        right.push(result);
                    }
                    return queue.shift();
                },
            };
        };
        return [
            new Stream(() => teeIterator(left), this.controller, tslib_1.__classPrivateFieldGet(this, _Stream_client, "f")),
            new Stream(() => teeIterator(right), this.controller, tslib_1.__classPrivateFieldGet(this, _Stream_client, "f")),
        ];
    }
    /**
     * Converts this stream to a newline-separated ReadableStream of
     * JSON stringified values in the stream
     * which can be turned back into a Stream with `Stream.fromReadableStream()`.
     */
    toReadableStream() {
        const self = this;
        let iter;
        return (0, shims_1.makeReadableStream)({
            async start() {
                iter = self[Symbol.asyncIterator]();
            },
            async pull(ctrl) {
                try {
                    const { value, done } = await iter.next();
                    if (done)
                        return ctrl.close();
                    const bytes = (0, bytes_1.encodeUTF8)(JSON.stringify(value) + '\n');
                    ctrl.enqueue(bytes);
                }
                catch (err) {
                    // Surface iteration failures to the reader of the stream.
                    ctrl.error(err);
                }
            },
            async cancel() {
                await iter.return?.();
            },
        });
    }
}
exports.Stream = Stream;
/**
 * Yields the server-sent events decoded from `response.body`.
 *
 * Each yielded value has the shape `{ event, data, raw }` produced by
 * `SSEDecoder.decode`.
 *
 * @throws OpenAIError when the response has no body; `controller.abort()`
 *         is called first.
 */
async function* _iterSSEMessages(response, controller) {
    if (!response.body) {
        controller.abort();
        if (typeof globalThis.navigator !== 'undefined' &&
            globalThis.navigator.product === 'ReactNative') {
            throw new error_1.OpenAIError(`The default react-native fetch implementation does not support streaming. Please use expo/fetch: https://docs.expo.dev/versions/latest/sdk/expo/#expofetch-api`);
        }
        throw new error_1.OpenAIError(`Attempted to iterate over a response with no body`);
    }
    const sseDecoder = new SSEDecoder();
    const lineDecoder = new line_1.LineDecoder();
    const iter = (0, shims_2.ReadableStreamToAsyncIterable)(response.body);
    for await (const sseChunk of iterSSEChunks(iter)) {
        for (const line of lineDecoder.decode(sseChunk)) {
            const sse = sseDecoder.decode(line);
            if (sse)
                yield sse;
        }
    }
    // Feed the decoder whatever the line decoder still holds at end of input.
    for (const line of lineDecoder.flush()) {
        const sse = sseDecoder.decode(line);
        if (sse)
            yield sse;
    }
}
/**
 * Given an async iterable iterator, iterates over it and yields full
 * SSE chunks, i.e. yields when a double new-line is encountered.
 *
 * Chunks may be `ArrayBuffer`, string, or already-binary values; null and
 * undefined chunks are skipped. Bytes left over when the input ends are
 * yielded as a final chunk.
 */
async function* iterSSEChunks(iterator) {
    let data = new Uint8Array();
    for await (const chunk of iterator) {
        if (chunk == null) {
            continue;
        }
        const binaryChunk = chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
            : typeof chunk === 'string' ? (0, bytes_1.encodeUTF8)(chunk)
                : chunk;
        // Append the new bytes to whatever is still buffered.
        const newData = new Uint8Array(data.length + binaryChunk.length);
        newData.set(data);
        newData.set(binaryChunk, data.length);
        data = newData;
        let patternIndex;
        // Everything before `patternIndex` is emitted and the remainder stays
        // buffered (presumably the index points just past the double newline —
        // confirm against `findDoubleNewlineIndex`).
        while ((patternIndex = (0, line_1.findDoubleNewlineIndex)(data)) !== -1) {
            yield data.slice(0, patternIndex);
            data = data.slice(patternIndex);
        }
    }
    if (data.length > 0) {
        yield data;
    }
}
/**
 * Line-by-line accumulator that assembles `event:` and `data:` fields into
 * a message and releases it when an empty line is decoded.
 */
class SSEDecoder {
    constructor() {
        this.event = null;
        this.data = [];
        this.chunks = [];
    }
    /**
     * Consumes one line of the event stream.
     *
     * @param line A single line; one trailing `\r` is stripped.
     * @returns `{ event, data, raw }` when `line` is empty and an event name
     *          or data has been collected, otherwise `null`. `data` is the
     *          collected data lines joined with `\n`; `raw` is the list of
     *          non-empty lines seen since the last message.
     */
    decode(line) {
        if (line.endsWith('\r')) {
            line = line.substring(0, line.length - 1);
        }
        if (!line) {
            // empty line and we didn't previously encounter any messages
            if (!this.event && !this.data.length)
                return null;
            const sse = {
                event: this.event,
                data: this.data.join('\n'),
                raw: this.chunks,
            };
            this.event = null;
            this.data = [];
            this.chunks = [];
            return sse;
        }
        this.chunks.push(line);
        // Lines starting with ':' are recorded in `raw` but carry no field.
        if (line.startsWith(':')) {
            return null;
        }
        let [fieldname, , value] = partition(line, ':');
        // A single leading space after the colon is not part of the value.
        if (value.startsWith(' ')) {
            value = value.substring(1);
        }
        // Only `event` and `data` are tracked; any other field is ignored.
        if (fieldname === 'event') {
            this.event = value;
        }
        else if (fieldname === 'data') {
            this.data.push(value);
        }
        return null;
    }
}
/**
 * Splits `str` around the first occurrence of `delimiter`.
 *
 * @returns `[before, delimiter, after]`, or `[str, '', '']` when the
 *          delimiter does not occur.
 */
function partition(str, delimiter) {
    const index = str.indexOf(delimiter);
    if (index !== -1) {
        return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)];
    }
    return [str, '', ''];
}
//# sourceMappingURL=streaming.js.map