一、Redis 锁错误使用之一
$redis=newredis('127.0.0.1 ',6379 );
$cacheKey='query_cache ';
$result=$redis-get($cachekey );
如果启用了if($result )//缓存,则返回原样。
返回$ result;
} else { //缓存无效后重新获取并保存到Redis中。
$mysqlResult=[];
$redis-set($cachekey,JSON_encode ) $MySQLresult ),3600 );
返回$ MySQL result;
}
看代码也找不到问题。 通常,如果服务资源压力非常小,则此代码没有任何问题。 而且,真的能够提高服务器的吞吐量性能。
如果这个位置的代码出现单点压力呢? 例如,此功能是统计结果,查询数据库需要5s。 而且由于该功能比较常用,单位时间达到1000次/秒。
在这种情况下,会发生同时贯通的问题。
1000个请求同时到达此程序的位置,然后去读取缓存是否存在。 如果此时不存在缓存。 这1000个请求将得到不存在的结果。 运行到数据库中获取缓存结果。 有时也会将结果重写为Redis。
结果,这一瞬间的单点压力渗透到数据库中,数据库的压力瞬间达到峰值。 如果数据库性能无法承受这么大的负担,数据库服务器上的CPU将直接填满。 响应前端的数据将进入停顿状态。
所以,这个代码没有正确锁定。
二、Redis 锁错误使用之二
第一,发现了问题。 于是,有人想把它优化。 于是就有了以下代码:
$redis=newredis('127.0.0.1 ',6379 );
$lockKey='query_cache_lock '; //锁定专用密钥。
$cacheKey='query_cache '; //用于保存查询结果的密钥。
$result=$redis-get($cachekey );
如果启用了if($result )//缓存,则返回原样。
返回$ result;
} else { //缓存无效后重新获取并保存到Redis中。
if($redis-setNX ) $lockkey )===false ) {
“服务器爆炸了”。 请稍后再试”);
} else {
$mysqlResult=[];
$redis-set($cachekey,JSON_encode ) $MySQLresult ),3600 );
$redis-delete($lockkey; 锁不见了就开锁。 删除就是解除锁定。
返回$ MySQL result;
}
}
此代码完全避免了第一点同时贯通的问题。 但是,相对于第一个,代码也增加了几行。 但是,性能依然很强。
尽管如此,这个代码仍然有三个问题。
1 )并发越大,第一个锁定的请求越正常响应,后续请求中显示“服务器爆炸了。 请稍后再试”的异常信息。
2 )无法为后续请求的解锁添加等待时间。
3 )如果程序在代码运行到$redis-delete ) $lockkey )之前发生异常。 锁定不能正常释放。 后续锁定也无法正常获取。
1 )关于第一点,这是用户体验极差。
2 )关于第一点,是解决第一点的方案。
关于第三点,这是我们必须解决的问题。 否则,分布式锁定将无法正常工作。
三、正确的分布式锁
常规分布式锁定必须满足以下要求:
1 )可以解决同时多发的资源争夺。 这是最中心的需求。
2 )锁定可以正常追加解除。 不发生死锁。
3 )摇号可以等待。 否则,无法最大限度地保证用户的体验。
针对以上三点,得出 Redis 分布式锁示例
class RedisMutexLock
{
//*
缓存Redis连接。
*
* @return void
*/
公共静态功能获取就绪(
{
//此行
代码请根据自己项目替换为自己的获取 Redis 连接。return YCache::getRedisClient();
}
/**
* 获得锁,如果锁被占用,阻塞,直到获得锁或者超时。
* -- 1、如果 $timeout 参数为 0,则立即返回锁。
* -- 2、建议 timeout 设置为 0,避免 redis 因为阻塞导致性能下降。请根据实际需求进行设置。
*
* @param string $key 缓存KEY。
* @param int $timeout 取锁超时时间。单位(秒)。等于0,如果当前锁被占用,则立即返回失败。如果大于0,则反复尝试获取锁直到达到该超时时间。
* @param int $lockSecond 锁定时间。单位(秒)。
* @param int $sleep 取锁间隔时间。单位(微秒)。当锁为占用状态时。每隔多久尝试去取锁。默认 0.1 秒一次取锁。
* @return bool 成功:true、失败:false
*/
public static function lock($key, $timeout = 0, $lockSecond = 20, $sleep = 100000)
{
if (strlen($key) === 0) {
// 项目抛异常方法
YCore::exception(500, '缓存KEY没有设置');
}
$start = self::getMicroTime();
$redis = self::getRedis();
do {
// [1] 锁的 KEY 不存在时设置其值并把过期时间设置为指定的时间。锁的值并不重要。重要的是利用 Redis 的特性。
$acquired = $redis->set("Lock:{$key}", 1, ['NX', 'EX' => $lockSecond]);
if ($acquired) {
break;
}
if ($timeout === 0) {
break;
}
usleep($sleep);
} while (!is_numeric($timeout) || (self::getMicroTime()) < ($start + ($timeout * 1000000)));
return $acquired ? true : false;
}
/**
* 释放锁
*
* @param mixed $key 被加锁的KEY。
* @return void
*/
public static function release($key)
{
if (strlen($key) === 0) {
// 项目抛异常方法
YCore::exception(500, '缓存KEY没有设置');
}
$redis = self::getRedis();
$redis->del("Lock:{$key}");
}
/**
* 获取当前微秒。
*
* @return bigint
*/
protected static function getMicroTime()
{
return bcmul(microtime(true), 1000000);
}
}