Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 80 additions & 98 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
type ExecutionContext,
getNextExecutionOrder,
type NormalizedBlockOutput,
type StreamingExecution,
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
Expand Down Expand Up @@ -140,7 +141,7 @@ export class BlockExecutor {

let normalizedOutput: NormalizedBlockOutput
if (isStreamingExecution) {
const streamingExec = output as { stream: ReadableStream; execution: any }
const streamingExec = output as StreamingExecution

if (ctx.onStream) {
await this.handleStreamingExecution(
Expand Down Expand Up @@ -602,7 +603,7 @@ export class BlockExecutor {
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
streamingExec: { stream: ReadableStream; execution: any },
streamingExec: StreamingExecution,
resolvedInputs: Record<string, any>,
selectedOutputs: string[]
): Promise<void> {
Expand All @@ -613,129 +614,110 @@ export class BlockExecutor {
(block.config?.params as Record<string, any> | undefined)?.responseFormat ??
(block.config as Record<string, any> | undefined)?.responseFormat

const stream = streamingExec.stream
if (typeof stream.tee !== 'function') {
await this.forwardStream(ctx, blockId, streamingExec, stream, responseFormat, selectedOutputs)
return
}
const sourceReader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
const accumulated: string[] = []
let drainError: unknown
let sourceFullyDrained = false

const [clientStream, executorStream] = stream.tee()
const clientSource = new ReadableStream<Uint8Array>({
async pull(controller) {
try {
const { done, value } = await sourceReader.read()
if (done) {
const tail = decoder.decode()
if (tail) accumulated.push(tail)
sourceFullyDrained = true
controller.close()
return
}
accumulated.push(decoder.decode(value, { stream: true }))
controller.enqueue(value)
} catch (error) {
drainError = error
controller.error(error)
}
},
async cancel(reason) {
try {
await sourceReader.cancel(reason)
} catch {}
},
})

const processedClientStream = streamingResponseFormatProcessor.processStream(
clientStream,
blockId,
selectedOutputs,
responseFormat
)

const clientStreamingExec = {
...streamingExec,
stream: processedClientStream,
}

const executorConsumption = this.consumeExecutorStream(
executorStream,
streamingExec,
blockId,
responseFormat
)

const clientConsumption = (async () => {
try {
await ctx.onStream?.(clientStreamingExec)
} catch (error) {
this.execLogger.error('Error in onStream callback', { blockId, error })
// Cancel the client stream to release the tee'd buffer
await processedClientStream.cancel().catch(() => {})
}
})()

await Promise.all([clientConsumption, executorConsumption])
}

private async forwardStream(
ctx: ExecutionContext,
blockId: string,
streamingExec: { stream: ReadableStream; execution: any },
stream: ReadableStream,
responseFormat: any,
selectedOutputs: string[]
): Promise<void> {
const processedStream = streamingResponseFormatProcessor.processStream(
stream,
clientSource,
blockId,
selectedOutputs,
responseFormat
)

try {
await ctx.onStream?.({
...streamingExec,
stream: processedStream,
stream: processedClientStream,
execution: streamingExec.execution,
})
} catch (error) {
this.execLogger.error('Error in onStream callback', { blockId, error })
await processedStream.cancel().catch(() => {})
}
}

private async consumeExecutorStream(
stream: ReadableStream,
streamingExec: { execution: any },
blockId: string,
responseFormat: any
): Promise<void> {
const reader = stream.getReader()
const decoder = new TextDecoder()
const chunks: string[] = []

try {
while (true) {
const { done, value } = await reader.read()
if (done) break
chunks.push(decoder.decode(value, { stream: true }))
}
const tail = decoder.decode()
if (tail) chunks.push(tail)
} catch (error) {
this.execLogger.error('Error reading executor stream for block', { blockId, error })
await processedClientStream.cancel().catch(() => {})
} finally {
try {
await reader.cancel().catch(() => {})
sourceReader.releaseLock()
} catch {}
}

const fullContent = chunks.join('')
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partial content persisted when stream consumer errors internally

Medium Severity

Content accumulation is now coupled to the client's consumption of the stream via the pull callback in clientSource. Previously, stream.tee() gave the executor an independent copy that was always fully consumed regardless of client-side failures. Now, if an onStream consumer encounters an internal error (e.g., SSE sendEvent throws) but catches it without re-throwing, onStream returns normally, drainError is not set, and the code proceeds to process and persist the partially accumulated content — setting incomplete text on executionOutput.content and invoking onFullContent with a truncated string. This can cause incomplete assistant responses to be persisted to memory via appendToMemory.

Additional Locations (1)
Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 43a727d. Configure here.

if (!fullContent) {
if (drainError) {
this.execLogger.error('Error reading stream for block', { blockId, error: drainError })
return
}

const executionOutput = streamingExec.execution?.output
if (!executionOutput || typeof executionOutput !== 'object') {
// If the onStream consumer exited before the source drained (e.g. it caught
// an internal error and returned normally), `accumulated` holds a truncated
// response. Persisting that to memory or setting it as the block output
// would corrupt downstream state — skip and log instead.
if (!sourceFullyDrained) {
this.execLogger.warn('Stream consumer exited before source drained; skipping content persistence', {
blockId,
})
return
}

if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())

streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
const fullContent = accumulated.join('')
if (fullContent) {
const executionOutput = streamingExec.execution?.output
if (executionOutput && typeof executionOutput === 'object') {
let parsedForFormat = false
if (responseFormat) {
try {
const parsed = JSON.parse(fullContent.trim())
streamingExec.execution.output = {
...parsed,
tokens: executionOutput.tokens,
toolCalls: executionOutput.toolCalls,
providerTiming: executionOutput.providerTiming,
cost: executionOutput.cost,
model: executionOutput.model,
}
parsedForFormat = true
} catch (error) {
this.execLogger.warn('Failed to parse streamed content for response format', {
blockId,
error,
})
}
}
if (!parsedForFormat) {
executionOutput.content = fullContent
}
return
} catch (error) {
this.execLogger.warn('Failed to parse streamed content for response format', {
blockId,
error,
})
}
}

executionOutput.content = fullContent
if (streamingExec.onFullContent) {
try {
await streamingExec.onFullContent(fullContent)
} catch (error) {
this.execLogger.error('onFullContent callback failed', { blockId, error })
}
}
}
}
10 changes: 9 additions & 1 deletion apps/sim/executor/handlers/agent/agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -958,8 +958,16 @@ export class AgentBlockHandler implements BlockHandler {
streamingExec: StreamingExecution
): StreamingExecution {
return {
stream: memoryService.wrapStreamForPersistence(streamingExec.stream, ctx, inputs),
stream: streamingExec.stream,
execution: streamingExec.execution,
onFullContent: async (content: string) => {
if (!content.trim()) return
try {
await memoryService.appendToMemory(ctx, inputs, { role: 'assistant', content })
} catch (error) {
logger.error('Failed to persist streaming response:', error)
}
},
}
}

Expand Down
29 changes: 0 additions & 29 deletions apps/sim/executor/handlers/agent/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,35 +111,6 @@ export class Memory {
})
}

wrapStreamForPersistence(
stream: ReadableStream<Uint8Array>,
ctx: ExecutionContext,
inputs: AgentInputs
): ReadableStream<Uint8Array> {
const chunks: string[] = []
const decoder = new TextDecoder()

const transformStream = new TransformStream<Uint8Array, Uint8Array>({
transform: (chunk, controller) => {
controller.enqueue(chunk)
const decoded = decoder.decode(chunk, { stream: true })
chunks.push(decoded)
},

flush: () => {
const content = chunks.join('')
if (content.trim()) {
this.appendToMemory(ctx, inputs, {
role: 'assistant',
content,
}).catch((error) => logger.error('Failed to persist streaming response:', error))
}
},
})

return stream.pipeThrough(transformStream)
}

private requireWorkspaceId(ctx: ExecutionContext): string {
if (!ctx.workspaceId) {
throw new Error('workspaceId is required for memory operations')
Expand Down
6 changes: 6 additions & 0 deletions apps/sim/executor/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ export interface ExecutionResult {
export interface StreamingExecution {
stream: ReadableStream
execution: ExecutionResult & { isStreaming?: boolean }
/**
* Invoked with the assembled response text after the stream drains. Lets agent
* blocks persist the full response without interposing a TransformStream on a
* fetch-backed source — that pattern amplifies memory on Bun via #28035.
*/
onFullContent?: (content: string) => void | Promise<void>
}

export interface BlockExecutor {
Expand Down
Loading