Laravel中Kafka的使用详解

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

<?php
namespace App\Tools;

use Illuminate\Config\Repository;

use Illuminate\Support\Facades\DB;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;

use Illuminate\Http\Request;

class Kafka
{
  public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka
  public $topic = 'test';//管道名称
  public $partition = 0;

  protected $producer = null;
  protected $consumer = null;

  public function __construct()
  {
    if (empty($this->broker_list)) {
      throw new InvalidConfigException("broker not config");
    }
    $rk = new \RdKafka\Producer();
    if (empty($rk)) {
      throw new InvalidConfigException("producer error");
    }
    $rk->setLogLevel(LOG_DEBUG);
    if (!$rk->addBrokers($this->broker_list)) {
      throw new InvalidConfigException("producer error");
    }
    $this->producer = $rk;
  }

  /**
   * 生产者
   * @param array $messages
   * @return mixed
   */
  public function send($messages = [],$topic)
  {
    $topic = $this->producer->newTopic($topic);
    return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
  }

  /**
   * 消费者
   */
  public function consumer($object, $callback){
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 0);
    $conf->set('metadata.broker.list', $this->broker_list);

    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');

    $conf->setDefaultTopicConf($topicConf);

    $consumer = new \RdKafka\KafkaConsumer($conf);

    $consumer->subscribe([$this->topic]);

    echo "waiting for messages.....\n";
    while(true) {
      $message = $consumer->consume(120*1000);
      switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
          echo "message payload....";
          $object->$callback($message->payload);
          break;
      }
      sleep(1);
    }
  }
}
?>

在控制器中如何使用:

首先再头部导入这个类:use App\Tools\Kafka;

下面是使用生产者实例:

public function test(){

   $topic = 'tool';//输入使用管道名称
   $data['shop_id'] = 58;
   $data['bar_code']=586;
   $data['goods_num'] = 1;
   $data['goods_unit'] = '个';

$Kafka = new Kafka();
$Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json
var_dump($Error_Msg);

  }

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

<?php

$conf = new RdKafka\Conf();

$conf->set('group.id', 'myConsumerGroup');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic("tool", $topicConf);//读取的管道

// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
  $message = $topic->consume(0, 120*10000);
  switch ($message->err) {
    case RD_KAFKA_RESP_ERR_NO_ERROR:
    //没有错误打印信息
      $message = json_decode(json_encode($message),true);
      $data = json_decode($message['payload'],true);
      var_dump($data);
      break;
    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
      echo "等待接收信息\n";
      break;
    case RD_KAFKA_RESP_ERR__TIMED_OUT:
      echo "超时\n";
      break;
    default:
      throw new \Exception($message->errstr(), $message->err);
      break;
  }
 sleep(1);
}

?>

到此这篇关于Laravel中Kafka的使用详解的文章就介绍到这了,更多相关Laravel中Kafka内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • spring-cloud-stream结合kafka使用详解

    1.pom文件导入依赖 <!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> 2.application.yml文件配置 spring: cloud: stream: kafka: bind

  • .NET Core下使用Kafka的方法步骤

    安装 CentOS安装 kafka Kafka : http://kafka.apache.org/downloads ZooLeeper : https://zookeeper.apache.org/releases.html 下载并解压 # 下载,并解压 $ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz $ tar -zxvf kafka_2.12-2.1.1.tgz $ mv kafka_2.12

  • Python confluent kafka客户端配置kerberos认证流程详解

    kafka的认证方式一般有如下3种: 1.SASL/GSSAPI 从版本0.9.0.0开始支持 2.SASL/PLAIN 从版本0.10.0.0开始支持 3.SASL/SCRAM-SHA-256 以及 SASL/SCRAM-SHA-512 从版本0.10.2.0开始支持 其中第一种SASL/GSSAPI的认证就是kerberos认证,对于java来说有原生的支持,但是对于python来说配置稍微麻烦一些,下面说一下具体的配置过程,confluent kafka模块底层依赖于librdkafka,

  • php测试kafka项目示例

    本文实例讲述了php测试kafka项目.分享给大家供大家参考,具体如下: 概述 Kafka是最初由Linkedin公司开发,是一个分布式.分区的.多副本的.多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志.访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目. 主要应用场景是:日志收集系统和消息系统. 安装kafka-php项目依赖 composer require nmred/kafka-

  • Kafka多节点分布式集群搭建实现过程详解

    上一篇分享了单节点伪分布式集群搭建方法,本篇来分享一下多节点分布式集群搭建方法.多节点分布式集群结构如下图所示: 为了方便查阅,本篇将和上一篇一样从零开始一步一步进行集群搭建. 一.安装Jdk 具体安装步骤可参考linux安装jdk. 二.安装与配置zookeeper 下载地址:https://www-us.apache.org/dist/zookeeper/stable/ 下载二进制压缩包zookeeper-3.4.14.tar.gz,然后上传到linux服务器指定目录下,本次上传目录为/so

  • Laravel中Kafka的使用详解

    本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类. 以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php <?php namespace App\Tools; use Illuminate\Config\Repository;

  • Laravel中的Auth模块详解

    前言 本文主要给大家介绍的是关于Laravel中Auth模块的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 本文是基于Laravel 5.4 版本的本地化模块代码进行分析书写: 模块组成 Auth模块从功能上分为用户认证和权限管理两个部分:从文件组成上,Illuminate\Auth\Passwords目录下是密码重置或忘记密码处理的小模块,Illuminate\Auth是负责用户认证和权限管理的模块,Illuminate\Foundation\Auth提供了登录.

  • laravel 中repository模式使用详解

    什么是Repository模式,laravel学院中用这样一张图来解释 编码过程当中 解耦一直是个较为热门的话题. 使用MVC设计模式开发的时候,如果需要查询数据库/操作数据库的时候就得直接引用模型,调用模型.按照常规的调用方法直接以下所示,不使用Eloquent ORM就没法操作数据库,那么就是ORM和这个控制器有着非常之大的耦合性. $position = Position::createPosition($params); $position->users()->attach($user

  • 使用 Docker 搭建 Laravel 本地环境的教程详解

    Laravel 官方提供 Homestead 和 Valet 作为本地开发环境,Homestead 是一个官方预封装的 Vagrant Box,也就是一个虚拟机,但是跟 docker 比,它占用体积太大,启动速度慢,同时响应速度很慢,现在有了 docker 这种更好的方式,可以轻松方便的搭建整套 PHP 开发环境. 本文就介绍如何使用 docker 搭建 Laravel 本地环境. 安装 docker 首先安装 docker. 克隆 laradock laradock 官方文档: http://

  • Laravel5中Cookie的使用详解

    今天在Laravel框架中使用Cookie的时候,碰到了点问题,自己被迷糊折腾了半多小时.期间研究了Cookie的实现类,也在网站找了许多的资料,包括问答.发现并没有解决问题.网上的答案都是互相抄袭,互相转载.其实并没有什么用处.好在最后,我找到了决绝方法.奔着为广大Laravel爱好者和开发人员负责的精神,同时也希望大家在使用Cookie时少走弯路,在这里把在Laravel中Cookie的设置和读取方法贡献出来,供大家批评指正. 概述 Cookie的添加其实很简单,直接使用Cookie::ma

  • laravel高级的Join语法详解以及使用Join多个条件

    在laravel中我们常常会使用join,leftjion和rightjoin进行连表查询,非常的方便,但是我今天遇到一个问题,就是链表查询需要on多个条件,即我要订单的id和发货人都一样,默认的join只支持单个查询,所以我下面总结两种方法: 一.使用原是表达式(不推荐) 原生SQL中我们可以通过如下方法进行 select * from `orders` left join `users` on `orders`.`usename`=`users`.`usename` and `orders`

  • 基于laravel缓冲cache的用法详解

    一.在控制器中引用: use cache; 二.基本方法及使用 1.put() 键 值 有效时间(分钟) Cache::put('key1','val1',10); 2.add() 若key2不存在,则添加成功 否则,添加失败 Cache::add('key2','val2',20); 3.forever() 永久保存对象到缓存 Cache::forever('key3','val3'); 4.has() 判断是否存在 Cache::has('key1'); 5.get() 取值 Cache::

  • 使用jmx exporter采集kafka指标示例详解

    目录 预置条件 使用JMX exporter暴露指标 kafka集群启用监控 采集producer/consumer的指标 预置条件 安装kafka.prometheus 使用JMX exporter暴露指标 下载jmx exporter以及配置文件.Jmx exporter中包含了kafka各个组件的指标,如server metrics.producer metrics.consumer metrics等,但这些指标并不是prometheus格式的,因此需要通过重命名方式转变为promethe

  • Angularjs中数据绑定的实例详解

    Angularjs中数据绑定的实例详解 这是一个最简单的angularjs的例子,关于数据绑定的,大家可以执行一下,看看效果 <html ng-app> <head> <title>angularjs-include</title> <script type="text/javascript" src="js/angular/angular.min.js"></script> </head

  • 基于angular中的重要指令详解($eval,$parse和$compile)

    在angular的服务中,有一些服务你不得不去了解,因为他可以说是ng的核心,而今天,我要介绍的就是ng的两个核心服务,$parse和$compile.其实这两个服务讲的人已经很多了,但是100个读者就有100个哈姆雷特,我在这里讲讲自己对于他们两个服务的理解. 大家可能会疑问,$eval呢,其实他并不是一个服务,他是scope里面的一个方法,并不能算服务,而且它也基于parse的,所以只能算是$parse的另一种写法而已,我们看一下ng源码中$eval的定义是怎样的就知道了 $eval: fu

随机推荐