12redis与mysql双写一致性
# 1.面试题
采用双检加锁策略
多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。
其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。
后面的线程进来发现已经有缓存了,就直接走缓存。

# 2.cancle
# 2.1是什么
canal [kə'næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;
# 2.2能干什么
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
# 2.3使用
https://github.com/alibaba/canal/releases/tag/canal-1.1.6 (opens new window)
# 3.工作原理
# 3.1传统MySQL主从复制工作原理

MySQL的主从复制将经过如下步骤:
- 1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;
- 2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;
- 3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;
- 4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;
- 5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;
- 6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;
# 3.2cancle的原理


# 4.mysql-canal-redis双写一致性Coding⭐
java案例,来源出处
https://github.com/alibaba/canal/wiki/ClientExample (opens new window)
mysql
查看mysql版本
SELECT VERSION();1当前的主机二进制日志
show master status;1查看SHOW VARIABLES LIKE 'log_bin';
- off
开启 MySQL的binlog写入i
my.inilog-bin=mysql-bin #开启 binlog binlog-format=ROW #选择 ROW 模式 server_id=1 #配置MySQL replaction需要定义,不要和canal的 slaveId重复1
2
3- ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。
- STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
- MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;


- window my.ini
- linux my.cnf
重启mysql
再次查看SHOW VARIABLES LIKE 'log_bin';
- on
授权canal连接MySQL账号
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; // 创建 ALTER USER 'canal'@'%' IDENTIFIED BY 'canal'; // 更新 GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;1
2
3
4
5
canal服务端
1.下载
- https://github.com/alibaba/canal/releases/tag/canal-1.1.6 (opens new window)
- 下载Linux版本:canal.deployer-1.1.6.tar.gz
2.解压
3.配置
/conf/example路径下instance.properties文件instance.properties

4.启动
.bin/startup.sh15.查看
- 查看 server 日志

- 查看 样例example 的日志

canal客户端(Java编写业务程序)
sql
CREATE TABLE `t_user` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `userName` varchar(100) NOT NULL, PRIMARY KEY (`id`) )1
2
3
4
5
6
7
8pom
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>com.github.xiaoymin</groupId> <artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId> <version>4.4.0</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.43</version> </dependency> </dependencies>1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40code
import com.alibaba.fastjson2.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import redis.clients.jedis.Jedis; import java.net.InetSocketAddress; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; public class RedisCanalClientExample { public static final Integer _60SECONDS = 60; public static final String REDIS_IP_ADDR = "192.168.1.106"; private static void redisInsert(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); } if (columns.size() > 0) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(), jsonObject.toJSONString()); } catch (Exception e) { e.printStackTrace(); } } } private static void redisDelete(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { jsonObject.put(column.getName(), column.getValue()); } if (columns.size() > 0) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.del(columns.get(0).getValue()); } catch (Exception e) { e.printStackTrace(); } } } private static void redisUpdate(List<Column> columns) { JSONObject jsonObject = new JSONObject(); for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(), column.getValue()); } if (columns.size() > 0) { try (Jedis jedis = RedisUtils.getJedis()) { jedis.set(columns.get(0).getValue(), jsonObject.toJSONString()); System.out.println("---------update after: " + jedis.get(columns.get(0).getValue())); } catch (Exception e) { e.printStackTrace(); } } } public static void printEntry(List<CanalEntry.Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChage = null; try { //获取变更的row数据 rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e); } //获取变动类型 EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else {//EventType.UPDATE redisUpdate(rowData.getAfterColumnsList()); } } } } public static void main(String[] args) { System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------"); //================================= // 创建链接canal服务端 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111), "example", "", ""); int batchSize = 1000; //空闲空转计数器 int emptyCount = 0; System.out.println("---------------------canal init OK,开始监听mysql变化------"); try { connector.connect(); //connector.subscribe(".*\\..*"); connector.subscribe("demo.t_user"); connector.rollback(); int totalEmptyCount = 10 * _60SECONDS; while (emptyCount < totalEmptyCount) { System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString()); Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } else { //计数器重新置零 emptyCount = 0; printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("已经监听了" + totalEmptyCount + "秒,无任何消息,请重启重试......"); } finally { connector.disconnect(); } } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138redisutil
public class RedisUtils { public static final String REDIS_IP_ADDR = "192.168.1.106"; public static final String REDIS_pwd = "123456abc"; public static JedisPool jedisPool; static { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(20); jedisPoolConfig.setMaxIdle(10); jedisPool = new JedisPool(jedisPoolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_pwd); } public static Jedis getJedis() throws Exception { if (null != jedisPool) { return jedisPool.getResource(); } throw new Exception("Jedispool is not ok"); } }1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20log,当数据库变动时
我是canal,每秒一次正在监听:b6e49f8a-fac8-45ca-b4b7-f0ae2b45e9a8 ================> binlog[binlog.000078:1987] , name[demo,t_user] , eventType : INSERT id : 2 update=true userName : 2 update=true 我是canal,每秒一次正在监听:6c404c6b-4901-4e22-9cf1-b44e986832ea ================> binlog[binlog.000078:2277] , name[demo,t_user] , eventType : INSERT id : 3 update=true userName : 1 update=true id : 4 update=true userName : 2 update=true1
2
3
4
5
6
7
8
9
10
11
编辑 (opens new window)
上次更新: 2024/09/24, 22:41:00