- 新增 HTTP 客户端(src/services/apiClient.ts)
  - 实现对话创建、消息发送、对话中止等 API 调用
  - 支持用户答案提交和对话历史查询
  - 统一的错误处理和超时控制
- 新增 SSE 事件处理器(src/services/sseHandler.ts)
  - 实现 Server-Sent Events 流式数据解析
  - 支持 MessageChunk、ToolExecution、AskUser、Error 等事件类型
  - 使用 eventsource-parser 库处理 SSE 数据流
  - 提供事件回调机制,支持实时 UI 更新
297 lines
7.4 KiB
TypeScript
297 lines
7.4 KiB
TypeScript
/**
 * SSE event handler.
 * Handles streaming communication with the backend,
 * using eventsource-parser together with the native Node.js http module.
 */
|
|
import * as http from 'http';
|
|
import * as https from 'https';
|
|
import { URL } from 'url';
|
|
import { createParser, type EventSourceParser } from 'eventsource-parser';
|
|
import { getApiUrl, getConfig } from '../config/settings';
|
|
import type {
|
|
DialogRequest,
|
|
SSEEventType,
|
|
TextDeltaEvent,
|
|
ToolCallRequest,
|
|
AskUserEvent,
|
|
CompleteEvent,
|
|
ErrorEvent,
|
|
ToolStartEvent,
|
|
ToolCompleteEvent,
|
|
ToolErrorEvent,
|
|
WarningEvent,
|
|
NotificationEvent,
|
|
DepthUpdateEvent
|
|
} from '../types/api';
|
|
|
|
/**
 * Callbacks a consumer can register for an SSE dialog stream.
 *
 * Every member is optional; register only the events you care about.
 */
export interface SSECallbacks {
  /** A text delta was received. */
  onTextDelta?: (data: TextDeltaEvent) => void;
  /** A tool call request was received. */
  onToolCall?: (data: ToolCallRequest) => void;
  /** A tool started executing. */
  onToolStart?: (data: ToolStartEvent) => void;
  /** A tool finished executing. */
  onToolComplete?: (data: ToolCompleteEvent) => void;
  /** A tool execution failed. */
  onToolError?: (data: ToolErrorEvent) => void;
  /** A question for the user was received. */
  onAskUser?: (data: AskUserEvent) => void;
  /** The dialog completed. */
  onComplete?: (data: CompleteEvent) => void;
  /** An error occurred. */
  onError?: (data: ErrorEvent) => void;
  /** A warning was received. */
  onWarning?: (data: WarningEvent) => void;
  /** A notification was received. */
  onNotification?: (data: NotificationEvent) => void;
  /** A depth update was received. */
  onDepthUpdate?: (data: DepthUpdateEvent) => void;
  /** The connection was opened. */
  onOpen?: () => void;
  /** The connection was closed. */
  onClose?: () => void;
}
|
|
|
|
/**
 * Controller for a single SSE session.
 *
 * Holds the underlying HTTP request, tracks the connected/aborted flags and
 * lets the owner abort the stream.
 */
export class SSEController {
  private request: http.ClientRequest | null = null;
  private isConnected = false;
  private isAborted = false;

  /** Whether the stream is currently connected. */
  get connected(): boolean {
    return this.isConnected;
  }

  /**
   * Attach the request object that `abort()` will destroy.
   *
   * If the controller was already aborted, the request is destroyed right
   * away instead of being stored, so an early abort is never lost.
   */
  setRequest(req: http.ClientRequest): void {
    if (this.isAborted) {
      req.destroy();
      return;
    }
    this.request = req;
  }

  /** Set the connection state. */
  setConnected(connected: boolean): void {
    this.isConnected = connected;
  }

  /** Whether the session has been aborted. */
  get aborted(): boolean {
    return this.isAborted;
  }

  /**
   * Abort the current connection.
   *
   * Idempotent: only the first call has any effect. The abort is recorded
   * even when no request has been attached yet.
   */
  abort(): void {
    if (this.isAborted) {
      return;
    }
    this.isAborted = true;
    this.isConnected = false;

    // Drop our reference before destroying so the controller never holds a dead request.
    const pending = this.request;
    this.request = null;
    pending?.destroy();
  }
}
|
|
|
|
/**
 * Start a streaming dialog with the backend.
 *
 * POSTs the JSON-serialised request to `/api/dialog/stream` and feeds the
 * response body through an SSE parser, dispatching each parsed event to the
 * matching callback.
 *
 * The returned promise resolves with the controller as soon as the server
 * answers with HTTP 200. It rejects when the server answers with any other
 * status (after the error body has been read), when the request emits an
 * error, or when the socket timeout fires.
 *
 * @param request Dialog request
 * @param callbacks Event callbacks
 * @returns SSE controller (used to abort the connection)
 */
export async function startStreamDialog(
  request: DialogRequest,
  callbacks: SSECallbacks
): Promise<SSEController> {
  const controller = new SSEController();

  const urlString = getApiUrl('/api/dialog/stream');
  const url = new URL(urlString);
  const isHttps = url.protocol === 'https:';
  const httpModule = isHttps ? https : http;

  const body = JSON.stringify(request);

  console.log(`[SSE] 开始流式对话: taskId=${request.taskId}, url=${urlString}`);

  return new Promise((resolve, reject) => {
    // Notify the error callback and reject the promise with the same error.
    // Once the promise has already resolved, reject() is a no-op and only
    // the callback notification takes effect.
    const fail = (error: Error): void => {
      callbacks.onError?.({ message: error.message });
      reject(error);
    };

    const options: http.RequestOptions = {
      hostname: url.hostname,
      port: url.port || (isHttps ? 443 : 80),
      path: url.pathname + url.search,
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Accept': 'text/event-stream',
        'Cache-Control': 'no-cache',
        // Byte length, not string length: the body may contain multi-byte characters.
        'Content-Length': Buffer.byteLength(body)
      }
    };

    const req = httpModule.request(options, (res) => {
      // Check the response status.
      if (res.statusCode !== 200) {
        // Decode as UTF-8 at the stream level so multi-byte characters that
        // straddle chunk boundaries are not corrupted.
        res.setEncoding('utf8');
        let errorBody = '';
        res.on('data', (chunk: string) => {
          errorBody += chunk;
        });
        res.on('end', () => {
          fail(new Error(`SSE 连接失败: ${res.statusCode} ${errorBody}`));
        });
        // Without this listener a response that breaks while the error body
        // is being read would leave the promise pending forever.
        res.on('error', (err) => {
          if (!controller.aborted) {
            fail(err);
          }
        });
        return;
      }

      // Connection established.
      console.log('[SSE] 连接已建立');
      controller.setConnected(true);
      callbacks.onOpen?.();
      resolve(controller);

      // Create the SSE parser.
      const parser = createParser({
        onEvent: (event) => {
          const eventType = event.event as SSEEventType;
          const eventData = event.data;

          if (!eventData) {
            console.log(`[SSE] 收到空事件: ${eventType}`);
            return;
          }

          // NOTE: this try block also catches exceptions thrown by the
          // dispatched callback, not only JSON parse failures.
          try {
            const data = JSON.parse(eventData);
            console.log(`[SSE] 收到事件: ${eventType}`, data);

            // Dispatch the event to the matching callback.
            dispatchEvent(eventType, data, callbacks);
          } catch (e) {
            console.error(`[SSE] 解析事件数据失败: ${eventData}`, e);
          }
        }
      });

      // Receive chunks as UTF-8 strings.
      res.setEncoding('utf8');

      // Feed the data stream into the parser; chunks arriving after an abort are ignored.
      res.on('data', (chunk: string) => {
        if (!controller.aborted) {
          console.log('[SSE] 收到原始数据块:', chunk.substring(0, 200));
          parser.feed(chunk);
        }
      });

      // Handle the end of the stream.
      res.on('end', () => {
        console.log('[SSE] 连接已关闭');
        controller.setConnected(false);
        callbacks.onClose?.();
      });

      // Handle response errors (suppressed after a deliberate abort).
      res.on('error', (err) => {
        if (!controller.aborted) {
          console.error('[SSE] 响应错误:', err);
          controller.setConnected(false);
          callbacks.onError?.({ message: err.message });
        }
      });
    });

    // Keep a reference to the request so the controller can abort it.
    controller.setRequest(req);

    // Handle request errors (suppressed after a deliberate abort).
    req.on('error', (err) => {
      if (!controller.aborted) {
        console.error('[SSE] 请求错误:', err);
        controller.setConnected(false);
        fail(err);
      }
    });

    // Handle timeouts.
    // NOTE(review): the timeout is registered on the request and stays armed
    // after the stream is established, so a sufficiently long silent period
    // mid-stream also aborts the connection - confirm this is intended.
    const { timeout } = getConfig();
    req.setTimeout(timeout, () => {
      if (!controller.aborted) {
        console.error('[SSE] 请求超时');
        controller.abort();
        fail(new Error('请求超时'));
      }
    });

    // Send the request body.
    req.write(body);
    req.end();
  });
}
|
|
|
|
/**
 * Dispatch a parsed SSE event to the callback registered for its type.
 *
 * The payload is passed through unvalidated (cast to the event's declared
 * type). Events whose callback is not registered are silently skipped;
 * unrecognised event types are logged.
 *
 * @param eventType Event name taken from the SSE `event:` field
 * @param data Parsed event payload
 * @param callbacks Callback set to dispatch into
 */
function dispatchEvent(
  eventType: SSEEventType,
  data: unknown,
  callbacks: SSECallbacks
): void {
  switch (eventType) {
    case 'text_delta':
      callbacks.onTextDelta?.(data as TextDeltaEvent);
      break;
    case 'tool_call':
      callbacks.onToolCall?.(data as ToolCallRequest);
      break;
    case 'tool_start':
      callbacks.onToolStart?.(data as ToolStartEvent);
      break;
    case 'tool_complete':
      callbacks.onToolComplete?.(data as ToolCompleteEvent);
      break;
    case 'tool_error':
      callbacks.onToolError?.(data as ToolErrorEvent);
      break;
    case 'ask_user':
      callbacks.onAskUser?.(data as AskUserEvent);
      break;
    case 'complete':
      callbacks.onComplete?.(data as CompleteEvent);
      break;
    case 'error':
      callbacks.onError?.(data as ErrorEvent);
      break;
    case 'warning':
      callbacks.onWarning?.(data as WarningEvent);
      break;
    case 'notification':
      callbacks.onNotification?.(data as NotificationEvent);
      break;
    case 'depth_update':
      callbacks.onDepthUpdate?.(data as DepthUpdateEvent);
      break;
    default:
      console.log(`[SSE] 未知事件类型: ${eventType}`, data);
  }
}
|
|
|
|
/**
 * Generate a task ID of the form `task-<epoch millis>-<6 base-36 characters>`.
 *
 * The random suffix is right-padded with `0` because the base-36 rendering of
 * `Math.random()` can be shorter than six digits (for example `0.5` renders
 * as `0.i`), which would otherwise yield a shorter or even empty suffix.
 *
 * @returns The generated task ID
 */
export function generateTaskId(): string {
  const suffix = Math.random().toString(36).substring(2, 8).padEnd(6, '0');
  return `task-${Date.now()}-${suffix}`;
}
|