基于open62541实现的OpcUA订阅通信

张开发
2026/4/20 17:04:51 15 分钟阅读

分享文章

基于open62541实现的OpcUA订阅通信
为什么需要OpcUA订阅通信在工业物联网IIoT场景中OPC UAOpen Platform Communications Unified Architecture已成为工业设备通信的事实标准。它解决了传统OPCOLE for Process Control的跨平台、安全性和扩展性问题。核心需求设备数据实时监控 → 需要高效的数据订阅机制而非轮询Polling避免网络和服务器负载过高。OpcListener类实现了以下核心功能自动连接/重连OPC UA服务器订阅管理创建/删除订阅节点监视项管理添加/删除监视项数据变化回调处理断线自动重连机制架构设计核心成员变量UA_Client*m_client;// OPC UA客户端实例std::string m_endpointUrl;// 服务器端点URLboolm_runningfalse;// 服务运行状态boolm_connectedfalse;// 连接状态UA_UInt32 m_subscriptionId0;// 当前订阅ID// 重连配置intm_reconnectIntervalMs1000;intm_maxReconnectAttemptsContinueRetry;intm_currentReconnectAttempts0;// 监视项存储std::unordered_mapUA_UInt32,OpcListenerItemm_listenerItems;std::mutex m_itemMutex;// 保护监视项的互斥锁关键数据结构structOpcListenerItem{UA_Variant value;// 当前值UA_NodeId nodeId;// 节点IDOpcListenerHandler handler;// 回调函数UA_UInt32 monitoredItemId;// 监视项IDstd::string nodeName;// 节点名称};核心功能实现解析客户端初始化与连接OpcListener::OpcListener(){m_clientUA_Client_new();UA_ClientConfig*configUA_Client_getConfig(m_client);config-timeoutDefaultTimeout;UA_ClientConfig_setDefault(config);// 设置默认配置}连接与重连机制核心逻辑状态机驱动connect()internalConnect()成功disconnect() or network failurenetwork failurereconnect成功reconnect失败达到最大重试次数DISCONNECTEDCONNECTINGCONNECTEDRECONNECTINGSTOP关键代码解析boolOpcListener::internalConnect(){// 1. 连接服务器UA_StatusCode retvalUA_Client_connect(m_client,(char*)m_endpointUrl.c_str());// 2. 创建订阅核心UA_CreateSubscriptionRequest requestUA_CreateSubscriptionRequest_default();request.requestedPublishingIntervalm_intervalMs;// 采样间隔request.requestedLifetimeCount1000;// 订阅生命周期1000次心跳// ...其他参数UA_CreateSubscriptionResponse responseUA_Client_Subscriptions_create(m_client,request,...);m_subscriptionIdresponse.subscriptionId;// 3. 重新创建之前订阅的监视项for(autoitem:m_listenerItems){UA_MonitoredItemCreateResult resultUA_Client_MonitoredItems_createDataChange(...);// 重置监控项IDitem.second.monitoredItemIdresult.monitoredItemId;}}订阅SubscriptionOPC UA服务器端的核心机制客户端通过订阅获取数据变更而非轮询。requestedPublishingInterval设置数据发布间隔毫秒与m_intervalMs一致。requestedLifetimeCount订阅的“心跳”次数服务器每1000次心跳检查一次订阅存活。自动恢复internalConnect() 会重新创建所有监视项确保重连后监听不丢失。监听管理器addListener添加监听器流程boolOpcListener::addListener(constUA_NodeIdnodeId,OpcListenerHandler handler){// 1. 构建监视项请求UA_MonitoredItemCreateRequest itemRequestUA_MonitoredItemCreateRequest_default(nodeId);itemRequest.requestedParameters.samplingIntervalm_intervalMs;itemRequest.requestedParameters.discardOldesttrue;// 丢弃旧数据// 2. 创建监视项重试机制for(inti0;iMaxRetryCount;i){UA_MonitoredItemCreateResult resultUA_Client_MonitoredItems_createDataChange(m_client,m_subscriptionId,UA_TIMESTAMPSTORETURN_BOTH,itemRequest,this,DataChangeCallback,nullptr);if(result.statusCodeUA_STATUSCODE_GOOD){// 3. 保存监视项信息OpcListenerItem item{variant,nodeId,handler,result.monitoredItemId,...};m_listenerItems.insert({result.monitoredItemId,item});returntrue;}}}discardOldesttrue当队列满时丢弃旧数据避免内存溢出工业场景常见。重试机制最多MaxRetryCount次重试避免瞬时网络抖动导致失败。线程安全m_itemMutex 保护m_listenerItems防止多线程并发操作。数据回调处理DataChangeCallback 线程池 回调函数核心逻辑voidOpcListener::DataChangeCallback(...){// 1. 从m_listenerItems获取handlerstd::lock_guardlock(instance-m_itemMutex);handlerinstance-m_listenerItems[monId].handler;// 2. 复制数据避免直接操作OPC UA内部数据UA_Variant_copy(value-value,instance-m_listenerItems[monId].value);// 3. 提交到线程池执行避免阻塞OPC UA事件循环gT.push([](){handler(nodeId,value);// 用户自定义处理});}为什么需要线程池如果用户处理耗时如写数据库、AI分析会阻塞OPC UA网络事件处理。断开连接与资源清理disconnect断开连接并清理资源voidOpcListener::disconnect(){m_connectedfalse;if(m_subscriptionId!0){UA_Client_Subscriptions_deleteSingle(m_client,m_subscriptionId);// 删除订阅m_subscriptionId0;}UA_Client_disconnect(m_client);// 断开连接// 清理监听项for(autoitem:m_listenerItems){UA_Variant_clear(item.second.value);UA_NodeId_clear(item.second.nodeId);}m_listenerItems.clear();}顺序清理先删除订阅 → 再断开连接 → 最后清理监听项。订阅通信的核心优势传统轮询方式订阅价值服务器负载高每秒请求N次服务器负载低仅变更时推送 降低服务器CPU数据延迟高数据延迟低满足实时控制需求网络带宽浪费大量重复请求带宽利用率高仅发送变更数据减少网络流量OPC UA订阅原理客户端创建订阅 → 服务器分配订阅ID客户端添加监视项 → 服务器注册节点监控数据变更时 → 服务器通过订阅ID推送数据客户端处理数据 → 用户逻辑执行使用示例#includesrc/OpcListener.hppclassTestHandler{public:voidhandler(constUA_NodeIdnodeId,constUA_Variantvalue){std::coutstd::string((char*)nodeId.identifier.string.data,nodeId.identifier.string.length) value changed.std::endl;if(value.typeUA_TYPES[UA_TYPES_INT16]){std::coutNew value: *(UA_Int16*)value.datastd::endl;}else{std::coutError: Unknown type.std::endl;}}};intmain(){OpcListener listener;listener.setIntervalMs(1000);listener.setReconnectParameters(1000,5);listener.connect(opc.tcp://localhost:4840);TestHandler testHandler;listener.addListener(4,test,testHandler,TestHandler::handler);while(listener.isRunning()){std::this_thread::sleep_for(std::chrono::seconds(1));}return0;}源码地址https://gitcode.com/IT_Grey_Cat/OpcListener

更多文章