Redis实践应用
Redis 是一款高性能的键值对数据库,它在内存中存储数据并支持持久化。
Redis 提供了丰富的数据类型和功能,使其成为许多应用场景的理想选择。
本文将以5.x版本为例深入探讨 Redis 的实践应用。
# 一、数据类型及其命令
Redis 支持多种数据类型,这里先对其基础类型进行介绍。
# 1、字符串(String)
字符串是 Redis 中最基本的数据结构,它可以存储文本或二进制数据。相关命令如下:
set
: 用来设置 string 值,第一个参数是 key,第二个参数是 value。示例:set mykey "something"
。get
: 获取 key 对应的 string 值。示例:get mykey
。strlen
: 获取 key 对应的 string 值的长度。示例:strlen mykey
。setnx
: 如果 key 不存在,则设置 string 值。示例:setnx mykey "newvalue"
。setex
: 设置 key 对应的 string 值,并设置过期时间(单位:秒)。示例:setex mykey 60 "newvalue"
。psetex
: 设置 key 对应的 string 值,并设置过期时间(单位:毫秒)。示例:psetex mykey 60000 "newvalue"
。mset
: 一次设置多个 key 的 string 值。示例:mset key1 "value1" key2 "value2" key3 "value3"
。mget
: 一次获取多个 key 的 string 值。示例:mget key1 key2 key3
。msetnx
: 如果所有给定的 key 都不存在,则一次设置多个 key 的 string 值。示例:msetnx key1 "value1" key2 "value2"
。incr
: 将 key 对应的数字值自增 1。示例:incr mykey
。decr
: 将 key 对应的数字值自减 1。示例:decr mykey
。incrby
: 将 key 对应的数字值增加指定整数。示例:incrby mykey 5
。decrby
: 将 key 对应的数字值减少指定整数。示例:decrby mykey 5
。incrbyfloat
: 将 key 对应的数字值增加指定浮点数。示例:incrbyfloat mykey 1.5
。append
: 将指定字符串追加到 key 对应的 string 值。示例:append mykey "extra"
。getrange
: 获取 key 对应的 string 值的指定范围。示例:getrange mykey 0 4
。setrange
: 修改 key 对应的 string 值的指定范围。示例:setrange mykey 4 "newpart"
。getset
: 将 key 对应的 string 值设置为新值,并返回旧值。示例:getset mykey "newvalue"
。
# 2、列表(List)
列表是一个有序的字符串集合,可以通过索引访问。相关命令如下:
lpush
: 在列表头部插入一个或多个值。示例:lpush mylist "apple" "banana" "cherry"
。rpush
: 在列表尾部插入一个或多个值。示例:rpush mylist "apple" "banana" "cherry"
。lpop
: 移除并返回列表头部的元素。示例:lpop mylist
。rpop
: 移除并返回列表尾部的元素。示例:rpop mylist
。lrange
: 返回列表中指定范围内的元素。示例:lrange mylist 0 -1
。llen
: 返回列表的长度。示例:llen mylist
。lindex
: 返回列表中指定位置的元素。示例:lindex mylist 0
。lset
: 设置列表中指定位置的元素。示例:lset mylist 0 "orange"
。linsert
: 在列表的指定元素前或后插入一个新元素。示例:linsert mylist before "banana" "orange"
。lrem
: 移除列表中指定数量的指定元素。示例:lrem mylist 1 "banana"
。rpoplpush
: 移除列表的尾部元素,并将其插入到另一个列表的头部。示例:rpoplpush sourceList destList
。
阻塞相关的命令:
blpop
: 从列表头部移除并返回第一个非空元素,如果列表为空则阻塞等待。示例:blpop mylist 0
。brpop
: 从列表尾部移除并返回第一个非空元素,如果列表为空则阻塞等待。示例:brpop mylist 0
。brpoplpush
: 从一个列表的尾部移除元素,并将其插入到另一个列表的头部,同时返回该元素。如果源列表为空则阻塞等待。示例:brpoplpush sourceList destList 0
。
请注意,blpop
和 brpop
命令的最后一个参数是超时时间(单位:秒)。设置为 0 表示无限期等待。
# 3、集合(Set)
集合是一个无序的、不包含重复元素的字符串集合。相关命令如下:
sadd
: 向集合添加一个或多个成员。示例:sadd myset "apple" "banana" "cherry"
。smembers
: 返回集合中的所有成员。示例:smembers myset
。sismember
: 判断成员是否是集合的成员。示例:sismember myset "apple"
。scard
: 获取集合的成员数。示例:scard myset
。spop
: 移除并返回集合中的一个随机元素。示例:spop myset
。srem
: 移除集合中一个或多个成员。示例:srem myset "apple" "banana"
。sunion
: 返回给定集合的并集。示例:sunion set1 set2
。sinter
: 返回给定集合的交集。示例:sinter set1 set2
。sdiff
: 返回给定集合的差集。示例:sdiff set1 set2
。smove
: 将一个成员从一个集合移动到另一个集合。示例:smove sourceSet destSet "apple"
。sdiffstore
: 返回给定集合的差集并存储在目标集合。示例:sdiffstore destSet set1 set2
。sinterstore
: 返回给定集合的交集并存储在目标集合。示例:sinterstore destSet set1 set2
。sunionstore
: 返回给定集合的并集并存储在目标集合。示例:sunionstore destSet set1 set2
。
# 4、有序集合(Sorted Set)
有序集合类似于集合,但是每个成员都关联了一个分数(score),根据分数排序。相关命令如下:
zadd
: 向有序集合添加一个或多个成员,或更新已存在成员的分数。示例:zadd myzset 1 "apple" 2 "banana" 3 "cherry"
。zrange
: 返回有序集合中指定范围内的成员,按分数值递增排序。示例:zrange myzset 0 -1
。zrevrange
: 返回有序集合中指定范围内的成员,按分数值递减排序。示例:zrevrange myzset 0 -1
。zrangebyscore
: 返回有序集合中指定分数区间内的成员,按分数值递增排序。示例:zrangebyscore myzset 1 3
。zrevrangebyscore
: 返回有序集合中指定分数区间内的成员,按分数值递减排序。示例:zrevrangebyscore myzset 3 1
。zcount
: 返回有序集合中指定分数区间内的成员数量。示例:zcount myzset 1 3
。zcard
: 获取有序集合的成员数。示例:zcard myzset
。zscore
: 返回有序集合中指定成员的分数。示例:zscore myzset "apple"
。zrank
: 返回有序集合中指定成员的排名,按分数值递增排序。示例:zrank myzset "apple"
。zrevrank
: 返回有序集合中指定成员的排名,按分数值递减排序。示例:zrevrank myzset "apple"
。zrem
: 移除有序集合中的一个或多个成员。示例:zrem myzset "apple" "banana"
。zincrby
: 为有序集合中的成员增加分数。示例:zincrby myzset 1 "apple"
。zremrangebyrank
: 移除有序集合中指定排名区间的所有成员。示例:zremrangebyrank myzset 0 1
。zremrangebyscore
: 移除有序集合中指定分数区间的所有成员。示例:zremrangebyscore myzset 1 2
。zinterstore
: 计算给定的一个或多个有序集合的交集并将结果集存储在新的有序集合中。示例:zinterstore out 2 zset1 zset2
。zunionstore
: 计算给定的一个或多个有序集合的并集并将结果集存储在新的有序集合中。示例:zunionstore out 2 zset1 zset2
。zscan
: 增量迭代有序集合中的元素。示例:zscan myzset 0
。
# 5、哈希(Hash)
哈希是一个由键值对组成的无序集合,它将键值对映射到一个 Redis 键。相关命令如下:
hset
: 为哈希表中的字段赋值。示例:hset myhash field1 "value1"
。hget
: 获取哈希表中指定字段的值。示例:hget myhash field1
。hexists
: 查看哈希表中,指定的字段是否存在。示例:hexists myhash field1
。hdel
: 删除哈希表中的一个或多个指定字段。示例:hdel myhash field1 field2
。hlen
: 获取哈希表中字段的数量。示例:hlen myhash
。hkeys
: 获取哈希表中的所有字段名。示例:hkeys myhash
。hvals
: 获取哈希表中的所有字段值。示例:hvals myhash
。hgetall
: 获取哈希表中的所有字段和值。示例:hgetall myhash
。hincrby
: 为哈希表中的指定字段的整数值加上增量。示例:hincrby myhash field1 1
。hincrbyfloat
: 为哈希表中的指定字段的浮点数值加上增量。示例:hincrbyfloat myhash field1 1.0
。hmset
: 同时将多个字段值对设置到哈希表的多个字段。示例:hmset myhash field1 "value1" field2 "value2"
。hmget
: 获取哈希表中所有给定字段的值。示例:hmget myhash field1 field2
。hscan
: 增量地迭代哈希表中的键值对。示例:hscan myhash 0
。
# 二、数据库命令
Redis 支持多个数据库,每个数据库都是一个独立的键空间。以下是与数据库相关的命令:
select
: 切换到指定的数据库。示例:select 1
。flushdb
: 清空当前数据库中的所有 key。示例:flushdb
。flushall
: 清空整个 Redis 服务器的数据。示例:flushall
。dbsize
: 返回当前数据库的 key 的数量。示例:dbsize
。info
: 获取 Redis 服务器的信息和统计数据。示例:info
。lastsave
: 返回最近一次 Redis 成功将数据存储到磁盘上的时间。示例:lastsave
。bgsave
: 在后台异步保存当前数据库的数据到磁盘。示例:bgsave
。save
: 阻塞方式将所有数据保存到磁盘。示例:save
。config get
: 获取 Redis 配置参数的值。示例:config get maxmemory
。config set
: 设置 Redis 配置参数的值。示例:config set maxmemory 100mb
。slaveof
: 改变复制策略设置。示例:slaveof 127.0.0.1 6379
。monitor
: 实时打印出 Redis 服务器接收到的命令
# 三、连接命令
Redis为了确保连接的安全性和稳定性,有如下命令:
auth
: 通过密码认证连接。ping
: 检查与服务器的连接是否仍然活着。echo
: 返回给定的字符串。select
: 选择指定的数据库。quit
: 关闭连接。client getname
: 获取与客户端相关联的名字。client setname
: 将与客户端相关联的名字设置为给定的字符串。client list
: 获取所有连接到服务器的客户端信息。client kill
: 关闭指定的客户端连接。
通过auth命令可以进行密码认证,确保连接的安全性。使用select命令可以切换到指定的数据库上下文。通过client命令可以获取或设置客户端相关的信息,以及管理连接到服务器的客户端。
# 四、命令
Redis 中的键(key)是唯一的,用于标识存储的数据。以下是与 key 相关的命令:
del
: 删除给定的一个或多个 key。示例:del key1 key2
。dump
: 序列化给定 key,并返回被序列化的值。示例:dump mykey
。exists
: 检查给定 key 是否存在。示例:exists mykey
。expire
: 为给定 key 设置过期时间(以秒为单位)。示例:expire mykey 60
。expireat
: 为给定 key 设置过期时间(以 POSIX 时间戳为单位)。示例:expireat mykey 1677598225
。keys
: 查找所有符合给定模式的 key。示例:keys *
。move
: 将当前数据库的 key 移动到给定的数据库。示例:move mykey 1
。persist
: 移除给定 key 的过期时间。示例:persist mykey
。pexpire
: 为给定 key 设置过期时间(以毫秒为单位)。示例:pexpire mykey 60000
。pexpireat
: 为给定 key 设置过期时间(以毫秒级别的 POSIX 时间戳为单位)。示例:pexpireat mykey 1677598225000
。pttl
: 以毫秒为单位返回 key 的剩余过期时间。示例:pttl mykey
。rename
: 修改 key 的名称。示例:rename oldkey newkey
。renamenx
: 仅当新 key 不存在时,将 key 改名为 newkey。示例:renamenx oldkey newkey
。ttl
: 以秒为单位,返回给定 key 的剩余过期时间。示例:ttl mykey
。type
: 返回 key 所储存的值的数据类型。示例:type mykey
。scan
: 增量地迭代一组 key 和相应的值。示例:scan 0
。
# 五、位图(Bitmaps)
位图本质上是一个字符串,它将特殊的命令用于处理位值。位图可用于大量的计数场景,如用户在线状态、网站访问统计等。相关命令如下:
setbit
: 对 key 所储存的字符串值,设置或清除指定偏移量上的位(bit)。示例:setbit mykey 7 1
。getbit
: 对 key 所储存的字符串值,获取指定偏移量上的位(bit)。示例:getbit mykey 7
。bitcount
: 计算给定字符串中被设置为 1 的位的数量。示例:bitcount mykey
。bitop
: 对一个或多个保存二进制位的字符串 key 进行位元操作,并将结果保存到 destkey 上。示例:bitop AND destkey mykey1 mykey2
。bitpos
: 返回 key 对应的字符串值中第一个值为 bit 的位的位置。示例:bitpos mykey 1
。bitfield
: 对一个或多个整数字段进行位操作。示例:bitfield mykey set i5 0 31
。
# 六、HyperLogLog(基数估算器)
HyperLogLog 是一种基数估算器,用于估算一个集合中不重复元素的数量。相关命令如下:
pfadd
: 将指定元素添加到 HyperLogLog 中。示例:pfadd myhll "apple" "banana" "cherry"
。pfcount
: 返回给定 HyperLogLog 的基数估算值。示例:pfcount myhll
。pfmerge
: 将多个 HyperLogLog 合并为一个。示例:pfmerge desthll hll1 hll2 hll3
。
# 七、GeoHash
Redis 支持地理位置数据的存储和查询,基于 GeoHash 算法。相关命令如下:
geoadd
: 添加地理位置坐标。示例:geoadd mygeo 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania"
。geopos
: 获取地理位置的坐标。示例:geopos mygeo "Palermo" "Catania"
。geodist
: 计算两个地理位置之间的距离。示例:geodist mygeo "Palermo" "Catania" km
。georadius
: 查询指定范围内的地理位置。示例:georadius mygeo 15 37 100 km
。georadiusbymember
: 查询与指定地理位置成员在给定范围内的地理位置。示例:georadiusbymember mygeo "Palermo" 100 km
。geohash
: 返回地理位置的 GeoHash 字符串。示例:geohash mygeo "Palermo" "Catania"
。
# 八、Stream(流)
Redis 5.0 引入了 Stream 数据类型,用于处理消息流和日志数据。Stream 提供了消息持久化、消费组、消息确认等功能。
# 1、基本命令
xadd
: 向流中添加消息。示例:xadd mystream * name "Alice" age 30
。xrange
: 返回流中指定范围的消息。示例:xrange mystream - +
。xrevrange
: 反向返回流中指定范围的消息。示例:xrevrange mystream + -
。xlen
: 返回流的长度。示例:xlen mystream
。xread
: 读取一个或多个流的消息。示例:xread count 2 streams mystream 0
。xdel
: 删除流中的消息。示例:xdel mystream 1234567890-0
。xtrim
: 修剪流的长度。示例:xtrim mystream maxlen 100
。
# 2、消费组命令
xgroup create
: 创建消费组。示例:xgroup create mystream mygroup $
。xgroup setid
: 设置消费组的最后消费ID。示例:xgroup setid mystream mygroup $
。xgroup destroy
: 删除消费组。示例:xgroup destroy mystream mygroup
。xgroup delconsumer
: 删除消费组中的消费者。示例:xgroup delconsumer mystream mygroup myconsumer
。xreadgroup
: 从消费组中读取消息。示例:xreadgroup group mygroup myconsumer count 2 streams mystream >
。xack
: 确认消息已被处理。示例:xack mystream mygroup 1234567890-0
。xpending
: 查看待处理消息。示例:xpending mystream mygroup
。xclaim
: 转移消息的所有权。示例:xclaim mystream mygroup myconsumer 3600000 1234567890-0
。
# 3、使用示例
# 创建流并添加消息
xadd orders * user "user123" product "laptop" price 1299
xadd orders * user "user456" product "phone" price 699
# 创建消费组
xgroup create orders order-processing $
# 消费者读取消息
xreadgroup group order-processing consumer1 count 1 streams orders >
# 确认消息处理完成
xack orders order-processing 1234567890-0
# 九、发布与订阅
Redis提供了简单的发布订阅功能。相关命令如下:
subscribe
: 订阅一个或多个频道。示例:subscribe news
或subscribe news sports
。unsubscribe
: 取消订阅一个或多个频道。示例:unsubscribe news
或unsubscribe news sports
。psubscribe
: 订阅一个或多个匹配模式。示例:psubscribe news.*
或psubscribe *news* sports.*
。punsubscribe
: 取消订阅一个或多个匹配模式。示例:punsubscribe news.*
或punsubscribe *news* sports.*
。publish
: 向指定频道发布消息。示例:publish news "Hello, world!"
。pubsub channels [pattern]
: 列出所有被订阅的频道。如果指定了pattern,则只列出与之匹配的频道。示例:pubsub channels
或pubsub channels news.*
。pubsub numsub [channel [channel ...]]
: 获取指定频道的订阅者数量。如果未指定任何频道,则返回所有频道的订阅者数量。示例:pubsub numsub news
或pubsub numsub news sports
。pubsub numpat
: 获取匹配模式的订阅数量。示例:pubsub numpat
。
# 十、事务
Redis提供对事务的支持。相关命令如下:
multi
: 开启一个新事务。exec
: 执行所有事务内的命令。如果事务中有语法错误或运行时错误,整个事务将会回滚。discard
: 放弃当前事务。watch
: 监视一个或多个键,如果在执行exec之前键的值发生变化,整个事务将被回滚。可以监视多个键,也可以在一个事务内多次使用watch命令。unwatch
: 取消所有键的监视。
在Redis事务中,使用multi命令开始一个新事务,然后按顺序执行一系列命令。这些命令不会立即被执行,而是被存储在事务队列中。当执行exec命令时,Redis会逐个执行所有事务内的命令。如果事务中所有命令都执行成功,则事务被提交;如果有任何一个命令执行失败,则整个事务将被回滚。
watch命令用于在事务执行期间监视一个或多个键,如果在执行exec之前键的值发生变化,整个事务将被回滚。这样可以避免在事务执行期间其他客户端对被监视的键进行操作导致数据不一致的问题。
discard命令用于放弃当前事务,将事务队列清空并退出事务执行环境。
Redis事务允许在单个客户端内执行一系列操作,将这些操作作为一个原子性操作进行提交或回滚。使用事务可以保证一系列操作的原子性,避免出现数据不一致的问题。
# 十一、持久化
Redis提供了两种持久化方式,分别是RDB持久化和AOF持久化。
# 1、RDB持久化
RDB持久化是Redis默认的持久化方式,它将Redis在内存中的数据定期保存到磁盘上,形成一个快照文件。RDB文件可以用于备份、恢复或者迁移Redis实例。
# 1.1、RDB配置
在 redis.conf
中配置 RDB 持久化:
# 保存快照的条件
save 900 1 # 900秒内至少1个key被修改
save 300 10 # 300秒内至少10个key被修改
save 60 10000 # 60秒内至少10000个key被修改
# RDB文件名
dbfilename dump.rdb
# RDB文件存放目录
dir /var/lib/redis
# 压缩RDB文件
rdbcompression yes
# RDB校验
rdbchecksum yes
# 1.2、RDB持久化相关的命令
save
: 在主线程中同步执行SAVE命令,将数据保存到磁盘上。在持久化操作完成之前,Redis主线程将被阻塞。bgsave
: 异步执行BGSAVE命令,将数据保存到磁盘上。在持久化操作完成之前,Redis主线程不会被阻塞。lastsave
: 返回上一次成功将数据保存到磁盘上的时间,以UNIX时间戳的形式表示。
# 1.3、RDB优缺点
优点:
- RDB文件紧凑,适合备份和灾难恢复
- 恢复速度快
- 性能影响小(fork子进程处理)
缺点:
- 可能丢失最后一次快照后的数据
- fork操作在大数据集时可能耗时较长
# 2、AOF持久化
AOF持久化是一种追加式文件写入方式,它记录Redis服务器接收到的每一条写命令,并将这些命令追加到AOF文件末尾。当Redis重新启动时,它可以通过重新执行AOF文件中记录的命令来恢复数据。
# 2.1、AOF配置
在 redis.conf
中配置 AOF 持久化:
# 启用AOF
appendonly yes
# AOF文件名
appendfilename "appendonly.aof"
# AOF同步策略
appendfsync everysec # 每秒同步一次(推荐)
# appendfsync always # 每次写操作都同步
# appendfsync no # 由操作系统决定何时同步
# AOF重写触发条件
auto-aof-rewrite-percentage 100 # AOF文件增长100%时触发重写
auto-aof-rewrite-min-size 64mb # AOF文件最小大小
# 重写期间是否继续AOF
no-appendfsync-on-rewrite no
# 2.2、AOF持久化相关的命令
bgrewriteaof
: 异步执行BGREWRITEAOF命令,将AOF文件重写为更紧凑的格式。config set appendonly yes
: 启用AOF持久化。config set appendfsync always
: 将AOF缓冲区中的命令写入磁盘的频率设置为每次命令都写入磁盘。config set appendfsync everysec
: 将AOF缓冲区中的命令写入磁盘的频率设置为每秒钟写入一次磁盘。config set appendfsync no
: 将AOF缓冲区中的命令写入磁盘的频率设置为尽可能少地写入磁盘。
# 2.3、AOF优缺点
优点:
- 数据安全性更高,最多丢失1秒数据
- AOF文件易于理解和修复
- 支持后台重写,不影响服务
缺点:
- AOF文件通常比RDB文件大
- 恢复速度比RDB慢
- 可能存在AOF重写的性能开销
# 3、混合持久化
Redis 4.0 开始支持 RDB-AOF 混合持久化,结合了两者的优点:
# 启用混合持久化
aof-use-rdb-preamble yes
混合持久化时,AOF重写会生成一个RDB格式的文件头,后续的修改以AOF格式追加。这样既保证了恢复速度,又减少了数据丢失的风险。
Redis提供了RDB持久化和AOF持久化两种方式,以确保数据在发生故障时不会丢失。使用RDB持久化可以定期保存Redis的快照文件,而使用AOF持久化可以记录Redis服务器接收到的每一条写命令。在生产环境中,通常会同时使用这两种方式来提高数据的安全性和可靠性。
# 十二、Lua脚本
Redis脚本是一种由Lua脚本语言编写的命令序列,可以在Redis服务器端运行。脚本可以用于执行复杂的数据处理、计算或者业务逻辑,也可以用于实现原子性的操作或者复杂的事务。
Redis提供了以下脚本相关的命令:
eval
: 执行Lua脚本,并返回执行结果。evalsha
: 执行已经存储在服务器端的Lua脚本,并返回执行结果。script exists
: 检查指定的脚本是否已经被服务器端缓存。script load
: 将指定的脚本缓存到服务器端,并返回脚本的SHA1摘要。script flush
: 清空服务器端缓存的所有Lua脚本。script kill
: 中断正在执行的Lua脚本。
使用eval命令可以执行一个由Lua脚本语言编写的命令序列,并返回执行结果。eval命令需要传入两个参数:第一个参数是Lua脚本字符串,第二个参数是一个可选的参数列表,用于传递参数给脚本。脚本可以使用Redis提供的一系列API来访问Redis的数据结构,进行数据处理和计算等操作。
使用evalsha命令可以执行已经存储在服务器端的Lua脚本,并返回执行结果。evalsha命令需要传入两个参数:第一个参数是脚本的SHA1摘要,第二个参数是一个可选的参数列表,用于传递参数给脚本。evalsha命令可以提高脚本的执行效率,因为它避免了每次执行时都需要传递脚本字符串的开销。
使用script exists命令可以检查指定的脚本是否已经被服务器端缓存。如果指定的脚本已经被缓存,则返回1,否则返回0。
使用script load命令可以将指定的脚本缓存到服务器端,并返回脚本的SHA1摘要。缓存脚本可以提高脚本的执行效率,因为它避免了每次执行时都需要传递脚本字符串的开销。
使用script flush命令可以清空服务器端缓存的所有Lua脚本。
使用script kill命令可以中断正在执行的Lua脚本。
总的来说,Redis脚本可以用于执行复杂的数据处理、计算或者业务逻辑,也可以用于实现原子性的操作或者复杂的事务。使用Lua脚本可以将多个操作打包成一个原子性操作,从而提高数据的安全性和可靠性。
# 1、分布式锁的示例
定义一段脚本:
-- 获得锁的值
local lockValue = redis.call('get', KEYS[1])
-- 如果锁没有被占用,则获取锁
if not lockValue then
redis.call('set', KEYS[1], ARGV[1])
redis.call('expire', KEYS[1], ARGV[2])
return true
end
-- 如果锁被当前线程占用,则更新锁的过期时间
if lockValue == ARGV[1] then
redis.call('expire', KEYS[1], ARGV[2])
return true
end
-- 如果锁被其他线程占用,则返回false
return false
在这个脚本中,我们使用Redis提供的get
命令获取锁的值,如果锁没有被占用,则获取锁。如果锁被当前线程占用,则更新锁的过期时间。如果锁被其他线程占用,则返回false。
在执行这个脚本时,我们需要传入三个参数:第一个参数是锁的键名,第二个参数是要获取锁的值,第三个参数是锁的过期时间。
使用Java语言来执行这个脚本:
Jedis jedis = new Jedis("localhost");
String lockKey = "mylock";
String lockValue = UUID.randomUUID().toString();
int expireTime = 300;
String script = "local lockValue = redis.call('get', KEYS[1]); " +
"if not lockValue then " +
" redis.call('set', KEYS[1], ARGV[1]); " +
" redis.call('expire', KEYS[1], ARGV[2]); " +
" return true; " +
"end; " +
"if lockValue == ARGV[1] then " +
" redis.call('expire', KEYS[1], ARGV[2]); " +
" return true; " +
"end; " +
"return false;";
List<String> keys = new ArrayList<>();
keys.add(lockKey);
List<String> args = new ArrayList<>();
args.add(lockValue);
args.add(String.valueOf(expireTime));
Object result = jedis.eval(script, keys, args);
if ((boolean) result) {
System.out.println("获取锁成功!");
} else {
System.out.println("获取锁失败!");
}
在这个Java程序中,我们使用Jedis连接Redis服务器,并执行Lua脚本。在执行脚本时,我们需要传入三个参数:锁的键名、要获取锁的值以及锁的过期时间。如果脚本执行成功,则表示获取锁成功;否则表示获取锁失败。
当然,这个分布式锁还比较简陋,自己去实现也容易出错。推荐使用Redisson客服端来实现分布式锁。
# 十三、扩展模块
Redis 扩展模块可以为 Redis 提供额外的功能和数据结构。以下是一些常用且有用的 Redis 扩展模块:
Redisearch:一个全文搜索引擎模块,允许您在 Redis 中进行复杂的全文搜索,支持模糊匹配、数值过滤和地理位置查询。它还提供了索引维护和实时查询的功能。
官方网站:https://redis.io/docs/stack/search/ (opens new window)RedisGraph:一个基于图数据库的 Redis 模块,支持基于图形的数据查询和操作。它使用图结构来存储数据,并支持 Cypher 查询语言进行高效的图查询。
官方网站:https://redis.io/docs/stack/graph/ (opens new window)RedisBloom:一个提供概率数据结构的 Redis 模块,包括布隆过滤器、计数布隆过滤器、顶点草图和空间草图。这些数据结构可以用来实现内存高效的去重、统计和近似查询等功能。
官方网站:https://redis.io/docs/stack/bloom/ (opens new window)RedisJSON:一个提供原生 JSON 数据类型支持的 Redis 模块,允许在 Redis 中存储、更新和查询 JSON 数据。它支持 JSONPath 和 JSON API,使得操作 JSON 数据变得更加简便。
官方网站:https://redis.io/docs/stack/json/ (opens new window)
这些扩展模块为 Redis 提供了更多高级功能,可以帮助您更好地满足特定需求。请注意,这些模块需要单独安装并在 Redis 配置中启用。
# 十四、客户端使用
Redisson 是一个功能丰富的 Redis 客户端,提供了多种分布式数据结构和服务,如分布式锁、限流等。
以下是抛砖引玉的介绍,更多内容请到官网查看 (opens new window):
# 1、分布式锁
Redisson提供了一种分布式锁实现,它基于Redis的原子性和过期时间等特性来实现分布式锁。
# 1.1、获取锁
在Redisson中,可以通过以下方式获取锁:
RLock lock = redissonClient.getLock("myLock");
lock.lock();
try {
// 执行业务逻辑
} finally {
lock.unlock();
}
其中,redissonClient
是Redisson的客户端实例,myLock
是要获取的锁名字,lock()
方法是获取锁操作,unlock()
方法是释放锁操作。当lock()
方法执行成功后,说明当前线程获取到了分布式锁,可以执行业务逻辑;当业务逻辑执行完成后,需要释放锁,否则其他线程无法获取该锁。
# 1.2、指定过期时间
可以通过lock()
方法的重载形式指定锁的过期时间,如下:
RLock lock = redissonClient.getLock("myLock");
lock.lock(10, TimeUnit.SECONDS);
try {
// 执行业务逻辑
} finally {
lock.unlock();
}
其中,第一个参数是过期时间,第二个参数是时间单位,表示在指定时间内没有释放锁,锁会自动失效。这种方式可以防止锁被永久占用,提高系统的可用性。
# 1.3、尝试获取锁
在某些情况下,需要在不阻塞的情况下尝试获取锁。可以使用tryLock()
方法,如下:
RLock lock = redissonClient.getLock("myLock");
if (lock.tryLock()) {
try {
// 执行业务逻辑
} finally {
lock.unlock();
}
} else {
// 获取锁失败,可以进行其他操作
}
其中,tryLock()
方法会立即返回,如果获取锁成功,则可以执行业务逻辑;如果获取锁失败,则可以进行其他操作,如等待一段时间后再尝试获取锁。
# 1.4、公平锁和非公平锁
Redisson支持公平锁和非公平锁两种锁类型,默认情况下是非公平锁。可以通过以下方式指定锁类型:
// 非公平锁
RLock lock1 = redissonClient.getLock("myLock");
// 公平锁
RLock lock2 = redissonClient.getFairLock("myLock");
其中,getFairLock()
方法返回一个公平锁实例。
# a、补充:公平锁与不公平锁
公平锁和不公平锁是相对于锁的获取机制而言的。
在一个非公平锁的实现中,线程在尝试获取锁时,会直接去争抢锁,而不管其他线程是否在等待锁。这意味着,某些线程可能会一直被饿死,无法获取到锁。这种实现方式的优点是速度快,缺点是可能会导致线程饥饿。
相对地,公平锁会保证锁的获取顺序与线程的等待顺序一致。也就是说,等待时间最久的线程会最先获取到锁。这种实现方式的优点是公平,缺点是速度较慢,因为线程需要排队等待锁的释放。
需要注意的是,公平锁只能保证锁的获取顺序是公平的,但不能保证执行顺序也是公平的。也就是说,等待时间最久的线程获取到锁后,可能会因为执行逻辑的复杂度等因素而导致执行时间较长,而其他线程则需要等待较长的时间才能获取锁。
# 1.5、多重锁
在一些情况下,需要获取多个锁。可以使用RedissonMultiLock
类来实现,如下:
RLock lock1 = redissonClient.getLock("lock1");
RLock lock2 = redissonClient.getLock("lock2");
RLock lock3 = redissonClient.getLock("lock3");
RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3);
multiLock.lock();
try {
// 执行业务逻辑
} finally {
multiLock.unlock();
}
其中,RedissonMultiLock
类是一个用于同时获取多个锁的工具类,使用方式与单个锁类似,只需要将多个锁传入构造函数即可。在使用完毕后,需要调用unlock()
方法释放所有锁。
# 2、限流
Redisson提供了基于Redis的分布式限流功能,可以实现对请求的流量和访问次数进行限制。
# 2.1、限制流量
Redisson提供了基于漏桶算法的流量限制功能,可以限制单位时间内的请求次数和流量大小。可以通过以下方式实现:
// 每秒钟最多处理10个请求
RPermitExpirableSemaphore semaphore = redissonClient.getPermitExpirableSemaphore("mySemaphore");
semaphore.trySetPermits(10);
semaphore.acquire(1, TimeUnit.SECONDS);
try {
// 执行业务逻辑
} finally {
semaphore.release();
}
其中,mySemaphore
是信号量的名称,trySetPermits()
方法设置了最大处理请求数,acquire()
方法获取信号量,release()
方法释放信号量。如果在1秒钟内超过10个请求,则后续请求会被阻塞,直到当前请求处理完成并释放了信号量。
# 2.2、限制访问次数
Redisson提供了基于计数器的访问次数限制功能,可以限制某个URL或API的访问次数。可以通过以下方式实现:
// 每个用户最多访问10次
RMapCache<String, Integer> mapCache = redissonClient.getMapCache("myMapCache");
String userId = "123456";
int count = mapCache.getOrDefault(userId, 0);
if (count < 10) {
mapCache.put(userId, count + 1, 1, TimeUnit.MINUTES);
// 执行业务逻辑
} else {
// 返回访问次数过多的错误信息
}
其中,myMapCache
是分布式缓存的名称,getOrDefault()
方法获取指定键的值,put()
方法存储键值对,并设置过期时间。如果一个用户在1分钟内访问超过10次,则返回访问次数过多的错误信息。
需要注意的是,以上示例仅仅是演示Redisson提供的限流功能,并不能完全防止恶意攻击。如果需要更严格的限流控制,可以考虑结合其他技术手段,如CDN、WAF、防火墙等,以确保系统的安全性和可靠性。
# 十五、Redis Cluster(集群)
Redis Cluster 是 Redis 的分布式解决方案,提供了自动分片、高可用性和故障转移功能。
# 1、集群架构
Redis Cluster 采用无中心架构,使用哈希槽(hash slot)来分配数据:
- 整个集群有 16384 个哈希槽
- 每个节点负责一部分哈希槽
- 数据根据 key 的 CRC16 哈希值对 16384 取模来决定存储位置
# 2、集群配置
在 redis.conf
中启用集群模式:
# 启用集群模式
cluster-enabled yes
# 集群配置文件
cluster-config-file nodes-6379.conf
# 集群节点超时时间
cluster-node-timeout 15000
# 集群副本数量
cluster-replica-validity-factor 10
# 集群迁移屏障
cluster-migration-barrier 1
# 集群要求槽全覆盖
cluster-require-full-coverage yes
# 3、创建集群
使用 redis-cli 创建集群:
# 创建集群(3主3从)
redis-cli --cluster create \
127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 \
127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005 \
--cluster-replicas 1
# 检查集群状态
redis-cli --cluster check 127.0.0.1:7000
# 获取集群信息
redis-cli -c -p 7000 cluster info
# 获取集群节点信息
redis-cli -c -p 7000 cluster nodes
# 4、集群操作命令
cluster meet
: 将节点加入集群。示例:cluster meet 127.0.0.1 7001
。cluster forget
: 从集群中移除节点。示例:cluster forget <node-id>
。cluster replicate
: 将当前节点设置为指定节点的从节点。示例:cluster replicate <node-id>
。cluster saveconfig
: 保存集群配置。示例:cluster saveconfig
。cluster addslots
: 为节点分配槽。示例:cluster addslots 0 1 2
。cluster delslots
: 移除节点的槽。示例:cluster delslots 0 1 2
。cluster flushslots
: 清空节点的所有槽。示例:cluster flushslots
。cluster setslot
: 设置槽的状态。示例:cluster setslot 0 node <node-id>
。cluster keyslot
: 计算 key 属于哪个槽。示例:cluster keyslot mykey
。cluster countkeysinslot
: 返回槽中的 key 数量。示例:cluster countkeysinslot 0
。cluster getkeysinslot
: 返回槽中的 key。示例:cluster getkeysinslot 0 10
。
# 5、集群客户端使用
Java 使用 Jedis 连接集群:
Set<HostAndPort> jedisClusterNodes = new HashSet<>();
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 7000));
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 7001));
jedisClusterNodes.add(new HostAndPort("127.0.0.1", 7002));
JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes);
jedisCluster.set("key", "value");
String value = jedisCluster.get("key");
# 十六、Redis Sentinel(哨兵)
Redis Sentinel 是 Redis 的高可用性解决方案,提供监控、通知和自动故障转移功能。
# 1、Sentinel 功能
- 监控(Monitoring):持续检查主节点和从节点是否正常工作
- 通知(Notification):当被监控的 Redis 实例出现问题时,通过 API 向管理员或其他应用程序发送通知
- 自动故障转移(Automatic failover):当主节点不可用时,自动将一个从节点升级为主节点
- 配置提供者(Configuration provider):为客户端提供服务发现功能
# 2、Sentinel 配置
创建 sentinel.conf
配置文件:
# 端口
port 26379
# 守护进程模式
daemonize yes
# 日志文件
logfile /var/log/redis-sentinel.log
# 监控主节点
sentinel monitor mymaster 127.0.0.1 6379 2
# 主节点密码
sentinel auth-pass mymaster yourpassword
# 主节点失效时间
sentinel down-after-milliseconds mymaster 30000
# 故障转移超时
sentinel failover-timeout mymaster 180000
# 并行同步数量
sentinel parallel-syncs mymaster 1
# 3、启动 Sentinel
# 启动 Sentinel
redis-sentinel /path/to/sentinel.conf
# 或使用 redis-server 启动
redis-server /path/to/sentinel.conf --sentinel
# 4、Sentinel 命令
sentinel masters
: 显示所有被监控的主节点信息。sentinel master <master-name>
: 显示指定主节点的信息。sentinel slaves <master-name>
: 显示指定主节点的所有从节点信息。sentinel sentinels <master-name>
: 显示指定主节点的其他 Sentinel 信息。sentinel get-master-addr-by-name <master-name>
: 返回主节点的地址。sentinel reset <pattern>
: 重置匹配模式的所有主节点。sentinel failover <master-name>
: 强制进行故障转移。sentinel ckquorum <master-name>
: 检查当前 Sentinel 配置的仲裁数量。sentinel flushconfig
: 强制 Sentinel 重写配置文件。
# 5、客户端连接 Sentinel
Java 使用 Jedis 通过 Sentinel 连接:
Set<String> sentinels = new HashSet<>();
sentinels.add("127.0.0.1:26379");
sentinels.add("127.0.0.1:26380");
sentinels.add("127.0.0.1:26381");
JedisPoolConfig poolConfig = new JedisPoolConfig();
JedisSentinelPool sentinelPool = new JedisSentinelPool(
"mymaster", sentinels, poolConfig, "yourpassword"
);
try (Jedis jedis = sentinelPool.getResource()) {
jedis.set("key", "value");
String value = jedis.get("key");
}
# 十七、作为缓存的实践
在实际应用中,Redis 常被用作缓存层,用于存储热点数据以提高系统性能。以下是一些缓存实践的规范和异常处理:
# 1、缓存规范
请参考:代码质量管理之规范篇
# 2、数据预热
数据预热是指在系统启动或低峰期,提前将热点数据加载到缓存中,以提高缓存命中率。可以使用定时任务或者异步线程池等方式,定期扫描数据库中的热点数据,并将其加载到缓存中。
在实际应用中,还可以根据用户的行为和偏好,动态调整热点数据的预热策略,以更好地满足用户的需求。
# 3、缓存异常处理
# 3.1、缓存穿透
缓存穿透是指查询不存在的数据,导致请求直接到达数据库层。
解决方案:
- 布隆过滤器
// 使用 Redisson 的布隆过滤器
RBloomFilter<String> bloomFilter = redisson.getBloomFilter("userIdFilter");
// 初始化布隆过滤器,预计元素数量1000万,误差率0.01
bloomFilter.tryInit(10000000L, 0.01);
// 查询时先判断布隆过滤器
public User getUserById(String userId) {
// 布隆过滤器中不存在,直接返回
if (!bloomFilter.contains(userId)) {
return null;
}
// 查询缓存
User user = cache.get(userId);
if (user == null) {
// 查询数据库
user = database.get(userId);
if (user != null) {
cache.set(userId, user, 3600);
}
}
return user;
}
- 缓存空值
public User getUserById(String userId) {
String cacheKey = "user:" + userId;
String cachedValue = redis.get(cacheKey);
if (cachedValue != null) {
// 如果是空值标记,返回null
if ("NULL".equals(cachedValue)) {
return null;
}
return JSON.parseObject(cachedValue, User.class);
}
User user = database.get(userId);
if (user == null) {
// 缓存空值,设置较短过期时间
redis.setex(cacheKey, 300, "NULL");
} else {
redis.setex(cacheKey, 3600, JSON.toJSONString(user));
}
return user;
}
# 3.2、缓存击穿
缓存击穿是指某个热点 key 过期后,大量请求同时到达数据库层。
解决方案:
- 分布式锁
public String getHotData(String key) {
String value = redis.get(key);
if (value == null) {
// 获取分布式锁
String lockKey = "lock:" + key;
String lockValue = UUID.randomUUID().toString();
try {
// 尝试获取锁,超时时间10秒
if (redis.set(lockKey, lockValue, "NX", "EX", 10)) {
// 双重检查
value = redis.get(key);
if (value == null) {
value = database.get(key);
redis.setex(key, 3600, value);
}
} else {
// 获取锁失败,等待并重试
Thread.sleep(100);
return getHotData(key);
}
} finally {
// 释放锁(使用Lua脚本保证原子性)
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) else return 0 end";
redis.eval(script, Arrays.asList(lockKey), Arrays.asList(lockValue));
}
}
return value;
}
- 提前更新(缓存预热)
@Scheduled(fixedRate = 60000) // 每分钟执行一次
public void refreshHotCache() {
List<String> hotKeys = getHotKeys(); // 获取热点key列表
for (String key : hotKeys) {
// 检查缓存剩余时间
Long ttl = redis.ttl(key);
// 如果剩余时间小于5分钟,提前更新
if (ttl != null && ttl < 300) {
String value = database.get(key);
redis.setex(key, 3600, value);
}
}
}
# 3.3、缓存雪崩
缓存雪崩是指大量的 key 同时过期,导致大量请求直接到达数据库层。
解决方案:
- 随机过期时间
public void setCache(String key, String value) {
// 基础过期时间1小时
int baseExpire = 3600;
// 添加随机时间,避免同时过期
Random random = new Random();
int randomExpire = random.nextInt(600); // 0-10分钟的随机值
redis.setex(key, baseExpire + randomExpire, value);
}
- 多级缓存架构
public class MultiLevelCache {
private final Cache localCache; // 本地缓存(如Caffeine)
private final RedisTemplate<String, Object> redisCache; // Redis缓存
public Object get(String key) {
// 一级缓存:本地缓存
Object value = localCache.getIfPresent(key);
if (value != null) {
return value;
}
// 二级缓存:Redis
value = redisCache.opsForValue().get(key);
if (value != null) {
// 回填本地缓存
localCache.put(key, value);
return value;
}
// 数据库查询
value = database.get(key);
if (value != null) {
// 更新多级缓存
localCache.put(key, value);
redisCache.opsForValue().set(key, value, 1, TimeUnit.HOURS);
}
return value;
}
}
- 熔断降级
@Component
public class CacheService {
private final CircuitBreaker circuitBreaker;
public String getData(String key) {
return circuitBreaker.executeSupplier(
() -> {
// 正常查询缓存和数据库
String value = redis.get(key);
if (value == null) {
value = database.get(key);
redis.setex(key, 3600, value);
}
return value;
},
() -> {
// 降级处理:返回默认值或缓存的旧数据
return getDefaultValue(key);
}
);
}
}
# 十八、性能优化与监控
# 1、性能优化
# 1.1、内存优化
配置优化:
# 最大内存设置
maxmemory 4gb
# 内存淘汰策略
maxmemory-policy allkeys-lru
# volatile-lru: 从设置了过期时间的key中淘汰最少使用的
# allkeys-lru: 从所有key中淘汰最少使用的
# volatile-lfu: 从设置了过期时间的key中淘汰最不频繁使用的
# allkeys-lfu: 从所有key中淘汰最不频繁使用的
# volatile-random: 从设置了过期时间的key中随机淘汰
# allkeys-random: 从所有key中随机淘汰
# volatile-ttl: 淘汰即将过期的key
# noeviction: 不淘汰,返回错误
# 压缩列表配置
list-max-ziplist-size -2
list-compress-depth 0
# 哈希表压缩
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
# 集合压缩
set-max-intset-entries 512
# 有序集合压缩
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
数据结构选择:
// 使用 Hash 代替多个 String 存储对象
// 不推荐:每个字段一个key
redis.set("user:1:name", "Alice");
redis.set("user:1:age", "30");
redis.set("user:1:email", "alice@example.com");
// 推荐:使用Hash存储
redis.hset("user:1", "name", "Alice");
redis.hset("user:1", "age", "30");
redis.hset("user:1", "email", "alice@example.com");
# 1.2、网络优化
使用 Pipeline 批量操作:
// 不推荐:多次网络往返
for (int i = 0; i < 1000; i++) {
jedis.set("key" + i, "value" + i);
}
// 推荐:使用Pipeline
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < 1000; i++) {
pipeline.set("key" + i, "value" + i);
}
pipeline.sync();
连接池配置:
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(100); // 最大连接数
poolConfig.setMaxIdle(50); // 最大空闲连接数
poolConfig.setMinIdle(10); // 最小空闲连接数
poolConfig.setMaxWaitMillis(3000); // 获取连接最大等待时间
poolConfig.setTestOnBorrow(true); // 获取连接时检测有效性
poolConfig.setTestOnReturn(true); // 归还连接时检测有效性
poolConfig.setTestWhileIdle(true); // 空闲时检测有效性
JedisPool jedisPool = new JedisPool(poolConfig, "localhost", 6379);
# 1.3、慢查询优化
慢查询配置:
# 慢查询时间阈值(微秒)
slowlog-log-slower-than 10000
# 慢查询日志长度
slowlog-max-len 128
查看慢查询:
# 获取慢查询日志
slowlog get 10
# 获取慢查询日志长度
slowlog len
# 清空慢查询日志
slowlog reset
# 2、监控指标
# 2.1、关键监控指标
内存指标:
# 查看内存使用情况
info memory
# 重要指标:
# used_memory: 已使用内存
# used_memory_rss: 操作系统分配的内存
# mem_fragmentation_ratio: 内存碎片率
# evicted_keys: 被淘汰的key数量
性能指标:
# 查看性能统计
info stats
# 重要指标:
# instantaneous_ops_per_sec: 每秒操作数
# total_connections_received: 总连接数
# rejected_connections: 拒绝的连接数
# keyspace_hits: 命中次数
# keyspace_misses: 未命中次数
持久化指标:
# 查看持久化信息
info persistence
# 重要指标:
# rdb_last_save_time: 最后一次RDB保存时间
# rdb_changes_since_last_save: 自上次RDB后的修改数
# aof_current_size: AOF文件大小
# aof_base_size: AOF重写后的大小
# 2.2、监控工具
Redis 自带命令:
# 实时监控命令执行
monitor
# 查看客户端连接
client list
# 查看命令统计
info commandstats
# 查看CPU使用
info cpu
使用 redis-cli 监控:
# 连续模式监控
redis-cli --stat
# 延迟监控
redis-cli --latency
# 延迟历史
redis-cli --latency-history
# 大key扫描
redis-cli --bigkeys
# 3、性能测试
使用 redis-benchmark:
# 基本性能测试
redis-benchmark -h localhost -p 6379 -c 50 -n 100000
# 测试特定命令
redis-benchmark -t set,get -n 100000
# 使用Pipeline
redis-benchmark -P 16 -n 100000
# 测试不同数据大小
redis-benchmark -d 100 -n 100000
压力测试脚本:
@Test
public void performanceTest() {
int threads = 10;
int operations = 10000;
ExecutorService executor = Executors.newFixedThreadPool(threads);
CountDownLatch latch = new CountDownLatch(threads);
long startTime = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
executor.submit(() -> {
try (Jedis jedis = jedisPool.getResource()) {
for (int j = 0; j < operations; j++) {
String key = "test:" + Thread.currentThread().getId() + ":" + j;
jedis.setex(key, 60, "value" + j);
jedis.get(key);
}
} finally {
latch.countDown();
}
});
}
latch.await();
long endTime = System.currentTimeMillis();
long totalOps = threads * operations * 2; // set + get
double qps = totalOps * 1000.0 / (endTime - startTime);
System.out.println("QPS: " + qps);
}
# 4、最佳实践
# 4.1、Key 设计规范
// 推荐的key命名规范
// 业务名:对象名:ID:属性
String key = "shop:user:1001:info";
String key2 = "shop:order:2001:items";
// 避免过长的key
// 不推荐
String badKey = "this_is_a_very_long_key_name_that_wastes_memory";
// 推荐
String goodKey = "u:1001:n"; // user:1001:name的缩写
# 4.2、避免大Key
// 检测大key
public void findBigKeys() {
try (Jedis jedis = jedisPool.getResource()) {
Set<String> keys = jedis.keys("*");
for (String key : keys) {
Long size = jedis.strlen(key); // String类型
// Long size = jedis.hlen(key); // Hash类型
// Long size = jedis.llen(key); // List类型
// Long size = jedis.scard(key); // Set类型
// Long size = jedis.zcard(key); // ZSet类型
if (size > 10000) { // 超过1万个元素
System.out.println("Big key found: " + key + ", size: " + size);
}
}
}
}
// 分批处理大key
public void handleBigList(String key) {
Long total = redis.llen(key);
int batchSize = 100;
for (int i = 0; i < total; i += batchSize) {
List<String> batch = redis.lrange(key, i, i + batchSize - 1);
// 处理批次数据
processBatch(batch);
}
}
# 4.3、合理使用过期时间
// 批量设置过期时间时添加随机值
public void batchSetWithExpire(Map<String, String> data) {
Random random = new Random();
Pipeline pipeline = jedis.pipelined();
for (Map.Entry<String, String> entry : data.entrySet()) {
// 基础过期时间1天,添加0-1小时的随机值
int expire = 86400 + random.nextInt(3600);
pipeline.setex(entry.getKey(), expire, entry.getValue());
}
pipeline.sync();
}
# 十九、总结
本文全面介绍了 Redis 的实践应用,包括:
- 基础数据类型:String、List、Set、Sorted Set、Hash 及其常用命令
- 高级数据类型:Bitmap、HyperLogLog、GeoHash、Stream 等
- 持久化机制:RDB、AOF 和混合持久化的配置与优化
- 高可用方案:Redis Cluster 和 Sentinel 的部署与使用
- 缓存实践:缓存穿透、击穿、雪崩的解决方案
- 性能优化:内存优化、网络优化、监控与调优
- 客户端使用:Redisson 分布式锁和限流的实现
通过合理使用 Redis 的各种特性,可以构建高性能、高可用的分布式系统。在实际应用中,需要根据业务场景选择合适的数据结构和部署方案,并持续监控和优化系统性能。
希望这篇文章能帮助您深入了解 Redis 并在实际应用中更好地发挥其优势。
祝你变得更强!