首页 > 编程知识 正文

分布式锁redis实现方式,基于redis集群实现分布式锁

时间:2023-05-06 14:15:16 阅读:33408 作者:4141

一、Redis 锁错误使用之一

$redis=newredis('127.0.0.1 ',6379 );

$cacheKey='query_cache ';

$result=$redis-get($cachekey );

如果启用了if($result )//缓存,则返回原样。

返回$ result;

} else { //缓存无效后重新获取并保存到Redis中。

$mysqlResult=[];

$redis-set($cachekey,JSON_encode ) $MySQLresult ),3600 );

返回$ MySQL result;

}

看代码也找不到问题。 通常,如果服务资源压力非常小,则此代码没有任何问题。 而且,真的能够提高服务器的吞吐量性能。

如果这个位置的代码出现单点压力呢? 例如,此功能是统计结果,查询数据库需要5s。 而且由于该功能比较常用,单位时间达到1000次/秒。

在这种情况下,会发生同时贯通的问题。

1000个请求同时到达此程序的位置,然后去读取缓存是否存在。 如果此时不存在缓存。 这1000个请求将得到不存在的结果。 运行到数据库中获取缓存结果。 有时也会将结果重写为Redis。

结果,这一瞬间的单点压力渗透到数据库中,数据库的压力瞬间达到峰值。 如果数据库性能无法承受这么大的负担,数据库服务器上的CPU将直接填满。 响应前端的数据将进入停顿状态。

所以,这个代码没有正确锁定。

二、Redis 锁错误使用之二

第一,发现了问题。 于是,有人想把它优化。 于是就有了以下代码:

$redis=newredis('127.0.0.1 ',6379 );

$lockKey='query_cache_lock '; //锁定专用密钥。

$cacheKey='query_cache '; //用于保存查询结果的密钥。

$result=$redis-get($cachekey );

如果启用了if($result )//缓存,则返回原样。

返回$ result;

} else { //缓存无效后重新获取并保存到Redis中。

if($redis-setNX ) $lockkey )===false ) {

“服务器爆炸了”。 请稍后再试”);

} else {

$mysqlResult=[];

$redis-set($cachekey,JSON_encode ) $MySQLresult ),3600 );

$redis-delete($lockkey; 锁不见了就开锁。 删除就是解除锁定。

返回$ MySQL result;

}

}

此代码完全避免了第一点同时贯通的问题。 但是,相对于第一个,代码也增加了几行。 但是,性能依然很强。

尽管如此,这个代码仍然有三个问题。

1 )并发越大,第一个锁定的请求越正常响应,后续请求中显示“服务器爆炸了。 请稍后再试”的异常信息。

2 )无法为后续请求的解锁添加等待时间。

3 )如果程序在代码运行到$redis-delete ) $lockkey )之前发生异常。 锁定不能正常释放。 后续锁定也无法正常获取。

1 )关于第一点,这是用户体验极差。

2 )关于第一点,是解决第一点的方案。

关于第三点,这是我们必须解决的问题。 否则,分布式锁定将无法正常工作。

三、正确的分布式锁

常规分布式锁定必须满足以下要求:

1 )可以解决同时多发的资源争夺。 这是最中心的需求。

2 )锁定可以正常追加解除。 不发生死锁。

3 )摇号可以等待。 否则,无法最大限度地保证用户的体验。

针对以上三点,得出 Redis 分布式锁示例

class RedisMutexLock

{

//*

缓存Redis连接。

*

* @return void

*/

公共静态功能获取就绪(

{

//此行

代码请根据自己项目替换为自己的获取 Redis 连接。

        return YCache::getRedisClient();

    }

    /**

     * 获得锁,如果锁被占用,阻塞,直到获得锁或者超时。

     * -- 1、如果 $timeout 参数为 0,则立即返回锁。

     * -- 2、建议 timeout 设置为 0,避免 redis 因为阻塞导致性能下降。请根据实际需求进行设置。

     *

     * @param  string  $key         缓存KEY。

     * @param  int     $timeout     取锁超时时间。单位(秒)。等于0,如果当前锁被占用,则立即返回失败。如果大于0,则反复尝试获取锁直到达到该超时时间。

     * @param  int     $lockSecond  锁定时间。单位(秒)。

     * @param  int     $sleep       取锁间隔时间。单位(微秒)。当锁为占用状态时。每隔多久尝试去取锁。默认 0.1 秒一次取锁。

     * @return bool 成功:true、失败:false

     */

    public static function lock($key, $timeout = 0, $lockSecond = 20, $sleep = 100000)

    {

        if (strlen($key) === 0) {

            // 项目抛异常方法

            YCore::exception(500, '缓存KEY没有设置');

        }

        $start = self::getMicroTime();

        $redis = self::getRedis();

        do {

            // [1] 锁的 KEY 不存在时设置其值并把过期时间设置为指定的时间。锁的值并不重要。重要的是利用 Redis 的特性。

            $acquired = $redis->set("Lock:{$key}", 1, ['NX', 'EX' => $lockSecond]);

            if ($acquired) {

                break;

            }

            if ($timeout === 0) {

                break;

            }

            usleep($sleep);

        } while (!is_numeric($timeout) || (self::getMicroTime()) < ($start + ($timeout * 1000000)));

        return $acquired ? true : false;

    }

    /**

     * 释放锁

     *

     * @param  mixed  $key  被加锁的KEY。

     * @return void

     */

    public static function release($key)

    {

        if (strlen($key) === 0) {

            // 项目抛异常方法

            YCore::exception(500, '缓存KEY没有设置');

        }

        $redis = self::getRedis();

        $redis->del("Lock:{$key}");

    }

    /**

     * 获取当前微秒。

     *

     * @return bigint

     */

    protected static function getMicroTime()

    {

        return bcmul(microtime(true), 1000000);

    }

}

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。