Redis


Redis

初识 Redis

认识 NoSQL

简单理解,SQL 代表了关系型数据库,而 NoSQL 代表了非关系型数据库。

对于关系型数据库,其特点有:

  1. 结构化:在关系型数据库中,使用表结构来存储数据,字段一般不轻易修改,并且还具有各式的约束以及数据类型。
  2. 关系性:表与表之间具有关系,一般通过外键建立关系约束。
  3. 查询:使用 SQL 语法查询,格式固定。
  4. 事务:关系型数据库的事务需要满足 ACID 的特性。
  5. 使用场景:数据结构固定,相关业务对数据安全性,一致性要求比较高。

对于非关系型数据库,其特点有:

  1. 非结构化:即没有很严格的结构化规定,如我们本章学习的 Redis 便是使用键值对存储的(值一般是 json 串)。
  2. 非关系性:使用 json 直接维护数据,不存在表结构,所以数据间的关系性不是很大。
  3. 查询:没有固定的语法格式查询,例如 Redis、MongoDB、elasticsearch 这些非关系型数据库的查询语法就有差异。
  4. 事务:非关系型数据库要么没有事务,要么只能满足基本的事务。即只能基本满足事务性质。
  5. 使用场景:数据结构不固定,对一致性,安全性要求不高但对性能要求较高的。

认识 Redis

Redis 诞生于 2009 年,全称是 Remote Dictionary Server,远程词典服务器,是一个基于内存的键值型 NoSQL 数据库。下载地址戳我。

特征:

  1. 键值型,value 支持多种不同数据结构,功能丰富。
  2. 单线程,每个命令具有原子性。
  3. 低延迟,速度快(基于内存、IO 多路复用、良好的编码)。
  4. 支持数据持久化。
  5. 支持主从集群、分片集群。
  6. 支持多语言客户端。

Redis 常见命令

命令可以在官网命令文档查看。

使用help [command]可以查看某个命令的具体用法。

Redis 的使用

Ubuntu 下安装 Redis 戳我。(以下的操作均为 Ubuntu 适用)

Redis 启动:service redis-server start

Redis 关闭:service redis-server stop

Redis 重启:service redis-server restart

使用 Redis:启动 Redis 后redis-cli -a [password]。(输入exit退出)

查看 Redis 状态:service redis-server status

Redis 数据结构介绍

Redis 中的 value 的类型多种多样。基本类型有:StringHashListSetSortedSet。除此之外还有GEOBitMapHyperLog等特殊类型,它们本质上也还是字符串。

key 的层级格式

Redis 没有类似 MySQL 中的 Table 的概念,我们该如何区分不同类型的 key 呢?(例如,需要存储用户、商品信息到 redis,有一个用户 id 是 1,有一个商品 id 恰好也是 1)

在 Redis 中,key 允许有多个单词形成层级结构,多个单词之间用:隔开,格式如下:项目名:业务名:类型:id。(这个格式并非固定,也可以根据自己的需求来删除或添加词条)

示例的 key 和 value 示例如下:

KEY VALUE
hnuxcc:user:1 {"id":1, "name":"Jack", "age":21}
hnuxcc:product:1 {"id":1, "name":"大米", "price":66}

Redis 通用命令

通用指令是部分数据类型的,都可以使用的指令,常见的有:

KEYS

该命令用于查看符合模板的所有 key,但是不建议在生产环境设备上使用(查找线程会阻塞其他线程工作)。

redis> MSET firstname Jack lastname Stuntman age 35
"OK"
redis> KEYS *name*
1) "lastname"
2) "firstname"
redis> KEYS a??
1) "age"
redis> KEYS *
1) "age"
2) "lastname"
3) "firstname"

DEL

该命令用于删除一个 key:

redis> SET key1 "Hello"
"OK"
redis> SET key2 "World"
"OK"
redis> DEL key1 key2 key3
(integer) 2
redis> KEYS *
(empty array)

EXISTS

该命令用于查看某个 key 是否存在:

redis> SET key1 "Hello"
"OK"
redis> EXISTS key1
(integer) 1
redis> EXISTS nosuchkey
(integer) 0
redis> SET key2 "World"
"OK"
redis> EXISTS key1 key2 nosuchkey
(integer) 2

EXPIRE

该命令用于给一个 key 设置有效期(**-1 是永久有效,-2 是过期**),有效期到期时该 key 会被自动删除。(用于在内存中删除 key,防止爆内存,短信的验证码的有效期就可以在这里设置)

  • NX– 仅当密钥没有过期时才设置过期
  • XX– 仅当密钥已过期时才设置过期时间
  • GT– 仅当新到期日大于当前到期日时才设置到期日
  • LT– 仅当新的到期日小于当前到期日时才设置到期日
redis> SET mykey "Hello"
"OK"
redis> EXPIRE mykey 10
(integer) 1
redis> TTL mykey
(integer) 10
redis> SET mykey "Hello World"
"OK"
redis> TTL mykey
(integer) -1
redis> EXPIRE mykey 10 XX
(integer) 0
redis> TTL mykey
(integer) -1
redis> EXPIRE mykey 10 NX
(integer) 1
redis> TTL mykey
(integer) 10

TTL

该命令用于查看一个 key 的有效期。

redis> SET mykey "Hello"
"OK"
redis> EXPIRE mykey 10
(integer) 1
redis> TTL mykey
(integer) 10

String 类型命令

String 类型,也就是字符串类型,是 Redis 中最简单的存储类型。其 value 是字符串,不过根据字符串的格式不同,又可以分为三类:

  1. string:普通字符串。
  2. int:整数类型,可以做自增、自减操作。
  3. float:浮点类型,可以做自增、自减操作。

不管是哪种存储格式,底层都是字节数组形式存储,只不过是编码方式不同。字符串类型的最大空间不能超过 512M。

常见命令有:

命令 描述
SET 添加或者修改已经存在的一个 String 类型的键值对
GET 根据 key 获取 String 类型的 value
MSET 批量添加多个 String 类型的键值对
MGET 根据多个 key 获取多个 String 类型的 value
INCR 让一个整型的 key 自增 1
INCRBY 让一个整型的 key 自增并指定步长(指定负数就是自减)
INCRBYFLOAT 让一个浮点类型的数字自增并指定步长
SETNX 添加一个 String 类型的键值对,前提是这个 key 不存在,否则不执行
SETEX 添加一个 String 类型的键值对,并且指定有效期

Hash 类型命令

Hash 类型,其 value 是一个无序字典,类似于 HashMap 的结构(Map<key, Map<field, value>>)。

String 结构是将对象序列化为 json 字符串后存储,当需要修改对象的某个字段时很不方便。而 Hash 结构是可以将对象中的每个字段独立存储,可以针对单个字段做 CRUD。

常见命令有:

命令 描述
HSET key field value 添加或者修改 hash 类型 key 的 field 值
HGET key field 获取一个 hash 类型 key 的 field 值
HMSET 批量添加多个 hash 类型 key 的 field 值
HMGET 批量获取多个 hash 类型 key 的 field 值
HGETALL 获取一个 hash 类型 key 中的所有 field 和 value
HKEYS 获取一个 hash 类型 key 中的所有 field
HVALS 获取一个 hash 类型 key 中的所有 value
HINCRBY 让一个 hash 类型 key 的字段自增并指定步长
HSETNX 添加一个 hash 类型的 key 的 field 值,前提是这个 field 不存在,否则不执行

List 类型命令

Redis 中的 List 类型与 Java 中的 LinkedList 类似,可以看做是一个双向链表结构。既可以支持正向检索和也可以支持反向检索。其特征也与 LinkedList 类似,常用来存储一个有序数据,例如:朋友圈点赞列表,评论列表等。

常见命令有:

命令 描述
LPUSH key elements 向列表左侧插入一个或多个元素
LPOP key 移除并返回列表左侧的第一个元素,没有则返回 null
RPUSH key elements 向列表右侧插入一个或多个元素
RPOP key 移除并返回列表右侧的第一个元素
LRANGE key star end 返回一段角标范围内的所有元素
BLPOP 和 BRPOP 与 LPOP 和 RPOP 类似,只不过在没有元素时等待指定时间,而不是直接返回 null

Set 类型命令

Redis 的 Set 结构与 Java 中的 HashSet 类似,可以看做是一个 value 为 null 的 HashMap。与 HashSet 不同的是,Redis 中的 Set 类型还支持交集、并集、差集等功能。

常见命令有:

命令 描述
SADD key members 向 set 中添加一个或多个元素
SREM key members 移除 set 中的指定元素
SCARD key 返回 set 中元素的个数
SISMEMBER key members 判断一个元素是否存在于 set 中
SMEMBERS 获取 set 中的所有元素
SINTER key1 key2 求交集
SDIFF key1 key2 求差集
SUNION key1 key2 求并集

SortedSet 类型命令

Redis 的 SortedSet 是一个可排序的 set 集合,与 Java 中的 TreeSet 有些类似,但底层数据结构却差别很大。SortedSet 中的每一个元素都带有一个 score 属性,可以基于 score 属性对元素排序,底层的实现是一个跳表(SkipList)加 hash 表。

因为 SortedSet 的可排序性,经常被用来实现排行榜这样的功能。

常见命令有:

命令 描述
ZADD key score members 添加一个或者多个元素到 sortedset,如果已经存在则更新值
ZREM key members 删除 sortedset 中的一个指定元素
ZSCORE key members 获取 sortedset 中指定元素的 score 值
Z(REV)RANK key members 获取 sortedset 中的指定元素的排名
ZCARD key 获取 sortedset 中的元素个数
ZCOUNT key min max 统计 score 值在给定范围内的所有元素的个数
ZINCRBY key increment menber 让 sortedset 中的指定元素自增,步长为指定的 increment 值
Z(REV)RANGE key min max 按照 score 排序后,获取指定排名范围内的元素
ZRANGEBYSCORE key min max 按照 score 排序后,获取指定 score 范围内的元素
ZDIFF、ZINTER、ZUNION 求差集、交集、并集

Jedis

Jedis 是一个 Java 操作的 Redis 客户端,使用 Jedis 需要引入依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>5.1.2</version>
</dependency>

Jedis 基本用法如下:

public class JedisTest {
    private Jedis jedis;
    
    @BeforeEach
    void setUp() {
        // 建立连接
        jedis = new Jedis("192.168.23.130", 6379);

        // 设置密码
        jedis.auth("Redis:040809");

        // 选择库
        jedis.select(0);
    }

    @Test
    void testJedis() {
        // Jedis的方法名和Redis的命令名一样
        String result = jedis.set("jedis", "testJedis");
        System.out.println(result);

        String res = jedis.get("jedis");
        System.out.println(res);
    }

    @AfterEach
    void tearDown() {
        // 关闭连接
        if (jedis != null) {
            jedis.close();
        }
    }
}

Jedis 是线程不安全的,并发环境下需要使用连接池防止性能出现损耗:

public class JedisConnectionFactory {
    private static final JedisPool JEDIS_POOL;

    static {
        // 配置连接池
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(8);      // 最大连接数
        poolConfig.setMaxIdle(8);       // 最大空闲连接
        poolConfig.setMinIdle(0);       // 最小空闲连接
        poolConfig.setMaxWait(Duration.ofMillis(1000));  // 最大等待时间

        // 创建连接池对象
        JEDIS_POOL = new JedisPool(poolConfig, "192.168.23.130",
                6379, 1000, "Redis:040809");
    }

    public static Jedis getJedis() {
        return JEDIS_POOL.getResource();
    }
}

SpringDataRedis

SpringData 是 Spring 中数据操作的模块,包含对各种数据库的集成,其中对 Redis 的集成模块就叫做SpringDataRedis。提供了对不同 Redis 客户端的整合(Lettuce 和 Jedis),并提供了 RedisTemplate 统一 API 来操作 Redis。支持基于 Lettuce 的响应式编程,支持基于 Redis 的 JDKCollection 实现。

SpringDataRedis 中提供了 RedisTemplate 工具类,其中封装了各种对 Redis 的操作。并且将不同数据类型的操作 API 封装到了不同的类型中:

API 返回值类型 说明
redisTemplate.opsForValue() ValueOperations 操作 String 类型数据
redisTemplate.opsForHash() HashOperations 操作 Hash 类型数据
redisTemplate.opsForList() ListOperations 操作 List 类型数据
redisTemplate.opsForSet() SetOperations 操作 Set 类型数据
redisTemplate.opsForZSet() ZSetOperations 操作 SortedSet 类型数据
redisTemplate 通用的命令

使用时需要在 SpringBoot 中加入依赖:

<!-- Redis依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 连接池依赖 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

接下来配置 Redis 信息:

spring:
  data:
    redis:
      host: 192.168.23.130
      port: 6379
      password: Redis:040809
      lettuce:
        pool:
          max-active: 8 # 最大连接
          max-idle: 8   # 最大空闲连接
          min-idle: 0   # 最小空闲连接
          max-wait: 100 # 连接等待时间

快速上手:

@SpringBootTest
class SpringDataRedisDemoApplicationTests {

    // 自动注入
    @Autowired
    private RedisTemplate redisTemplate;

    @Test
    void testString() {
        // 写入一条String数据
        redisTemplate.opsForValue().set("name", "Tom");

        // 获取String数据
        Object name = redisTemplate.opsForValue().get("name");
        System.out.println(name);
    }

}

RedisSerializer

RedisTemplate 可以接收任意 Object 作为值写入 Redis,只不过写入前会把 Object 序列化为字节形式,默认是采用 JDK 序列化,得到的结果类似于这样:\xac\xed\x00\x05t\x00\x04name。这会导致可读性差且内存占用较大。故我们需要改变 Redis 的序列化方式。

一般情况下,如果是 String 类型,使用 StringRedisSerializer 进行序列化,如果是 Object 类型,使用 GenericJackson2JsonRedisSerializer 进行序列化。

没有 Jackson 依赖的时候可以引入依赖:

<!-- Jackson依赖 -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        // 创建RedisTemplate对象
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 设置连接工厂
        template.setConnectionFactory(factory);
        // 创建Json序列化工具
        GenericJackson2JsonRedisSerializer jsonRedisSerializer =
                new GenericJackson2JsonRedisSerializer();
        // 设置key的序列化
        template.setKeySerializer(RedisSerializer.string());
        template.setHashKeySerializer(RedisSerializer.string());
        // 设置value的序列化
        template.setValueSerializer(jsonRedisSerializer);
        template.setHashValueSerializer(jsonRedisSerializer);
        // 返回
        return template;
    }
}
@SpringBootTest
class SpringDataRedisDemoApplicationTests {

    // 自动注入,需要指定泛型
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    //...

}

StringRedisTemplate

当使用上述的序列化对象的时候,会存在一个小问题:就是当我们写入 Object 时,会顺带把 Object 的字节码写入 json 中,会额外占用内存,当数据量大的时候,会造成大量的内存开销。

为了节省内存空间,我们并不会使用 JSON 序列化器来处理 value,而是统一使用 String 序列化器,要求只能存储 String 类型的 key 和 value。当需要存储 Java 对象时,手动完成对象的序列化和反序列化。相当于是使用额外的代码量来减少额外的内存开销。

Spring 默认提供了一个 StringRedisTemplate 类,它的 key 和 value 的序列化方式默认就是 String 方式。省去了我们自定义 RedisTemplate 的过程

@SpringBootTest
class SpringDataRedisDemoApplicationTests {

    // 自动注入
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    // 准备反序列化和序列化mapper
    ObjectMapper mapper = new ObjectMapper();

    @Test
    void testStringRedisTemplate() throws Exception {
        // 准备对象
        User user = new User("lisi", 19);
        // 手动序列化
        String json = mapper.writeValueAsString(user);
        // 写入一条数据到redis
        stringRedisTemplate.opsForValue().set("user:2", json);

        // 读取数据
        String value = stringRedisTemplate.opsForValue().get("user:2");
        // 反序列化
        User res = mapper.readValue(value, User.class);
        System.out.println(res);
    }
    
    @Test
    void testHash() {
        // Hash类型的存入
        stringRedisTemplate.opsForHash().put("user:3", "name", "wangwu");
        stringRedisTemplate.opsForHash().put("user:3", "age", "21");

        // Hash类型的取出,得到所有key和value
        Map<Object, Object> entries = stringRedisTemplate
            .opsForHash().entries("user:3");
        System.out.println(entries);
    }
}

短信登录

基于 Session 实现登录

使用 Session 登录有一个弊端,那就是 Session 共享问题:当项目的服务体量较大的时候,往往需要我们部署多台 Tomcat 服务器,而多台 Tomcat 服务器并不会共享 Session 存储空间,当请求切换到不同 Tomcat 服务的时候会导致数据丢失的问题。

基于 Redis 实现共享 Session 登录

在校验登录状态的时候,需要去刷新 token 的有效期,如果只有一个拦截器(该拦截器对特定路径进行登录状态校验)则没办法保证用户在访问不需要校验登录状态的页面的 token 有效期刷新。所以,我们不妨配置两个拦截器,第一个拦截器对所有路径进行拦截,主要做 token 有效期刷新以及存放数据到 ThreadLocal 的工作。第二个拦截器对需要进行登录状态校验的路径进行拦截,主要判断用户是否进行了登录:

参考代码

首先是第一个用来刷新 token 有效期的拦截器:

@Component
public class RefreshTokenInterceptor implements HandlerInterceptor {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public boolean preHandle(HttpServletRequest request,
                             HttpServletResponse response,
                             Object handler) throws Exception {
        // 获取请求头中的token
        String token = request.getHeader("authorization");

        // token为空表示未登录
        if (StrUtil.isBlank(token)) {
            return true;
        }

        // 取出map
        String key = LOGIN_USER_KEY + token;
        Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);

        // 查找是否存在用户
        if (userMap.isEmpty()) {
            return true;
        }

        // 把map转回来并存入ThreadLocal中
        UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
        UserHolder.saveUser(userDTO);

        // 刷新token有效期
        stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.SECONDS);

        return true;
    }

    @Override
    public void afterCompletion(HttpServletRequest request,
                                HttpServletResponse response,
                                Object handler,
                                Exception ex) throws Exception {
        UserHolder.removeUser();
    }
}

第二个用来从 ThreadLocal 中取出数据的拦截器:

@Component
public class LoginInterceptor implements HandlerInterceptor {
    @Override
    public boolean preHandle(HttpServletRequest request,
                             HttpServletResponse response,
                             Object handler) throws Exception {
        // 判断threadlocal中是否有用户信息
        UserDTO user = UserHolder.getUser();

        if (user == null) {
            response.setStatus(401);
            return false;
        }

        // 有用户则直接放行
        return true;
    }
}

接下来是业务逻辑代码:

@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User>
                            implements IUserService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    // 发送短信验证码
    @Override
    public Result sendCode(String phone, HttpSession session) {
        // 校验手机号
        if (RegexUtils.isPhoneInvalid(phone)) {
            return Result.fail("手机号格式不正确");
        }

        // 利用hutool工具包生成验证码
        String code = RandomUtil.randomNumbers(6);

        // 保存验证码到redis,加上业务前缀进行key区分
        stringRedisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone, code, LOGIN_CODE_TTL, TimeUnit.MINUTES);

        // 发送短信验证码
        log.debug("发送短信验证码成功, 验证码为: " + code);

        // 返回结果
        return Result.ok();
    }

    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        // 校验手机号
        String phone = loginForm.getPhone();
        if (RegexUtils.isPhoneInvalid(phone)) {
            return Result.fail("手机号格式不正确");
        }

        // 校验验证码
        String cashCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);
        String code = loginForm.getCode();
        if (cashCode == null || !cashCode.equals(code)) {
            return Result.fail("验证码错误");
        }

        // 使用mybatis-plus根据手机号查询用户
        User user = query().eq("phone", phone).one();
        if (user == null) {
            // 用户不存在,则创建新用户
            user = createUserWithPhone(phone);
        }

        // 保存用户信息到redis中
        String token = UUID.randomUUID().toString(true);
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        // 注意把非String类型转换为String
        Map<String, Object> userMap = BeanUtil
                .beanToMap(userDTO, new HashMap<>(), CopyOptions.create()
                        .setIgnoreNullValue(true)
                        .setFieldValueEditor((fieldName, fieldValue) -> fieldValue.toString()));
        stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY + token, userMap);
        // 设置token有效期
        stringRedisTemplate.expire(LOGIN_USER_KEY + token, LOGIN_USER_TTL, TimeUnit.SECONDS);

        // 把token返回给前端
        return Result.ok(token);
    }
}

商户查询缓存

缓存就是数据交换的缓冲区,是存贮数据的临时地方,一般读写性能较高。在 web 开发中,缓存的作用一般是降低后端负载,提高读写效率,降低响应时间。但使用缓存也有一定成本,一个就是数据一致性成本(redis 中的数据更新和 mysql 中数据的更新不一致),另一个就是代码维护成本和运维成本。

添加 Redis 缓存

缓存作用模型大致工作流程:请求先访问 Redis,命中,直接返回。如果未命中,再去数据库中查找,将数据库数据写入 Redis 后返回。

缓存更新策略

常见的缓存更新策略有:

方法 说明 一致性 维护成本
内存淘汰 不用自己维护,利用 Redis 的内存淘汰说明机制,当内存不足时自动淘汰部分数据。
超时剔除 给缓存数据添加 TTL 时间,到期后自动删除缓存。下次查询时更新缓存。 一般
主动更新 编写业务逻辑,在修改数据库的时候更新缓存

业务场景:

  • 低一致性需求:使用内存淘汰机制,例如店铺类型的查询缓存。
  • 高一致性需求:使用主动更新机制,并以超时剔除作为兜底方案,例如店铺详情查询的缓存。

主动更新策略有如下几种形式:

  1. Cache Aside Pattern:由缓存的调用者,在更新数据库的时候更新缓存。考虑性能和维护难度,我们一般使用这种方法。

    针对这个方法,我们有三点需要注意:

    1. 当数据发生变动时,我们应该是选择更新数据库并删除旧缓存,等查询时再更新缓存。而不是每次更新数据库都更新缓存,这样会导致无效写操作过多。
    2. 缓存与数据库的操作需要保证一致性,即同时成功、同时失败。针对于单体系统,我们可以把缓存与数据库的操作放在一个事务当中。而针对于分布式系统,我们可以利用 TCC 等分布式事务方案来解决。
    3. 考虑线程安全以及异常发生概率,我们应该先操作数据库,再操作缓存
  2. Read / Write Through Pattern:缓存与数据库整合为一个服务,由服务来维护一致性。调用者调用该服务,无需关心缓存一致性问题。

  3. Write Behind Caching Pattern:调用者只操作缓存,由其他线程异步将缓存数据持久化到数据库,保证最终一致。

结合缓存更新策略,我们可以得出示例代码:

@Service
public class ShopServiceImpl extends ServiceImpl<ShopMapper, Shop> 
    implements IShopService {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    // 查询店铺信息
    @Override
    public Result queryById(Long id) {
        // 从redis查询缓存
        String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
        if (StrUtil.isNotBlank(shopJson)) {
            // Redis命中直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }

        // Redis不命中从数据库中查找
        Shop shop = getById(id);
        if (shop == null) {
            return Result.fail("店铺不存在");
        }
        // 写入Redis缓存,并设置过期时间
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);

        return Result.ok(shop);
    }

    // 更新店铺信息
    @Override
    @Transactional  // 设置成事务,保证原子性
    public Result update(Shop shop) {
        // 更新数据库
        updateById(shop);

        // 删除缓存
        Long id = shop.getId();
        if (id == null) {
            return Result.fail("店铺id不能为空");
        }
        stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
        
        return Result.ok();
    }
}

缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都一定会打到数据库

常见的解决缓存穿透的方法有:

  1. 缓存空对象:当 Redis 和数据库都不命中的时候,这时向 Redis 中缓存一个空对象,这样保证下一次查询可以在 Redis 中查找到 value,只不过这个 value 为 null 而已。

    优点是实现简单,维护方便。但缺点也很明显,就是会造成额外的内存消耗(可以通过对空对象设置一个比较短的 TTL 来解决),也可能造成短期的数据不一致问题。

  2. 布隆过滤:该算法会在客户端和 Redis 之间再添加一层过滤器(布隆过滤器),每一次请求先询问过滤器该数据是否存在,如果不存在,直接拒绝这次请求(布隆过滤器的实现方式有点类似于位图,其将数据库中的数据哈希过后转入位图当中,因为哈希冲突的问题,导致布隆过滤器响应数据存在时,是有一定误差的)。

    优点是内存占用较少,没有多余的 key。缺点就是实现复杂(可以使用 Redis 内置的 BigMap 来处理),存在误判可能。

使用缓存空对象优化查询代码:

@Override
    public Result queryById(Long id) {
        // 从redis查询缓存
        String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
        if (StrUtil.isNotBlank(shopJson)) {
            // Redis命中直接返回
            Shop shop = JSONUtil.toBean(shopJson, Shop.class);
            return Result.ok(shop);
        }

        // 命中是否是空值
        if ("".equals(shopJson)) {
            return Result.fail("商铺不存在");
        }

        // Redis不命中从数据库中查找
        Shop shop = getById(id);
        if (shop == null) {
            // 将空值写入redis
            stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
            return Result.fail("商铺不存在");
        }
        // 写入Redis缓存,并设置过期时间
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);

        return Result.ok(shop);
    }

缓存雪崩

缓存雪崩是指在同一时段大量的缓存 key 同时失效或者 Redis 服务宕机,导致大量请求到达数据库,带来巨大压力。

常见的缓存雪崩解决方案有:

  1. 给不同 key 的 TTL 添加随机值。
  2. 利用 Redis 集群提高服务的可用性。
  3. 给缓存业务添加降级限流策略。
  4. 给业务添加多级缓存。

缓存击穿

缓存击穿问题也叫热点 key 问题,就是一个被高并发访问并且缓存重建业务较复杂的 key 突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

常见的缓存击穿解决方案有:

  1. 互斥锁。(可能会导致大量线程等待的情况)
  2. 逻辑过期。(在存储 value 的时候专门多一个字段来存储过期时间)

使用互斥锁的示例代码:

// 缓存击穿代码封装
public Shop queryWithMutex(Long id) {
    // 从redis查询缓存
    String shopJson = stringRedisTemplate.opsForValue().get(CACHE_SHOP_KEY + id);
    if (StrUtil.isNotBlank(shopJson)) {
        // Redis命中直接返回
        return JSONUtil.toBean(shopJson, Shop.class);
    }

    // 命中是否是空值
    if ("".equals(shopJson)) {
        return null;
    }

    // redis未命中,实现缓存重建
    // 获取互斥锁
    String lockKey = CACHE_SHOP_KEY + id;
    Shop shop = null;
    try {
        if (!tryLock(lockKey)) {
            // 失败则休眠
            Thread.sleep(50);
            return queryWithMutex(id);
        }

        // Redis不命中从数据库中查找
        shop = getById(id);
        if (shop == null) {
            // 将空值写入redis
            stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, "", CACHE_NULL_TTL, TimeUnit.MINUTES);
            return null;
        }
        // 写入Redis缓存,并设置过期时间
        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY + id, JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        // 释放互斥锁
        unlock(lockKey);
    }

    return shop;
}

// 尝试获取锁
private boolean tryLock(String key) {
    // setnx设置锁
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(flag);
}

// 释放锁
private void unlock(String key) {
    stringRedisTemplate.delete(key);
}

缓存工具类封装

基于 StringRedisTemplate 封装一个缓存工具类,满足下列需求:

  1. 将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置 TTL 过期时间。
  2. 将任意 Java 对象序列化为 json 并存储在 string 类型的 key 中,并且可以设置逻辑过期时间,用于处理缓存击穿问题。
  3. 根据指定的 key 查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题。
  4. 根据指定的 key 查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题。
@Slf4j
@Component
public class CacheClient {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    // 线程池
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);

    // 向redis中存入缓存
    public void set(String key, Object value, Long time, TimeUnit timeUnit) {
        stringRedisTemplate.opsForValue()
                .set(key, JSONUtil.toJsonStr(value), time, timeUnit);
    }

    // 存入带有逻辑过期的缓存
    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit timeUnit) {
        // 设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
        // 带有逻辑过期时间的对象写入redis
        stringRedisTemplate.opsForValue()
                .set(key, JSONUtil.toJsonStr(redisData));
    }

    // 使用空对象解决缓存穿透
    public <R, ID> R queryWithPassThrough(String keyPrefix, ID id,
                                          Class<R> clazz, Function<ID, R> dbFallback,
                                          Long time, TimeUnit timeUnit) {
        String key = keyPrefix + id;
        // 从redis查询
        String json = stringRedisTemplate.opsForValue().get(key);
        if (StrUtil.isNotBlank(json)) {
            // redis命中则把对象反序列化出去
            return JSONUtil.toBean(json, clazz);
        }

        if (key.isEmpty()) {    // 字符串为空值
            return null;
        }

        // 调用函数查询数据库
        R r = dbFallback.apply(id);
        if (r == null) {
            // 数据库查询结果为空,向redis写入空值
            stringRedisTemplate.opsForValue().set(key, "", time, timeUnit);
            return null;
        }
        // 结果存在,直接写入redis
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(r), time, timeUnit);
        return r;
    }

    // 使用逻辑过期解决缓存击穿
    public <R, ID> R queryWithLogicalExpire(String keyPrefix, String lockKeyPrefix, ID id,
                                            Class<R> clazz, Function<ID, R> dbFallback,
                                            Long time, TimeUnit timeUnit) {
        String key = keyPrefix + id;
        // 从redis中查询缓存
        String json = stringRedisTemplate.opsForValue().get(key);
        if (StrUtil.isNotBlank(json)) {
            return null;
        }

        // 如果redis命中,需要将json反序列化为对象
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = JSONUtil.toBean((JSONObject) redisData.getData(), clazz);
        LocalDateTime expireTime = redisData.getExpireTime();

        // 判断是否过期
        if (expireTime.isAfter(LocalDateTime.now())) {
            // 未过期,直接返回
            return r;
        }

        // 如果已过期,需要去获取锁,重建redis缓存
        String lockKey = lockKeyPrefix + id;
        if (tryLock(lockKey)) {
            // 上锁成功,使用新线程重建缓存
            CACHE_REBUILD_EXECUTOR.submit(() -> {
               try {
                   // 查询数据库
                   R r1 = dbFallback.apply(id);
                   // 写入redis
                   this.setWithLogicalExpire(key, r1, time, timeUnit);
               } catch (Exception e) {
                    throw new RuntimeException(e);
               } finally {
                    // 释放锁
                   unlock(lockKey);
               }
            });
        }

        // 最后返回旧值
        return r;
    }


    // 尝试获取锁
    private boolean tryLock(String key) {
        // setnx设置锁
        Boolean flag = stringRedisTemplate.opsForValue()
                .setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(flag);
    }

    // 释放锁
    private void unlock(String key) {
        stringRedisTemplate.delete(key);
    }
}

分布式锁

集群下的线程并发问题

在单体系统中,有些时候我们会使用 synchronized 给代码上锁,因为 synchronized 的原理是在 JVM 中有一个锁监视器(存在于常量池中),能够保证使用锁监视器里的锁对象可以唯一。

但是在集群模式下,JVM 并不唯一,所以导致各个 JVM 中的锁对象是独立开来的,这就会导致 synchronized 关键字的锁失效。

基本原理

synchronized 关键字的锁失效的原因是各个集群里锁监视器并不共享,所以,如果我们能够实现一个共享的锁监视器,那么就可以实现分布式锁。

分布式锁的核心是实现多进程之间的互斥,而满足这一点的方法很多,常见的三种有:

性能评估 MySQL Redis Zookeeper
互斥 利用 mysql 本身的互斥锁机制 利用 setnx 这样的互斥指令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放

基于 Redis 的分布式锁

public interface ILock {

    /**
     * 尝试获取锁
     * @param timeOutSec 锁持有的超时时间,过期后自动释放
     * @return true代表获取成功,false代表获取失败
     */
    boolean tryLock(long timeOutSec);

    /**
     * 释放锁
     */
    void unlock();
}

基于上述接口实现一个非阻塞分布式锁:

public class SimpleRedisLock implements ILock {

    private static final String keyPrefix = "lock:";

    private String name;    // 锁的业务名称,传入时需要包含用户名称等唯一标识信息

    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeOutSec) {
        // 获取线程标识作为value,这个是有巧思的,具体见下章解析
        String threadName = Thread.currentThread().getName();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(keyPrefix + name, threadName, timeOutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        stringRedisTemplate.delete(keyPrefix + name);
    }
}

锁的使用:

Long userId = UserHolder.getUser().getId();
// 创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
// 尝试获取锁
boolean isLock = lock.tryLock(5);
if (!isLock) {
    return Result.fail("异常信息");
}

try {
    // ...业务代码
} catch (IllegalStateException e) {
    throw new RuntimeException(e);
} finally {
    // 释放锁
    lock.unlock();
}

分布式锁误删问题

假设同一个用户产生了多个线程请求:线程 1 申请了锁,但是因为线程 1 的业务很繁琐,导致业务还没结束,锁就到了 TTL 时间,提前释放掉了。这个时候来了一个线程 2,获取锁并开始业务。此刻线程 1 的业务完成,释放锁,这个时候就会释放掉线程 2 的锁

解决方法就是在存入 key 和 value 的时候,不仅 key 要可以具有唯一标识性,value 也要有唯一标识性,这样释放锁的时候就可以根据 value 来判断这个锁是不是自己的锁了:

public class SimpleRedisLock implements ILock {

    private static final String KEY_PREFIX = "lock:";

    // 使用随机UUID标识唯一性
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    private String name;    // 锁的业务名称

    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeOutSec) {
        // 获取线程标识作为value
        String value = ID_PREFIX + Thread.currentThread().getName();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, value, timeOutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        // 获取标识
        String value = ID_PREFIX + Thread.currentThread().getName();
        String val = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        // 判断是否一致
        if (value.equals(val)) {
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }
}

分布式锁原子性问题

假设同一个用户产生了多个线程请求:线程 1 申请了锁,这一次,线程 1 的业务很快执行完了,便顺利地走到了释放锁阶段。但是,在释放锁时,我们注意到判断标识是否一致以及真正释放锁的动作并不是原子性的。所以,倘若线程 1 在判断标识正确后,即将释放锁前,进入了阻塞状态(JVM 的垃圾回收机制可能导致这种占用资源的情况),就有可能导致当线程 1 苏醒时,锁又到达了 TTL,提前释放。这时又来了线程 2 并开启线程业务,此时的线程 1 又会去释放掉线程 2 的锁。

解决这个问题可以使用 Lua 脚本。Redis 提供了 Lua 脚本的功能,在一个脚本中可以编写多条 Redis 命令,确保多条命令执行的原子性。其中,调用 redis 命令的语法如下:

redis.call('命令名称', 'key', '其他参数', ...)

-- 示例
redis.call('set', 'name', 'jack')	-- 等价于redis命令: set name jack
local name = redis.call('get', 'name')	-- 等价于redis命令: get name
return name

而 Redis 中调用脚本的指令是 EVAL

EVAL "return redis.call('set', 'name', 'jack')" 0

如果脚本中的 key、value 不想写死,可以作为参数传递。key 类型的参数会放入 KEYS 数组,其他参数会放入 ARGV 数组,在脚本中可以从 KEYS 和 ARGV 数组中获取这些参数:

EVAL "return redis.call('set', KEYS[1], ARGV[1])" 1 name Rose

上述命令中,后面的数字 1 表示 KEYS 参数的个数为 1,也就是告诉系统说 KEYS 数组只有一个参数,这个参数是 name,剩下的 Rose 是属于 ARGV 数组的。(Lua 脚本中,数组下标从 1 开始索引)

了解了上述之后,我们便可以动手开始写脚本了。释放锁的业务流程是这样的:

  1. 获取锁中的线程标识。
  2. 判断是否与指定的标识一致。
  3. 如果一致则释放锁。
  4. 如果不一致则什么都不做。

对应脚本如下:

-- 锁的key
local key = KEYS[1]
-- 当前线程标识
local threadId = ARGV[1]

-- 获取锁中的标识
local id = redis.call('get', KEYS[1])
-- 比较线程标识与锁中的标识是否一致
if (id == threadId) then
    --释放锁
    redis.call('del', KEYS[1])
end
return 0

分布式锁的 unlock 方法更改如下:

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static {
    UNLOCK_SCRIPT  = new DefaultRedisScript<>();
    // 设置脚本位置,直接在类路径下查找
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    // 返回值配置
    UNLOCK_SCRIPT.setResultType(Long.class);
}

@Override
public void unlock() {
    // 调用execute方法执行lua脚本
    stringRedisTemplate.execute(
            UNLOCK_SCRIPT,
            Collections.singletonList(KEY_PREFIX + name),
            ID_PREFIX + Thread.currentThread().getName()
    );

}

Redisson

上述基于 setnx 实现的分布式锁还是存在一些问题:

  1. 不可重入:同一个线程无法多次获取同一把锁。
  2. 不可重试:获取锁只重试一次就返回 false,没有重试机制。
  3. 超时释放:锁超时释放虽然可以避免死锁,但是如果业务执行耗时较长,也会导致锁释放,存在安全隐患。
  4. 主从一致性:如果 Redis 提供了主从集群,主从同步存在延迟,当主宕机时,如果从并没有同步主中锁数据,则会出现锁失效。

功能介绍

Redisson 是一个在 Redis 基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

快速入门

我们先引入依赖:

<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.33.0</version>
</dependency>  

然后再配置 Redisson 客户端:

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        // 配置
        Config config = new Config();
        // useSingleServer是单机环境的配置
        config.useSingleServer()
                .setAddress("redis://192.168.23.130:6379")
                .setPassword("Redis:040809");
        // 创建客户端
        return Redisson.create(config);
    }
}

最后就可以使用锁了:

@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
    // 获取锁(可重入),指定锁的名称
    RLock lock = redissonClient.getLock("anyLock");
    // 尝试获取锁
    // 参数:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
    // 判断获取锁成功
    if (isLock) {
        try {
            // 执行业务...
        } finally {
            // 释放锁
            lock.unlock();
        }
	}
}

Redisson 可重入锁原理

原先我们自己借助 Redis setnx 等指令实现的锁之所以没办法做到重入,是因为对于同一个线程来讲,key 的值是相同的,导致如果同一个线程有多次嵌套获取锁的请求的话,就会造成死锁。

为了解决上述问题,我们不仅要在 value 中记录线程的标识(用于避免误删问题),还要记录重入的次数。故此,使用 Redis 的 Hash 结构来存储就可以符合我们的需求。并且也不再需要使用 setnx 命令了,对于锁是否存在,需要使用 exists 命令手动判断。且释放锁的时候需要对重入次数进行扣减的操作,并且重置锁的 TTL。上述对于锁的操作比较多,所以需要使用 Lua 脚本保证原子性。

获取锁的 Lua 脚本:

local key = KEYS[1];	-- 锁的key
local threadId = ARGV[1];	-- 线程唯一标识
local releaseTime = ARGV[2];	-- 锁的自动释放时间
-- 判断是否存在
if (redis.call('exists', key) == 0) then
    -- 不存在,获取锁
    redis.call('hset', key, threadId, '1');
    -- 设置有效期
    redis.call('expire', key, releaseTime);
    return 1;	-- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if (redis.call('hexists', key, threadId) == 1) then
    -- 不存在,获取锁,重入次数+1
    redis.call('hincrby', key, threadId, '1');
    -- 设置有效期
    redis.call('expire', key, releaseTime);
    return 1;	-- 返回结果
end;
return 0;	-- 代码如果能走到这里,说明获取锁的不是自己,返回false

释放锁的 Lua 脚本:

local key = KEYS[1];	-- 锁的key
local threadId = ARGV[1];	-- 线程唯一标识
local releaseTime = ARGV[2];	-- 锁的自动释放时间
-- 判断当前锁是否还是自己持有
if (redis.call('hexists', key, threadId) == 0) then
    return nil;	-- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数减1
local count = redis.call('hincrby', key, threadId, -1);
-- 判断重入次数是否已经为0
if (count > 0) then
    -- 大于0说明不能释放锁,重置有效期后返回
    redis.call('expire', key, releaseTime);
    return nil;
else	-- 等于0说明可以释放锁,直接删除
    redis.call('del', key);
    return nil;
end;

Redisson 的锁重试和 WatchDog 机制

锁重试

Redisson 每一次尝试获取锁都会返回一个 ttl,如果成功获取锁,ttl 为空,如果获取锁失败,ttl 记录锁的超时时间。也就意味着,只要 ttl 不为空,锁就获取失败,需要进行锁重试。

而锁的重试也不是马上执行(因为一般情况下,其他线程拿到锁之后并不会马上释放,所以如果获取锁失败后马上重试,会比较浪费 cpu 资源),而是会先去等待一个锁释放的信号量。在实际的释放锁的 Lua 脚本中,最后会有一个 publish 的命令,相当于信号量那样,向系统广播说已经有锁释放了。只要接收到这个信号量,就意味着别人已经释放掉锁,这个时候才会去进行锁的重试。

WatchDog 机制

在分布式环境中,锁的持有者可能会因为各种原因(例如网络分区、节点宕机)而无法在锁到期前主动释放锁。WatchDog 的目的是为了防止锁提前释放,确保锁在持有者故障恢复之前不会被其他节点获取。如果没有 WatchDog,这种情况下锁会过期并自动释放,其他节点可能会获得该锁,从而导致数据一致性问题。

在 Redisson 中,锁的默认有效期是 30 秒。在这段时间内,如果锁的持有者没有显式释放锁,WatchDog 会自动延长锁的有效期。当一个客户端获取到锁时,Redisson 会启动一个 WatchDog 线程。这个线程会启动一个定时任务,每隔一段时间(通常是锁有效期的三分之一)检查一次锁的状态。如果锁仍然由当前客户端持有,WatchDog 会向 Redis 发送命令,延长锁的有效期。如果锁已经被显式释放或客户端崩溃(无法续期),WatchDog 会停止工作。

值得注意的是,只有当程序员使用默认的超时时间时才会启动 WatchDog,如果程序员自己设定了超时时间,则不会触发 WatchDog 机制。

获取锁和释放锁的图例如下:

Redisson 的 multiLock 原理

在 Redis 的集群模式中,主节点通常主要是来处理写的操作,而读的操作交由从节点来处理。主从负责的功能不同,但是数据却需要保证一致,这就需要主从进行同步。但是,主从同步也是需要时间的。倘若客户端连接主节点申请了一个锁之后,主节点的锁信息还未完全同步到从节点时,主节点宕机,这个时候 Redis 集群哨兵会把其中的一个从节点升级为主节点,来继续维持服务。但是,这个新的主节点并没有同步到之前的数据,所以,就产生了锁失效的问题。

Redisson 的解决方式是,把每一个节点都当成独立的节点来处理。每当客户端更新数据的时候,需要依次向每一个节点进行数据的更新,只有每一个节点都完成了更新,才能够结束操作。同样,在申请锁的时候,只有当每一个节点上都申请到了,才能够完成锁的申请。(因为向每个节点申请锁需要耗费时间,所以,如果申请成功且程序员指定了过期时间,Redisson 会将所有的锁的有效期刷新,如果没有指定过期时间,则触发 WatchDog 机制刷新有效期)这样,就可以保证节点之间的数据一致性。

Redisson 通过多个客户端创建锁之后,调用 getMultiLock 方法创建联锁:

RLock lock1 = redissonClient1.getLock("order");
RLock lock2 = redissonClient2.getLock("order");
RLock lock3 = redissonClient3.getLock("order");

// 创建联锁,这里用哪一个客户端创建都一样
lock = redissonClient1.getMultiLock(lock1, lock2, lock3);

// 获取锁,此时会遍历上述三个锁,只有三个锁全部获取成功,才判定为成功
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);

消息队列

消息队列(Message Queue),字面意思就是存放消息的队列。最简单的消息队列模型包括 3 个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)。
  • 生产者:发送消息到消息队列。
  • 消费者:从消息队列获取消息并处理消息。

如果直接使用 JDK 自带的阻塞队列来实现生产者消费者模型,是存在问题的:

  1. 使用的是 JDK 自带的阻塞队列,占用的是 JVM 的内存,高并发情况下存在内存限制问题。
  2. 现在是基于内存保存用户订单信息的,如果服务宕机,内存数据丢失,会造成数据安全问题。

而消息队列是独立于 JVM 的一个服务,不会占用 JVM 内存,且消息队列还会将数据持久化,并确保消息会被消费一次,保证数据的安全性。

Redis 提供了三种不同的方式来实现消息队列:

  • list 结构:基于 List 结构模拟消息队列。
  • PubSub:基本的点对点消息模型。
  • Stream:比较完善的消息队列模型。

基于 List 结构模拟消息队列

Redis 的 List 结构是一个双向链表,很容易模拟出队列效果。不过要注意的是,单纯的 LPUSH 和 RPOP 并不是阻塞操作,为了实现阻塞等待,需要使用 BRPOP 或者 BLPOP 来取出消息。

优点:

  • 利用 Redis 存储,不受限于 JVM 内存上限。
  • 基于 Redis 的持久化机制,数据安全性有保证。
  • 可以满足消息有序性。

缺点:

  • 无法避免消息丢失。
  • 只支持单消费者。

基于 PubSub 的消息队列

PubSub(发布订阅)是 Redis 2.0 版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个 channel,生产者向对应 channel 发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel [channel]:订阅一个或者多个频道。
  • PUBLISH channel msg:向一个频道发送消息。
  • PSUBSCRIBE pattern[pattern]:订阅与 pattern 格式匹配的所有频道。

优点:

  • 采用发布订阅模型,支持多生产、多消费。

缺点:

  • 不支持数据持久化。
  • 无法避免消息丢失。
  • 消息堆积有上限,超出时数据丢失。

基于 Stream 的消息队列

Stream 是 Redis 5.0 引入的一种新数据类型,可以实现一个功能非常完善的消息队列。

发送消息的命令:

XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold
  [LIMIT count]] <* | id> field value [field value ...]

例如:

# 创建名为users的队列,并向其中一个发送消息,内容是{name:jack,age=21},并且使用Redis自动生成的id
XADD users * name jack age 21

读取消息的命令:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id
  [id ...]

例如:

# 从users队列中,阻塞读1条信息,$表示读最新消息
XREAD COUNT 1 BLOCK 1000 STREAMS users $

在业务开发中,我们可以循环的调用XREAD阻塞方式来查询最新消息,从而实现持续监听队列的效果,伪代码如下:

while (true) {
    // 尝试读取队列中的消息,最多阻塞2秒
    Object msg = redis.execute("XREAD COUNT 1 BLOCK 2000 STREAMS users $");
    if (msg == null) {
        continue;
	}
    // 处理消息
    handleMessage(msg);
}

注意:当我们指定其实 ID 为 $ 时,代表读取最新的消息,如果我们处理一条消息的过程中,又有超过 1 条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题。

Stream 的消费者组模式

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备以下特点:

  1. 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度。
  2. 消息标识:消费者组会维护一个标示记录最后一个被处理的消息哪怕消费者岩机重启,还会从标示之后读取消息。确保每一个消息都会被消费。
  3. 消息确认:消费者获取消息后,消息处于 pending 状态,并存入一个 pending-list。当处理完成后需要通过XACK 来确认消息,标记消息为已处理,才会从 pending-list 移除。

创建消费者组:

XGROUP CREATE key groupName ID [MKSTREAM]
# key-队列名称
# groupName-消费者组名称
# ID-起始id标识,$代表队列中最后一个消息,0代表队列中第一个消息
# MKSTREAM-队列不存在时自动创建队列

其他常见命令:

# 删除指定的消费者组
XGROUP DESTORY key groupName

# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumername

# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupName consumername

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
  [NOACK] STREAMS key [key ...] id [id ...]
# group-消费组名称
# consumer-消费者名称,如果消费者不存在,会自动创建一个消费者
# count-本次查询的最大数量
# BLOCK milliseconds-当没有消息时最长等待时间
# NOACK-无需手动ACK,获取到消息后自动确认
# STREAMS key-指定队列名称
# ID-获取消息的起始ID
## 填入 > 号-从下一个未消费的消息开始
## 填入其他符号-根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

确认消息:

XACK key group id [id ...]

获取 pending-list 中的消息:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

消费者监听消息的基本思路:

while (true) {
    // 尝试监听队列,使用阻塞模式,最长等待2000ms
    Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
    if (msg == null) {	// null说明没有消息,继续下一次
        continue;
    }
    try {
        // 处理消息,完成后一定要ACK
        handleMessge(msg);
	} catch (Exception e) {
        while (true) {
            Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 STREAMS s1 0");
            if (msg == null) {	// null说明没有异常消息,结束循环
                break;
            }
            try {
                // 说明有异常消息,再次处理
                handlMessage(msg);
            } catch (Exception e) {
                // 再次出现异常,记录日志,继续循环
                continue;
            }
        }
    }
}

优惠券秒杀

全局唯一 ID

每个店铺都可以发布优惠券,用户抢购时,会生成订单并保存到数据库表中,而订单表如果使用数据库自增 ID 就会存在一些问题:例如 id 的规律性太明显,且会受单表数据量的限制等。

所以,为了解决上述问题,我们需要一个全局 ID 生成器,这是一种在分布式系统下用来生成全局唯一 ID 的工具。可以使用雪花算法来生成这个唯一 ID。

可以使用 Redis 来帮助我们实现这个全局唯一 ID 生成器(以下算法类似雪花算法):

@Component
public class RedisIdWorker {

    // 以2024年起点为时间戳
    private static final long BEGIN_TIMESTAMP = 1704067200L;

    // 序列号位数
    private static final int COUNT_BITS = 32;

    @Resource
    StringRedisTemplate stringRedisTemplate;

    public long nextId(String keyPrefix) {
        // 生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;

        // 生成序列号
        // 先获取日期来做key的标识
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

        // 利用位运算拼接出id
        return timestamp << COUNT_BITS | count;
    }
}

实现优惠券下单

@Override
@Transactional  // 加上事务保证一致性
public Result setkillVoucher(Long voucherId) {
    // 查询优惠券信息
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);

    // 判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        return Result.fail("秒杀尚未开始");
    }
    // 判断秒杀是否结束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        return Result.fail("秒杀已经结束");
    }

    // 判断库存是否充足
    if (voucher.getStock() < 1) {
        return Result.fail("库存不足");
    }

    // 扣减库存
    seckillVoucherService.update()
            .setSql("stock = stock - 1")
            .eq("voucher_id", voucherId).update();

    // 创建订单
    VoucherOrder voucherOrder = new VoucherOrder();
    // 设置订单id,利用全局唯一id生成器
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    // 从ThreadLocal获取用户id
    Long userId = UserHolder.getUser().getId();
    voucherOrder.setUserId(userId);
    // 代金券id
    voucherOrder.setVoucherId(voucherId);

    // 写入数据库
    save(voucherOrder);

    return Result.ok(orderId);
}

超卖问题

高并发情况下,针对上述代码 “扣减库存” 这一部分,因为其需要让多个线程访问共享资源,所以存在线程不安全问题。容易出现超卖问题(数据库的库存被扣减成小于 0)。

解决类似超卖这种多线程安全问题,通常有两种解决方案:

  1. 悲观锁:认为线程安全问题一定会发生,因此在操作数据之前先获得锁,保证线程串行执行。
  2. 乐观锁:认为线程安全问题不一定会发生,因此不加锁,只是在更新数据的时候判断有没有其他线程对数据做了修改。如果没有修改则认为是安全的,自己才更新数据。如果已经被其他线程修改说明发生了安全问题,此时可以重试或者报异常。

乐观锁的关键是判断之前得到的数据是否有被修改过,常见的方法有两种:

  1. 版本号法:对每个要操作的数据维护一个版本号,在操作 sql 的时候指定版本号更新(where version = xxx),并且每次操作都要更新版本,这样可以避免多线程安全问题。
  2. CAS 法:Compare And Set,这是一个优化后的版本号法,直接利用要操作的数据本身作为版本号,可以节省内存占用。

Redis 优化秒杀

目前的业务逻辑如下:

查询优惠券、判断秒杀库存、查询订单、校验一人一单、减库存、创建订单。

其中,查询优惠券、查询订单、减库存、创建订单这几个操作需要我们和数据库进行交互,数据库的高并发性能并不好,并且,这几个操作都是串行执行,消耗的时间也是很长。这就需要我们进行性能的优化。

我们可以使用 Redis + 异步执行的操作来优化,具体的流程图如下:

改进后的秒杀业务如下:

  1. 新增秒杀优惠券的同时,将优惠券的信息保存到 Redis 中。
  2. 基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功。
  3. 如果抢购成功,将优惠券 id 和用户 id 封装后存入消息队列。
  4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能。

添加秒杀券的同时保存信息到 Redis 中:

@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
    // 保存秒杀信息到Redis中
    stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}

创建 Stream 类型的消息队列:

XGROUP CREATE stream.orders g1 0 MKSTREAM

编写 Lua 脚本:

-- 优惠券id
local voucherId = ARGV[1]
-- 用户id
local userId = ARGV[2]
-- 订单id
local orderId = ARGV[3]

-- 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 订单key
local orderKey = 'seckill:order:' .. voucherId

-- 脚本业务
-- 判断库存是否充足
if (tonumber(redis.call('get', stockKey)) <= 0) then
    -- 库存不足,返回1
    return 1
end
-- 判断用户是否下单
if (redis.call('sismember', orderKey, userId) == 1) then
    -- 存在,重复下单,返回2
    return 2
end
-- 扣库存
redis.call('incrby', stockKey, -1)
-- 下单(保存用户)
redis.call('sadd', orderKey, userId)
-- 发送消息到消息队列    XADD stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

执行 Lua 脚本:

private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

static {
    SECKILL_SCRIPT  = new DefaultRedisScript<>();
    // 设置脚本位置,直接在类路径下查找
    SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
    // 返回值配置
    SECKILL_SCRIPT.setResultType(Long.class);
}

private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

private IVoucherOrderService proxy = null;

@Override
public Result setkillVoucher(Long voucherId) {
    // 获取用户id和订单id
    long orderId = redisIdWorker.nextId("order");
    Long userId = UserHolder.getUser().getId();
    // 执行Lua脚本
    Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(), userId.toString(), String.valueOf(orderId)
    );

    // 判断结果
    int res = result.intValue();
    if (res != 0) {
        // 不为0表示没有购买资格
        return Result.fail(res == 1 ? "库存不足" : "不能重复下单");
    }
    // 提前获取代理对象
    proxy = (IVoucherOrderService) AopContext.currentProxy();

    return Result.ok(orderId);
}

@PostConstruct  // 当类加载之后马上加载线程任务
public void init() {
    // 线程池提交处理订单的任务
    SECKILL_ORDER_EXECUTOR.submit(() -> {
        String queueName = "stream.orders";
        while (true) {
            try {
                // 获取消息队列中的消息
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                        Consumer.from("g1", "c1"),   // 传入组的名称和消费者的名称
                        StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),    // 一次读一个,阻塞等待2s
                        StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );

                // 判断消息是否获取成功
                if (list == null || list.isEmpty()) {
                    // 获取失败,轮询
                    continue;
                }

                // 解析消息
                MapRecord<String, Object, Object> record = list.getFirst();
                Map<Object, Object> values = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                // 如果成功,创建订单
                handleVoucherOrder(voucherOrder);

                // 确认消息处理成功
                stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
            } catch (Exception e) {
                // 消息处理出现异常,从pending-list中获取
                handlePendingList();
            }
        }
    });
}

private void handlePendingList() {
    String queueName = "stream.orders";
    while (true) {
        try {
            // 获取消息队列中的消息
            List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),   // 传入组的名称和消费者的名称
                    StreamReadOptions.empty().count(1),    // 一次读一个,阻塞等待2s
                    StreamOffset.create(queueName, ReadOffset.from("0"))
            );

            // 判断消息是否获取成功
            if (list == null || list.isEmpty()) {
                // 获取失败,说明pending-list中没有消息
                break;
            }

            // 解析消息
            MapRecord<String, Object, Object> record = list.getFirst();
            Map<Object, Object> values = record.getValue();
            VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
            // 如果成功,创建订单
            handleVoucherOrder(voucherOrder);

            // 确认消息处理成功
            stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
        } catch (Exception e) {
            // 消息处理再次出现异常,不处理,直接打印
            log.error("消息处理异常");
        }
    }
}

private void handleVoucherOrder(VoucherOrder voucherOrder) {
    // 获取用户id,这里不能从UserHolder中取,因为线程池中的线程在执行,而不是主线程在执行
    Long userId = voucherOrder.getUserId();
    // 创建锁对象,实际上这里不创建锁也没问题,因为redis中,lua脚本已经保证了并发安全了
    RLock lock = redissonClient.getLock("lock:order:" + userId);
    // 获取锁
    boolean isLock = lock.tryLock();
    if (!isLock) {
        log.error("不允许重复下单");
        return;
    }
    try {
        proxy.createVoucherOrder(voucherOrder);
    } finally {
        lock.unlock();
    }
}

@Transactional  // 保证原子性
public void createVoucherOrder(VoucherOrder voucherOrder) {
    // 一人一单
    // 从ThreadLocal获取用户id
    Long userId = voucherOrder.getUserId();
    Long count = query()
            .eq("user_id", userId)
            .eq("voucher_id", voucherOrder.getVoucherId())
            .count();

    if (count > 0) {
        log.error("该用户已经购买过一次");
        return;
    }

    // 扣减库存
    seckillVoucherService.update()
            .setSql("stock = stock - 1")
            .eq("voucher_id", voucherOrder.getVoucherId())
            .gt("stock", 0)
            .update();

    // 写入数据库
    save(voucherOrder);
}

达人探店

点赞功能

同一个用户只能点赞一次,再次点击则取消点赞。主要的流程就是在 Redis 中存入一个 Set 集合,key 值为博客 id,value 值是给这个博客点过赞的用户 id:

@Override
public Result likeBlog(Long id) {
    // 获取登录用户
    Long userId = UserHolder.getUser().getId();
    // 判断当前用户是否点赞
    String key = BLOG_LIKED_KEY + id;
    Boolean isMember = stringRedisTemplate.opsForSet()
            .isMember(key, userId.toString());

    if (Boolean.FALSE.equals(isMember)) {
        // 没点过赞
        boolean isSuccess = update()
            .setSql("liked = liked + 1")
            .eq("id", id)
            .update();
        if (isSuccess) {    // 更新Redis
            stringRedisTemplate.opsForSet().add(key, userId.toString());
        }
    } else {
        // 点过赞了
        boolean isSuccess = update()
            .setSql("liked = liked - 1")
            .eq("id", id)
            .update();
        if (isSuccess) {    // 更新Redis
            stringRedisTemplate.opsForSet().remove(key, userId.toString());
        }
    }

    return Result.ok();
}

点赞排行榜

需要返回点赞时间 top5 的用户,相较于上一小节使用 Set 集合,本业务使用 SortedSet 更加合适。把用户点赞的时间戳(System.currentTimeMillis())记录成 score,就可以进行排序操作了。

@Override
public Result queryBlogLikes(Long id) {
    String key = BLOG_LIKED_KEY + id;
    // 查询top5点赞用户
    Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);
    if (top5 == null || top5.isEmpty()) {
        return Result.ok(Collections.emptyList());
    }
    List<Long> ids = top5.stream().map(Long::valueOf).toList();
    // 根据用户id查询用户
    List<User> users = userService.listByIds(ids);
    List<UserDTO> list = users.stream()
        .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
        .toList();
    return Result.ok(list);
}

好友关注

共同关注

对于当前用户和指定用户的共同关注,可以利用 Redis 中,Set 结构的求交集来实现业务逻辑。(当然,别忘记在好友关注的时候顺便把数据存放到 Redis 当中)

@Override
public Result followCommons(Long id) {
    // 获取当前的用户
    Long userId = UserHolder.getUser().getId();
    String key = "follows:" + userId;

    // 求交集
    String key2 = "follows:" + id;
    Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
    if (intersect == null || intersect.isEmpty()) {
        // 无交集
        return Result.ok(Collections.emptyList());
    }

    // 解析信息
    List<Long> ids = intersect.stream().map(Long::valueOf).toList();
    // 查询用户
    List<User> users = userService.listByIds(ids);
    List<UserDTO> list = users.stream()
            .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
            .toList();
    return Result.ok(list);
}

关注推送

关注推送也叫 Feed 流,直译为投喂。为用户持续的提供 “沉浸式” 的体验,通过无限下拉刷新获取新的信息。Feed 是根据用户行为为用户匹配其喜爱的内容。

Feed 流产品有两种常见模式:

  • Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈。
    • 优点:信息全面,不会有缺失。并且实现也相对简单。
    • 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低。
  • 智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户。
    • 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷。
    • 缺点:如果算法不精准,可能起到反作用。

我们基于好友关注来进行的推送,使用的是 Timeline 模式,该模式的实现方案有三种:

  1. 拉模式。

    也叫读扩散,具体方法是:每个用户都配置有一个发件箱和收件箱,如果发布消息,就会往发件箱中投递。而当用户要查看其关注的人的信息时,就根据当前用户的关注列表,拉取对应的发件箱信息到当前用户的收件箱中即可。

    该模式的优点是信息只保存一份(主要在发件箱中保存),节省内存。缺点就是每一次收件的时候,需要拉取信息且根据发布的时间戳进行排序,这就意味着我们拉取信息的延迟较高。

  2. 推模式。

    也叫写扩散,具体方法是:去除发件箱,保留收件箱。每个用户发送新信息的时候,直接推送到其所有粉丝的收件箱中,再对收件箱的信息进行排序。该模式拉取信息的效率较高,但是对应的写出信息的效率便很低。

  3. 推拉结合。

    也叫读写混合,兼具推拉模式的优点。对于粉丝较少的用户,使用推模式。对于粉丝较多的用户,其活跃粉丝使用推模式,非活跃粉丝使用拉模式。

基于推模式实现关注推送功能

根据上述所示,收件箱需要满足根据时间戳排序的功能,所以可以使用 Redis 中的数据结构来实现。考虑到查询收件箱的数据时,还需要实现分页查询。但是,Feed 流中的数据会不断更新,所以数据的角标也在不断变化,因此不能采用传统的分页模式。针对上述分页问题,我们需要对 Feed 流采取滚动分页模式。

所谓滚动分页模式,指的是每一次查询后记录最后一次查询的信息,而后下一次查询从记录的最后一次查询信息开始。考虑到 SortedSet 可以进行范围查询,也就是说,针对滚动分页的功能,我们可以记录最后一次查询的信息的时间戳,然后下一次要查询的时候,查找小于这个时间戳的范围即可。

故最终我们使用 SortedSet 来作为收件箱的数据结构。

推送博客的功能如下:

@Override
public Result saveBlog(Blog blog) {
    // 获取登录用户
    UserDTO user = UserHolder.getUser();
    blog.setUserId(user.getId());

    // 保存探店博文
    boolean isSuccess = save(blog);
    if (!isSuccess) {
        return Result.fail("新增博客失败");
    }

    // 查询博客作者的所有粉丝
    List<Follow> follows = followService.query().eq("follow_user_id", user.getId()).list();

    // 推送博客给所有粉丝
    follows.forEach(follow -> {
        // 获取粉丝id
        Long userId = follow.getUserId();
        // 推送到粉丝的收件箱中
        String key = FEED_KEY + userId;
        stringRedisTemplate.opsForZSet()
                .add(key, blog.getId().toString(), System.currentTimeMillis());
    });

    // 返回id
    return Result.ok(blog.getId());
}

滚动分页查询涉及到了几个参数:

  1. max:时间戳范围最大值,第一次查询的时候取当前时间戳,而后每一次查询完都记录为上一次查询的最小时间戳。
  2. min:时间戳大于0,所以该值为常量,0。
  3. offset:相同时间戳下的元素偏移量,一开始为 0,而后每一次查询都记录为上一次结果中,与最小时间戳一样的元素的个数。
  4. count:每一次分页要取几条,由前端决定,这里我们以 3 为例。

所以滚动分页查询的实现如下:

// 滚动分页需要用到的实现类
public class ScrollResult {
    private List<?> list;
    private Long minTime;
    private Integer offset;
}
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
    // 获取当前用户
    Long userId = UserHolder.getUser().getId();

    // 查询收件箱
    String key = FEED_KEY + userId;
    // 滚动分页查询
    Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
            .reverseRangeByScoreWithScores(key, 0, max, offset, 3);
    if (typedTuples == null || typedTuples.isEmpty()) {
        return Result.ok();
    }

    // 解析数据
    List<Long> ids = new ArrayList<>(typedTuples.size());   // 指定长度
    long minTime = 0;
    int off = 1;
    for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
        // 获取blog_id
        String idStr = tuple.getValue();
        if (idStr != null) {
            ids.add(Long.valueOf(idStr));
        }
        // 获取分数(时间戳),更新到最后一个就是min了
        long time = tuple.getScore().longValue();
        if (time == minTime) {
            ++off;
        } else {
            minTime = time;
            off = 1;
        }
    }

    // 根据id查询blog
    // select * from t where id in (ids) order by field(id, ids)
    // 以ids的顺序返回结果集
    String idStr = StrUtil.join(",", ids);
    List<Blog> blogs = query()
            .in("id", ids)
            .last("ORDER BY FIELD(id," + idStr + ")")
            .list();
    blogs.forEach(blog -> {
        // 查询blog有关的用户
        queryBlogUser(blog);
        // 查询blog是否被点赞
        isBlogLiked(blog);
    });

    // 封装并返回最终结果
    ScrollResult r = new ScrollResult();
    r.setList(blogs);
    r.setOffset(off);
    r.setMinTime(minTime);

    return Result.ok(r);
}

附近商铺

GEO 数据结构

GEO 就是 Geolocation 的简写形式,代表地理坐标。Redis 在 3.2 版本中加入了对 GEO 的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:

  • GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)。
  • GEODIST:计算指定两个点的距离并返回。
  • GEOHASH:将指定 member 的坐标转为 hash 字符串形式并返回。
  • GEOPOS:返回指定 member 的坐标。
  • GEORADIUS:指定圆心、半径,找到该圆内包含的所有 member,并按照与圆心之间的距离排序后返回。6.2以后已废弃。
  • GEOSEARCH:指定范围内搜索 member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2 新功能。
  • GEOSEARCHSTORE:与 GEOSEARCH 功能一致,不过可以把结果存储到一个指定的 key。6.2 新功能。

在 Redis 底层,GEO 数据的坐标会被转换为一个数值存入 Redis 中,使用的是 SortedSet 这个数据结构存储。转换之后的数值就是 SortedSet 的 Score。

搜索附近商铺

@Override
public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {
    if (x == null || y == null) {
        // x与y为空,直接查数据库
        Page<Shop> page = query()
                .eq("type_id", typeId)
                .page(new Page<>(current, DEFAULT_PAGE_SIZE));
        return Result.ok(page.getRecords());
    }

    // 计算分页参数
    int from = (current - 1) * DEFAULT_PAGE_SIZE;
    int end = current * DEFAULT_PAGE_SIZE;

    // 使用redis做分页查询
    String key = SHOP_GEO_KEY + typeId;
    GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
            .search(key,
                    GeoReference.fromCoordinate(x, y),
                    new Distance(5000),
                    RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end)
            );
    if (results == null) {
        return Result.ok(Collections.emptyList());
    }
    List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
    if (list.size() <= from) {
        // 没有下一页了,结束
        return Result.ok(Collections.emptyList());
    }

    // 解析数据
    List<Long> ids = new ArrayList<>(list.size());
    Map<String, Distance> distanceMap = new HashMap<>(list.size());
    list.stream().skip(from).forEach(result -> {
        // 获取店铺id
        String shopIdStr = result.getContent().getName();
        ids.add(Long.parseLong(shopIdStr));
        // 获取距离
        Distance distance = result.getDistance();
        distanceMap.put(shopIdStr, distance);
    });

    // 根据id查询shop
    String idStr = StrUtil.join(",", ids);
    List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id, " + idStr + ")").list();

    // 把距离和店铺一一对应
    shops.forEach(shop ->
        shop.setDistance(distanceMap.get(shop.getId().toString()).getValue())
    );

    return Result.ok(shops);
}

用户签到

BitMap 用法

我们按月来统计用户签到信息,把每一个 bit 位对应当月的每一天,形成映射关系。签到记录为 1,未签到记录为 0。

Redis 中是利用 String 类型数据结构实现 BitMap,因此最大上限是 512M,BitMap 的操作命令有:

  • SETBIT:向指定位置(offset)存入一个 0 或 1。
  • GETBIT:获取指定位置(offset)的 bit 值。
  • BITCOUNT:统计 BitMap 中值为 1 的 bit 位的数量。
  • BITFIELD:操作(查询、修改、自增)BitMap 中 bit 数组中的指定位置(offset)的值。
  • BITFIELD_RO:获取 BitMap 中 bit 数组,并以十进制形式返回。
  • BITOP:多个 BitMap 的结果做位运算(与、或、异或)。
  • BITPOS:查找 bit 数组中指定范围内第一个 0 或 1 出现的位置。

签到功能

将用户当天签到信息保存到 Redis 中:

@Override
public Result sign() {
    // 获取当前用户
    Long userId = UserHolder.getUser().getId();
    // 获取日期
    LocalDateTime now = LocalDateTime.now();

    // 拼接key
    String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
    String key = USER_SIGN_KEY + userId + keySuffix;

    // 获取今天是本月的第几天
    int dayOfMonth = now.getDayOfMonth();

    // 写入Redis
    stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);
    return Result.ok();
}

统计连续签到

@Override
public Result signCount() {
    // 获取当前用户
    Long userId = UserHolder.getUser().getId();
    // 获取日期
    LocalDateTime now = LocalDateTime.now();

    // 拼接key
    String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
    String key = USER_SIGN_KEY + userId + keySuffix;

    // 获取今天是本月的第几天
    int dayOfMonth = now.getDayOfMonth();

    // 获取本月截至今天为止的所有签到记录
    List<Long> result = stringRedisTemplate.opsForValue().bitField(
            key,
            BitFieldSubCommands.create()
                    .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth))
                    .valueAt(0)
    );
    if (result == null || result.isEmpty()) {
        return Result.ok(0);
    }

    Long num = result.getFirst();
    if (num == null || num == 0) {
        return Result.ok(0);
    }

    // 最近连续签到次数
    int cnt = 0;
    while (true) {
        if ((num & 1) == 0) {
            break;
        } else {
            ++cnt;
        }
        num >>>= 1;
    }

    return Result.ok(cnt);
}

UV 统计

UV:Unique Visitor,独立访客量,是指通过互联网访问、浏览这个网页的自然人。1 天内同一个用户多次访问该网站,只记录 1 次。

PV:Page View,页面访问量或点击量,用户每访问网站的一个页面,记录 1 次 PV,用户多次打开页面,则记录多次 PV。往往用来衡量网站的流量。

HyperLogLog 用法

Hyperloglog(HLL)是从 Loglog 算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。Redis 中的 HLL 是基于 string 结构实现的,单个 HLL 的内存永远小于 16kb,内存占用低的令人发指!作为代价,其测量结果是概率性的,有小于 0.81% 的误差。不过对于 UV 统计来说,这完全可以忽略。

  • PFADD:添加统计用户。
  • PFCOUNT:返回指定 key 的流量,不一定是准确值。
  • PFMERGE:合并多个 key 的流量。

实现 UV 统计

@Test
void testHyperLogLog() {
    String[] values = new String[1000];
    int j = 0;
    for (int i = 0; i < 1000000; i++) {
         j = i % 1000;
        values[j] = "user_" + i;
        if (j == 999) {
            // 每隔1000条发一次数据
            stringRedisTemplate.opsForHyperLogLog().add("hl2", values);
        }
    }

    // 统计数量
    Long count = stringRedisTemplate.opsForHyperLogLog().size("hl2");
    System.out.println(count);
}

Redis 高级应用

Redis 的高级应用涉及到分布式系统内容,这里暂时不作拓展,日后再来补充。

Redis 原理总结

数据结构

动态字符串 SDS

我们都知道 Redis 中保存的 Key 是字符串,Value 往往是字符串或者字符串的集合。可见字符串是 Redis 中最常用的一种数据结构。

不过 Redis 没有直接使用 C 语言中的字符串,因为 C 语言字符串存在很多问题,例如获取字符串长度时需要通过运算(去掉计算结尾的 \0)、非二进制安全、不可修改等。

// c语言,声明字符串
char* s = "hello";

// 本质是字符数组 {'h', 'e', 'l', 'l', 'o', '\0'}

Redis 构建了一种新的字符串结构,称为简单动态字符串(Simple Dynamic String),简称 SDS。例如我们使用 Redis 命令 set name jack,实际上,在 Redis 底层会创建两个 SDS,一个是包含 name 的 SDS,另一个是包含 jack 的 SDS。

Redis 是 C 语言实现的,SDS 本质是一个结构体:

struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len; /* buf已保存的字符串字节数,不包含结束标示 */
    uint8_t alloc; /* buf申请的总的字节数,不包含结束标示 */
    unsigned char flags; /* 不同SDS的头类型,用来控制SDS的头大小 */
    char buf[];	/* 底层的字符数组 */
}

// flags对应取值如下
#define SDS_TYPE_5  0
#define SDS_TYPE_8  1
#define SDS_TYPE_16 2
#define SDS_TYPE_32 3
#define SDS_TYPE_64 4

例如,一个包含字符串 name 的结构体 sds 如下:

struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len = 4; 
    uint8_t alloc = 4; 
    unsigned char flags = 1; 
    char buf[] = {'n', 'a', 'm', 'e', '\0'};	
}

而 SDS 之所以叫做动态字符串,是因为它具备动态扩容的能力,追加字符串内容时,申请新空间的规则如下:

  • 如果新字符串小于 1M,则新空间为扩展后字符串长度的两倍 + 1。
  • 如果新字符串大于 1M,则新空间为扩展后字符串长度 + 1M + 1。称为内存预分配

所以,动态字符串 SDS 有如下优点:

  1. 获取字符串长度的时间复杂度为 O(1)。
  2. 支持动态扩容。
  3. 减少内存分配次数。
  4. 二进制安全。

IntSet

IntSet 是 Redis 中 set 集合的一种实现方式,基于整数数组来实现,并且具备长度可变、有序等特征。

结构如下:

typedef struct intset {
    uint32_t encoding;	// 编码方式,支持存放16、32、64位整数
    uint32_t length;	// 元素个数
    int8_t contents[];	// 整数数组,保存集合数据
} intset;

为了方便查找,Redis 会将 intset 中所有的整数按照升序依次保存在 contents 数组中。

IntSet 在添加元素的时候也是会做一些额外的操作:

首先,确定添加的数字的编码方式,如果添加进去的数字超过了当前 intset 的编码范围,那么 intset 可以进行自动升级,将编码方式调整成合适的大小(并且,数据在复制的时候是倒序复制的,防止新数据因为扩大大小而覆盖了旧数据)。

此外,在添加元素的时候,还会查找集合中是否存在当前插入的元素(因为集合数据是有序的,所以查找时使用的是二分查找),如果存在,就不会添加进去。新添加进去的数据会根据大小插入到指定位置(要插入的位置由二分查找确定),保证数据的有序性。

所以,IntSet 具备的特点如下:

  1. Redis 会确保 IntSet 中的元素唯一、有序。
  2. 具备类型升级机制,可以节省内存空间。
  3. 底层采用二分查找的方式来查询。

Dict

我们知道 Redis 是一个键值型(Key-Value Pair)的数据库,我们可以根据键实现快速的增删改查。而键与值的映射关系正是通过 Dict 来实现的。

Dict由三部分组成,分别是:哈希表(DictHashTable)、哈希节点(DictEntry)、字典(Dict)。

结构如下:

typedef struct dictht {
    // entry数组
    // 数组中保存的是指向entry的指针
    dictEntry **table;
    // 哈希表大小
    unsigned long size;
    // 哈希表大小的掩码,总等于size-1
    unsigned long sizemask;
    // entry个数,由于哈希冲突,used可能大于size
    unsigned long used;
} dictht;

typedef struct dictEntry {
    void *key;	// 键
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;	// 值
    struct dictEntry *next;    // 下一个Entry的指针,哈希冲突时使用
} dictEntry;

typedef struct dict {
    dictType *type;	// dict类型,内置不同的hash函数
    void *privdata;	// 私有数据,在做特殊hash运算时使用
    dictht ht[2];	// 一个Dict包含两个哈希表,其中一个是当前数据,另一个一般是空,rehash时使用
    long rehashidx;	// rehash的进度,-1表示未进行
    int16_t pauserehash;	// rehash是否暂停,1表示暂停,0表示继续
} dict;

当我们向 Dict 添加键值对时,Redis 首先根据 key 计算出 hash 值(h),然后利用 h&sizemask (等价于取余)来计算元素应该存储到数组中的哪个索引位置。

Dict 的渐进式 rehash

Dict 中的 HashTable 就是数组结合单向链表的实现,当集合中元素较多时,必然导致哈希冲突增多,链表过长,则查询效率会大大降低。

Dict 在每次新增键值对时都会检查负载因子(LoadFactor=used/size),满足以下两种情况时会触发哈希表扩容:

  1. 哈希表满足 LoadFactor >= 1,并且服务器没有执行 BGSAVE 或者 BGREWRITEAOF 等后台进程。
  2. 哈希表满足 LoadFactor > 5

Dict 除了扩容以外,每次删除元素时,也会对负载因子进行检查,当满足 LoadFactor < 0.1 时,会做哈希表收缩。

不管是扩容还是收缩,必定会创建新的哈希表,导致哈希表的 size 和 sizemask 变化,而 key 的查询与 sizemask 有关。因此必须对哈希表中的每一个 key 重新计算索引,插入新的哈希表,这个过程称为 rehash。又因为考虑到数百万数据的哈希表一次性进行 rehash 所消耗的时间过多,极有可能造成主线程阻塞。因此,Dict 的 rehash 是分多次的,渐进式完成,过程是这样的:

  1. 计算新 hash 表的 realeSize,值取决于当前要做的是扩容还是收缩:
    1. 如果是扩容,则新 size 为第一个大于等于 dict.ht[0].used+1 的 $2^n$。
    2. 如果是收缩,则新 size 为第一个大于等于 dict.ht[0].used 的 $2^n$(不得小于 4 )。
  2. 按照新的 realeSize 申请内存空间,创建 dictht,并赋值给 dict.ht[1]
  3. 设置 dict.rehashidx = 0,标示开始 rehash。
  4. 每次执行新增、查询、修改、删除操作时,都检查一下 dict.rehashidx 是否大于 -1,如果是则将dict.ht[0].table[rehashidx] 的 entry 链表 rehash 到 dict.ht[1],并且将 rehashidx++。直至dict.ht[0] 的所有数据都 rehash 到 dict.ht[1]
  5. dict.ht[1] 赋值给 dict.ht[0],给 dict.ht[1] 初始化为空哈希表,释放原来的 dict.ht[0] 的内存。
  6. 将 rehashidx 赋值为 -1,代表 rehash 结束。
  7. 在 rehash 过程中,新增操作,则直接写入 ht[1],查询、修改和删除则会在 dict.ht[0]dict.ht[1] 依次查找并执行。这样可以确保 ht[0] 的数据只减不增,随着 rehash 最终为空。

ZipList

ZipList 是一种特殊的 “双端链表”,由一系列特殊编码的连续内存块组成。可以在任意一端进行压入 / 弹出操作,并且该操作的时间复杂度为 O(1)。

其结构为:

  • zlbytes:记录整个压缩列表占用的内存字节数。
  • zltail:记录压缩列表表尾节点距离压缩列表的起始地址有多少字节,通过这个偏移量,可以确定表尾节点的地址。
  • zllen:记录了压缩列表包含的节点数量。
  • entry:压缩列表的各个节点,节点的长度由节点保存的内容决定。
  • zlend:特殊值 0xff,用于标记压缩列表的末端。

ZipList 中的 Entry 并不像普通链表那样记录前后节点的指针,因为记录两个指针要占用 16 个字节,浪费内存。而是采用了下面的结构:

  • previous_entry_length:前一节点的长度,占 1 个或 5 个字节。(为了方便倒序遍历)
    • 如果前一节点的长度小于 254 字节,则采用 1 个字节来保存这个长度值。
    • 如果前一节点的长度大于 254 字节,则采用 5 个字节来保存这个长度值,第一个字节为 0xfe,后四个字节才是真实长度数据。
  • encoding:编码属性,记录 content 的数据类型(字符串还是整数)以及长度,占用 1 个、2 个或 5 个字节。
  • contents:保存节点的数据,可以是字符串或整数。

注意:ZipList 中所有存储长度的数值均采用小端字节序,即低位字节在前,高位字节在后。

QuickList

ZipList 虽然节省内存,但申请内存必须是连续空间,如果内存占用较多,申请内存效率很低。怎么办?为了缓解这个问题,我们必须限制 ZipList 的长度和 entry 的大小。

但是我们要存储大量数据的时候,超出了 ZipList 的上限该怎么办?一个解决方法是,创建多个 ZipList 来分片存储数据。所以,QuickList 就应运而生了。QuickList 是一个双端链表,只不过链表中的每一个节点指向的都是一个 ZipList。

为了避免 QuickList 中的每个 ZipList 中 entry 过多,Redis提供了一个配置项:list-max-ziplist-size 来限制。

  • 如果值为正,则代表 ZipList 的允许的 entry 个数的最大值。
  • 如果值为负,则代表 ZipList 的最大内存大小,分 5 种情况:
    • -1:每个 ZipList 的内存占用不超过 4kb。
    • -2:每个 ZipList 的内存占用不超过 8kb。(默认值)
    • -3:每个 ZipList 的内存占用不超过 16kb。
    • -4:每个 ZipList 的内存占用不超过 32kb。
    • -5:每个 ZipList 的内存占用不超过 64kb。

除了控制 ZipList 的大小,QuickList 还可以对节点的 ZipList 做压缩。通过配置项 list-compress-depth 来控制。因为链表一般都是从首尾访问较多,所以首尾是不压缩的。这个参数是控制首尾不压缩的节点个数:

  • 0:特殊值,代表不压缩。(默认值)
  • 1:表示 QuickList 的首尾各有 1 个节点不压缩,中间节点压缩。
  • 2:表示 QuickList 的首尾各有 2 个节点不压缩,中间节点压缩。
  • 以此类推。

SkipList

SkipList 首先是链表,但与传统链表相比有几点差异:

  • 元素按照升序排列存储。
  • 节点可能包含多个指针,指针跨度不同。(每隔 5 个节点进行链接,然后再每隔 10 个节点再进行一次链接,这样在查找的时候就可以跳着找)

SkipList 的特点:

  • 跳跃表是一个双响链表,每个节点都包含 score 和 ele 值
  • 节点按照 score 值排序,score 值一样则按照 ele 字典排序。
  • 每个节点都可以包含多层指针,层数是 1 到 32 之间的随机数。
  • 不同层指针到下一个节点的跨度不同,层级越高,跨度越大。
  • 增删改查效率与红黑树基本一致,实现却更简单。

RedisObject

Redis 中的任意数据类型的键和值都会被封装为一个 RedisObject,也叫做 Redis 对象:

typedef struct redisObject {
    unsigned type:4;	// 对象类型,分别是string、hash、list、set和zset,占4个比特位
    unsigned encoding:4;	// 底层编码方式,共有11种
    unsigned lru:LRU_BITS;	// LRU_BITS为24,记录本对象最近被访问是何时,用于内存回收
    int refcount;	// 对象被引用计数器,计数器为0表示可以被回收
    void* ptr;		// 被封装的各种数据结构类型
} robj;

五种数据结构

String

String 是 Redis 中常见的数据存储类型:

  • 其编码方式是 RAW,基于简单动态字符串(SDS)实现,存储上限为 512MB。
  • 如果存储的 SDS 长度小于 44 字节,则会采用 EMBSTR 编码,此时 object head 与 SDS 是一段连续空间。申请内存时只需要调用一次内存分配函数、效率更高。(并且此时的 RedisObject 头信息与 redisObject.ptr 所连接的 SDS 加起来大小差不多在 64 字节以内,Redis 内存分配时是以 2 的 n 次方分配的,这样可以尽可能少的产生内存碎片)
  • 如果存储的字符串是整数值,并且大小在 LONG_MAX 范围内,则会采用 INT 编码:直接将数据保存在RedisObject 的 ptr 指针位置(刚好 8 字节),不再需要 SDS 了。

所以在这里我们推荐:在使用 Redis 的 String 类型时,尽可能避免字符串占用字节大小超过 44 字节,否则转换为 RAW 编码方式后,连续存储会转换为链式存储,且分配空间时会产生内存碎片。且能用数值格式的,尽可能用数值格式。

List

Redis 的 List 结构类似一个双端链表,可以从首、尾操作列表中的元素:

  • 在 3.2 版本之前,Redis 采用 ZipList 和 LinkedList 来实现 List,当元素数量小于 512 并且元素大小小于 64 字节时采用 ZipList 编码,超过则采用 LinkedList 编码。
  • 在 3.2 版本之后,Redis 统一采用 QuickList 来实现 List。
Set

Set 是 Redis 中的集合,不一定确保元素有序,可以满足元素唯一、查询效率要求极高:

  • 为了查询效率和唯一性,set 采用 HT 编码(Dict)。Dict 中的 key 用来存储元素,Value 统一为 null。
  • 当存储的所有数据都是整数,并且元素数量不超过 set-max-intset-entries 时,Set 会采用 IntSet 编码,以节省内存。如果之后插入的数据不是整数,则会转为 HT 编码。
ZSet

ZSet 也就是 SotredSet,其中每一个元素都需要指定 score 值和 member 值。考虑到 Zet 需要满足键值存储、键必须唯一、可排序这几个要求。而 SkipList 虽然可以排序,也可以进行键值存储,但是其根据 member 查询分数时效率太慢;而 Dict 虽然可以键值存储,也可以快速根据 key 找 value,但是其无法保证有序。

所以,为了实现 ZSet 这样的数据结构,Redis 便把 SkipList 和 Dict 结合起来,相互弥补各自的缺点:

在 ZSet 中,数据存两份,一份存在 Dict 中,一份存在 SkipList 中,需要根据 key 找 value 时,去 Dict 找,需要排序进行范围查询时,去 SkipList 中找。

虽然上述的实现方式对于 ZSet 来讲可以满足要求,并且各方面效率都挺高的,但是缺点就是太占用内存空间了,对此,ZSet 还有第二种实现方式。当元素数量不多时,HT 和 SkipList 的优势不明显,而且更耗内存。因此 ZSet 还会采用 ZipList 结构来节省内存,不过需要同时满足两个条件:

  1. 元素数量小于 zset_max_ziplist_entries,默认值 128。
  2. 每个元素都小于 zset_max_ziplist_value 字节,默认值 64。

但 ZipList 本身没有排序功能,而且没有键值对的概念,因此需要有 ZSet 通过编码实现:

  • ZipList 是连续内存,因此 score 和 element 是紧挨在一起的两个 entry,element 在前,score 在后。
  • score 越小越接近队首,score 越大越接近队尾,按照 score 值升序排列。
Hash

Hash 底层默认采用 ZipList 编码,用以节省内存。ZipList 中的两个 entry 分别保存 field 和 value。

当数据量较大时,Hash 结构会转为 HT 编码,也就是 Dict,触发条件有两个:

  1. ZipList 中的元素数量超过了 hash-max-ziplist-entries(默认512)。
  2. ZipList 中的任意 entry 大小超过了 hash-max-ziplist-value(默认64字节)。

网络模型

用户空间和内核空间

服务器大多都采用 Linux 系统,这里我们以 Linux 为例来讲解:

任何 Linux 发行版,其系统内核都是 Linux。我们的应用都需要通过 Linux 内核与硬件交互。为了避免用户应用导致冲突甚至内核崩溃,用户应用与内核是分离的:

  • 进程的寻址空间会划分为两部分:内核空间、用户空间。当进程运行在内核空间时,我们就称为内核态;当进程运行在用户空间时,我们就称为用户态。
  • 用户空间只能执行受限的命令(Ring3),而且不能直接调用系统资源,必须通过内核提供的接口来访问。
  • 内核空间可以执行特权命令(Ring0),调用一切系统资源。

以 IO 系统来举例,Linux 系统为了提高 IO 效率,会在用户空间和内核空间都加入缓冲区:

  • 写数据时,要把用户缓冲数据拷贝到内核缓冲区,然后写入设备。
  • 读数据时,要从设备读取数据到内核缓冲区,然后拷贝到用户缓冲区。

示意图如下:

Linux 对于 IO 模型的区别,主要是在于等待数据和读取数据这几步操作,在 《 UNIX 网络编程 》 中,总结归纳了 5 中 IO 模型:

  • 阻塞 IO(Blocking IO)。
  • 非阻塞 IO(Nonblocking IO)。
  • IO 多路复用(IO Multiplexing)。
  • 信号驱动 IO(Signal Driven IO)。
  • 异步 IO(Asynchronous IO)。

接下来,我们将逐步介绍这些 IO 模型。

阻塞 IO

顾名思义,阻塞 IO 就是两个阶段都必须阻塞等待:

可以看到,阻塞 IO 模型中,用户进程在两个阶段都是阻塞状态。

非阻塞 IO

顾名思义,非阻塞 IO 的 recvfrom 操作会立即返回结果而不是阻塞用户进程:

非阻塞 IO 模型下,用户进程在等待数据时并不是阻塞状态,而是不断轮询;而在从内核拷贝数据到用户空间这一步骤中,用户进程就变为阻塞状态了。

从某种程度上讲,非阻塞 IO 在性能上有时不如阻塞 IO。但是,在某些场景下,我们却不得不使用非阻塞 IO 模型,才能有更好的性能表现……

IO 多路复用

无论是阻塞 IO 还是非阻塞 IO,用户应用在一阶段都需要调用 recvfrom 来获取数据,差别在于无数据时的处理方案:

  • 如果调用 recvfrom 时,恰好没有数据,阻塞 IO 会使进程阻塞,非阻塞 IO 使 CPU 空转,都不能充分发挥 CPU 的作用。
  • 如果调用 recvfrom 时,恰好有数据,则用户进程可以直接进入第二阶段,读取并处理数据。

比如服务端处理客户端 Socket 请求时,在单线程情况下,只能依次处理每一个 socket,如果正在处理的 socket 恰好未就绪(数据不可读或不可写),线程就会被阻塞,所有其它客户端 socket 都必须等待,性能自然会很差。

解决方案有两种:

  1. 多线程,使用多个线程去处理各个 socket,但是这样一来,上下文切换也是很会消耗资源的。
  2. 另一种解决方案就是,只启用一个线程,监听所有 socket(NIO 中的 Selector),服务端优先处理已经处理好数据的 socket。

Linux 采用的是上述第二种解决方案。在 Linux 中,一切皆文件,Linux 中每一个文件都被文件描述符(File Descriptor,FD)关联着。IO 多路复用便是利用单线程同时监听多个 FD,并在某个 FD 可读、可写时得到通知,从而避免无效的等待,充分利用 CPU 资源。

监听 FD 的方式、通知的方式又有多种,常见的有:select、poll、epoll。差异:select 和 poll 只会通知用户进程有 FD 就绪,但不确定具体是哪个 FD,需要用户进程逐个遍历 FD 来确认;epoll 则会在通知用户进程 FD 就绪的同时,把已就绪的 FD 写入用户空间。

select

select 是 Linux 中最早的 IO 多路复用实现方案:

typedef long int __fd_mask;

// fd_set是要监听的fd集合
typedef struct {
    // fd_bits是long类型数组,长度为1024/32=32
    // 共1024个bit位,每个bit位代表一个fd,0表示未就绪,1代表就绪
    __fd_mask fds_bis[__FD_SETSIZE / __NFDBITS];
} fd_set;

int select(
    int nfds;	// 要监视的fd_set的最大fd+1
    fd_set *readfds,	// 要监听读时间的fd集合
    fd_set *writefds,	// 要监听写时间的fd集合
    fd_set *exceptfds,	// 要监听异常事件的fd集合
    // 超时时间,null-永不超时,0-不阻塞等待;大于0-固定等待时间
    struct timeval *timeout;
);

select 模式存在的问题:

  • 需要将整个 fd_set 从用户空间拷贝到内核空间,select 结束还要再次拷贝回用户空间。
  • select 无法得知具体是哪个 fd 就绪,需要遍历整个 fd_set
  • fd_set 监听的 fd 数量不能超过 1024。
poll

poll 模式对 select 模式做了简单改进,但性能提升不明显,部分关键代码如下:

// pollid中的事件类型
#define POLLIN		// 可读事件
#define POLLOUT		// 可写事件
#define POLLERR		// 错误事件
#define POLLNVAL	// fd未打开

// pollfd结构
struct pollfd {
    int fd;		// 要监听的fd
    short int events;	// 要监听的事件类型:读、写、异常
    short int revents;	// 实际发生的事件类型
};

// poll函数
int poll(
	struct pollfd *fds,	// pollfd数组,可以自定义大小
    nfds_t nfds,		// 数组元素个数
    int timeout			// 超时时间
);

IO 流程:

  1. 创建 pollfd 数组,向其中添加关注的 fd 信息,数组大小自定义。
  2. 调用 poll 函数,将 pollfd 数组拷贝到内核空间,转链表存储,无上限。
  3. 内核遍历 fd,判断是否就绪。
  4. 数据就绪或超时后,拷贝 pollfd 数组到用户空间,返回就绪 fd 数量 n。
  5. 用户进程判断 n 是否大于 0。
  6. 大于 0 则遍历 pollfd 数组,找到就绪的 fd。

与 select 对比:

  • select 模式中的 fd_set 大小固定为 1024,而 pollfd 在内核中采用链表,理论上无上限。
  • 监听 FD 越多,每次遍历消耗时间也就越久,性能反而会下降。
epoll

epoll 模式是对 select 和 poll 的改进,它提供了三个函数:

struct eventpoll {
    //...
    struct rb_root rbr;	// 一棵红黑树,记录要监听的FD
    struct list_head rdlist;	// 一个链表,记录就绪的FD
    //...
};

// 1.会在内核创建eventpoll结构体,返回对应的句柄epfd
int epoll_create(int size);

// 2.将一个FD添加到epoll的红黑树中,并设置ep_poll_callback
// callback会在socket就绪时触发,触发时,就把对应的FD加入到rdlist这个就绪列表中
int epoll_ctl(
    int epfd,	// epoll实例的句柄
    int op,		// 要执行的操作,包括:ADD、MOD、DEL
    int fd,		// 要监听的FD
    struct epoll_event *event	// 要监听的事件类型:读、写、异常等
);

// 3.检查rdlist列表是否为空,不为空则返回就绪的FD数量
int epoll_wait(
	int epfd,	// eventpoll实例的句柄
    struct epoll_event *events,	// 空event数组,用于在用户空间中接收就绪的FD
	int maxevents,		// events数组的最大长度
    int timeout		// 超时时间,-1永不超时,0不阻塞,大于0为阻塞时间
);

select 模式存在的三个问题:

  1. 能监听的 FD 最大不超过 1024。
  2. 每次 select 都需要把所有要监听的 FD 都拷贝到内核空间。
  3. 每次都要遍历所有 FD 来判断就绪状态。

poll 模式存在的问题:poll 利用链表解决了 select 中监听 FD 上限的问题,但依然要遍历所有 FD,如果监听较多,性能会下降。

epoll 模式对上述问题的解决方案:

  • 基于 epoll 实例中的红黑树保存要监听的 FD,理论上无上限,而且增删改查效率都非常高,性能不会随监听的 FD 数量增多而下降。
  • 每个 FD 只需要执行一次 epoll_ctl 添加到红黑树,以后每次 epoll_wait 无需传递任何参数,无需重复拷贝 FD 到内核空间。
  • 内核会将就绪的 FD 直接拷贝到用户空间的指定位置,用户进程无需遍历所有 FD 就能知道就绪的 FD 是谁。
epoll 事件通知机制

当 FD 有数据可读时,我们调用 epoll_wait 就可以得到通知。但是事件通知的模式有两种:

  • LevelTriggered:简称 LT。当 FD 有数据可读时,会重复通知多次,直至数据处理完成。是 Epoll 的默认模式。
  • EdgeTriggered:简称 ET。当 FD 有数据可读时,只会被通知一次,不管数据是否处理完成。

ET 模式可以避免 LT 模式可能出现的惊群现象,但是 ET 模式可能碰到无法一次性把数据读完的问题,所以最好结合非阻塞 IO 读取 FD 数据,但是这样相比 LT 会更加复杂一些。

epoll 服务流程

信号驱动 IO

信号驱动 IO 是与内核建立 SIGIO 的信号关联并设置回调,当内核有 FD 就绪时,会发出 SIGIO 信号通知用户,期间用户应用可以执行其它业务,无需阻塞等待:

当有大量 IO 操作时,信号较多,SIGIO 处理函数不能及时处理可能导致信号队列溢出,部分信号丢失。而且内核空间与用户空间频繁的信号交互也会导致性能下降。

异步 IO

上述的阻塞 IO、非阻塞 IO、IO 多路复用均为同步(调用和获取结果都是同一个线程)。

异步 IO 的整个过程都是非阻塞的(不存在异步阻塞,异步阻塞这种说法是错误的),用户进程调用完异步 API 后就可以去做其它事情,内核等待数据就绪并拷贝到用户空间后才会递交信号,通知用户进程:

但是,在高并发情况下,异步 IO 因为其异步的性质,需要大量线程异步执行数据的处理,也是很消耗性能的。

Redis 网络模型

Redis 到底是单线程还是多线程?

  • 如果是指 Redis 的核心业务部分(命令处理),那就是单线程。
  • 如果是指整个 Redis 业务,那便是多线程。

在 Redis 版本迭代过程中,在两个重要的时间节点上引入了多线程的支持:

  • Redis v4.0:引入多线程异步处理一些耗时较长的任务,例如异步删除命令 unlink。
  • Redis v6.0:在核心网络模型中引入多线程,进一步提高对于多核 CPU 的利用率。

为什么 Redis 要选择单线程?

  • 抛开持久化不谈,Redis 是纯内存操作,执行速度非常快,它的性能瓶颈是网络延迟而不是执行速度,因此多线程并不会带来巨大的性能提升。
  • 多线程会导致过多的上下文切换,带来不必要的开销。
  • 引入多线程会面临线程安全问题,必然要引入线程锁这样的安全手段,实现复杂度增高,而且性能也会大打折扣。

Redis 旧版本(v6.0 之前),通过 IO 多路复用来提高网络性能,并且支持各种不同的多路复用实现,并且将这些实现进行封装,提供了统一的高性能事件库 API 库 AE。

Redis 单线程网络模型的整个流程:

Redis v6.0 版本引入了多线程,目的是为了提高 IO 读写效率。因此在解析客户端命令、写响应结果时采用了多线程。核心的命令执行、IO 多路复用模块依然由主线程执行。

通信协议

RESP 协议

Redis 是一个 CS 架构的软件,通信一般分两步(不包括 pipeline 和 PubSub):

  1. 客户端(client)向服务端(server)发送一条命令。
  2. 服务端解析并执行命令,返回响应结果给客户端。

因此客户端发送命令的格式、服务端响应结果的格式必须有一个规范,这个规范就是通信协议。

而在 Redis 中采用的是 RESP(Redis Serialization Protocol)协议:

  • Redis v1.2 版本引入了 RESP 协议。
  • Redis v2.0 版本中成为与 Redis 服务端通信的标准,称为 RESP2。
  • Redis v6.0 版本中,从 RESP2 升级到了 RESP3 协议,增加了更多数据类型并支持 6.0 的新特性——客户端缓存。

在 RESP 中,通过首字节的字符来区分不同数据类型,常用的数据类型包括 5 种:

  • 单行字符串:首字节是 +,后面跟上单行字符串,以 CRLF("\r\n")结尾。例如返回 "OK",实际上就是返回 "+OK\r\n"
  • 错误(Error):首字节是 -,与单行字符串格式一样,只是字符串是异常信息,例如:"-Error message\r\n"
  • 数值:首字节是 :,后面跟上数字格式的字符串,以 CRLF 结尾,例如:":10\r\n"
  • 多行字符串:首字节是 $,表示二进制安全的字符串,最大支持 512MB。在字符串最开始,会记录字符串占用的字节大小,例如:"$5\r\nhello\r\n"。如果大小为 0,代表空字符串:"$0\r\n\r\n";如果大小为 -1,则代表不存在:"$-1\r\n"
  • 数组:首字节是 *,头部跟上数组元素个数,再跟上元素,元素类型不限。例如:"*2\r\n$3\r\nset\r\n$4\r\nname\r\n"

内存策略

过期 key 处理

Redis 之所以性能强,最主要的原因就是基于内存存储。然而单节点的 Redis 其内存大小不宜过大,会影响持久化或主从同步性能。

Redis 提供了两种策略,来实现对过期 key 的处理:过期策略、淘汰策略。

过期策略

Redis 在使用时,可以使用 expire 命令来给 Redis 的 key 设置 TTL,当 TTL 到期后,对应的内存便会释放,从而起到内存回收的目的。

上述是我们基于开发的角度来看的,而我们现在要来思考其中的原理:

  1. Redis 是如何知道一个 key 是否过期呢?
  2. 且对于过期的键,是不是 TTL 一到期就立即删除了呢?

Redis 本身是一个典型的 key-value 内存存储数据库,因此所有的 key、value 都保存在之前学习过的 Dict 结构中。不过在其 database 结构体中,有两个 Dict:一个用来记录 key-value;另一个用来记录 key-TTL。所以,利用这两个 Dict,就可以让 Redis 知道哪些 key 是过期的了。

而针对第二个问题,如果我们直接暴力轮询所有的 key,观察其是否到达 TTL 然后进行必要的删除的话,这将会十分消耗 CPU 资源。所以,Redis 对于 TTL 过期的处理,采用的是惰性删除,或者是周期删除

  • 惰性删除:当我们访问一个 key 的时候(读写操作),都需要事先检查 key 的存活时间,如果已经过期,则执行删除操作。
  • 周期删除:周期删除是通过一个定时任务,周期性地抽取部分过期的 key,然后执行删除。执行周期有两种:
    • Redis 会设置一个定时任务 serverCron(),按照 server.hz 的频率来执行过期 key 清理,模式为 SLOW。SLOW 模式规则如下:
      • 执行频率受到 server.hz 影响,默认为 10,即每秒执行 10 次,每隔执行周期 100ms。
      • 执行清理耗时不超过一次执行周期的 25%。
      • 逐个遍历 db,逐个遍历 db 中的 bucket,抽取 20 个 key 判断是否过期。
      • 如果没达到时间上限(25ms)并且过期 key 比例大于 10%,再进行一次抽样,否则结束。
    • Redis 的每隔事件循环前会调用 beforeSleep() 函数,执行过期 key 清理,模式为 FAST。FAST 模式规则如下(过期 key 比例小于 10% 不执行):
      • 执行频率受 beforeSleep() 调用频率影响,但两次 FAST 模式间隔不低于 2ms。
      • 执行清理耗时不超过 1ms。
      • 逐个遍历 db,逐个遍历 db 中的 bucket,抽取 20 个 key 判断是否过期。
      • 如果没达到时间上限(1ms)并且过期 key 比例大于 10%,再进行一次抽样,否则结束。
淘汰策略

内存淘汰:就是当 Redis 内存使用达到设置的阈值时,Redis 主动挑选部分 key 删除以释放更多内存的流程。

Redis 针对于内存空间的检查十分朴素:只要客户端访问了 Redis 服务端,Redis 都会去检查内存空间是否够用。只要不够,就会触发淘汰策略。Redis 会在处理客户端命令的方法 processCommand() 中尝试做内存淘汰。

Redis 支持 8 种不同策略来选择要删除的 key:

  1. noeviction:淘汰任何 key,但是内存满时不允许写入新数据,默认就是这种策略
  2. volatile-ttl:对设置了 TTL 的 key,比较 key 的剩余 TTL 值,TTL 越小越先被淘汰。
  3. allkeys-random:对全体 key,随机进行淘汰。也就是直接从 db->dict 中随机挑选。
  4. volatile-random:对设置了 TTL 的 key,随机进行淘汰。也就是从 db->expires 中随机挑选。
  5. allkeys-lru:对全体 key,基于 LRU 算法进行淘汰。
  6. volatile-lfu:对设置了 TTL 的 key,基于 LRU 算法进行淘汰。
  7. allkeys-lfu:对全体key,基于 LFU 算法进行淘汰。
  8. volatile-lfu:对设置了 TTL 的 key,基于 LFU 算法进行淘汰。

比较容易混淆的有两个:

  • LRU(Least Recently Used),最近最少使用。用当前事件减去最后一次访问时间,这个值越大则淘汰优先级越高。
  • LFU(Least Frequently Used),最少频率使用。会统计每个 key 的访问频率,值越小淘汰优先级越高。
  • 以上的访问时间和访问频率会存储在 RedisObject 的成员属性中。


文章作者: 热心市民灰灰
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 热心市民灰灰 !
  目录