拆解 OpenHands(4)--- 服务

张开发
2026/4/16 4:25:23 15 分钟阅读

分享文章

拆解 OpenHands(4)--- 服务
x00 概述本篇结合官方文档进行解读OpenHands的服务器这是OpenHands系统的立身基础。因为本系列借鉴的文章过多可能在参考文献中有遗漏的文章如果有还请大家指出。0x01 服务OpenHands提供了WebSocket服务器。1.1 API 模式可以发送或从服务器接收两种类型的消息ActionsObservations1.1.1 Actions一个action 包含三个部分action要采取的动作args动作的参数message可以放在聊天记录中的友好消息有几种action 。它们的参数列在下面。 随着时间的推移这个列表可能会增长。initialize- 初始化代理。仅由客户端发送。model- 要使用的模型名称directory- 工作空间的路径agent_cls- 要使用的代理类start- 开始一个新的开发任务。仅由客户端发送。task- 要开始的任务read- 读取文件内容。path- 要读取的文件路径write- 写入内容到文件。path- 要写入的文件路径content- 写入文件的内容run- 运行命令。command- 要运行的命令browse- 打开网页。url- 要打开的URLthink- 允许代理制定计划、设定目标或记录想法thought- 要记录的想法finish- 代理发出任务完成的信号1.1.2 observation一个observation 包含四个部分observation观察类型content表示观察数据的字符串extras额外的结构化数据message可以放在聊天记录中的友好消息有几种observation 。它们的额外信息列在下面。 随着时间的推移这个列表可能会增长。read- 文件内容path- 读取的文件路径browse- URL的HTML内容url- 打开的URLrun- 命令的输出command- 运行的命令exit_code- 命令的退出代码chat- 用户的消息1.2 服务器组件以下部分描述了OpenHands项目的服务器端组件。session.pysession.py文件定义了Session类它代表与客户端的WebSocket会话。关键特性包括处理WebSocket连接和断开初始化和管理代理会话在客户端和代理之间分发事件向客户端发送消息和错误session/agent_session.pyagent_session.py文件包含AgentSession类它管理会话内代理的生命周期。关键特性包括创建和管理运行时环境初始化代理控制器处理安全分析管理事件流session/conversation_manager/conversation_manager.pyconversation_manager.py文件定义了ConversationManager类它负责管理多个客户端会话。关键特性包括添加和重启会话向特定会话发送消息清理非活动会话listen.pylisten.py文件是主服务器文件它设置FastAPI应用程序并定义各种API端点。关键特性包括设置CORS中间件处理WebSocket连接管理文件上传提供代理交互、文件操作和安全分析的API端点为前端提供静态文件服务该脚本定义了服务接口主要分为两个部分一部分是通过FastAPI库实现的HTTP接口其具体实现位于openhands/server/routes目录中另一部分是利用socketio库实现的WebSocket接口其代码实现在openhands/server/listen_socket.py文件中。用户与代理的交互通过WebSocket进行连接初始化时会触发connect事件用户发送消息时会触发oh_user_action事件连接断开时会触发disconnect事件。因此梳理代理交互逻辑的核心在于对这三个事件的处理流程进行整理。import socketio from openhands.server.app import app as base_app from openhands.server.listen_socket import sio from openhands.server.middleware import ( CacheControlMiddleware, InMemoryRateLimiter, LocalhostCORSMiddleware, RateLimitMiddleware, ) from openhands.server.static import SPAStaticFiles if os.getenv(SERVE_FRONTEND, true).lower() true: base_app.mount( /, SPAStaticFiles(directory./frontend/build, htmlTrue), namedist ) base_app.add_middleware(LocalhostCORSMiddleware) base_app.add_middleware(CacheControlMiddleware) base_app.add_middleware( RateLimitMiddleware, rate_limiterInMemoryRateLimiter(requests10, seconds1), ) app socketio.ASGIApp(sio, other_asgi_appbase_app)1.3 服务工作流程描述服务的工作流程如下服务器初始化FastAPI应用程序在listen.py中创建和配置。设置CORS中间件和静态文件服务。初始化ConversationManager。客户端连接当客户端通过WebSocket连接时创建新的Session或重启现有一个。Session初始化AgentSession设置运行时环境和代理控制器。代理初始化客户端发送初始化请求。服务器根据提供的参数创建和配置代理。设置运行时环境初始化代理控制器。事件处理Session管理客户端和代理之间的事件流。客户端的事件分发到代理。代理的观察结果发送回客户端。文件操作服务器处理文件上传确保它们符合大小和类型限制。通过运行时环境执行文件读取和写入操作。安全分析如果配置了每个会话初始化安全分析器。安全相关的API请求转发到安全分析器。会话管理ConversationManager定期清理非活动会话。它还在需要时处理向特定会话发送消息。API端点提供各种API端点用于代理交互、文件操作和获取配置默认值。这种服务器架构允许管理多个客户端会话每个会话都有自己的代理实例、运行时环境和安全分析器。事件驱动设计促进了客户端和代理之间的实时通信而模块化结构允许轻松扩展和维护不同组件。1.4 listen_socket.pylisten_socket.py是 OpenHands 服务器端的 Socket.IO 事件监听器负责处理客户端和服务器之间的实时双向通信包括连接建立、事件回放、用户行动转发和连接断开四大核心场景是客户端与后端会话、代理系统交互的桥梁。1.4.1 核心特色listen_socket.py的核心特色如下断点续传的事件回放支持通过latest_event_id参数实现事件断点续传客户端重连时仅回放未接收的事件避免重复数据传输提升连接效率。严格的身份与权限校验连接建立时校验会话 ID、API 密钥、用户身份通过 Cookie 和 Authorization 头确保会话安全性防止未授权访问。向后兼容的事件处理保留oh_action处理器兼容旧版客户端同时提供oh_user_action新版接口平滑过渡不中断服务。有序的事件推送逻辑代理状态变更事件AgentStateChangedObservation最后发送确保客户端先接收历史事件再同步最新状态避免状态不一致。异步高效的事件处理基于异步 IOasync/await实现事件回放和转发支持高并发连接不阻塞主线程提升系统吞吐量。完善的错误处理连接失败时主动断开无效连接记录详细日志便于问题排查过滤无效事件如NullAction减少不必要的网络传输。1.4.2 具体功能listen_socket.py的具体功能如下连接管理connect 事件身份验证验证连接参数中的 conversation_id 和 API 密钥用户认证通过 conversation_validator 验证用户身份会话恢复为已存在的会话重放事件流历史事件重播向新连接的客户端发送历史事件包括过滤特定事件类型会话加入将客户端连接加入到对应的会话中动作处理oh_user_action 和 oh_action 事件用户动作接收处理来自客户端的用户操作请求事件转发将用户动作转发到会话管理器进行处理向后兼容同时支持 oh_user_action 和 oh_action 事件处理后者为兼容旧客户端保留断开连接处理disconnect 事件连接清理当客户端断开连接时清理相关会话资源状态管理通知会话管理器客户端已断开连接listen_socket.py的核心工作流程为连接建立解析查询参数会话 ID、最新事件 ID等验证会话和用户身份创建事件存储实例事件历史重播为客户端重放会话历史事件过滤掉 NullAction、NullObservation、RecallAction 等特定事件确保 AgentStateChangedObservation 事件最后发送会话加入将连接 ID 与会话关联初始化会话设置安全机制API密钥验证检查 SESSION_API_KEY 环境变量与查询参数中的密钥是否匹配会话权限控制通过 conversation_validator 验证用户是否有权访问定会话错误处理连接拒绝在验证失败或出现错误时拒绝连接异常传播使用ConnectionRefusedError处理连接错误异步清理在连接被拒绝后异步断开连接listen_socket.py 与其他组件关系与EventStream紧密配合负责事件的传输和分发通过 connection_manager 管理会话状态使用 event_to_dict 进行事件序列化以便通过网络传输1.4.3 流程图1.4.4 会话连接此处关键一步为与会话管理器 ConversationManager 建立连接。conversation_init_data await setup_init_conversation_settings( user_id, conversation_id, providers_set ) agent_loop_info await conversation_manager.join_conversation( conversation_id, connection_id, conversation_init_data, user_id, )1.4.5 代码listen_socket.py的代码举例如下sio.event async def connect(connection_id: str, environ: dict) - None: SocketIO连接事件处理器客户端建立连接时触发完成会话验证、事件回放、会话加入等初始化流程。 参数 connection_id: 客户端连接唯一标识SocketIO分配 environ: WSGI环境变量字典包含请求头、查询参数等信息 try: logger.info(fSocketIO连接建立connection_id{connection_id}) # 解析查询参数从WSGI环境变量中提取QUERY_STRING query_params parse_qs(environ.get(QUERY_STRING, )) # 解析最新事件ID用于断点续传默认-1表示从最开始回放 latest_event_id_str query_params.get(latest_event_id, [-1])[0] try: latest_event_id int(latest_event_id_str) except ValueError: logger.debug(f无效的latest_event_id值{latest_event_id_str}默认设为-1) latest_event_id -1 # 解析会话ID必需参数用于关联特定对话 conversation_id query_params.get(conversation_id, [None])[0] logger.info(f会话连接请求conversation_id{conversation_id}, connection_id{connection_id}) # 解析提供者集合如支持的LLM提供商列表用于限制可用资源 raw_list query_params.get(providers_set, []) providers_list [] for item in raw_list: # 拆分逗号分隔的提供者名称过滤空值 providers_list.extend(item.split(,) if isinstance(item, str) else []) providers_list [p for p in providers_list if p] providers_set [ProviderType(p) for p in providers_list] # 转换为ProviderType枚举类型 # 校验会话ID是否存在 if not conversation_id: logger.error(查询参数中缺少conversation_id) raise ConnectionRefusedError(缺少会话IDconversation_id) # 校验会话API密钥是否有效 if _invalid_session_api_key(query_params): raise ConnectionRefusedError(无效的会话API密钥) # 提取请求中的Cookie和Authorization头用于用户身份验证 cookies_str environ.get(HTTP_COOKIE, ) # WSGI环境中HTTP头会转为HTTP_前缀下划线替换短横线格式 authorization_header environ.get(HTTP_AUTHORIZATION, None) # 创建会话验证器校验用户身份关联会话ID、Cookie、授权头 conversation_validator create_conversation_validator() user_id await conversation_validator.validate( conversation_id, cookies_str, authorization_header ) # 创建事件存储实例用于读取会话历史事件 try: event_store EventStore( conversation_id, conversation_manager.file_store, user_id ) except FileNotFoundError as e: logger.error(f创建会话事件存储失败conversation_id{conversation_id}, 错误{e}) raise ConnectionRefusedError(f无法访问会话事件{e}) agent_state_changed None # 存储代理状态变更事件最后单独发送 # 创建异步事件存储包装器从latest_event_id1开始回放事件避免重复 async_store AsyncEventStoreWrapper(event_store, latest_event_id 1) # 异步回放历史事件向客户端推送未接收过的事件 async for event in async_store: logger.debug(f回放事件{event.__class__.__name__}) # 跳过无效/召回类事件无需推送给客户端 if isinstance( event, (NullAction, NullObservation, RecallAction), ): continue # 暂存代理状态变更事件最后发送确保客户端状态同步 elif isinstance(event, AgentStateChangedObservation): agent_state_changed event # 其他事件直接推送给客户端 else: await sio.emit(oh_event, event_to_dict(event), toconnection_id) # 最后发送代理状态变更事件确保客户端获取最新状态 if agent_state_changed: await sio.emit( oh_event, event_to_dict(agent_state_changed), toconnection_id ) logger.info(f会话事件回放完成conversation_id{conversation_id}) # 初始化会话设置用户偏好、提供者配置等 conversation_init_data await setup_init_conversation_settings( user_id, conversation_id, providers_set ) # 加入会话关联connection_id与会话启动代理循环 agent_loop_info await conversation_manager.join_conversation( conversation_id, connection_id, conversation_init_data, user_id, ) # 校验会话加入结果 if agent_loop_info is None: raise ConnectionRefusedError(加入会话失败) logger.info(f会话加入成功conversation_id{conversation_id}, connection_id{connection_id}) except ConnectionRefusedError: # 发送错误后断开无效连接 asyncio.create_task(sio.disconnect(connection_id)) raise sio.event async def oh_user_action(connection_id: str, data: dict[str, Any]) - None: 处理客户端发送的用户行动事件如用户输入、操作指令。 参数 connection_id: 客户端连接ID data: 用户行动数据字典格式包含行动类型、内容等 # 将用户行动转发到事件流由会话管理器处理 await conversation_manager.send_to_event_stream(connection_id, data) sio.event async def oh_action(connection_id: str, data: dict[str, Any]) - None: 兼容旧版客户端的行动事件处理器保留用于向后兼容。 注意待所有客户端升级为使用oh_user_action后可移除该处理器 目前用于支持正在进行中的旧会话避免中断服务 await conversation_manager.send_to_event_stream(connection_id, data) sio.event async def disconnect(connection_id: str) - None: SocketIO断开连接事件处理器客户端断开连接时触发。 参数 connection_id: 断开连接的客户端ID logger.info(fSocketIO连接断开connection_id{connection_id}) # 通知会话管理器断开该连接与会话的关联 await conversation_manager.disconnect_from_session(connection_id)

更多文章