PHP扩展之kafka安装应用案例详解

话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。

实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。

一. 首先确认下jdk有没有安装

使用命令

[root@localhost ~]# java -version
java version "1.8.0_73"
Java(TM) SE Runtime Environment (build 1.8.0_73-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)

如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。

然后打开/etc/profile文件

[root@localhost ~]# vim /etc/profile

把下面这段代码写到文件里

export JAVA_HOME=/usr/local/jdk/jdk1.8.0_73
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH

最后

[root@localhost ~]# source /etc/profile

这时jdk就生效了,可以使用 java -version验证下。

二. 接下来安装Kafka

1. 下载Kafka

到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz

2. 下载完解压到你喜欢的目录

我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2

3. 运行默认的Kafka

启动Zookeeper server

[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &

启动Kafka server

[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-server-start.sh config/server.properties &

运行生产者producer

[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

运行消费者consumer

[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

这样,在producer那边输入内容,consumer马上就能接收到。

4. 当有跨机的producer或consumer连接时

需要配置config/server.properties的host.name,要不然跨机的连不上。

三. Kafka-PHP扩展

使用了一圈,就https://github.com/nmred/kafka-php可以用。

我是使用composer安装的,以下是示例:

producer.php

<?php
require 'vendor/autoload.php';

while (1) {
    $part = mt_rand(0, 1);
    $produce = \Kafka\Produce::getInstance('kafka0:2181', 3000);
    // get available partitions
    $partitions = $produce->getAvailablePartitions('topic_name');
    var_dump($partitions);
    // send message
    $produce->setRequireAck(-1);
    $produce->setMessages('topic_name', 0, array(date('Y-m-d H:i:s'));

    sleep(3);
}

consumer.php

require 'vendor/autoload.php';

$consumer = \Kafka\Consumer::getInstance('kafka0:2181');
$group = 'topic_name';
$consumer->setGroup($group);
$consumer->setFromOffset(true);
$consumer->setTopic('topic_name', 0);
$consumer->setMaxBytes(102400);
$result = $consumer->fetch();
print_r($result);
foreach ($result as $topicName => $partition) {
    foreach ($partition as $partId => $messageSet) {
    var_dump($partition->getHighOffset());
        foreach ($messageSet as $message) {
            var_dump((string)$message);
        }
    var_dump($partition->getMessageOffset());
    }
}

到此这篇关于PHP扩展之kafka安装应用案例详解的文章就介绍到这了,更多相关PHP扩展之kafka安装应用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 使用 PHP Masked Package 屏蔽敏感数据的实现方法

    Fuko Masked 是 Kaloyan Tsvetkov的一个小型PHP库,用于通过用编辑后的元素替换列入黑名单的元素来屏蔽敏感数据. 以下是 软件包 readme的基本用法示例: use Fuko\Masked\Protect; //隐藏$secret_key var中的值 Protect::hideValue($secret_key); //隐藏$ _POST['password'] 的值 Protect::hideInput('password', INPUT_POST); $reda

  • php测试kafka项目示例

    本文实例讲述了php测试kafka项目.分享给大家供大家参考,具体如下: 概述 Kafka是最初由Linkedin公司开发,是一个分布式.分区的.多副本的.多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志.访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目. 主要应用场景是:日志收集系统和消息系统. 安装kafka-php项目依赖 composer require nmred/kafka-

  • 利用ThinkPHP内置的ThinkAjax实现异步传输技术的实现方法

    准备工作: ① 首先要会使用ThinkPHP这个框架 ② 最好有些ajax的基础(可以去看下小飞的另外一篇博文:Ajax实时验证"用户名/邮箱等"是否已经存在) ③ 4个js文档(点此免积分下载) 先贴上源代码: 复制代码 代码如下: <script type="text/javascript" src="__PUBLIC__/js/base.js"></script> <script type="text

  • 完美解决phpdoc导出文档中@package的warning及Error的错误

    今天在编写PHPDoc的导出文档的时候发现一个很郁闷的错误,虽然这个warning不是什么重要错误,但是看着总是很不爽的.于是就去网上找了很多相关的资料,可是郁闷的是不知道是我用的PHPDoc版本太新(1.4的版本),还是说很多人都没遇到这个问题,反正就是没有相关的这个资料找到,只是找到了一些从PHPDocumentor官方网倒出来的关于@package的使用注意事项,然后就只能一条一条检查,看了一个版本又一个版本,总算是被我解决了. 而且发现该方案可以解决@package之类相关的错误提示:

  • PHP扩展之kafka安装应用案例详解

    话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用. 实话说,如果用于队列的话,跟PHP比较配的,还是Redis.用的顺手,呵呵,只是Redis不能有多个consumer.但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的.下面就开始讲Kafka的安装吧.我以CentOS6.4为例,64位. 一. 首先确认下jdk有没有安装 使用命令 [root@localhost ~]# java -version java version

  • Nginx+Tomcat负载均衡集群安装配置案例详解

    目录 前言 一.Nginx+Tomcat 二.配置Nginx服务器 三.部署Tomcat应用服务器 总结 前言 介绍Tomcat及Nginx+Tomcat负载均衡集群,Tomcat的应用场景,然后重点介绍Tomcat的安装配置.Nginx+Tomcat负载均衡集案列是应用于生产环境的一套可靠的Web站点解决方案. 一.Nginx+Tomcat 通常情况下,一个Tomcat站点由于可能出现单点故障及无法应付过多客户复杂多样的请求等问题,不能单独应用于生产环境下,所以我们需要一套更可靠的解决方案来完

  • Java sdk安装及配置案例详解

    1.安装Java SDK开发环境. 首先去官网下载Java SDK,http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html, 下载完成之后,开始安装. 点击下一步,安装完成. 2.配置Java SDK环境变量 单击"计算机-属性-高级系统设置",单击"环境变量".在"系统变量"栏下单击"新建",创建新的系统环境变量.

  • JavaScript axios安装与封装案例详解

    1.下载axios插件 cnpm install axios -S 2.在main.js引入axios import axios from 'axios' Vue.prototype.$http = axios 3.创建axios实例 let service = axios.create({ baseURL: baseUrl, // url = base api url + request url withCredentials: true, // send cookies when cross

  • Webpack-cli安装成功后查看webpack -v报错案例详解

    目录 问题 1. 安装webpack webpack-cli 2. 查看webpack 版本 解决 1. 查看node版本 2. 升级npm 3. 查看webpack版本 问题 1. 安装webpack webpack-cli npm install -g webpack webpack-cli 2. 查看webpack 版本 webpack -v 报错 /usr/local/lib/node_modules/webpack/lib/cli.js:66 .replace( ^ SyntaxErr

  • Vuex的安装、搭建及案例详解

    目录 前言 Vuex原理讲解 1.安装vuex组件 2.使用Vuex 求和案例 Coute.vue store.js Actions Mutations getters的使用: store Store仓库数据的使用: 效果图 GetState 的引入 总结 前言 本文讲诉了Vuex的安装.搭建.以及Actions.Mutations.State.Getters的使用,为什么使用mapState.mapGetters以及一些细节的解释 Vuex原理讲解 Actions:词义是 动作行为 Mutat

  • Java System.getProperty()-获取系统参数案例详解

    因为所有在java.io中的类都是将相对路径名解释为起始于用户的当前工作目录,所以应该清楚当前的目录. 可以通过调用System.getProperty("user.dir"); 来获得. /** * java.version Java 运行时环境版本 java.vendor Java 运行时环境供应商 java.vendor.url Java 供应商的 URL java.vm.specification.version Java 虚拟机规范版本 java.vm.specificati

  • IOS之WebSocket框架Starscream案例详解

    传统的网络技术 (也就是 Berkeley sockets) 被认为是可靠和稳定的.但是 Berkeley socket 在某些 web 技术,比如代理和防火墙下不太好使.WebSocket 出现于 2011 年,是一种在客户端和服务端之间建立双向通讯的新技术.WebSocket 比起多个 HTTP 请求来说更有效率并允许长连接. 在 iOS 上使用 WebSocket 并不是那么容易.iOS 和 Mac 库 Starscream 的出现,极大地简化了 WebSocket 的创建和使用. 注:本

  • Java 处理高并发负载类优化方法案例详解

    java处理高并发高负载类网站中数据库的设计方法(java教程,java处理大量数据,java高负载数据) 一:高并发高负载类网站关注点之数据库 没错,首先是数据库,这是大多数应用所面临的首个SPOF.尤其是Web2.0的应用,数据库的响应是首先要解决的. 一般来说MySQL是最常用的,可能最初是一个mysql主机,当数据增加到100万以上,那么,MySQL的效能急剧下降.常用的优化措施是M-S(主-从)方式进行同步复制,将查询和操作和分别在不同的服务器上进行操作.我推荐的是M-M-Slaves

  • Vue之使用mockjs生成模拟数据案例详解

    目录 在项目中安装mockjs 在Vue项目中使用mockjs的基本流程 Mock语法规范 数据模板定义规范(Data Template Definition,DTD) 数据占位符定义规范(Data Placeholder Definition,DPD) Mock.mock() Mock.Random() 在项目中安装mockjs 在项目目录下执行以下安装命令 npm install mockjs --save 在Vue项目中使用mockjs的基本流程 安装完成后,在项目src/utils目录下

随机推荐