kafka 消息队列 php-rdkafka扩展示例
php版本为:7.2 # cd /var/www/html/ 生产者,producer.php $rk = new RdKafkaProducer(); $rk->setLogLevel(LOG_DEBUG); // 设置日志级别 $rk->addBrokers('127.0.0.1'); // 添加经纪人,就是ip地址 $topic = $rk->newTopic("test2"); // 新建主题 // 第一个参数:是分区。RD_KAFKA_PARTITION_UA代表未分配,并让librdkafka选择分区 // 第二个参数:是消息标志,必须为0 // 第三个参数:消息,如果不为NULL,它将被传递给主题分区程序 $topic->produce(RD_KAFKA_PARTITION_UA,'Message'); // 生成并发送单个消息 消费者,consumer.php $rk = new RdKafkaConsumer(); $rk->setLogLevel(LOG_DEBUG); // 设置日志级别 $rk->addBrokers("127.0.0.1"); // 添加经纪人,就是ip地址 $topic = $rk->newTopic("test2"); // 这里的$rk和生产者是不同的类哦 // 第一个参数分区ID // 第二个参数是开始消费的偏移量,有效值 $topic->consumeStart(0,RD_KAFKA_OFFSET_BEGINNING); while (true) { // 第一个参数要消耗的分区 // 第二个参数是等待收到消息的最长时间,1000是一秒 $msg = $topic->consume(0,1000); if (@$msg->err) { echo $msg->errstr(),"n"; // 输出错误 break; } else { echo @$msg->payload,"n"; // 输出消息 } } 测试 # cd /var/www/html/ # php consumer.php 新开一个终端,运行生产者,发送消息 # php producer.php 每当生产者运行一次就会发送一条消息,消费者那边就会接收到这条消息并进行相应的处理。 php-rdkafka扩展函数手册 (编辑:安卓应用网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |