Kevin's blog Kevin's blog
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档

Kevin

你可以迷茫,但不可以虚度
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档
  • 手册教程

    • Linux常用指令
    • xxl-job环境 搭建
    • Yaml语言教程
    • Arthas手册
    • Jenkins教程
    • ElasticSearch安装
    • Kibana安装
    • SkyWalking链路追踪
    • Zookeeper安装
    • Kafka集群搭建
    • Kafka Manager安装
    • MySQL安装
    • Canal数据同步
      • 1 MySQL开启binlog
      • 2 安装部署Canal
      • 3 启动Zookeeper和Kafka
      • 4 Canal配置文件修改
      • 5 启动Canal
      • 6 测试数据同步
      • 7 安装包下载
    • Markdown使用教程
    • canal环境搭建
  • 技术应用

  • 流程规范

  • GitHub技巧

  • VPN

  • Git笔记

  • 实用手册
  • 手册教程
luoxiaofeng
2022-05-19
目录

Canal数据同步

提示

Canal + Kafka + Mysql 数据同步方案示例。

# 1 MySQL开启binlog

查看MySQLs是否开启了binlog及binlog-format是否ROW模式。

-- 查看数据库版本
select version(); 

-- 显示OFF未开启 ON开启
show variables like ‘log_bin’

-- binlog_format 有三种:ROW,STATEMENT,MIXID
show variables like 'binlog_format';
1
2
3
4
5
6
7
8

MySQL安装目录下修改my.ini。

-- 在mysqld下面添加
log_bin=mysql-bin
binlog-format=ROW
server-id=1

-- 进入命令行重启mysql
停止 net stop mysql57
启动 net start mysql57
1
2
3
4
5
6
7
8

赋予canal用户复制权限。

-- 创建用户
create user 'canal'@'%' identified by '123456';

-- REPLICATION CLIENT
-- REPLICATION SLAVE
-- 复制相关。一般复制账号需要这两个权限。
grant select,replication slave, replication client on *.* to 'canal'@'%';

-- 刷新权限
FLUSH PRIVILEGES;
1
2
3
4
5
6
7
8
9
10

# 2 安装部署Canal

提示

canal.deployer-1.1.5.tar.gz 安装包已先上传到 / 根目录上。

可通过wget方式下载:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

# 创建目录
mkdir /louis/canal-1.1.5
# 解压
tar -zvxf canal.deployer-1.1.5.tar.gz -C /louis/canal-1.1.5
1
2
3
4

解压后目录如下

- bin    # 运维脚本文件
- conf   # 配置文件目录
  canal_local.properties  # canal本地配置,一般不需要改动
  canal.properties        # canal服务配置
  logback.xml             # logback日志配置
  metrics     # 度量统计配置
  spring      # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件
  example     # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹
    instance.properties   # 实例配置,一般指单个数据库的配置
- lib    # 服务依赖包
- logs   # 日志文件输出目录
- plugin # 支持的插件目录
  connector.kafka-1.1.5-jar-with-dependencies.jar     #kafka依赖包
  connector.rabbitmq-1.1.5-jar-with-dependencies.jar  #rabbitmq依赖包
  connector.rocketmq-1.1.5-jar-with-dependencies.jar  #rocketmq依赖包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 3 启动Zookeeper和Kafka

/louis/zookeeper-3.5.9/bin/zkServer.sh start

/louis/kafka_2.8.1/bin/kafka-server-start.sh /louis/kafka_2.8.1/config/server.properties &
/louis/kafka_2.8.1/bin/kafka-server-start.sh /louis/kafka_2.8.1/config/server-1.properties &
/louis/kafka_2.8.1/bin/kafka-server-start.sh /louis/kafka_2.8.1/config/server-2.properties &
1
2
3
4
5

启动Zookeeper客户端查看Kafka启动情况

/louis/zookeeper-3.5.9/bin/zkCli.sh 

# 进入客户端
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[0, 1, 2]
1
2
3
4
5

Kafka新建同步用的topic

louis-topic
1

# 4 Canal配置文件修改

conf/canal.properties

# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# 配置要同步的kafka信息
kafka.bootstrap.servers = 172.16.227.132:9092,172.16.227.132:9093,172.16.227.132:9094
1
2
3
4

conf/example/instance.properties

# 值改成安装mysql服务器的ip及端口号
canal.instance.master.address=127.0.0.1:3306
# 前面新建的数据库备份账号canal及其密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

# 配置要同步的kafka信息
canal.mq.topic=louis-topic
canal.mq.partition=0
1
2
3
4
5
6
7
8
9

# 5 启动Canal

sh bin/startup.sh
1

# 6 测试数据同步

编写Kafka消费服务

package com.kafka.kafkaDemo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MsgConsumer {
  private final static String TOPIC_NAME = "louis-topic";
  private final static String CONSUMER_GROUP_NAME = "testGroup";

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.227.132:9092,172.16.227.132:9093,172.16.227.132:9094");
    // 消费分组名
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		/*
		consumer给broker发送心跳的间隔时间,broker接收到心跳如果此时有rebalance发生会通过心跳响应将
		rebalance方案下发给consumer,这个时间可以稍微短一点
		*/
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        /*
        服务端broker多久感知不到一个consumer心跳就认为他故障了,会将其踢出消费组,
        对应的Partition也会被重新分配给其他consumer,默认是10秒
        */
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

    //一次poll最大拉取消息的条数,如果消费者处理速度很快,可以设置大点,如果处理速度一般,可以设置小点
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        /*
        如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
        会将其踢出消费组,将分区分配给别的consumer消费
        */
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

    consumer.subscribe(Arrays.asList(TOPIC_NAME));

    while (true) {
      /*
       * poll() API 是拉取消息的长轮询
       */
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
                record.offset(), record.key(), record.value());
      }
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

Mysql插入测试数据

INSERT INTO `user`
(id, name)
VALUES(8, '888');
1
2
3

Kafka消费端收到消息

收到消息:partition = 0,offset = 11, key = null, value = 
{
    "data":[
        {
            "id":"8",
            "name":"888"
        }
    ],
    "database":"datatest",
    "es":1653296022000,
    "id":84,
    "isDdl":false,
    "mysqlType":{
        "id":"int(11)",
        "name":"varchar(50)"
    },
    "old":null,
    "pkNames":[
        "id"
    ],
    "sql":"",
    "sqlType":{
        "id":4,
        "name":12
    },
    "table":"user",
    "ts":1652899613492,
    "type":"INSERT"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

Mysql新增列

ALTER TABLE datatest.`user` ADD sex varchar(2) NULL;
1

Kafka消费端收到消息

收到消息:partition = 0,offset = 12, key = null, value = 
{
    "data":null,
    "database":"datatest",
    "es":1653296552000,
    "id":85,
    "isDdl":true,
    "mysqlType":null,
    "old":null,
    "pkNames":null,
    "sql":"/* ApplicationName=DBeaver Ultimate 21.3.0 - SQLEditor &lt;Script-9.sql> */ ALTER TABLE datatest.`user` ADD sex varchar(2) NULL",
    "sqlType":null,
    "table":"user",
    "ts":1652900187875,
    "type":"ALTER"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 7 安装包下载

百度网盘

https://pan.baidu.com/s/1pvGbnkMpdqs3ICMpMsrVow(opens new window) (opens new window)

提取码 : cr1x

上次更新: 2022/06/02, 15:31:16
MySQL安装
Markdown使用教程

← MySQL安装 Markdown使用教程→

最近更新
01
AI是如何学习的
06-05
02
chatGpt提示原则
06-05
03
提示词工程实践指南
06-05
更多文章>
| Copyright © 2022-2025 Kevin | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式