目录
  1. 1. 背景
  2. 2. 版本信息
  3. 3. 工作原理
  4. 4. 实现代码
  5. 5. 数据格式
  6. 6. 注意事项
  7. 7. 参考资料
  8. 8. 简单总结
Canal使用

文章首发于:clawhub.club


背景

存在跨数据库同步数据的需求,对数据有实时同步的要求,采用阿里的开源框架Canal实时采集Mysql的binlog日志,将过滤后的数据统一标准格式发送到kafka中,在服务的消费端进行数据消费入库。

版本信息

Canal 1.1.3
Mysql支持版本 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

  • Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • Canal 解析 binary log 对象(原始为 byte 流)

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* canal连接
*
* @return
*/
@Bean
public CanalConnector getCanalConnector() {
String canalHost = canalCfg.getHost();
int canalPort = Integer.valueOf(canalCfg.getPort());
String canalDestination = canalCfg.getDestination();
String canalUsername = canalCfg.getUsername();
String canalPassword = canalCfg.getPassword();
canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(new InetSocketAddress(canalHost, canalPort)), canalDestination, canalUsername, canalPassword);
try {
canalConnector.connect();
canalConnector.subscribe();
log.info("connect canal server successed, canal client started!")
} catch (Throwable t) {
log.error("failed to connect to canal server", t);
canalConnector.disconnect();
}
return canalConnector;
}

配置文件内配置Canal Server的连接地址,端口号,用户名和密码,将连接注册为Bean,加入Springboot管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 消费获取canal实体,成功后提交该批次信息的batchId,删除该条信息,出现异常则通过batchId进行数据回滚
*/
@Override
public void run() {
try {
while (taskRunning) {
Message message = canalConnector.getWithoutAck(batchSize);
long batchId = message.getId();
List<Entry> entries = message.getEntries();
if (batchId != -1 && entries.size() > 0) {
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.ROWDATA) {
log.info(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName()));
}
}
} else {
Thread.sleep(500);
}
canalConnector.ack(batchId);
}
} catch (CanalClientException e) {
log.error("canal obtain data is fail: {}", e);
} catch (InterruptedException e) {
log.error("当前线程异常:{}", e);
}
}

主线程持续采集binlog日志。

数据格式

数据传输格式:protobuff

Entry.Header.logfileName [binlog文件名]
Entry.Header.logfileOffset [binlog position]
Entry.Header.executeTime [发生的变更]
Entry.Header.schemaName
Entry.Header.tableName
Entry.Header.eventType [insert/update/delete类型]
Entry.entryType [事务头BEGIN/事务尾END/数据ROWDATA]
Entry.storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange.isDdl [是否是ddl变更操作,比如create table/drop table]
RowChange.sql [具体的ddl sql]
RowChange.rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
RowChange.rowDatas.beforeColumns [Column类型的数组]
RowChange.rowDatasafterColumns [Column类型的数组]
Column.index
Column.sqlType [jdbc type]
Column.name [column name]
Column.isKey [是否为主键]
Column.updated [是否发生过变更]
Column.isNull [值是否为null]
Column.value [具体的内容,注意为文本]

注意事项

1.canal的配置文件连接数据库时要连接主库地址。
2.查询show VARIABLES like “log_bin” Mysql的binlog日志是否开启,设置为开启状态。
3.在canal服务端启动后会生成meta.dat文件,在conf/example的目录下,将meta.dat删除,重新启动可重置canal服务的偏移量。
4.canal在配置时将auto.scan配置为false,在conf的canal.properties中修改配置,canal/conf/example目录下存在实例配置instance.properties,可根据实际情况进行配置。

参考资料

https://github.com/alibaba/canal

简单总结

由于也是第一次使用canal,踩过的坑也较多,所幸能够一一解决。再此发现最好的解决方式是在github的canal评论中,所遇到的问题大多数人也遇到过。

文章作者: ClawHub
文章链接: https://www.clawhub.club/posts/2019/09/02/Canal%E6%95%B0%E6%8D%AE%E9%87%87%E9%9B%86/Canal%E6%95%B0%E6%8D%AE%E9%87%87%E9%9B%86%E6%A1%86%E6%9E%B6%E7%9A%84%E4%BD%BF%E7%94%A8%E4%B8%8E%E9%94%99%E8%AF%AF%E4%BF%A1%E6%81%AF%E6%80%BB%E7%BB%93/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ClawHub的博客
打赏
  • 微信
  • 支付宝
扫一扫关注ClawHub公众号,专注Java、技术分享、面试资源。

评论