python每5分钟从kafka中提取数据的例子

我就废话不多说了,直接上代码吧!

import sys
sys.path.append("..")
from datetime import datetime
from utils.kafka2file import KafkaDownloader
import os
"""
实现取kafka数据,文件按照取数据的间隔命名
如每5分钟从kafka取数据写入文件中,文件名为当前时间加5
"""

TOPIC = "rtz_queue"
HOSTS = "ip:9092,ip:9092"
GROUP = "2001"

def get_end_time(hour,minute,time_step):
 if (minute+time_step)%60<60:
  if (minute+time_step)%60<10:
   return str(hour+int((minute+time_step)/60))+":"+"0"+str((minute+time_step)%60)
  else:
   return str(hour+int((minute+time_step)/60))+":"+str((minute+time_step)%60)
 else:
  pass

def kafkawritefile(time_step,time_num):
 start = datetime.now()
 downloader = KafkaDownloader(HOSTS, TOPIC, GROUP)
 i = 1
 while(i<=time_num):
  end_time = get_end_time(start.hour, start.minute,i*time_step)
  end_time_file = end_time.replace(':', '_')
  outfile_path = "/data/tmp/" + end_time_file + ".csv"

  if os.path.exists(outfile_path):
   os.remove(outfile_path)
  writefile = open(outfile_path, 'a+', encoding='utf-8')

  for msg in downloader.message():
   curr_time = datetime.now()
   curr_time = str(curr_time)
   split_curr_time = curr_time.split(' ')
   curr_time_str = split_curr_time[1][0:5]

   if curr_time_str >= str(end_time):
    break
  i += 1

if __name__=='__main__':
 time_step = 15
 time_num = 1
 kafkawritefile(time_step,time_num)

以上这篇python每5分钟从kafka中提取数据的例子就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • python 消费 kafka 数据教程

    1.安装python模块 pip install --user kafka-python==1.4.3 如果报错压缩相关的错尝试安装下面的依赖 yum install snappy-devel yum install lz4-devel pip install python-snappy pip install lz4 2.生产者 #!/usr/bin/env python # coding : utf-8 from kafka import KafkaProducer import json

  • python消费kafka数据批量插入到es的方法

    1.es的批量插入 这是为了方便后期配置的更改,把配置信息放在logging.conf中 用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2 from elasticsearch import Elasticsearch class ImportEsData: logging.config.fileConfig("logging.conf") logger = logging.getLogger("msg&

  • python操作kafka实践的示例代码

    1.先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码. #!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "

  • python kafka 多线程消费者&手动提交实例

    官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html import threading import os import sys from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import *

  • 对python操作kafka写入json数据的简单demo分享

    如下所示: 安装kafka支持库pip install kafka-python from kafka import KafkaProducer import json ''' 生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下代码 ''' producer = KafkaProducer( value_serializer=lambda v: json.dumps(v).encode('utf-8'

  • 在python环境下运用kafka对数据进行实时传输的方法

    背景: 为了满足各个平台间数据的传输,以及能确保历史性和实时性.先选用kafka作为不同平台数据传输的中转站,来满足我们对跨平台数据发送与接收的需要. kafka简介: Kafka is a distributed,partitioned,replicated commit logservice.它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现.kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外ka

  • python hbase读取数据发送kafka的方法

    本例子实现从hbase获取数据,并发送kafka. 使用 #!/usr/bin/env python #coding=utf-8 import sys import time import json sys.path.append('/usr/local/lib/python3.5/site-packages') from thrift import Thrift from thrift.transport import TSocket from thrift.transport import

  • kafka-python批量发送数据的实例

    如下所示: from kafka import KafkaClient from kafka.producer import SimpleProducer def send_data_2_kafka(datas): ''' 向kafka解析队列发送数据 ''' client = KafkaClient(hosts=KAFKABROKER.split(","), timeout=30) producer = SimpleProducer(client, async=False) curc

  • python每5分钟从kafka中提取数据的例子

    我就废话不多说了,直接上代码吧! import sys sys.path.append("..") from datetime import datetime from utils.kafka2file import KafkaDownloader import os """ 实现取kafka数据,文件按照取数据的间隔命名 如每5分钟从kafka取数据写入文件中,文件名为当前时间加5 """ TOPIC = "rtz

  • python从PDF中提取数据的示例

    01 前言 数据是数据科学中任何分析的关键,大多数分析中最常用的数据集类型是存储在逗号分隔值(csv)表中的干净数据.然而,由于可移植文档格式(pdf)文件是最常用的文件格式之一,因此每个数据科学家都应该了解如何从pdf文件中提取数据,并将数据转换为诸如"csv"之类的格式,以便用于分析或构建模型. 在本文中,我们将重点讨论如何从pdf文件中提取数据表.类似的分析可以用于从pdf文件中提取其他类型的数据,如文本或图像.我们将说明如何从pdf文件中提取数据表,然后将其转换为适合于进一步分

  • Python 结构化字符串中提取数据详情

    目录 前言 从结构化字符串中提取数据 字符串解析 前言 在许多自动化任务中,我们都需要从已知格式结构化的输入文本中提取相关信息.例如,我们可能需要在一段电影评论数据中提取观影时间.电影名.评分等信息,以便存储后进行进一步分析.在本节中,我们将以提取电影评论数据信息为例讲解如何从结构化字符串中提取数据. 从结构化字符串中提取数据 假设我们具有以下结构的电影评分数据,我们需要解析存储观影时间.电影名.评分等信息: [<Timestamp>] - MOVIE ID: <movie id>

  • python如何在列表、字典中筛选数据

    python如何在列表.字典中筛选数据? 实际问题有哪些? 1.过滤掉列表[3,9,-1,10.-2......] 中负数 2.筛选出字典 {'li_ming':90,'xiao_hong':60,'li_kang':95,'bei_men':98} 中值高于90的项 3.筛选出集合{3,9,-1,10.-2......]中能被3整除的数 问题1如何解决? 最普通方法: #!/usr/bin/python3 def filter_l(data): res = [] for i in data:

  • Python实现将MySQL数据库表中的数据导出生成csv格式文件的方法

    本文实例讲述了Python实现将MySQL数据库表中的数据导出生成csv格式文件的方法.分享给大家供大家参考,具体如下: #!/usr/bin/env python # -*- coding:utf-8 -*- """ Purpose: 生成日汇总对账文件 Created: 2015/4/27 Modified:2015/5/1 @author: guoyJoe """ #导入模块 import MySQLdb import time impor

  • 使用python对多个txt文件中的数据进行筛选的方法

    一.问题描述 筛选出多个txt文件中需要的数据 二.数据准备 这是我自己建立的要处理的文件,里面是随意写的一些数字和字母 三.程序编写 import os def eachFile(filepath): pathDir =os.listdir(filepath) #遍历文件夹中的text return pathDir def readfile(name): fopen=open(name,'r') for lines in fopen.readlines(): #按行读取text中的内容 lin

  • Python使用pymysql从MySQL数据库中读出数据的方法

    python3.x已经不支持mysqldb了,支持的是pymysql 使用pandas读取MySQL数据时,使用sqlalchemy,出现No module named 'MySQLdb'错误. 安装:打开Windows PowerShell,输入pip3 install PyMySQL即可 import pymysql.cursors import pymysql import pandas as pd #连接配置信息 config = { 'host':'127.0.0.1', 'port'

  • 浅谈如何使用vb.net从数据库中提取数据

    1.设置从Model中的Sub Main 启动 2.程序结构 3.Model1 Imports System.Windows.Forms.Application Module Module1 Sub Main() 'form1 是测试多文档窗口 'Dim frm1 As New Form1() 'frm1.Show() Dim formStudentSysMain As New FormStudentSysMain() formStudentSysMain.Show() Do While Tru

  • php录入页面中动态从数据库中提取数据的实现

    摘要:用php制作动态web页面时,在提交服务器之前,让php根据用户在当前页面上录入的某字段的值立即从数据库中取出相关的其它字段的值并显示到当前页面上,是php程序开发中的难点.本文以一个具体实例详细介绍了怎样将两个html内嵌式语言php和javascript巧妙结合起来,解决这个难点的具体方法. 关键词:php.动态.html. 现在的网站已经从以前提供静态信息的形式发展到交互式的提供动态信息业务的方式.web的信息服务形式可以概括为两点:向客户提供信息:记录客户提交的信息.要提供这两种服

  • python 使用xlsxwriter循环向excel中插入数据和图片的操作

    写入Excel中后有显示第一列客户款号总库存这些,开始写在第12行第一列开始写入,一行写入5个,然后再隔12行,再写入下边的数据,图片需要对应客户款号在Excel写入图片,类似下面的格式 import xlsxwriter import os #以空字符填充缺失值,不然写入数据会报错 data.fillna('',inplace=True) #创建一个新Excel文件并添加一个工作表. workbook = xlsxwriter.Workbook('images.xlsx') worksheet

随机推荐