feat: 实现后端通信层

- 新增 HTTP 客户端(src/services/apiClient.ts)
  - 实现对话创建、消息发送、对话中止等 API 调用
  - 支持用户答案提交和对话历史查询
  - 统一的错误处理和超时控制

- 新增 SSE 事件处理器(src/services/sseHandler.ts)
  - 实现 Server-Sent Events 流式数据解析
  - 支持 MessageChunk、ToolExecution、AskUser、Error 等事件类型
  - 使用 eventsource-parser 库处理 SSE 数据流
  - 提供事件回调机制,支持实时 UI 更新
This commit is contained in:
XiaoFeng
2025-12-16 19:09:04 +08:00
parent f87adab7be
commit ba75541dd6
2 changed files with 450 additions and 0 deletions

154
src/services/apiClient.ts Normal file
View File

@ -0,0 +1,154 @@
/**
* API 客户端
* 封装与后端的 HTTP 通信
*/
import * as https from 'https';
import * as http from 'http';
import { URL } from 'url';
import { getApiUrl, getConfig } from '../config/settings';
import type { ToolCallResult, AnswerRequest, ToolResultResponse, AnswerResponse } from '../types/api';
/**
* HTTP 请求选项
*/
interface RequestOptions {
method: 'GET' | 'POST' | 'PUT' | 'DELETE';
headers?: Record<string, string>;
body?: unknown;
timeout?: number;
}
/**
* 发送 HTTP 请求
*/
async function request<T>(path: string, options: RequestOptions): Promise<T> {
const url = new URL(getApiUrl(path));
const { timeout } = getConfig();
const isHttps = url.protocol === 'https:';
const httpModule = isHttps ? https : http;
const requestOptions: http.RequestOptions = {
hostname: url.hostname,
port: url.port || (isHttps ? 443 : 80),
path: url.pathname + url.search,
method: options.method,
headers: {
'Content-Type': 'application/json',
...options.headers
},
timeout: options.timeout || timeout
};
return new Promise((resolve, reject) => {
const req = httpModule.request(requestOptions, (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
try {
const json = JSON.parse(data);
if (res.statusCode && res.statusCode >= 200 && res.statusCode < 300) {
resolve(json as T);
} else {
reject(new Error(json.error || json.message || `HTTP ${res.statusCode}`));
}
} catch (e) {
reject(new Error(`解析响应失败: ${data}`));
}
});
});
req.on('error', (error) => {
reject(error);
});
req.on('timeout', () => {
req.destroy();
reject(new Error('请求超时'));
});
if (options.body) {
req.write(JSON.stringify(options.body));
}
req.end();
});
}
/**
* 提交工具执行结果
* POST /api/tool/result
*/
export async function submitToolResult(result: ToolCallResult): Promise<ToolResultResponse> {
console.log(`[API] 提交工具结果: callId=${result.id}`);
return request<ToolResultResponse>('/api/tool/result', {
method: 'POST',
body: result
});
}
/**
* 提交用户回答
* POST /api/task/answer
*/
export async function submitAnswer(answer: AnswerRequest): Promise<AnswerResponse> {
console.log(`[API] 提交用户回答: askId=${answer.askId}`);
return request<AnswerResponse>('/api/task/answer', {
method: 'POST',
body: answer
});
}
/**
* 健康检查
* GET /api/dialog/health
*/
export async function healthCheck(): Promise<{ status: string }> {
return request<{ status: string }>('/api/dialog/health', {
method: 'GET',
timeout: 5000
});
}
/**
* 创建成功的工具结果
*/
export function createSuccessResult(id: number, text: string): ToolCallResult {
return {
jsonrpc: '2.0',
id,
result: {
content: [{ type: 'text', text }],
isError: false
}
};
}
/**
* 创建业务错误的工具结果(如编译失败)
*/
export function createBusinessErrorResult(id: number, errorMessage: string): ToolCallResult {
return {
jsonrpc: '2.0',
id,
result: {
content: [{ type: 'text', text: errorMessage }],
isError: true
}
};
}
/**
* 创建系统错误的工具结果
*/
export function createSystemErrorResult(id: number, code: number, message: string): ToolCallResult {
return {
jsonrpc: '2.0',
id,
error: { code, message }
};
}

296
src/services/sseHandler.ts Normal file
View File

@ -0,0 +1,296 @@
/**
* SSE 事件处理器
* 处理与后端的流式通信
* 使用 eventsource-parser + Node.js 原生 http 模块
*/
import * as http from 'http';
import * as https from 'https';
import { URL } from 'url';
import { createParser, type EventSourceParser } from 'eventsource-parser';
import { getApiUrl, getConfig } from '../config/settings';
import type {
DialogRequest,
SSEEventType,
TextDeltaEvent,
ToolCallRequest,
AskUserEvent,
CompleteEvent,
ErrorEvent,
ToolStartEvent,
ToolCompleteEvent,
ToolErrorEvent,
WarningEvent,
NotificationEvent,
DepthUpdateEvent
} from '../types/api';
/**
* SSE 事件回调接口
*/
export interface SSECallbacks {
/** 收到文本增量 */
onTextDelta?: (data: TextDeltaEvent) => void;
/** 收到工具调用请求 */
onToolCall?: (data: ToolCallRequest) => void;
/** 工具开始执行 */
onToolStart?: (data: ToolStartEvent) => void;
/** 工具执行完成 */
onToolComplete?: (data: ToolCompleteEvent) => void;
/** 工具执行错误 */
onToolError?: (data: ToolErrorEvent) => void;
/** 收到用户提问 */
onAskUser?: (data: AskUserEvent) => void;
/** 对话完成 */
onComplete?: (data: CompleteEvent) => void;
/** 错误 */
onError?: (data: ErrorEvent) => void;
/** 警告 */
onWarning?: (data: WarningEvent) => void;
/** 通知 */
onNotification?: (data: NotificationEvent) => void;
/** 深度更新 */
onDepthUpdate?: (data: DepthUpdateEvent) => void;
/** 连接打开 */
onOpen?: () => void;
/** 连接关闭 */
onClose?: () => void;
}
/**
* SSE 会话控制器
*/
export class SSEController {
private request: http.ClientRequest | null = null;
private isConnected = false;
private isAborted = false;
/**
* 是否已连接
*/
get connected(): boolean {
return this.isConnected;
}
/**
* 设置请求对象
*/
setRequest(req: http.ClientRequest): void {
this.request = req;
}
/**
* 设置连接状态
*/
setConnected(connected: boolean): void {
this.isConnected = connected;
}
/**
* 是否已中止
*/
get aborted(): boolean {
return this.isAborted;
}
/**
* 中止当前连接
*/
abort(): void {
if (this.request && !this.isAborted) {
this.isAborted = true;
this.request.destroy();
this.request = null;
this.isConnected = false;
}
}
}
/**
* 发起流式对话
* @param request 对话请求
* @param callbacks 事件回调
* @returns SSE 控制器(用于中止连接)
*/
export async function startStreamDialog(
request: DialogRequest,
callbacks: SSECallbacks
): Promise<SSEController> {
const controller = new SSEController();
const urlString = getApiUrl('/api/dialog/stream');
const url = new URL(urlString);
const isHttps = url.protocol === 'https:';
const httpModule = isHttps ? https : http;
const body = JSON.stringify(request);
console.log(`[SSE] 开始流式对话: taskId=${request.taskId}, url=${urlString}`);
return new Promise((resolve, reject) => {
const options: http.RequestOptions = {
hostname: url.hostname,
port: url.port || (isHttps ? 443 : 80),
path: url.pathname + url.search,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
'Content-Length': Buffer.byteLength(body)
}
};
const req = httpModule.request(options, (res) => {
// 检查响应状态
if (res.statusCode !== 200) {
let errorBody = '';
res.on('data', chunk => errorBody += chunk);
res.on('end', () => {
const error = new Error(`SSE 连接失败: ${res.statusCode} ${errorBody}`);
callbacks.onError?.({ message: error.message });
reject(error);
});
return;
}
// 连接成功
console.log('[SSE] 连接已建立');
controller.setConnected(true);
callbacks.onOpen?.();
resolve(controller);
// 创建 SSE 解析器
const parser = createParser({
onEvent: (event) => {
const eventType = event.event as SSEEventType;
const eventData = event.data;
if (!eventData) {
console.log(`[SSE] 收到空事件: ${eventType}`);
return;
}
try {
const data = JSON.parse(eventData);
console.log(`[SSE] 收到事件: ${eventType}`, data);
// 分发事件到对应回调
dispatchEvent(eventType, data, callbacks);
} catch (e) {
console.error(`[SSE] 解析事件数据失败: ${eventData}`, e);
}
}
});
// 设置编码
res.setEncoding('utf8');
// 处理数据流
res.on('data', (chunk: string) => {
if (!controller.aborted) {
console.log('[SSE] 收到原始数据块:', chunk.substring(0, 200));
parser.feed(chunk);
}
});
// 处理连接关闭
res.on('end', () => {
console.log('[SSE] 连接已关闭');
controller.setConnected(false);
callbacks.onClose?.();
});
// 处理错误
res.on('error', (err) => {
if (!controller.aborted) {
console.error('[SSE] 响应错误:', err);
controller.setConnected(false);
callbacks.onError?.({ message: err.message });
}
});
});
// 保存请求引用用于中止
controller.setRequest(req);
// 处理请求错误
req.on('error', (err) => {
if (!controller.aborted) {
console.error('[SSE] 请求错误:', err);
controller.setConnected(false);
callbacks.onError?.({ message: err.message });
reject(err);
}
});
// 处理超时
const { timeout } = getConfig();
req.setTimeout(timeout, () => {
if (!controller.aborted) {
console.error('[SSE] 请求超时');
controller.abort();
const error = new Error('请求超时');
callbacks.onError?.({ message: error.message });
reject(error);
}
});
// 发送请求体
req.write(body);
req.end();
});
}
/**
* 分发 SSE 事件到对应回调
*/
function dispatchEvent(
eventType: SSEEventType,
data: unknown,
callbacks: SSECallbacks
): void {
switch (eventType) {
case 'text_delta':
callbacks.onTextDelta?.(data as TextDeltaEvent);
break;
case 'tool_call':
callbacks.onToolCall?.(data as ToolCallRequest);
break;
case 'tool_start':
callbacks.onToolStart?.(data as ToolStartEvent);
break;
case 'tool_complete':
callbacks.onToolComplete?.(data as ToolCompleteEvent);
break;
case 'tool_error':
callbacks.onToolError?.(data as ToolErrorEvent);
break;
case 'ask_user':
callbacks.onAskUser?.(data as AskUserEvent);
break;
case 'complete':
callbacks.onComplete?.(data as CompleteEvent);
break;
case 'error':
callbacks.onError?.(data as ErrorEvent);
break;
case 'warning':
callbacks.onWarning?.(data as WarningEvent);
break;
case 'notification':
callbacks.onNotification?.(data as NotificationEvent);
break;
case 'depth_update':
callbacks.onDepthUpdate?.(data as DepthUpdateEvent);
break;
default:
console.log(`[SSE] 未知事件类型: ${eventType}`, data);
}
}
/**
* 生成任务ID
*/
export function generateTaskId(): string {
return `task-${Date.now()}-${Math.random().toString(36).substring(2, 8)}`;
}