Redis 6有哪些新特性或优化?

2026-05-05 22:592阅读0评论SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计20818个文字,预计阅读时间需要84分钟。

Redis 6有哪些新特性或优化?

Redis 是一种流行的 NoSQL 数据库,支持多种数据结构类型。设计理念包括单线程、多路IO复用技术。示例:内存中只有一个单线程,无需进行线程切换等操作,确保了 Redis 在内存中的处理效率。

Redis是典型的NoSQL数据库,支持多种数据结构类型。设计思想是:单线程+多路IO复用技术

上图的例子说明:内存中只有一个单线程,无需进行线程切换等操作,保证了redis在内存处理中的效率,同时使用多个socket链接复用,一旦需要哪个链接的数据准备就绪之后,就将这个线程与这个链接进行IO操作。总结一句话就是:通过一个线程,进行拨开关的方式,来同时传输多个IO流

redis官网:redis.io/download

Redis是一个开源的key-value存储系统。

Memcached类似,它支持存储的value类型相对更多,包括string、list、set、zset、sorted set、hash

这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。

在此基础上,Redis支持各种不同方式的排序。

memcached一样,为了保证效率,数据都是缓存在内存中。

区别的是Redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件。

并且在此基础上实现了master-slave(主从)同步。

单线程 +IO多路复用。

安装和启动#
  • windows下:

    www.cnblogs.com/xing-nb/p/12146449.html

    www.cnblogs.com/peterYong/p/9652336.html

  • linux下:

安装C语言的编译环境

yum install centos-release-scl scl-utils-build yum install -y devtoolset-8-toolchain scl enable devtoolset-8 bash

通过wget下载

wget download.redis.io/releases/redis-6.2.6.tar.gz // 下载路径:/opt

解压至当前目录

tar -zxvf redis-6.2.6.tar.gz

解压完成后进入目录

cd redis-6.2.6

在当前目录下执行make

make && make install

默认安装在/usr/local/bin

redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何 redis-check-aof:修复有问题的AOF文件,rdb和aof后面讲 redis-check-dump:修复有问题的dump.rdb文件 redis-sentinel:Redis集群使用 redis-server:Redis服务器启动命令 redis-cli:客户端,操作入口

前台启动:/usr/local/bin目录下启动redis

redis-server(前台启动)

后台启动:

  • 安装redis的目录/opt/redis-6.2.6中将redis.conf复制到任意一个文件夹下

    cp redis.conf /etc/redis.conf // 将redis.conf复制到/etc/下

  • 修改/etc/redis.conf配置文件

    vim redis.conf # daemonize no 修改为 daemonize yes

  • /usr/local/bin目录下启动redis

    redis-server /etc/redis.conf

关闭redis

  • kill进程:kill掉进程号
  • 命令shutdown:redis-cli shutdown

默认端口号:6379

NoSQL数据库#

​ web2.0时代,访问量太大,CPUI和内存有巨大压力,如果还是按照单个服务器进行响应的话,显然性能是不够的,于是引出分布式架构的服务,也就是多台服务器来响应请求,使用过NoSQL数据库进行解决。

NoSQL数据库的作用:

  • 解决CPU及内存压力

    进行分布式服务的时候,可能同一台电脑的请求是由不同的服务器进行响应的,那么如何存放session?也就是session的共享问题。

  • 解决IO压力

    当做高速缓存使用,缓解访问数据库的压力

NoSQL数据库的特点:

NoSQL( NoSQL = Not Only SQL ),意即 “不仅仅是SQL” ,泛指非关系型的数据库

NoSQL不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。

  • 不遵循SQL标准。
  • 不支持ACID
  • 远超于SQL的性能。

适用于的场景

  • 对数据高并发的读写
  • 海量数据的读写
  • 对数据高可扩展性的。

不适用的场景

  • 需要事务支持;
  • 基于sql的结构化查询存储,处理复杂的关系,需要即席查询。
  • 用不着sql的情况以及用了sql也不行的情况下,考虑NoSQL

常见的NoSQL数据库

  • Redis

  • MongoDB

大数据时代常用的数据库类型

  • 行式数据库

  • 列式数据库

配置文件#

**Redis 的配置文件位于 Redis 安装目录下,文件名为 ** redis.conf(Windows 名为 redis.windows.conf)。

  • 端口号:6379

  • 默认16个数据库,类似数组下标从0开始,初始默认使用0号库

    • select 8:切换到8号库
    • 所有库拥有统一的密码
    • dbsize:查看当前数据库的key的数量
    • flushdb:清空当前库
    • flushall:通杀全部库
Units单位#

单位,配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit

大小写不敏感。

INCLUDES包含#

包含,多实例的情况可以把公用的配置文件提取出来

NETWORK网络#

网络相关配置。

bind

默认情况bind=127.0.0.1只能接受本机的访问请求。

不写的情况下,无限制接受任何ip地址的访问。

生产环境肯定要写你应用服务器的地址,服务器是需要远程访问的,所以需要将其注释掉

如果开启了protected-mode,那么在没有设定bind ip且没有设密码的情况下,Redis只允许接受本机的响应。

protected-mode

将本机访问保护模式设置no

port

端口号,默认6379

tcp-backlog

设置tcpbacklogbacklog其实是一个连接队列,backlog队列总和 $=$ 未完成三次握手队列 $+$ 已经完成三次握手队列。

在高并发环境下你需要一个高backlog值来避免慢客户端连接问题。

timeout

一个空闲的客户端维持多少秒会关闭,0 表示关闭该功能。即永不关闭。

tcp-keepalive

对访问客户端的一种心跳检测,每个n秒检测一次。(默认是300s检测一次)

单位为秒,如果设置为 0,则不会进行Keepalive检测,建议设置成 60。

GENERAL#

通用。

daemonize

是否为后台进程,设置为yes

守护进程,后台启动。

pidfile

存放pid文件的位置,每个实例会产生一个不同的pid文件。

loglevel

指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为notice

logfile

日志文件名称。

database

设定库的数量 默认16,默认数据库为 0,可以使用SELECT <dbid>命令在连接上指定数据库id

SECURITY#

安全。

访问密码的查看、设置和取消。

在命令中设置密码,只是临时的。重启redis服务器,密码就还原了。

永久设置,需要在配置文件中进行设置。

LIMITS#

限制。

maxclients

设置redis同时可以与多少个客户端进行连接。

默认情况下为10000个客户端。

如果达到了此限制,redis则会拒绝新的连接请求,并且向这些连接请求方发出max number of clients reached以作回应。

maxmemory

建议必须设置,否则,将内存占满,造成服务器宕机。

设置redis可以使用的内存量。一旦到达内存使用上限,redis将会试图移除内部数据,移除规则可以通过maxmemory-policy来指定。

如果redis无法根据移除规则来移除内存中的数据,或者设置了不允许移除,那么redis则会针对那些需要申请内存的指令返回错误信息,比如SET、LPUSH等。

但是对于无内存申请的指令,仍然会正常响应,比如GET等。如果你的redis是主redis( 说明你的redis有从redis),那么在设置内存使用上限时,需要在系统中留出一些内存空间给同步队列缓存,只有在你设置的是“不移除”的情况下,才不用考虑这个因素。

maxmemory-policy

volatile-lru:使用LRU算法移除key,只对设置了过期时间的键(最近最少使用)。

allkeys-lru:在所有集合key中,使用LRU算法移除key

volatile-random:在过期集合中移除随机的key,只对设置了过期时间的键。

allkeys-random:在所有集合key中,移除随机的key

volatile-ttl:移除那些TTL值最小的key,即那些最近要过期的key

noeviction:不进行移除。针对写操作,只是返回错误信息。

maxmemory-samples

设置样本数量,LRU算法和最小TTL算法都并非是精确的算法,而是估算值,所以你可以设置样本的大小,redis默认会检查这么多个key并选择其中LRU的那个。

一般设置 3 到 7 的数字,数值越小样本越不准确,但性能消耗越小。

常用五大基本数据类型# 前言:Redis键(key操作)#

keys *:查看当前库所有key

set key value:添加一组 k-v

exists key:判断某个key是否存在

type key:查看你的key是什么类型

del key:删除指定的key数据(直接删除,而不是异步删除)

unlink key根据value选择非阻塞删除,仅将keyskeyspace元数据中删除,真正的删除会在后续异步操作

expire key 10:为给定的key设置过期时间

ttl key:查看还有多少秒过期,-1表示永不过期,-2表示已过期

select:命令切换数据库

dbsize:查看当前数据库的key的数量

flushdb:清空当前库

flushall:通杀全部库

字符串(String)#

String类型是最基本的类型,是二进制安全的。意味着Redisstring可以包含任何数据,比如jpg图片或者序列化的对象,只要数据能够存储为字符串类型,redis都能通过k-v的形式存储。

String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是 512M

命令操作:

set <key> <value>:添加键值对,也包含对一个key下的value进行覆盖

get <key>:查询对应键值

append <key> <value>:将给定的<value>追加到原值的末尾,返回添加后的字符串长度值

strlen <key>:获得值的长度

setnx <key> <value>:只有在key不存在时,才能设置key的值(注意与set命令的区别)

incr <key>:将key中储存的数字值增 1,只能对数字值操作,如果为空,新增值为 1(具有原子性

decr <key>:将key中储存的数字值减 1,只能对数字值操作,如果为空,新增值为 -1

incrby/decrby <key><步长>:将key中储存的数字值增减。自定义步长

mset <key1> <value1> <key2> <value2>:同时设置一个或多个key-value

mget <key1> <key2> <key3>...:同时获取一个或多个value

msetnx <key1> <value1> <key2> <value2>...:同时设置一个或多个key-value对,当且仅当所有给定key都不存在(原子性操作,有一个失败那么就全部失败)

getrange <key><起始位置><结束位置>:获得值的范围(将字符串理解成一个数组,按数组序号进行取子字符串的操作)

setrange <key><起始位置><value>:用<value>覆写<key>所储存的字符串值

setex <key><过期时间><value>:设置键值的同时,设置过期时间,单位秒。(可以通过ttl key的命令,查看过期时间)

getset <key><value>:以新换旧,设置了新值同时获得旧值。

原子性

所谓原子操作是指不会被线程调度机制打断的操作;

这种操作一旦开始,就一直运行到结束,中间不会有任何context switch(切换到另一个线程)。

  • 在单线程中, 能够在单条指令中完成的操作都可以认为是"原子操作",因为中断只能发生于指令之间。(而redis是单线程的,所以redis中的操作是不会被打断的

  • 在多线程中,不能被其它进程(线程)打断的操作就叫原子操作。

Redis单命令的原子性主要得益于Redis的单线程。

数据结构

内部结构实现上类似于JavaArrayList,采用预分配冗余空间的方式来减少内存的频繁分配,字符串大小小于1M时,扩容都是加倍现有的空间大小,如果空间大小一旦超过1M,每次扩容只会多扩容1M的空间,注意字符串最大长度是512M。

列表(List)#

单键多值

Redis列表是简单的字符串列表按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。

它的底层实际是个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差。

lpush/rpush <key><value1><value2><value3> ....: 从左边/右边插入一个或多个值。

lpush k1 v1 v2 v3 (头插法) lrange k1 0 -1 输出:v3 v2 v1 rpush k1 v1 v2 v3 (尾插法) rrange k1 0 -1 输出:v1 v2 v3

lpop/rpop <key>:从左边/右边吐出一个值。值在键在,值光键亡。

rpoplpush <key1><key2>:从<key1>列表右边吐出一个值,插到<key2>列表左边。

lrange <key><start><stop>:按照索引下标获得元素(从左到右)

lrange <key> 0 -1:左边第一个,-1右边第一个,(0 -1表示获取所有)

lindex <key><index>:按照索引下标获得元素(从左到右,从0开始到index索引值)

llen <key>:获得列表长度

linsert <key> before/after <value> <newvalue>:在<value>的前面/后面插入<newvalue>插入值

lrem <key><n><value>:从左边删除nvalue(从左到右)

lset <key><index><value>:将列表key下标为index的值替换成value

数据结构

List的数据结构为快速链表quickList

  • 首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist,也即是压缩列表

    • 它将所有的元素紧挨着一起存储,分配的是一块连续的内存。
  • 数据量比较多的时候才会改成quicklist

    • 因为普通的链表需要的附加指针空间太大,会比较浪费空间。比如这个列表里存的只是int类型的数据,结构上还需要两个额外的指针prevnext

  • Redis链表ziplist结合起来组成了quicklist。也就是将多个ziplist使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。quicklist结构图如下:

Set(集合)#

Set对外提供的功能List类似列表的功能。(一个key,对应一个set集合)

  • 特殊之处在于Set是可以自动排重

  • 当需要存储一个列表数据,又不希望出现重复数据时,Set是一个很好的选择,并且Set提供了判断某个成员是否在一个Set集合内的重要接口,这个也是List所不能提供的。

  • RedisSetString类型的无序集合。它底层其实是一个valuenullhash表,所以添加,删除,查找的复杂度都是O(1)

一个算法,随着数据的增加,执行时间的长短,如果是O(1),数据增加,查找数据的时间不变。

命令操作:

sadd <key><value1><value2> .....:将一个或多个member元素加入到集合key中,已经存在的member元素将被忽略

smembers <key>:取出该集合的所有值。

sismember <key><value>:判断集合<key>是否为含有该<value>值,有返回 1,没有返回 0

scard<key>:返回该集合的元素个数。

srem <key><value1><value2> ....:删除集合中的某个元素

spop <key>:随机从该集合中吐出一个值

srandmember <key><n>:随机从该集合中取出n个值,不会从集合中删除

smove <source><destination>value:把集合中一个值从一个集合移动到另一个集合

sinter <key1><key2>:返回两个集合的交集元素

sunion <key1><key2>:返回两个集合的并集元素

sdiff <key1><key2>:返回两个集合的差集元素(key1中的,不包含key2中的)

数据结构

Set数据结构是字典,字典是用哈希表实现的。

Hash(哈希)#

Redis hash是一个键值对集合。

Redis hash是一个String类型的fieldvalue的映射表,hash特别适合用于存储对象。( 可以理解为Java中的Map<String,Object> )

hset <key><field><value>:给<key>集合中的<field>键赋值<value>

  • hset user:1001 id 1,redis允许key带 :,可以理解为key值是 user:1001,这是为了存很多对象时,用来区分不同对象
  • hset user:1001 name zhanggsan,给 user:1001 这个key对应的对象中,添加一个name字段,值为zhangsan

hget <key1><field>:从<key1>集合<field>取出value

  • hget user:1001 id,打印结果为“1”
  • hget user:1001 name,打印结果为“zhangsan”

hmset <key1><field1><value1><field2><value2>...: 批量设置hash的值(新版本这项命令,已经可以用hset进行实现了)

hexists <key1><field>:查看哈希表key中,给定域field是否存在(1:存在,0:不存在)

hkeys <key>:列出该hash集合的所有field

  • hkeys user:1001,列出这个key下所有的字段

hvals <key>:列出该hash集合的所有value

hincrby <key> <field> <increment>:为哈希表key中的域field的值加上增量 1 -1

hsetnx <key> <field> <value>:将哈希表key中的域field的值设置为value,当且仅当域field不存在

数据结构

Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。

field-value长度较短且个数较少时,使用ziplist,否则使用hashtable

Zset(有序集合)#

Redis有序集合zset与普通集合set非常相似,是一个没有重复元素的字符串有序集合

不同之处是有序集合的每个成员都关联了一个评分(score,这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复的。

因为元素是有序的,所以可以很快的根据评分(score)或者次序(position)来获取一个范围的元素。

访问有序集合的中间元素也是非常快的,因此能够使用有序集合作为一个没有重复成员的智能列表。

操作命令:

zadd <key> <score1> <value1> <score2> <value2> …:将一个或多个member元素及其score值加入到有序集key当中

zrange <key> <start> <stop> [WITHSCORES]:返回有序集key中,下标在<start><stop>之间的元素( 0 -1 ,表示所有)

  • 当带上WITHSCORES时,可以让分数一起和值返回到结果集

zrangebyscore key min max [withscores] [limit offset count]:返回有序集key中,所有score值介于minmax之间(包括等于minmax)的成员。有序集成员score值递增(从小到大)次序排列

zrevrangebyscore key max min [withscores] [limit offset count]:同上,改为从大到小排列

zincrby <key> <increment> <value>:为元素的score加上增量

zrem <key> <value>:删除该集合下,指定值的元素

zcount <key> <min> <max>:统计该集合,分数区间内的元素个数

zrank <key> <value>:返回该值在集合中的排名,从 0 开始

数据结构

SortedSet(zset)Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String, Double>,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。

zset底层使用了两个数据结构

  • hashhash的作用就是关联元素value和权重score(通过key值,能够找到关联的value和score),保障元素value的唯一性,可以通过元素value找到相应的score

  • 跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表

Redis6 新数据类型# Bitmaps#

  • 将 Bitmaps 数据类型理解为一个数组,每个单位只存储0和1

实例:

  • getbit:获取Bitmaps中某个偏移量的值
  • bitcount[start end]:统计字符串被设置为1的bit数,start end可以指定范围,且可以使用负数值,例如:-1表示最后一个位,-2表示倒数第二个位置(从0开始....)
  • bitop and(or/not/xor)[key...]:复合操作,可以做多个Bitmaps的交集、并集等操作,并将结果保存在destkey中
    • 例如:bitop and users:1 users:2 users:3,将users:2与users:3的交集结果存放到key为users:1的值中

Bitmaps与set对比

HyperLogLog#

命令操作:

  1. pfadd[element...]:添加指定元素到HyperLogLog中,执行命令后,若基数发生变化则返回1,否则返回0

  2. pfcount[key...]:计算基数值

  3. pfmerge[其中,sourcekey可以为多个]:将多个HyperLogLog数据类型进行合并,例子比如将月活跃用户数与日活跃用户数进行合并,就可以使用pfcount进行统计基数

Geospatial#

Redis的发布与订阅#

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis客户端可以订阅任意数量的频道。

发布者可以建立许多个频道进行消息的发送(如上图频道1、频道2、频道3),供订阅者进行接收和监听消息。

  1. 客户端可以订阅频道

  1. 当给这个频道发布消息后,消息就会发送给订阅的客户端

发布订阅命令行实现

  1. 打开一个客户端订阅channel1
  • subscribe channel1
  1. 打开另一个客户端,给channel1发布消息hello
  • publish channel1 hello

  • 返回的数字表示:订阅者的数量、

  1. 打开第一个客户端可以看到发送的信息

注:发布的消息如果没有持久化,那么在订阅的客户端是接收不到消息的,只能收到订阅后发布的消息

事务和锁机制#

Redis事务是一个单独的隔离操作事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

Redis事务的主要作用就是串联多个命令防止别的命令插队

MultiExecDiscard#

Multi

Exec

Discard

输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行

组队的过程中可以通过Discard来放弃组队。

Redis的事务就是:先使用multi进行命令的添加(组队过程),组队完毕后,使用exec进行执行,这个执行过程不能被其他命令打断(相当于事务执行的过程),如果要中止,就使用discard命令(类似于回滚操作)。

  • multi开启组队,输入命令;组队成功,exec执行事务,执行完毕,事务结束

  • 放弃组队

  • 组队中有命令错误,不会执行

  • 组队中不报错,执行时报错

悲观锁#

悲观锁(Pessimistic Lock),即每次去拿数据的时候都认为有其他线程会修改,所以每次在拿数据的时候都会上锁,这样其他线程想要拿到这个数据就会被block直到释放锁后,成功拿到锁。(效率低,操作之前先上锁)

乐观锁#

乐观锁(Optimistic Lock),即每次去拿数据的时候都认为其他线程不会修改,所以不会上锁,但是在更新的时候会判断,在此期间有没有其他线程去更新这个数据,可以使用版本号控制等机制。

乐观锁适用于多读的应用类型,这样可以提高吞吐量

Redis就是利用这种check-and-set机制实现事务的。

Watch、unwatch#

在执行multi之前,先执行watch key1 [key.....],可以监视一个(或多个 )key。如果在事务执行之前,这个key被其他命令所改动,那么事务将被打断。(类似于上锁,一旦发现watch的这个key被修改了,那么自己的exec操作就会中断)

取消WATCH命令对所有key的监视:如果在执行WATCH命令之后,EXEC命令或DISCARD命令先被执行,那么就不需要再执行UNWATCH

事务三特性#
  • 单独的隔离操作

    事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

  • 没有隔离级别的概念

    队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行。

  • 不保证原子性

    事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚 。

模拟秒杀# 基本实现#

核心的逻辑代码:

public class SecKill_redis { public static void main(String[] args) { Jedis jedis =new Jedis("192.168.242.110",6379); System.out.println(jedis.ping()); jedis.close(); } //秒杀过程 public static boolean doSecKill(String uid,String prodid) throws IOException { //1 uid和prodid非空判断 if(uid == null || prodid == null){ return false; } //2 连接redis Jedis jedis =new Jedis("192.168.xx.xxx",6379); //3 拼接key // 3.1 库存key String kcKey = "sk:"+prodid+":qt"; // 3.2 秒杀成功用户key String userKey = "sk:"+prodid+":user"; //4 获取库存,如果库存null,秒杀还没有开始 String kc = jedis.get(kcKey); if(kc == null){ System.out.println("秒杀还没开始,请稍等"); jedis.close(); return false; } // 5 判断用户是否重复秒杀操作 if(jedis.sismember(userKey, uid)){ System.out.println("每个用户只能秒杀成功一次,请下次再来"); jedis.close(); return false; } //6 判断如果商品数量,库存数量小于1,秒杀结束 if(Integer.parseInt(kc) < 1){ System.out.println("秒杀结束,请下次参与"); jedis.close(); return false; } //7 秒杀过程 //7.1库存-1 jedis.decr(kcKey); //7.2 把秒杀成功的用户添加到清单里面 jedis.sadd(userKey,uid); System.out.println("用户" + uid + "秒杀成功"); jedis.close(); return true; } } 使用ab工具模拟并发以及暴露出的问题#

CentOS 6 默认安装

CentOS 7 手动安装(yum -y install 192.168.0.43:8080/Seckill/doseckill

-n:测试会话中所执行的请求个数

-c:一次产生的请求个数

  • 并发暴露出来的问题

    • 会出现超卖问题:卖完了商品,但还存在继续购买,即库存变为负数

      • 解决方案:使用乐观锁,进行版本控制(redis事务+watch)

        代码修改:

        //秒杀过程 public static boolean doSecKill(String uid,String prodid) throws IOException { //1 uid和prodid非空判断 if(uid == null || prodid == null){ return false; } //2 连接redis //Jedis jedis =new Jedis("192.168.xx.xxx",6379); //通过连接池获取连接redis的对象 JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis = jedisPoolInstance.getResource(); //3 拼接key // 3.1 库存key String kcKey = "sk:"+prodid+":qt"; // 3.2 秒杀成功用户key String userKey = "sk:"+prodid+":user"; //监视库存 jedis.watch(kcKey); //4 获取库存,如果库存null,秒杀还没有开始 String kc = jedis.get(kcKey); if(kc == null){ System.out.println("秒杀还没开始,请稍等"); jedis.close(); return false; } // 5 判断用户是否重复秒杀操作 if(jedis.sismember(userKey, uid)){ System.out.println("每个用户只能秒杀成功一次,请下次再来"); jedis.close(); return false; } //6 判断如果商品数量,库存数量小于1,秒杀结束 if(Integer.parseInt(kc) < 1){ System.out.println("秒杀结束,请下次参与"); jedis.close(); return false; } //7 秒杀过程 //使用事务 Transaction multi = jedis.multi(); //组队操作 multi.decr(kcKey); multi.sadd(userKey,uid); //执行 List<Object> results = multi.exec(); if(results == null || results.size()==0) { System.out.println("秒杀失败了...."); jedis.close(); return false; } // //7.1库存-1 // jedis.decr(kcKey); // //7.2 把秒杀成功的用户添加到清单里面 // jedis.sadd(userKey,uid); System.out.println("用户" + uid + "秒杀成功"); jedis.close(); return true; }

    • 连接超时问题

      • 解决方案:采用连接池

      // 创建工具类 public class JedisPoolUtil { private static volatile JedisPool jedisPool = null; private JedisPoolUtil() { } public static JedisPool getJedisPoolInstance() { if (null == jedisPool) { synchronized (JedisPoolUtil.class) { if (null == jedisPool) { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(200); poolConfig.setMaxIdle(32); poolConfig.setMaxWaitMillis(100*1000); poolConfig.setBlockWhenExhausted(true); poolConfig.setTestOnBorrow(true); // ping PONG jedisPool = new JedisPool(poolConfig, "192.168.xx.xxx", 6379, 60000 ); } } } return jedisPool; } public static void release(JedisPool jedisPool, Jedis jedis) { if (null != jedis) { jedisPool.returnResource(jedis); } } }

      修改代码,主要是针对前面基本实现中的核心代码,对获取redis对象进行修改:

      //2 连接redis //Jedis jedis =new Jedis("192.168.xx.xxx",6379); //通过连接池获取连接redis的对象 JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis = jedisPoolInstance.getResource();

    • 商品遗留问题,即秒杀已经结束了,却还有商品库存

      • 解决方案:使用Lua脚本

      Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。

      将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。

      Lua脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。

      但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。

      利用Lua脚本淘汰用户,解决超卖问题

      redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。

  • Redis中两种持久化机制#

    两种持久化机制,RDB和AOF,简单来说就是存数据使用的哪种机制,默认是使用RDB(Redis DataBase)。

    RDB#

    Redis DataBase

    在指定的时间间隔内将内存中的 数据集快照 写入磁盘,即Snapshot快照,恢复时是将快照文件直接读到内存里。

    周期性地进行持久化的操作

    Redis会单独创建一个子进程(fork)来进行持久化。

    底层执行过程:先将数据写入到一个临时文件中,待持久化过程完成后(同步过程完成),再将这个临时文件内容覆盖到dump.rdb(持久化文件)

    • 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能。
    • 为什么要先同步到文件临时区域而不直接同步到rdb中?若同步过程中发生异常情况中断,不会导致数据库中的数据发生损坏,待同步过程完成后,用临时文件替代这个持久化的文件,保证了数据的完整性和一致性。
    • 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。

    RDB的缺点是最后一次持久化后的数据可能丢失

    Fork#
    • Redis进程执行fork操作创建子进程,RDB持久化过程由子进程负责,完成后自动结束。阻塞只发生在fork阶段,一般时间很短。

    • 作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程

    • Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了写时复制技术

    • 一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程

    配置#

    dump文件名字

    redis.conf中配置文件名称,默认为dump.rdb

    dump保存位置

    rdb文件的保存路径可以修改。默认为Redis启动时命令行所在的目录下。

    stop-writes-on-bgsave-error

    即当redis无法写入磁盘,关闭redis的写入操作。推荐yes

    rdbcompression

    持久化的文件是否进行压缩存储。

    rdbchecksum

    完整性的检查,即数据是否完整性、准确性。

    save

    表示写操作的次数。

    格式:save 秒 写操作次数

    优点#
    • 适合大规模的数据恢复
    • 对数据完整性和一致性要求不高更适合使用
    • 节省磁盘空间;
    • 恢复速度快。
    • redis主进程会fork()一个子进程来处理所有保存工作,主进程不需要进行任何磁盘IO操作
    缺点#
    • Fork的时候,内存中的数据被克隆了一份,大致 2 倍的膨胀性需要考虑
    • 虽然Redisfork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能;
    • 在备份周期在一定间隔时间做一次备份,所以如果还没到达指定备份时间间隔的时候,Redis意外down掉的话,就会丢失最后一次快照后的所有修改,数据发生了丢失。
    AOF#

    Append Only File

    以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录)只许追加文件但不可以改写文件(可能是使用的redo和undo日志恢复?)。Redis启动之初会读取该文件重新构建数据,换言之,如果Redis重启就会根据日志文件的内容将写指令从前到后执行一次,以完成数据的恢复工作。

    一种使用追加方式记录数据的方法

    执行流程

    • 客户端的请求写命令会被append追加到AOF缓冲区内;

    • AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;

    • AOF文件大小超过重写策略或手动重写时,会对AOF文件Rewrite重写,压缩AOF文件容量;

    • Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的。

    AOFRDB同时开启时,系统默认读取AOF的数据(数据不会存在丢失)

    配置#

    redis.conf中配置开启aof,默认生成的aof配置文件为appendonly.aof,与rdb文件路径一致(启动路径)。

    AOF默认不开启(RDB默认开启)

    • 若AOF和RDB同时开启,系统默认读取AOF的数据(数据不会存在丢失)。

    文件名字

    AOF同步频率设置

    appendfsync always

    ​ 始终同步,每次Redis的写入都会立刻记入日志;

    ​ 性能较差但数据完整性比较好。

    appendfsync everysec

    ​ 每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。

    appendfsync no

    Redis不主动进行同步,把同步时机交给操作系统。

    Rewrite压缩

    AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集。可以使用命令bgrewriteaof

    优点#
    • 备份机制更稳健,丢失数据概率更低;
    • 可读的日志文本,通过操作AOF稳健,可以处理误操作。
    缺点#
    • 比起RDB占用更多的磁盘空间(不仅记录数据还要记录操作);
    • 恢复备份速度要慢;
    • 每次读写都同步的话,有一定的性能压力;
    • 存在个别Bug,造成不能恢复。
    总结#

    官方推荐两个都启用。

    如果对数据不敏感(允许数据有部分丢失),可以选单独用RDB

    不建议单独用AOF,因为可能会出现Bug

    如果只是做纯内存缓存,可以都不用。

    主从复制# 基本介绍#

    ​ 主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slaver以读为主,即主服务器承担写操作,复制的若干 从服务器 则承担读操作

    特点:

    1. 读写分离,性能扩展

    2. 容灾快速恢复

      • 某个从服务器发生故障,那么会快速切换到另一个从服务器中,不影响读操作的进行
    3. 一主多从

      • 只有一台主服务器,供其他从服务器进行复制
    搭建一主两从#
    1. 创建文件目录

    /opt/etc

    1. redis.conf复制到当前目录

    cp /etc/redis.conf /opt/etc/

    1. 创建 3 个redis.conf配置文件

    redis6379.conf redis6380.conf redis6381.conf

    # redis6379.conf include /opt/etc/redis.conf pidfile /var/run/redis_6379.pid port 6379 dbfilename dump6379.rdb # redis6380.conf include /opt/etc/redis.conf pidfile /var/run/redis_6380.pid port 6380 dbfilename dump6380.rdb # redis6381.conf include /opt/etc/redis.conf pidfile /var/run/redis_6381.pid port 6381 dbfilename dump6381.rdb

    1. 启动 3 台redis服务器

    1. 查看主机运行情况

    info replication

    1. 配从不配主

      在从机中进行设置,成为谁的从机

    slaveof <ip> <port> # 成为某个实例的从服务器

    1. 再次查看主机运行情况

    成功搭建。

    一主二从#

    特点:

    主机6379,从机63806381

    1. 假设从机6380挂掉。(从机挂掉)

      • 当6380重启后,6380不再是6379的从机,而是作为新的master;(从机重启后,不再是某个主机的从机,其自身就是一个主机)
      • 当再次把6380作为6379的从机加入后,从机才会把数据从头到尾复制。(从机重启后,需要再输入成为从机的指令)
    2. 假设主机6379挂掉。(主机挂掉)

      • 6380和6381仍然是6379的从机,不会做任何事;(从机不会改变)
      • 当6379重启后,既然是主服务器。(主机重启后,还是主机)
    主从复制原理#

    完整版:

    • slave启动成功连接到master后会发送一个sync命令(同步命令)。

    • master接到命令启动后台的存盘进程,对数据进行持久化操作,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件(rdb)到slave,以完成一次完全同步。

    • 当主服务进行写操作后,和从服务器进行数据同步。

    • 全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

    • 增量复制master继续将新的所有收集到的修改命令依次传给slave,完成同步。

    • 只要是重新连接master,一次完全同步(全量复制)将被自动执行。

    全量复制:是从机主动去请求主机进行同步操作,是一开始连接的时候

    增量复制:主机进行一次写操作之后,就主动同步从机

    简洁版:

    薪火相传#

    ​ 上一个slave可以是下一个slavemaster从机是另一个从机的主机,并由这个担任主机的从机,进行数据同步),slave同样可以接收其他slave的连接和同步请求,那么该slave作为了链条中下一个的master,可以有效减轻master的写压力,去中心化降低风险。

    slaveof <ip> <port>

    • 特点与一主二从类似

    • 中途变更转向:会清除之前的数据,重新建立拷贝最新的。

    • 当某个担任主机的slave宕机,其挂在后面的slave都没法备份。

      • 即当主机挂掉,从机还是从机,但是无法继续写数据。
    反客为主#

    当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。(需要手动完成,如果不手动执行的话,那么从机没有任何动作,主机重新启动后,还依旧是主机)

    使用命令:将从机变为主机

    slaveof no one 哨兵模式# 基本介绍#

    反客为主的自动版,即能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

    1. 创建sentinel.conf文件

    /opt/etc/sentinel.conf

    1. 配置哨兵

    sentinel monitor mymaster 172.16.xx.xxx 6379 1 # mymaster:给监控对象起的服务器名称 # 1:至少有多少个哨兵同意迁移的数量

    1. 启动哨兵

    redis-sentinel /opt/etc/sentinel.conf

    主机挂掉,哨兵监控到之后,会按照选举规则,从 从机 中选举中产生新的主机,原来挂掉的主机会变成新主机的从机

    选举规则#

    选择条件依次为:

    • 根据优先级别,slave-priority/replica-priority优先选择优先级靠前的。(越小优先级越高)

    • 根据偏移量,优先选择偏移量大的。(偏移量是指获得原主机数据最全的)

    • 若前两个条件相同,那么选择runid最小的,优先选择最小的服务

      • 每个redis实例启动后,都会随机生成一个40位的runid
    复制延时#

    ​ 由于所有的写操作都是先在master上操作,然后同步更新到slave上,所以从master同步到slave从机有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,slave机器数量的增加也会使这个问题更加严重。

    集群# 基本介绍#

    ​ 容量不够,redis如何进行扩容?

    ​ 并发写操作,redis如何分摊?

    ​ 另外,主从模式、薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。

    ​ 解决方法:

    • 代理主机(之前

    • 无中心化集群配置(redis3.0

      • 服务之间可以进行相互连通
      • 任何一个服务模块都可以作为集群的入口

    什么是集群:

    Redis集群实现了对Redis水平扩容,即启动NRedis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N

    Redis集群通过分区(partition)来提供一定程度的可用性(availability),即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。

    搭建Redis集群#
    1. 创建配置文件

    # 以redis6379.conf为例 include /opt/etc/redis.conf pidfile /var/run/redis_6379.pid # 更改 port 6379 # 更改 dbfilename dump6379.rdb # 更改 cluster-enabled yes # 打开集群模式 cluster-config-file nodes-6379.conf # 设置节点配置文件名称,需要更改 cluster-node-timeout 15000 # 设置节点失联事件,超过该时间(ms),集群自动进行主从切换

    1. 启动

    1. 将 6 个节点合成一个集群

    # 组合之前请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。

    # 进入redis安装目录 /opt/redis-6.2.6/src # 执行 redis-cli --cluster create --cluster-replicas 1 172.16.88.168:6379 172.16.88.168:6380 172.16.88.168:6381 172.16.88.168:6389 172.16.88.168:6390 172.16.88.168:6391

    1. 采用集群策略连接

    redis-cli -c -p PORT cluster nodes # 命令查看集群信息

    问题# redis cluster如何分配这六个节点?#

    一个集群至少要有三个主节点

    选项--cluster-replicas 1,表示希望为集群中的每个主节点创建一个从节点

    分配原则尽量保证每个主数据库运行在不同的IP地址每个从库和主库不在一个IP地址上。保证出故障的时候,能够有替换的,继续提供服务。

    什么是slots?#

    插槽

    一个Redis集群包含16384个插槽(hash slot), 数据库中的每个键都属于这16384个插槽的其中一个。

    集群使用公式CRC16(key) % 16384来计算键key属于哪个槽, 其中CRC16(key)语句用于计算键keyCRC16校验和 。

    集群中的每个节点负责处理一部分插槽,分担压力。 例如, 如果一个集群可以有主节点, 其中:

    • 节点A负责处理0号至5460号插槽。
    • 节点B负责处理5461号至10922号插槽。
    • 节点C负责处理10923号至16383号插槽。
    如何在集群中录入值?#

    redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。

    redis-cli客户端提供了–c参数实现自动重定向。

    例如redis-cli -c –p 6379登入后,再录入、查询键值对可以自动重定向。

    如何查询集群中的值?#

    每个主机只能查询自己范围内部的插槽。

    cluster keyslot <key>:查询某个key的 **slot**。

    cluster countkeysinslot <slot>:查询某个slot是否有值。

    CLUSTER GETKEYSINSLOT <slot><count>:返回countslot槽中的键。

    集群故障恢复?#

    如果主节点下线?从节点能否自动升为主节点?注意:15秒超时(一旦超时,从机升为主机)

    • 6379挂掉后,6389成为新的主机。
    • 集群中,主机挂掉后,从机会自动变成主节点,担任主节点功能(有哨兵功能)

    主节点恢复后,主从关系会如何?主节点回来变成从机。(类似于一主多从中的哨兵模式)

    • 6379重启后,6379成为6389的从机。

    如果所有某一段插槽的 主、从节点 都宕掉,redis服务是否还能继续?

    • 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage=yes,那么 ,整个集群都挂掉。
    • 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage=no,那么,该插槽数据全都不能使用,也无法存储。

    redis.conf中的参数cluster-require-full-coverage

    优点#
    • 实现redis扩容;
    • 使用插槽,分摊压力;
    • 无中心配置,相对简单。
    缺点#
    • 多键操作是不被支持的,但可以用组,不方便;
    • 多键的Redis事务是不被支持的。lua脚本不被支持;
    • 由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。
    Jedis (Java操作Redis)# 基本操作#
    1. 依赖

    <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.2.0</version> </dependency>

    1. 连接Redis

    public class JedisDemo { public static void main(String[] args) { // 创建Jedis对象 Jedis jedis = new Jedis("192.168.xx.xxx", 6379); // 测试,能够连接上的话,ping通,会返回一个值 String ping = jedis.ping(); System.out.println("连接成功:" + ping); jedis.close(); } }

    ​ 注意:使用Jedis进行操作,需要对Redis的网络相关配置文件进行修改:

    • bind:默认是bind绑定本机,不进行修改的情况下,只能接受本机的访问请求,不写的情况下,能够无限制接受任何ip地址的访问。
    • protected-mode:将本机访问保护模式设置为 no

    如果出现connet timed out错误,检查两块,第一是否配置文件进行了修改,第二防火墙是否关闭。

    Key

    jedis.set("k1", "v1"); jedis.set("k2", "v2"); jedis.set("k3", "v3"); Set<String> keys = jedis.keys("*"); // 返回所有key System.out.println(keys.size()); for (String key : keys) { System.out.println(key); } System.out.println(jedis.exists("k1")); // 是否存在 System.out.println(jedis.ttl("k1")); // 过期时间 System.out.println(jedis.get("k1")); // 获取key对应value值

    String

    jedis.mset("str1","v1","str2","v2","str3","v3"); System.out.println(jedis.mget("str1","str2","str3"));

    List

    // 可以使用lpush或者rpush添加k-v List<String> list = jedis.lrange("mylist",0,-1); for (String element : list) { System.out.println(element); }

    Set

    jedis.sadd("orders", "order01"); jedis.sadd("orders", "order02"); jedis.sadd("orders", "order03"); jedis.sadd("orders", "order04"); Set<String> smembers = jedis.smembers("orders"); for (String order : smembers) { System.out.println(order); } jedis.srem("orders", "order02");

    Hash

    jedis.hset("hash1","userName","lisi"); System.out.println(jedis.hget("hash1","userName")); Map<String,String> map = new HashMap<String,String>(); map.put("telphone","13810169999"); map.put("address","atguigu"); map.put("email","abc@163.com"); jedis.hmset("hash2",map); List<String> result = jedis.hmget("hash2", "telphone","email"); for (String element : result) { System.out.println(element); }

    zset

    jedis.zadd("zset01", 100d, "z3"); jedis.zadd("zset01", 90d, "l4"); jedis.zadd("zset01", 80d, "w5"); jedis.zadd("zset01", 70d, "z6"); Set<String> zrange = jedis.zrange("zset01", 0, -1); for (String e : zrange) { System.out.println(e); }

    模拟验证码发送#

    代码实现:

    public class PhoneCode { public static void main(String[] args) { // 模拟验证码发送 // verifyCode("123456789"); getRedisCode("123456789", "123456"); } // 1. 生成6位数字验证码 public static String getCode(){ Random random = new Random(); String code = ""; for (int i = 0; i < 6; i++) { int nextInt = random.nextInt(10); code += nextInt; } return code; } // 2. 每个手机每天只能发送三次验证码请求,验证码放到redis中,并设置过期时间 public static void verifyCode(String phone) { // 连接redis Jedis jedis = new Jedis("127.0.0.1", 6379); // 拼接key // 手机发送次数的key String countKey = "VerifyCode-" + phone + ":count"; // 验证码的key String phoneKey = "VerifyCode-" + phone + ":code"; // 每个手机每天只能发送三次验证码 String count = jedis.get(countKey); if(count == null) { // 之前还没发送过,这次是第一次发送,设置发送次数为1 jedis.setex(countKey, 24*60*60, "1"); // 设置过期时间为一天 } else if (Integer.parseInt(count) < 3) { // 发送次数加1 jedis.incr(countKey); } else if (Integer.parseInt(count) >= 3) { // 发送已经有三次了,不能再发送了 System.out.println("今天发送验证码的次数已经达到三次,无法再发送!"); jedis.close(); // 关闭连接 return; // 不执行下面的代码 } // 验证码放到redis中 String code1 = getCode(); jedis.setex(phoneKey, 120, code1); // 设置验证码的过期时间为两分钟,会进行覆盖 jedis.close(); } // 3. 验证码校验 public static void getRedisCode(String phone, String code){ // 连接redis Jedis jedis = new Jedis("127.0.0.1", 6379); // 拼接key // 验证码的key String phoneKey = "VerifyCode-" + phone + ":code"; // 判断 String codePhone = jedis.get(phoneKey); if(codePhone.equals(code)) { System.out.println("成功!"); } else { System.out.println("失败!"); } jedis.close(); } } Jedis主从复制#

    private static JedisSentinelPool jedisSentinelPool=null; public static Jedis getJedisFromSentinel(){ if(jedisSentinelPool==null){ Set<String> sentinelSet=new HashSet<>(); sentinelSet.add("172.16.88.168:26379"); // 端口为sentinal JedisPoolConfig jedisPoolConfig =new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(10); // 最大可用连接数 jedisPoolConfig.setMaxIdle(5); // 最大闲置连接数 jedisPoolConfig.setMinIdle(5); // 最小闲置连接数 jedisPoolConfig.setBlockWhenExhausted(true); // 连接耗尽是否等待 jedisPoolConfig.setMaxWaitMillis(2000); // 等待时间 jedisPoolConfig.setTestOnBorrow(true); // 取连接的时候进行测试 jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig); // 服务主机名 return jedisSentinelPool.getResource(); } else { return jedisSentinelPool.getResource(); } } 集群的Jedis开发#

    即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。

    无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。

    public class JedisClusterTest { public static void main(String[] args) { // 创建对象 Set<HostAndPort> set = new HashSet<HostAndPort>(); set.add(new HostAndPort("172.16.xx.xxx",6379)); // 任何一个端口 JedisCluster jedisCluster = new JedisCluster(set); // 操作 jedisCluster.set("k1", "v1"); System.out.println(jedisCluster.get("k1")); jedisCluster.close(); } } SpringBoot整合Redis#

    1. 依赖

    <!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 连接池:spring2.X集成redis所需common-pool2--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.6.0</version> </dependency>

    1. 配置文件配置Redis

    #Redis服务器地址 spring.redis.host= ip地址 #Redis服务器连接端口 spring.redis.port=6379 #Redis数据库索引(默认为0,一共有16个) spring.redis.database= 0 #连接超时时间(毫秒) spring.redis.timeout=1800000 #连接池最大连接数(使用负值表示没有限制) spring.redis.lettuce.pool.max-active=20 #最大阻塞等待时间(负数表示没限制) spring.redis.lettuce.pool.max-wait=-1 #连接池中的最大空闲连接 spring.redis.lettuce.pool.max-idle=5 #连接池中的最小空闲连接 spring.redis.lettuce.pool.min-idle=0

    1. Redis配置类(需要继承CachingConfigurerSupport

    @EnableCaching @Configuration public class RedisConfig extends CachingConfigurerSupport { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); RedisSerializer<String> redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setConnectionFactory(factory); // key序列化方式 template.setKeySerializer(redisSerializer); // value序列化 template.setValueSerializer(jackson2JsonRedisSerializer); // value hashmap序列化 template.setHashValueSerializer(jackson2JsonRedisSerializer); return template; } @Bean public CacheManager cacheManager(RedisConnectionFactory factory) { RedisSerializer<String> redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); // 解决查询缓存转换异常的问题 ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置序列化(解决乱码的问题),过期时间600秒 RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofSeconds(600)) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer)) .disableCachingNullValues(); RedisCacheManager cacheManager = RedisCacheManager.builder(factory) .cacheDefaults(config) .build(); return cacheManager; } }

    1. 测试

    应用问题解决(面试重点)# 缓存穿透#

    现象#

    key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。

    ​ redis查询不到数据库,出现了很多非正常url访问。黑客攻击就是通过查询一个不存在的值,缓存里面没有,那么就会查数据库,大量类似的请求发生后,导致数据库崩溃。若黑客利用此漏洞进行攻击可能压垮数据库。

    造成的条件:

    1. 应用服务器压力变大,访问请求增光
    2. redis命中率下降(重点)
    3. 导致一直访问查询数据库

    服务器压力变大,请求太多,导致redis缓存命中率开始下降,对数据库的访问越来越多,数据库最终承受不住压力,崩溃了。

    如何解决#
    • 对空值缓存

      如果一个查询返回的数据为空(不管是数据是否不存在),仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟。

    • 设置可访问的名单(白名单):

      使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,则不允许访问。

    • 采用布隆过滤器

      布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。(跟bitmaps类似,不过效率更高)

      布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法缺点是有一定的误识别率和删除困难,命中率不一定高

      将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。

    • 进行实时监控

      当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

    缓存击穿#

    注意与缓存穿透的区别:

    • 缓存穿透
      • redis命中率下降,导致数据库访问量激增
    • 缓存击穿
      • redis正常访问,但某个热点key突然失效,导致瞬间数据库的访问量激增
    现象#

    key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

    1. 数据库访问压力瞬间增大
    2. redis中没有出现大量key过期,redis正常运行(与缓存穿透的区别)
    3. 某个经常访问的key,即十分热点的key,不停地被大量访问,当这个key过期的瞬间,持续的高并发就击穿了缓存,大量请求数据库,导致数据库奔溃
    如何解决#
    • 预先设置热门数据

      redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长

    • 实时调整

      现场监控哪些数据热门,实时调整key的过期时长。

    • 使用锁

      Redis 6有哪些新特性或优化?

    缓存雪崩#

    现象#

    key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期后,一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

    缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key

    1. 数据库压力变大
    2. 极少的时间段,查询大量key的集中过期情况(大量key集中过期,而缓存击穿是热点key过期)
    如何解决#
    • 构建多级缓存架构

      nginx缓存 +redis缓存 + 其他缓存(ehcache等)

    • 使用锁或队列:

      用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况

    • 设置过期标志更新缓存:

      记录缓存数据是否过期(设置提前量),快过期的时候,提前进行一个缓存。如果过期会触发通知另外的线程在后台去更新实际key的缓存。

    • 将缓存失效时间分散开:

      比如我们可以在原有的失效时间基础上增加一个随机值,比如 1~5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件

    分布式锁# 基本介绍#

    ​ 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。

    分布式锁主流的实现方案:

    • 基于数据库实现分布式锁
    • 基于缓存(Redis等)
    • 基于Zookeeper

    每一种分布式锁解决方案都有各自的优缺点:

    • 性能:redis最高
    • 可靠性:zookeeper最高
    设置锁以及过期时间#
    • 设置锁的命令

    SETNX KEY VALUE # 设置锁 del key # 删除锁

    • 给锁设置过期时间

    expire users 30 # 给users上锁30s

    • 优化:上锁的同时设置过期时间

    set key value nx ex time # nx 上锁;ex 设置过期时间

    • Java实现

    @GetMapping("testLock") public void testLock(){ //1获取锁,setne ,顺便设置过期时间 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3,TimeUnit.SECONDS); // key, value, 过期时间,时间单位 //2获取锁成功、查询num的值 if(lock){ Object value = redisTemplate.opsForValue().get("num"); //2.1判断num为空return if(StringUtils.isEmpty(value)){ return; } //2.2有值就转成成int int num = Integer.parseInt(value+""); //2.3把redis的num加1 redisTemplate.opsForValue().set("num", ++num); //2.4释放锁,del redisTemplate.delete("lock"); }else{ //3获取锁失败、每隔0.1秒再获取 try { Thread.sleep(100); // 休眠,等一会 testLock(); // 再去尝试获取锁 } catch (InterruptedException e) { e.printStackTrace(); } } } 分布式锁产生的问题# 使用UUID防止误删锁#

    现象

    ​ a先上锁后,在执行操作的过程中,服务器卡顿,而10秒过期后,b抢到锁进行具体操作,然而此时a的服务器恢复正常,a继续执行操作并结束,此时有一个释放锁的操作,那么此时释放的锁是b的锁,这就是导致误删除锁的现象发生。

    解决方案

    Java实现修改版:

    @GetMapping("testLock") public void testLock(){ // 设置UUID String uuid = UUID.randomUUID().toString(); ..... if(lock){ ... // 判断UUID值是否一样 String lockUuid = (String)redisTemplate.opsForValue().get("lock"); if(uuid.equals(lockUuid)){ // UUID一样时,才释放锁 //2.4释放锁,del redisTemplate.delete("lock"); } }else{ ... } } Lua保证删除原子性#

    问题:删除操作缺乏原子性,即uuid的比较操作和删除操作不是原子操作

    Java实现修改:

    @GetMapping("testLockLua") public void testLockLua() { //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中 String uuid = UUID.randomUUID().toString(); //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除! String skuId = "25"; // 访问skuId 为25号的商品 100008348542 String locKey = "lock:" + skuId; // 锁住的是每个商品的数据 // 3 获取锁 Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS); // 第一种: lock 与过期时间中间不写任何的代码。 // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间 // 如果true if (lock) { // 执行的业务逻辑开始 // 获取缓存中的num 数据 Object value = redisTemplate.opsForValue().get("num"); // 如果是空直接返回 if (StringUtils.isEmpty(value)) { return; } // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在! int num = Integer.parseInt(value + ""); // 使num 每次+1 放入缓存 redisTemplate.opsForValue().set("num", String.valueOf(++num)); /*使用lua脚本来锁*/ // 定义lua 脚本 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; // 使用redis执行lua执行 DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptText(script); // 设置一下返回值类型 为Long // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型, // 那么返回字符串与0 会有发生错误。 redisScript.setResultType(Long.class); // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。 redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid); } else { ..... } } 总结#

    为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件

    • 互斥性。在任意时刻,只有一个客户端能持有锁

    • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

    • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

    • 加锁和解锁必须具有原子性

    本文共计20818个文字,预计阅读时间需要84分钟。

    Redis 6有哪些新特性或优化?

    Redis 是一种流行的 NoSQL 数据库,支持多种数据结构类型。设计理念包括单线程、多路IO复用技术。示例:内存中只有一个单线程,无需进行线程切换等操作,确保了 Redis 在内存中的处理效率。

    Redis是典型的NoSQL数据库,支持多种数据结构类型。设计思想是:单线程+多路IO复用技术

    上图的例子说明:内存中只有一个单线程,无需进行线程切换等操作,保证了redis在内存处理中的效率,同时使用多个socket链接复用,一旦需要哪个链接的数据准备就绪之后,就将这个线程与这个链接进行IO操作。总结一句话就是:通过一个线程,进行拨开关的方式,来同时传输多个IO流

    redis官网:redis.io/download

    Redis是一个开源的key-value存储系统。

    Memcached类似,它支持存储的value类型相对更多,包括string、list、set、zset、sorted set、hash

    这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。

    在此基础上,Redis支持各种不同方式的排序。

    memcached一样,为了保证效率,数据都是缓存在内存中。

    区别的是Redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件。

    并且在此基础上实现了master-slave(主从)同步。

    单线程 +IO多路复用。

    安装和启动#
    • windows下:

      www.cnblogs.com/xing-nb/p/12146449.html

      www.cnblogs.com/peterYong/p/9652336.html

    • linux下:

    安装C语言的编译环境

    yum install centos-release-scl scl-utils-build yum install -y devtoolset-8-toolchain scl enable devtoolset-8 bash

    通过wget下载

    wget download.redis.io/releases/redis-6.2.6.tar.gz // 下载路径:/opt

    解压至当前目录

    tar -zxvf redis-6.2.6.tar.gz

    解压完成后进入目录

    cd redis-6.2.6

    在当前目录下执行make

    make && make install

    默认安装在/usr/local/bin

    redis-benchmark:性能测试工具,可以在自己本子运行,看看自己本子性能如何 redis-check-aof:修复有问题的AOF文件,rdb和aof后面讲 redis-check-dump:修复有问题的dump.rdb文件 redis-sentinel:Redis集群使用 redis-server:Redis服务器启动命令 redis-cli:客户端,操作入口

    前台启动:/usr/local/bin目录下启动redis

    redis-server(前台启动)

    后台启动:

    • 安装redis的目录/opt/redis-6.2.6中将redis.conf复制到任意一个文件夹下

      cp redis.conf /etc/redis.conf // 将redis.conf复制到/etc/下

    • 修改/etc/redis.conf配置文件

      vim redis.conf # daemonize no 修改为 daemonize yes

    • /usr/local/bin目录下启动redis

      redis-server /etc/redis.conf

    关闭redis

    • kill进程:kill掉进程号
    • 命令shutdown:redis-cli shutdown

    默认端口号:6379

    NoSQL数据库#

    ​ web2.0时代,访问量太大,CPUI和内存有巨大压力,如果还是按照单个服务器进行响应的话,显然性能是不够的,于是引出分布式架构的服务,也就是多台服务器来响应请求,使用过NoSQL数据库进行解决。

    NoSQL数据库的作用:

    • 解决CPU及内存压力

      进行分布式服务的时候,可能同一台电脑的请求是由不同的服务器进行响应的,那么如何存放session?也就是session的共享问题。

    • 解决IO压力

      当做高速缓存使用,缓解访问数据库的压力

    NoSQL数据库的特点:

    NoSQL( NoSQL = Not Only SQL ),意即 “不仅仅是SQL” ,泛指非关系型的数据库

    NoSQL不依赖业务逻辑方式存储,而以简单的key-value模式存储。因此大大的增加了数据库的扩展能力。

    • 不遵循SQL标准。
    • 不支持ACID
    • 远超于SQL的性能。

    适用于的场景

    • 对数据高并发的读写
    • 海量数据的读写
    • 对数据高可扩展性的。

    不适用的场景

    • 需要事务支持;
    • 基于sql的结构化查询存储,处理复杂的关系,需要即席查询。
    • 用不着sql的情况以及用了sql也不行的情况下,考虑NoSQL

    常见的NoSQL数据库

    • Redis

    • MongoDB

    大数据时代常用的数据库类型

    • 行式数据库

    • 列式数据库

    配置文件#

    **Redis 的配置文件位于 Redis 安装目录下,文件名为 ** redis.conf(Windows 名为 redis.windows.conf)。

    • 端口号:6379

    • 默认16个数据库,类似数组下标从0开始,初始默认使用0号库

      • select 8:切换到8号库
      • 所有库拥有统一的密码
      • dbsize:查看当前数据库的key的数量
      • flushdb:清空当前库
      • flushall:通杀全部库
    Units单位#

    单位,配置大小单位,开头定义了一些基本的度量单位,只支持bytes,不支持bit

    大小写不敏感。

    INCLUDES包含#

    包含,多实例的情况可以把公用的配置文件提取出来

    NETWORK网络#

    网络相关配置。

    bind

    默认情况bind=127.0.0.1只能接受本机的访问请求。

    不写的情况下,无限制接受任何ip地址的访问。

    生产环境肯定要写你应用服务器的地址,服务器是需要远程访问的,所以需要将其注释掉

    如果开启了protected-mode,那么在没有设定bind ip且没有设密码的情况下,Redis只允许接受本机的响应。

    protected-mode

    将本机访问保护模式设置no

    port

    端口号,默认6379

    tcp-backlog

    设置tcpbacklogbacklog其实是一个连接队列,backlog队列总和 $=$ 未完成三次握手队列 $+$ 已经完成三次握手队列。

    在高并发环境下你需要一个高backlog值来避免慢客户端连接问题。

    timeout

    一个空闲的客户端维持多少秒会关闭,0 表示关闭该功能。即永不关闭。

    tcp-keepalive

    对访问客户端的一种心跳检测,每个n秒检测一次。(默认是300s检测一次)

    单位为秒,如果设置为 0,则不会进行Keepalive检测,建议设置成 60。

    GENERAL#

    通用。

    daemonize

    是否为后台进程,设置为yes

    守护进程,后台启动。

    pidfile

    存放pid文件的位置,每个实例会产生一个不同的pid文件。

    loglevel

    指定日志记录级别,Redis总共支持四个级别:debug、verbose、notice、warning,默认为notice

    logfile

    日志文件名称。

    database

    设定库的数量 默认16,默认数据库为 0,可以使用SELECT <dbid>命令在连接上指定数据库id

    SECURITY#

    安全。

    访问密码的查看、设置和取消。

    在命令中设置密码,只是临时的。重启redis服务器,密码就还原了。

    永久设置,需要在配置文件中进行设置。

    LIMITS#

    限制。

    maxclients

    设置redis同时可以与多少个客户端进行连接。

    默认情况下为10000个客户端。

    如果达到了此限制,redis则会拒绝新的连接请求,并且向这些连接请求方发出max number of clients reached以作回应。

    maxmemory

    建议必须设置,否则,将内存占满,造成服务器宕机。

    设置redis可以使用的内存量。一旦到达内存使用上限,redis将会试图移除内部数据,移除规则可以通过maxmemory-policy来指定。

    如果redis无法根据移除规则来移除内存中的数据,或者设置了不允许移除,那么redis则会针对那些需要申请内存的指令返回错误信息,比如SET、LPUSH等。

    但是对于无内存申请的指令,仍然会正常响应,比如GET等。如果你的redis是主redis( 说明你的redis有从redis),那么在设置内存使用上限时,需要在系统中留出一些内存空间给同步队列缓存,只有在你设置的是“不移除”的情况下,才不用考虑这个因素。

    maxmemory-policy

    volatile-lru:使用LRU算法移除key,只对设置了过期时间的键(最近最少使用)。

    allkeys-lru:在所有集合key中,使用LRU算法移除key

    volatile-random:在过期集合中移除随机的key,只对设置了过期时间的键。

    allkeys-random:在所有集合key中,移除随机的key

    volatile-ttl:移除那些TTL值最小的key,即那些最近要过期的key

    noeviction:不进行移除。针对写操作,只是返回错误信息。

    maxmemory-samples

    设置样本数量,LRU算法和最小TTL算法都并非是精确的算法,而是估算值,所以你可以设置样本的大小,redis默认会检查这么多个key并选择其中LRU的那个。

    一般设置 3 到 7 的数字,数值越小样本越不准确,但性能消耗越小。

    常用五大基本数据类型# 前言:Redis键(key操作)#

    keys *:查看当前库所有key

    set key value:添加一组 k-v

    exists key:判断某个key是否存在

    type key:查看你的key是什么类型

    del key:删除指定的key数据(直接删除,而不是异步删除)

    unlink key根据value选择非阻塞删除,仅将keyskeyspace元数据中删除,真正的删除会在后续异步操作

    expire key 10:为给定的key设置过期时间

    ttl key:查看还有多少秒过期,-1表示永不过期,-2表示已过期

    select:命令切换数据库

    dbsize:查看当前数据库的key的数量

    flushdb:清空当前库

    flushall:通杀全部库

    字符串(String)#

    String类型是最基本的类型,是二进制安全的。意味着Redisstring可以包含任何数据,比如jpg图片或者序列化的对象,只要数据能够存储为字符串类型,redis都能通过k-v的形式存储。

    String类型是Redis最基本的数据类型,一个Redis中字符串value最多可以是 512M

    命令操作:

    set <key> <value>:添加键值对,也包含对一个key下的value进行覆盖

    get <key>:查询对应键值

    append <key> <value>:将给定的<value>追加到原值的末尾,返回添加后的字符串长度值

    strlen <key>:获得值的长度

    setnx <key> <value>:只有在key不存在时,才能设置key的值(注意与set命令的区别)

    incr <key>:将key中储存的数字值增 1,只能对数字值操作,如果为空,新增值为 1(具有原子性

    decr <key>:将key中储存的数字值减 1,只能对数字值操作,如果为空,新增值为 -1

    incrby/decrby <key><步长>:将key中储存的数字值增减。自定义步长

    mset <key1> <value1> <key2> <value2>:同时设置一个或多个key-value

    mget <key1> <key2> <key3>...:同时获取一个或多个value

    msetnx <key1> <value1> <key2> <value2>...:同时设置一个或多个key-value对,当且仅当所有给定key都不存在(原子性操作,有一个失败那么就全部失败)

    getrange <key><起始位置><结束位置>:获得值的范围(将字符串理解成一个数组,按数组序号进行取子字符串的操作)

    setrange <key><起始位置><value>:用<value>覆写<key>所储存的字符串值

    setex <key><过期时间><value>:设置键值的同时,设置过期时间,单位秒。(可以通过ttl key的命令,查看过期时间)

    getset <key><value>:以新换旧,设置了新值同时获得旧值。

    原子性

    所谓原子操作是指不会被线程调度机制打断的操作;

    这种操作一旦开始,就一直运行到结束,中间不会有任何context switch(切换到另一个线程)。

    • 在单线程中, 能够在单条指令中完成的操作都可以认为是"原子操作",因为中断只能发生于指令之间。(而redis是单线程的,所以redis中的操作是不会被打断的

    • 在多线程中,不能被其它进程(线程)打断的操作就叫原子操作。

    Redis单命令的原子性主要得益于Redis的单线程。

    数据结构

    内部结构实现上类似于JavaArrayList,采用预分配冗余空间的方式来减少内存的频繁分配,字符串大小小于1M时,扩容都是加倍现有的空间大小,如果空间大小一旦超过1M,每次扩容只会多扩容1M的空间,注意字符串最大长度是512M。

    列表(List)#

    单键多值

    Redis列表是简单的字符串列表按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。

    它的底层实际是个双向链表,对两端的操作性能很高,通过索引下标的操作中间的节点性能会较差。

    lpush/rpush <key><value1><value2><value3> ....: 从左边/右边插入一个或多个值。

    lpush k1 v1 v2 v3 (头插法) lrange k1 0 -1 输出:v3 v2 v1 rpush k1 v1 v2 v3 (尾插法) rrange k1 0 -1 输出:v1 v2 v3

    lpop/rpop <key>:从左边/右边吐出一个值。值在键在,值光键亡。

    rpoplpush <key1><key2>:从<key1>列表右边吐出一个值,插到<key2>列表左边。

    lrange <key><start><stop>:按照索引下标获得元素(从左到右)

    lrange <key> 0 -1:左边第一个,-1右边第一个,(0 -1表示获取所有)

    lindex <key><index>:按照索引下标获得元素(从左到右,从0开始到index索引值)

    llen <key>:获得列表长度

    linsert <key> before/after <value> <newvalue>:在<value>的前面/后面插入<newvalue>插入值

    lrem <key><n><value>:从左边删除nvalue(从左到右)

    lset <key><index><value>:将列表key下标为index的值替换成value

    数据结构

    List的数据结构为快速链表quickList

    • 首先在列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist,也即是压缩列表

      • 它将所有的元素紧挨着一起存储,分配的是一块连续的内存。
    • 数据量比较多的时候才会改成quicklist

      • 因为普通的链表需要的附加指针空间太大,会比较浪费空间。比如这个列表里存的只是int类型的数据,结构上还需要两个额外的指针prevnext

    • Redis链表ziplist结合起来组成了quicklist。也就是将多个ziplist使用双向指针串起来使用。这样既满足了快速的插入删除性能,又不会出现太大的空间冗余。quicklist结构图如下:

    Set(集合)#

    Set对外提供的功能List类似列表的功能。(一个key,对应一个set集合)

    • 特殊之处在于Set是可以自动排重

    • 当需要存储一个列表数据,又不希望出现重复数据时,Set是一个很好的选择,并且Set提供了判断某个成员是否在一个Set集合内的重要接口,这个也是List所不能提供的。

    • RedisSetString类型的无序集合。它底层其实是一个valuenullhash表,所以添加,删除,查找的复杂度都是O(1)

    一个算法,随着数据的增加,执行时间的长短,如果是O(1),数据增加,查找数据的时间不变。

    命令操作:

    sadd <key><value1><value2> .....:将一个或多个member元素加入到集合key中,已经存在的member元素将被忽略

    smembers <key>:取出该集合的所有值。

    sismember <key><value>:判断集合<key>是否为含有该<value>值,有返回 1,没有返回 0

    scard<key>:返回该集合的元素个数。

    srem <key><value1><value2> ....:删除集合中的某个元素

    spop <key>:随机从该集合中吐出一个值

    srandmember <key><n>:随机从该集合中取出n个值,不会从集合中删除

    smove <source><destination>value:把集合中一个值从一个集合移动到另一个集合

    sinter <key1><key2>:返回两个集合的交集元素

    sunion <key1><key2>:返回两个集合的并集元素

    sdiff <key1><key2>:返回两个集合的差集元素(key1中的,不包含key2中的)

    数据结构

    Set数据结构是字典,字典是用哈希表实现的。

    Hash(哈希)#

    Redis hash是一个键值对集合。

    Redis hash是一个String类型的fieldvalue的映射表,hash特别适合用于存储对象。( 可以理解为Java中的Map<String,Object> )

    hset <key><field><value>:给<key>集合中的<field>键赋值<value>

    • hset user:1001 id 1,redis允许key带 :,可以理解为key值是 user:1001,这是为了存很多对象时,用来区分不同对象
    • hset user:1001 name zhanggsan,给 user:1001 这个key对应的对象中,添加一个name字段,值为zhangsan

    hget <key1><field>:从<key1>集合<field>取出value

    • hget user:1001 id,打印结果为“1”
    • hget user:1001 name,打印结果为“zhangsan”

    hmset <key1><field1><value1><field2><value2>...: 批量设置hash的值(新版本这项命令,已经可以用hset进行实现了)

    hexists <key1><field>:查看哈希表key中,给定域field是否存在(1:存在,0:不存在)

    hkeys <key>:列出该hash集合的所有field

    • hkeys user:1001,列出这个key下所有的字段

    hvals <key>:列出该hash集合的所有value

    hincrby <key> <field> <increment>:为哈希表key中的域field的值加上增量 1 -1

    hsetnx <key> <field> <value>:将哈希表key中的域field的值设置为value,当且仅当域field不存在

    数据结构

    Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表)。

    field-value长度较短且个数较少时,使用ziplist,否则使用hashtable

    Zset(有序集合)#

    Redis有序集合zset与普通集合set非常相似,是一个没有重复元素的字符串有序集合

    不同之处是有序集合的每个成员都关联了一个评分(score,这个评分(score)被用来按照从最低分到最高分的方式排序集合中的成员。集合的成员是唯一的,但是评分可以是重复的。

    因为元素是有序的,所以可以很快的根据评分(score)或者次序(position)来获取一个范围的元素。

    访问有序集合的中间元素也是非常快的,因此能够使用有序集合作为一个没有重复成员的智能列表。

    操作命令:

    zadd <key> <score1> <value1> <score2> <value2> …:将一个或多个member元素及其score值加入到有序集key当中

    zrange <key> <start> <stop> [WITHSCORES]:返回有序集key中,下标在<start><stop>之间的元素( 0 -1 ,表示所有)

    • 当带上WITHSCORES时,可以让分数一起和值返回到结果集

    zrangebyscore key min max [withscores] [limit offset count]:返回有序集key中,所有score值介于minmax之间(包括等于minmax)的成员。有序集成员score值递增(从小到大)次序排列

    zrevrangebyscore key max min [withscores] [limit offset count]:同上,改为从大到小排列

    zincrby <key> <increment> <value>:为元素的score加上增量

    zrem <key> <value>:删除该集合下,指定值的元素

    zcount <key> <min> <max>:统计该集合,分数区间内的元素个数

    zrank <key> <value>:返回该值在集合中的排名,从 0 开始

    数据结构

    SortedSet(zset)Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String, Double>,可以给每一个元素value赋予一个权重score,另一方面它又类似于TreeSet,内部的元素会按照权重score进行排序,可以得到每个元素的名次,还可以通过score的范围来获取元素的列表。

    zset底层使用了两个数据结构

    • hashhash的作用就是关联元素value和权重score(通过key值,能够找到关联的value和score),保障元素value的唯一性,可以通过元素value找到相应的score

    • 跳跃表,跳跃表的目的在于给元素value排序,根据score的范围获取元素列表

    Redis6 新数据类型# Bitmaps#

    • 将 Bitmaps 数据类型理解为一个数组,每个单位只存储0和1

    实例:

    • getbit:获取Bitmaps中某个偏移量的值
    • bitcount[start end]:统计字符串被设置为1的bit数,start end可以指定范围,且可以使用负数值,例如:-1表示最后一个位,-2表示倒数第二个位置(从0开始....)
    • bitop and(or/not/xor)[key...]:复合操作,可以做多个Bitmaps的交集、并集等操作,并将结果保存在destkey中
      • 例如:bitop and users:1 users:2 users:3,将users:2与users:3的交集结果存放到key为users:1的值中

    Bitmaps与set对比

    HyperLogLog#

    命令操作:

    1. pfadd[element...]:添加指定元素到HyperLogLog中,执行命令后,若基数发生变化则返回1,否则返回0

    2. pfcount[key...]:计算基数值

    3. pfmerge[其中,sourcekey可以为多个]:将多个HyperLogLog数据类型进行合并,例子比如将月活跃用户数与日活跃用户数进行合并,就可以使用pfcount进行统计基数

    Geospatial#

    Redis的发布与订阅#

    Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

    Redis客户端可以订阅任意数量的频道。

    发布者可以建立许多个频道进行消息的发送(如上图频道1、频道2、频道3),供订阅者进行接收和监听消息。

    1. 客户端可以订阅频道

    1. 当给这个频道发布消息后,消息就会发送给订阅的客户端

    发布订阅命令行实现

    1. 打开一个客户端订阅channel1
    • subscribe channel1
    1. 打开另一个客户端,给channel1发布消息hello
    • publish channel1 hello

    • 返回的数字表示:订阅者的数量、

    1. 打开第一个客户端可以看到发送的信息

    注:发布的消息如果没有持久化,那么在订阅的客户端是接收不到消息的,只能收到订阅后发布的消息

    事务和锁机制#

    Redis事务是一个单独的隔离操作事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

    Redis事务的主要作用就是串联多个命令防止别的命令插队

    MultiExecDiscard#

    Multi

    Exec

    Discard

    输入Multi命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行

    组队的过程中可以通过Discard来放弃组队。

    Redis的事务就是:先使用multi进行命令的添加(组队过程),组队完毕后,使用exec进行执行,这个执行过程不能被其他命令打断(相当于事务执行的过程),如果要中止,就使用discard命令(类似于回滚操作)。

    • multi开启组队,输入命令;组队成功,exec执行事务,执行完毕,事务结束

    • 放弃组队

    • 组队中有命令错误,不会执行

    • 组队中不报错,执行时报错

    悲观锁#

    悲观锁(Pessimistic Lock),即每次去拿数据的时候都认为有其他线程会修改,所以每次在拿数据的时候都会上锁,这样其他线程想要拿到这个数据就会被block直到释放锁后,成功拿到锁。(效率低,操作之前先上锁)

    乐观锁#

    乐观锁(Optimistic Lock),即每次去拿数据的时候都认为其他线程不会修改,所以不会上锁,但是在更新的时候会判断,在此期间有没有其他线程去更新这个数据,可以使用版本号控制等机制。

    乐观锁适用于多读的应用类型,这样可以提高吞吐量

    Redis就是利用这种check-and-set机制实现事务的。

    Watch、unwatch#

    在执行multi之前,先执行watch key1 [key.....],可以监视一个(或多个 )key。如果在事务执行之前,这个key被其他命令所改动,那么事务将被打断。(类似于上锁,一旦发现watch的这个key被修改了,那么自己的exec操作就会中断)

    取消WATCH命令对所有key的监视:如果在执行WATCH命令之后,EXEC命令或DISCARD命令先被执行,那么就不需要再执行UNWATCH

    事务三特性#
    • 单独的隔离操作

      事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。

    • 没有隔离级别的概念

      队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被实际执行。

    • 不保证原子性

      事务中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚 。

    模拟秒杀# 基本实现#

    核心的逻辑代码:

    public class SecKill_redis { public static void main(String[] args) { Jedis jedis =new Jedis("192.168.242.110",6379); System.out.println(jedis.ping()); jedis.close(); } //秒杀过程 public static boolean doSecKill(String uid,String prodid) throws IOException { //1 uid和prodid非空判断 if(uid == null || prodid == null){ return false; } //2 连接redis Jedis jedis =new Jedis("192.168.xx.xxx",6379); //3 拼接key // 3.1 库存key String kcKey = "sk:"+prodid+":qt"; // 3.2 秒杀成功用户key String userKey = "sk:"+prodid+":user"; //4 获取库存,如果库存null,秒杀还没有开始 String kc = jedis.get(kcKey); if(kc == null){ System.out.println("秒杀还没开始,请稍等"); jedis.close(); return false; } // 5 判断用户是否重复秒杀操作 if(jedis.sismember(userKey, uid)){ System.out.println("每个用户只能秒杀成功一次,请下次再来"); jedis.close(); return false; } //6 判断如果商品数量,库存数量小于1,秒杀结束 if(Integer.parseInt(kc) < 1){ System.out.println("秒杀结束,请下次参与"); jedis.close(); return false; } //7 秒杀过程 //7.1库存-1 jedis.decr(kcKey); //7.2 把秒杀成功的用户添加到清单里面 jedis.sadd(userKey,uid); System.out.println("用户" + uid + "秒杀成功"); jedis.close(); return true; } } 使用ab工具模拟并发以及暴露出的问题#

    CentOS 6 默认安装

    CentOS 7 手动安装(yum -y install 192.168.0.43:8080/Seckill/doseckill

    -n:测试会话中所执行的请求个数

    -c:一次产生的请求个数

  • 并发暴露出来的问题

    • 会出现超卖问题:卖完了商品,但还存在继续购买,即库存变为负数

      • 解决方案:使用乐观锁,进行版本控制(redis事务+watch)

        代码修改:

        //秒杀过程 public static boolean doSecKill(String uid,String prodid) throws IOException { //1 uid和prodid非空判断 if(uid == null || prodid == null){ return false; } //2 连接redis //Jedis jedis =new Jedis("192.168.xx.xxx",6379); //通过连接池获取连接redis的对象 JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis = jedisPoolInstance.getResource(); //3 拼接key // 3.1 库存key String kcKey = "sk:"+prodid+":qt"; // 3.2 秒杀成功用户key String userKey = "sk:"+prodid+":user"; //监视库存 jedis.watch(kcKey); //4 获取库存,如果库存null,秒杀还没有开始 String kc = jedis.get(kcKey); if(kc == null){ System.out.println("秒杀还没开始,请稍等"); jedis.close(); return false; } // 5 判断用户是否重复秒杀操作 if(jedis.sismember(userKey, uid)){ System.out.println("每个用户只能秒杀成功一次,请下次再来"); jedis.close(); return false; } //6 判断如果商品数量,库存数量小于1,秒杀结束 if(Integer.parseInt(kc) < 1){ System.out.println("秒杀结束,请下次参与"); jedis.close(); return false; } //7 秒杀过程 //使用事务 Transaction multi = jedis.multi(); //组队操作 multi.decr(kcKey); multi.sadd(userKey,uid); //执行 List<Object> results = multi.exec(); if(results == null || results.size()==0) { System.out.println("秒杀失败了...."); jedis.close(); return false; } // //7.1库存-1 // jedis.decr(kcKey); // //7.2 把秒杀成功的用户添加到清单里面 // jedis.sadd(userKey,uid); System.out.println("用户" + uid + "秒杀成功"); jedis.close(); return true; }

    • 连接超时问题

      • 解决方案:采用连接池

      // 创建工具类 public class JedisPoolUtil { private static volatile JedisPool jedisPool = null; private JedisPoolUtil() { } public static JedisPool getJedisPoolInstance() { if (null == jedisPool) { synchronized (JedisPoolUtil.class) { if (null == jedisPool) { JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(200); poolConfig.setMaxIdle(32); poolConfig.setMaxWaitMillis(100*1000); poolConfig.setBlockWhenExhausted(true); poolConfig.setTestOnBorrow(true); // ping PONG jedisPool = new JedisPool(poolConfig, "192.168.xx.xxx", 6379, 60000 ); } } } return jedisPool; } public static void release(JedisPool jedisPool, Jedis jedis) { if (null != jedis) { jedisPool.returnResource(jedis); } } }

      修改代码,主要是针对前面基本实现中的核心代码,对获取redis对象进行修改:

      //2 连接redis //Jedis jedis =new Jedis("192.168.xx.xxx",6379); //通过连接池获取连接redis的对象 JedisPool jedisPoolInstance = JedisPoolUtil.getJedisPoolInstance(); Jedis jedis = jedisPoolInstance.getResource();

    • 商品遗留问题,即秒杀已经结束了,却还有商品库存

      • 解决方案:使用Lua脚本

      Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。

      将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。

      Lua脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。

      但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。

      利用Lua脚本淘汰用户,解决超卖问题

      redis 2.6版本以后,通过lua脚本解决争抢问题,实际上是redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。

  • Redis中两种持久化机制#

    两种持久化机制,RDB和AOF,简单来说就是存数据使用的哪种机制,默认是使用RDB(Redis DataBase)。

    RDB#

    Redis DataBase

    在指定的时间间隔内将内存中的 数据集快照 写入磁盘,即Snapshot快照,恢复时是将快照文件直接读到内存里。

    周期性地进行持久化的操作

    Redis会单独创建一个子进程(fork)来进行持久化。

    底层执行过程:先将数据写入到一个临时文件中,待持久化过程完成后(同步过程完成),再将这个临时文件内容覆盖到dump.rdb(持久化文件)

    • 整个过程中,主进程是不进行任何IO操作的,这就确保了极高的性能。
    • 为什么要先同步到文件临时区域而不直接同步到rdb中?若同步过程中发生异常情况中断,不会导致数据库中的数据发生损坏,待同步过程完成后,用临时文件替代这个持久化的文件,保证了数据的完整性和一致性。
    • 如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。

    RDB的缺点是最后一次持久化后的数据可能丢失

    Fork#
    • Redis进程执行fork操作创建子进程,RDB持久化过程由子进程负责,完成后自动结束。阻塞只发生在fork阶段,一般时间很短。

    • 作用是复制一个与当前进程一样的进程。新进程的所有数据(变量、环境变量、程序计数器等) 数值都和原进程一致,但是是一个全新的进程,并作为原进程的子进程

    • Linux程序中,fork()会产生一个和父进程完全相同的子进程,但子进程在此后多会exec系统调用,出于效率考虑,Linux中引入了写时复制技术

    • 一般情况父进程和子进程会共用同一段物理内存,只有进程空间的各段的内容要发生变化时,才会将父进程的内容复制一份给子进程

    配置#

    dump文件名字

    redis.conf中配置文件名称,默认为dump.rdb

    dump保存位置

    rdb文件的保存路径可以修改。默认为Redis启动时命令行所在的目录下。

    stop-writes-on-bgsave-error

    即当redis无法写入磁盘,关闭redis的写入操作。推荐yes

    rdbcompression

    持久化的文件是否进行压缩存储。

    rdbchecksum

    完整性的检查,即数据是否完整性、准确性。

    save

    表示写操作的次数。

    格式:save 秒 写操作次数

    优点#
    • 适合大规模的数据恢复
    • 对数据完整性和一致性要求不高更适合使用
    • 节省磁盘空间;
    • 恢复速度快。
    • redis主进程会fork()一个子进程来处理所有保存工作,主进程不需要进行任何磁盘IO操作
    缺点#
    • Fork的时候,内存中的数据被克隆了一份,大致 2 倍的膨胀性需要考虑
    • 虽然Redisfork时使用了写时拷贝技术,但是如果数据庞大时还是比较消耗性能;
    • 在备份周期在一定间隔时间做一次备份,所以如果还没到达指定备份时间间隔的时候,Redis意外down掉的话,就会丢失最后一次快照后的所有修改,数据发生了丢失。
    AOF#

    Append Only File

    以日志的形式来记录每个写操作(增量保存),将Redis执行过的所有写指令记录下来(读操作不记录)只许追加文件但不可以改写文件(可能是使用的redo和undo日志恢复?)。Redis启动之初会读取该文件重新构建数据,换言之,如果Redis重启就会根据日志文件的内容将写指令从前到后执行一次,以完成数据的恢复工作。

    一种使用追加方式记录数据的方法

    执行流程

    • 客户端的请求写命令会被append追加到AOF缓冲区内;

    • AOF缓冲区根据AOF持久化策略[always,everysec,no]将操作sync同步到磁盘的AOF文件中;

    • AOF文件大小超过重写策略或手动重写时,会对AOF文件Rewrite重写,压缩AOF文件容量;

    • Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的。

    AOFRDB同时开启时,系统默认读取AOF的数据(数据不会存在丢失)

    配置#

    redis.conf中配置开启aof,默认生成的aof配置文件为appendonly.aof,与rdb文件路径一致(启动路径)。

    AOF默认不开启(RDB默认开启)

    • 若AOF和RDB同时开启,系统默认读取AOF的数据(数据不会存在丢失)。

    文件名字

    AOF同步频率设置

    appendfsync always

    ​ 始终同步,每次Redis的写入都会立刻记入日志;

    ​ 性能较差但数据完整性比较好。

    appendfsync everysec

    ​ 每秒同步,每秒记入日志一次,如果宕机,本秒的数据可能丢失。

    appendfsync no

    Redis不主动进行同步,把同步时机交给操作系统。

    Rewrite压缩

    AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集。可以使用命令bgrewriteaof

    优点#
    • 备份机制更稳健,丢失数据概率更低;
    • 可读的日志文本,通过操作AOF稳健,可以处理误操作。
    缺点#
    • 比起RDB占用更多的磁盘空间(不仅记录数据还要记录操作);
    • 恢复备份速度要慢;
    • 每次读写都同步的话,有一定的性能压力;
    • 存在个别Bug,造成不能恢复。
    总结#

    官方推荐两个都启用。

    如果对数据不敏感(允许数据有部分丢失),可以选单独用RDB

    不建议单独用AOF,因为可能会出现Bug

    如果只是做纯内存缓存,可以都不用。

    主从复制# 基本介绍#

    ​ 主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slaver以读为主,即主服务器承担写操作,复制的若干 从服务器 则承担读操作

    特点:

    1. 读写分离,性能扩展

    2. 容灾快速恢复

      • 某个从服务器发生故障,那么会快速切换到另一个从服务器中,不影响读操作的进行
    3. 一主多从

      • 只有一台主服务器,供其他从服务器进行复制
    搭建一主两从#
    1. 创建文件目录

    /opt/etc

    1. redis.conf复制到当前目录

    cp /etc/redis.conf /opt/etc/

    1. 创建 3 个redis.conf配置文件

    redis6379.conf redis6380.conf redis6381.conf

    # redis6379.conf include /opt/etc/redis.conf pidfile /var/run/redis_6379.pid port 6379 dbfilename dump6379.rdb # redis6380.conf include /opt/etc/redis.conf pidfile /var/run/redis_6380.pid port 6380 dbfilename dump6380.rdb # redis6381.conf include /opt/etc/redis.conf pidfile /var/run/redis_6381.pid port 6381 dbfilename dump6381.rdb

    1. 启动 3 台redis服务器

    1. 查看主机运行情况

    info replication

    1. 配从不配主

      在从机中进行设置,成为谁的从机

    slaveof <ip> <port> # 成为某个实例的从服务器

    1. 再次查看主机运行情况

    成功搭建。

    一主二从#

    特点:

    主机6379,从机63806381

    1. 假设从机6380挂掉。(从机挂掉)

      • 当6380重启后,6380不再是6379的从机,而是作为新的master;(从机重启后,不再是某个主机的从机,其自身就是一个主机)
      • 当再次把6380作为6379的从机加入后,从机才会把数据从头到尾复制。(从机重启后,需要再输入成为从机的指令)
    2. 假设主机6379挂掉。(主机挂掉)

      • 6380和6381仍然是6379的从机,不会做任何事;(从机不会改变)
      • 当6379重启后,既然是主服务器。(主机重启后,还是主机)
    主从复制原理#

    完整版:

    • slave启动成功连接到master后会发送一个sync命令(同步命令)。

    • master接到命令启动后台的存盘进程,对数据进行持久化操作,同时收集所有接收到的用于修改数据集命令,在后台进程执行完毕之后,master将传送整个数据文件(rdb)到slave,以完成一次完全同步。

    • 当主服务进行写操作后,和从服务器进行数据同步。

    • 全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。

    • 增量复制master继续将新的所有收集到的修改命令依次传给slave,完成同步。

    • 只要是重新连接master,一次完全同步(全量复制)将被自动执行。

    全量复制:是从机主动去请求主机进行同步操作,是一开始连接的时候

    增量复制:主机进行一次写操作之后,就主动同步从机

    简洁版:

    薪火相传#

    ​ 上一个slave可以是下一个slavemaster从机是另一个从机的主机,并由这个担任主机的从机,进行数据同步),slave同样可以接收其他slave的连接和同步请求,那么该slave作为了链条中下一个的master,可以有效减轻master的写压力,去中心化降低风险。

    slaveof <ip> <port>

    • 特点与一主二从类似

    • 中途变更转向:会清除之前的数据,重新建立拷贝最新的。

    • 当某个担任主机的slave宕机,其挂在后面的slave都没法备份。

      • 即当主机挂掉,从机还是从机,但是无法继续写数据。
    反客为主#

    当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。(需要手动完成,如果不手动执行的话,那么从机没有任何动作,主机重新启动后,还依旧是主机)

    使用命令:将从机变为主机

    slaveof no one 哨兵模式# 基本介绍#

    反客为主的自动版,即能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库。

    1. 创建sentinel.conf文件

    /opt/etc/sentinel.conf

    1. 配置哨兵

    sentinel monitor mymaster 172.16.xx.xxx 6379 1 # mymaster:给监控对象起的服务器名称 # 1:至少有多少个哨兵同意迁移的数量

    1. 启动哨兵

    redis-sentinel /opt/etc/sentinel.conf

    主机挂掉,哨兵监控到之后,会按照选举规则,从 从机 中选举中产生新的主机,原来挂掉的主机会变成新主机的从机

    选举规则#

    选择条件依次为:

    • 根据优先级别,slave-priority/replica-priority优先选择优先级靠前的。(越小优先级越高)

    • 根据偏移量,优先选择偏移量大的。(偏移量是指获得原主机数据最全的)

    • 若前两个条件相同,那么选择runid最小的,优先选择最小的服务

      • 每个redis实例启动后,都会随机生成一个40位的runid
    复制延时#

    ​ 由于所有的写操作都是先在master上操作,然后同步更新到slave上,所以从master同步到slave从机有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,slave机器数量的增加也会使这个问题更加严重。

    集群# 基本介绍#

    ​ 容量不够,redis如何进行扩容?

    ​ 并发写操作,redis如何分摊?

    ​ 另外,主从模式、薪火相传模式,主机宕机,导致ip地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。

    ​ 解决方法:

    • 代理主机(之前

    • 无中心化集群配置(redis3.0

      • 服务之间可以进行相互连通
      • 任何一个服务模块都可以作为集群的入口

    什么是集群:

    Redis集群实现了对Redis水平扩容,即启动NRedis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N

    Redis集群通过分区(partition)来提供一定程度的可用性(availability),即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。

    搭建Redis集群#
    1. 创建配置文件

    # 以redis6379.conf为例 include /opt/etc/redis.conf pidfile /var/run/redis_6379.pid # 更改 port 6379 # 更改 dbfilename dump6379.rdb # 更改 cluster-enabled yes # 打开集群模式 cluster-config-file nodes-6379.conf # 设置节点配置文件名称,需要更改 cluster-node-timeout 15000 # 设置节点失联事件,超过该时间(ms),集群自动进行主从切换

    1. 启动

    1. 将 6 个节点合成一个集群

    # 组合之前请确保所有redis实例启动后,nodes-xxxx.conf文件都生成正常。

    # 进入redis安装目录 /opt/redis-6.2.6/src # 执行 redis-cli --cluster create --cluster-replicas 1 172.16.88.168:6379 172.16.88.168:6380 172.16.88.168:6381 172.16.88.168:6389 172.16.88.168:6390 172.16.88.168:6391

    1. 采用集群策略连接

    redis-cli -c -p PORT cluster nodes # 命令查看集群信息

    问题# redis cluster如何分配这六个节点?#

    一个集群至少要有三个主节点

    选项--cluster-replicas 1,表示希望为集群中的每个主节点创建一个从节点

    分配原则尽量保证每个主数据库运行在不同的IP地址每个从库和主库不在一个IP地址上。保证出故障的时候,能够有替换的,继续提供服务。

    什么是slots?#

    插槽

    一个Redis集群包含16384个插槽(hash slot), 数据库中的每个键都属于这16384个插槽的其中一个。

    集群使用公式CRC16(key) % 16384来计算键key属于哪个槽, 其中CRC16(key)语句用于计算键keyCRC16校验和 。

    集群中的每个节点负责处理一部分插槽,分担压力。 例如, 如果一个集群可以有主节点, 其中:

    • 节点A负责处理0号至5460号插槽。
    • 节点B负责处理5461号至10922号插槽。
    • 节点C负责处理10923号至16383号插槽。
    如何在集群中录入值?#

    redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。

    redis-cli客户端提供了–c参数实现自动重定向。

    例如redis-cli -c –p 6379登入后,再录入、查询键值对可以自动重定向。

    如何查询集群中的值?#

    每个主机只能查询自己范围内部的插槽。

    cluster keyslot <key>:查询某个key的 **slot**。

    cluster countkeysinslot <slot>:查询某个slot是否有值。

    CLUSTER GETKEYSINSLOT <slot><count>:返回countslot槽中的键。

    集群故障恢复?#

    如果主节点下线?从节点能否自动升为主节点?注意:15秒超时(一旦超时,从机升为主机)

    • 6379挂掉后,6389成为新的主机。
    • 集群中,主机挂掉后,从机会自动变成主节点,担任主节点功能(有哨兵功能)

    主节点恢复后,主从关系会如何?主节点回来变成从机。(类似于一主多从中的哨兵模式)

    • 6379重启后,6379成为6389的从机。

    如果所有某一段插槽的 主、从节点 都宕掉,redis服务是否还能继续?

    • 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage=yes,那么 ,整个集群都挂掉。
    • 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage=no,那么,该插槽数据全都不能使用,也无法存储。

    redis.conf中的参数cluster-require-full-coverage

    优点#
    • 实现redis扩容;
    • 使用插槽,分摊压力;
    • 无中心配置,相对简单。
    缺点#
    • 多键操作是不被支持的,但可以用组,不方便;
    • 多键的Redis事务是不被支持的。lua脚本不被支持;
    • 由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。
    Jedis (Java操作Redis)# 基本操作#
    1. 依赖

    <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.2.0</version> </dependency>

    1. 连接Redis

    public class JedisDemo { public static void main(String[] args) { // 创建Jedis对象 Jedis jedis = new Jedis("192.168.xx.xxx", 6379); // 测试,能够连接上的话,ping通,会返回一个值 String ping = jedis.ping(); System.out.println("连接成功:" + ping); jedis.close(); } }

    ​ 注意:使用Jedis进行操作,需要对Redis的网络相关配置文件进行修改:

    • bind:默认是bind绑定本机,不进行修改的情况下,只能接受本机的访问请求,不写的情况下,能够无限制接受任何ip地址的访问。
    • protected-mode:将本机访问保护模式设置为 no

    如果出现connet timed out错误,检查两块,第一是否配置文件进行了修改,第二防火墙是否关闭。

    Key

    jedis.set("k1", "v1"); jedis.set("k2", "v2"); jedis.set("k3", "v3"); Set<String> keys = jedis.keys("*"); // 返回所有key System.out.println(keys.size()); for (String key : keys) { System.out.println(key); } System.out.println(jedis.exists("k1")); // 是否存在 System.out.println(jedis.ttl("k1")); // 过期时间 System.out.println(jedis.get("k1")); // 获取key对应value值

    String

    jedis.mset("str1","v1","str2","v2","str3","v3"); System.out.println(jedis.mget("str1","str2","str3"));

    List

    // 可以使用lpush或者rpush添加k-v List<String> list = jedis.lrange("mylist",0,-1); for (String element : list) { System.out.println(element); }

    Set

    jedis.sadd("orders", "order01"); jedis.sadd("orders", "order02"); jedis.sadd("orders", "order03"); jedis.sadd("orders", "order04"); Set<String> smembers = jedis.smembers("orders"); for (String order : smembers) { System.out.println(order); } jedis.srem("orders", "order02");

    Hash

    jedis.hset("hash1","userName","lisi"); System.out.println(jedis.hget("hash1","userName")); Map<String,String> map = new HashMap<String,String>(); map.put("telphone","13810169999"); map.put("address","atguigu"); map.put("email","abc@163.com"); jedis.hmset("hash2",map); List<String> result = jedis.hmget("hash2", "telphone","email"); for (String element : result) { System.out.println(element); }

    zset

    jedis.zadd("zset01", 100d, "z3"); jedis.zadd("zset01", 90d, "l4"); jedis.zadd("zset01", 80d, "w5"); jedis.zadd("zset01", 70d, "z6"); Set<String> zrange = jedis.zrange("zset01", 0, -1); for (String e : zrange) { System.out.println(e); }

    模拟验证码发送#

    代码实现:

    public class PhoneCode { public static void main(String[] args) { // 模拟验证码发送 // verifyCode("123456789"); getRedisCode("123456789", "123456"); } // 1. 生成6位数字验证码 public static String getCode(){ Random random = new Random(); String code = ""; for (int i = 0; i < 6; i++) { int nextInt = random.nextInt(10); code += nextInt; } return code; } // 2. 每个手机每天只能发送三次验证码请求,验证码放到redis中,并设置过期时间 public static void verifyCode(String phone) { // 连接redis Jedis jedis = new Jedis("127.0.0.1", 6379); // 拼接key // 手机发送次数的key String countKey = "VerifyCode-" + phone + ":count"; // 验证码的key String phoneKey = "VerifyCode-" + phone + ":code"; // 每个手机每天只能发送三次验证码 String count = jedis.get(countKey); if(count == null) { // 之前还没发送过,这次是第一次发送,设置发送次数为1 jedis.setex(countKey, 24*60*60, "1"); // 设置过期时间为一天 } else if (Integer.parseInt(count) < 3) { // 发送次数加1 jedis.incr(countKey); } else if (Integer.parseInt(count) >= 3) { // 发送已经有三次了,不能再发送了 System.out.println("今天发送验证码的次数已经达到三次,无法再发送!"); jedis.close(); // 关闭连接 return; // 不执行下面的代码 } // 验证码放到redis中 String code1 = getCode(); jedis.setex(phoneKey, 120, code1); // 设置验证码的过期时间为两分钟,会进行覆盖 jedis.close(); } // 3. 验证码校验 public static void getRedisCode(String phone, String code){ // 连接redis Jedis jedis = new Jedis("127.0.0.1", 6379); // 拼接key // 验证码的key String phoneKey = "VerifyCode-" + phone + ":code"; // 判断 String codePhone = jedis.get(phoneKey); if(codePhone.equals(code)) { System.out.println("成功!"); } else { System.out.println("失败!"); } jedis.close(); } } Jedis主从复制#

    private static JedisSentinelPool jedisSentinelPool=null; public static Jedis getJedisFromSentinel(){ if(jedisSentinelPool==null){ Set<String> sentinelSet=new HashSet<>(); sentinelSet.add("172.16.88.168:26379"); // 端口为sentinal JedisPoolConfig jedisPoolConfig =new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(10); // 最大可用连接数 jedisPoolConfig.setMaxIdle(5); // 最大闲置连接数 jedisPoolConfig.setMinIdle(5); // 最小闲置连接数 jedisPoolConfig.setBlockWhenExhausted(true); // 连接耗尽是否等待 jedisPoolConfig.setMaxWaitMillis(2000); // 等待时间 jedisPoolConfig.setTestOnBorrow(true); // 取连接的时候进行测试 jedisSentinelPool=new JedisSentinelPool("mymaster",sentinelSet,jedisPoolConfig); // 服务主机名 return jedisSentinelPool.getResource(); } else { return jedisSentinelPool.getResource(); } } 集群的Jedis开发#

    即使连接的不是主机,集群会自动切换主机存储。主机写,从机读。

    无中心化主从集群。无论从哪台主机写的数据,其他主机上都能读到数据。

    public class JedisClusterTest { public static void main(String[] args) { // 创建对象 Set<HostAndPort> set = new HashSet<HostAndPort>(); set.add(new HostAndPort("172.16.xx.xxx",6379)); // 任何一个端口 JedisCluster jedisCluster = new JedisCluster(set); // 操作 jedisCluster.set("k1", "v1"); System.out.println(jedisCluster.get("k1")); jedisCluster.close(); } } SpringBoot整合Redis#

    1. 依赖

    <!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 连接池:spring2.X集成redis所需common-pool2--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.6.0</version> </dependency>

    1. 配置文件配置Redis

    #Redis服务器地址 spring.redis.host= ip地址 #Redis服务器连接端口 spring.redis.port=6379 #Redis数据库索引(默认为0,一共有16个) spring.redis.database= 0 #连接超时时间(毫秒) spring.redis.timeout=1800000 #连接池最大连接数(使用负值表示没有限制) spring.redis.lettuce.pool.max-active=20 #最大阻塞等待时间(负数表示没限制) spring.redis.lettuce.pool.max-wait=-1 #连接池中的最大空闲连接 spring.redis.lettuce.pool.max-idle=5 #连接池中的最小空闲连接 spring.redis.lettuce.pool.min-idle=0

    1. Redis配置类(需要继承CachingConfigurerSupport

    @EnableCaching @Configuration public class RedisConfig extends CachingConfigurerSupport { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); RedisSerializer<String> redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setConnectionFactory(factory); // key序列化方式 template.setKeySerializer(redisSerializer); // value序列化 template.setValueSerializer(jackson2JsonRedisSerializer); // value hashmap序列化 template.setHashValueSerializer(jackson2JsonRedisSerializer); return template; } @Bean public CacheManager cacheManager(RedisConnectionFactory factory) { RedisSerializer<String> redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); // 解决查询缓存转换异常的问题 ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置序列化(解决乱码的问题),过期时间600秒 RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofSeconds(600)) .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer)) .disableCachingNullValues(); RedisCacheManager cacheManager = RedisCacheManager.builder(factory) .cacheDefaults(config) .build(); return cacheManager; } }

    1. 测试

    应用问题解决(面试重点)# 缓存穿透#

    现象#

    key对应的数据在数据源并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。

    ​ redis查询不到数据库,出现了很多非正常url访问。黑客攻击就是通过查询一个不存在的值,缓存里面没有,那么就会查数据库,大量类似的请求发生后,导致数据库崩溃。若黑客利用此漏洞进行攻击可能压垮数据库。

    造成的条件:

    1. 应用服务器压力变大,访问请求增光
    2. redis命中率下降(重点)
    3. 导致一直访问查询数据库

    服务器压力变大,请求太多,导致redis缓存命中率开始下降,对数据库的访问越来越多,数据库最终承受不住压力,崩溃了。

    如何解决#
    • 对空值缓存

      如果一个查询返回的数据为空(不管是数据是否不存在),仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟。

    • 设置可访问的名单(白名单):

      使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,则不允许访问。

    • 采用布隆过滤器

      布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。(跟bitmaps类似,不过效率更高)

      布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法缺点是有一定的误识别率和删除困难,命中率不一定高

      将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。

    • 进行实时监控

      当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

    缓存击穿#

    注意与缓存穿透的区别:

    • 缓存穿透
      • redis命中率下降,导致数据库访问量激增
    • 缓存击穿
      • redis正常访问,但某个热点key突然失效,导致瞬间数据库的访问量激增
    现象#

    key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

    1. 数据库访问压力瞬间增大
    2. redis中没有出现大量key过期,redis正常运行(与缓存穿透的区别)
    3. 某个经常访问的key,即十分热点的key,不停地被大量访问,当这个key过期的瞬间,持续的高并发就击穿了缓存,大量请求数据库,导致数据库奔溃
    如何解决#
    • 预先设置热门数据

      redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长

    • 实时调整

      现场监控哪些数据热门,实时调整key的过期时长。

    • 使用锁

      Redis 6有哪些新特性或优化?

    缓存雪崩#

    现象#

    key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期后,一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

    缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key

    1. 数据库压力变大
    2. 极少的时间段,查询大量key的集中过期情况(大量key集中过期,而缓存击穿是热点key过期)
    如何解决#
    • 构建多级缓存架构

      nginx缓存 +redis缓存 + 其他缓存(ehcache等)

    • 使用锁或队列:

      用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况

    • 设置过期标志更新缓存:

      记录缓存数据是否过期(设置提前量),快过期的时候,提前进行一个缓存。如果过期会触发通知另外的线程在后台去更新实际key的缓存。

    • 将缓存失效时间分散开:

      比如我们可以在原有的失效时间基础上增加一个随机值,比如 1~5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件

    分布式锁# 基本介绍#

    ​ 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题。

    分布式锁主流的实现方案:

    • 基于数据库实现分布式锁
    • 基于缓存(Redis等)
    • 基于Zookeeper

    每一种分布式锁解决方案都有各自的优缺点:

    • 性能:redis最高
    • 可靠性:zookeeper最高
    设置锁以及过期时间#
    • 设置锁的命令

    SETNX KEY VALUE # 设置锁 del key # 删除锁

    • 给锁设置过期时间

    expire users 30 # 给users上锁30s

    • 优化:上锁的同时设置过期时间

    set key value nx ex time # nx 上锁;ex 设置过期时间

    • Java实现

    @GetMapping("testLock") public void testLock(){ //1获取锁,setne ,顺便设置过期时间 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",3,TimeUnit.SECONDS); // key, value, 过期时间,时间单位 //2获取锁成功、查询num的值 if(lock){ Object value = redisTemplate.opsForValue().get("num"); //2.1判断num为空return if(StringUtils.isEmpty(value)){ return; } //2.2有值就转成成int int num = Integer.parseInt(value+""); //2.3把redis的num加1 redisTemplate.opsForValue().set("num", ++num); //2.4释放锁,del redisTemplate.delete("lock"); }else{ //3获取锁失败、每隔0.1秒再获取 try { Thread.sleep(100); // 休眠,等一会 testLock(); // 再去尝试获取锁 } catch (InterruptedException e) { e.printStackTrace(); } } } 分布式锁产生的问题# 使用UUID防止误删锁#

    现象

    ​ a先上锁后,在执行操作的过程中,服务器卡顿,而10秒过期后,b抢到锁进行具体操作,然而此时a的服务器恢复正常,a继续执行操作并结束,此时有一个释放锁的操作,那么此时释放的锁是b的锁,这就是导致误删除锁的现象发生。

    解决方案

    Java实现修改版:

    @GetMapping("testLock") public void testLock(){ // 设置UUID String uuid = UUID.randomUUID().toString(); ..... if(lock){ ... // 判断UUID值是否一样 String lockUuid = (String)redisTemplate.opsForValue().get("lock"); if(uuid.equals(lockUuid)){ // UUID一样时,才释放锁 //2.4释放锁,del redisTemplate.delete("lock"); } }else{ ... } } Lua保证删除原子性#

    问题:删除操作缺乏原子性,即uuid的比较操作和删除操作不是原子操作

    Java实现修改:

    @GetMapping("testLockLua") public void testLockLua() { //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中 String uuid = UUID.randomUUID().toString(); //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除! String skuId = "25"; // 访问skuId 为25号的商品 100008348542 String locKey = "lock:" + skuId; // 锁住的是每个商品的数据 // 3 获取锁 Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS); // 第一种: lock 与过期时间中间不写任何的代码。 // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间 // 如果true if (lock) { // 执行的业务逻辑开始 // 获取缓存中的num 数据 Object value = redisTemplate.opsForValue().get("num"); // 如果是空直接返回 if (StringUtils.isEmpty(value)) { return; } // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在! int num = Integer.parseInt(value + ""); // 使num 每次+1 放入缓存 redisTemplate.opsForValue().set("num", String.valueOf(++num)); /*使用lua脚本来锁*/ // 定义lua 脚本 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; // 使用redis执行lua执行 DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptText(script); // 设置一下返回值类型 为Long // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型, // 那么返回字符串与0 会有发生错误。 redisScript.setResultType(Long.class); // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。 redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid); } else { ..... } } 总结#

    为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件

    • 互斥性。在任意时刻,只有一个客户端能持有锁

    • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

    • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

    • 加锁和解锁必须具有原子性