Kevin's blog Kevin's blog
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档

Kevin

你可以迷茫,但不可以虚度
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档
  • 基础

  • JVM

  • Spring

  • Redis

  • 消息中间件

  • 持久化

    • 数据库设计三大范式
    • 执行计划Explain
    • 索引
    • 事务
    • 数据库锁
    • Elasticsearch 杂记
      • 1 canal
        • 1.1 什么是canal?
        • 1.2 canal的工作原理?
        • 1.3 canal高可用?
        • 1.4 canal投递消息到kafka
      • 2 mysql的binlog
        • 2.1 什么是binlog?
        • 2.2 怎么开启binlog?
        • 2.3 binlog有几种格式?
        • 2.4 binlog格式怎么选择?
      • 3 Oracle GoldenGate(ogg)
        • 3.1 什么是ogg?
      • 4 日志系统实现思路
      • 5 Elasticsearch
        • 5.1 elasticsearch的倒排索引是什么?
        • 5.2 elasticsearch索引数据多了怎么办?
        • 5.3 ES的集群架构,索引数据大小,分片有多少?
      • 6 RestHighLevelClient的使用
        • 6.1 查询
  • 算法

  • 网络

  • 系统架构

  • 知识整理
  • 持久化
luoxiaofeng
2022-04-28
目录

Elasticsearch 杂记

# 1 canal

# 1.1 什么是canal?

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。

目前,canal主要支持MySQL的binlog解析,解析完成才利用canal client用来处理获得的相关数据。数据库同步需要阿里的otter中间件,基于canal。

# 1.2 canal的工作原理?

首先了解一下mysql主备复制原理:

(1)master主库将改变记录,发送到二进制文件(binary log)中

(2)slave从库向mysql Master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)

(3)slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库

canal的工作原理

把自己伪装成slave,从master复制数据。读取binlog是需要master授权的,因为binlog是加密的,授权分用户名密码才能读。

master授权后不知道读他的binlog的是从机还是canal,他的所有传输协议都符合从机的标准,所以master一直以为是从机读的。

# 1.3 canal高可用?

有两个canal服务器都监控一个或多个mysql服务器的binlog,这两个canal服务同时只能有一个提供服务。

当提供服务的这个宕机时,zookeeper能知道,zookeeper就通知另一个canal服务器让他提供服务。

当原来宕机的那个再启动起来时,是抢占模式的,谁抢到就谁上,没抢到就standy模式。

canal本身就是一个工具不存数据,宕机了就宕机,只有还有另外一个能提供服务就行,所以没有什么同步问题(不像数据库有同步问题)。

# 1.4 canal投递消息到kafka

1.canal投递消息到kafka,可指定mysql库表,支持按库表指定字段hash投递的kafka的partition。

2.canal投递到kafka的消息体,例如:

ConsumerRecord(topic = binlog, partition = 0, offset = 29, CreateTime = 1647331490778, serialized key size = -1, serialized value size = 52, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"event":"datatest.user.update","value":[999,"ccc"]})

ConsumerRecord(topic = binlog, partition = 1, offset = 21, CreateTime = 1647331490777, serialized key size = -1, serialized value size = 62, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = "{\"event\":\"datatest.user.update\",\"value\":[999,\"ccc\"]}")

ConsumerRecord(topic = binlog, partition = 0, offset = 30, CreateTime = 1647268549467, serialized key size = -1, serialized value size = 276, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"data":[{"id":"999","name":"ccc"}],"database":"datatest","es":1647331490000,"id":8,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(50)"},"old":[{"name":"bbb"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"user","ts":1647268549466,"type":"UPDATE"})

# 2 mysql的binlog

# 2.1 什么是binlog?

mysql的二进制日志,记录了所有的DDL和DML(除了数据查询语句),以事件的形式进行记录,包含语句执行消耗的时间,mysql的二进制日志是事务安全型的。

开启二进制日志大概会有1%的性能损坏。

二进制日志有2个主要的使用场景:

① mysql的主备复制

② 数据恢复,通过使用mysqlbinlog工具来恢复数据

(用这个做恢复是备选方案,主方案还是定期快照,定期执行脚本导数据,其实就是把当前所有数据导成insert,这个量少)

二进制日志包括2类文件:

①二进制日志索引文件(后缀为.index)用于记录所有的二进制文件

②二进制日志文件(后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)

# 2.2 怎么开启binlog?

1.修改my.cnf配置

2.重启mysql

3.查看开启状态

输入 show variables like 'log_bin'; 查看binlog开启状态。如下图所示。

输入 show variables like 'binlog_format'; 查看Binary Log记录方式。如下图所示。

# 2.3 binlog有几种格式?

binlog的格式有三种:STATEMENT,MIXED,ROW对比如下

# 2.4 binlog格式怎么选择?

如果只考虑主从复制的话可以用mixed。

抽取数据用于统计分析之类的话用row。

# 3 Oracle GoldenGate(ogg)

# 3.1 什么是ogg?

GoldenGate软件是一种基于日志的结构化数据复制软件,它通过解析源数据库在线日志或归档日志获得数据的增量变化,再将这些变化应用到目标 数据库,从而实现源数据库与目标数据库同步。

GoldenGate可以实现一对一、广播(一对多)、聚合(多对一)、双向、点对点、级联等多种拓扑结构。

# 4 日志系统实现思路

1.业务入口生成唯一traceId -> 发送kafka落日志文件(包含此次操作的用户信息)

-> 数据变更落数据库(包含traceId)

2.binglog -> kafka -> 有traceId变更的数据解析落日志文件(包含本次变更的所有字段)

3.flume收集日志 -> kafka -> flink接收日志并清洗 -> 写入es(相同traceId关联成一条数据)

# 5 Elasticsearch

# 5.1 elasticsearch的倒排索引是什么?

例如:查询指定关键词的文章。

传统的检索是:遍历文章找到有对应的关键词。

倒排索引:通过分词策略,形成词和文章的映射关系表,这种词典+映射表即为倒排索引。

# 5.2 elasticsearch索引数据多了怎么办?

1.使用滚动索引。基于模板+时间+rollover api滚动创建索引。

2.只保留指定时间范围内数据。

3.动态增加节点。ES自身支持动态扩展。

# 5.3 ES的集群架构,索引数据大小,分片有多少?

集群架构

ES的集群架构有23个节点,节点配置是16核64G的。

索引数据大小

该集群架构包括了订单服务和运单服务的索引,其中负责的运单服务包括寄件运单和派件运单索引。

索引根据录入时间每日递增(滚动索引)。当时每日新增数据两个索引大概四五千万,数据大小几十G。

分片数量

10个分片。5个主分片和5个副本分片。

# 6 RestHighLevelClient的使用

提示

ES官网推荐的ES客户端组件RestHighLevelClient, 其封装了操作ES的CRUD方法,底层原理就是模拟各种ES需要的请求,如PUT,POST,DELETE,GET等方式

# 6.1 查询

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>6.8.1</version>
</dependency>
1
2
3
4
5
public class Test {
  public void search() {
    // 构建查询参数
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // 指定返回字段
    String[] includes = new String[]{"name", "age"};
    String[] excludes = new String[]{"sex"};
    searchSourceBuilder.fetchSource(includes, excludes);
    // 构建条件查询
    // and = filter/must; or = should
    BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
    searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.termsQuery("id", ids)));
    // 时间范围查询
    searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.rangeQuery("created_time").from(queryVO.getCreatedTime())));
    searchSourceBuilder.query(boolQueryBuilder.filter(QueryBuilders.rangeQuery("created_time").to(queryVO.getEndTime())));
    // 分页
    Integer size = queryVO.getSize();
    Integer from = (queryVO.getPage() - 1) * queryVO.getSize();
    if (size + from > 10000) {
      throw new RRException("分页参数不合理,暂不处理");
    }
    searchSourceBuilder.size(size);
    searchSourceBuilder.from(from);
    searchSourceBuilder.query(boolQueryBuilder);
    // 构建请求
    SearchRequest searchRequest = new SearchRequest(EsOperateTables.EXCEPTION_HANDLE.getAlias());// ES索引别名
    searchRequest.source(searchSourceBuilder);
    // 发起请求
    SearchResponse searchResponse = new SearchResponse();
    try {
      searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
      log.error(ErrorMsgConstant.ES_QUERY_ERROR, e);
      throw new RRException(ErrorMsgConstant.ES_QUERY_ERROR);
    }
    // 解析SearchResponse
    long totalHits = searchResponse.getHits().getTotalHits();
    List<Map<String, Object>> mapList = Lists.newArrayListWithCapacity(searchResponse.getHits().getHits().length);
    Arrays.stream(searchResponse.getHits().getHits()).forEach(hit -> mapList.add(hit.getSourceAsMap()));
    List<ResInfo> resInfos = new ArrayList<>();
    if (CollectionUtils.isNotEmpty(mapList)) {
      resInfos = JSON.parseArray(JSON.toJSONString(mapList), ResInfo.class);
    }
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#ES
上次更新: 2022/06/02, 11:20:10
数据库锁
排序算法

← 数据库锁 排序算法→

最近更新
01
AI是如何学习的
06-05
02
chatGpt提示原则
06-05
03
提示词工程实践指南
06-05
更多文章>
| Copyright © 2022-2025 Kevin | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式