57

PHP-NSQ —— 基于 C 的 NSQ 的 PHP 客户端

 5 years ago
source link: https://www.oschina.net/p/php-nsq?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

php-nsq

php-nsq 是nsq的php客户端,采用c扩展编写,性能和稳定性。

安装 :

请提前安装libevent

Dependencies: libevent  (apt-get install libevent-dev ,yum install libevent-devel)

1. sudo phpize
2. ./configure 
3. make  
4. make install  

add in your php.ini:

extension = nsq.so;

pub例子:

$nsqdAddr = array(
    "127.0.0.1:4150",
    "127.0.0.1:4154"
);

$nsq = new Nsq();
$isTrue = $nsq->connectNsqd($nsqdAddr);

for($i = 0; $i < 10000; $i++){
    $nsq->publish("test", "nihao");
}
$nsq->closeNsqdConnection();

// Deferred publish 
//function : deferredPublish(string topic,string message, int millisecond); 
//millisecond default : [0 < millisecond < 3600000]

$deferred = new Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for($i = 0; $i < 20; $i++){
    $deferred->deferredPublish("test", "message daly", 3000); 
}
$deferred->closeNsqdConnection();

sub例子:

<?php 

//sub

$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr
$nsq = new Nsq();
$config = array(
    "topic" => "test",
    "channel" => "struggle",
    "rdy" => 2,                //optional , default 1
    "connect_num" => 1,        //optional , default 1   
    "retry_delay_time" => 5000,  //optional, default 0 , if run callback failed, after 5000 msec, message will be retried
    "auto_finish" => true, //default true
);

$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev){ 

    echo $msg->payload;
    echo $msg->attempts;
    echo $msg->message_id;
    echo $msg->timestamp;


});

Nsq 类方法:

  • connectNsqd($nsqdAddrArr)

    pub的时候连接nsq,你也可以利用此函数做健康检查

  • closeNsqdConnection()

    关闭nsq的连接

  • publish($topic,$msg)

    消息发送

  • deferredPublish($topic,$msg,$msec)

    延迟消息发送

  • subscribe($nsq_lookupd,$config,$callback)

    消息订阅

Message 类方法与属性:

  • timestamp

    消息时间戳

  • attempts

    消息的重试次数,(从1开始)

  • message_id

    消息id 

  • payload

    消息内容

  • finish($bev,$msg->message_id)

    主动的 ack消息方法

  • touch($bev,$msg->message_id)

    如果你消息执行太长,可以利用次函数告知nsq 你还活着,一般用于执行频率比较规律的场景。

Tips :

1.如果callback内需要外部变量,可以采用以下use的写法:

$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev) use ($you_variable){ 

    echo $msg->payload;
    echo $msg->attempts;
    echo $msg->message_id;
    echo $msg->timestamp;


});

 2.消息重试,只要抛异常就可以,切记不要陷入死循环,超过自己觉得可以的次数 要return:

subscribe($nsq_lookupd, $config, function($msg){ 
    try{
        echo $msg->payload . " " . "attempts:".$msg->attempts."\n";
        //do something
    }catch(Exception $e){

        if($msg->attempts < 3){
            //the message will be retried after you configure retry_delay_time 
            throw new Exception(""); 
        }else{
            echo $e->getMessage();
            return;
        }
    }

});

3.如果你想增加 客户端的心跳时间与消息的超时时间 :

 第一步 在nsqd启动时要加入相关参数,这个参数是最大的限制,比如--max-heartbeat-interval=1m30s 心跳时间最大不能超过1分30秒:

      nsqd --lookupd-tcp-address=127.0.0.1:4160 --max-heartbeat-interval=1m30s --msg-timeout=10m30s

第二步  因为第一步是指定最大时间,所以还需要第二步在客户端指定所需要的值 具体请看 example目录中的identify开头的文件例子。

4.如果你想增强消费能力,可以加大rdy参数

5.你可以用supervisor管理,但是因为是多进程消费,你需要在supervisor job的配置文件 添加:

    stopasgroup=true
    killasgroup=true

Changes

  • 3.0

    • 修复因libevent 超过4096消息被截断问题

    • 增加identify指令功能,可以增加客户端心跳时间 与 消息超时时间

  • 2.4.0

    • 修复 pub bug

    • 修复 sub coredump

    • 修覆盖 touch bug

    • 增加等待,当刚初始化的topic没消息时

  • 2.3.1

    • pub支持域名

    • 修复 pub coredump


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK