Files
IC-Coder-Plugin/src/services/sseHandler.ts
XiaoFeng 72a84ed9e2 fix: 修复 showPlan 工具交互逻辑和 JWT Token 问题
- 修复 pendingQuestions 缺失时无法提交回答的问题
- 添加 fallbackTaskId 参数支持直接发送到后端
- apiClient 自动获取 JWT Token
- 取消按钮改为中止对话而非发送消息
2026-01-13 10:58:33 +08:00

393 lines
11 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* SSE 事件处理器
* 处理与后端的流式通信
* 使用 eventsource-parser + Node.js 原生 http 模块
*/
import * as http from 'http';
import * as https from 'https';
import { URL } from 'url';
import { createParser, type EventSourceParser } from 'eventsource-parser';
import { getApiUrl, getConfig } from '../config/settings';
import type {
DialogRequest,
SSEEventType,
TextDeltaEvent,
ToolCallRequest,
ToolConfirmEvent,
PlanConfirmEvent,
AskUserEvent,
CompleteEvent,
ErrorEvent,
ToolStartEvent,
ToolCompleteEvent,
ToolErrorEvent,
WarningEvent,
NotificationEvent,
DepthUpdateEvent,
AgentStartEvent,
AgentProgressEvent,
AgentCompleteEvent,
AgentErrorEvent,
ContextUsageEvent,
CreditUpdateEvent
} from '../types/api';
import type { MemoryCompactedEvent } from '../types/memory';
/**
* SSE 事件回调接口
*/
export interface SSECallbacks {
/** 收到文本增量 */
onTextDelta?: (data: TextDeltaEvent) => void;
/** 收到工具调用请求 */
onToolCall?: (data: ToolCallRequest) => void;
/** 收到工具确认请求Ask 模式) */
onToolConfirm?: (data: ToolConfirmEvent) => void;
/** 收到计划确认请求Plan 模式) */
onPlanConfirm?: (data: PlanConfirmEvent) => void;
/** 阶段进度更新 */
onPhaseProgress?: (data: import('../types/api').PhaseProgressEvent) => void;
/** 添加计划步骤 */
onPlanStepAdd?: (data: import('../types/api').PlanStepAddEvent) => void;
/** 删除计划步骤 */
onPlanStepRemove?: (data: import('../types/api').PlanStepRemoveEvent) => void;
/** 更新计划步骤 */
onPlanStepUpdate?: (data: import('../types/api').PlanStepUpdateEvent) => void;
/** 更新计划摘要 */
onPlanSummaryUpdate?: (data: import('../types/api').PlanSummaryUpdateEvent) => void;
/** 工具开始执行 */
onToolStart?: (data: ToolStartEvent) => void;
/** 工具执行完成 */
onToolComplete?: (data: ToolCompleteEvent) => void;
/** 工具执行错误 */
onToolError?: (data: ToolErrorEvent) => void;
/** 收到用户提问 */
onAskUser?: (data: AskUserEvent) => void;
/** 对话完成 */
onComplete?: (data: CompleteEvent) => void;
/** 错误 */
onError?: (data: ErrorEvent) => void;
/** 警告 */
onWarning?: (data: WarningEvent) => void;
/** 通知 */
onNotification?: (data: NotificationEvent) => void;
/** 深度更新 */
onDepthUpdate?: (data: DepthUpdateEvent) => void;
/** 子智能体启动 */
onAgentStart?: (data: AgentStartEvent) => void;
/** 子智能体进度 */
onAgentProgress?: (data: AgentProgressEvent) => void;
/** 子智能体完成 */
onAgentComplete?: (data: AgentCompleteEvent) => void;
/** 子智能体错误 */
onAgentError?: (data: AgentErrorEvent) => void;
/** 记忆压缩完成 */
onMemoryCompacted?: (data: MemoryCompactedEvent) => void;
/** 上下文使用量更新 */
onContextUsage?: (data: ContextUsageEvent) => void;
/** 资源点余额更新 */
onCreditUpdate?: (data: CreditUpdateEvent) => void;
/** 连接打开 */
onOpen?: () => void;
/** 连接关闭 */
onClose?: () => void;
}
/**
* SSE 会话控制器
*/
export class SSEController {
private request: http.ClientRequest | null = null;
private isConnected = false;
private isAborted = false;
/**
* 是否已连接
*/
get connected(): boolean {
return this.isConnected;
}
/**
* 设置请求对象
*/
setRequest(req: http.ClientRequest): void {
this.request = req;
}
/**
* 设置连接状态
*/
setConnected(connected: boolean): void {
this.isConnected = connected;
}
/**
* 是否已中止
*/
get aborted(): boolean {
return this.isAborted;
}
/**
* 中止当前连接
*/
abort(): void {
if (this.request && !this.isAborted) {
this.isAborted = true;
this.request.destroy();
this.request = null;
this.isConnected = false;
}
}
}
/**
* 发起流式对话
* @param request 对话请求
* @param callbacks 事件回调
* @returns SSE 控制器(用于中止连接)
*/
export async function startStreamDialog(
request: DialogRequest,
callbacks: SSECallbacks
): Promise<SSEController> {
const controller = new SSEController();
const urlString = getApiUrl('/api/dialog/stream');
const url = new URL(urlString);
const isHttps = url.protocol === 'https:';
const httpModule = isHttps ? https : http;
const body = JSON.stringify(request);
console.log(`[SSE] 开始流式对话: taskId=${request.taskId}, mode=${request.mode}, url=${urlString}`);
return new Promise((resolve, reject) => {
const options: http.RequestOptions = {
hostname: url.hostname,
port: url.port || (isHttps ? 443 : 80),
path: url.pathname + url.search,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
'Content-Length': Buffer.byteLength(body),
...(request.token ? { 'Authorization': `Bearer ${request.token}` } : {})
}
};
const req = httpModule.request(options, (res) => {
// 检查响应状态
if (res.statusCode !== 200) {
let errorBody = '';
res.on('data', chunk => errorBody += chunk);
res.on('end', () => {
// 检测是否是登录状态过期
const isLoginExpired = errorBody.includes('登录状态已过期') ||
errorBody.includes('token') && errorBody.includes('过期') ||
res.statusCode === 401;
if (isLoginExpired) {
const error = new Error('LOGIN_EXPIRED:登录状态已过期,请重新登录');
callbacks.onError?.({ message: error.message });
reject(error);
} else {
const error = new Error(`SSE 连接失败: ${res.statusCode} ${errorBody}`);
callbacks.onError?.({ message: error.message });
reject(error);
}
});
return;
}
// 连接成功
console.log('[SSE] 连接已建立');
controller.setConnected(true);
callbacks.onOpen?.();
resolve(controller);
// 创建 SSE 解析器
const parser = createParser({
onEvent: (event) => {
const eventType = event.event as SSEEventType;
const eventData = event.data;
if (!eventData) {
console.log(`[SSE] 收到空事件: ${eventType}`);
return;
}
try {
const data = JSON.parse(eventData);
console.log(`[SSE] 收到事件: ${eventType}`, data);
// 分发事件到对应回调
dispatchEvent(eventType, data, callbacks);
} catch (e) {
console.error(`[SSE] 解析事件数据失败: ${eventData}`, e);
}
}
});
// 设置编码
res.setEncoding('utf8');
// 处理数据流
res.on('data', (chunk: string) => {
if (!controller.aborted) {
console.log('[SSE] 收到原始数据块:', chunk.substring(0, 200));
parser.feed(chunk);
}
});
// 处理连接关闭
res.on('end', () => {
console.log('[SSE] 连接已关闭');
controller.setConnected(false);
callbacks.onClose?.();
});
// 处理错误
res.on('error', (err) => {
if (!controller.aborted) {
console.error('[SSE] 响应错误:', err);
controller.setConnected(false);
callbacks.onError?.({ message: err.message });
}
});
});
// 保存请求引用用于中止
controller.setRequest(req);
// 处理请求错误
req.on('error', (err) => {
if (!controller.aborted) {
console.error('[SSE] 请求错误:', err);
controller.setConnected(false);
callbacks.onError?.({ message: err.message });
reject(err);
}
});
// 处理超时
const { timeout } = getConfig();
req.setTimeout(timeout, () => {
if (!controller.aborted) {
console.error('[SSE] 请求超时');
controller.abort();
const error = new Error('请求超时');
callbacks.onError?.({ message: error.message });
reject(error);
}
});
// 发送请求体
req.write(body);
req.end();
});
}
/**
* 分发 SSE 事件到对应回调
*/
function dispatchEvent(
eventType: SSEEventType,
data: unknown,
callbacks: SSECallbacks
): void {
switch (eventType) {
case 'text_delta':
callbacks.onTextDelta?.(data as TextDeltaEvent);
break;
case 'tool_call':
callbacks.onToolCall?.(data as ToolCallRequest);
break;
case 'tool_confirm':
callbacks.onToolConfirm?.(data as ToolConfirmEvent);
break;
case 'plan_confirm':
callbacks.onPlanConfirm?.(data as PlanConfirmEvent);
break;
case 'phase_progress':
callbacks.onPhaseProgress?.(data as import('../types/api').PhaseProgressEvent);
break;
case 'plan_step_add':
callbacks.onPlanStepAdd?.(data as import('../types/api').PlanStepAddEvent);
break;
case 'plan_step_remove':
callbacks.onPlanStepRemove?.(data as import('../types/api').PlanStepRemoveEvent);
break;
case 'plan_step_update':
callbacks.onPlanStepUpdate?.(data as import('../types/api').PlanStepUpdateEvent);
break;
case 'plan_summary_update':
callbacks.onPlanSummaryUpdate?.(data as import('../types/api').PlanSummaryUpdateEvent);
break;
case 'tool_start':
callbacks.onToolStart?.(data as ToolStartEvent);
break;
case 'tool_complete':
callbacks.onToolComplete?.(data as ToolCompleteEvent);
break;
case 'tool_error':
callbacks.onToolError?.(data as ToolErrorEvent);
break;
case 'ask_user':
callbacks.onAskUser?.(data as AskUserEvent);
break;
case 'complete':
callbacks.onComplete?.(data as CompleteEvent);
break;
case 'error':
callbacks.onError?.(data as ErrorEvent);
break;
case 'warning':
callbacks.onWarning?.(data as WarningEvent);
break;
case 'notification':
callbacks.onNotification?.(data as NotificationEvent);
break;
case 'depth_update':
callbacks.onDepthUpdate?.(data as DepthUpdateEvent);
break;
case 'agent_start':
callbacks.onAgentStart?.(data as AgentStartEvent);
break;
case 'agent_progress':
callbacks.onAgentProgress?.(data as AgentProgressEvent);
break;
case 'agent_complete':
callbacks.onAgentComplete?.(data as AgentCompleteEvent);
break;
case 'agent_error':
callbacks.onAgentError?.(data as AgentErrorEvent);
break;
case 'memory_compacted':
callbacks.onMemoryCompacted?.(data as MemoryCompactedEvent);
break;
case 'context_usage':
callbacks.onContextUsage?.(data as ContextUsageEvent);
break;
case 'credit_update':
callbacks.onCreditUpdate?.(data as CreditUpdateEvent);
break;
case 'heartbeat':
// 心跳事件:仅用于保持连接,不需要特殊处理
// Node.js req.setTimeout 会在收到数据时自动重置计时器
console.log('[SSE] 收到心跳');
break;
default:
console.log(`[SSE] 未知事件类型: ${eventType}`, data);
}
}
/**
* 生成任务ID
*/
export function generateTaskId(): string {
return `task-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`;
}