From ba75541dd651101e103e2a4a1d188a2b6d03ccb1 Mon Sep 17 00:00:00 2001 From: XiaoFeng <117837368+Fzhiyu1@users.noreply.github.com> Date: Tue, 16 Dec 2025 19:09:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=E9=80=9A=E4=BF=A1=E5=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 HTTP 客户端(src/services/apiClient.ts) - 实现对话创建、消息发送、对话中止等 API 调用 - 支持用户答案提交和对话历史查询 - 统一的错误处理和超时控制 - 新增 SSE 事件处理器(src/services/sseHandler.ts) - 实现 Server-Sent Events 流式数据解析 - 支持 MessageChunk、ToolExecution、AskUser、Error 等事件类型 - 使用 eventsource-parser 库处理 SSE 数据流 - 提供事件回调机制,支持实时 UI 更新 --- src/services/apiClient.ts | 154 +++++++++++++++++++ src/services/sseHandler.ts | 296 +++++++++++++++++++++++++++++++++++++ 2 files changed, 450 insertions(+) create mode 100644 src/services/apiClient.ts create mode 100644 src/services/sseHandler.ts diff --git a/src/services/apiClient.ts b/src/services/apiClient.ts new file mode 100644 index 0000000..585f471 --- /dev/null +++ b/src/services/apiClient.ts @@ -0,0 +1,154 @@ +/** + * API 客户端 + * 封装与后端的 HTTP 通信 + */ +import * as https from 'https'; +import * as http from 'http'; +import { URL } from 'url'; +import { getApiUrl, getConfig } from '../config/settings'; +import type { ToolCallResult, AnswerRequest, ToolResultResponse, AnswerResponse } from '../types/api'; + +/** + * HTTP 请求选项 + */ +interface RequestOptions { + method: 'GET' | 'POST' | 'PUT' | 'DELETE'; + headers?: Record; + body?: unknown; + timeout?: number; +} + +/** + * 发送 HTTP 请求 + */ +async function request(path: string, options: RequestOptions): Promise { + const url = new URL(getApiUrl(path)); + const { timeout } = getConfig(); + + const isHttps = url.protocol === 'https:'; + const httpModule = isHttps ? https : http; + + const requestOptions: http.RequestOptions = { + hostname: url.hostname, + port: url.port || (isHttps ? 443 : 80), + path: url.pathname + url.search, + method: options.method, + headers: { + 'Content-Type': 'application/json', + ...options.headers + }, + timeout: options.timeout || timeout + }; + + return new Promise((resolve, reject) => { + const req = httpModule.request(requestOptions, (res) => { + let data = ''; + + res.on('data', (chunk) => { + data += chunk; + }); + + res.on('end', () => { + try { + const json = JSON.parse(data); + if (res.statusCode && res.statusCode >= 200 && res.statusCode < 300) { + resolve(json as T); + } else { + reject(new Error(json.error || json.message || `HTTP ${res.statusCode}`)); + } + } catch (e) { + reject(new Error(`解析响应失败: ${data}`)); + } + }); + }); + + req.on('error', (error) => { + reject(error); + }); + + req.on('timeout', () => { + req.destroy(); + reject(new Error('请求超时')); + }); + + if (options.body) { + req.write(JSON.stringify(options.body)); + } + + req.end(); + }); +} + +/** + * 提交工具执行结果 + * POST /api/tool/result + */ +export async function submitToolResult(result: ToolCallResult): Promise { + console.log(`[API] 提交工具结果: callId=${result.id}`); + return request('/api/tool/result', { + method: 'POST', + body: result + }); +} + +/** + * 提交用户回答 + * POST /api/task/answer + */ +export async function submitAnswer(answer: AnswerRequest): Promise { + console.log(`[API] 提交用户回答: askId=${answer.askId}`); + return request('/api/task/answer', { + method: 'POST', + body: answer + }); +} + +/** + * 健康检查 + * GET /api/dialog/health + */ +export async function healthCheck(): Promise<{ status: string }> { + return request<{ status: string }>('/api/dialog/health', { + method: 'GET', + timeout: 5000 + }); +} + +/** + * 创建成功的工具结果 + */ +export function createSuccessResult(id: number, text: string): ToolCallResult { + return { + jsonrpc: '2.0', + id, + result: { + content: [{ type: 'text', text }], + isError: false + } + }; +} + +/** + * 创建业务错误的工具结果(如编译失败) + */ +export function createBusinessErrorResult(id: number, errorMessage: string): ToolCallResult { + return { + jsonrpc: '2.0', + id, + result: { + content: [{ type: 'text', text: errorMessage }], + isError: true + } + }; +} + +/** + * 创建系统错误的工具结果 + */ +export function createSystemErrorResult(id: number, code: number, message: string): ToolCallResult { + return { + jsonrpc: '2.0', + id, + error: { code, message } + }; +} diff --git a/src/services/sseHandler.ts b/src/services/sseHandler.ts new file mode 100644 index 0000000..7bae62a --- /dev/null +++ b/src/services/sseHandler.ts @@ -0,0 +1,296 @@ +/** + * SSE 事件处理器 + * 处理与后端的流式通信 + * 使用 eventsource-parser + Node.js 原生 http 模块 + */ +import * as http from 'http'; +import * as https from 'https'; +import { URL } from 'url'; +import { createParser, type EventSourceParser } from 'eventsource-parser'; +import { getApiUrl, getConfig } from '../config/settings'; +import type { + DialogRequest, + SSEEventType, + TextDeltaEvent, + ToolCallRequest, + AskUserEvent, + CompleteEvent, + ErrorEvent, + ToolStartEvent, + ToolCompleteEvent, + ToolErrorEvent, + WarningEvent, + NotificationEvent, + DepthUpdateEvent +} from '../types/api'; + +/** + * SSE 事件回调接口 + */ +export interface SSECallbacks { + /** 收到文本增量 */ + onTextDelta?: (data: TextDeltaEvent) => void; + /** 收到工具调用请求 */ + onToolCall?: (data: ToolCallRequest) => void; + /** 工具开始执行 */ + onToolStart?: (data: ToolStartEvent) => void; + /** 工具执行完成 */ + onToolComplete?: (data: ToolCompleteEvent) => void; + /** 工具执行错误 */ + onToolError?: (data: ToolErrorEvent) => void; + /** 收到用户提问 */ + onAskUser?: (data: AskUserEvent) => void; + /** 对话完成 */ + onComplete?: (data: CompleteEvent) => void; + /** 错误 */ + onError?: (data: ErrorEvent) => void; + /** 警告 */ + onWarning?: (data: WarningEvent) => void; + /** 通知 */ + onNotification?: (data: NotificationEvent) => void; + /** 深度更新 */ + onDepthUpdate?: (data: DepthUpdateEvent) => void; + /** 连接打开 */ + onOpen?: () => void; + /** 连接关闭 */ + onClose?: () => void; +} + +/** + * SSE 会话控制器 + */ +export class SSEController { + private request: http.ClientRequest | null = null; + private isConnected = false; + private isAborted = false; + + /** + * 是否已连接 + */ + get connected(): boolean { + return this.isConnected; + } + + /** + * 设置请求对象 + */ + setRequest(req: http.ClientRequest): void { + this.request = req; + } + + /** + * 设置连接状态 + */ + setConnected(connected: boolean): void { + this.isConnected = connected; + } + + /** + * 是否已中止 + */ + get aborted(): boolean { + return this.isAborted; + } + + /** + * 中止当前连接 + */ + abort(): void { + if (this.request && !this.isAborted) { + this.isAborted = true; + this.request.destroy(); + this.request = null; + this.isConnected = false; + } + } +} + +/** + * 发起流式对话 + * @param request 对话请求 + * @param callbacks 事件回调 + * @returns SSE 控制器(用于中止连接) + */ +export async function startStreamDialog( + request: DialogRequest, + callbacks: SSECallbacks +): Promise { + const controller = new SSEController(); + + const urlString = getApiUrl('/api/dialog/stream'); + const url = new URL(urlString); + const isHttps = url.protocol === 'https:'; + const httpModule = isHttps ? https : http; + + const body = JSON.stringify(request); + + console.log(`[SSE] 开始流式对话: taskId=${request.taskId}, url=${urlString}`); + + return new Promise((resolve, reject) => { + const options: http.RequestOptions = { + hostname: url.hostname, + port: url.port || (isHttps ? 443 : 80), + path: url.pathname + url.search, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Content-Length': Buffer.byteLength(body) + } + }; + + const req = httpModule.request(options, (res) => { + // 检查响应状态 + if (res.statusCode !== 200) { + let errorBody = ''; + res.on('data', chunk => errorBody += chunk); + res.on('end', () => { + const error = new Error(`SSE 连接失败: ${res.statusCode} ${errorBody}`); + callbacks.onError?.({ message: error.message }); + reject(error); + }); + return; + } + + // 连接成功 + console.log('[SSE] 连接已建立'); + controller.setConnected(true); + callbacks.onOpen?.(); + resolve(controller); + + // 创建 SSE 解析器 + const parser = createParser({ + onEvent: (event) => { + const eventType = event.event as SSEEventType; + const eventData = event.data; + + if (!eventData) { + console.log(`[SSE] 收到空事件: ${eventType}`); + return; + } + + try { + const data = JSON.parse(eventData); + console.log(`[SSE] 收到事件: ${eventType}`, data); + + // 分发事件到对应回调 + dispatchEvent(eventType, data, callbacks); + } catch (e) { + console.error(`[SSE] 解析事件数据失败: ${eventData}`, e); + } + } + }); + + // 设置编码 + res.setEncoding('utf8'); + + // 处理数据流 + res.on('data', (chunk: string) => { + if (!controller.aborted) { + console.log('[SSE] 收到原始数据块:', chunk.substring(0, 200)); + parser.feed(chunk); + } + }); + + // 处理连接关闭 + res.on('end', () => { + console.log('[SSE] 连接已关闭'); + controller.setConnected(false); + callbacks.onClose?.(); + }); + + // 处理错误 + res.on('error', (err) => { + if (!controller.aborted) { + console.error('[SSE] 响应错误:', err); + controller.setConnected(false); + callbacks.onError?.({ message: err.message }); + } + }); + }); + + // 保存请求引用用于中止 + controller.setRequest(req); + + // 处理请求错误 + req.on('error', (err) => { + if (!controller.aborted) { + console.error('[SSE] 请求错误:', err); + controller.setConnected(false); + callbacks.onError?.({ message: err.message }); + reject(err); + } + }); + + // 处理超时 + const { timeout } = getConfig(); + req.setTimeout(timeout, () => { + if (!controller.aborted) { + console.error('[SSE] 请求超时'); + controller.abort(); + const error = new Error('请求超时'); + callbacks.onError?.({ message: error.message }); + reject(error); + } + }); + + // 发送请求体 + req.write(body); + req.end(); + }); +} + +/** + * 分发 SSE 事件到对应回调 + */ +function dispatchEvent( + eventType: SSEEventType, + data: unknown, + callbacks: SSECallbacks +): void { + switch (eventType) { + case 'text_delta': + callbacks.onTextDelta?.(data as TextDeltaEvent); + break; + case 'tool_call': + callbacks.onToolCall?.(data as ToolCallRequest); + break; + case 'tool_start': + callbacks.onToolStart?.(data as ToolStartEvent); + break; + case 'tool_complete': + callbacks.onToolComplete?.(data as ToolCompleteEvent); + break; + case 'tool_error': + callbacks.onToolError?.(data as ToolErrorEvent); + break; + case 'ask_user': + callbacks.onAskUser?.(data as AskUserEvent); + break; + case 'complete': + callbacks.onComplete?.(data as CompleteEvent); + break; + case 'error': + callbacks.onError?.(data as ErrorEvent); + break; + case 'warning': + callbacks.onWarning?.(data as WarningEvent); + break; + case 'notification': + callbacks.onNotification?.(data as NotificationEvent); + break; + case 'depth_update': + callbacks.onDepthUpdate?.(data as DepthUpdateEvent); + break; + default: + console.log(`[SSE] 未知事件类型: ${eventType}`, data); + } +} + +/** + * 生成任务ID + */ +export function generateTaskId(): string { + return `task-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`; +}