From 43a727d46d456284559a9b42c1e9133587cc5932 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Tue, 21 Apr 2026 20:02:27 -0700 Subject: [PATCH 1/2] Avoid bun memory leak bug from TransformStream --- apps/sim/executor/execution/block-executor.ts | 167 +++++++----------- .../executor/handlers/agent/agent-handler.ts | 10 +- apps/sim/executor/handlers/agent/memory.ts | 29 --- apps/sim/executor/types.ts | 6 + 4 files changed, 83 insertions(+), 129 deletions(-) diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index c348aa401fb..30ffdbae7fc 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -34,6 +34,7 @@ import { type ExecutionContext, getNextExecutionOrder, type NormalizedBlockOutput, + type StreamingExecution, } from '@/executor/types' import { streamingResponseFormatProcessor } from '@/executor/utils' import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors' @@ -140,7 +141,7 @@ export class BlockExecutor { let normalizedOutput: NormalizedBlockOutput if (isStreamingExecution) { - const streamingExec = output as { stream: ReadableStream; execution: any } + const streamingExec = output as StreamingExecution if (ctx.onStream) { await this.handleStreamingExecution( @@ -602,7 +603,7 @@ export class BlockExecutor { ctx: ExecutionContext, node: DAGNode, block: SerializedBlock, - streamingExec: { stream: ReadableStream; execution: any }, + streamingExec: StreamingExecution, resolvedInputs: Record, selectedOutputs: string[] ): Promise { @@ -613,56 +614,37 @@ export class BlockExecutor { (block.config?.params as Record | undefined)?.responseFormat ?? (block.config as Record | undefined)?.responseFormat - const stream = streamingExec.stream - if (typeof stream.tee !== 'function') { - await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs) - return - } + const sourceReader = streamingExec.stream.getReader() + const decoder = new TextDecoder() + const accumulated: string[] = [] + let drainError: unknown - const [clientStream, executorStream] = stream.tee() + const clientSource = new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await sourceReader.read() + if (done) { + const tail = decoder.decode() + if (tail) accumulated.push(tail) + controller.close() + return + } + accumulated.push(decoder.decode(value, { stream: true })) + controller.enqueue(value) + } catch (error) { + drainError = error + controller.error(error) + } + }, + async cancel(reason) { + try { + await sourceReader.cancel(reason) + } catch {} + }, + }) const processedClientStream = streamingResponseFormatProcessor.processStream( - clientStream, - blockId, - selectedOutputs, - responseFormat - ) - - const clientStreamingExec = { - ...streamingExec, - stream: processedClientStream, - } - - const executorConsumption = this.consumeExecutorStream( - executorStream, - streamingExec, - blockId, - responseFormat - ) - - const clientConsumption = (async () => { - try { - await ctx.onStream?.(clientStreamingExec) - } catch (error) { - this.execLogger.error('Error in onStream callback', { blockId, error }) - // Cancel the client stream to release the tee'd buffer - await processedClientStream.cancel().catch(() => {}) - } - })() - - await Promise.all([clientConsumption, executorConsumption]) - } - - private async forwardStream( - ctx: ExecutionContext, - blockId: string, - streamingExec: { stream: ReadableStream; execution: any }, - stream: ReadableStream, - responseFormat: any, - selectedOutputs: string[] - ): Promise { - const processedStream = streamingResponseFormatProcessor.processStream( - stream, + clientSource, blockId, selectedOutputs, responseFormat @@ -670,72 +652,59 @@ export class BlockExecutor { try { await ctx.onStream?.({ - ...streamingExec, - stream: processedStream, + stream: processedClientStream, + execution: streamingExec.execution, }) } catch (error) { this.execLogger.error('Error in onStream callback', { blockId, error }) - await processedStream.cancel().catch(() => {}) - } - } - - private async consumeExecutorStream( - stream: ReadableStream, - streamingExec: { execution: any }, - blockId: string, - responseFormat: any - ): Promise { - const reader = stream.getReader() - const decoder = new TextDecoder() - const chunks: string[] = [] - - try { - while (true) { - const { done, value } = await reader.read() - if (done) break - chunks.push(decoder.decode(value, { stream: true })) - } - const tail = decoder.decode() - if (tail) chunks.push(tail) - } catch (error) { - this.execLogger.error('Error reading executor stream for block', { blockId, error }) + await processedClientStream.cancel().catch(() => {}) } finally { try { - await reader.cancel().catch(() => {}) + sourceReader.releaseLock() } catch {} } - const fullContent = chunks.join('') - if (!fullContent) { + if (drainError) { + this.execLogger.error('Error reading stream for block', { blockId, error: drainError }) return } - const executionOutput = streamingExec.execution?.output - if (!executionOutput || typeof executionOutput !== 'object') { - return + const fullContent = accumulated.join('') + if (fullContent) { + const executionOutput = streamingExec.execution?.output + if (executionOutput && typeof executionOutput === 'object') { + let parsedForFormat = false + if (responseFormat) { + try { + const parsed = JSON.parse(fullContent.trim()) + streamingExec.execution.output = { + ...parsed, + tokens: executionOutput.tokens, + toolCalls: executionOutput.toolCalls, + providerTiming: executionOutput.providerTiming, + cost: executionOutput.cost, + model: executionOutput.model, + } + parsedForFormat = true + } catch (error) { + this.execLogger.warn('Failed to parse streamed content for response format', { + blockId, + error, + }) + } + } + if (!parsedForFormat) { + executionOutput.content = fullContent + } + } } - if (responseFormat) { + if (streamingExec.onFullContent) { try { - const parsed = JSON.parse(fullContent.trim()) - - streamingExec.execution.output = { - ...parsed, - tokens: executionOutput.tokens, - toolCalls: executionOutput.toolCalls, - providerTiming: executionOutput.providerTiming, - cost: executionOutput.cost, - model: executionOutput.model, - } - return + await streamingExec.onFullContent(fullContent) } catch (error) { - this.execLogger.warn('Failed to parse streamed content for response format', { - blockId, - error, - }) + this.execLogger.error('onFullContent callback failed', { blockId, error }) } } - - executionOutput.content = fullContent } } diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts index fcf43b6844b..a77807b8430 100644 --- a/apps/sim/executor/handlers/agent/agent-handler.ts +++ b/apps/sim/executor/handlers/agent/agent-handler.ts @@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler { streamingExec: StreamingExecution ): StreamingExecution { return { - stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs), + stream: streamingExec.stream, execution: streamingExec.execution, + onFullContent: async (content: string) => { + if (!content.trim()) return + try { + await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content }) + } catch (error) { + logger.error('Failed to persist streaming response:', error) + } + }, } } diff --git a/apps/sim/executor/handlers/agent/memory.ts b/apps/sim/executor/handlers/agent/memory.ts index 6428f0b7607..fcba3628226 100644 --- a/apps/sim/executor/handlers/agent/memory.ts +++ b/apps/sim/executor/handlers/agent/memory.ts @@ -111,35 +111,6 @@ export class Memory { }) } - wrapStreamForPersistence( - stream: ReadableStream, - ctx: ExecutionContext, - inputs: AgentInputs - ): ReadableStream { - const chunks: string[] = [] - const decoder = new TextDecoder() - - const transformStream = new TransformStream({ - transform: (chunk, controller) => { - controller.enqueue(chunk) - const decoded = decoder.decode(chunk, { stream: true }) - chunks.push(decoded) - }, - - flush: () => { - const content = chunks.join('') - if (content.trim()) { - this.appendToMemory(ctx, inputs, { - role: 'assistant', - content, - }).catch((error) => logger.error('Failed to persist streaming response:', error)) - } - }, - }) - - return stream.pipeThrough(transformStream) - } - private requireWorkspaceId(ctx: ExecutionContext): string { if (!ctx.workspaceId) { throw new Error('workspaceId is required for memory operations') diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index 00caff1d9ef..b8b0d4cfa0d 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -359,6 +359,12 @@ export interface ExecutionResult { export interface StreamingExecution { stream: ReadableStream execution: ExecutionResult & { isStreaming?: boolean } + /** + * Invoked with the assembled response text after the stream drains. Lets agent + * blocks persist the full response without interposing a TransformStream on a + * fetch-backed source — that pattern amplifies memory on Bun via #28035. + */ + onFullContent?: (content: string) => void | Promise } export interface BlockExecutor { From c29447afd6493f663d328f38ae75d9a4e9e8adf5 Mon Sep 17 00:00:00 2001 From: Theodore Li Date: Wed, 22 Apr 2026 17:27:36 -0700 Subject: [PATCH 2/2] fix(executor): skip content persistence when stream consumer exits early MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, if the onStream consumer caught an internal error without re-throwing, the block-executor would treat the shortened accumulator as the complete response, persist a truncated string to memory via appendToMemory, and set it as executionOutput.content. Track whether the source ReadableStream actually closed (done=true) in the pull handler. If onStream returns before the source drains, skip content persistence and log a warning — the old tee()-based flow was immune to this because the executor branch drained independently of the client branch. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/sim/executor/execution/block-executor.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/apps/sim/executor/execution/block-executor.ts b/apps/sim/executor/execution/block-executor.ts index 30ffdbae7fc..15de50e6a1c 100644 --- a/apps/sim/executor/execution/block-executor.ts +++ b/apps/sim/executor/execution/block-executor.ts @@ -618,6 +618,7 @@ export class BlockExecutor { const decoder = new TextDecoder() const accumulated: string[] = [] let drainError: unknown + let sourceFullyDrained = false const clientSource = new ReadableStream({ async pull(controller) { @@ -626,6 +627,7 @@ export class BlockExecutor { if (done) { const tail = decoder.decode() if (tail) accumulated.push(tail) + sourceFullyDrained = true controller.close() return } @@ -669,6 +671,17 @@ export class BlockExecutor { return } + // If the onStream consumer exited before the source drained (e.g. it caught + // an internal error and returned normally), `accumulated` holds a truncated + // response. Persisting that to memory or setting it as the block output + // would corrupt downstream state — skip and log instead. + if (!sourceFullyDrained) { + this.execLogger.warn('Stream consumer exited before source drained; skipping content persistence', { + blockId, + }) + return + } + const fullContent = accumulated.join('') if (fullContent) { const executionOutput = streamingExec.execution?.output