python hbase读取数据发送kafka的方法

本例子实现从hbase获取数据,并发送kafka。

使用

#!/usr/bin/env python
#coding=utf-8

import sys
import time
import json

sys.path.append('/usr/local/lib/python3.5/site-packages')
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase1 import Hbase #调用hbase thrif1
from hbase1.ttypes import *
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
import unittest

class HbaseOpreator:
 def __init__(self,host,port,table='test'):
  self.tableName=table
  self.transport=TTransport.TBufferedTransport(TSocket.TSocket(host,port))
  self.protocol=TBinaryProtocol.TBinaryProtocol(self.transport)
  self.client=Hbase.Client(self.protocol)
  self.transport.open()

 def __del__(self):
  self.transport.close()

 def scanTablefilter(self,table,*args):
  d=dict()
  L=[]
  try:
   tableName=table
   # scan = Hbase.TScan(startRow, stopRow)
   scan=TScan()
   #主键首字母123
   # filter = "PrefixFilter('123_')"
   # filter = "RowFilter(=,'regexstring:.aaa')"
   #过滤条件,当前为 statis_date 字段,值为20170223
   # fitler = "SingleColumnValueFilter(tableName,'f','statis_date','20170223')"
   # filter="SingleColumnValueFilter('f','statis_date',=,'binary:20170223') AND SingleColumnValueFilter('f','name',=,'binary:LXS')"
   filter="SingleColumnValueFilter('info','name',=,'binary:lilei') OR SingleColumnValueFilter('info','name',=,'binary:lily')"
   scan.filterString=filter
   id=self.client.scannerOpenWithScan(tableName,scan,None)
   result=self.client.scannerGet(id)
   # result=self.client.scannerGetList(id,100)
   while result:
    for r in result:
     key=r.row
     name=r.columns.get('info:name').value
     age=r.columns.get('info:age').value
     phone=r.columns.get('info:phone').value
     d['key']=key
     d['name']=name
     d['age']=age
     d['phone']=phone
     # encode_result_json=json.dumps(d).encode(encoding="utf-8")
     # print(encode_result_json)
     L.append(d)
    result=self.client.scannerGet(id)
   return json.dumps(L).encode(encoding="utf-8")
  finally:
   # self.client.scannerClose(scan)
   print("scan finish")

def sendKfafkaProduct(data):
 # self.host_port='localhost:9092'
 producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
 for d in data:
  producer.send('test', key=b'lxs', value=d)
  time.sleep(5)
  print(d)

 while True:
  producer.send('test', key=b'lxs', value=data)
  time.sleep(5)
  print(data)

if __name__== '__main__':
 # unittest.main()

 B=HbaseOpreator('10.27.1.138',9090)
 value=B.scanTablefilter('ns_lbi:test_hbase_student')
 print(value)
 #sendKfafkaProduct(value)
 

以上这篇python hbase读取数据发送kafka的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 对python操作kafka写入json数据的简单demo分享

    如下所示: 安装kafka支持库pip install kafka-python from kafka import KafkaProducer import json ''' 生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下代码 ''' producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8'

  • 通过pykafka接收Kafka消息队列的方法

    没有Kafka环境,所以也没有进行验证.感觉今后应该能用到,所以借抄在此,备查. pykafka使用示例,自动消费最新消息,不重复消费: # -* coding:utf8 *- from pykafka import KafkaClient host = '192.168.200.38' client = KafkaClient(hosts="%s:9092" % host) print client.topics # 生产者 # topicdocu = client.topics['

  • Docker部署Kafka以及Spring Kafka实现

    这篇文章主要介绍了Docker部署Kafka以及Spring Kafka实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 从https://hub.docker.com/查找kafka 第三个活跃并stars数量多 进去看看使用 我们使用docker-compose来构建镜像 查看使用文档中的docker-compose.yml 因为kafka要搭配zookeeper一起使用,所以文档中包含了zookeeper 我修改了一下版本号 以及变量参

  • Docker搭建Zookeeper&Kafka集群的实现

    最近在学习Kafka,准备测试集群状态的时候感觉无论是开三台虚拟机或者在一台虚拟机开辟三个不同的端口号都太麻烦了(嗯..主要是懒). 环境准备 一台可以上网且有CentOS7虚拟机的电脑 为什么使用虚拟机?因为使用的笔记本,所以每次连接网络IP都会改变,还要总是修改配置文件的,过于繁琐,不方便测试.(通过Docker虚拟网络的方式可以避免此问题,当时实验的时候没有了解到) Docker 安装 如果已经安装Docker请忽略此步骤 Docker支持以下的CentOS版本: CentOS 7 (64

  • kafka-python批量发送数据的实例

    如下所示: from kafka import KafkaClient from kafka.producer import SimpleProducer def send_data_2_kafka(datas): ''' 向kafka解析队列发送数据 ''' client = KafkaClient(hosts=KAFKABROKER.split(","), timeout=30) producer = SimpleProducer(client, async=False) curc

  • 在python环境下运用kafka对数据进行实时传输的方法

    背景: 为了满足各个平台间数据的传输,以及能确保历史性和实时性.先选用kafka作为不同平台数据传输的中转站,来满足我们对跨平台数据发送与接收的需要. kafka简介: Kafka is a distributed,partitioned,replicated commit logservice.它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现.kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外ka

  • 详解Spring Kafka中关于Kafka的配置参数

    SpringKafka文档地址:https://docs.spring.io/spring-kafka/reference/htmlsingle kafka文档地址:http://kafka.apache.org/documentation SpringKafka中配置的Java配置实现类:https://github.com/spring-projects/spring-boot/blob/v1.5.4.RELEASE/spring-boot-autoconfigure/src/main/ja

  • 在Spring Boot应用程序中使用Apache Kafka的方法步骤详解

    第1步:生成我们的项目: Spring Initializr来生成我们的项目.我们的项目将提供Spring MVC / Web支持和Apache Kafka支持. 第2步:发布/读取Kafka主题中的消息: <b>public</b> <b>class</b> User { <b>private</b> String name; <b>private</b> <b>int</b> age

  • python hbase读取数据发送kafka的方法

    本例子实现从hbase获取数据,并发送kafka. 使用 #!/usr/bin/env python #coding=utf-8 import sys import time import json sys.path.append('/usr/local/lib/python3.5/site-packages') from thrift import Thrift from thrift.transport import TSocket from thrift.transport import

  • Python脚本读取Consul配置信息的方法示例

    先来说一下背景,为什么要写脚本去读Consul的配置信息呢?Consul是啥呢?consul是google开源的一个使用go语言开发的服务发现.配置管理中心服务.目前公司用的是这个东西去管理项目上的一些配置信息.公司的环境是通过docker镜像的方式去部署的,镜像是通过rancher去进行管理的.这一套东西面临的一个问题是:服务每次更新之后,服务对应的ip地址是动态变化的.每次需要使用swagger去测接口的时候,都要去rancher上去重新找新的ip地址,比较麻烦.正好呢,最近部门在考虑准备做

  • python实现读取excel表格详解方法

    目录 一.python读取excel表格数据 1.读取excel表格数据常用操作 2.xlrd模块主要操作 3.读取单元格内容为日期时间的方式 4.读取合并单元格的数据 二.python写入excel表格数据 一.python读取excel表格数据 1.读取excel表格数据常用操作 import xlrd # 打开excel表格 data_excel = xlrd.open_workbook('data/dataset.xlsx') # 获取所有sheet名称 names = data_exc

  • python常见读取语音的3种方法速度对比

    python 读取语音文件时,常用的无非以下三种方式,但是在我们数据量变的很大是,不同的读取方式之间的性能差异就会被进一步放大,于是本文着重对比了librosa.soundfile.wavfile三种方式的在重复读取一万次某个文件所耗时间的差异,为确保实验结果的可比性,每种方式读取出的语音序列值均一致.具体数值,在下方程序结果中已经标示. # -*- coding: utf-8 -*- """ # @Time : 2022/12/29 17:27 # @Author : Wa

  • Python导入txt数据到mysql的方法

    本文实例讲述了Python导入txt数据到mysql的方法.分享给大家供大家参考.具体分析如下: 从TXT文本转换数据到MYSQL数据库,接触一段时间python了 第一次写东西 用的是Python2.7 #!/usr/bin/python #coding=utf-8 import _mysql,sys,io def addCity(prov,city,tel,post): try: conn=_mysql.connect("192.168.1.99",'php','php'); co

  • python:接口间数据传递与调用方法

    如下所示: import requests import unittest import json from pubulic_way.get_token import getSession class testlogin(unittest.TestCase): def test_getIdentify(self): '''调用test_listCollectInfoByCreditId(self)响应数据中的taxid参数''' result = self.get_listCollectInfo

  • Python实现读取机器硬件信息的方法示例

    本文实例讲述了Python实现读取机器硬件信息的方法.分享给大家供大家参考,具体如下: 本人最近新学python ,用到关于机器的相关信息,经过一番研究,从网上查找资料,经过测试,总结了一下相关的方法. # -*- coding: UTF8 -*- import os import win32api import datetime import platform import getpass import socket import uuid import _winreg import re 1

  • Python OpenCV读取中文路径图像的方法

    引言 这几天做点小东西,涉及到OpenCV读取中文图像的问题 如果直接读取中文路径的图像,往往返回[] import cv2 cv_im = cv2.imread('老干妈.jpg') 缘起 偶然发现opencv 读取图像,解决imread不能读取中文路径的问题文章,代码简单有效,可以参考下文章底部附录 im = cv2.imdecode(np.fromfile(im_name,dtype=np.uint8),-1) 但是作者代码注释中说该方法读取的图像的通道就会变为RGB,但是我实验仍为BGR

  • Python使用QQ邮箱发送Email的方法实例

    前言 其实Python使用QQ邮箱发送Email代码很简单,短短几行代码就可以实现这个功能. 使用到的模块有smtplib和email这个两个模块,关于这两个模块的方法就不多说了.不了解的朋友们可以查看这篇文章:python中使用smtplib和email模块发送邮件实例 我们先说说网上常用的使用这那两个模块发送邮件的方法 代码如下: import smtplib from email.mime.text import MIMEText from email.header import Head

  • Oracle同步数据到kafka的方法

    目录 环境准备 软件准备 下载地址 实施过程 Oracle主机(A)配置 Kafka主机(B)配置 配置apache-maven工具 配置Kafka 2.13-2.6.0 配置kafka-connect-oracle-maste 启动kafka-connect-oracle 启动kafka消费者 启动数据库JOB 环境准备 软件准备 CentOS Linux 7.6.1810 (2台,A主机,B主机) Oracle 11.2.0.4(A主机安装) Kafka 2.13-2.6.0 (B主机安装)

随机推荐