/** * SSE 事件处理器 * 处理与后端的流式通信 * 使用 eventsource-parser + Node.js 原生 http 模块 */ import * as http from 'http'; import * as https from 'https'; import { URL } from 'url'; import { createParser, type EventSourceParser } from 'eventsource-parser'; import { getApiUrl, getConfig } from '../config/settings'; import type { DialogRequest, SSEEventType, TextDeltaEvent, ToolCallRequest, ToolConfirmEvent, PlanConfirmEvent, AskUserEvent, CompleteEvent, ErrorEvent, ToolStartEvent, ToolCompleteEvent, ToolErrorEvent, WarningEvent, NotificationEvent, DepthUpdateEvent, AgentStartEvent, AgentProgressEvent, AgentCompleteEvent, AgentErrorEvent, ContextUsageEvent, CreditUpdateEvent } from '../types/api'; import type { MemoryCompactedEvent } from '../types/memory'; /** * SSE 事件回调接口 */ export interface SSECallbacks { /** 收到文本增量 */ onTextDelta?: (data: TextDeltaEvent) => void; /** 收到工具调用请求 */ onToolCall?: (data: ToolCallRequest) => void; /** 收到工具确认请求(Ask 模式) */ onToolConfirm?: (data: ToolConfirmEvent) => void; /** 收到计划确认请求(Plan 模式) */ onPlanConfirm?: (data: PlanConfirmEvent) => void; /** 阶段进度更新 */ onPhaseProgress?: (data: import('../types/api').PhaseProgressEvent) => void; /** 添加计划步骤 */ onPlanStepAdd?: (data: import('../types/api').PlanStepAddEvent) => void; /** 删除计划步骤 */ onPlanStepRemove?: (data: import('../types/api').PlanStepRemoveEvent) => void; /** 更新计划步骤 */ onPlanStepUpdate?: (data: import('../types/api').PlanStepUpdateEvent) => void; /** 更新计划摘要 */ onPlanSummaryUpdate?: (data: import('../types/api').PlanSummaryUpdateEvent) => void; /** 工具开始执行 */ onToolStart?: (data: ToolStartEvent) => void; /** 工具执行完成 */ onToolComplete?: (data: ToolCompleteEvent) => void; /** 工具执行错误 */ onToolError?: (data: ToolErrorEvent) => void; /** 收到用户提问 */ onAskUser?: (data: AskUserEvent) => void; /** 对话完成 */ onComplete?: (data: CompleteEvent) => void; /** 错误 */ onError?: (data: ErrorEvent) => void; /** 警告 */ onWarning?: (data: WarningEvent) => void; /** 通知 */ onNotification?: (data: NotificationEvent) => void; /** 深度更新 */ onDepthUpdate?: (data: DepthUpdateEvent) => void; /** 子智能体启动 */ onAgentStart?: (data: AgentStartEvent) => void; /** 子智能体进度 */ onAgentProgress?: (data: AgentProgressEvent) => void; /** 子智能体完成 */ onAgentComplete?: (data: AgentCompleteEvent) => void; /** 子智能体错误 */ onAgentError?: (data: AgentErrorEvent) => void; /** 记忆压缩完成 */ onMemoryCompacted?: (data: MemoryCompactedEvent) => void; /** 上下文使用量更新 */ onContextUsage?: (data: ContextUsageEvent) => void; /** 资源点余额更新 */ onCreditUpdate?: (data: CreditUpdateEvent) => void; /** 连接打开 */ onOpen?: () => void; /** 连接关闭 */ onClose?: () => void; } /** * SSE 会话控制器 */ export class SSEController { private request: http.ClientRequest | null = null; private isConnected = false; private isAborted = false; /** * 是否已连接 */ get connected(): boolean { return this.isConnected; } /** * 设置请求对象 */ setRequest(req: http.ClientRequest): void { this.request = req; } /** * 设置连接状态 */ setConnected(connected: boolean): void { this.isConnected = connected; } /** * 是否已中止 */ get aborted(): boolean { return this.isAborted; } /** * 中止当前连接 */ abort(): void { if (this.request && !this.isAborted) { this.isAborted = true; this.request.destroy(); this.request = null; this.isConnected = false; } } } /** * 发起流式对话 * @param request 对话请求 * @param callbacks 事件回调 * @returns SSE 控制器(用于中止连接) */ export async function startStreamDialog( request: DialogRequest, callbacks: SSECallbacks ): Promise { const controller = new SSEController(); const urlString = getApiUrl('/api/dialog/stream'); const url = new URL(urlString); const isHttps = url.protocol === 'https:'; const httpModule = isHttps ? https : http; const body = JSON.stringify(request); console.log(`[SSE] 开始流式对话: taskId=${request.taskId}, mode=${request.mode}, url=${urlString}`); return new Promise((resolve, reject) => { const options: http.RequestOptions = { hostname: url.hostname, port: url.port || (isHttps ? 443 : 80), path: url.pathname + url.search, method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream', 'Cache-Control': 'no-cache', 'Content-Length': Buffer.byteLength(body), ...(request.token ? { 'Authorization': `Bearer ${request.token}` } : {}) } }; const req = httpModule.request(options, (res) => { // 检查响应状态 if (res.statusCode !== 200) { let errorBody = ''; res.on('data', chunk => errorBody += chunk); res.on('end', () => { // 检测是否是登录状态过期 const isLoginExpired = errorBody.includes('登录状态已过期') || errorBody.includes('token') && errorBody.includes('过期') || res.statusCode === 401; if (isLoginExpired) { const error = new Error('LOGIN_EXPIRED:登录状态已过期,请重新登录'); callbacks.onError?.({ message: error.message }); reject(error); } else { const error = new Error(`SSE 连接失败: ${res.statusCode} ${errorBody}`); callbacks.onError?.({ message: error.message }); reject(error); } }); return; } // 连接成功 console.log('[SSE] 连接已建立'); controller.setConnected(true); callbacks.onOpen?.(); resolve(controller); // 创建 SSE 解析器 const parser = createParser({ onEvent: (event) => { const eventType = event.event as SSEEventType; const eventData = event.data; if (!eventData) { console.log(`[SSE] 收到空事件: ${eventType}`); return; } try { const data = JSON.parse(eventData); console.log(`[SSE] 收到事件: ${eventType}`, data); // 分发事件到对应回调 dispatchEvent(eventType, data, callbacks); } catch (e) { console.error(`[SSE] 解析事件数据失败: ${eventData}`, e); } } }); // 设置编码 res.setEncoding('utf8'); // 处理数据流 res.on('data', (chunk: string) => { if (!controller.aborted) { console.log('[SSE] 收到原始数据块:', chunk.substring(0, 200)); // 检查是否是业务错误码(Gateway 返回 HTTP 200 但响应体是错误 JSON) try { const trimmed = chunk.trim(); if (trimmed.startsWith('{') && trimmed.includes('"code"')) { const json = JSON.parse(trimmed); if (json.code === 401 || json.msg?.includes('登录状态已过期')) { console.log('[SSE] 检测到登录过期业务错误'); const error = new Error('LOGIN_EXPIRED:登录状态已过期,请重新登录'); callbacks.onError?.({ message: error.message }); controller.abort(); reject(error); return; } } } catch { // 不是 JSON 格式,继续正常处理 } parser.feed(chunk); } }); // 处理连接关闭 res.on('end', () => { console.log('[SSE] 连接已关闭'); controller.setConnected(false); callbacks.onClose?.(); }); // 处理错误 res.on('error', (err) => { if (!controller.aborted) { console.error('[SSE] 响应错误:', err); controller.setConnected(false); callbacks.onError?.({ message: err.message }); } }); }); // 保存请求引用用于中止 controller.setRequest(req); // 处理请求错误 req.on('error', (err) => { if (!controller.aborted) { console.error('[SSE] 请求错误:', err); controller.setConnected(false); callbacks.onError?.({ message: err.message }); reject(err); } }); // 处理超时 const { timeout } = getConfig(); req.setTimeout(timeout, () => { if (!controller.aborted) { console.error('[SSE] 请求超时'); controller.abort(); const error = new Error('请求超时'); callbacks.onError?.({ message: error.message }); reject(error); } }); // 发送请求体 req.write(body); req.end(); }); } /** * 分发 SSE 事件到对应回调 */ function dispatchEvent( eventType: SSEEventType, data: unknown, callbacks: SSECallbacks ): void { switch (eventType) { case 'text_delta': callbacks.onTextDelta?.(data as TextDeltaEvent); break; case 'tool_call': callbacks.onToolCall?.(data as ToolCallRequest); break; case 'tool_confirm': callbacks.onToolConfirm?.(data as ToolConfirmEvent); break; case 'plan_confirm': callbacks.onPlanConfirm?.(data as PlanConfirmEvent); break; case 'phase_progress': callbacks.onPhaseProgress?.(data as import('../types/api').PhaseProgressEvent); break; case 'plan_step_add': callbacks.onPlanStepAdd?.(data as import('../types/api').PlanStepAddEvent); break; case 'plan_step_remove': callbacks.onPlanStepRemove?.(data as import('../types/api').PlanStepRemoveEvent); break; case 'plan_step_update': callbacks.onPlanStepUpdate?.(data as import('../types/api').PlanStepUpdateEvent); break; case 'plan_summary_update': callbacks.onPlanSummaryUpdate?.(data as import('../types/api').PlanSummaryUpdateEvent); break; case 'tool_start': callbacks.onToolStart?.(data as ToolStartEvent); break; case 'tool_complete': callbacks.onToolComplete?.(data as ToolCompleteEvent); break; case 'tool_error': callbacks.onToolError?.(data as ToolErrorEvent); break; case 'ask_user': callbacks.onAskUser?.(data as AskUserEvent); break; case 'complete': callbacks.onComplete?.(data as CompleteEvent); break; case 'error': callbacks.onError?.(data as ErrorEvent); break; case 'warning': callbacks.onWarning?.(data as WarningEvent); break; case 'notification': callbacks.onNotification?.(data as NotificationEvent); break; case 'depth_update': callbacks.onDepthUpdate?.(data as DepthUpdateEvent); break; case 'agent_start': callbacks.onAgentStart?.(data as AgentStartEvent); break; case 'agent_progress': callbacks.onAgentProgress?.(data as AgentProgressEvent); break; case 'agent_complete': callbacks.onAgentComplete?.(data as AgentCompleteEvent); break; case 'agent_error': callbacks.onAgentError?.(data as AgentErrorEvent); break; case 'memory_compacted': callbacks.onMemoryCompacted?.(data as MemoryCompactedEvent); break; case 'context_usage': callbacks.onContextUsage?.(data as ContextUsageEvent); break; case 'credit_update': callbacks.onCreditUpdate?.(data as CreditUpdateEvent); break; case 'heartbeat': // 心跳事件:仅用于保持连接,不需要特殊处理 // Node.js req.setTimeout 会在收到数据时自动重置计时器 console.log('[SSE] 收到心跳'); break; default: console.log(`[SSE] 未知事件类型: ${eventType}`, data); } } /** * 生成任务ID */ export function generateTaskId(): string { return `task-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`; }