24

MixPHP V2.1 生态: Swoole 协程 Redis 订阅器

 4 years ago
source link: https://segmentfault.com/a/1190000021551153
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

OpenMix 全家桶 中有一个 Mix Redis Subscribe 的项目,这是一个不依赖 phpredis 扩展,直接解析 Redis 协议专用于订阅处理的一个库,任何 Swoole 框架都可使用,可广泛使用于 WebSocket 开发中,在 MixPHP 骨架中也默认包含了这个库。

为何开发

MixPHP V2.1 完成开发后,我试图开发一个基于订阅机制的 WebScoket 服务,该服务需要可动态切换订阅频道,但 phpredis 的订阅方法无法实现以下功能:

$redis = new \Redis();
$res   = $redis->pconnect('127.0.0.1', 6379, 0);
$redis->subscribe(['test'], function ($instance, $channelName, $message) {
    echo $channelName, "==>", $message, PHP_EOL;
});
  • 无法得知订阅成功

以上代码中,当执行到 subscribe 会阻塞执行,只有在有消息过来时才会执行到匿名函数中,并不会在订阅成功的当时执行该闭包,但是 redis-cli 执行订阅时,redis-server 是有回复订阅成功消息的,因此是 phpredis 的设计问题。

  • 无法动态 subscribe 增加频道

由于 subscribe 阻塞了执行,代码只能在有消息触发回调时才能在回调中执行,因此动态增加频道也是无法操作的。

  • unsubscribe 只可在回调中执行

因为上面那种阻塞回调的设计,如果需要取消一个频道,只能在有消息过来时方可操作,但是实际需求通常是需要在任意时刻都可取消频道。

  • 无法在其他协程中 close 连接

phpredis 当试图在匿名函数以外的其他协程中 close 连接会抛出异常 PHP Fatal error: Uncaught RedisException: read error on connection ,这让关闭一个订阅中的 redis 连接都无法优雅的实现。

造轮子

当我得知 redis 协议是简单的文本协议时,我决定抛弃 phpredis 自己造一个好用的订阅库,新轮子具有以下优点:

  • 不依赖 phpredis 扩展
  • 平滑修改:可随时增加、取消订阅通道,实现无缝切换通道的需求。
  • 跨协程安全关闭:可在任意时刻关闭订阅。
  • 通道获取消息:该库封装风格参考 golang 语言 go-redis 库封装,通过 channel 获取订阅的消息。
$sub = new \Mix\Redis\Subscribe\Subscriber([ // 连接失败将抛出异常
    'host'     => '192.168.198.1',
    'port'     => 6379,
    'timeout'  => 5,
    'password' => '',
]);
$sub->subscribe('foo', 'bar'); // 订阅失败将抛出异常

$chan = $sub->channel();
while (true) {
    $data = $chan->pop();
    if (empty($data)) { // 手动close与redis异常断开都会导致返回false
        if (!$sub->closed) {
            // redis异常断开处理
            var_dump('Redis connection is disconnected abnormally');
        }
        break;
    }
    var_dump($data);
}

接收到订阅消息:

object(Mix\Redis\Subscribe\Message)#8 (2) {
  ["channel"]=>
  string(2) "foo"
  ["payload"]=>
  string(4) "test"
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK