var _Stream_client;
import { __classPrivateFieldGet, __classPrivateFieldSet } from "../internal/tslib.mjs";
import { OpenAIError } from "./error.mjs";
import { makeReadableStream } from "../internal/shims.mjs";
import { findDoubleNewlineIndex, LineDecoder } from "../internal/decoders/line.mjs";
import { ReadableStreamToAsyncIterable } from "../internal/shims.mjs";
import { isAbortError } from "../internal/errors.mjs";
import { encodeUTF8 } from "../internal/utils/bytes.mjs";
import { loggerFor } from "../internal/utils/log.mjs";
import { APIError } from "./error.mjs";
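/**
 * An async-iterable wrapper around a streaming API response: it pairs a
 * lazily-created async iterator of parsed events with the AbortController
 * used to cancel the underlying request.
 */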
export class Stream {
    constructor(iterator, controller, client) {
        this.iterator = iterator;
        _Stream_client.set(this, void 0);
        this.controller = controller;
        __classPrivateFieldSet(this, _Stream_client, client, "f");
    }
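    /**
     * Builds a Stream from a Server-Sent Events fetch Response. Each SSE
     * message is JSON-parsed before being yielded; a `[DONE]` sentinel marks
     * the end of the data, error payloads are rethrown as `APIError`s, and
     * breaking out of the loop aborts the in-flight request via `controller`.
     *
     * @example (illustrative; `response` and `controller` come from the caller)
     * // const stream = Stream.fromSSEResponse(response, controller, client);
     * // for await (const chunk of stream) console.log(chunk);
     */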
    static fromSSEResponse(response, controller, client) {
        let consumed = false;
        const logger = client ? loggerFor(client) : console;
        async function* iterator() {
            if (consumed) {
                throw new OpenAIError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
            }
            consumed = true;
            let done = false;
            try {
                for await (const sse of _iterSSEMessages(response, controller)) {
                    if (done)
                        continue;
                    if (sse.data.startsWith('[DONE]')) {
                        done = true;
                        continue;
                    }
                    if (sse.event === null || !sse.event.startsWith('thread.')) {
                        let data;
                        try {
                            data = JSON.parse(sse.data);
                        }
                        catch (e) {
                            logger.error(`Could not parse message into JSON:`, sse.data);
                            logger.error(`From chunk:`, sse.raw);
                            throw e;
                        }
                        if (data && data.error) {
                            throw new APIError(undefined, data.error, undefined, response.headers);
                        }
                        yield data;
                    }
                    else {
                        let data;
                        try {
                            data = JSON.parse(sse.data);
                        }
                        catch (e) {
                            console.error(`Could not parse message into JSON:`, sse.data);
                            console.error(`From chunk:`, sse.raw);
                            throw e;
                        }
                        // TODO: Is this where the error should be thrown?
                        if (sse.event == 'error') {
                            throw new APIError(undefined, data.error, data.message, undefined);
                        }
                        yield { event: sse.event, data: data };
                    }
                }
                done = true;
            }
            catch (e) {
                // If the user calls `stream.controller.abort()`, we should exit without throwing.
                if (isAbortError(e))
                    return;
                throw e;
            }
            finally {
                // If the user `break`s, abort the ongoing request.
                if (!done)
                    controller.abort();
            }
        }
        return new Stream(iterator, controller, client);
    }
    /**
     * Generates a Stream from a newline-separated ReadableStream
     * where each item is a JSON value.
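     *
     * @example (illustrative; `readable` and `abortController` are placeholder names)
     * // const stream = Stream.fromReadableStream(readable, abortController);
     * // for await (const item of stream) console.log(item);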
     */
    static fromReadableStream(readableStream, controller, client) {
        let consumed = false;
        async function* iterLines() {
            const lineDecoder = new LineDecoder();
            const iter = ReadableStreamToAsyncIterable(readableStream);
            for await (const chunk of iter) {
                for (const line of lineDecoder.decode(chunk)) {
                    yield line;
                }
            }
            for (const line of lineDecoder.flush()) {
                yield line;
            }
        }
        async function* iterator() {
            if (consumed) {
                throw new OpenAIError('Cannot iterate over a consumed stream, use `.tee()` to split the stream.');
            }
            consumed = true;
            let done = false;
            try {
                for await (const line of iterLines()) {
                    if (done)
                        continue;
                    if (line)
                        yield JSON.parse(line);
                }
                done = true;
            }
            catch (e) {
                // If the user calls `stream.controller.abort()`, we should exit without throwing.
                if (isAbortError(e))
                    return;
                throw e;
            }
            finally {
                // If the user `break`s, abort the ongoing request.
                if (!done)
                    controller.abort();
            }
        }
        return new Stream(iterator, controller, client);
    }
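    // The comma expression inside the computed property name initializes the
    // `_Stream_client` WeakMap (tslib's compiled private-field storage) as a
    // side effect while defining the `Symbol.asyncIterator` method.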
    [(_Stream_client = new WeakMap(), Symbol.asyncIterator)]() {
        return this.iterator();
    }
    /**
     * Splits the stream into two streams which can be
     * independently read from at different speeds.
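     *
     * @example (illustrative; `stream` is an existing Stream instance)
     * // const [left, right] = stream.tee();
     * // `left` and `right` each replay the same items, buffered independently.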
     */
    tee() {
        const left = [];
        const right = [];
        const iterator = this.iterator();
        const teeIterator = (queue) => {
            return {
                next: () => {
                    if (queue.length === 0) {
                        const result = iterator.next();
                        left.push(result);
                        right.push(result);
                    }
                    return queue.shift();
                },
            };
        };
        return [
            new Stream(() => teeIterator(left), this.controller, __classPrivateFieldGet(this, _Stream_client, "f")),
            new Stream(() => teeIterator(right), this.controller, __classPrivateFieldGet(this, _Stream_client, "f")),
        ];
    }
    /**
     * Converts this stream to a newline-separated ReadableStream of
     * JSON-stringified values, which can be turned back into a Stream
     * with `Stream.fromReadableStream()`.
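     *
     * @example (illustrative round trip; `controller` is the caller's AbortController)
     * // const readable = stream.toReadableStream();
     * // const restored = Stream.fromReadableStream(readable, controller);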
     */
    toReadableStream() {
        const self = this;
        let iter;
        return makeReadableStream({
            async start() {
                iter = self[Symbol.asyncIterator]();
            },
            async pull(ctrl) {
                try {
                    const { value, done } = await iter.next();
                    if (done)
                        return ctrl.close();
                    const bytes = encodeUTF8(JSON.stringify(value) + '\n');
                    ctrl.enqueue(bytes);
                }
                catch (err) {
                    ctrl.error(err);
                }
            },
            async cancel() {
                await iter.return?.();
            },
        });
    }
}
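/**
 * Yields decoded SSE messages from a fetch Response. If the response has no
 * body, the request is aborted and an OpenAIError is thrown, with a dedicated
 * hint for React Native's default fetch, which cannot stream.
 */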
export async function* _iterSSEMessages(response, controller) {
    if (!response.body) {
        controller.abort();
        if (typeof globalThis.navigator !== 'undefined' &&
            globalThis.navigator.product === 'ReactNative') {
            throw new OpenAIError(`The default react-native fetch implementation does not support streaming. Please use expo/fetch: https://docs.expo.dev/versions/latest/sdk/expo/#expofetch-api`);
        }
        throw new OpenAIError(`Attempted to iterate over a response with no body`);
    }
    const sseDecoder = new SSEDecoder();
    const lineDecoder = new LineDecoder();
    const iter = ReadableStreamToAsyncIterable(response.body);
    for await (const sseChunk of iterSSEChunks(iter)) {
        for (const line of lineDecoder.decode(sseChunk)) {
            const sse = sseDecoder.decode(line);
            if (sse)
                yield sse;
        }
    }
    for (const line of lineDecoder.flush()) {
        const sse = sseDecoder.decode(line);
        if (sse)
            yield sse;
    }
}
/**
 * Given an async iterable iterator, iterates over it and yields full
 * SSE chunks, i.e. yields when a double new-line is encountered.
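 *
 * For example, a buffered byte sequence containing two `\n\n`-terminated
 * messages is yielded as two separate chunks.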
 */
async function* iterSSEChunks(iterator) {
    let data = new Uint8Array();
    for await (const chunk of iterator) {
        if (chunk == null) {
            continue;
        }
        const binaryChunk = chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
            : typeof chunk === 'string' ? encodeUTF8(chunk)
                : chunk;
        let newData = new Uint8Array(data.length + binaryChunk.length);
        newData.set(data);
        newData.set(binaryChunk, data.length);
        data = newData;
        let patternIndex;
        while ((patternIndex = findDoubleNewlineIndex(data)) !== -1) {
            yield data.slice(0, patternIndex);
            data = data.slice(patternIndex);
        }
    }
    if (data.length > 0) {
        yield data;
    }
}
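/**
 * Incremental Server-Sent Events parser. `decode()` is fed one line at a time;
 * `event:` and `data:` fields are buffered until a blank line completes the
 * message, at which point the accumulated { event, data, raw } object is
 * returned (otherwise null). Comment lines starting with ':' are ignored.
 */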
class SSEDecoder {
    constructor() {
        this.event = null;
        this.data = [];
        this.chunks = [];
    }
    decode(line) {
        if (line.endsWith('\r')) {
            line = line.substring(0, line.length - 1);
        }
        if (!line) {
            // empty line and we didn't previously encounter any messages
            if (!this.event && !this.data.length)
                return null;
            const sse = {
                event: this.event,
                data: this.data.join('\n'),
                raw: this.chunks,
            };
            this.event = null;
            this.data = [];
            this.chunks = [];
            return sse;
        }
        this.chunks.push(line);
        if (line.startsWith(':')) {
            return null;
        }
        let [fieldname, _, value] = partition(line, ':');
        if (value.startsWith(' ')) {
            value = value.substring(1);
        }
        if (fieldname === 'event') {
            this.event = value;
        }
        else if (fieldname === 'data') {
            this.data.push(value);
        }
        return null;
    }
}
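/**
 * Splits `str` at the first occurrence of `delimiter`, returning
 * [before, delimiter, after]; if the delimiter is absent, returns [str, '', ''].
 */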
function partition(str, delimiter) {
    const index = str.indexOf(delimiter);
    if (index !== -1) {
        return [str.substring(0, index), delimiter, str.substring(index + delimiter.length)];
    }
    return [str, '', ''];
}
//# sourceMappingURL=streaming.mjs.map