微软生态圈相对比较封闭 ,大数到SQLServer数据库作为微软系产品在互联网企业应用较少 。据实教程但在传统的时捕数据生产制造企业中大量的传统ERP软件以SQLServer作为数据存储载体。网上对于SQLServer的大数到数据实时CDC案例较少 。本篇文章将介绍如何通过Debezium+kafka-connnect将SQLServer实时捕获到kafka消息队列上。据实教程
SQLServer为实时更新数据同步提供了CDC机制,时捕数据类似于Mysql的大数到binlog ,将数据更新操作维护到一张CDC表中。据实教程开启cdc的时捕数据源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中 。大数到cdc通过捕获进程将变更数据捕获到变更表中 ,据实教程通过cdc提供的时捕数据查询函数 ,可以捕获这部分数据。大数到下图为SQLServer实现CDC机制 :
图片源于微软官网
SQLServer 2012版本
统信UOS或其它linux发行版
DBeaver或其它数据可视化工具
本文默认SQLServer 2012版已正常安装。
2.1 创建测试数据库
在可视化工具里执行如下命令创建测试数据库
create database testdb;
2.2 开启数据库CDC功能
use testdb;nEXEC sys.sp_cdc_enable_db;
开启数据库CDC功能
如果想关闭CDC功能可使用如下命令 :
use testdb;nexec sys.sp_cdc_disable_db;
查询CDC功能是否开启
select * from sys.databases where is_cdc_enabled = 1;
数据库CDC功能已开启
执行命令后 ,刚刚新建的testdb数据库出现在结果集里,就表示CDC功能开启成功 。这时在testdb数据下会自动新建一个名为cdc用户,里面会有一些CDC的功能表 。
自动创建cdc用户
2.3 新建监听测试表
SQLServer CDC监听的目标表一定要存在ID主键才能生效,没有主键则无法正常监听该表。
# 创建测试表nuse testdb;nCREATE TABLE [dbo].[sales] ( [amt] int NULL,n[id] int NOT NULL,n[dept] varchar(100) COLLATE Chinese_PRC_CI_AS NULL,nCONSTRAINT [PK__sales] PRIMARY KEY CLUSTERED ([id]) WITH (PAD_INDEX = OFF,nSTATISTICS_NORECOMPUTE = OFF,nIGNORE_DUP_KEY = OFF,nALLOW_ROW_LOCKS = ON,nALLOW_PAGE_LOCKS = ONn) ONn[PRIMARY] ) ONn[PRIMARY];nnALTER TABLE [dbo].[sales] SET(LOCK_ESCALATION = AUTO);nn# 插入测试数据ninsert into sales(id,dept,amt) values(1,'一部',12.3);ninsert into sales(id,dept,amt) values(2,'二部',191);
2.4 开启测试表CDC功能
exec sp_cdc_enable_table @source_schema='dbo', @source_name='sales', @role_name=null, @supports_net_changes = 1;
2.5 验证测试表CDC功能开启
EXEC sys.sp_cdc_help_change_data_capture
表CDC功能已开启
当刚刚创建的测试表sales进入查询结果集里,则表示此表的CDC监听功能已正常开启 。
2.6 确认sqlserver agent已开启
虽然上面数据库和表的监听功能都已开启 ,但是SQLServer依赖于agent代理 。所以要确认代理已正常开启。
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
代理正常
由于作者在前次教程中已详细演示kafka和debezium的安装方法,这里就不演示。接下来就略过截图而把命令行和步骤演示出来 。有需要详细演示安装的同学可以前往作者前一篇文章阅读安装和使用方法 :大数据(一)-实时捕获mysql变化数据到kafka
# 拉取zookeeper镜像nsudo docker pull wurstmeister/zookeeper:latestn# 拉取kakfa镜像nsudo docker pull wurstmeister/kafka:2.13-2.7.0n# 拉取debezium/connectnsudo docker pull debezium/connectnn# 启动zookeepernsudo docker run n-itd n-p 2181:2181 # zookeeper端口映射n--name zookeeper n-e TZ="Asia/Shanghai" nwurstmeister/zookeepernn# 启动kafkansudo docker run n-p 9092:9092 # kafka端口映射n--name kafka1 n-itd n-e KAFKA_BROKER_ID=1 n-e KAFKA_ZOOKEEPER_CONNECT=本机IP:2181 n-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://本机IP:9092 n-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 n-e KAFKA_NUM_PARTITIONS=1 tttttttttttttt #分区数 n-e KAFKA_DEFAULT_REPLICATION_FACTOR=1 # 副本数n-e KAFKA_LOG_RETENTION_HOURS=168 #kafka数据保留时长n-e TZ="Asia/Shanghai" nwurstmeister/kafka:2.13-2.7.0nn# 启动监听工具nsudo docker run n-itd n--name kafka-connect n-p 8083:8083 n-e GROUP_ID=1 n-e CONFIG_STORAGE_TOPIC=my_connect_configs n-e OFFSET_STORAGE_TOPIC=my_connect_offsets n-e STATUS_STORAGE_TOPIC=my_connect_statuses n-e TZ="Asia/Shanghai" n--link zookeeper:zookeeper ttttttttttttttttttttttttt#与zookeeper通信n--link kafka1:kafka1 #与容器kafka1通信ndebezium/connect
工具均已正常运行
4.1 新建监听配置文件
在宿主机新建一个名为sqlserver-cdc-source.json的文件,输入下列配置保存。配置项详细说明可参考Debezium官网 。
新建监听配置文件
{ "name": "sqlserver-cdc-source",n "config": { n # 驱动n "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",n # 进程数n "tasks.max" : "1",n # 数据库名n "database.server.name" : "testdb",n # 数据库地址n "database.hostname" : "192.168.31.5",n "database.port" : "1433",n # 数据库账号n "database.user" : "sa",n # 数据库密码n "database.password" : "123456",n "database.dbname" : "testdb",n # 监听表白名单n "table.whitelist":"dbo.sales",n "schemas.enable" : "false",n "mode":"incrementing",n "incrementing.column.name":"id",n # kafka地址n "database.history.kafka.bootstrap.servers" : "192.168.31.5:9092",n # 生成topic名称n "database.history.kafka.topic": "testdb.dbo.sales",n "value.converter.schemas.enable":"false",n "value.converter":"org.apache.kafka.connect.json.JsonConverter"n }n}
4.2 启动kafka-connector监听
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sqlserver-cdc-source.json
通过kafka的查询命令可以看到在kafka里出现了监听表产生的topic,监听成功。接来就可以做数据上的实时消费。
sudo docker exec -it kafka1 /opt/kafka_2.13-2.7.0/bin/kafka-topics.sh --list --zookeeper 192.168.31.5:2181
监听表对应的topic
此次监听SQLServer数据库遇到一个深坑 ,kafka和debezium镜像里openjdk version 为"11.0.13"。在11版本下启动kafka-connector监听会报如下错误:
错误信息
解决方案是在容器里将jdk部分配置注释掉,路径为:/etc/java/java-11-openjdk/java-11-openjdk-11.0.13.0.8-2.fc34.x86_64/conf/security/java.security
注释这段代码
大数据(一)-实时捕获mysql变化数据到kafka
码字不易 ,请评论关注点赞并收藏 ,谢谢。
(作者:新闻中心)