使用FastAPI和StreamableHTTP实现打字机流式对话

张开发
2026/4/10 23:39:41 15 分钟阅读

分享文章

使用FastAPI和StreamableHTTP实现打字机流式对话
使用FastAPI和StreamableHTTP实现打字机流式对话关注朋蛋、码上小明1 StreamableHTTP和SSESSEServer-Sent Events是一种单向、依赖长连接的传统实时推送技术而 Streamable HTTP 是一种更现代、灵活的协议设计它按需使用流式传输且对无状态架构友好。特性SSE (EventSourceResponse)StreamableHTTP (StreamingResponse)核心类fastapi.sse.EventSourceResponsefastapi.responses.StreamingResponse数据格式自动格式化为text/event-stream完全自定义 (如 JSON Lines, 二进制流)重连机制浏览器自动处理需手动实现适用场景简单的实时通知、状态更新AI 流式输出、文件下载、日志流服务器⚠️1后端需要返回 StreamingResponse并指定 media_type“text/event-stream”。2数据格式必须遵循 SSE 规范以 data: 开头并用两个换行符\n\n结尾。3数据格式示例data: {“msg”:“你好”} \n\n⚠️1后端同样返回 StreamingResponse但 MIME 类型更自由例如media_type“application/json”推荐、media_type“application/x-ndjson”、media_typetext/event-stream等。2数据格式可自定义通常每块数据是一个完整的 JSON 对象并以一个换行符 \n 结尾。3数据格式示例{“msg”:“你好”} \n客户端浏览器原生EventSource简单易用需使用fetchreader更灵活⚠️注意如果实现SSE可直接使用FastAPI中fastapi.sse的EventSourceResponse, ServerSentEvent两个模块。2 框架设计2.1 机构说明SSE和StreamableHttp的实时对话存在刷新后会自动断开无法进行直接连接一般需要记录实时对话的信息才能重连Websocket不存在这个问题。解决思路是先创建实时对话的后台任务服务并记录任务编号接着实时运行后台服务的对话并实时发送给Redis中的Stream最后对话服务实时监听Redis服务。其他说明1为了简单本文使用FaskAPI中的数据background作为后端任务可根据实际情况更换为celery。2为了时效性本文选择Redis中的Stream结构作为消息流实现实时对话。3使用Redis的XADD发送消息和XREAD接收消息。XREAD会在异常情况下丢失数据因为没有消息确认机制针对消息重要的环境可使用XREADGROUP不会丢失消息具有消息确认机制XREAD和XREADGROUP需求自己实时监听都不会阻塞主进程。不要使用Redis的发布订阅Pub/Sub此模式发送消息不会阻塞进程接收消息时会阻塞进程此外此外模式即发即收不存储数据用户在线能接收消息不在线消息会丢失上线也不会接收到以前的消息适合用于广播模式。Redis消息服务后台任务服务对话服务前端Redis消息服务后台任务服务对话服务前端1 实时对话流2 创建后台服务3 实时发送消息4 获取任编号5 获取任务消息6 实时监听消息7 StreamableHTTP实时流2.2 创建RedisRedis管理工具Redis-Insight-win-installer# 下载地址 https://redis.io/downloads/#insightredis.conf# redis.conf # 可远程连接 # bind 127.0.0.1 # 解除保护模式 protected-mode no # 数据持久化 appendonly yes # 设置密码 requirepass 123456Docker创建sudo docker run -itd \ --restartalways \ --name fl_redis \ -p 6379:6379 \ -v /home/redis/redis.conf:/etc/redis/redis.conf \ -v /home/redis/data:/data \ redis:8.6.2 redis-server /etc/redis/redis.conf3 创建前端3.1 创建流程Vue3的官网https://vuejs.org/使用Vite创建Vue3前端项目Vite的官网地址https://vite.dev/在cmd中使用命令创建项目npm create vitelatest # 设置项目信息 streamchat创建的流程如下npm create vitelatest Need to install the following packages: create-vite9.0.3 Ok to proceed? (y) y npx create-vite | o Project name: | streamchat | o Select a framework: | Vue | o Select a variant: | TypeScript | o Install with npm and start now? | Yes | o Scaffolding project in D:\1_projects\vscode\streamchat... | o Installing dependencies with npm... added 49 packages, and audited 50 packages in 20s 9 packages are looking for funding run npm fund for details found 0 vulnerabilities | o Starting dev server... streamchat0.0.0 dev vite VITE v8.0.3 ready in 408 ms ➜ Local: http://localhost:5173/ ➜ Network: use --host to expose ➜ press h enter to show help需要的依赖包VueUse是一个专为 Vue 3 开发者打造的、功能强大的工具函数库。# VueUse包 https://github.com/vueuse/vueusetypes/node的作用可在 VS Code 等编辑器中智能提示与自动补全。安装组件npm install --save-dev types/node npm install3.2 项目配置配置绝对目录”“。1Vite 配置 (vite.config.ts)import{defineConfig}fromviteimportvuefromvitejs/plugin-vueimportpathfrompath// https://vite.dev/config/exportdefaultdefineConfig({plugins:[vue()],// 配置服务server:{// 设置端口号port:5173,// 注意下面的代理配置在项目打包上线后会失效// 配置代理proxy:{// 代理 /api 请求/api:{target:http://localhost:3000,// 后端服务器地址changeOrigin:true},}},// 新加内容resolve:{alias:{// 核心配置 指向 src 目录:path.resolve(__dirname,./src)}}})2TypeScript 配置 (tsconfig.json或者tsconfig.app.json)如果tsconfig.json已经映射了tsconfig.app.json直接配置tsconfig.app.json即可。{extends:vue/tsconfig/tsconfig.dom.json,compilerOptions:{tsBuildInfoFile:./node_modules/.tmp/tsconfig.app.tsbuildinfo,types:[vite/client],/* Linting */strict:true,noUnusedLocals:true,noUnusedParameters:true,erasableSyntaxOnly:true,noFallthroughCasesInSwitch:true,noUncheckedSideEffectImports:true,// 新加内容// 设置绝对路径paths:{/*:[./src/*],}},include:[src/**/*.ts,src/**/*.tsx,src/**/*.vue]}3.3 项目编辑项目的常用命令# 启动服务 npm run dev项目结构apis/chats/index.tsexport class Message { chat_id: string last_event_id: number 0 question: string } // export const streamChat async(msg: Message): PromiseResponse { export const streamChat async(msg: Message) { return await fetch( /api/chat, { method: POST, headers: { Content-Type: application/json }, body: JSON.stringify({chat: msg}) } ); }router/index.tsimport { createRouter, createWebHashHistory } from vue-router import HomeView from /views/index.vue import Chat from /views/chat/index.vue; import About from /views/about/index.vue; import Summary from /views/Summary.vue; const router createRouter({ history: createWebHashHistory(), routes: [ // 一级路由 { path: /, component: HomeView, // 二级路由 children: [ { // 重定向到chat path: /, redirect: /chat }, { path: /chat, name: Chat, component: Chat, meta: { title: 对话 } }, { path: /summary, name: Summary, component: Summary, meta: { title: 说明 } } ] }, // 一级路由 { path: /about, name: About, component: About, meta: { title: 关于 } }, ], }) export default router;views/about/index.vuescript setup langts /script template div 公众号:朋蛋、码上小明 版本1.0 /div /templateviews/chat/index.vuescript setup langts /* SSE可使用EventSource对象实现数据的接收和发送 */ // 下面的方法可实现StreamHttp import { ref, onMounted } from vue import { Message } from /apis/chats; import { m } from vue-router/dist/router-CWoNjPRp.mjs; // import { streamChat } from /apis/chats; // 初始化值 let textInput ref(消息) // 返回值 let textOutput ref() // 用户编号 let chatId ref() // 设置SessionID const sessionId session-123; // 中断请求 let controller: AbortController | null null // 发送消息 async function sendMessage(){ console.log(hello) // 如果已有连接先断开防止重复 if (controller ! null) { controller.abort(); } controller new AbortController(); // 设置消息 let msg new Message(); msg.question textInput.value; msg.chat_id chatId.value; console.log(msg) console.log(JSON.stringify({chat1: msg})) // 可以将接口封装起来统一调用 // const res await streamChat(msg) const headers { Content-Type: application/json, Mcp-Session-Id: sessionId }; try { const res await fetch( /api/chat, { method: POST, headers: headers, body: JSON.stringify(msg), signal: controller.signal // 绑定中断信号 } ); if (!res.ok){ throw new Error(网络响应错误); } if(res.body null){ console.log(数据为空) return } const reader res.body.getReader(); const decoder new TextDecoder(utf-8); while (true) { const { done, value } await reader.read(); if (done) break; // 解码 const chunk decoder.decode(value); console.log(收到数据:, chunk); const data JSON.parse(chunk); // 更新数据 textOutput.value textOutput.value data.msg } } catch (error) { // 中断请求 if (error instanceof Error error.name AbortError) { console.log(用户主动中断了请求); } else { console.error(请求出错:, error); } } finally { controller null; } } // 终止消息 function stopMessage(){ console.log(stop) // 拿着编号调用自定义的接口即可中断 if (controller) { controller.abort() } } // 生命周期钩子 onMounted(() { console.log(The initial count is ${textInput.value}.) // sendMessage() }) /script template div classeditor textarea classinput v-modeltextInput/textarea br input placeholder用户编号 v-modelchatId / div classbuttonmsg button clicksendMessage发送/button button clickstopMessage中断/button /div div stylepadding-top: 20px;接收的消息/div div classoutput{{ textOutput }}/div /div /template style scoped .input{ width: 400px; height: 20px; } .buttonmsg button{ margin-left: 50px; } /styleviews/index.vuetemplate div classviews div classnav a href/chart对话/a a href/summary说明/a a href/about关于/a /div router-view/router-view /div /template style scoped .nav a{ padding-left: 50px; } /styleviews/Summary.vuescript setup langts /script template div 基于Stream Http协议实现实时对话 /div /templateApp.vuescript setup langts /script template div idapp router-view/router-view /div /template style #app { font-family: Avenir, Helvetica, Arial, sans-serif; -webkit-font-smoothing: antialiased; -moz-osx-font-smoothing: grayscale; text-align: center; color: #2c3e50; margin-top: 60px; } /style4 FastAPI后端4.1 main.pyimportasyncioimportjsonfromcontextlibimportasynccontextmanagerfromtypingimportDictimportuvicornfromfastapiimportFastAPI,APIRouterfromfastapi.responsesimportStreamingResponsefromfastapi.middleware.corsimportCORSMiddlewarefrompydanticimportBaseModelfromredis_async_confimportinit_asyredis,close_asyredis,asyredisdb# 异步上下文管理器加入到FastAPI中asynccontextmanagerasyncdeflifspan(app:FastAPI):# 一、 启动服务# 1 初始化Redisawaitinit_asyredis()# 等待结束后自动关闭资源yield# 二、关闭资源# 1 关闭连接池和资源会自动关闭连接池等awaitclose_asyredis()# 构建应用appFastAPI()# CORS配置app.add_middleware(CORSMiddleware,# 生产环境可指定具体域名allow_origins[*],allow_credentialsTrue,allow_methods[*],allow_headers[*],)classChatRequest(BaseModel):chat_id:strlast_event_id:int0question:strclassChatResponse(BaseModel):chat_id:strlast_event_id:int0code:int0msg:str# 定义全局变量TEXT_MSG 河南大学创立于1912年始名河南留学欧美预备学校。后历经中州大学、国立开封中山大学、省立河南大学等阶段1942年升格为国立河南大学。1952年院系调整 校本部更名为河南师范学院。后经开封师范学院、河南师范大学等阶段1984年恢复河南大学校名。2000年6月原河南大学、开封医学高等专科学校、开封师范高等专科学校合并组建新的河南大学。2017年9月学校入选首批国家“双一流”建设高校。 # 保存缓存内容chat_history_cache:Dict[str,str]dict()# 保存任务信息background_tasks:Dict[str,asyncio.Task]dict()# 设置路由routerAPIRouter(prefix/api,tags[api])# 1. 定义一个异步生成器模拟“打字”过程发送到Redis中asyncdefbackground_worker(chat:ChatRequest):foriinrange(len(TEXT_MSG)):# 设置数据msg_itemTEXT_MSG[i]awaitasyncio.sleep(0.1)# 模拟耗时操作chat_resChatResponse()chat_res.msgmsg_item# 获取编号chat_res.chat_idchat.chat_id chat_res.last_event_idchat.last_event_id# 转化为字典chat_reschat_res.model_dump()print(chat_res)awaitasyredisdb.xadd(namechat.chat_id,fieldschat_res)# 消息结束chat_resChatResponse()chat_res.code2chat_res.msgchat_reschat_res.model_dump()awaitasyredisdb.xadd(namechat.chat_id,fieldschat_res)asyncdefcreate_consumer_group(stream_name:strchatstream,group_name:strchatgroup):try:awaitasyredisdb.xgroup_create(# 流的名称namestream_name,# 组名groupnamegroup_name,# 设置消费策略# id$表示只消费新来的消息# id0表示消费全部的消息即从头开始消费id$,# 如果流不存在自动创建mkstreamTrue)exceptExceptionase:ifBUSYGROUPinstr(e):print(分组已存在)else:print(e)asyncdefgenerate_stream(chat:ChatRequest):print(chat)# 判断是否已存在如果第一次那就创建任务# 后期可根据自己需求判断任务是否在数据库中存在ifchat.chat_id1:# chat_id创建任务并设置任务编号为“2”为接收数据chat.chat_id2# 后期可根据任务编号进行中断等taskasyncio.create_task(background_worker(chat))print(,task)# 监听Redis服务try:# 设置任务是否结束is_end:boolFalse# 设置消费者名称stream_name:strchat.chat_id group_name:strchatgroupconsumer_name:strworker# 构建Redis中的分组awaitcreate_consumer_group(stream_name,group_name)# 接收消息whileTrue:try:# 设置一对一编号messagesawaitasyredisdb.xreadgroup(# 对话的GroupNamegroupnamegroup_name,# 消费者名称consumernameconsumer_name,# 数据流streams{# 表示只读取尚未被其他消费者读取的数据stream_name:},# 获取10个块count1,# 超时等待5秒block5000,)ifmessages:print(messages)forstream,msgsinmessages:formsg_id,datainmsgs:data_dictdatayieldjson.dumps(data_dict)# 确认消息接收awaitasyredisdb.xack(consumer_name,group_name,chat.chat_id)ifdata_dict.get(code)2:is_endTruebreak# 终止循环ifis_end:breakelse:print(超时)print(messages)break# 终止循环ifis_end:breakexceptExceptionase:print(数据出错,e)# 中断循环breakprint(*****结束****)# 删除Redis中Stream释放空间# await asyredisdb.delete(chat.chat_id)exceptasyncio.CancelledError:print(任务被取消)router.post(/chat)asyncdefchatting(chat:ChatRequest):print()print(chat)print()returnStreamingResponse(generate_stream(chat),# application/x-ndjson返回值是矩阵不方便调试和查看。# media_typeapplication/x-ndjson,media_typeapplication/json)# 添加到路由app.include_router(router)if__name____main__:uvicorn.run(app,host0.0.0.0,port3000)4.2 redis_async_conf.pyimportlogging# 适用于异步编程fromredis.asyncioimportRedis,ConnectionPooldef_init_redis_client()-Redis:# 创建爱你连接池poolConnectionPool().from_url(# urlredis://default:passwordip:port,urlredis://default:123456192.168.108.147:6379,max_connections100,decode_responsesTrue)returnRedis(connection_poolpool)asyncdefinit_asyredis():try:awaitasyredisdb.ping()print(启动连接池成功)logging.warning(启动连接池成功)exceptExceptionase:print(启动连接池失败)logging.error(启动连接池失败)asyncdefclose_asyredis():awaitasyredisdb.aclose()asyredisdb_init_redis_client()5 截图使用方法先在用户编号中输入1触发条件然后输入2点击中断再点击发送可以继续接收剩下的数据。1 开始创建后端任务点击中断后后台还在持续运行不影响数据。后端2 断开后继续接收后端

更多文章