大数据-266 实时数仓-Canal + Kafka 实现 MySQL 数据库变更实时捕获

张开发
2026/4/14 17:38:29 15 分钟阅读

分享文章

大数据-266 实时数仓-Canal + Kafka 实现 MySQL 数据库变更实时捕获
TL;DR场景需要将 MySQL 数据库的变更实时同步到 Kafka实现异构系统间的数据一致性结论通过 Canal 监听 MySQL binlog将变更数据解析为 JSON 后推送到 Kafka构建毫秒级的 CDC 数据管道产出完整的 Canal Kafka 集成配置与 INSERT/UPDATE/DELETE 操作示例版本矩阵功能状态说明Canal 监听 MySQL binlog✅ 已验证模拟主从复制机制毫秒级捕获Canal 解析 binlog 为 JSON✅ 已验证支持 INSERT/UPDATE/DELETEKafka 生产者发送消息✅ 已验证JSON 格式扁平化传输Kafka 消费者接收消息✅ 已验证kafka-console-consumer 验证集群部署⚠️ 待验证分布式架构支持高可用下游系统分发⚠️ 待验证ES/Redis/Hadoop 等Canal 简介Canal 是阿里巴巴开源的 MySQL binlog 增量订阅与消费平台。它模拟 MySQL 的主从复制机制通过解析 MySQL 的二进制日志binlog实现数据库变更的数据捕获CDC, Change Data Capture。数据同步支持将数据库的变更数据同步到其他数据源或消息系统如 Kafka、RocketMQ、Elasticsearch 等。实时性基于 binlog 的解析和订阅能够实现毫秒级的数据变更捕获。分布式架构支持集群部署满足高可用性和高吞吐量需求。多种数据支持除 MySQL 外还支持 MariaDB 和部分兼容 MySQL 协议的数据库。Kafka 简介Kafka 是一个分布式消息系统用于高吞吐量、低延迟的数据流处理。它支持消息持久化、订阅消费、流处理等功能常用于日志采集、事件流处理、大数据分析和消息队列场景。Producer生产者负责将数据写入 Kafka。Consumer消费者从 Kafka 中读取数据。BrokerKafka 集群中的服务器负责存储和分发消息。Topic消息的分类存储单元。Partition将数据分区存储以实现并行处理和负载均衡。Canal 与 Kafka 的集成原理Canal 和 Kafka 通常配合使用用于构建高效的数据同步管道实现数据库变更到消息队列的实时推送。流程如下数据源捕获Canal 监听 MySQL 的 binlog 数据变更事件。数据解析Canal 将 binlog 数据解析为 JSON 格式或其他结构化数据。消息推送Canal 将解析后的数据发送到 Kafka 的指定 Topic 中。消息消费与处理Kafka Consumer 消费数据并进一步分发给其他服务或存储如 Hadoop、Elasticsearch、Redis 等。使用场景数据同步与分发实现多种异构系统之间的数据一致性。比如将 MySQL 数据变更同步到 Elasticsearch实现实时搜索引擎。日志分析与监控将数据库操作事件推送到 Kafka供日志分析或实时监控系统使用。实时数据流处理数据经过 Kafka 进入 Flink、Spark Streaming 等流处理框架满足复杂的数据处理需求。缓存刷新数据库更新后推送变更消息到 Kafka再由消费者更新 Redis 缓存提高一致性和访问性能。注意事项与优化数据一致性保障确保 binlog 与业务日志一致以避免遗漏或重复消费。分区与负载均衡使用 Kafka 的分区机制分配不同的表或业务流量提升并行消费能力。消息格式优化选择扁平化 JSON 格式传输数据便于消费端解析处理。容错与恢复机制在 Canal 和 Kafka 之间配置重试机制避免临时网络故障导致数据丢失。安全性配置 Canal 访问 MySQL 的最小权限账户只授予 REPLICATION SLAVE 和 REPLICATION CLIENT 权限。环境要求MySQLCanal 采集 binlog在 Kafka 做验证新建主题# 新建主题kafka-topics.sh--zookeeperh123.wzk.icu:2181--create--replication-factor3--partitions1--topicdwshow查看主题# 查看主题kafka-topics.sh--zookeeperh123.wzk.icu:2181--list执行结果如下图所示启动生产者# 启动生产者kafka-console-producer.sh --broker-list h123.wzk.icu:9092--topicdwshow启动消费者# 启动消费者kafka-console-consumer.sh --bootstrap-server h123.wzk.icu:9092--topicdwshow --from-beginning操作数据此时 MySQL 数据表若有变化会将 row 类型的 log 写进 Kafka具体格式为 JSON。INSERT 操作{data:[{id:6,payMethod:meituan,payName:美团支付,description:美团支付,payOrder:0,online:-1}],database:dwshow,es:1604461572000,id:6,isDdl:false,mysqlType:{id:int(11),payMethod:varchar(20),payName:varchar(255),description:varchar(255),payOrder:int(11),online:tinyint(4)},old:null,pkNames:null,sql:,sqlType:{id:4,payMethod:12,payName:12,description:12,payOrder:4,online:-6},table:wzk_payments,ts:1604461572297,type:INSERT}UPDATE 操作{data:[{productId:115908,productName:索尼 xxx10,shopId:100365,price:300.0,isSale:1,status:0,categoryId:10395,createTime:2020-07-12 13:22:22,modifyTime:2020-09-27 02:51:16}],database:dwshow,es:1601189476000,id:456,isDdl:false,mysqlType:{productId:bigint(11),productName:varchar(200),shopId:bigint(11),price:decimal(11,2),isSale:tinyint(4),status:tinyint(4),categoryId:int(11),createTime:varchar(25),modifyTime:datetime},old:[{price:597.80,modifyTime:2020-07-12 13:22:22}],pkNames:null,sql:,sqlType:{productId:-5,productName:12,shopId:-5,price:3,isSale:-6,status:-6,categoryId:4,createTime:12,modifyTime:93},table:wzk_product_info,ts:1601189477116,type:UPDATE}DELETE 操作{data:[{productId:115908,productName:索尼 xxx10,shopId:100365,price:300.0,isSale:1,status:0,categoryId:10395,createTime:2020-07-12 13:22:22,modifyTime:2020-09-27 02:51:16}],database:dwshow,es:1601189576000,id:457,isDdl:false,mysqlType:{productId:bigint(11),productName:varchar(200),shopId:bigint(11),price:decimal(11,2),isSale:tinyint(4),status:tinyint(4),categoryId:int(11),createTime:varchar(25),modifyTime:datetime},old:null,pkNames:null,sql:,sqlType:{productId:-5,productName:12,shopId:-5,price:3,isSale:-6,status:-6,categoryId:4,createTime:12,modifyTime:93},table:wzk_product_info,ts:1601189576594,type:DELETE}上面的 JSON 格式解释如下data最新的数据为json数组。如果是插入则表示最新插入的数据如果是更新则表示更新后的最新数据如果是删除则表示被删除的数据database数据库名称es事件时间13位的时间戳id事件操作的序列号1,2,3…isDdl是否是DDL操作mysqlType字段类型old旧数据pkNames主键名称sqlSQL语句sqlType是经过canal转换处理的比如unsigned int会被转化为Longunsigned long会被转换为BigDecimaltable表名ts日志时间type操作类型比如DELETEUPDATEINSERT当我们操作数据库的时候可以看到Kafka 中写入了大量的数据这里我是消费者在监听错误速查卡症状根因定位修复Kafka 中无消息写入Canal 未正确连接 MySQL 或未开启 binlog检查 Canal 日志确认 binlog 解析状态确保 MySQL 开启 binlog 且 Canal 有 REPLICATION SLAVE 权限消息格式为 null 或乱码binlog 格式为 STATEMENT 而非 ROW检查 MySQL binlog_format 参数设置 binlog_formatROW消费者无法消费消息Topic 不存在或分区分配错误kafka-topics.sh --list 确认 Topic创建 Topic 并检查分区数量数据重复或丢失未配置 Canal 的消费位点或 Kafka 偏移量查看 Kafka Consumer offset配置合理的 offset 提交策略UPDATE 操作 old 字段为空MySQL binlog_row_image 配置为 FULL检查 binlog_row_image 参数设置 binlog_row_imageFULL

更多文章