4

Redis 入门学习笔记

 1 year ago
source link: https://lemon-cs.github.io/2020/05/05/%E4%B8%AD%E9%97%B4%E4%BB%B6/Redis/Redis%E5%85%A5%E9%97%A8%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0/#10-1-%E4%B8%BA%E4%BB%80%E4%B9%88%E9%9C%80%E8%A6%81Scan%EF%BC%9F
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Lemon-CS

Redis 入门学习笔记

发表于 2020-05-05|更新于 2021-12-31|中间件
字数总计:51.9k|阅读时长:194 分钟 | 阅读量:11

Redis 学习笔记

简单来说 redis 就是一个数据库,不过与传统数据库不同的是 redis 的数据是存在内存中的,所以读写速度非常快,因此 redis 被广泛应用于缓存方向。另外,redis 也经常用来做分布式锁。redis 提供了多种数据类型来支持不同的业务场景。除此之外,redis 支持事务 、持久化、LUA 脚本、LRU 驱动事件、多种集群方案。

1.1-Redis 简介

  • Redis 全称是 Remote Dictionary Server
  • Redis 是一种基于键值对 (key-value) 的 NoSQL 数据库,Redis 的值可以是由 string (字符串)、hash (哈希)、list (列表)、set (集合)、zset (有序集合)、Bitmaps (位图)、HyperLogLog、GEO (地理信息定位) 等多种数据结构和算法组成;
  • Redis 会将所有数据都存放在内存中,所以它的读写性能非常惊人;
  • Redis 还可以将内存的数据利用快照和日志的形式保存到硬盘上,这样发生类似断电或者机器故障的时候,内存中的数据不会” 丢失”;
  • Redis 提供了键过期、发布订阅、事务、流水线、Lua 脚本等附加功能。

1.2-Redis 特性

  • 速度快 (读写性能 10 万 / 秒)
    • Redis 的所有数据都是存放在内存中
    • Redis 是用 C 语言实现的
    • Redis 使用了单线程架构,预防了多线程可能产生的竞争问题
  • 基于键值对的数据结构服务器
  • 丰富的功能 (键过期、发布订阅实现消息系统)
  • 客户端语言多 (支持 Redis 的客户端语言非常多,Java、PHP、Python、C、C++、Nodejs 等)
  • 持久化 (Redis 提供了两种持久化方式:RDB 和 AOF)
  • 高可用和分布式

1.3 - 为什么使用 Redis

主要是从两个角度去考虑:性能和并发。当然,redis 还具备可以做分布式锁等其他功能,但是如果只是为了分布式锁这些其他功能,完全还有其他中间件 (如 zookpeer 等) 代替,并不是非要使用 redis。因此,这个问题主要从性能和并发两个角度去答。

1. 性能(缓存快速响应)

我们在碰到需要执行耗时特别久,且结果不频繁变动的 SQL,就特别适合将运行结果放入 Redis 中缓存。这样,后面的请求就去缓存中读取,使得请求能够迅速响应。

假如用户第一次访问数据库中的某些数据。这个过程会比较慢,因为是从硬盘上读取的。将该用户访问的数据存在数缓存中,这样下一次再访问这些数据的时候就可以直接从缓存中获取了。操作缓存就是直接操作内存,所以速度相当快。如果数据库中的对应数据改变的之后,同步改变缓存中相应的数据即可。

2. 并发(减少了数据库请求)

在大并发的情况下,所有的请求直接访问数据库,数据库会出现连接异常。这个时候,就需要使用 redis 做一个缓冲操作,让请求先访问到 redis,而不是直接访问数据库。

直接操作缓存能够承受的请求是远远大于直接访问数据库的,所以我们可以考虑把数据库中的部分数据转移到缓存中去,这样用户的一部分请求会直接到缓存这里而不用经过数据库。

1.4 - 为什么要用 redis 而不用 map/guava 做缓存?

缓存分为本地缓存和分布式缓存。以 Java 为例,使用自带的 map 或者 guava 实现的是本地缓存,最主要的特点是轻量以及快速,生命周期随着 jvm 的销毁而结束,并且在多实例的情况下,每个实例都需要各自保存一份缓存,缓存不具有一致性。

使用 redis 或 memcached 之类的称为分布式缓存,在多实例的情况下,各实例共用一份缓存数据,缓存具有一致性。缺点是需要保持 redis 或 memcached 服务的高可用,整个程序架构上较为复杂。

1.5-Redis 是如何执行的?

1. 命令执行流程

一条命令的执行过程有很多细节,但大体可分为:客户端先将用户输入的命令,转化为 Redis 相关的通讯协议,再用 socket 连接的方式将内容发送给服务器端,服务器端在接收到相关内容之后,先将内容转化为具体的执行命令,再判断用户授权信息和其他相关信息,当验证通过之后会执行最终命令,命令执行完之后,会进行相关的信息记录和数据统计,然后再把执行结果发送给客户端,这样一条命令的执行流程就结束了。如果是集群模式的话,主节点还会将命令同步至子节点,下面我们一起来看更加具体的执行流程。

  • 步骤一:用户输入一条命令

  • 步骤二:客户端先将命令转换成 Redis 协议,然后再通过 socket 连接发送给服务器端

    客户端和服务器端是基于 socket 通信的,服务器端在初始化时会创建了一个 socket 监听,用于监测链接客户端的 socket 链接,源码如下:

    void initServer(void) {
    //......
    // 开启 Socket 事件监听
    if (server.port != 0 &&
    listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
    exit(1);
    //......
    }

    socket 小知识:每个 socket 被创建后,会分配两个缓冲区,输入缓冲区和输出缓冲区。 写入函数并不会立即向网络中传输数据,而是先将数据写入缓冲区中,再由 TCP 协议将数据从缓冲区发送到目标机器。一旦将数据写入到缓冲区,函数就可以成功返回,不管它们有没有到达目标机器,也不管它们何时被发送到网络,这些都是 TCP 协议负责的事情。 注意:数据有可能刚被写入缓冲区就发送到网络,也可能在缓冲区中不断积压,多次写入的数据被一次性发送到网络,这取决于当时的网络情况、当前线程是否空闲等诸多因素,不由程序员控制。 读取函数也是如此,它也是从输入缓冲区中读取数据,而不是直接从网络中读取。

    当 socket 成功连接之后,客户端会先把命令转换成 Redis 通讯协议(RESP 协议,REdis Serialization Protocol)发送给服务器端,这个通信协议是为了保障服务器能最快速的理解命令的含义而制定的,如果没有这个通讯协议,那么 Redis 服务器端要遍历所有的空格以确认此条命令的含义,这样会加大服务器的运算量,而直接发送通讯协议,相当于把服务器端的解析工作交给了每一个客户端,这样会很大程度的提高 Redis 的运行速度。例如,当我们输入 set key val 命令时,客户端会把这个命令转换为 *3\r\n$3\r\nSET\r\n$4\r\nKEY\r\n$4\r\nVAL\r\n 协议发送给服务器端。 更多通讯协议,可访问官方文档:https://redis.io/topics/protocol

  • 扩展知识:I/O 多路复用

    Redis 使用的是 I/O 多路复用功能来监听多 socket 链接的,这样就可以使用一个线程链接来处理多个请求,减少线程切换带来的开销,同时也避免了 I/O 阻塞操作,从而大大提高了 Redis 的运行效率。

    I/O 多路复用机制如下图所示:

    综合来说,此步骤的执行流程如下:

    • 与服务器端以 socket 和 I/O 多路复用的技术建立链接;
    • 将命令转换为 Redis 通讯协议,再将这些协议发送至缓冲区。
  • 步骤三:服务器端接收到命令

    服务器会先去输入缓冲中读取数据,然后判断数据的大小是否超过了系统设置的值 (默认是 1GB),如果大于此值就会返回错误信息,并关闭客户端连接。 默认大小如下图所示:

    当数据大小验证通过之后,服务器端会对输入缓冲区中的请求命令进行分析,提取命令请求中包含的命令参数,存储在 client 对象 (服务器端会为每个链接创建一个 Client 对象) 的属性中。

  • 步骤四:执行前准备

    ① 判断是否为退出命令,如果是则直接返回;

    ② 非 null 判断,检查 client 对象是否为 null,如果是返回错误信息;

    ③ 获取执行命令,根据 client 对象存储的属性信息去 redisCommand 结构中查询执行命令;

    ④ 用户权限效验,未通过身份验证的客户端只能执行 AUTH (授权) 命令,未通过身份验证的客户端执行了 AUTH 之外的命令则返回错误信息;

    ⑤ 集群相关操作,如果是集群模式,把命令重定向到目标节点,如果是 master (主节点) 则不需要重定向;

    ⑥ 检查服务器端最大内存限制,如果服务器端开启了最大内存限制,会先检查内存大小,如果内存超过了最大值会对内存进行回收操作;

    ⑦ 持久化检测,检查服务器是否开启了持久化和持久化出错停止写入配置,如果开启了此配置并且有持久化失败的情况,禁止执行写命令;

    ⑧ 集群模式最少从节点 (slave) 验证,如果是集群模式并且配置了 replminslavestowrite (最小从节点写入),当从节点的数量少于配置项时,禁止执行写命令;

    ⑨ 只读从节点验证,当此服务器为只读从节点时,只接受 master 的写命令;

    ⑩ 客户端订阅判断,当客户端正在订阅频道时,只会执行部分命令(只会执行 SUBSCRIBE、PSUBSCRIBE、UNSUBSCRIBE、PUNSUBSCRIBE,其他命令都会被拒绝)。

    ⑪ 从节点状态效验,当服务器为 slave 并且没有连接 master 时,只会执行状态查询相关的命令,如 info 等;

    ⑫ 服务器初始化效验,当服务器正在启动时,只会执行 loading 标志的命令,其他的命令都会被拒绝;

    ⑬ lua 脚本阻塞效验,当服务器因为执行 lua 脚本阻塞时,只会执行部分命令;

    ⑭ 事务命令效验,如果执行的是事务命令,则开启事务把命令放入等待队列;

    ⑮ 监视器 (monitor) 判断,如果服务器打开了监视器功能,那么服务器也会把执行命令和相关参数发送给监视器 (监视器是用于监控服务器运行状态的)。

    当服务器经过以上操作之后,就可以执行真正的操作命令了。

  • 步骤五:执行最终命令,调用 redisCommand 中的 proc 函数执行命令。

  • 步骤六:执行完后相关记录和统计

    ① 检查慢查询是否开启,如果开启会记录慢查询日志;

    ② 检查统计信息是否开启,如果开启会记录一些统计信息,例如执行命令所耗费时长和计数器 (calls) 加 1;

    ③ 检查持久化功能是否开启,如果开启则会记录持久化信息;

    ④ 如果有其它从服务器正在复制当前服务器,则会将刚刚执行的命令传播给其他从服务器。

  • 步骤七:返回结果给客户端

    命令执行完之后,服务器会通过 socket 的方式把执行结果发送给客户端,客户端再把结果展示给用户,至此一条命令的执行就结束了。

当用户输入一条命令之后,客户端会以 socket 的方式把数据转换成 Redis 协议,并发送至服务器端,服务器端在接受到数据之后,会先将协议转换为真正的执行命令,在经过各种验证以保证命令能够正确并安全的执行,但验证处理完之后,会调用具体的方法执行此条命令,执行完成之后会进行相关的统计和记录,然后再把执行结果返回给客户端,整个执行流程,如下图所示:

2-Redis 的安装和使用

Redis 是由 C 语言开发的开源内存数据存储器,经常被用作数据库、缓存以及消息队列等。 Redis 因为其强大的功能和简洁的设计,深受广大开发者和公司的喜爱,几乎占领了内存数据库市场的所有份额。

2.1-Redis 的特性

1. 多种数据类型支持

Redis 支持多种数据类型,例如字符串、散列、列表、集合、有序集合、HyperLogLog、流、地理坐标等,每种类型都有对应的使用场景,同时也满足了所有开发者的需要。

2. 功能完善

Redis 提供了很多的功能,例如消息队列、自动过期删除、事务、数据持久化、分布式锁、附近的人、慢查询分析、Sentinel 和集群等多项功能。

3. 高性能

Redis 是一款内存型数据库,因此在性能方面有天生的优势 (内存操作比磁盘操作要快很多),并且 Redis 在底层使用了更加高效的算法和数据结构,以最大限度的提高了 Redis 的性能。

4. 广泛的编程语言支持

Redis 客户端有众多的开发者提供了相应的支持,这些客户端可以在 https://redis.io/clients 上找到,支持是编程语言,如下图所示:

可以看出几乎所有的编程语言,都有相应的客户端支持。

5. 使用简单

Redis 的 API 虽然比较丰富,但操作的方法都非常的简便,并且需要传递的参数也不多,这样开发者就能更快的上手使用,而且 Redis 官方也提供了比较完整的说明文档。

6. 活跃性高 / 版本迭代快

Redis 活跃度很高,这点可以在 Redis 的开源项目 https://github.com/antirez/redis 中发现,并且 Redis 的版本迭代也很快,到目前为止 Redis 的发布版本就已经有二百多个了。

7. I/O 多路复用模型

Redis 使用了多路 I/O 复用模型,“多路” 指的是多个网络连接,“复用” 指的是复用同一个线程,采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求,这样就减少了创建和销毁线程所带来的时间消耗,从而到达高效处理大量并发请求的目的。

2.2-Redis 发展历程

Redis 的发展大概经过了以下几个过程:

  • 2009 年 5 月发布 Redis 初始版本;
  • 2012 年发布 Redis 2.6,重构了大量的核心代码,去掉了所有和集群相关的代码;
  • 2013 年 11 月发布 Redis 2.8,增加了部分主从复制功能;
  • 2015 年 4 月发布 Redis 3.0,增加了集群功能;
  • 2017 年 7 月发布 Redis 4.0 ,优化了复制功能和新增了混合持久化;
  • 2018 年 10 月发布 Redis 5.0,增加了 Stream 数据类型;
  • 2020 年 3-5 月计划发布 Redis 6.0 稳定版。

2.3-Redis 的安装

Redis 官方提供了 Linux 和 MacOS 服务端安装包,对于 Windows 还有提供正式的支持,之所以不支持 Windows 平台是因为目前 Linux 版本已经很稳定,并且也有大量的用户,如果开发 Windows 版本可能会带来很多的兼容性问题,但 Windows 平台还是有很多种方法可以安装 Redis 的,我们先来看 Redis 在 Linux 和 MacOS 平台的安装。

1. 源码安装

① 下载源码包

进入网址:https://redis.io/download 选择需要安装的版本,点击 Download 按钮,如下图所示:

② 解压安装包

使用命令:tar zxvf redis-5.0.7.tar.gz

③ 切换到 Redis 目录

使用命令:cd /usr/local/redis-5.0.7/

④ 编译安装

使用命令:sudo make install 安装完成,如下图所示:

如果没有异常信息输出,向上图所示,则表示 Redis 已经安装成功。

2. Docker 安装

Docker 的使用前提是必须先有 Docker,如果本机没有安装 Docker,对于 Linux 用户来说,可使用命令 yum -y install docker 在线安装 docker,如果是非 Linux 平台需要在官网下载并安装 Docker Desker。

① 拉取 Reids 镜像

使用命令:

docker pull redis

如果要安装其他版本的 Redis,可使用 docker pull redis:版本号 的方式来安装。

② 运行 Redis 容器

使用命令:

docker run –name myredis -d -p 6379:6379 redis

  • –name:设置别名
  • -p:映射宿主端口到容器端口
  • -d:表示后台运行

执行完成后截图如下:

如图所示,则证明 Redis 已经正常启动了。 如果要查询 Redis 的安装版本,可遵循下图的执行流程,先进入容器,在进入 Redis 的安装目录,执行 redis-server -v 命令,如图如下:

③ 执行命令

Docker 版的 Redis 命令执行和其他方式安装的 Redis 不太一样,所以这里需要单独讲一下,我们要使用 redis-cli 工具,需要执行以下命令:

docker exec -it myredis redis-cli

其中 “myredis” 指的是容器中 Redis 服务端的别名。

2.4-Redis 的使用

Redis 安装完之后,在 src 目录和 /usr/local/bin 目录下有几个很重要的可执行文件,这些可执行文件可以做很多事,如下表所示:

可执行文件功能
redis-server 启动 Redis
redis-cliRedis 命令行工具
redis-benchmark 基准测试工具
redis-check-aofAOF 持久化文件检测工具和修复工具
redis-check-dumpRDB 持久化文件检测工具和修复工具
redis-sentinel 启动 redis-sentinel

1. 启动 Redis

下面我们就用可执行文件 redis-server 来启动 Redis 服务器,我们在 Redis 的安装目录执行 src/redis-server 命令就可以启动 Redis 服务了,如下图所示:

可以看出 Redis 已经正常启动了,但这种启动方式,会使得 Redis 服务随着控制台的关闭而退出,因为 Redis 服务默认是非后台启动的,我们需要修改配置文件 (redis.conf),找到 daemonize no 改为 daemonize yes ,然后重启服务,此时 Redis 就是以后台运行方式启动了,并且不会随着控制台的关闭而退出。

daemonize 配置如下:

![](https://gitee.com/lemon-cs/images/raw/master/daemonize 配置.png)

2. 使用可视化工具操作 Redis

Redis 启动之后就可以使用一些客户端工具进行链接和操作,如下图所示:

Redis Desktop Manager:

可以看出 Redis 服务器默认有 16 个数据库实例,从 db0 到 db15,但这个数据库实例和传统的关系型数据库实例是不一样的。传统型数据库实例是通过连接字符串配置的,而 Redis 数据库连接字符串只有一个,并不能指定要使用的数据库实例。

在 Redis 中如果要切换数据库实例,只需要执行 select n 命令即可,例如需要连接 db1 ,使用 select 1 命令选择即可,默认连接的数据库实例是 db0。

小贴士:当使用了 flushall 清空 Redis 数据库时,此数据库下的所有数据都会被清除。

Redis 数据库的实例个数也可以通过配置文件更改,在 redis.conf 中找到 databases 16 ,修改后面的数字重启 Redis 服务就会生效。

3. 使用 redis-cli 操作 Redis

redis-cli 是官方自带的客户端链接工具,它可以配合命令行来对 Redis 进行操作,在 Redis 的安装目录使用 src/redis-cli 命令即可链接并操作 Redis,如下图所示:

3-Redis 的数据类型

Redis 发展到现在已经有 9 种数据类型了,其中最基础、最常用的数据类型有 5 种,它们分别是:字符串类型、列表类型、哈希表类型、集合类型、有序集合类型,而在这 5 种数据类型中最常用的是字符串类型。

3.1 - 字符串

字符串类型的全称是 Simple Dynamic Strings 简称 SDS,中文意思是:简单动态字符串。它是以键值对 key-value 的形式进行存储的,根据 key 来存储和获取 value 值,它的使用相对来说比较简单,但在实际项目中应用非常广泛。

1. 字符串类型能做什么?

字符串类型的使用场景有很多,但从功能的角度来区分,大致可分为以下两种:

  • 字符串存储和操作;
  • 整数类型和浮点类型的存储和计算。

字符串最常用的业务场景有以下几个:

  • 页面数据缓存

    我们知道,一个系统最宝贵的资源就是数据库资源,随着公司业务的发展壮大,数据库的存储量也会越来越大,并且要处理的请求也越来越多,当数据量和并发量到达一定级别之后,数据库就变成了拖慢系统运行的 “罪魁祸首”,为了避免这种情况的发生,我们可以把查询结果放入缓存 (Redis) 中,让下次同样的查询直接去缓存系统取结果,而非查询数据库,这样既减少了数据库的压力,同时也提高了程序的运行速度。

    介于以上这个思路,我们可以把文章详情页的数据放入缓存系统。具体的做法是先将文章详情页序列化为字符串存入缓存,再从缓存中读取到字符串,反序列化成对象,然后再赋值到页面进行显示 (当然也可以用哈希类型进行存储),这样我们就实现了文章详情页的缓存功能,架构流程对比图如下所示。

    原始系统运行流程图:

    引入缓存系统后的流程图:

  • 数字计算与统计

    Redis 可以用来存储整数和浮点类型的数据,并且可以通过命令直接累加并存储整数信息,这样就省去了每次先要取数据、转换数据、拼加数据、再存入数据的麻烦,只需要使用一个命令就可以完成此流程,具体实现过程本文下半部分会讲。这样我们就可以使用此功能来实现访问量的统计,当有人访问时访问量 +1 就可以了。

  • 共享 Session 信息

    通常我们在开发后台管理系统时,会使用 Session 来保存用户的会话 (登录) 状态,这些 Session 信息会被保存在服务器端,但这只适用于单系统应用,如果是分布式系统此模式将不再适用。

    例如用户一的 Session 信息被存储在服务器一,但第二次访问时用户一被分配到服务器二,这个时候服务器并没有用户一的 Session 信息,就会出现需要重复登录的问题。分布式系统每次会把请求随机分配到不同的服务器,因此我们需要借助缓存系统对这些 Session 信息进行统一的存储和管理,这样无论请求发送到那台服务器,服务器都会去统一的缓存系统获取相关的 Session 信息,这样就解决了分布式系统下 Session 存储的问题。

    分布式系统单独存储 Session 流程图:

    ![](https://gitee.com/lemon-cs/images/raw/master/ 存储 Session 流程图.png)

    分布式系统使用统一的缓存系统存储 Session 流程图:

    ![](https://gitee.com/lemon-cs/images/raw/master/ 缓存系统存储 Session 流程图.png)

2. 字符串如何使用?

通常我们会使用两种方式来操作 Redis:第一种是使用命令行来操作,例如 redis-cli;另一种是使用代码的方式来操作,下面我们分别来看。

1)命令行操作方式

字符串的操作命令有很多,但大体可分为以下几类:

  • 单个键值对操作
  • 多个键值对操作

我们本文使用 redis-cli 来实现对 Redis 的操作,在使用命令之前,先输入 redis-cli 来链接到 Redis 服务器。

① 单个键值对操作
  • 添加键值对

    语法:set key value [expiration EX seconds|PX milliseconds] [NX|XX] 示例:

    127.0.0.1:6379> set k1 val1
    OK
  • 获取键值对

    语法:get key 示例:

    127.0.0.1:6379> get k1
    "val1"
  • 给元素追加值

    语法:append key value 示例:

    127.0.0.1:6379> get k1
    "v1"
    127.0.0.1:6379> append k1 append
    (integer) 5
    127.0.0.1:6379> get k1
    "v1append"
  • 查询字符串的长度

    语法:strlen key 示例:

    127.0.0.1:6379> strlen k1
    (integer) 5
② 多个键值对操作
  • 创建一个或多个键值对

    语法:mset key value [key value …] 示例:

    127.0.0.1:6379> mset k2 v2 k3 v3
    OK

    小贴士:mset 是一个原子性 (atomic) 操作,所有给定 key 都会在同一时间内被设置,不会出现某些 key 被更新,而另一些 key 没被更新的情况。

  • 查询一个或多个元素

    语法:mget key [key …] 示例:

    127.0.0.1:6379> mget k2 k3
    1) "v2"
    2) "v3"
③ 数字统计

在 Redis 中可以直接操作整型和浮点型,例如可以直接使用命令来加、减值。

  • 给整数类型的值加 1

    语法:incr key 示例:

    127.0.0.1:6379> get k1
    "3"
    127.0.0.1:6379> incr k1
    (integer) 4
    127.0.0.1:6379> get k1
    "4"
  • 给整数类型的值减 1

    语法:decr key 示例:

    127.0.0.1:6379> get k1
    "4"
    127.0.0.1:6379> decr k1
    (integer) 3
    127.0.0.1:6379> get k1
    "3"
  • 根据 key 减去指定的值

    语法:decrby key decrement 示例:

    127.0.0.1:6379> get k1
    "3"
    127.0.0.1:6379> decrby k1 2
    (integer) 1
    127.0.0.1:6379> get k1
    "1"

    如果 key 不存在,则会先初始化此 key 为 0 ,然后再执行减法操作:

    127.0.0.1:6379> get k2
    (nil)
    127.0.0.1:6379> decrby k2 3
    (integer) -3
    127.0.0.1:6379> get k2
    "-3"
  • 根据 key 加指定的整数值

    语法:incrby key increment 示例:

    127.0.0.1:6379> get k1
    "1"
    127.0.0.1:6379> incrby k1 2
    (integer) 3
    127.0.0.1:6379> get k1
    "3"

    如果 key 不存在,则会先初始化此 key 为 0 ,然后再执行加整数值的操作:

    127.0.0.1:6379> get k3
    (nil)
    127.0.0.1:6379> incrby k3 5
    (integer) 5
    127.0.0.1:6379> get k3
    "5"
  • 根据 key 加上指定的浮点数

    语法:incrbyfloat key increment 示例:

    127.0.0.1:6379> get k3
    "5"
    127.0.0.1:6379> incrbyfloat k3 4.9
    "9.9"
    127.0.0.1:6379> get k3
    "9.9"

    如果 key 不存在,则会先初始化此 key 为 0 ,然后再执行加浮点数的操作:

    127.0.0.1:6379> get k4
    (nil)
    127.0.0.1:6379> incrbyfloat k4 4.4
    "4.4"
    127.0.0.1:6379> get k4
    "4.4"
2)代码操作方式

本文我们使用 Java 语言来实现对 Redis 的操作,首先我们在项目中添加对 Jedis 框架的引用,如果是 Maven 项目,我们会在 pom.xml 文件中添加如下信息:

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${version}</version>
</dependency>

Jedis 是 Redis 官方推荐的 Java 客户端开发包,用于实现快速简单的操作 Redis。添加完 Jedis 之后,我们来写具体的操作代码,操作函数与命令方式的调用比较相似,如下代码所示:

import redis.clients.jedis.Jedis;
import java.util.List;

public class StringExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// jedis.auth("xxx"); // 输入密码,没有密码,可以不设置
// 添加一个元素
jedis.set("mystr", "redis");
// 获取元素
String myStr = jedis.get("mystr");
System.out.println(myStr); // 输出:redis
// 添加多个元素(key,value,key2,value2)
jedis.mset("db", "redis", "lang", "java");
// 获取多个元素
List<String> mlist = jedis.mget("db", "lang");
System.out.println(mlist); // 输出:[redis, java]
// 给元素追加字符串
jedis.append("db", ",mysql");
// 打印追加的字符串
System.out.println(jedis.get("db")); // 输出:redis,mysql
// 当 key 不存在时,赋值键值
Long setnx = jedis.setnx("db", "db2");
// 因为 db 元素已经存在,所以会返回 0 条修改
System.out.println(setnx); // 输出:0
// 字符串截取
String range = jedis.getrange("db", 0, 2);
System.out.println(range); // 输出:red
// 添加键值并设置过期时间(单位:毫秒)
String setex = jedis.setex("db", 1000, "redis");
System.out.println(setex); // 输出:ok
// 查询键值的过期时间
Long ttl = jedis.ttl("db");
System.out.println(ttl); // 输出:1000
}
}

3. 代码实战

本文的上半部分我们讲到了字符串的很多种使用场景,本小节就以字符串存储用户对象信息为例,我们先将用户对象信息序列化为字符串存储在 Redis,再从 Redis 中取出字符串并反序列化为对象信息为例,使用 Java 语言来实现。

首先添加 JSON 转换类,用于对象和字符串之间的序列化和反序列化,我们这里采用 Google 的 Gson 来实现,首先在 pom.xml 文件中添加如下引用:

<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>

添加完 Gson 引用之后,我们来写具体的业务代码,先见用户信息序列化之后存储在 Redis 中:

Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
jedis.auth("xxx");
Gson gson = new Gson();
// 构建用户数据
User user = new User();
user.setId(1);
user.setName("Redis");
user.setAge(10);
String jsonUser = gson.toJson(user);
// 打印用户信息(json)
System.out.println(jsonUser); // 输出:{"id":1,"name":"Redis","age":10}
// 把字符串存入 Redis
jedis.set("user", jsonUser);

当使用用户信息时,我们从 Redis 反序列化出来,代码如下:

String getUserData = jedis.get("user");
User userData = gson.fromJson(getUserData, User.class);
// 打印对象属性信息
System.out.println(userData.getId() + ":" + userData.getName()); // 输出结果:1:Redis

以上两个步骤就完成了用户信息存放至 Redis 中的过程,也是常用的经典使用场景之一。

4. 字符串的内部实现

1)源码分析

Redis 3.2 之前 SDS 源码如下:

struct sds{
int len; // 已占用的字节数
int free; // 剩余可以字节数
char buf[]; // 存储字符串的数据空间
}

可以看出 Redis 3.2 之前 SDS 内部是一个带有长度信息的字节数组,存储结构如下图所示:

为了更加有效的利用内存,Redis 3.2 优化了 SDS 的存储结构,源码如下:

typedef char *sds;

struct __attribute__ ((__packed__)) sdshdr5 { // 对应的字符串长度小于 1<<5
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 { // 对应的字符串长度小于 1<<8
uint8_t len; /* 已使用长度,1 字节存储 */
uint8_t alloc; /* 总长度 */
unsigned char flags;
char buf[]; // 真正存储字符串的数据空间
};
struct __attribute__ ((__packed__)) sdshdr16 { // 对应的字符串长度小于 1<<16
uint16_t len; /* 已使用长度,2 字节存储 */
uint16_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 { // 对应的字符串长度小于 1<<32
uint32_t len; /* 已使用长度,4 字节存储 */
uint32_t alloc;
unsigned char flags;
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 { // 对应的字符串长度小于 1<<64
uint64_t len; /* 已使用长度,8 字节存储 */
uint64_t alloc;
unsigned char flags;
char buf[];
};

这样就可以针对不同长度的字符串申请相应的存储类型,从而有效的节约了内存使用。

2)数据类型

我们可以使用 object encoding key 命令来查看对象 (键值对) 存储的数据类型,当我们使用此命令来查询 SDS 对象时,发现 SDS 对象竟然包含了三种不同的数据类型:int、embstr 和 raw。

① int 类型
127.0.0.1:6379> set key 666
OK
127.0.0.1:6379> object encoding key
"int"
② embstr 类型
127.0.0.1:6379> set key abc
OK
127.0.0.1:6379> object encoding key
"embstr"
③ raw 类型
127.0.0.1:6379> set key abcdefghigklmnopqrstyvwxyzabcdefghigklmnopqrs
OK
127.0.0.1:6379> object encoding key
"raw"

int 类型很好理解,整数类型对应的就是 int 类型,而字符串则对应是 embstr 类型,当字符串长度大于 44 字节时,会变为 raw 类型存储。

3)为什么是 44 字节?

在 Redis 中,如果 SDS 的存储值大于 64 字节时,Redis 的内存分配器会认为此对象为大字符串,并使用 raw 类型来存储,当数据小于 64 字节时 (字符串类型),会使用 embstr 类型存储。既然内存分配器的判断标准是 64 字节,那为什么 embstr 类型和 raw 类型的存储判断值是 44 字节?

这是因为 Redis 在存储对象时,会创建此对象的关联信息,redisObject 对象头和 SDS 自身属性信息,这些信息都会占用一定的存储空间,因此长度判断标准就从 64 字节变成了 44 字节。

在 Redis 中,所有的对象都会包含 redisObject 对象头。我们先来看 redisObject 对象的源码:

typedef struct redisObject {
unsigned type:4; // 4 bit
unsigned encoding:4; // 4 bit
unsigned lru:LRU_BITS; // 3 个字节
int refcount; // 4 个字节
void *ptr; // 8 个字节
} robj;

它的参数说明如下:

  • type:对象的数据类型,例如:string、list、hash 等,占用 4 bits 也就是半个字符的大小;
  • encoding:对象数据编码,占用 4 bits;
  • lru:记录对象的 LRU (Least Recently Used 的缩写,即最近最少使用) 信息,内存回收时会用到此属性,占用 24 bits (3 字节);
  • refcount:引用计数器,占用 32 bits (4 字节);
  • *ptr:对象指针用于指向具体的内容,占用 64 bits (8 字节)。

redisObject 总共占用 0.5 bytes + 0.5 bytes + 3 bytes + 4 bytes + 8 bytes = 16 bytes (字节)。

了解了 redisObject 之后,我们再来看 SDS 自身的数据结构,从 SDS 的源码可以看出,SDS 的存储类型一共有 5 种:SDSTYPE5、SDSTYPE8、SDSTYPE16、SDSTYPE32、SDSTYPE64,在这些类型中最小的存储类型为 SDSTYPE5,但 SDSTYPE5 类型会默认转成 SDSTYPE8,以下源码可以证明,如下图所示:

那我们直接来看 SDSTYPE8 的源码:

struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; // 1 byte
uint8_t alloc; // 1 byte
unsigned char flags; // 1 byte
char buf[];
};

可以看出除了内容数组 (buf) 之外,其他三个属性分别占用了 1 个字节,最终分隔字符等于 64 字节,减去 redisObject 的 16 个字节,再减去 SDS 自身的 3 个字节,再减去结束符 \0 结束符占用 1 个字节,最终的结果是 44 字节 (64-16-3-1=44),内存占用如下图所示:

5. 更多字符串操作命令

1) 键值对过期操作
  • 添加键值对并设置过期时间

    语法:set key value [expiration EX seconds|PX milliseconds] [NX|XX] 示例:

    127.0.0.1:6379> set k1 val1 ex 1000
    OK

    设置键值对 k1=val1,过期时间为 1000 秒。 查询键的过期时间可以使用 ttl key,如下代码所示:

    127.0.0.1:6379> ttl k1
    (integer) 997
  • 赋值字符串,并设置过期时间 (单位 / 秒)

    语法:setex key seconds value 示例:

    127.0.0.1:6379> setex k1 1000 v1
    OK
    127.0.0.1:6379> ttl k1
    (integer) 999
    127.0.0.1:6379> get k1
    "v1"

    如果 key 已经存在,setex 命令将会覆写原来的旧值。

  • 赋值字符串,并设置过期时间 (单位 / 毫秒)

    与 setex 用法类似,只不过 psetex 设置的单位是毫秒。 语法:psetex key milliseconds value 示例:

    127.0.0.1:6379> psetex k1 100000 v11
    OK
    127.0.0.1:6379> ttl k1
    (integer) 97
    127.0.0.1:6379> get k1
    "v11"
2) 字符串操作进阶
  • 根据指定的范围截取字符串

    语法:getrange key start end 示例:

    127.0.0.1:6379> get hello
    "hello world"
    127.0.0.1:6379> getrange hello 0 4
    "hello"
    127.0.0.1:6379> getrange hello 0 -1
    "hello world"
    127.0.0.1:6379> getrange hello 0 -2
    "hello worl"

    负数表示从字符串最后开始计数, -1 表示最后一个字符, -2 表示倒数第二个,以此类推。

  • 设置字符串新值并返回旧值

    语法:getset key value 示例:

    127.0.0.1:6379> get db
    "redis"
    127.0.0.1:6379> getset db mysql
    "redis"
    127.0.0.1:6379> get db
    "mysql"

    使用 getset 命令时,如果 key 不为字符串会报错,如下效果所示:

    127.0.0.1:6379> type myset
    set
    127.0.0.1:6379> getset myset v1
    (error) WRONGTYPE Operation against a key holding the wrong kind of value

    根据 type 命令可以查询出 key 所对应的数据类型为非字符串,在使用 getset 命令就会报错。

  • 赋值 (创建) 键值对,当 key 不存在时

    如果 key 已经存在,则执行命令无效,不会修改原来的值,否则会创建新的键值对。 语法:setnx key value 示例:

    127.0.0.1:6379> setnx k9 v9
    (integer) 1
    127.0.0.1:6379> get k9
    "v9"
    127.0.0.1:6379> setnx k9 v99
    (integer) 0
    127.0.0.1:6379> get k9
    "v9"
  • 设置一个或多个键值,当所有键值都不存在时

    语法:msetnx key value [key value …] 示例:

    127.0.0.1:6379> msetnx k5 v5 k6 v6
    (integer) 1
    127.0.0.1:6379> mget k5 k6
    1) "v5"
    2) "v6"

    注意:msetnx 是一个原子操作,当一个操作失败时,其他操作也会失败。例如,如果有一个已经存在的值,那么全部键值都会设置失败,效果如下:

    127.0.0.1:6379> get k1
    "val1"
    127.0.0.1:6379> get k8
    (nil)
    127.0.0.1:6379> msetnx k1 v1 k8 v8
    (integer) 0
    127.0.0.1:6379> get k1
    "val1"
    127.0.0.1:6379> get k8
    (nil)
  • 截取字符串并赋值

    语法:setrange key offset value 示例:

    127.0.0.1:6379> get hello
    "hello java"
    127.0.0.1:6379> setrange hello 6 redis
    (integer) 11
    127.0.0.1:6379> get hello
    "hello redis"

    如果待截取的键不存在,会当作空白字符串处理,效果如下:

    127.0.0.1:6379> setrange mystr 3 mystring
    (integer) 11
    127.0.0.1:6379> get mystring
    (nil)

    以上这些命令基本涵盖了所有的字符串操作,有些不常用,但很好用,例如 setnx key value 命令,当 key 已经存在,则执行命令无效,并不会覆盖原有的值,如果没有此 key 则会新创建一个键值对。

3.2 - 字典类型(Hash)

字典类型 (Hash) 又被成为散列类型或者是哈希表类型,它是将一个键值 (key) 和一个特殊的 “哈希表” 关联起来,这个 “哈希表” 表包含两列数据:字段和值。例如我们使用字典类型来存储一篇文章的详情信息,存储结构如下图所示:

同理我们也可以使用字典类型来存储用户信息,并且使用字典类型来存储此类信息,是不需要手动序列化和反序列化数据的,所以使用起来更加的方便和高效。

1. 基础使用

首先我们使用命令行工具 redis-cli,来对字典类型进行相关的操作。

1)插入单个元素

语法:hset key field value 示例:

127.0.0.1:6379> hset myhash key1 value1
(integer) 1
127.0.0.1:6379> hset myhash key2 value2
(integer) 1
2)当某键不存在时,插入数据

语法:hsetnx key field value 示例:

127.0.0.1:6379> hsetnx myhash k4 v4
(integer) 1
127.0.0.1:6379> hget myhash k4
"v4"

如果尝试插入已存在的键,不会改变原来的值,示例如下:

127.0.0.1:6379> hsetnx myhash k4 val4
(integer) 0
127.0.0.1:6379> hget myhash k4
"v4"

尝试修改已经存在的 k4 赋值为 val4,但并没有生效,查询 k4 的结果依然是原来的值 v4。

3)查询单个元素

语法:hget key field 示例:

127.0.0.1:6379> hget myhash key1
"value1"
4)删除 key 中的一个或多个元素

语法:hdel myhash field [field …] 示例:

127.0.0.1:6379> hdel myhash key1 key2
(integer) 1

注意:不能使用类似于 hdel myhash 的命令删除整个 Hash 值的。

5)某个整数值累加计算

语法:hincrby key field increment 示例:

127.0.0.1:6379> hset myhash k3 3
(integer) 1
127.0.0.1:6379> hincrby myhash k3 2
(integer) 5
127.0.0.1:6379> hget myhash k3
"5"

2. 代码实战

接下来我们用 Java 代码实现对 Redis 的操作,同样我们先引入 Jedis 框架 ,接下来再用代码来对字典类型进行操作,示例代码如下:

import redis.clients.jedis.Jedis;
import java.util.Map;

public class HashExample {
public static void main(String[] args) throws InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 把 Key 值定义为变量
final String REDISKEY = "myhash";
// 插入单个元素
jedis.hset(REDISKEY, "key1", "value1");
// 查询单个元素
Map<String, String> singleMap = jedis.hgetAll(REDISKEY);
System.out.println(singleMap.get("key1")); // 输出:value1
// 查询所有元素
Map<String, String> allMap = jedis.hgetAll(REDISKEY);
System.out.println(allMap.get("k2")); // 输出:val2
System.out.println(allMap); // 输出:{key1=value1, k1=val1, k2=val2, k3=9.2, k4=v4...}
// 删除单个元素
Long delResult = jedis.hdel(REDISKEY, "key1");
System.out.println("删除结果:" + delResult); // 输出:删除结果:1
// 查询单个元素
System.out.println(jedis.hget(REDISKEY, "key1")); // 输出:返回 null
}
}

从代码中可以看出,在 Jedis 中我们可以直接使用 Map 来接收 Redis 中读取的字典类型的数据,省去了手动转化的麻烦,还是比较方便的。

3. 数据结构

字典类型本质上是由数组和链表结构组成的,来看字典类型的源码实现:

typedef struct dictEntry { // dict.h
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; // 下一个 entry
} dictEntry;

字典类型的数据结构,如下图所示:

通常情况下字典类型会使用数组的方式来存储相关的数据,但发生哈希冲突时才会使用链表的结构来存储数据。

4. 哈希冲突

字典类型的存储流程是先将键值进行 Hash 计算,得到存储键值对应的数组索引,再根据数组索引进行数据存储,但在小概率事件下可能会出完全不相同的键值进行 Hash 计算之后,得到相同的 Hash 值,这种情况我们称之为哈希冲突

哈希冲突一般通过链表的形式解决,相同的哈希值会对应一个链表结构,每次有哈希冲突时,就把新的元素插入到链表的尾部,请参考上面数据结构的那张图。

键值查询的流程如下:

  • 通过算法 (Hash,计算和取余等) 操作获得数组的索引值,根据索引值找到对应的元素;
  • 判断元素和查找的键值是否相等,相等则成功返回数据,否则需要查看 next 指针是否还有对应其他元素,如果没有,则返回 null,如果有的话,重复此步骤。

键值查询流程,如下图所示:

5. 渐进式 rehash

Redis 为了保证应用的高性能运行,提供了一个重要的机制 —— 渐进式 rehash。 渐进式 rehash 是用来保证字典缩放效率的,也就是说在字典进行扩容或者缩容是会采取渐进式 rehash 的机制。

当元素数量等于数组长度时就会进行扩容操作,源码在 dict.c 文件中,核心代码如下:

int dictExpand(dict *d, unsigned long size)
{
/* 需要的容量小于当前容量,则不需要扩容 */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n;
unsigned long realsize = _dictNextPower(size); // 重新计算扩容后的值
/* 计算新的扩容大小等于当前容量,不需要扩容 */
if (realsize == d->ht[0].size) return DICT_ERR;
/* 分配一个新的哈希表,并将所有指针初始化为NULL */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
if (d->ht[0].table == NULL) {
// 第一次初始化
d->ht[0] = n;
return DICT_OK;
}
d->ht[1] = n; // 把增量输入放入新 ht[1] 中
d->rehashidx = 0; // 非默认值 -1,表示需要进行 rehash
return DICT_OK;
}

从以上源码可以看出,如果需要扩容则会申请一个新的内存地址赋值给 ht [1],并把字典的 rehashindex 设置为 0,表示之后需要进行 rehash 操作。

当字典的使用容量不足总空间的 10% 时就会触发缩容,Redis 在进行缩容时也会把 rehashindex 设置为 0,表示之后需要进行 rehash 操作。

3) 渐进式 rehash 流程

在进行渐进式 rehash 时,会同时保留两个 hash 结构,新键值对加入时会直接插入到新的 hash 结构中,并会把旧 hash 结构中的元素一点一点的移动到新的 hash 结构中,当移除完最后一个元素时,清空旧 hash 结构,主要的执行流程如下:

  • 扩容或者缩容时把字典中的字段 rehashidx 标识为 0;
  • 在执行定时任务或者执行客户端的 hset、hdel 等操作指令时,判断是否需要触发 rehash 操作(通过 rehashidx 标识判断),如果需要触发 rehash 操作,也就是调用 dictRehash 函数,dictRehash 函数会把 ht [0] 中的元素依次添加到新的 Hash 表 ht [1] 中;
  • rehash 操作完成之后,清空 Hash 表 ht [0],然后对调 ht [1] 和 ht [0] 的值,把新的数据表 ht [1] 更改为 ht [0],然后把字典中的 rehashidx 标识为 -1,表示不需要执行 rehash 操作。

6. 使用场景

哈希字典的典型使用场景如下:

  • 商品购物车,购物车非常适合用哈希字典表示,使用人员唯一编号作为字典的 key,value 值可以存储商品的 id 和数量等信息;
  • 存储用户的属性信息,使用人员唯一编号作为字典的 key,value 值为属性字段和对应的值;
  • 存储文章详情页信息等。

7. 更多字典操作命令

  • 插入一个或多个元素

    语法:hmset key field value [field value …] 示例:

    127.0.0.1:6379> hmset myhash k1 val1 k2 val2
    OK
    127.0.0.1:6379> hmget myhash k1 k2
    1) "val1"
    2) "val2"
  • 查询一个或多个元素

    语法:hmget key field [field …] 示例:

    127.0.0.1:6379> hmget myhash k1 k2
    1) "v1"
    2) "v2"
  • 查询某个 key 的所有字段

    语法:hkeys key 示例:

    127.0.0.1:6379> hkeys myhash
    1) "key1"
    2) "key2"
  • 查询某个 key 的所有值

    语法:hvals key 示例:

    127.0.0.1:6379> hvals myhash
    1) "value1"
    2) "value2"
  • 查询某个 key 的所有字段和值

    语法:hgetall key 示例:

    127.0.0.1:6379> hgetall myhash
    1) "k1"
    2) "v1"
    3) "k2"
    4) "v2"
  • 某个浮点值累加计算

    语法:hincrbyfloat key field increment 示例:

    127.0.0.1:6379> hincrbyfloat myhash k3 2.2
    "9.2"
  • 查询元素是否存在

    语法:hexists key field 示例:

    127.0.0.1:6379> hexists myhash key1
    (integer) 1
  • 查询元素个数

    语法:hlen key 示例:

    127.0.0.1:6379> hlen myhash
    (integer) 2

3.3 - 列表(List)

列表类型 (List) 是一个使用链表结构存储的有序结构,它的元素插入会按照先后顺序存储到链表结构中,因此它的元素操作 (插入 \ 删除) 时间复杂度为 O (1),所以相对来说速度还是比较快的,但它的查询时间复杂度为 O (n),因此查询可能会比较慢。

1. 基础使用

列表类型的使用相对来说比较简单,对它的操作就相当操作一个没有任何 key 值的 value 集合,如下图所示:

  • 给列表添加一个或多个元素

    语法:lpush key value [value …] 示例:

    127.0.0.1:6379> lpush list 1 2 3
    (integer) 3
  • 给列表尾部添加一个或多个元素

    语法:rpush key value [value …] 示例:

    127.0.0.1:6379> rpush list2 1 2 3
    (integer) 3
  • 返回列表指定区间内的元素

    语法:lrange key start stop 示例:

    127.0.0.1:6379> lrange list 0 -1
    "3"
    "2"
    "1"
    127.0.0.1:6379> lrange list2 0 -1
    "1"
    "2"
    "3"

    其中 -1 代表列表中的最后一个元素。

  • 获取并删除列表的第一个元素

    语法:lpop key 示例:

    127.0.0.1:6379> lrange list 0 -1
    1) "d"
    2) "c"
    3) "b"
    4) "a"
    127.0.0.1:6379> lpop list
    "d"
    127.0.0.1:6379> lrange list 0 -1
    1) "c"
    2) "b"
    3) "a"
  • 获取并删除列表的最后一个元素

    语法:rpop key 示例:

    127.0.0.1:6379> lrange list 0 -1
    1) "c"
    2) "b"
    3) "a"
    127.0.0.1:6379> rpop list
    "a"
    127.0.0.1:6379> lrange list 0 -1
    1) "c"
    2) "b"
  • 根据下标获取对应的元素

    语法:lindex key index 示例:

    127.0.0.1:6379> rpush list3 a b c
    (integer) 3
    127.0.0.1:6379> lindex list3 0
    "a"

2. 代码实战

下面来看列表类型在 Java 中的使用,同样先添加 Jedis 框架,使用代码如下:

public class ListExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 声明 Redis key
final String REDISKEY = "list";
// 在头部插入一个或多个元素
Long lpushResult = jedis.lpush(REDISKEY, "Java", "Sql");
System.out.println(lpushResult); // 输出:2
// 获取第 0 个元素的值
String idValue = jedis.lindex(REDISKEY, 0);
System.out.println(idValue); // 输出:Sql
// 查询指定区间的元素
List<String> list = jedis.lrange(REDISKEY, 0, -1);
System.out.println(list); // 输出:[Sql, Java]
// 在元素 Java 前面添加 MySQL 元素
jedis.linsert(REDISKEY, ListPosition.BEFORE, "Java", "MySQL");
System.out.println(jedis.lrange(REDISKEY, 0, -1)); // 输出:[Sql, MySQL, Java]
jedis.close();
}
}

程序运行结果如下:

2 Sql [Sql, Java] [Sql, MySQL, Java]

3. 内部实现

我们先用 debug encoding key 来查看列表类型的内部存储类型,如下所示:

127.0.0.1:6379> object encoding list
"quicklist"

从结果可以看出,列表类型的底层数据类型是 quicklist。

quicklist (快速列表) 是 Redis 3.2 引入的数据类型,早期的列表类型使用的是 ziplist (压缩列表) 和双向链表组成的,Redis 3.2 改为用 quicklist 来存储列表元素。

我们来看下 quicklist 的实现源码:

typedef struct quicklist { // src/quicklist.h
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* ziplist 的个数 */
unsigned long len; /* quicklist 的节点数 */
unsigned int compress : 16; /* LZF 压缩算法深度 */
//...
} quicklist;
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl; /* 对应的 ziplist */
unsigned int sz; /* ziplist 字节数 */
unsigned int count : 16; /* ziplist 个数 */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* 该节点先前是否被压缩 */
unsigned int attempted_compress : 1; /* 节点太小无法压缩 */
//...
} quicklistNode;
typedef struct quicklistLZF {
unsigned int sz;
char compressed[];
} quicklistLZF;

从以上源码可以看出 quicklist 是一个双向链表,链表中的每个节点实际上是一个 ziplist,它们的结构如下图所示:

ziplist 作为 quicklist 的实际存储结构,它本质是一个字节数组,ziplist 数据结构如下图所示:

其中的字段含义如下:

  • zlbytes:压缩列表字节长度,占 4 字节;
  • zltail:压缩列表尾元素相对于起始元素地址的偏移量,占 4 字节;
  • zllen:压缩列表的元素个数;
  • entryX:压缩列表存储的所有元素,可以是字节数组或者是整数;
  • zlend:压缩列表的结尾,占 1 字节。

4. 源码解析

下面我们来看一下更多关于列表类型的源码实现。

1) 添加功能源码分析

quicklist 添加操作对应函数是 quicklistPush,源码如下:

void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
int where) {
if (where == QUICKLIST_HEAD) {
// 在列表头部添加元素
quicklistPushHead(quicklist, value, sz);
} else if (where == QUICKLIST_TAIL) {
// 在列表尾部添加元素
quicklistPushTail(quicklist, value, sz);
}
}

以 quicklistPushHead 为例,源码如下:

int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
// 在头部节点插入元素
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {
// 头部节点不能继续插入,需要新建 quicklistNode、ziplist 进行插入
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(node);
// 将新建的 quicklistNode 插入到 quicklist 结构中
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}

quicklistPushHead 函数的执行流程,先判断 quicklist 的 head 节点是否可以插入数据,如果可以插入则使用 ziplist 的接口进行插入,否则就新建 quicklistNode 节点进行插入。

函数的入参是待插入的 quicklist,还有需要插入的值 value 以及他的大小 sz。

函数的返回值为 int,0 表示没有新建 head,1 表示新建了 head。 quicklistPushHead 执行流程,如下图所示:

![](https://gitee.com/lemon-cs/images/raw/master/quicklistPushHead 执行流程.png)

2) 删除功能源码分析

quicklist 元素删除分为两种情况:单一元素删除和区间元素删除,它们都位于 src/quicklist.c 文件中。

① 单一元素删除

单一元素的删除函数是 quicklistDelEntry,源码如下:

void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
quicklistNode *prev = entry->node->prev;
quicklistNode *next = entry->node->next;
// 删除指定位置的元素
int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
entry->node, &entry->zi);
//...
}

可以看出 quicklistDelEntry 函数的底层,依赖 quicklistDelIndex 函数进行元素删除。

② 区间元素删除

区间元素删除的函数是 quicklistDelRange,源码如下:

// start 表示开始删除的下标,count 表示要删除的个数
int quicklistDelRange(quicklist *quicklist, const long start,
const long count) {
if (count <= 0)
return 0;
unsigned long extent = count;
if (start >= 0 && extent > (quicklist->count - start)) {
// 删除的元素个数大于已有元素
extent = quicklist->count - start;
} else if (start < 0 && extent > (unsigned long)(-start)) {
// 删除指定的元素个数
extent = -start; /* c.f. LREM -29 29; just delete until end. */
}
//...
// extent 为剩余需要删除的元素个数,
while (extent) {
// 保存下个 quicklistNode,因为本节点可能会被删除
quicklistNode *next = node->next;
unsigned long del;
int delete_entire_node = 0;
if (entry.offset == 0 && extent >= node->count) {
// 删除整个 quicklistNode
delete_entire_node = 1;
del = node->count;
} else if (entry.offset >= 0 && extent >= node->count) {
// 删除本节点的所有元素
del = node->count - entry.offset;
} else if (entry.offset < 0) {
// entry.offset<0 表示从后向前,相反则表示从前向后剩余的元素个数
del = -entry.offset;
if (del > extent)
del = extent;
} else {
// 删除本节点部分元素
del = extent;
}
D("[%ld]: asking to del: %ld because offset: %d; (ENTIRE NODE: %d), "
"node count: %u",
extent, del, entry.offset, delete_entire_node, node->count);
if (delete_entire_node) {
__quicklistDelNode(quicklist, node);
} else {
quicklistDecompressNodeForUse(node);
node->zl = ziplistDeleteRange(node->zl, entry.offset, del);
quicklistNodeUpdateSz(node);
node->count -= del;
quicklist->count -= del;
quicklistDeleteIfEmpty(quicklist, node);
if (node)
quicklistRecompressOnly(quicklist, node);
}
// 剩余待删除元素的个数
extent -= del;
// 下个 quicklistNode
node = next;
// 从下个 quicklistNode 起始位置开始删除
entry.offset = 0;
}
return 1;
}

从上面代码可以看出,quicklist 在区间删除时,会先找到 start 所在的 quicklistNode,计算删除的元素是否小于要删除的 count,如果不满足删除的个数,则会移动至下一个 quicklistNode 继续删除,依次循环直到删除完成为止。

quicklistDelRange 函数的返回值为 int 类型,当返回 1 时表示成功的删除了指定区间的元素,返回 0 时表示没有删除任何元素。

3) 更多源码

除了上面介绍的几个常用函数之外,还有一些更多的函数,例如:

  • quicklistCreate:创建 quicklist;
  • quicklistInsertAfter:在某个元素的后面添加数据;
  • quicklistInsertBefore:在某个元素的前面添加数据;
  • quicklistPop:取出并删除列表的第一个或最后一个元素;
  • quicklistReplaceAtIndex:替换某个元素。

5. 使用场景

列表的典型使用场景有以下两个:

  • 消息队列:列表类型可以使用 rpush 实现先进先出的功能,同时又可以使用 lpop 轻松的弹出(查询并删除)第一个元素,所以列表类型可以用来实现消息队列;
  • 文章列表:对于博客站点来说,当用户和文章都越来越多时,为了加快程序的响应速度,我们可以把用户自己的文章存入到 List 中,因为 List 是有序的结构,所以这样又可以完美的实现分页功能,从而加速了程序的响应速度。

列表类型并不是简单的双向链表,而是采用了 quicklist 的数据结构对数据进行存取,quicklist 是 Redis 3.2 新增的数据类型,它的底层采取的是压缩列表加双向链表的存储结构,quicklist 为了存储更多的数据,会对每个 quicklistNode 节点进行压缩,这样就可以有效的存储更多的消息队列或者文章的数据了。

6. 更多列表操作命令

  • 在某值之前 / 之后添加某个元素

    语法:linsert key before|after pivot value 示例:

    127.0.0.1:6379> linsert list3 before b A
    (integer) 4
    127.0.0.1:6379> lrange list3 0 -1
    "a"
    "A"
    "b"
    "c"
  • 根据下标修改元素

    语法:lset key index value 示例

    127.0.0.1:6379> lindex list3 0
    "a"
    127.0.0.1:6379> lset list3 0 A
    OK
    127.0.0.1:6379> lindex list3 0
    "A"
  • 根据下标删除元素

    语法:ltrim key start stop 示例:

    127.0.0.1:6379> lpush list a b c
    (integer) 3
    127.0.0.1:6379> ltrim list 0 1
    OK
    127.0.0.1:6379> lrange list 0 -1
    1) "c"
    2) "b"
  • 查询列表的长度

    语法:llen key 示例:

    127.0.0.1:6379> llen list
    (integer) 2
  • 删除指定个数的元素

    语法:lrem key count value 示例:

    127.0.0.1:6379> lpush list a a b b c c
    (integer) 6
    127.0.0.1:6379> lrem list 2 a
    (integer) 2
    127.0.0.1:6379> lrem list 1 b
    (integer) 1
    127.0.0.1:6379> lrange list 0 -1
    1) "c"
    2) "c"
    3) "b"

3.4 - 集合类型 (Set)

集合类型 (Set) 是一个无序并唯一的键值集合。

之所以说集合类型是一个无序集合,是因为它的存储顺序不会按照插入的先后顺序进行存储,如下代码所示:

127.0.0.1:6379> sadd myset v2 v1 v3 #插入数据 v2、v1、v3 
(integer) 3
127.0.0.1:6379> smembers myset #查询数据
1) "v1"
2) "v3"
3) "v2"

从上面代码执行结果可以看出,myset 的存储顺序并不是以插入的先后顺序进行存储的。

集合类型和列表类型的区别如下:

  • 列表可以存储重复元素,集合只能存储非重复元素;
  • 列表是按照元素的先后顺序存储元素的,而集合则是无序方式存储元素的。

1. 基础使用

集合类型的功能比列表类型丰富一些,集合类型可以用来统计多个集合的交集、错集和并集,如下代码所示。

1)添加一个或多个元素

语法:sadd key member [member …] 示例:

127.0.0.1:6379> sadd myset v1 v2 v3
(integer) 3
2)查询集合所有元素

语法:smembers key 示例:

127.0.0.1:6379> smembers myset
1) "v1"
2) "v3"
3) "v2"
3)查询集合的成员数量

语法:scard key 示例:

127.0.0.1:6379> scard myset
(integer) 3
4)查询集合中是否包含某个元素

语法:sismember key member 示例:

127.0.0.1:6379> sismember myset v1
(integer) 1
127.0.0.1:6379> sismember myset v4
(integer) 0
5)从一个集合中移动一个元素到另一个集合

语法:smove source destination member 示例:

127.0.0.1:6379> smembers myset
1) "v1"
2) "v3"
3) "v2"
127.0.0.1:6379> smembers myset2
1) "v1"
2) "v8"
127.0.0.1:6379> smove myset myset2 v3
(integer) 1
127.0.0.1:6379> smembers myset2
1) "v1"
2) "v8"
3) "v3"
127.0.0.1:6379> smembers myset
1) "v1"
2) "v2"
6)移除集合中一个或多个元素

语法:srem key member [member …] 示例:

127.0.0.1:6379> smembers myset
1) "v4"
2) "v1"
3) "v3"
4) "v2"
5) "v5"
127.0.0.1:6379> srem myset v5
(integer) 1
127.0.0.1:6379> smembers myset
1) "v3"
2) "v2"
3) "v1"
4) "v4"

注意:使用 srem 指令,不存在的元素将会被忽略。

2. 代码实战

下面来看集合类型在 Java 中的使用,同样先添加 Jedis 框架,使用代码如下:

import redis.clients.jedis.Jedis;
import java.util.Set;

public class SetExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
jedis.auth("xxx");
// 创建集合并添加元素
jedis.sadd("set1", "java", "golang");
// 查询集合中的所有元素
Set<String> members = jedis.smembers("set1");
System.out.println(members); // 输出:[java, golang]
// 查询集合中的元素数量
System.out.println(jedis.scard("set1"));
// 移除集合中的一个元素
jedis.srem("set1", "golang");
System.out.println(jedis.smembers("set1")); // 输出:[java]
// 创建集合 set2 并添加元素
jedis.sadd("set2", "java", "golang");
// 查询两个集合中交集
Set<String> inters = jedis.sinter("set1", "set2");
System.out.println(inters); // 输出:[java]
// 查询两个集合中并集
Set<String> unions = jedis.sunion("set1", "set2");
System.out.println(unions); // 输出:[java,golang]
// 查询两个集合的错集
Set<String> diffs = jedis.sdiff("set2", "set1");
System.out.println(diffs); // 输出:[golang]
}
}

3. 内部实现

集合类型是由 intset (整数集合) 或 hashtable (普通哈希表) 组成的。当集合类型以 hashtable 存储时,哈希表的 key 为要插入的元素值,而哈希表的 value 则为 Null,如下图所示:

当集合中所有的值都为整数时,Redis 会使用 intset 结构来存储,如下代码所示:

127.0.0.1:6379> sadd myset 1 9 3 -2
(integer) 4
127.0.0.1:6379> object encoding myset
"intset"

从上面代码可以看出,当所有元素都为整数时,集合会以 intset 结构进行 (数据) 存储。 当发生以下两种情况时,会导致集合类型使用 hashtable 而非 intset 存储: 1)当元素的个数超过一定数量时,默认是 512 个,该值可通过命令 set-max-intset-entries xxx 来配置。 2)当元素为非整数时,集合将会使用 hashtable 来存储,如下代码所示:

127.0.0.1:6379> sadd myht "redis" "db"
(integer) 2
127.0.0.1:6379> object encoding myht
"hashtable"

从上面代码可以看出,当元素为非整数时,集合会使用 hashtable 进行存储

4. 源码解析

集合源码在 t_set.c 文件中,核心源码如下:

/* 
* 添加元素到集合
* 如果当前值已经存在,则返回 0 不作任何处理,否则就添加该元素,并返回 1。
*/
int setTypeAdd(robj *subject, sds value) {
long long llval;
if (subject->encoding == OBJ_ENCODING_HT) { // 字典类型
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
// 把 value 作为字典到 key,将 Null 作为字典到 value,将元素存入到字典
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
} else if (subject->encoding == OBJ_ENCODING_INTSET) { // inset 数据类型
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
// 超过 inset 的最大存储数量,则使用字典类型存储
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
// 转化为整数类型失败,使用字典类型存储
setTypeConvert(subject,OBJ_ENCODING_HT);

serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
} else {
// 未知编码(类型)
serverPanic("Unknown set encoding");
}
return 0;
}

以上这些代码验证了,我们上面所说的内容,当元素都为整数并且元素的个数没有到达设置的最大值时,键值的存储使用的是 intset 的数据结构,反之到元素超过了一定的范围,又或者是存储的元素为非整数时,集合会选择使用 hashtable 的数据结构进行存储。

5. 使用场景

集合类型的经典使用场景如下:

  • 微博关注我的人和我关注的人都适合用集合存储,可以保证人员不会重复;
  • 中奖人信息也适合用集合类型存储,这样可以保证一个人不会重复中奖。

集合类型是由整数集合 (intset) 或者是哈希表 (hashtable) 组成的,集合类型比较适合用来数据去重和保障数据的唯一性,除此之外,集合类型还可以用来统计多个集合的交集、错集和并集 。当我们存储的数据是无序并且需要去重的情况下,比较适合使用集合类型进行存储。

6. 更多集合操作命令

  • 移除并返回集合中的一个随机元素

    语法:spop key [count] 示例:

    127.0.0.1:6379> smembers myset
    1) "v1"
    2) "v2"
    127.0.0.1:6379> spop myset 1
    1) "v2"
    127.0.0.1:6379> smembers myset
    1) "v1"
  • 随机返回集合中指定数量的元素列表

    语法:srandmember key [count] 示例:

    127.0.0.1:6379> srandmember myset 2
    1) "v4"
    2) "v2"
  • 返回一个集合或多个集合的交集

    语法:sinter key [key …] 示例:

    127.0.0.1:6379> smembers myset
    1) "v1"
    2) "v3"
    3) "v2"
    127.0.0.1:6379> smembers myset2
    1) "v1"
    2) "v8"
    127.0.0.1:6379> sinter myset myset2
    1) "v1"
  • 把集合的交集复制到新的集合中

    语法:sinterstore destination key [key …] 示例:

    127.0.0.1:6379> smembers myset
    1) "v1"
    2) "v3"
    3) "v2"
    127.0.0.1:6379> smembers myset2
    1) "v1"
    2) "v8"
    127.0.0.1:6379> sinterstore myset3 myset myset2
    (integer) 1
    127.0.0.1:6379> smembers myset3
    1) "v1"

    命令解析:从以上代码可以看出,我们把集合 myset 和 集合 myset2 的合集元素 v1 复制到了新的集合 myset3 中,但 v1 并不会从原有集合中移除。

  • 查询一个或多个集合的并集

    语法:sunion key [key …] 示例:

    127.0.0.1:6379> smembers group1
    1) "java"
    127.0.0.1:6379> smembers group2
    1) "golang"
    127.0.0.1:6379> sunion group1 group2
    1) "java"
    2) "golang"
  • 把一个或多个集合的并集复制到新集合中

    语法:sunionstore destination key [key …] 示例:

    127.0.0.1:6379> smembers group1
    1) "java"
    127.0.0.1:6379> smembers group2
    1) "golang"
    127.0.0.1:6379> sunionstore group3 group1 group2
    (integer) 2
    127.0.0.1:6379> smembers group3
    1) "java"
    2) "golang"

    注意:只是把一个或多个集合的并集复制到新集合中,并不会在原集合中删除复制的元素。

  • 查询一个或多个集合的错集

    语法:sdiff key [key …] 示例:

    127.0.0.1:6379> smembers group1
    1) "java"
    2) "golang"
    127.0.0.1:6379> smembers group2
    1) "golang"
    127.0.0.1:6379> sdiff group1 group2
    1) "java"

    注意:执行命令时集合的先后顺序会影响返回的结果,如下所示:

    127.0.0.1:6379> sdiff group1 group2
    1) "java"
    127.0.0.1:6379> sdiff group2 group1
    (empty list or set)

    这是因为查询错集是以第一个集合为主的,当第二个元素包含第一个元素时,查询的错集结果就是空。

  • 把一个或多个集合的错集复制到新集合

    语法:sdiffstore destination key [key …] 示例:

    127.0.0.1:6379> smembers group1
    1) "java"
    2) "golang"
    127.0.0.1:6379> smembers group2
    1) "golang"
    127.0.0.1:6379> sdiffstore group3 group1 group2
    (integer) 1
    127.0.0.1:6379> smembers group3
    1) "java"

3.5 - 有序集合类型 (Sorted Set)

有序集合类型 (Sorted Set) 相比于集合类型多了一个排序属性 score(分值),对于有序集合 ZSet 来说,每个存储元素相当于有两个值组成的,一个是有序结合的元素值,一个是排序值。有序集合的存储元素值也是不能重复的,但分值是可以重复的。

当我们把学生的成绩存储在有序集合中时,它的存储结构如下图所示:

1. 基础使用

1)添加一个或多个元素

语法:zadd key [NX|XX] [CH] [INCR] score member [score member …] 示例:

127.0.0.1:6379> zadd zset1 10 java
(integer) 1
127.0.0.1:6379> zadd zset1 3 golang 4 sql 1 redis
(integer) 3

可以看出有序集合的添加是 zadd 键值 分值1 元素值1 分值2 元素值2 的形式添加的。

2)查询所有元素列表

语法:zrange key start stop [WITHSCORES] 示例:

127.0.0.1:6379> zrange zset 0 -1
1) "redis"
2) "mysql"
3) "java"

其中 -1 表示最后一个元素,查询结果包含开始和结束元素。

3)删除一个或多个元素 (根据元素值)

语法:zrem key member [member …] 示例:

127.0.0.1:6379> zrangebyscore zset1 0 -1 #查询所有元素
1) "golang"
2) "redis"
3) "sql"
4) "java"
127.0.0.1:6379> zrem zset1 redis sql #删除元素:reids、sql
(integer) 2
127.0.0.1:6379> zrange zset1 0 -1 #查询所有元素
1) "golang"
2) "java"

删除命令中如果包含了不存在的元素,并不会影响命令的正常执行,不存在的元素将会被忽略。

4)查询某元素的 score 值

语法:zscore key member 示例:

127.0.0.1:6379> zscore zset1 redis
"1"
5)查询 score 区间内元素

语法:zrangebyscore key min max [WITHSCORES] [LIMIT offset count] 示例:

127.0.0.1:6379> zrangebyscore zset1 3 10
1) "golang"
2) "redis"
3) "sql"
4) "java"
6)查询某元素排名

语法:zrank key member 示例:

127.0.0.1:6379> zadd zset 5 redis 10 java 8 mysql #创建有序集合
(integer) 3
127.0.0.1:6379> zrank zset java #查询元素排序
(integer) 2
127.0.0.1:6379> zrank zset redis
(integer) 0

可以看出,排名是从 0 开始的,排名可以理解为元素排序后的下标值。

2. 代码实战

下面来看有序集合在 Java 中的使用,同样先添加 Jedis 框架,示例代码如下:

import redis.clients.jedis.Jedis;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ZSetExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
Map<String, Double> map = new HashMap<>();
map.put("小明", 80.5d);
map.put("小红", 75d);
map.put("老王", 85d);
// 为有序集合(ZSet)添加元素
jedis.zadd("grade", map);
// 查询分数在 80 分到 100 分之间的人(包含 80 分和 100 分)
Set<String> gradeSet = jedis.zrangeByScore("grade", 80, 100);
System.out.println(gradeSet); // 输出:[小明, 老王]
// 查询小红的排名(排名从 0 开始)
System.out.println(jedis.zrank("grade", "小明")); // 输出:1
// 从集合中移除老王
jedis.zrem("grade", "老王");
// 查询有序集合中的所有元素(根据排名从小到大)
Set<String> range = jedis.zrange("grade", 0, -1);
System.out.println(range); // 输出:[小红, 小明]
// 查询有序集合中的所有元素(根据 score 从小到大)
Set<String> rangeByScore = jedis.zrangeByScore("grade", 0, 100);
System.out.println(rangeByScore);
}
}

3. 内部实现

有序集合是由 ziplist (压缩列表) 或 skiplist (跳跃表) 组成的。

1)ziplist

当数据比较少时,有序集合使用的是 ziplist 存储的,如下代码所示:

127.0.0.1:6379> zadd myzset 1 db 2 redis 3 mysql
(integer) 3
127.0.0.1:6379> object encoding myzset
"ziplist"

从结果可以看出,有序集合把 myset 键值对存储在 ziplist 结构中了。 有序集合使用 ziplist 格式存储必须满足以下两个条件:

  • 有序集合保存的元素个数要小于 128 个;
  • 有序集合保存的所有元素成员的长度都必须小于 64 字节。

如果不能满足以上两个条件中的任意一个,有序集合将会使用 skiplist 结构进行存储。 接下来我们来测试以下,当有序集合中某个元素长度大于 64 字节时会发生什么情况? 代码如下:

127.0.0.1:6379> zadd zmaxleng 1.0 redis
(integer) 1
127.0.0.1:6379> object encoding zmaxleng
"ziplist"
127.0.0.1:6379> zadd zmaxleng 2.0 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
(integer) 1
127.0.0.1:6379> object encoding zmaxleng
"skiplist"

通过以上代码可以看出,当有序集合保存的所有元素成员的长度大于 64 字节时,有序集合就会从 ziplist 转换成为 skiplist。

小贴士:可以通过配置文件中的 zset-max-ziplist-entries(默认 128)和 zset-max-ziplist-value(默认 64)来设置有序集合使用 ziplist 存储的临界值。

2)skiplist

skiplist 数据编码底层是使用 zset 结构实现的,而 zset 结构中包含了一个字典和一个跳跃表,源码如下:

typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;

更多关于跳跃表的源码实现,会在后面的章节详细介绍。

① 跳跃表实现原理

跳跃表的结构如下图所示:

根据以上图片展示,当我们在跳跃表中查询值 32 时,执行流程如下:

  • 从最上层开始找,1 比 32 小,在当前层移动到下一个节点进行比较;
  • 7 比 32 小,当前层移动下一个节点比较,由于下一个节点指向 Null,所以以 7 为目标,移动到下一层继续向后比较;
  • 18 小于 32,继续向后移动查找,对比 77 大于 32,以 18 为目标,移动到下一层继续向后比较;
  • 对比 32 等于 32,值被顺利找到。

从上面的流程可以看出,跳跃表会想从最上层开始找起,依次向后查找,如果本层的节点大于要找的值,或者本层的节点为 Null 时,以上一个节点为目标,往下移一层继续向后查找并循环此流程,直到找到该节点并返回,如果对比到最后一个元素仍未找到,则返回 Null。

② 为什么是跳跃表?而非红黑树?

因为跳跃表的性能和红黑树基本相近,但却比红黑树更好实现,所有 Redis 的有序集合会选用跳跃表来实现存储。

4. 使用场景

有序集合的经典使用场景如下:

  • 学生成绩排名
  • 粉丝列表,根据关注的先后时间排序

有序集合具有唯一性和排序的功能,排序功能是借助分值字段 score 实现的,score 字段不仅可以实现排序功能,还可以实现数据的赛选与过滤的功能。我们还了解到了有序集合是由 压缩列表 (ziplist) 或跳跃列表 (skiplist) 来存储的,当元素个数小于 128 个,并且所有元素的值都小于 64 字节时,有序集合会采取 ziplist 来存储,反之则会用 skiplist 来存储,其中 skiplist 是从上往下、从前往后进行元素查找的,相比于传统的普通列表,可能会快很多,因为普通列表只能从前往后依次查找。

6. 更多有序集合操作命令

  • 查询有序集合的总个数

    语法:zcard key 示例:

    127.0.0.1:6379> zcard zset1
    (integer) 4
  • 查询 score 区间内的元素个数

    语法:zcount key min max 示例:

    127.0.0.1:6379> zcount zset1 0 10
    (integer) 4
  • 累加元素的 score 值

    语法:zincrby key increment member 示例:

    127.0.0.1:6379> zscore zset1 redis #查询 zset1 的 score 值
    "1"
    127.0.0.1:6379> zincrby zset1 2 redis #累加 score 值
    "3"
    127.0.0.1:6379> zscore zset1 redis
    "3"
  • 查询某元素倒序排名

    语法:zrevrank key member 示例:

    127.0.0.1:6379> zrevrank zset1 python #倒序查询
    (integer) 0
    127.0.0.1:6379> zrange zset1 0 -1 #正序列表
    1) "redis"
    2) "java"
    3) "golang"
    4) "python"
  • 根据排名删除元素

    语法:zremrangebyrank key start stop 示例:

    127.0.0.1:6379> zrange zset1 0 -1 #查询所有元素
    1) "redis"
    2) "java"
    3) "golang"
    4) "python"
    127.0.0.1:6379> zremrangebyrank zset1 0 2 #删除元素
    (integer) 3
    127.0.0.1:6379> zrange zset1 0 -1 #查询所有元素
    1) "python"
  • 删除 score 区间内的元素

    语法:zremrangebyscore key min max 示例:

    127.0.0.1:6379> zscore zset1 python
    "4"
    127.0.0.1:6379> zremrangebyscore zset1 4 5
    (integer) 1
    127.0.0.1:6379> zscore zset1 python
    (nil)
  • 复制交集元素到新集合

    语法:zinterstore destination numkeys key [key …] [WEIGHTS weight] [AGGREGATE SUM|MIN|MA 参数 numkeys 表示需要几个集合参与查询。 示例:

    127.0.0.1:6379> zrange zset1 0 -1
    1) "redis"
    2) "java"
    3) "golang"
    4) "python"
    127.0.0.1:6379> zrange zset2 0 -1
    1) "redis"
    2) "db"
    127.0.0.1:6379> zinterstore zset3 2 zset1 zset2
    (integer) 1
    127.0.0.1:6379> zrange zset3 0 -1
    1) "redis"
  • 复制并集元素到新集合

    语法:zunionstore destination numkeys key [key …] [WEIGHTS weight] [AGGREGATE SUM|MIN|MA 示例:

    127.0.0.1:6379> zrange zset1 0 -1
    1) "redis"
    2) "java"
    3) "golang"
    4) "python"
    127.0.0.1:6379> zrange zset2 0 -1
    1) "redis"
    2) "db"
    127.0.0.1:6379> zunionstore zset3 2 zset1 zset2
    (integer) 5
    127.0.0.1:6379> zrange zset3 0 -1
    1) "java"
    2) "golang"
    3) "redis"
    4) "python"
    5) "db"

4-Redis 持久化机制

Redis 的读写都是在内存中,所以它的性能较高,但在内存中的数据会随着服务器的重启而丢失,为了保证数据不丢失,我们需要将内存中的数据存储到磁盘,以便 Redis 重启时能够从磁盘中恢复原有的数据,而整个过程就叫做 Redis 持久化。

也是 Redis 和 Memcached 的主要区别之一,因为 Memcached 不具备持久化功能。

Redis 持久化拥有以下三种方式:

  • 快照方式(RDB, Redis DataBase)将某一个时刻的内存数据,以二进制的方式写入磁盘;
  • 文件追加方式(AOF, Append Only File),记录所有的操作命令,并以文本的形式追加到文件中;
  • 混合持久化方式,Redis 4.0 之后新增的方式,混合持久化是结合了 RDB 和 AOF 的优点,在写入的时候,先把当前的数据以 RDB 的形式写入文件的开头,再将后续的操作命令以 AOF 的格式存入文件,这样既能保证 Redis 重启时的速度,又能减低数据丢失的风险。

4.1-RDB

RDB(Redis DataBase)是将某一个时刻的内存快照(Snapshot),以二进制的方式写入磁盘的过程。

1. 持久化触发

RDB 的持久化触发方式有两类:一类是手动触发,另一类是自动触发。

1)手动触发

手动触发持久化的操作有两个: savebgsave ,它们主要区别体现在:是否阻塞 Redis 主线程的执行。

① save 命令

在客户端中执行 save 命令,就会触发 Redis 的持久化,但同时也是使 Redis 处于阻塞状态,直到 RDB 持久化完成,才会响应其他客户端发来的命令,所以在生产环境一定要慎用

save 命令使用如下:

从图片可以看出,当执行完 save 命令之后,持久化文件 dump.rdb 的修改时间就变了,这就表示 save 成功的触发了 RDB 持久化。 save 命令执行流程,如下图所示:

② bgsave 命令

bgsave(background save)既后台保存的意思, 它和 save 命令最大的区别就是 bgsave 会 fork () 一个子进程来执行持久化,整个过程中只有在 fork () 子进程时有短暂的阻塞,当子进程被创建之后,Redis 的主进程就可以响应其他客户端的请求了,相对于整个流程都阻塞的 save 命令来说,显然 bgsave 命令更适合我们使用。 bgsave 命令使用,如下图所示:

bgsave 执行流程,如下图所示:

2)自动触发

说完了 RDB 的手动触发方式,下面来看如何自动触发 RDB 持久化? RDB 自动持久化主要来源于以下几种情况。

① save m n

save m n 是指在 m 秒内,如果有 n 个键发生改变,则自动触发持久化。 参数 m 和 n 可以在 Redis 的配置文件中找到,例如,save 60 1 则表明在 60 秒内,至少有一个键发生改变,就会触发 RDB 持久化。 自动触发持久化,本质是 Redis 通过判断,如果满足设置的触发条件,自动执行一次 bgsave 命令。 注意:当设置多个 save m n 命令时,满足任意一个条件都会触发持久化。 例如,我们设置了以下两个 save m n 命令:

  • save 60 10
  • save 600 1

当 60s 内如果有 10 次 Redis 键值发生改变,就会触发持久化;如果 60s 内 Redis 的键值改变次数少于 10 次,那么 Redis 就会判断 600s 内,Redis 的键值是否至少被修改了一次,如果满足则会触发持久化。

② flushall

flushall 命令用于清空 Redis 数据库,在生产环境下一定慎用,当 Redis 执行了 flushall 命令之后,则会触发自动持久化,把 RDB 文件清空。 执行结果如下图所示:

③ 主从同步触发

在 Redis 主从复制中,当从节点执行全量复制操作时,主节点会执行 bgsave 命令,并将 RDB 文件发送给从节点,该过程会自动触发 Redis 持久化。

2. 配置说明

合理的设置 RDB 的配置,可以保障 Redis 高效且稳定的运行,下面一起来看 RDB 的配置项都有哪些?

RDB 配置参数可以在 Redis 的配置文件中找见,具体内容如下:

# RDB 保存的条件
save 900 1
save 300 10
save 60 10000

# bgsave 失败之后,是否停止持久化数据到磁盘,yes 表示停止持久化,no 表示忽略错误继续写文件。
stop-writes-on-bgsave-error yes

# RDB 文件压缩
rdbcompression yes

# 写入文件和读取文件时是否开启 RDB 文件检查,检查是否有无损坏,如果在启动是检查发现损坏,则停止启动。
rdbchecksum yes

# RDB 文件名
dbfilename dump.rdb

# RDB 文件目录
dir ./

其中比较重要的参数如下列表:

① save 参数 它是用来配置触发 RDB 持久化条件的参数,满足保存条件时将会把数据持久化到硬盘。 默认配置说明如下:

  • save 900 1:表示 900 秒内如果至少有 1 个 key 值变化,则把数据持久化到硬盘;
  • save 300 10:表示 300 秒内如果至少有 10 个 key 值变化,则把数据持久化到硬盘;
  • save 60 10000:表示 60 秒内如果至少有 10000 个 key 值变化,则把数据持久化到硬盘。

② rdbcompression 参数 它的默认值是 yes 表示开启 RDB 文件压缩,Redis 会采用 LZF 算法进行压缩。如果不想消耗 CPU 性能来进行文件压缩的话,可以设置为关闭此功能,这样的缺点是需要更多的磁盘空间来保存文件。

③ rdbchecksum 参数 它的默认值为 yes 表示写入文件和读取文件时是否开启 RDB 文件检查,检查是否有无损坏,如果在启动是检查发现损坏,则停止启动。

3. 配置查询

Redis 中可以使用命令查询当前配置参数。查询命令的格式为:config get xxx ,例如,想要获取 RDB 文件的存储名称设置,可以使用 config get dbfilename ,执行效果如下图所示:

查询 RDB 的文件目录,可使用命令 config get dir ,执行效果如下图所示:

4. 配置设置

设置 RDB 的配置,可以通过以下两种方式:

  • 手动修改 Redis 配置文件;
  • 使用命令行设置,例如,使用 config set dir "/usr/data" 就是用于修改 RDB 的存储目录。

注意:手动修改 Redis 配置文件的方式是全局生效的,即重启 Redis 服务器设置参数也不会丢失,而使用命令修改的方式,在 Redis 重启之后就会丢失。但手动修改 Redis 配置文件,想要立即生效需要重启 Redis 服务器,而命令的方式则不需要重启 Redis 服务器。

小贴士:Redis 的配置文件位于 Redis 安装目录的根路径下,默认名称为 redis.conf。

5. RDB 文件恢复

当 Redis 服务器启动时,如果 Redis 根目录存在 RDB 文件 dump.rdb,Redis 就会自动加载 RDB 文件恢复持久化数据。 如果根目录没有 dump.rdb 文件,请先将 dump.rdb 文件移动到 Redis 的根目录。 验证 RDB 文件是否被加载 Redis 在启动时有日志信息,会显示是否加载了 RDB 文件,我们执行 Redis 启动命令:src/redis-server redis.conf ,如下图所示:

从日志上可以看出, Redis 服务在启动时已经正常加载了 RDB 文件。

小贴士:Redis 服务器在载入 RDB 文件期间,会一直处于阻塞状态,直到载入工作完成为止。

6. RDB 的优缺点

1)RDB 优点
  • RDB 的内容为二进制的数据,占用内存更小,更紧凑,更适合做为备份文件;
  • RDB 对灾难恢复非常有用,它是一个紧凑的文件,可以更快的传输到远程服务器进行 Redis 服务恢复;
  • RDB 可以更大程度的提高 Redis 的运行速度,因为每次持久化时 Redis 主进程都会 fork () 一个子进程,进行数据持久化到磁盘,Redis 主进程并不会执行磁盘 I/O 等操作;
  • 与 AOF 格式的文件相比,RDB 文件可以更快的重启。
2)RDB 缺点
  • 因为 RDB 只能保存某个时间间隔的数据,如果中途 Redis 服务被意外终止了,则会丢失一段时间内的 Redis 数据;
  • RDB 需要经常 fork () 才能使用子进程将其持久化在磁盘上。如果数据集很大,fork () 可能很耗时,并且如果数据集很大且 CPU 性能不佳,则可能导致 Redis 停止为客户端服务几毫秒甚至一秒钟。

7. 禁用持久化

禁用持久化可以提高 Redis 的执行效率,如果对数据丢失不敏感的情况下,可以在连接客户端的情况下,执行 config set save "" 命令即可禁用 Redis 的持久化。

4.2-AOF

使用 RDB 持久化有一个风险,它可能会造成最新数据丢失的风险。因为 RDB 的持久化有一定的时间间隔,在这个时间段内如果 Redis 服务意外终止的话,就会造成最新的数据全部丢失。

可能会操作 Redis 服务意外终止的条件:

  • 安装 Redis 的机器停止运行,蓝屏或者系统崩溃;
  • 安装 Redis 的机器出现电源故障,例如突然断电;
  • 使用 kill -9 Redis_PID 等。

那么如何解决以上的这些问题呢?Redis 为我们提供了另一种持久化的方案 ——AOF。

AOF(Append Only File)中文是附加到文件,顾名思义 AOF 可以把 Redis 每个键值对操作都记录到文件(appendonly.aof)中。

2. 持久化查询和设置

1)查询 AOF 启动状态

使用 config get appendonly 命令,如下图所示:

![](https://gitee.com/lemon-cs/images/raw/master/ 查询 AOF 启动状态.png)

其中,第一行为 AOF 文件的名称,而最后一行表示 AOF 启动的状态,yes 表示已启动,no 表示未启动。

2)开启 AOF 持久化

Redis 默认是关闭 AOF 持久化的,想要开启 AOF 持久化,有以下两种方式:

  • 通过命令行的方式;
  • 通过修改配置文件的方式(redis.conf)。

下面分别来看以上两种方式的实现。

① 命令行启动 AOF

命令行启动 AOF,使用 config set appendonly yes 命令。

命令行启动 AOF 的优缺点:命令行启动优点是无需重启 Redis 服务,缺点是如果 Redis 服务重启,则之前使用命令行设置的配置就会失效。

② 配置文件启动 AOF

Redis 的配置文件在它的根路径下的 redis.conf 文件中,获取 Redis 的根目录可以使用命令 config get dir 获取,只需要在配置文件中设置 appendonly yes 即可,默认 appendonly no 表示关闭 AOF 持久化。

配置文件启动 AOF 的优缺点:修改配置文件的缺点是每次修改配置文件都要重启 Redis 服务才能生效,优点是无论重启多少次 Redis 服务,配置文件中设置的配置信息都不会失效。

3. 触发持久化

AOF 持久化开启之后,只要满足一定条件,就会触发 AOF 持久化。AOF 的触发条件分为两种:自动触发和手动触发。

1)自动触发

有两种情况可以自动触发 AOF 持久化,分为是:满足 AOF 设置的策略触发满足 AOF 重写触发。其中,AOF 重写触发会在本文的后半部分详细介绍,这里重点来说 AOF 持久化策略都有哪些。 AOF 持久化策略,分为以下三种:

  • always:每条 Redis 操作命令都会写入磁盘,最多丢失一条数据;
  • everysec:每秒钟写入一次磁盘,最多丢失一秒的数据;
  • no:不设置写入磁盘的规则,根据当前操作系统来决定何时写入磁盘,Linux 默认 30s 写入一次数据至磁盘。

这三种配置可以在 Redis 的配置文件(redis.conf)中设置,如下代码所示:

# 开启每秒写入一次的持久化策略
appendfsync everysec

小贴士:因为每次写入磁盘都会对 Redis 的性能造成一定的影响,所以要根据用户的实际情况设置相应的策略,一般设置每秒写入一次磁盘的频率就可以满足大部分的使用场景了。

触发自动持久化的两种情况,如下图所示:

2) 手动触发

在客户端执行 bgrewriteaof 命令就可以手动触发 AOF 持久化,如下图所示:

可以看出执行完 bgrewriteaof 命令之后,AOF 持久化就会被触发。

4. AOF 文件重写

AOF 是通过记录 Redis 的执行命令来持久化(保存)数据的,所以随着时间的流逝 AOF 文件会越来越多,这样不仅增加了服务器的存储压力,也会造成 Redis 重启速度变慢,为了解决这个问题 Redis 提供了 AOF 重写的功能。

1)什么是 AOF 重写?

AOF 重写指的是它会直接读取 Redis 服务器当前的状态,并压缩保存为 AOF 文件。例如,我们增加了一个计数器,并对它做了 99 次修改,如果不做 AOF 重写的话,那么持久化文件中就会有 100 条记录执行命令的信息,而 AOF 重写之后,之后记录一条此计数器最终的结果信息,这样就去除了所有的无效信息。

2)AOF 重写实现

触发 AOF 文件重写,要满足两个条件,这两个条件也是配置在 Redis 配置文件中的,它们分别:

  • auto-aof-rewrite-min-size:允许 AOF 重写的最小文件容量,默认是 64mb 。
  • auto-aof-rewrite-percentage:AOF 文件重写的大小比例,默认值是 100,表示 100%,也就是只有当前 AOF 文件,比最后一次(上次)的 AOF 文件大一倍时,才会启动 AOF 文件重写。

查询 auto-aof-rewrite-min-size 和 auto-aof-rewrite-percentage 的值,可使用 config get xxx 命令,如下图所示:

![](https://gitee.com/lemon-cs/images/raw/master/AOF 重写配置.png)

小贴士:只有同时满足 auto-aof-rewrite-min-size 和 auto-aof-rewrite-percentage 设置的条件,才会触发 AOF 文件重写。

注意:使用 bgrewriteaof 命令,可以自动触发 AOF 文件重写。

3)AOF 重写流程

AOF 文件重写是生成一个全新的文件,并把当前数据的最少操作命令保存到新文件上,当把所有的数据都保存至新文件之后,Redis 会交换两个文件,并把最新的持久化操作命令追加到新文件上。

5. 配置说明

合理的设置 AOF 的配置,可以保障 Redis 高效且稳定的运行,以下是 AOF 的全部配置信息和说明。

AOF 的配置参数在 Redis 的配置文件中,也就是 Redis 根路径下的 redis.conf 文件中,配置参数和说明如下:

# 是否开启 AOF,yes 为开启,默认是关闭
appendonly no

# AOF 默认文件名
appendfilename "appendonly.aof"

# AOF 持久化策略配置
# appendfsync always
appendfsync everysec
# appendfsync no

# AOF 文件重写的大小比例,默认值是 100,表示 100%,也就是只有当前 AOF 文件,比最后一次的 AOF 文件大一倍时,才会启动 AOF 文件重写。
auto-aof-rewrite-percentage 100

# 允许 AOF 重写的最小文件容量
auto-aof-rewrite-min-size 64mb

# 是否开启启动时加载 AOF 文件效验,默认值是 yes,表示尽可能的加载 AOF 文件,忽略错误部分信息,并启动 Redis 服务。
# 如果值为 no,则表示,停止启动 Redis,用户必须手动修复 AOF 文件才能正常启动 Redis 服务。
aof-load-truncated yes

其中比较重要的是 appendfsync 参数,用它来设置 AOF 的持久化策略,可以选择按时间间隔或者操作次数来存储 AOF 文件,这个参数的三个值在文章开头有说明,这里就不再复述了。

6. 数据恢复

1)正常数据恢复

正常情况下,只要开启了 AOF 持久化,并且提供了正常的 appendonly.aof 文件,在 Redis 启动时就会自定加载 AOF 文件并启动,执行如下图所示:

其中 DB loaded from append only file...... 表示 Redis 服务器在启动时,先去加载了 AOF 持久化文件。

小贴士:默认情况下 appendonly.aof 文件保存在 Redis 的根目录下。

持久化文件加载规则

  • 如果只开启了 AOF 持久化,Redis 启动时只会加载 AOF 文件(appendonly.aof),进行数据恢复;
  • 如果只开启了 RDB 持久化,Redis 启动时只会加载 RDB 文件(dump.rdb),进行数据恢复;
  • 如果同时开启了 RDB 和 AOF 持久化,Redis 启动时只会加载 AOF 文件(appendonly.aof),进行数据恢复。

在 AOF 开启的情况下,即使 AOF 文件不存在,只有 RDB 文件,也不会加载 RDB 文件。 AOF 和 RDB 的加载流程如下图所示:

2)简单异常数据恢复

在 AOF 写入文件时如果服务器崩溃,或者是 AOF 存储已满的情况下,AOF 的最后一条命令可能被截断,这就是异常的 AOF 文件。

在 AOF 文件异常的情况下,如果为修改 Redis 的配置文件,也就是使用 aof-load-truncated 等于 yes 的配置,Redis 在启动时会忽略最后一条命令,并顺利启动 Redis,执行结果如下:

* Reading RDB preamble from AOF file...
* Reading the remaining AOF tail...
# !!! Warning: short read while loading the AOF file !!!
# !!! Truncating the AOF at offset 439 !!!
# AOF loaded anyway because aof-load-truncated is enabled
3)复杂异常数据恢复

AOF 文件可能出现更糟糕的情况,当 AOF 文件不仅被截断,而且中间的命令也被破坏,这个时候再启动 Redis 会提示错误信息并中止运行,错误信息如下:

* Reading the remaining AOF tail...
# Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>

出现此类问题的解决方案如下:

  1. 首先使用 AOF 修复工具,检测出现的问题,在命令行中输入 redis-check-aof 命令,它会跳转到出现问题的命令行,这个时候可以尝试手动修复此文件;
  2. 如果无法手动修复,我们可以使用 redis-check-aof --fix 自动修复 AOF 异常文件,不过执行此命令,可能会导致异常部分至文件末尾的数据全部被丢弃。

7. 优缺点

1) AOF 优点
  • AOF 持久化保存的数据更加完整,AOF 提供了三种保存策略:每次操作保存、每秒钟保存一次、跟随系统的持久化策略保存,其中每秒保存一次,从数据的安全性和性能两方面考虑是一个不错的选择,也是 AOF 默认的策略,即使发生了意外情况,最多只会丢失 1s 钟的数据;
  • AOF 采用的是命令追加的写入方式,所以不会出现文件损坏的问题,即使由于某些意外原因,导致了最后操作的持久化数据写入了一半,也可以通过 redis-check-aof 工具轻松的修复;
  • AOF 持久化文件,非常容易理解和解析,它是把所有 Redis 键值操作命令,以文件的方式存入了磁盘。即使不小心使用 flushall 命令删除了所有键值信息,只要使用 AOF 文件,删除最后的 flushall 命令,重启 Redis 即可恢复之前误删的数据。
2) AOF 缺点
  • 对于相同的数据集来说,AOF 文件要大于 RDB 文件;
  • 在 Redis 负载比较高的情况下,RDB 比 AOF 性能更好;
  • RDB 使用快照的形式来持久化整个 Redis 数据,而 AOF 只是将每次执行的命令追加到 AOF 文件中,因此从理论上说,RDB 比 AOF 更健壮。

AOF 保存数据更加完整,它可以记录每次 Redis 的键值变化,或者是选择每秒保存一次数据。AOF 的持久化文件更加易读,但相比与二进制的 RDB 来说,所占的存储空间也越大,为了解决这个问题,AOF 提供自动化重写机制,最大程度的减少了 AOF 占用空间大的问题。同时 AOF 也提供了很方便的异常文件恢复命令: redis-check-aof --fix ,为使用 AOF 提供了很好的保障。

4.3 - 混合持久化

RDB 和 AOF 持久化各有利弊,RDB 可能会导致一定时间内的数据丢失,而 AOF 由于文件较大则会影响 Redis 的启动速度,为了能同时使用 RDB 和 AOF 各种的优点,Redis 4.0 之后新增了混合持久化的方式。

在开启混合持久化的情况下,AOF 重写时会把 Redis 的持久化数据,以 RDB 的格式写入到 AOF 文件的开头,之后的数据再以 AOF 的格式化追加的文件的末尾。

混合持久化的数据存储结构如下图所示:

1. 开启混合持久化

查询是否开启混合持久化可以使用 config get aof-use-rdb-preamble 命令,执行结果如下图所示:

其中 yes 表示已经开启混合持久化,no 表示关闭,Redis 5.0 默认值为 yes。 如果是其他版本的 Redis 首先需要检查一下,是否已经开启了混合持久化,如果关闭的情况下,可以通过以下两种方式开启:

  • 通过命令行开启
  • 通过修改 Redis 配置文件开启
1)通过命令行开启

使用命令 config set aof-use-rdb-preamble yes

小贴士:命令行设置配置的缺点是重启 Redis 服务之后,设置的配置就会失效。

2)通过修改 Redis 配置文件开启

在 Redis 的根路径下找到 redis.conf 文件,把配置文件中的 aof-use-rdb-preamble no 改为 aof-use-rdb-preamble yes 如下图所示:

2. 实例运行

当在混合持久化关闭的情况下,使用 bgrewriteaof 触发 AOF 文件重写之后,查看 appendonly.aof 文件的持久化日志,如下图所示:

可以看出,当混合持久化关闭的情况下 AOF 持久化文件存储的为标准的 AOF 格式的文件。 当混合持久化开启的模式下,使用 bgrewriteaof 命令触发 AOF 文件重写,得到 appendonly.aof 的文件内容如下图所示:

可以看出 appendonly.aof 文件存储的内容是 REDIS 开头的 RDB 格式的内容,并非为 AOF 格式的日志。

3. 数据恢复和源码解析

混合持久化的数据恢复和 AOF 持久化过程是一样的,只需要把 appendonly.aof 放到 Redis 的根目录,在 Redis 启动时,只要开启了 AOF 持久化,Redis 就会自动加载并恢复数据。 Redis 启动信息如下图所示:

可以看出 Redis 在服务器初始化的时候加载了 AOF 文件的内容。

1)混合持久化的加载流程

混合持久化的加载流程如下:

  1. 判断是否开启 AOF 持久化,开启继续执行后续流程,未开启执行加载 RDB 文件的流程;
  2. 判断 appendonly.aof 文件是否存在,文件存在则执行后续流程;
  3. 判断 AOF 文件开头是 RDB 的格式,先加载 RDB 内容再加载剩余的 AOF 内容;
  4. 判断 AOF 文件开头不是 RDB 的格式,直接以 AOF 格式加载整个文件。

AOF 加载流程图如下图所示:

![](https://gitee.com/lemon-cs/images/raw/master/AOF 加载流程图.png)

2)源码解析

Redis 判断 AOF 文件的开头是否是 RDB 格式的,是通过关键字 REDIS 判断的,RDB 文件的开头一定是 REDIS 关键字开头的,判断源码在 Redis 的 src/aof.c 中,核心代码如下所示:

char sig[5]; /* "REDIS" */
if (fread(sig,1,5,fp) != 5 || memcmp(sig,"REDIS",5) != 0) {
// AOF 文件开头非 RDB 格式,非混合持久化文件
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
} else {
/* RDB preamble. Pass loading the RDB functions. */
rio rdb;

serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
// AOF 文件开头是 RDB 格式,先加载 RDB 再加载 AOF
if (rdbLoadRio(&rdb,NULL,1) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {
serverLog(LL_NOTICE,"Reading the remaining AOF tail...");
}
}
// 加载 AOF 格式的数据

可以看出 Redis 是通过判断 AOF 文件的开头是否是 REDIS 关键字,来确定此文件是否为混合持久化文件的。

小贴士:AOF 格式的开头是 *,而 RDB 格式的开头是 REDIS。

混合持久化优点:
  • 混合持久化结合了 RDB 和 AOF 持久化的优点,开头为 RDB 的格式,使得 Redis 可以更快的启动,同时结合 AOF 的优点,有减低了大量数据丢失的风险。
混合持久化缺点:
  • AOF 文件中添加了 RDB 格式的内容,使得 AOF 文件的可读性变得很差;
  • 兼容性差,如果开启混合持久化,那么此混合持久化 AOF 文件,就不能用在 Redis 4.0 之前版本了。

5. 持久化最佳实践

持久化虽然保证了数据不丢失,但同时拖慢了 Redis 的运行速度,那怎么更合理的使用 Redis 的持久化功能呢? Redis 持久化的最佳实践可从以下几个方面考虑。

1)控制持久化开关

使用者可根据实际的业务情况考虑,如果对数据的丢失不敏感的情况下,可考虑关闭 Redis 的持久化,这样所以的键值操作都在内存中,就可以保证最高效率的运行 Redis 了。 持久化关闭操作:

  • 关闭 RDB 持久化,使用命令: config set save ""
  • 关闭 AOF 和 混合持久化,使用命令: config set appendonly no
2)主从部署

使用主从部署,一台用于响应主业务,一台用于数据持久化,这样就可能让 Redis 更加高效的运行。

3)使用混合持久化

混合持久化结合了 RDB 和 AOF 的优点,Redis 5.0 默认是开启的。

4)使用配置更高的机器

Redis 对 CPU 的要求并不高,反而是对内存和磁盘的要求很高,因为 Redis 大部分时候都在做读写操作,使用更多的内存和更快的磁盘,对 Redis 性能的提高非常有帮助。

5-Redis 事务

事务指的是提供一种将多个命令打包,一次性按顺序地执行的机制,并且保证服务器只有在执行完事务中的所有命令后,才会继续处理此客户端的其他命令。

事务也是其他关系型数据库所必备的基础功能,以支付的场景为例,正常情况下只有正常消费完成之后,才会减去账户余额。但如果没有事务的保障,可能会发生消费失败了,但依旧会把账户的余额给扣减了,我想这种情况应该任何人都无法接受吧?所以事务是数据库中一项非常重要的基础功能。

5.1 - 事务基本使用

事务在其他语言中,一般分为以下三个阶段:

  • 开启事务 ——Begin Transaction
  • 执行业务代码,提交事务 ——Commit Transaction
  • 业务处理中出现异常,回滚事务 ——Rollback Transaction

以 Java 中的事务执行为例:

// 开启事务
begin();
try {
//......
// 提交事务
commit();
} catch(Exception e) {
// 回滚事务
rollback();
}

Redis 中的事务从开始到结束也是要经历三个阶段:

  • 执行事务 / 放弃事务

其中,开启事务使用 multi 命令,事务执行使用 exec 命令,放弃事务使用 discard 命令。

5.2- 开启事务

multi 命令用于开启事务,实现代码如下:

> multi
OK

multi 命令可以让客户端从非事务模式状态,变为事务模式状态。

注意:multi 命令不能嵌套使用,如果已经开启了事务的情况下,再执行 multi 命令,会提示如下错误:

(error) ERR MULTI calls can not be nested

执行效果,如下代码所示:

127.0.0.1:6379> multi
OK
127.0.0.1:6379> multi
(error) ERR MULTI calls can not be nested

当客户端是非事务状态时,使用 multi 命令,客户端会返回结果 OK,如果客户端已经是事务状态,再执行 multi 命令会 multi 命令不能嵌套的错误,但不会终止客户端为事务的状态。

5.3- 命令入列

客户端进入事务状态之后,执行的所有常规 Redis 操作命令(非触发事务执行或放弃和导致入列异常的命令)会依次入列,命令入列成功后会返回 QUEUED,如下代码所示:

> multi
OK
> set k v
QUEUED
> get k
QUEUED

执行流程如下图所示:

注意:命令会按照先进先出(FIFO)的顺序出入列,也就是说事务会按照命令的入列顺序,从前往后依次执行。

5.4- 执行事务 / 放弃事务

执行事务的命令是 exec,放弃事务的命令是 discard。

执行事务示例代码如下:

> multi
OK
> set k v2
QUEUED
> exec
1) OK
> get k
"v2"

放弃事务示例代码如下:

> multi
OK
> set k v3
QUEUED
> discard
OK
> get k
"v2"

执行流程如下图所示:

5.5 - 事务错误 & 回滚

事务执行中的错误分为以下三类:

  • 执行时才会出现的错误(简称:执行时错误);
  • 入列时错误,不会终止整个事务;
  • 入列时错误,会终止整个事务。

1. 执行时错误

示例代码如下:

> get k
"v"
> multi
OK
> set k v2
QUEUED
> expire k 10s
QUEUED
> exec
1) OK
2) (error) ERR value is not an integer or out of range
> get k
"v2"

执行命令解释如下图所示:

从以上结果可以看出,即使事务队列中某个命令在执行期间发生了错误,事务也会继续执行,直到事务队列中所有命令执行完成。

2. 入列错误不会导致事务结束

示例代码如下:

> get k
"v"
> multi
OK
> set k v2
QUEUED
> multi
(error) ERR MULTI calls can not be nested
> exec
1) OK
> get k
"v2"

执行命令解释如下图所示:

可以看出,重复执行 multi 会导致入列错误,但不会终止事务,最终查询的结果是事务执行成功了。除了重复执行 multi 命令,还有在事务状态下执行 watch 也是同样的效果,下文会详细讲解关于 watch 的内容。

3. 入列错误导致事务结束

示例代码如下:

> get k
"v2"
> multi
OK
> set k v3
QUEUED
> set k
(error) ERR wrong number of arguments for 'set' command
> exec
(error) EXECABORT Transaction discarded because of previous errors.
> get k
"v2"

执行命令解释如下图所示:

4. 为什么不支持事务回滚?

Redis 官方文档的解释如下:

If you have a relational databases background, the fact that Redis commands can fail during a transaction, but still Redis will execute the rest of the transaction instead of rolling back, may look odd to you.

However there are good opinions for this behavior:

  • Redis commands can fail only if called with a wrong syntax (and the problem is not detectable during the command queueing), or against keys holding the wrong data type: this means that in practical terms a failing command is the result of a programming errors, and a kind of error that is very likely to be detected during development, and not in production.
  • Redis is internally simplified and faster because it does not need the ability to roll back.

An argument against Redis point of view is that bugs happen, however it should be noted that in general the roll back does not save you from programming errors. For instance if a query increments a key by 2 instead of 1, or increments the wrong key, there is no way for a rollback mechanism to help. Given that no one can save the programmer from his or her errors, and that the kind of errors required for a Redis command to fail are unlikely to enter in production, we selected the simpler and faster approach of not supporting roll backs on errors.

大概的意思是,作者不支持事务回滚的原因有以下两个:

  • 他认为 Redis 事务的执行时,错误通常都是编程错误造成的,这种错误通常只会出现在开发环境中,而很少会在实际的生产环境中出现,所以他认为没有必要为 Redis 开发事务回滚功能;
  • 不支持事务回滚是因为这种复杂的功能和 Redis 追求的简单高效的设计主旨不符合。

这里不支持事务回滚,指的是不支持运行时错误的事务回滚。

5.6- 监控

watch 命令用于客户端并发情况下,为事务提供一个乐观锁(CAS,Check And Set),也就是可以用 watch 命令来监控一个或多个变量,如果在事务的过程中,某个监控项被修改了,那么整个事务就会终止执行

watch 基本语法如下:

watch key [key ...]

watch 示例代码如下:

> watch k
OK
> multi
OK
> set k v2
QUEUED
> exec
(nil)
> get k
"v"

注意:以上事务在执行期间,也就是开启事务(multi)之后,执行事务(exec)之前,模拟多客户端并发操作了变量 k 的值,这个时候再去执行事务,才会出现如上结果,exec 执行的结果为 nil。

可以看出,当执行 exec 返回的结果是 nil 时,表示 watch 监控的对象在事务执行的过程中被修改了。从 get k 的结果也可以印证,因为事务中设置的值 set k v2 并未正常执行。

执行流程如下图所示:

注意: watch 命令只能在客户端开启事务之前执行,在事务中执行 watch 命令会引发错误,但不会造成整个事务失败,如下代码所示:

> multi
OK
> set k v3
QUEUED
> watch k
(error) ERR WATCH inside MULTI is not allowed
> exec
1) OK
> get k
"v3"

执行命令解释如下图所示:

unwatch 命令用于清除所有之前监控的所有对象(键值对)。

unwatch 示例如下所示:

> set k v
OK
> watch k
OK
> multi
OK
> unwatch
QUEUED
> set k v2
QUEUED
> exec
1) OK
2) OK
> get k
"v2"

可以看出,即使在事务的执行过程中,k 值被修改了,因为调用了 unwatch 命令,整个事务依然会顺利执行。

5.7 - 代码实战

以下是事务在 Java 中的使用,代码如下:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class TransactionExample {
public static void main(String[] args) {
// 创建 Redis 连接
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
// 设置 Redis 密码
jedis.auth("xxx");
// 设置键值
jedis.set("k", "v");
// 开启监视 watch
jedis.watch("k");
// 开始事务
Transaction tx = jedis.multi();
// 命令入列
tx.set("k", "v2");
// 执行事务
tx.exec();
System.out.println(jedis.get("k"));
jedis.close();
}
}

5.8 - 小结

事务为多个命令提供一次性按顺序执行的机制,与 Redis 事务相关的命令有以下五个:

  • multi:开启事务
  • exec:执行事务
  • discard:丢弃事务
  • watch:为事务提供乐观锁实现
  • unwatch:取消监控(取消事务中的乐观锁)

正常情况下 Redis 事务分为三个阶段:开启事务、命令入列、执行事务。Redis 事务并不支持运行时错误的事务回滚,但在某些入列错误,如 set key 或者是 watch 监控项被修改时,提供整个事务回滚的功能。

6-Redis 键值过期

Redis 所有的数据结构都可以设置过期时间,时间一到,就会自动删除。你可以想象 Redis 内部有一个死神,时刻盯着所有设置了过期时间的 key,寿命一到就会立即收割。

你还可以进一步站在死神的角度思考,会不会因为同一时间太多的 key 过期,以至于忙不过来。同时因为 Redis 是单线程的,收割的时间也会占用线程的处理时间,如果收割的太过于繁忙,会不会导致线上读写指令出现卡顿。

这些问题 Antirez 早就想到了,所有在过期这件事上,Redis 非常小心。

6.1-Redis 过期策略

1. 过期的 key 集合

redis 会将每个设置了过期时间的 key 放入到一个独立的字典中,以后会定时遍历这个字典来删除到期的 key。除了定时遍历之外,它还会使用惰性策略来删除过期的 key,所谓惰性策略就是在客户端访问这个 key 的时候,redis 对 key 的过期时间进行检查,如果过期了就立即删除。定时删除是集中处理,惰性删除是零散处理。

2. 定时扫描策略

Redis 默认会每秒进行十次过期扫描,过期扫描不会遍历过期字典中所有的 key,而是采用了一种简单的贪心策略。

  1. 从过期字典中随机 20 个 key;
  2. 删除这 20 个 key 中已经过期的 key;
  3. 如果过期的 key 比率超过 1/4,那就重复步骤 1;

同时,为了保证过期扫描不会出现循环过度,导致线程卡死现象,算法还增加了扫描时间的上限,默认不会超过 25ms。

设想一个大型的 Redis 实例中所有的 key 在同一时间过期了,会出现怎样的结果?

毫无疑问,Redis 会持续扫描过期字典 (循环多次),直到过期字典中过期的 key 变得稀疏,才会停止 (循环次数明显下降)。这就会导致线上读写请求出现明显的卡顿现象。导致这种卡顿的另外一种原因是内存管理器需要频繁回收内存页,这也会产生一定的 CPU 消耗。

当客户端请求到来时,服务器如果正好进入过期扫描状态,客户端的请求将会等待至少 25ms 后才会进行处理,如果客户端将超时时间设置的比较短,比如 10ms,那么就会出现大量的链接因为超时而关闭,业务端就会出现很多异常。而且这时你还无法从 Redis 的 slowlog 中看到慢查询记录,因为慢查询指的是逻辑处理过程慢,不包含等待时间。

所以业务开发人员一定要注意过期时间,如果有大批量的 key 过期,要给过期时间设置一个随机范围,而不宜全部在同一时间过期,分散过期处理的压力。

# 在目标过期时间上增加一天的随机时间
redis.expire_at(key, random.randint(86400) + expire_ts)

在一些活动系统中,因为活动是一期一会,下一期活动举办时,前面几期的很多数据都可以丢弃了,所以需要给相关的活动数据设置一个过期时间,以减少不必要的 Redis 内存占用。如果不加注意,你可能会将过期时间设置为活动结束时间再增加一个常量的冗余时间,如果参与活动的人数太多,就会导致大量的 key 同时过期。

3. 从库的过期策略

从库不会进行过期扫描,从库对过期的处理是被动的。主库在 key 到期时,会在 AOF 文件里增加一条 del 指令,同步到所有的从库,从库通过执行这条 del 指令来删除过期的 key。

因为指令同步是异步进行的,所以主库过期的 key 的 del 指令没有及时同步到从库的话,会出现主从数据的不一致,主库没有的数据在从库里还存在。

6.2-Redis 键值过期操作

1. 过期设置

Redis 中设置过期时间主要通过以下四种方式:

  • expire key seconds:设置 key 在 n 秒后过期;
  • pexpire key milliseconds:设置 key 在 n 毫秒后过期;
  • expireat key timestamp:设置 key 在某个时间戳(精确到秒)之后过期;
  • pexpireat key millisecondsTimestamp:设置 key 在某个时间戳(精确到毫秒)之后过期;

下面分别来看以上这些命令的具体实现。

  • expire:N 秒后过期
    127.0.0.1:6379> set key value
    OK
    127.0.0.1:6379> expire key 100
    (integer) 1
    127.0.0.1:6379> ttl key
    (integer) 97

    其中命令 ttl 的全称是 Time To Live,表示此键值在 n 秒后过期。例如,上面的结果 97 表示 key 在 97s 后过期。

  • pexpire:N 毫秒后过期
    127.0.0.1:6379> set key2 value2
    OK
    127.0.0.1:6379> pexpire key2 100000
    (integer) 1
    127.0.0.1:6379> pttl key2
    (integer) 94524

    其中 pexpire key2 100000 表示设置 key2 在 100000 毫秒(100 秒)后过期。

  • expireat:过期时间戳精确到秒
    127.0.0.1:6379> set key3 value3
    OK
    127.0.0.1:6379> expireat key3 1573472683
    (integer) 1
    127.0.0.1:6379> ttl key3
    (integer) 67

    其中 expireat key3 1573472683 表示 key3 在时间戳 1573472683 后过期(精确到秒),使用 ttl 查询可以发现在 67s 后 key3 会过期。

    小贴士:在 Redis 可以使用 time 命令查询当前时间的时间戳(精确到秒),示例如下:

    127.0.0.1:6379> time

    1. “1573472563”

    2. “248426”

  • pexpireat:过期时间戳精确到毫秒
    127.0.0.1:6379> set key4 value4
    OK
    127.0.0.1:6379> pexpireat key4 1573472683000
    (integer) 1
    127.0.0.1:6379> pttl key4
    (integer) 3522

    其中 pexpireat key4 1573472683000 表示 key4 在时间戳 1573472683000 后过期(精确到毫秒),使用 ttl 查询可以发现在 3522ms 后 key4 会过期。

  • 字符串中的过期操作

    字符串中几个直接操作过期时间的方法,如下列表:

    • set key value ex seconds:设置键值对的同时指定过期时间(精确到秒);
    • set key value px milliseconds:设置键值对的同时指定过期时间(精确到毫秒);
    • setex key seconds valule:设置键值对的同时指定过期时间(精确到秒)。

    实现示例如下。

    1. set key value ex seconds
    127.0.0.1:6379> set k v ex 100
    OK
    127.0.0.1:6379> ttl k
    (integer) 97
    2. set key value ex milliseconds
    127.0.0.1:6379> set k2 v2 px 100000
    OK
    127.0.0.1:6379> pttl k2
    (integer) 92483
    3. setex key seconds valule
    127.0.0.1:6379> setex k3 100 v3
    OK
    127.0.0.1:6379> ttl k3
    (integer) 91

2. 移除过期时间

使用命令: persist key 可以移除键值的过期时间,如下代码所示。

127.0.0.1:6379> ttl k3
(integer) 97
127.0.0.1:6379> persist k3
(integer) 1
127.0.0.1:6379> ttl k3
(integer) -1

可以看出第一次使用 ttl 查询 k3 会在 97s 后过期,当使用了 persist 命令之后,在查询 k3 的存活时间发现结果是 -1,它表示 k3 永不过期。

3. Java 实现过期操作

本文将使用 Jedis 框架来实现对 Redis 过期时间的操作,如下代码所示:

public class TTLTest {
public static void main(String[] args) throws InterruptedException {
// 创建 Redis 连接
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
// 设置 Redis 密码(如果没有密码,此行可省略)
jedis.auth("xxx");
// 存储键值对(默认情况下永不过期)
jedis.set("k", "v");
// 查询 TTL(过期时间)
Long ttl = jedis.ttl("k");
// 打印过期日志
System.out.println("过期时间:" + ttl);
// 设置 100s 后过期
jedis.expire("k", 100);
// 等待 1s 后执行
Thread.sleep(1000);
// 打印过期日志
System.out.println("执行 expire 后的 TTL=" + jedis.ttl("k"));
}
}

程序的执行结果为:

过期时间:-1
执行 expire 后的 TTL=99

可以看出使用 Jedis 来操作 Redis 的过期时间还是很方便的,可直接使用 jedis.ttl("k") 查询键值的生存时间,使用 jedis.expire("k",seconds) 方法设置过期时间(精确到秒)。

小贴士:使用 Jedis 之前,先要把 Jedis 引入到程序中,如果使用的是 Maven 项目的,直接在 pom.xml 文件中添加以下引用:

<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>version</version>
</dependency>

更多过期操作方法,如下列表:

  • pexpire (String key, long milliseconds):设置 n 毫秒后过期;
  • expireAt (String key, long unixTime):设置某个时间戳后过期(精确到秒);
  • pexpireAt (String key, long millisecondsTimestamp):设置某个时间戳后过期(精确到毫秒);
  • persist (String key):移除过期时间。

完整示例代码如下:

public class TTLTest {
public static void main(String[] args) throws InterruptedException {
// 创建 Redis 连接
Jedis jedis = new Jedis("xxx.xxx.xxx.xxx", 6379);
// 设置 Redis 密码(如果没有密码,此行可省略)
jedis.auth("xxx");
// 存储键值对(默认情况下永不过期)
jedis.set("k", "v");
// 查询 TTL(过期时间)
Long ttl = jedis.ttl("k");
// 打印过期日志
System.out.println("过期时间:" + ttl);
// 设置 100s 后过期
jedis.expire("k", 100);
// 等待 1s 后执行
Thread.sleep(1000);
// 打印过期日志
System.out.println("执行 expire 后的 TTL=" + jedis.ttl("k"));
// 设置 n 毫秒后过期
jedis.pexpire("k", 100000);
// 设置某个时间戳后过期(精确到秒)
jedis.expireAt("k", 1573468990);
// 设置某个时间戳后过期(精确到毫秒)
jedis.pexpireAt("k", 1573468990000L);
// 移除过期时间
jedis.persist("k");
}
}

4. 持久化中的过期键

上面我们讲了过期键在 Redis 正常运行中一些使用案例,接下来,我们来看 Redis 在持久化的过程中是如何处理过期键的。

Redis 持久化文件有两种格式:RDB(Redis Database)和 AOF(Append Only File),下面我们分别来看过期键在这两种格式中的呈现状态。

  • RDB 中的过期键

    RDB 文件分为两个阶段,RDB 文件生成阶段和加载阶段。

    1. RDB 文件生成

    从内存状态持久化成 RDB(文件)的时候,会对 key 进行过期检查,过期的键不会被保存到新的 RDB 文件中,因此 Redis 中的过期键不会对生成新 RDB 文件产生任何影响。

    2. RDB 文件加载

    RDB 加载分为以下两种情况:

    • 如果 Redis 是主服务器运行模式的话,在载入 RDB 文件时,程序会对文件中保存的键进行检查,过期键不会被载入到数据库中。所以过期键不会对载入 RDB 文件的主服务器造成影响;
    • 如果 Redis 是从服务器运行模式的话,在载入 RDB 文件时,不论键是否过期都会被载入到数据库中。但由于主从服务器在进行数据同步时,从服务器的数据会被清空。所以一般来说,过期键对载入 RDB 文件的从服务器也不会造成影响。

    RDB 文件加载的源码可以在 rdb.c 文件的 rdbLoad () 函数中找到,源码所示:

    /* Check if the key already expired. This function is used when loading
    * an RDB file from disk, either at startup, or when an RDB was
    * received from the master. In the latter case, the master is
    * responsible for key expiry. If we would expire keys here, the
    * snapshot taken by the master may not be reflected on the slave.
    *
    * 如果服务器为主节点的话,
    * 那么在键已经过期的时候,不再将它们关联到数据库中去
    */
    if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
    decrRefCount(key);
    decrRefCount(val);
    // 跳过
    continue;
    }
  • AOF 中的过期键
    1. AOF 文件写入

    当 Redis 以 AOF 模式持久化时,如果数据库某个过期键还没被删除,那么 AOF 文件会保留此过期键,当此过期键被删除后,Redis 会向 AOF 文件追加一条 DEL 命令来显式地删除该键值。

    2. AOF 重写

    执行 AOF 重写时,会对 Redis 中的键值对进行检查已过期的键不会被保存到重写后的 AOF 文件中,因此不会对 AOF 重写造成任何影响。

5. 主从库的过期键

当 Redis 运行在主从模式下时,从库不会进行过期扫描,从库对过期的处理是被动的。也就是即使从库中的 key 过期了,如果有客户端访问从库时,依然可以得到 key 对应的值,像未过期的键值对一样返回。

从库的过期键处理依靠主服务器控制,主库在 key 到期时,会在 AOF 文件里增加一条 del 指令,同步到所有的从库,从库通过执行这条 del 指令来删除过期的 key。

本文我们知道了 Redis 中的四种设置过期时间的方式:expire、pexpire、expireat、pexpireat,其中比较常用的是 expire 设置键值 n 秒后过期。

字符串中可以在添加键值的同时设置过期时间,并可以使用 persist 命令移除过期时间。同时我们也知道了过期键在 RDB 写入和 AOF 重写时都不会被记录。

过期键在主从模式下,从库对过期键的处理要完全依靠主库,主库删除过期键之后会发送 del 命令给所有的从库。

本文的知识点,如下图所示:

6.3-Redis 过期策略与源码分析

1. 过期键执行流程

Redis 之所以能知道那些键值过期,是因为在 Redis 中维护了一个字典,存储了所有设置了过期时间的键值,我们称之为过期字典。

过期键判断流程如下图所示:

2. 过期键源码分析

过期键存储在 redisDb 结构中,源代码在 src/server.h 文件中:

/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *dict; /* 数据库键空间,存放着所有的键值对 */
dict *expires; /* 键的过期时间 */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

小贴士:本文的所有源码都是基于 Redis 5。

过期键数据结构如下图所示:

3. 过期策略

Redis 会删除已过期的键值,以此来减少 Redis 的空间占用,但因为 Redis 本身是单线的,如果因为删除操作而影响主业务的执行就得不偿失了,为此 Redis 需要制定多个(过期)删除策略来保证糟糕的事情不会发生。

常见的过期策略有以下三种:

下面分别来看每种策略有何不同。

  • 定时删除

    在设置键值过期时间时,创建一个定时事件,当过期时间到达时,由事件处理器自动执行键的删除操作。

    • 优点:保证内存可以被尽快地释放。
    • 缺点:在 Redis 高负载的情况下或有大量过期键需要同时处理时,会造成 Redis 服务器卡顿,影响主业务执行。
  • 惰性删除

    不主动删除过期键,每次从数据库获取键值时判断是否过期,如果过期则删除键值,并返回 null。

    • 优点:因为每次访问时,才会判断过期键,所以此策略只会使用很少的系统资源。
    • 缺点:系统占用空间删除不及时,导致空间利用率降低,造成了一定的空间浪费。

    源码解析

    惰性删除的源码位于 src/db.c 文件的 expireIfNeeded 方法中,源码如下:

    int expireIfNeeded(redisDb *db, robj *key) {
    // 判断键是否过期
    if (!keyIsExpired(db,key)) return 0;
    if (server.masterhost != NULL) return 1;
    /* 删除过期键 */
    // 增加过期键个数
    server.stat_expiredkeys++;
    // 传播键过期的消息
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
    "expired",key,db->id);
    // server.lazyfree_lazy_expire 为 1 表示异步删除(懒空间释放),反之同步删除
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
    dbSyncDelete(db,key);
    }
    // 判断键是否过期
    int keyIsExpired(redisDb *db, robj *key) {
    mstime_t when = getExpire(db,key);
    if (when < 0) return 0; /* No expire for this key */
    /* Don't expire anything while loading. It will be done later. */
    if (server.loading) return 0;
    mstime_t now = server.lua_caller ? server.lua_time_start : mstime();
    return now > when;
    }
    // 获取键的过期时间
    long long getExpire(redisDb *db, robj *key) {
    dictEntry *de;
    /* No expire? return ASAP */
    if (dictSize(db->expires) == 0 ||
    (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
    /* The entry was found in the expire dict, this means it should also
    * be present in the main dict (safety check). */
    serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
    return dictGetSignedIntegerVal(de);
    }

    所有对数据库的读写命令在执行之前,都会调用 expireIfNeeded 方法判断键值是否过期,过期则会从数据库中删除,反之则不做任何处理。

    惰性删除执行流程,如下图所示:

  • 定期删除

    每隔一段时间检查一次数据库,随机删除一些过期键。

    Redis 默认每秒进行 10 次过期扫描,此配置可通过 Redis 的配置文件 redis.conf 进行配置,配置键为 hz 它的默认值是 hz 10

    需要注意的是:Redis 每次扫描并不是遍历过期字典中的所有键,而是采用随机抽取判断并删除过期键的形式执行的。

    1. 定期删除流程
    1. 从过期字典中随机取出 20 个键;
    2. 删除这 20 个键中过期的键;
    3. 如果过期 key 的比例超过 25%,重复步骤 1。

    同时为了保证过期扫描不会出现循环过度,导致线程卡死现象,算法还增加了扫描时间的上限,默认不会超过 25ms。

    定期删除执行流程,如下图所示:

  • 优点:通过限制删除操作的时长和频率,来减少删除操作对 Redis 主业务的影响,同时也能删除一部分过期的数据减少了过期键对空间的无效占用。
  • 缺点:内存清理方面没有定时删除效果好,同时没有惰性删除使用的系统资源少。
2. 源码解析

定期删除的核心源码在 src/expire.c 文件下的 activeExpireCycle 方法中,源码如下:

void activeExpireCycle(int type) {
static unsigned int current_db = 0; /* 上次定期删除遍历到的数据库ID */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* 上一次执行快速定期删除的时间点 */
int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL; // 每次定期删除,遍历的数据库的数量
long long start = ustime(), timelimit, elapsed;
if (clientsArePaused()) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
if (!timelimit_exit) return;
// ACTIVE_EXPIRE_CYCLE_FAST_DURATION 是快速定期删除的执行时长
if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
last_fast_cycle = start;
}
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
// 慢速定期删除的执行时长
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* 删除操作的执行时长 */
long total_sampled = 0;
long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
int expired;
redisDb *db = server.db+(current_db % server.dbnum);
current_db++;
do {
// .......
expired = 0;
ttl_sum = 0;
ttl_samples = 0;
// 每个数据库中检查的键的数量
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
// 从数据库中随机选取 num 个键进行检查
while (num--) {
dictEntry *de;
long long ttl;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedInteger
// 过期检查,并对过期键进行删除
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet not expired. */
ttl_sum += ttl;
ttl_samples++;
}
total_sampled++;
}
total_expired += expired;
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
/* 每次检查只删除 ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4 个过期键 */
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
}
// .......
}

activeExpireCycle 方法在规定的时间,分多次遍历各个数据库,从过期字典中随机检查一部分过期键的过期时间,删除其中的过期键。

这个函数有两种执行模式,一个是快速模式一个是慢速模式,体现是代码中的 timelimit 变量,这个变量是用来约束此函数的运行时间的。快速模式下 timelimit 的值是固定的,等于预定义常量 ACTIVE_EXPIRE_CYCLE_FAST_DURATION,慢速模式下,这个变量的值是通过 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100 计算的。

  • Redis 使用的过期策略

    Redis 使用的是惰性删除加定期删除的过期策略。

7-Redis 管道技术 —Pipeline

管道技术(Pipeline)是客户端提供的一种批处理技术,用于一次处理多个 Redis 命令,从而提高整个交互的性能。

通常情况下 Redis 是单行执行的,客户端先向服务器发送请求,服务端接收并处理请求后再把结果返回给客户端,这种处理模式在非频繁请求时不会有任何问题。

但如果出现集中大批量的请求时,因为每个请求都要经历先请求再响应的过程,这就会造成网络资源浪费,此时就需要管道技术来把所有的命令整合一次发给服务端,再一次响应给客户端,这样就能大大的提升了 Redis 的响应速度。

普通命令模式,如下图所示:

两个连续的写操作和两个连续的读操作总共只会花费一次网络来回,就好比连续的 write 操作合并了,连续的 read 操作也合并了一样。

管道模式,如下图所示:

这便是管道操作的本质,服务器根本没有任何区别对待,还是收到一条消息,执行一条消息,回复一条消息的正常的流程。客户端通过对管道中的指令列表改变读写顺序就可以大幅节省 IO 时间。管道中指令越多,效果越好。

7.1 - 管道技术的作用

管道技术解决了多个命令集中请求时造成网络资源浪费的问题,加快了 Redis 的响应速度,让 Redis 拥有更高的运行速度。但要注意的一点是,管道技术本质上是客户端提供的功能,而非 Redis 服务器端的功能。

7.2 - 管道技术的本质

接下来我们深入分析一个请求交互的流程,真实的情况是它很复杂,因为要经过网络协议栈,这个就得深入内核了。

上图就是一个完整的请求交互流程图。我用文字来仔细描述一遍:

  1. 客户端进程调用 write 将消息写到操作系统内核为套接字分配的发送缓冲 send buffer
  2. 客户端操作系统内核将发送缓冲的内容发送到网卡,网卡硬件将数据通过「网际路由」送到服务器的网卡。
  3. 服务器操作系统内核将网卡的数据放到内核为套接字分配的接收缓冲 recv buffer
  4. 服务器进程调用 read 从接收缓冲中取出消息进行处理。
  5. 服务器进程调用 write 将响应消息写到内核为套接字分配的发送缓冲 send buffer
  6. 服务器操作系统内核将发送缓冲的内容发送到网卡,网卡硬件将数据通过「网际路由」送到客户端的网卡。
  7. 客户端操作系统内核将网卡的数据放到内核为套接字分配的接收缓冲 recv buffer
  8. 客户端进程调用 read 从接收缓冲中取出消息返回给上层业务逻辑进行处理。

其中步骤 58 和 14 是一样的,只不过方向是反过来的,一个是请求,一个是响应。

我们开始以为 write 操作是要等到对方收到消息才会返回,但实际上不是这样的。write 操作只负责将数据写到本地操作系统内核的发送缓冲然后就返回了。剩下的事交给操作系统内核异步将数据送到目标机器。但是如果发送缓冲满了,那么就需要等待缓冲空出空闲空间来,这个就是写操作 IO 操作的真正耗时。

我们开始以为 read 操作是从目标机器拉取数据,但实际上不是这样的。read 操作只负责将数据从本地操作系统内核的接收缓冲中取出来就了事了。但是如果缓冲是空的,那么就需要等待数据到来,这个就是读操作 IO 操作的真正耗时。

所以对于 value = redis.get(key) 这样一个简单的请求来说,write 操作几乎没有耗时,直接写到发送缓冲就返回,而 read 就会比较耗时了,因为它要等待消息经过网络路由到目标机器处理后的响应消息,再回送到当前的内核读缓冲才可以返回。这才是一个网络来回的真正开销

而对于管道来说,连续的 write 操作根本就没有耗时,之后第一个 read 操作会等待一个网络的来回开销,然后所有的响应消息就都已经回送到内核的读缓冲了,后续的 read 操作直接就可以从缓冲拿到结果,瞬间就返回了。

7.3 - 管道技术的使用

本文我们使用 Jedis 客户端提供的 Pipeline 对象来实现管道技术。首先先获取 Pipeline 对象,再为 Pipeline 对象设置需要执行的命令,最后再使用 sync () 方法或 syncAndReturnAll () 方法来统一执行这些命令,代码如下:

public class PipelineExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 记录执行开始时间
long beginTime = System.currentTimeMillis();
// 获取 Pipeline 对象
Pipeline pipe = jedis.pipelined();
// 设置多个 Redis 命令
for (int i = 0; i < 100; i++) {
pipe.set("key" + i, "val" + i);
pipe.del("key"+i);
}
// 执行命令
pipe.sync();
// 记录执行结束时间
long endTime = System.currentTimeMillis();
System.out.println("执行耗时:" + (endTime - beginTime) + "毫秒");
}
}

以上程序执行结果如下:

执行耗时:297毫秒

如果要接收管道所有命令的执行结果,可使用 syncAndReturnAll () 方法,示例代码如下:

public class PipelineExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 获取 Pipeline 对象
Pipeline pipe = jedis.pipelined();
// 设置多个 Redis 命令
for (int i = 0; i < 100; i++) {
pipe.set("key" + i, "val" + i);
}
// 执行命令并返回结果
List<Object> res = pipe.syncAndReturnAll();
for (Object obj : res) {
// 打印结果
System.out.println(obj);
}
}
}

7.4 - 管道技术 VS 普通命令

上面使用管道技术执行一个 for 循环所用的时间为 297 毫秒,接下来我们用普通的命令执行此循环,看下程序的执行时间,代码如下:

public class PipelineExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 记录执行开始时间
long beginTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
jedis.set("key" + i, "val" + i);
jedis.del("key"+i);
}
// 记录执行结束时间
long endTime = System.currentTimeMillis();
System.out.println("执行耗时:" + (endTime - beginTime) + "毫秒");
}
}

以上程序执行结果如下:

执行耗时:17276毫秒

结论

从上面的结果可以看出,管道的执行时间是 297 毫秒,而普通命令执行时间是 17276 毫秒,管道技术要比普通的执行快了 58 倍。

7.5 - 管道技术需要注意的事项

管道技术虽然有它的优势,但在使用时还需注意以下几个细节:

  • 发送的命令数量不会被限制,但输入缓存区也就是命令的最大存储体积为 1GB,当发送的命令超过此限制时,命令不会被执行,并且会被 Redis 服务器端断开此链接;
  • 如果管道的数据过多可能会导致客户端的等待时间过长,导致网络阻塞;
  • 部分客户端自己本身也有缓存区大小的设置,如果管道命令没有没执行或者是执行不完整,可以排查此情况或较少管道内的命令重新尝试执行。

使用管道技术可以解决多个命令执行时的网络等待,它是把多个命令整合到一起发送给服务器端处理之后统一返回给客户端,这样就免去了每条命令执行后都要等待的情况,从而有效地提高了程序的执行效率,但使用管道技术也要注意避免发送的命令过大,或管道内的数据太多而导致的网络阻塞。

8 - 内存淘汰机制与算法

当 Redis 内存超出物理内存限制时,内存的数据会开始和磁盘产生频繁的交换 (swap)。交换会让 Redis 的性能急剧下降,对于访问量比较频繁的 Redis 来说,这样龟速的存取效率基本上等于不可用。

在生产环境中我们是不允许 Redis 出现交换行为的,为了限制最大使用内存,Redis 提供了配置参数 maxmemory 来限制内存超出期望大小。

当实际内存超出 maxmemory 时,Redis 提供了几种可选策略 (maxmemory-policy) 来让用户自己决定该如何腾出新的空间以继续提供读写服务。

8.1-Redis 最大内存

只有在 Redis 的运行内存达到了某个阀值,才会触发内存淘汰机制,这个阀值就是我们设置的最大运行内存,此值在 Redis 的配置文件中可以找到,配置项为 maxmemory。

内存淘汰执行流程,如下图所示:

查询最大运行内存

我们可以使用命令 config get maxmemory 来查看设置的最大运行内存,命令如下:

127.0.0.1:6379> config get maxmemory
1) "maxmemory"
2) "0"

我们发现此值竟然是 0,这是 64 位操作系统默认的值,当 maxmemory 为 0 时,表示没有内存大小限制。

小贴士:32 位操作系统,默认的最大内存值是 3GB。

8.2 - 内存淘汰策略

1. 查看 Redis 内存淘汰策略

我们可以使用 config get maxmemory-policy 命令,来查看当前 Redis 的内存淘汰策略,命令如下:

127.0.0.1:6379> config get maxmemory-policy
1) "maxmemory-policy"
2) "noeviction"

可以看出此 Redis 使用的是 noeviction 类型的内存淘汰机制,它表示当运行内存超过最大设置内存时,不淘汰任何数据,但新增操作会报错。

2. 内存淘汰策略分类

早期版本的 Redis 有以下 6 种淘汰策略:

  1. noeviction:不淘汰任何数据,当内存不足时,新增操作会报错,Redis 默认内存淘汰策略;
  2. allkeys-lru:淘汰整个键值中最久未使用的键值;
  3. allkeys-random:随机淘汰任意键值;
  4. volatile-lru:淘汰所有设置了过期时间的键值中最久未使用的键值;
  5. volatile-random:随机淘汰设置了过期时间的任意键值;
  6. volatile-ttl:优先淘汰更早过期的键值。

在 Redis 4.0 版本中又新增了 2 种淘汰策略:

  1. volatile-lfu:淘汰所有设置了过期时间的键值中,最少使用的键值;
  2. allkeys-lfu:淘汰整个键值中最少使用的键值。

其中 allkeys-xxx 表示从所有的键值中淘汰数据,而 volatile-xxx 表示从设置了过期键的键值中淘汰数据。

  • 修改 Redis 内存淘汰策略
    设置内存淘汰策略有两种方法,这两种方法各有利弊,需要使用者自己去权衡。
    • 方式一:通过 “config set maxmemory-policy 策略” 命令设置。它的优点是设置之后立即生效,不需要重启 Redis 服务,缺点是重启 Redis 之后,设置就会失效。

    • 方式二:通过修改 Redis 配置文件修改,设置 “maxmemory-policy 策略”,它的优点是重启 Redis 服务后配置不会丢失,缺点是必须重启 Redis 服务,设置才能生效。

  • 内存淘汰算法

从内测淘汰策略分类上,我们可以得知,除了随机删除和不删除之外,主要有两种淘汰算法:LRU 算法和 LFU 算法。

8.3-LRU 算法

LRU 全称是 Least Recently Used 译为最近最少使用,是一种常用的页面置换算法,选择最近最久未使用的页面予以淘汰。

1. LRU 算法实现

LRU 算法需要基于链表结构,链表中的元素按照操作顺序从前往后排列,最新操作的键会被移动到表头,当需要内存淘汰时,只需要删除链表尾部的元素即可。

现 LRU 算法除了需要 key/value 字典外,还需要附加一个链表,链表中的元素按照一定的顺序进行排列。当空间满的时候,会踢掉链表尾部的元素。当字典的某个元素被访问时,它在链表中的位置会被移动到表头。所以链表的元素排列顺序就是元素最近被访问的时间顺序。

位于链表尾部的元素就是不被重用的元素,所以会被踢掉。位于表头的元素就是最近刚刚被人用过的元素,所以暂时不会被踢。

2. 近 LRU 算法

Redis 使用的是一种近似 LRU 算法,目的是为了更好的节约内存,它的实现方式是给现有的数据结构添加一个额外的字段,用于记录此键值的最后一次访问时间,Redis 内存淘汰时,会使用随机采样的方式来淘汰数据,它是随机取 5 个值(此值可配置),然后淘汰最久没有使用的那个。

Redis 使用的是一种近似 LRU 算法,它跟 LRU 算法还不太一样。之所以不使用 LRU 算法,是因为需要消耗大量的额外的内存,需要对现有的数据结构进行较大的改造。近似 LRU 算法则很简单,在现有数据结构的基础上使用随机采样法来淘汰元素,能达到和 LRU 算法非常近似的效果。Redis 为实现近似 LRU 算法,它给每个 key 增加了一个额外的小字段,这个字段的长度是 24 个 bit,也就是最后一次被访问的时间戳。

3. LRU 算法缺点

LRU 算法有一个缺点,比如说很久没有使用的一个键值,如果最近被访问了一次,那么它就不会被淘汰,即使它是使用次数最少的缓存,那它也不会被淘汰,因此在 Redis 4.0 之后引入了 LFU 算法。

4. LFU 算法

LFU 全称是 Least Frequently Used 翻译为最不常用的,最不常用的算法是根据总访问次数来淘汰数据的,它的核心思想是 “如果数据过去被访问多次,那么将来被访问的频率也更高”。

LFU 解决了偶尔被访问一次之后,数据就不会被淘汰的问题,相比于 LRU 算法也更合理一些。

在 Redis 中每个对象头中记录着 LFU 的信息,源码如下:

typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;

在 Redis 中 LFU 存储分为两部分,16 bit 的 ldt(last decrement time)和 8 bit 的 logc(logistic counter)。

  1. logc 是用来存储访问频次,8 bit 能表示的最大整数值为 255,它的值越小表示使用频率越低,越容易淘汰;
  2. ldt 是用来存储上一次 logc 的更新时间。

9 - 查询附近的人 —GeoHash

我们所处的任何位置都可以用经度和纬度来标识,经度的范围 -180 到 180,纬度的范围为 -90 到 90。纬度以赤道为界,赤道以南为负数,赤道以北为正数;经度以本初子午线(英国格林尼治天文台)为界,东边为正数,西边为负数。

Redis 在 3.2 版本中增加了 GEO 类型用于存储和查询地理位置,关于 GEO 的命令不多,主要包含以下 6 个:

  1. geoadd:添加地理位置
  2. geopos:查询位置信息
  3. geodist:距离统计
  4. georadius:查询某位置内的其他成员信息
  5. geohash:查询位置的哈希值
  6. zrem:删除地理位置

9.1 - 用数据库来算附近的人

地图元素的位置数据使用二维的经纬度表示,经度范围 (-180, 180],纬度范围 (-90, 90],纬度正负以赤道为界,北正南负,经度正负以本初子午线 (英国格林尼治天文台) 为界,东正西负。比如掘金办公室在望京 SOHO,它的经纬度坐标是 (116.48105,39.996794),都是正数,因为中国位于东北半球。

当两个元素的距离不是很远时,可以直接使用勾股定理就能算得元素之间的距离。我们平时使用的「附近的人」的功能,元素距离都不是很大,勾股定理算距离足矣。不过需要注意的是,经纬度坐标的密度不一样 (地球是一个椭圆),勾股定律计算平方差时之后再求和时,需要按一定的系数比加权求和,如果不求精确的话,也可以不必加权。

问题:经度总共 360 度,维度总共只有 180 度,为什么距离密度不是 2:1?

现在,如果要计算「附近的人」,也就是给定一个元素的坐标,然后计算这个坐标附近的其它元素,按照距离进行排序,该如何下手?

如果现在元素的经纬度坐标使用关系数据库 (元素 id, 经度 x, 纬度 y) 存储,你该如何计算?

首先,你不可能通过遍历来计算所有的元素和目标元素的距离然后再进行排序,这个计算量太大了,性能指标肯定无法满足。一般的方法都是通过矩形区域来限定元素的数量,然后对区域内的元素进行全量距离计算再排序。这样可以明显减少计算量。如何划分矩形区域呢?可以指定一个半径 r,使用一条 SQL 就可以圈出来。当用户对筛出来的结果不满意,那就扩大半径继续筛选。

select id from positions where x0-r < x < x0+r and y0-r < y < y0+r

为了满足高性能的矩形区域算法,数据表需要在经纬度坐标加上双向复合索引 (x, y),这样可以最大优化查询性能。

但是数据库查询性能毕竟有限,如果「附近的人」查询请求非常多,在高并发场合,这可能并不是一个很好的方案。

9.2-GeoHash 算法

业界比较通用的地理位置距离排序算法是 GeoHash 算法,Redis 也使用 GeoHash 算法。GeoHash 算法将二维的经纬度数据映射到一维的整数,这样所有的元素都将在挂载到一条线上,距离靠近的二维坐标映射到一维后的点之间距离也会很接近。当我们想要计算「附近的人时」,首先将目标位置映射到这条线上,然后在这个一维的线上获取附近的点就行了。

那这个映射算法具体是怎样的呢?它将整个地球看成一个二维平面,然后划分成了一系列正方形的方格,就好比围棋棋盘。所有的地图元素坐标都将放置于唯一的方格中。方格越小,坐标越精确。然后对这些方格进行整数编码,越是靠近的方格编码越是接近。那如何编码呢?一个最简单的方案就是切蛋糕法。设想一个正方形的蛋糕摆在你面前,二刀下去均分分成四块小正方形,这四个小正方形可以分别标记为 00,01,10,11 四个二进制整数。然后对每一个小正方形继续用二刀法切割一下,这时每个小小正方形就可以使用 4bit 的二进制整数予以表示。然后继续切下去,正方形就会越来越小,二进制整数也会越来越长,精确度就会越来越高。

编码之后,每个地图元素的坐标都将变成一个整数,通过这个整数可以还原出元素的坐标,整数越长,还原出来的坐标值的损失程度就越小。对于「附近的人」这个功能而言,损失的一点精确度可以忽略不计。

GeoHash 算法会继续对这个整数做一次 base32 编码 (0-9,a-z 去掉 a,i,l,o 四个字母) 变成一个字符串。在 Redis 里面,经纬度使用 52 位的整数进行编码,放进了 zset 里面,zset 的 value 是元素的 key,score 是 GeoHash 的 52 位整数值。zset 的 score 虽然是浮点数,但是对于 52 位的整数值,它可以无损存储。

在使用 Redis 进行 Geo 查询时,我们要时刻想到它的内部结构实际上只是一个 zset (skiplist)。通过 zset 的 score 排序就可以得到坐标附近的其它元素 (实际情况要复杂一些,不过这样理解足够了),通过将 score 还原成坐标值就可以得到元素的原始坐标。

9.3-Redis 的 GEO 的使用

1. 添加地理位置

我们先用百度地图提供的经纬度查询工具,地址:

http://api.map.baidu.com/lbsapi/getpoint/index.html

如下图所示:

找了以下 4 个地点,添加到 Redis 中:

  1. 天安门:116.404269,39.913164
  2. 月坛公园:116.36,39.922461
  3. 北京欢乐谷:116.499705,39.874635
  4. 香山公园:116.193275,39.996348

代码如下:

127.0.0.1:6379> geoadd site 116.404269 39.913164 tianan
(integer) 1
127.0.0.1:6379> geoadd site 116.36 39.922461 yuetan
(integer) 1
127.0.0.1:6379> geoadd site 116.499705 39.874635 huanle
(integer) 1
127.0.0.1:6379> geoadd site 116.193275 39.996348 xiangshan
(integer) 1

相关语法:

geoadd key longitude latitude member [longitude latitude member ...]

重点参数说明如下:

  • longitude 表示经度
  • latitude 表示纬度
  • member 是为此经纬度起的名字

此命令支持一次添加一个或多个位置信息。

2. 查询位置信息

127.0.0.1:6379> geopos site tianan
1) 1) "116.40541702508926392"
2) "39.91316289865137179"

相关语法:

geopos key member [member ...]

此命令支持查看一个或多个位置信息。

3. 距离统计

127.0.0.1:6379> geodist site tianan yuetan km
"3.9153"

可以看出天安门距离月坛公园的直线距离大概是 3.9 km,我们打开地图使用工具测试一下咱们的统计结果是否准确,如下图所示:

可以看出 Redis 的统计和使用地图工具统计的距离是完全吻合的。

注意:此命令统计的距离为两个位置的直线距离。

相关语法:

geodist key member1 member2 [unit]

unit 参数表示统计单位,它可以设置以下值:

  • m:以米为单位,默认单位;
  • km:以千米为单位;
  • mi:以英里为单位;
  • ft:以英尺为单位。

4. 查询某位置内的其他成员信息

127.0.0.1:6379> georadius site 116.405419 39.913164 5 km
1) "tianan"
2) "yuetan"

此命令的意思是查询天安门(116.405419,39.913164)附近 5 公里范围内的成员列表。

相关语法:

georadius key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC]

可选参数说明如下。

1. WITHCOORD

说明:返回满足条件位置的经纬度信息。

示例代码:

127.0.0.1:6379> georadius site 116.405419 39.913164 5 km withcoord
1) 1) "tianan"
2) 1) "116.40426903963088989"
2) "39.91316289865137179"
2) 1) "yuetan"
2) 1) "116.36000186204910278"
2) "39.92246025586381819"

2. WITHDIST

说明:返回满足条件位置与查询位置的直线距离。

示例代码:

127.0.0.1:6379> georadius site 116.405419 39.913164 5 km withdist
1) 1) "tianan"
2) "0.0981"
2) 1) "yuetan"
2) "4.0100"

3. WITHHASH

说明:返回满足条件位置的哈希信息。

示例代码:

127.0.0.1:6379> georadius site 116.405419 39.913164 5 km withhash
1) 1) "tianan"
2) (integer) 4069885552230465
2) 1) "yuetan"
2) (integer) 4069879797297521

4. COUNT count

说明:指定返回满足条件位置的个数。

例如,指定返回一条满足条件的信息,代码如下:

127.0.0.1:6379> georadius site 116.405419 39.913164 5 km count 1
1) "tianan"

5. ASC|DESC

说明:从近到远 | 从远到近排序返回。

示例代码:

127.0.0.1:6379> georadius site 116.405419 39.913164 5 km desc
1) "yuetan"
2) "tianan"
127.0.0.1:6379> georadius site 116.405419 39.913164 5 km asc
1) "tianan"
2) "yuetan"

当然以上这些可选参数也可以一起使用,例如以下代码:

127.0.0.1:6379> georadius site 116.405419 39.913164 5 km withdist desc
1) 1) "yuetan"
2) "4.0100"
2) 1) "tianan"
2) "0.0981"

5. 查询哈希值

127.0.0.1:6379> geohash site tianan
1) "wx4g0cgp000"

相关语法:

geohash key member [member ...]

此命令支持查询一个或多个地址的哈希值。

6. 删除地理位置

127.0.0.1:6379> zrem site xiaoming
(integer) 1

相关语法:

zrem key member [member ...]

此命令支持删除一个或多个位置信息。

9.4 - 代码实战

下面我们用 Java 代码,来实现查询附近的人,完整代码如下:

import redis.clients.jedis.GeoCoordinate;
import redis.clients.jedis.GeoRadiusResponse;
import redis.clients.jedis.GeoUnit;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GeoHashExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
Map<String, GeoCoordinate> map = new HashMap<>();
// 添加小明的位置
map.put("xiaoming", new GeoCoordinate(116.404269, 39.913164));
// 添加小红的位置
map.put("xiaohong", new GeoCoordinate(116.36, 39.922461));
// 添加小美的位置
map.put("xiaomei", new GeoCoordinate(116.499705, 39.874635));
// 添加小二
map.put("xiaoer", new GeoCoordinate(116.193275, 39.996348));
jedis.geoadd("person", map);
// 查询小明和小红的直线距离
System.out.println("小明和小红相距:" + jedis.geodist("person", "xiaoming",
"xiaohong", GeoUnit.KM) + " KM");
// 查询小明附近 5 公里的人
List<GeoRadiusResponse> res = jedis.georadiusByMemberReadonly("person", "xiaoming",
5, GeoUnit.KM);
for (int i = 1; i < res.size(); i++) {
System.out.println("小明附近的人:" + res.get(i).getMemberByString());
}
}
}

以上程序执行的结果如下:

小明和小红相距:3.9153 KM
小明附近的人:xiaohong

9.5 - 应用场景

Redis 中的 GEO 经典使用场景如下:

  1. 查询附近的人、附近的地点等;
  2. 计算相关的距离信息。

10 - 游标迭代器(过滤器)—Scan

10.1 - 为什么需要 Scan?

在平时线上 Redis 维护工作中,有时候需要从 Redis 实例成千上万的 key 中找出特定前缀的 key 列表来手动处理数据,可能是修改它的值,也可能是删除 key。这里就有一个问题,如何从海量的 key 中找出满足特定前缀的 key 列表来?

Redis 提供了一个简单暴力的指令 keys 用来列出所有满足特定正则字符串规则的 key。

但这个命令存在两个缺点:

  1. 此命令没有分页功能,我们只能一次性查询出所有符合条件的 key 值,如果查询结果非常巨大,那么得到的输出信息也会非常多;
  2. keys 命令是遍历查询,因此它的查询时间复杂度是 o (n),所以数据量越大查询时间就越长。
127.0.0.1:6379> set codehole1 a
OK
127.0.0.1:6379> set codehole2 b
OK
127.0.0.1:6379> set codehole3 c
OK
127.0.0.1:6379> set code1hole a
OK
127.0.0.1:6379> set code2hole b
OK
127.0.0.1:6379> set code3hole b
OK
127.0.0.1:6379> keys *
1) "codehole1"
2) "code3hole"
3) "codehole3"
4) "code2hole"
5) "codehole2"
6) "code1hole"
127.0.0.1:6379> keys codehole*
1) "codehole1"
2) "codehole3"
3) "codehole2"
127.0.0.1:6379> keys code*hole
1) "code3hole"
2) "code2hole"
3) "code1hole"

这个指令使用非常简单,提供一个简单的正则字符串即可,但是有很明显的两个缺点

  1. 没有 offset、limit 参数,一次性吐出所有满足条件的 key,万一实例中有几百 w 个 key 满足条件,当你看到满屏的字符串刷的没有尽头时,你就知道难受了。
  2. keys 算法是遍历算法,复杂度是 O (n),如果实例中有千万级以上的 key,这个指令就会导致 Redis 服务卡顿,所有读写 Redis 的其它的指令都会被延后甚至会超时报错,因为 Redis 是单线程程序,顺序执行所有指令,其它指令必须等到当前的 keys 指令执行完了才可以继续。

面对这两个显著的缺点该怎么办呢?

Redis 为了解决这个问题,它在 2.8 版本中加入了大海捞针的指令 ——scanscan 相比 keys 具备有以下特点:

  1. 复杂度虽然也是 O (n),但是它是通过游标分步进行的,不会阻塞线程;
  2. 提供 limit 参数,可以控制每次返回结果的最大条数,limit 只是一个 hint,返回的结果可多可少;
  3. 同 keys 一样,它也提供模式匹配功能;
  4. 服务器不需要为游标保存状态,游标的唯一状态就是 scan 返回给客户端的游标整数;
  5. 返回的结果可能会有重复,需要客户端去重复,这点非常重要;
  6. 遍历的过程中如果有数据修改,改动后的数据能不能遍历到是不确定的;
  7. 单次返回的结果是空的并不意味着遍历结束,而要看返回的游标值是否为零;

10.2-Scan 命令使用

我们先来模拟海量数据,使用 Pipeline 添加 10w 条数据,Java 代码实现如下:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import utils.JedisUtils;

public class ScanExample {
public static void main(String[] args) {
// 添加 10w 条数据
initData();
}
public static void initData(){
Jedis jedis = JedisUtils.getJedis();
Pipeline pipe = jedis.pipelined();
for (int i = 1; i < 100001; i++) {
pipe.set("user_token_" + i, "id" + i);
}
// 执行命令
pipe.sync();
System.out.println("数据插入完成");
}
}

我们来查询用户 id 为 9999* 的数据,Scan 命令使用如下:

127.0.0.1:6379> scan 0 match user_token_9999* count 10000
1) "127064"
2) 1) "user_token_99997"
127.0.0.1:6379> scan 127064 match user_token_9999* count 10000
1) "1740"
2) 1) "user_token_9999"
127.0.0.1:6379> scan 1740 match user_token_9999* count 10000
1) "21298"
2) 1) "user_token_99996"
127.0.0.1:6379> scan 21298 match user_token_9999* count 10000
1) "65382"
2) (empty list or set)
127.0.0.1:6379> scan 65382 match user_token_9999* count 10000
1) "78081"
2) 1) "user_token_99998"
2) "user_token_99992"
127.0.0.1:6379> scan 78081 match user_token_9999* count 10000
1) "3993"
2) 1) "user_token_99994"
2) "user_token_99993"
127.0.0.1:6379> scan 3993 match user_token_9999* count 10000
1) "13773"
2) 1) "user_token_99995"
127.0.0.1:6379> scan 13773 match user_token_9999* count 10000
1) "47923"
2) (empty list or set)
127.0.0.1:6379> scan 47923 match user_token_9999* count 10000
1) "59751"
2) 1) "user_token_99990"
2) "user_token_99991"
3) "user_token_99999"
127.0.0.1:6379> scan 59751 match user_token_9999* count 10000
1) "0"
2) (empty list or set)

从以上的执行结果,我们看出两个问题:

  1. 查询的结果为空,但游标值不为 0,表示遍历还没结束;
  2. 设置的是 count 10000,但每次返回的数量都不是 10000,且不固定,这是因为 count 只是限定服务器单次遍历的字典槽位数量(约等于),而不是规定返回结果的 count 值。

相关语法:

scan cursor [MATCH pattern] [COUNT count]
  • cursor:光标位置,整数值,从 0 开始,到 0 结束,查询结果是空,但游标值不为 0,表示遍历还没结束;
  • match pattern:正则匹配字段;
  • count:限定服务器单次遍历的字典槽位数量(约等于),只是对增量式迭代命令的一种提示(hint),并不是查询结果返回的最大数量,它的默认值是 10。

10.3 - 代码实战

本文我们使用 Java 代码来实现 Scan 的查询功能,代码如下:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import utils.JedisUtils;

public class ScanExample {
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedis();
// 定义 match 和 count 参数
ScanParams params = new ScanParams();
params.count(10000);
params.match("user_token_9999*");
// 游标
String cursor = "0";
while (true) {
ScanResult<String> res = jedis.scan(cursor, params);
if (res.getCursor().equals("0")) {
// 表示最后一条
break;
}
cursor = res.getCursor(); // 设置游标
for (String item : res.getResult()) {
// 打印查询结果
System.out.println("查询结果:" + item);
}
}
}
}

以上程序执行结果如下:

查询结果:user_token_99997
查询结果:user_token_9999
查询结果:user_token_99996
查询结果:user_token_99998
查询结果:user_token_99992
查询结果:user_token_99994
查询结果:user_token_99993
查询结果:user_token_99995
查询结果:user_token_99990
查询结果:user_token_99991
查询结果:user_token_99999

10.4-Scan 相关命令

Scan 是一个系列指令,除了 Scan 之外,还有以下 3 个命令:

  1. HScan 遍历字典游标迭代器
  2. SScan 遍历集合的游标迭代器
  3. ZScan 遍历有序集合的游标迭代器

来看这些命令的具体使用。

1. HScan 使用

127.0.0.1:6379> hscan myhash 0 match k2* count 10
1) "0"
2) 1) "k2"
2) "v2"

相关语法:

hscan key cursor [MATCH pattern] [COUNT count]

2.SScan 使用

127.0.0.1:6379> sscan myset 0 match v2* count 20
1) "0"
2) 1) "v2"

相关语法:

sscan key cursor [MATCH pattern] [COUNT count]

3. ZScan 使用

127.0.0.1:6379> zscan zset 0 match red* count 20
1) "0"
2) 1) "redis"
2) "10"

相关语法:

zscan key cursor [MATCH pattern] [COUNT count]

5. Scan 说明

官方对 Scan 命令的描述信息如下。

Scan guarantees

The SCAN command, and the other commands in the SCAN family, are able to provide to the user a set of guarantees associated to full iterations.

  • A full iteration always retrieves all the elements that were present in the collection from the start to the end of a full iteration. This means that if a given element is inside the collection when an iteration is started, and is still there when an iteration terminates, then at some point SCANreturned it to the user.
  • A full iteration never returns any element that was NOT present in the collection from the start to the end of a full iteration. So if an element was removed before the start of an iteration, and is never added back to the collection for all the time an iteration lasts, SCAN ensures that this element will never be returned.

However because SCAN has very little state associated (just the cursor) it has the following drawbacks:

  • A given element may be returned multiple times. It is up to the application to handle the case of duplicated elements, for example only using the returned elements in order to perform operations that are safe when re-applied multiple times.
  • Elements that were not constantly present in the collection during a full iteration, may be returned or not: it is undefined.

官方文档地址:

https://redis.io/commands/scan

翻译为中文的含义是:Scan 及它的相关命令可以保证以下查询规则。

  • 它可以完整返回开始到结束检索集合中出现的所有元素,也就是在整个查询过程中如果这些元素没有被删除,且符合检索条件,则一定会被查询出来;
  • 它可以保证不会查询出,在开始检索之前删除的那些元素。

然后,Scan 命令包含以下缺点:

  • 一个元素可能被返回多次,需要客户端来实现去重;
  • 在迭代过程中如果有元素被修改,那么修改的元素能不能被遍历到不确定。

10.5 - 小结

通过本文我们可以知道 Scan 包含以下四个指令:

  1. Scan:用于检索当前数据库中所有数据;
  2. HScan:用于检索哈希类型的数据;
  3. SScan:用于检索集合类型中的数据;
  4. ZScan:由于检索有序集合中的数据。

Scan 具备以下几个特点:

  1. Scan 可以实现 keys 的匹配功能;
  2. Scan 是通过游标进行查询的不会导致 Redis 假死;
  3. Scan 提供了 count 参数,可以规定遍历的数量;
  4. Scan 会把游标返回给客户端,用户客户端继续遍历查询;
  5. Scan 返回的结果可能会有重复数据,需要客户端去重;
  6. 单次返回空值且游标不为 0,说明遍历还没结束;
  7. Scan 可以保证在开始检索之前,被删除的元素一定不会被查询出来;
  8. 在迭代过程中如果有元素被修改, Scan 不保证能查询出相关的元素。

11 - 优秀的基数统计算法 —HyperLogLog

11.1 - 为什么要使用 HyperLogLog?

在我们实际开发的过程中,可能会遇到这样一个问题,当我们需要统计一个大型网站的独立访问次数时,该用什么的类型来统计?

如果我们使用 Redis 中的集合来统计,当它每天有数千万级别的访问时,将会是一个巨大的问题。因为这些访问量不能被清空,我们运营人员可能会随时查看这些信息,那么随着时间的推移,这些统计数据所占用的空间会越来越大,逐渐超出我们能承载最大空间。

例如,我们用 IP 来作为独立访问的判断依据,那么我们就要把每个独立 IP 进行存储,以 IP4 来计算,IP4 最多需要 15 个字节来存储信息,例如:110.110.110.110。当有一千万个独立 IP 时,所占用的空间就是 15 bit*10000000 约定于 143MB,但这只是一个页面的统计信息,假如我们有 1 万个这样的页面,那我们就需要 1T 以上的空间来存储这些数据,而且随着 IP6 的普及,这个存储数字会越来越大,那我们就不能用集合的方式来存储了,这个时候我们需要开发新的数据类型 HyperLogLog 来做这件事了。

11.2-HyperLogLog 介绍

HyperLogLog(下文简称为 HLL)是 Redis 2.8.9 版本添加的数据结构,它用于高性能的基数(去重)统计功能,它的缺点就是存在极低的误差率。

HLL 具有以下几个特点:

  • 能够使用极少的内存来统计巨量的数据,它只需要 12K 空间就能统计 2^64 的数据;
  • 统计存在一定的误差,误差率整体较低,标准误差为 0.81%;
  • 误差可以被设置辅助计算因子进行降低。

HyperLogLog 算法是一种非常巧妙的近似统计海量去重元素数量的算法。它内部维护了 16384 个桶(bucket)来记录各自桶的元素数量。当一个元素到来时,它会散列到其中一个桶,以一定的概率影响这个桶的计数值。因为是概率算法,所以单个桶的计数值并不准确,但是将所有的桶计数值进行调合均值累加起来,结果就会非常接近真实的计数值。

11.3 - 基础使用

HLL 的命令只有 3 个,但都非常的实用,下面分别来看。

1. 添加元素

127.0.0.1:6379> pfadd key "redis"
(integer) 1
127.0.0.1:6379> pfadd key "java" "sql"
(integer) 1

相关语法:

pfadd key element [element ...]

此命令支持添加一个或多个元素至 HLL 结构中。

2. 统计不重复的元素

127.0.0.1:6379> pfadd key "redis"
(integer) 1
127.0.0.1:6379> pfadd key "sql"
(integer) 1
127.0.0.1:6379> pfadd key "redis"
(integer) 0
127.0.0.1:6379> pfcount key
(integer) 2

从 pfcount 的结果可以看出,在 HLL 结构中键值为 key 的元素,有 2 个不重复的值:redis 和 sql,可以看出结果还是挺准的。

相关语法:

pfcount key [key ...]

此命令支持统计一个或多个 HLL 结构。

3. 合并一个或多个 HLL 至新结构

新增 k 和 k2 合并至新结构 k3 中,代码如下:

127.0.0.1:6379> pfadd k "java" "sql"
(integer) 1
127.0.0.1:6379> pfadd k2 "redis" "sql"
(integer) 1
127.0.0.1:6379> pfmerge k3 k k2
OK
127.0.0.1:6379> pfcount k3
(integer) 3

相关语法:

pfmerge destkey sourcekey [sourcekey ...]

pfmerge 使用场景

当我们需要合并两个或多个同类页面的访问数据时,我们可以使用 pfmerge 来操作。

11.4 - 代码实战

接下来我们使用 Java 代码来实现 HLL 的三个基础功能,代码如下:

import redis.clients.jedis.Jedis;

public class HyperLogLogExample {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 添加元素
jedis.pfadd("k", "redis", "sql");
jedis.pfadd("k", "redis");
// 统计元素
long count = jedis.pfcount("k");
// 打印统计元素
System.out.println("k:" + count);
// 合并 HLL
jedis.pfmerge("k2", "k");
// 打印新 HLL
System.out.println("k2:" + jedis.pfcount("k2"));
}
}

以上代码执行结果如下:

k:2
k2:2

11.5-HLL 算法原理

HyperLogLog 算法来源于论文 HyperLogLog the analysis of a near-optimal cardinality estimation algorithm,想要了解 HLL 的原理,先要从伯努利试验说起,伯努利实验说的是抛硬币的事。一次伯努利实验相当于抛硬币,不管抛多少次只要出现一个正面,就称为一次伯努利实验。

我们用 k 来表示每次抛硬币的次数,n 表示第几次抛的硬币,用 k_max 来表示抛硬币的最高次数,最终根据估算发现 n 和 k_max 存在的关系是 n=2^(k_max),但同时我们也发现了另一个问题当试验次数很小的时候,这种估算方法的误差会很大,例如我们进行以下 3 次实验:

  • 第 1 次试验:抛 3 次出现正面,此时 k=3,n=1;
  • 第 2 次试验:抛 2 次出现正面,此时 k=2,n=2;
  • 第 3 次试验:抛 6 次出现正面,此时 k=6,n=3。

对于这三组实验来说,k_max=6,n=3,但放入估算公式明显 3≠2^6。为了解决这个问题 HLL 引入了分桶算法和调和平均数来使这个算法更接近真实情况。

分桶算法是指把原来的数据平均分为 m 份,在每段中求平均数在乘以 m,以此来消减因偶然性带来的误差,提高预估的准确性,简单来说就是把一份数据分为多份,把一轮计算,分为多轮计算。

而调和平均数指的是使用平均数的优化算法,而非直接使用平均数。

例如小明的月工资是 1000 元,而小王的月工资是 100000 元,如果直接取平均数,那小明的平均工资就变成了 (1000+100000)/2=50500‬ 元,这显然是不准确的,而使用调和平均数算法计算的结果是 2/(1/1000+1/100000)≈1998 元,显然此算法更符合实际平均数。

所以综合以上情况,在 Redis 中使用 HLL 插入数据,相当于把存储的值经过 hash 之后,再将 hash 值转换为二进制,存入到不同的桶中,这样就可以用很小的空间存储很多的数据,统计时再去相应的位置进行对比很快就能得出结论,这就是 HLL 算法的基本原理,想要更深入的了解算法及其推理过程,可以看去原版的论文,

链接地址 - 论文 HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm

11.6 - 小结

当需要做大量数据统计时,普通的集合类型已经不能满足我们的需求了,这个时候我们可以借助 Redis 2.8.9 中提供的 HyperLogLog 来统计,它的优点是只需要使用 12k 的空间就能统计 2^64 的数据,但它的缺点是存在 0.81% 的误差,HyperLogLog 提供了三个操作方法 pfadd 添加元素、pfcount 统计元素和 pfmerge 合并元素。

12-Redis 消息队列

12.1 - 发布订阅模式

在 Redis 中提供了专门的类型:Publisher(发布者)和 Subscriber(订阅者)来实现消息队列。

在文章开始之前,先来介绍消息队列中有几个基础概念,以便大家更好的理解本文的内容。

首先,发布消息的叫做发布方或发布者,也就是消息的生产者,而接收消息的叫做消息的订阅方或订阅者,也就是消费者,用来处理生产者发布的消息。

除了发布和和订阅者,在消息队列中还有一个重要的概念:channel 意为频道或通道,可以理解为某个消息队列的名称,首先消费者先要订阅某个 channel,然后当生产者把消息发送到这个 channel 中时,消费者就可以正常接收到消息了,如下图所示:

1. 普通订阅与发布

消息队列有两个重要的角色,一个是发送者,另一个就是订阅者,对应的命令如下:

  • 发布消息:publish channel “message”
  • 订阅消息:subscribe channel

下面我们来看具体的命令实现。

订阅消息
127.0.0.1:6379> subscribe channel #订阅消息channel
Reading messages...
1) "subscribe"
2) "channel"
3) (integer) 1

相关语法:

subscribe channel [channel ...]

此命令支持订阅一个或多个频道的命令,也就是说一个订阅者可以订阅多个频道。例如,某个客户端订阅了两个频道 channel 和 channel2,当两个发布者分别推送消息后,订阅者的信息输出如下:

127.0.0.1:6379> subscribe channel channel2 #订阅 channel 和 channel2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1
1) "subscribe"
2) "channel2"
3) (integer) 2
1) "message"
2) "channel" # 收到 channel 消息
3) "message 1."
1) "message"
2) "channel2" # 收到 channel2 消息
3) "message 2."

可以看出此订阅者可以收到来自两个频道的消息推送。

发送消息
127.0.0.1:6379> publish channel "hello,redis." #发布消息
(integer) 1

相关语法:

publish channel message

最后的返回值表示成功发送给几个订阅方,1 表示成功发给了一个订阅者,这个数字可以是 0~n,这是由订阅者的数量决定的。

例如,当有两个订阅者时,推送的结果为 2,如下代码所示。

订阅者一:

127.0.0.1:6379> subscribe channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1

订阅者二:

127.0.0.1:6379> subscribe channel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel"
3) (integer) 1

发送消息:

127.0.0.1:6379> publish channel "message"
(integer) 2

可以看出,此消息已成功发给两个订阅者,结果也变成 2 了。

2. 主题订阅

上面介绍了普通的订阅与发布模式,但如果我要订阅某一个类型的消息就不适用了,例如我要订阅日志类的消息队列,它们的命名都是 logXXX,这个时候就需要使用 Redis 提供的另一个功能 Pattern Subscribe 主题订阅,这种方式可以使用 * 来匹配多个频道,如下图所示:

主题模式的具体实现代码如下,订阅者:

127.0.0.1:6379> psubscribe log_* #主题订阅 log_*
1) "psubscribe"
2) "log_*"
3) (integer) 1
1) "pmessage"
2) "log_*"
3) "log_user" #接收到频道 log_user 的消息推送
4) "user message."
1) "pmessage"
2) "log_*"
3) "log_sys" #接收到频道 log_sys 的消息推送
4) "sys message."
1) "pmessage"
2) "log_*"
3) "log_db" #接收到频道 log_db 的消息推送
4) "db message"

从上面的运行结果,可以看出使用命令 psubscribe log_* 可以接收到所有频道包含 log_XXX 的消息。

相关语法:

psubscribe pattern [pattern ...]

生产者的代码如下:

127.0.0.1:6379> publish log_user "user message."
(integer) 1
127.0.0.1:6379> publish log_sys "sys message."
(integer) 1
127.0.0.1:6379> publish log_db "db message"
(integer) 1

3. 代码实战

下面我们使用 Jedis 实现普通的发布订阅模式和主题订阅的功能。

普通模式

消费者代码如下:

/**
* 消费者
*/
public static void consumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 接收并处理消息
jedis.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
// 接收消息,业务处理
System.out.println("频道 " + channel + " 收到消息:" + message);
}
}, "channel");
}

生产者代码如下:

/**
* 生产者
*/
public static void producer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.publish("channel", "Hello, channel.");
}

发布者和订阅者模式运行:

public static void main(String[] args) throws InterruptedException {
// 创建一个新线程作为消费者
new Thread(() -> consumer()).start();
// 暂停 0.5s 等待消费者初始化
Thread.sleep(500);
// 生产者发送消息
producer();
}

以上代码运行结果如下:

频道 channel 收到消息:Hello, channel.
主题订阅模式

主题订阅模式的生产者的代码是一样,只有消费者的代码是不同的,如下所示:

/**
* 主题订阅
*/
public static void pConsumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 主题订阅
jedis.psubscribe(new JedisPubSub() {
@Override
public void onPMessage(String pattern, String channel, String message) {
// 接收消息,业务处理
System.out.println(pattern + " 主题 | 频道 " + channel + " 收到消息:" + message);
}
}, "channel*");
}

主题模式运行代码如下:

public static void main(String[] args) throws InterruptedException {
// 主题订阅
new Thread(() -> pConsumer()).start();
// 暂停 0.5s 等待消费者初始化
Thread.sleep(500);
// 生产者发送消息
producer();
}

以上代码运行结果如下:

channel* 主题 | 频道 channel 收到消息:Hello, channel.

4. 注意事项

发布订阅模式存在以下两个缺点:

  1. 无法持久化保存消息,如果 Redis 服务器宕机或重启,那么所有的消息将会丢失;
  2. 发布订阅模式是 “发后既忘” 的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息。

然而这些缺点在 Redis 5.0 添加了 Stream 类型之后会被彻底的解决。

除了以上缺点外,发布订阅模式还有另一个需要注意问题:当消费端有一定的消息积压时,也就是生产者发送的消息,消费者消费不过来时,如果超过 32M 或者是 60s 内持续保持在 8M 以上,消费端会被强行断开,这个参数是在配置文件中设置的,默认值是 client-output-buffer-limit pubsub 32mb 8mb 60

12.2 - 消息队列的其他实现方式

在 Redis 5.0 之前消息队列的实现方式有很多种,比较常见的除了我们上文介绍的发布订阅模式,还有两种:List 和 ZSet 的实现方式。

List 和 ZSet 的方式解决了发布订阅模式不能持久化的问题,但这两种方式也有自己的缺点,接下来我们一起来了解一下,先从 List 实现消息队列的方式说起。

1. List 版消息队列

List 方式是实现消息队列最简单和最直接的方式,它主要是通过 lpush 和 rpop 存入和读取实现消息队列的,如下图所示:

List 使用命令的方式实现消息队列:

127.0.0.1:6379> lpush mq "hello" #推送消息 hello
(integer) 1
127.0.0.1:6379> lpush mq "msg" #推送消息 msg
(integer) 2
127.0.0.1:6379> rpop mq #接收到消息 hello
"hello"
127.0.0.1:6379> rpop mq #接收到消息 msg
"mq"

其中,mq 就相当于频道名称 channel,而 lpush 用于生产消息, rpop 拉取消息。

代码实现

接下来我们用 Java 代码的方式来实现 List 形式的消息队列,源码如下:

import redis.clients.jedis.Jedis;

public class ListMQExample {
public static void main(String[] args){
// 消费者
new Thread(() -> consumer()).start();
// 生产者
producer();
}
/**
* 生产者
*/
public static void producer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.lpush("mq", "Hello, List.");
}
/**
* 消费者
*/
public static void consumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 消费消息
while (true) {
// 获取消息
String msg = jedis.rpop("mq");
if (msg != null) {
// 接收到了消息
System.out.println("接收到消息:" + msg);
}
}
}
}

以上程序的运行结果是:

接收到消息:Hello, List.

我们使用无限循环来获取队列中的数据,这样就可以实时地获取相关信息了,但这样会带来另一个新的问题,当队列中如果没有数据的情况下,无限循环会一直消耗系统的资源,这时候我们可以使用 brpop 替代 rpop 来完美解决这个问题。

b 是 blocking 的缩写,表示阻塞读,也就是当队列没有数据时,它会进入休眠状态,当有数据进入队列之后,它才会 “苏醒” 过来执行读取任务,这样就可以解决 while 循环一直执行消耗系统资源的问题了,改良版代码如下:

import redis.clients.jedis.Jedis;

public class ListMQExample {
public static void main(String[] args) throws InterruptedException {
// 消费者 改良版
new Thread(() -> bConsumer()).start();
// 生产者
producer();
}
/**
* 生产者
*/
public static void producer() throws InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
jedis.lpush("mq", "Hello, List.");
Thread.sleep(1000);
jedis.lpush("mq", "message 2.");
Thread.sleep(2000);
jedis.lpush("mq", "message 3.");
}
/**
* 消费者(阻塞版)
*/
public static void bConsumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
while (true) {
// 阻塞读
for (String item : jedis.brpop(0,"mq")) {
// 读取到相关数据,进行业务处理
System.out.println(item);
}
}
}
}

其中,brpop () 方法的第一个参数是设置超时时间的,设置 0 表示一直阻塞。

优缺点分析

List 优点:

  • 消息可以被持久化,借助 Redis 本身的持久化(AOF、RDB 或者是混合持久化),可以有效的保存数据;
  • 消费者可以积压消息,不会因为客户端的消息过多而被强行断开。

List 缺点:

  • 消息不能被重复消费,一个消息消费完就会被删除;
  • 没有主题订阅的功能。

2. ZSet 版消息队列

ZSet 版消息队列相比于之前的两种方式,List 和发布订阅方式在实现上要复杂一些,但 ZSet 因为多了一个 score(分值)属性,从而使它具备更多的功能,例如我们可以用它来存储时间戳,以此来实现延迟消息队列等。

它的实现思路和 List 相同也是利用 zadd 和 zrangebyscore 来实现存入和读取,这里就不重复叙述了,读者可以根据 List 的实现方式来实践一下,看能不能实现相应的功能,如果写不出来也没关系,本课程的后面章节,介绍延迟队列的时候会用 ZSet 来实现。

优缺点分析

ZSet 优点:

  • 支持消息持久化;
  • 相比于 List 查询更方便,ZSet 可以利用 score 属性很方便的完成检索,而 List 则需要遍历整个元素才能检索到某个值。

ZSet 缺点:

  • ZSet 不能存储相同元素的值,也就是如果有消息是重复的,那么只能插入一条信息在有序集合中;
  • ZSet 是根据 score 值排序的,不能像 List 一样,按照插入顺序来排序;
  • ZSet 没有向 List 的 brpop 那样的阻塞弹出的功能。

12.3 - 消息队列终极解决方案 —Stream

在 Redis 5.0 Stream 没出来之前,消息队列的实现方式都有着各自的缺陷,例如:

  • 发布订阅模式 PubSub,不能持久化也就无法可靠的保存消息,并且对于离线重连的客户端不能读取历史消息的缺陷;
  • 列表实现消息队列的方式不能重复消费,一个消息消费完就会被删除;
  • 有序集合消息队列的实现方式不能存储相同 value 的消息,并且不能阻塞读取消息。

并且以上三种方式在实现消息队列时,只能存储单 value 值,也就是如果你要存储一个对象的情况下,必须先序列化成 JSON 字符串,在读取之后还要反序列化成对象才行,这也给用户的使用带来的不便,基于以上问题,Redis 5.0 便推出了 Stream 类型也是此版本最重要的功能,用于完美地实现消息队列,它借鉴了 Kafka 的设计思路,它支持消息的持久化和消息轨迹的消费,支持 ack 确认消息的模式,让消息队列更加的稳定和可靠。

Redis Stream 的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

每个 Stream 都可以挂多个消费组,每个消费组会有个游标 last_delivered_id 在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令 xgroup create 进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化 last_delivered_id 变量。

每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。

同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。每个消费者有一个组内唯一名称。

消费者 (Consumer) 内部会有个状态变量 pending_ids,它记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

1. 消息 ID

消息 ID 的形式是 timestampInMillis-sequence,例如 1527846880572-5,它表示当前的消息在毫米时间戳 1527846880572 时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。

2. 消息内容

消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。

3. 基础使用

Stream 既然是一个数据类型,那么和其他数据类型相似,它也有一些自己的操作方法,例如:

  • xadd 添加消息;
  • xlen 查询消息长度;
  • xdel 根据消息 ID 删除消息;
  • del 删除整个 Stream;
  • xrange 读取区间消息
  • xread 读取某个消息之后的消息。

具体使用如下所述。

添加消息
127.0.0.1:6379> xadd key * name redis age 10
"1580880750844-0" #结果返回的是消息 id

其中 * 表示使用 Redis 的规则:时间戳 + 序号的方式自动生成 ID,用户也可以自己指定 ID。

相关语法:

xadd key ID field string [field string ...]
查询消息的长度
127.0.0.1:6379> xlen key
(integer) 1

相关语法:

xlen key
删除消息
127.0.0.1:6379> xadd key * name redis
"1580881585129-0" #消息 ID
127.0.0.1:6379> xlen key
(integer) 1
127.0.0.1:6379> xdel key 1580881585129-0 #删除消息,根据 ID
(integer) 1
127.0.0.1:6379> xlen key
(integer) 0

相关语法:

xdel key ID [ID ...]

此命令支持删除一条或多条消息,根据消息 ID。

删除整个 Stream
127.0.0.1:6379> del key #删除整个 Stream
(integer) 1
127.0.0.1:6379> xlen key
(integer) 0

相关语法:

del key [key ...]

此命令支持删除一个或多个 Stream。

查询区间消息
127.0.0.1:6379> xrange mq - +
1) 1) "1580882060464-0"
2) 1) "name"
2) "redis"
3) "age"
4) "10"
2) 1) "1580882071524-0"
2) 1) "name"
2) "java"
3) "age"
4) "20"

其中:- 表示第一条消息,+ 表示最后一条消息。

相关语法:

xrange key start end [COUNT count]
查询某个消息之后的消息
127.0.0.1:6379> xread count 1 streams mq 1580882060464-0
1) 1) "mq"
2) 1) 1) "1580882071524-0"
2) 1) "name"
2) "java"
3) "age"
4) "20"

在名称为 mq 的 Stream 中,从消息 ID 为 1580882060464-0 的,往后查询一条消息。

相关语法:

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

此命令提供了阻塞读的参数 block,我们可以使用它读取从当前数据以后新增数据,命令如下:

127.0.0.1:6379> xread count 1 block 0 streams mq $

其中 block 0 表示一直阻塞,$ 表示从最后开始读取,这个时候新开一个命令行插入一条数据,此命令展示的结果如下:

127.0.0.1:6379> xadd mq * name sql age 20 #新窗口添加数据
"1580890737890-0"
#阻塞读取到的新数据
127.0.0.1:6379> xread count 1 block 0 streams mq $
1) 1) "mq"
2) 1) 1) "1580890737890-0"
2) 1) "name"
2) "sql"
3) "age"
4) "20"
(36.37s)

4. 基础版消息队列

使用 Stream 消费分组实现消息队列的功能和列表方式的消息队列比较相似,使用 xadd 命令和 xread 循环读取就可以实现基础版的消息队列,具体代码如下:

import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamExample {
public static void main(String[] args) throws InterruptedException {
// 消费者
new Thread(() -> consumer()).start();
Thread.sleep(1000);
// 生产者
producer();
}
/**
* 生产者
*/
public static void producer() throws InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 推送消息
Map<String, String> map = new HashMap<>();
map.put("name", "redis");
map.put("age", "10");
// 添加消息
StreamEntryID id = jedis.xadd("mq", null, map);
System.out.println("消息添加成功 ID:" + id);
}
/**
* 消费者
*/
public static void consumer() {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// 消费消息
while (true) {
// 获取消息,new StreamEntryID().LAST_ENTRY 标识获取当前时间以后的新增消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>("mq",
new StreamEntryID().LAST_ENTRY);
// 阻塞读取一条消息(最大阻塞时间120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xread(1, 120 * 1000, entry);
if (list.size() == 1) {
// 读取到消息
System.out.println("读取到消息 ID:" + list.get(0).getValue().get(0).getID());
// 使用 Gson 来打印 JSON 格式的消息内容
System.out.println("内容:" + new Gson().toJson(list.get(0).getValue().get(0).getFields()));
}
}
}
}

以上代码运行结果如下:

消息添加成功 ID:1580895735148-0
读取到消息 ID:1580895735148-0
内容:{"name":"redis","age":"10"}

以上代码需要特殊说明的是,我们使用 new StreamEntryID().LAST_ENTRY 来实现读取当前时间以后新增的消息,如果要从头读取历史消息把这行代码中的 .LAST_ENTRY 去掉即可。

还有一点需要注意,在 Jedis 框架中如果使用 jedis.xread () 方法来阻塞读取消息队列,第二个参数 long block 必须设置大于 0,如果设置小于 0,此阻塞条件就无效了,我查看了 jedis 的源码发现,它只有判断在大于 0 的时候才会设置阻塞属性,源码如下:

if (block > 0L) {
params[streamsIndex++] = Keyword.BLOCK.raw;
params[streamsIndex++] = Protocol.toByteArray(block);
}

所以 block 属性我们可以设置一个比较大的值来阻塞读取消息。

所谓的阻塞读取消息指的是当队列中没有数据时会进入休眠模式,等有数据之后才会唤醒继续执行。

5. 创建消费组

在开始使用消息分组之前,我们必须手动创建分组才行,以下是几个和 Stream 分组有关的命令,我们先来学习一下它的使用。

消息分组命令
创建消费者群组
127.0.0.1:6379> xgroup create mq group1 0-0 
OK

相关语法:

xgroup create stream-key group-key ID
  • mq 为 Stream 的 key;
  • group1 为分组的名称;
  • 0-0 表示从第一条消息开始读取。

如果要从当前最后一条消息向后读取,使用 $ 即可,命令如下:

127.0.0.1:6379> xgroup create mq group2 $
OK
读取消息
127.0.0.1:6379> xreadgroup group group1 c1 count 1 streams mq >
1) 1) "mq"
2) 1) 1) "1580959593553-0"
2) 1) "name"
2) "redis"
3) "age"
4) "10"

相关语法:

xreadgroup group group-key consumer-key streams stream-key
  • > 表示读取下一条消息;
  • group1 表示分组名称;
  • c1 表示 consumer(消费者)名称。

xreadgroup 命令和 xread 使用类似,也可以设置阻塞读取,命令如下:

127.0.0.1:6379> xreadgroup group group1 c2 streams mq >
1) 1) "mq"
2) 1) 1) "1580959606181-0"
2) 1) "name"
2) "java"
3) "age"
4) "20"
127.0.0.1:6379> xreadgroup group group1 c2 streams mq >
(nil) #队列中的消息已经被读取完
127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq > #阻塞读取

此时打开另一个命令行创建使用 xadd 添加一条消息,阻塞命令执行结果如下:

127.0.0.1:6379> xreadgroup group group1 c1 count 1 block 0 streams mq >
1) 1) "mq"
2) 1) 1) "1580961475368-0"
2) 1) "name"
2) "sql"
3) "age"
4) "20"
(86.14s)
消息消费确认

接收到消息之后,我们要手动确认一下(ack),命令如下:

127.0.0.1:6379> xack mq group1 1580959593553-0
(integer) 1

相关语法:

xack key group-key ID [ID ...]

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:

查询未确认的消费队列
127.0.0.1:6379> xpending mq group1
1) (integer) 1 #未确认(ack)的消息数量为 1 条
2) "1580994063971-0"
3) "1580994063971-0"
4) 1) 1) "c1"
2) "1"
127.0.0.1:6379> xack mq group1 1580994063971-0 #消费确认
(integer) 1
127.0.0.1:6379> xpending mq group1
1) (integer) 0 #没有未确认的消息
2) (nil)
3) (nil)
4) (nil)
xinfo 查询相关命令
1. 查询流信息
127.0.0.1:6379> xinfo stream mq
1) "length"
2) (integer) 2 #队列中有两个消息
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "groups"
8) (integer) 1 #一个消费分组
9) "last-generated-id"
10) "1580959606181-0"
11) "first-entry"
12) 1) "1580959593553-0"
2) 1) "name"
2) "redis"
3) "age"
4) "10"
13) "last-entry"
14) 1) "1580959606181-0"
2) 1) "name"
2) "java"
3) "age"
4) "20"

相关语法:

xinfo stream stream-key
2. 查询消费组消息
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
2) "group1" #消息分组名称
3) "consumers"
4) (integer) 1 #一个消费者客户端
5) "pending"
6) (integer) 1 #一个未确认消息
7) "last-delivered-id"
8) "1580959593553-0" #读取的最后一条消息 ID

相关语法:

xinfo groups stream-key
3. 查看消费者组成员信息
127.0.0.1:6379> xinfo consumers mq group1
1) 1) "name"
2) "c1" #消费者名称
3) "pending"
4) (integer) 0 #未确认消息
5) "idle"
6) (integer) 481855

相关语法:

xinfo consumers stream group-key
删除消费者
127.0.0.1:6379> xgroup delconsumer mq group1 c1
(integer) 1

相关语法:

xgroup delconsumer stream-key group-key consumer-key
删除消费组
127.0.0.1:6379> xgroup destroy mq group1
(integer) 1

相关语法:

xgroup destroy stream-key group-key

6. 进阶版消费队列

接下来我们使用 Jedis 来实现 Stream 分组消息队列,代码如下:

import com.google.gson.Gson;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import utils.JedisUtils;

import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamGroupExample {
private static final String _STREAM_KEY = "mq"; // 流 key
private static final String _GROUP_NAME = "g1"; // 分组名称
private static final String _CONSUMER_NAME = "c1"; // 消费者 1 的名称
private static final String _CONSUMER2_NAME = "c2"; // 消费者 2 的名称
public static void main(String[] args) {
// 生产者
producer();
// 创建消费组
createGroup(_STREAM_KEY, _GROUP_NAME);
// 消费者 1
new Thread(() -> consumer()).start();
// 消费者 2
new Thread(() -> consumer2()).start();
}
/**
* 创建消费分组
* @param stream 流 key
* @param groupName 分组名称
*/
public static void createGroup(String stream, String groupName) {
Jedis jedis = JedisUtils.getJedis();
jedis.xgroupCreate(stream, groupName, new StreamEntryID(), true);
}
/**
* 生产者
*/
public static void producer() {
Jedis jedis = JedisUtils.getJedis();
// 添加消息 1
Map<String, String> map = new HashMap<>();
map.put("data", "redis");
StreamEntryID id = jedis.xadd(_STREAM_KEY, null, map);
System.out.println("消息添加成功 ID:" + id);
// 添加消息 2
Map<String, String> map2 = new HashMap<>();
map2.put("data", "java");
StreamEntryID id2 = jedis.xadd(_STREAM_KEY, null, map2);
System.out.println("消息添加成功 ID:" + id2);
}
/**
* 消费者 1
*/
public static void consumer() {
Jedis jedis = JedisUtils.getJedis();
// 消费消息
while (true) {
// 读取消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
new StreamEntryID().UNRECEIVED_ENTRY);
// 阻塞读取一条消息(最大阻塞时间120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1,
120 * 1000, true, entry);
if (list != null && list.size() == 1) {
// 读取到消息
Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息内容
System.out.println("Consumer 1 读取到消息 ID:" + list.get(0).getValue().get(0).getID() +
" 内容:" + new Gson().toJson(content));
}
}
}
/**
* 消费者 2
*/
public static void consumer2() {
Jedis jedis = JedisUtils.getJedis();
// 消费消息
while (true) {
// 读取消息
Map.Entry<String, StreamEntryID> entry = new AbstractMap.SimpleImmutableEntry<>(_STREAM_KEY,
new StreamEntryID().UNRECEIVED_ENTRY);
// 阻塞读取一条消息(最大阻塞时间120s)
List<Map.Entry<String, List<StreamEntry>>> list = jedis.xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1,
120 * 1000, true, entry);
if (list != null && list.size() == 1) {
// 读取到消息
Map<String, String> content = list.get(0).getValue().get(0).getFields(); // 消息内容
System.out.println("Consumer 2 读取到消息 ID:" + list.get(0).getValue().get(0).getID() +
" 内容:" + new Gson().toJson(content));
}
}
}
}

以上代码运行结果如下:

消息添加成功 ID:1580971482344-0
消息添加成功 ID:1580971482415-0
Consumer 1 读取到消息 ID:1580971482344-0 内容:{"data":"redis"}
Consumer 2 读取到消息 ID:1580971482415-0 内容:{"data":"java"}

其中,jedis.xreadGroup () 方法的第五个参数 noAck 表示是否自动确认消息,如果设置 true 收到消息会自动确认(ack)消息,否则则需要手动确认。

注意:Jedis 框架要使用最新版,低版本 block 设置大于 0 时,会有 bug 抛连接超时异常。

可以看出,同一个分组内的多个 consumer 会读取到不同消息,不同的 consumer 不会读取到分组内的同一条消息。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK