clxmm
首页
  • 01redis学习

    • 01redis开始
  • 02redis学习

    • 01redis开始
  • vue2学习

    • 01vue学习
  • centos安装zsh

    • centos安装zsh
GitHub (opens new window)
首页
  • 01redis学习

    • 01redis开始
  • 02redis学习

    • 01redis开始
  • vue2学习

    • 01vue学习
  • centos安装zsh

    • centos安装zsh
GitHub (opens new window)
  • redis

    • 01redis
    • 02redis持久化
    • 03redis事务和管道
    • 04redis发布与订阅
    • 05Redis复制(replica)
    • 06Redis哨兵(sentinel)
    • 07Redis集群(cluster)
    • 08redis与SpringBoot集成
    • redis单线程与多线程
    • redis的BigKey
    • redis缓存双写一致性
    • 12redis与mysql双写一致性
      • 1.面试题
      • 2.cancle
        • 2.1是什么
        • 2.2能干什么
        • 2.3使用
      • 3.工作原理
        • 3.1传统MySQL主从复制工作原理
        • 3.2cancle的原理
      • 4.mysql-canal-redis双写一致性Coding⭐
    • 13案列bitmap-hyperlog-geo
    • 14布隆过滤器BloomFilter
    • 缓存预热、雪崩、击穿、穿透
    • redis的分布式锁
    • 17Redlock算法和缓存淘汰
    • 18Redis源码
  • redis02

  • 后端学习
  • redis
clxmm
2024-09-12
目录

12redis与mysql双写一致性

# 1.面试题

采用双检加锁策略

多个线程同时去查询数据库的这条数据,那么我们可以在第一个查询数据的请求上使用一个 互斥锁来锁住它。

其他的线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。

后面的线程进来发现已经有缓存了,就直接走缓存。

# 2.cancle

# 2.1是什么

canal [kə'næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;

# 2.2能干什么

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

# 2.3使用

https://github.com/alibaba/canal/releases/tag/canal-1.1.6 (opens new window)

# 3.工作原理

# 3.1传统MySQL主从复制工作原理

MySQL的主从复制将经过如下步骤:

  • 1、当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;
  • 2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;
  • 3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;
  • 4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;
  • 5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;
  • 6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;

# 3.2cancle的原理

# 4.mysql-canal-redis双写一致性Coding⭐

  • java案例,来源出处

    https://github.com/alibaba/canal/wiki/ClientExample (opens new window)

  • mysql

    • 查看mysql版本

      SELECT VERSION();
      
      1
    • 当前的主机二进制日志

      show master status;
      
      1
    • 查看SHOW VARIABLES LIKE 'log_bin';

      • off
    • 开启 MySQL的binlog写入i

      my.ini

      log-bin=mysql-bin #开启 binlog
      binlog-format=ROW #选择 ROW 模式
      server_id=1    #配置MySQL replaction需要定义,不要和canal的 slaveId重复
      
      1
      2
      3
      • ROW模式 除了记录sql语句之外,还会记录每个字段的变化情况,能够清楚的记录每行数据的变化历史,但会占用较多的空间。
      • STATEMENT模式只记录了sql语句,但是没有记录上下文信息,在进行数据恢复的时候可能会导致数据的丢失情况;
      • MIX模式比较灵活的记录,理论上说当遇到了表结构变更的时候,就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式;

      • window my.ini
      • linux my.cnf
    • 重启mysql

    • 再次查看SHOW VARIABLES LIKE 'log_bin';

      • on
    • 授权canal连接MySQL账号

      CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; // 创建
      ALTER USER 'canal'@'%' IDENTIFIED BY 'canal'; // 更新
      
      GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
      FLUSH PRIVILEGES;
      
      1
      2
      3
      4
      5
  • canal服务端

    • 1.下载

      • https://github.com/alibaba/canal/releases/tag/canal-1.1.6 (opens new window)
      • 下载Linux版本:canal.deployer-1.1.6.tar.gz
    • 2.解压

    • 3.配置

      /conf/example路径下instance.properties文件

      instance.properties

    • 4.启动

        .bin/startup.sh
      
      1
    • 5.查看

      • 查看 server 日志

    • 查看 样例example 的日志
  • canal客户端(Java编写业务程序)

    • sql

      CREATE TABLE `t_user` (
      
        `id` bigint(20) NOT NULL AUTO_INCREMENT,
      
        `userName` varchar(100) NOT NULL,
      
        PRIMARY KEY (`id`)
      ) 
      
      1
      2
      3
      4
      5
      6
      7
      8
    • pom

       <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
              </dependency>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-data-redis</artifactId>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.commons</groupId>
                  <artifactId>commons-pool2</artifactId>
              </dependency>
      
              <dependency>
                  <groupId>com.github.xiaoymin</groupId>
                  <artifactId>knife4j-openapi3-jakarta-spring-boot-starter</artifactId>
                  <version>4.4.0</version>
              </dependency>
      
      
              <dependency>
                  <groupId>com.alibaba.otter</groupId>
                  <artifactId>canal.client</artifactId>
                  <version>1.1.0</version>
              </dependency>
      
              <dependency>
                  <groupId>redis.clients</groupId>
                  <artifactId>jedis</artifactId>
              </dependency>
      
              <dependency>
                  <groupId>com.alibaba.fastjson2</groupId>
                  <artifactId>fastjson2</artifactId>
                  <version>2.0.43</version>
              </dependency>
      
          </dependencies>
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
    • code

      import com.alibaba.fastjson2.JSONObject;
      import com.alibaba.otter.canal.client.CanalConnector;
      import com.alibaba.otter.canal.client.CanalConnectors;
      import com.alibaba.otter.canal.protocol.CanalEntry;
      import com.alibaba.otter.canal.protocol.CanalEntry.*;
      import com.alibaba.otter.canal.protocol.Message;
      import redis.clients.jedis.Jedis;
      
      import java.net.InetSocketAddress;
      import java.util.List;
      import java.util.UUID;
      import java.util.concurrent.TimeUnit;
      
      public class RedisCanalClientExample {
          public static final Integer _60SECONDS = 60;
          public static final String REDIS_IP_ADDR = "192.168.1.106";
      
          private static void redisInsert(List<Column> columns) {
              JSONObject jsonObject = new JSONObject();
              for (Column column : columns) {
                  System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                  jsonObject.put(column.getName(), column.getValue());
              }
              if (columns.size() > 0) {
                  try (Jedis jedis = RedisUtils.getJedis()) {
                      jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      
      
          private static void redisDelete(List<Column> columns) {
              JSONObject jsonObject = new JSONObject();
              for (Column column : columns) {
                  jsonObject.put(column.getName(), column.getValue());
              }
              if (columns.size() > 0) {
                  try (Jedis jedis = RedisUtils.getJedis()) {
                      jedis.del(columns.get(0).getValue());
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      
          private static void redisUpdate(List<Column> columns) {
              JSONObject jsonObject = new JSONObject();
              for (Column column : columns) {
                  System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                  jsonObject.put(column.getName(), column.getValue());
              }
              if (columns.size() > 0) {
                  try (Jedis jedis = RedisUtils.getJedis()) {
                      jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
                      System.out.println("---------update after: " + jedis.get(columns.get(0).getValue()));
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      
          public static void printEntry(List<CanalEntry.Entry> entrys) {
              for (Entry entry : entrys) {
                  if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                      continue;
                  }
      
                  CanalEntry.RowChange rowChage = null;
                  try {
                      //获取变更的row数据
                      rowChage = RowChange.parseFrom(entry.getStoreValue());
                  } catch (Exception e) {
                      throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
                  }
                  //获取变动类型
                  EventType eventType = rowChage.getEventType();
                  System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                          entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                          entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
      
                  for (RowData rowData : rowChage.getRowDatasList()) {
                      if (eventType == EventType.INSERT) {
                          redisInsert(rowData.getAfterColumnsList());
                      } else if (eventType == EventType.DELETE) {
                          redisDelete(rowData.getBeforeColumnsList());
                      } else {//EventType.UPDATE
                          redisUpdate(rowData.getAfterColumnsList());
                      }
                  }
              }
          }
      
      
          public static void main(String[] args) {
              System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");
      
              //=================================
              // 创建链接canal服务端
              CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
                      11111), "example", "", "");
              int batchSize = 1000;
              //空闲空转计数器
              int emptyCount = 0;
              System.out.println("---------------------canal init OK,开始监听mysql变化------");
              try {
                  connector.connect();
                  //connector.subscribe(".*\\..*");
                  connector.subscribe("demo.t_user");
                  connector.rollback();
                  int totalEmptyCount = 10 * _60SECONDS;
                  while (emptyCount < totalEmptyCount) {
                      System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString());
                      Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                      long batchId = message.getId();
                      int size = message.getEntries().size();
                      if (batchId == -1 || size == 0) {
                          emptyCount++;
                          try {
                              TimeUnit.SECONDS.sleep(1);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      } else {
                          //计数器重新置零
                          emptyCount = 0;
                          printEntry(message.getEntries());
                      }
                      connector.ack(batchId); // 提交确认
                      // connector.rollback(batchId); // 处理失败, 回滚数据
                  }
                  System.out.println("已经监听了" + totalEmptyCount + "秒,无任何消息,请重启重试......");
              } finally {
                  connector.disconnect();
              }
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138

      redisutil

      public class RedisUtils {
          public static final String REDIS_IP_ADDR = "192.168.1.106";
          public static final String REDIS_pwd = "123456abc";
          public static JedisPool jedisPool;
      
          static {
              JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
              jedisPoolConfig.setMaxTotal(20);
              jedisPoolConfig.setMaxIdle(10);
              jedisPool = new JedisPool(jedisPoolConfig, REDIS_IP_ADDR, 6379, 10000, REDIS_pwd);
          }
      
          public static Jedis getJedis() throws Exception {
              if (null != jedisPool) {
                  return jedisPool.getResource();
              }
              throw new Exception("Jedispool is not ok");
          }
      
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
    • log,当数据库变动时

      我是canal,每秒一次正在监听:b6e49f8a-fac8-45ca-b4b7-f0ae2b45e9a8
      ================&gt; binlog[binlog.000078:1987] , name[demo,t_user] , eventType : INSERT
      id : 2    update=true
      userName : 2    update=true
      
      我是canal,每秒一次正在监听:6c404c6b-4901-4e22-9cf1-b44e986832ea
      ================&gt; binlog[binlog.000078:2277] , name[demo,t_user] , eventType : INSERT
      id : 3    update=true
      userName : 1    update=true
      id : 4    update=true
      userName : 2    update=true
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
编辑 (opens new window)
上次更新: 2024/09/24, 22:41:00
redis缓存双写一致性
13案列bitmap-hyperlog-geo

← redis缓存双写一致性 13案列bitmap-hyperlog-geo→

最近更新
01
vue3
02-08
02
vue3-1
01-24
03
vue3
01-18
更多文章>
Theme by Vdoing | Copyright © 2024-2025 Evan Xu | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式