kafka-python 获取topic lag值方式
说真,这个问题看上去很简单,但“得益”与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码。
直接上代码吧
from kafka import SimpleClient, KafkaConsumer from kafka.common import OffsetRequestPayload, TopicPartition def get_topic_offset(brokers, topic): """ 获取一个topic的offset值的和 """ client = SimpleClient(brokers) partitions = client.topic_partitions[topic] offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()] offsets_responses = client.send_offset_request(offset_requests) return sum([r.offsets[0] for r in offsets_responses]) def get_group_offset(brokers, group_id, topic): """ 获取一个topic特定group已经消费的offset值的和 """ consumer = KafkaConsumer(bootstrap_servers=brokers, group_id=group_id, ) pts = [TopicPartition(topic=topic, partition=i) for i in consumer.partitions_for_topic(topic)] result = consumer._coordinator.fetch_committed_offsets(pts) return sum([r.offset for r in result.values()]) if __name__ == '__main__': topic_offset = get_topic_offset("brokers", "topic") group_offset = get_group_offset("brokers", "group_id", "topic") lag = topic_offset - group_offset
以上这篇kafka-python 获取topic lag值方式就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。
相关推荐
-
python kafka 多线程消费者&手动提交实例
官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html import threading import os import sys from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import *
-
Python测试Kafka集群(pykafka)实例
生产者代码: # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) print client.topics # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in range(100):
-
python每5分钟从kafka中提取数据的例子
我就废话不多说了,直接上代码吧! import sys sys.path.append("..") from datetime import datetime from utils.kafka2file import KafkaDownloader import os """ 实现取kafka数据,文件按照取数据的间隔命名 如每5分钟从kafka取数据写入文件中,文件名为当前时间加5 """ TOPIC = "rtz
-
python3连接kafka模块pykafka生产者简单封装代码
1.1安装模块 pip install pykafka 1.2基本使用 # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in ran
-
python hbase读取数据发送kafka的方法
本例子实现从hbase获取数据,并发送kafka. 使用 #!/usr/bin/env python #coding=utf-8 import sys import time import json sys.path.append('/usr/local/lib/python3.5/site-packages') from thrift import Thrift from thrift.transport import TSocket from thrift.transport import
-
python操作kafka实践的示例代码
1.先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码. #!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "
-
python读取Kafka实例
1. 新建.py文件 # pip install kafka-python from kafka import KafkaConsumer import setting conf = setting.luyang_kafka_setting consumer = KafkaConsumer(bootstrap_servers=conf['host'], group_id=conf['groupid']) print('consumer start to consuming...') consum
-
kafka-python 获取topic lag值方式
说真,这个问题看上去很简单,但"得益"与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码. 直接上代码吧 from kafka import SimpleClient, KafkaConsumer from kafka.common import OffsetRequestPayload, TopicPartition def get_topic_offset(brokers, topic): """ 获取一个topic的o
-
详解Python获取线程返回值的三种方式
目录 方法一 方法二 方法三 最后的话 提到线程,你的大脑应该有这样的印象:我们可以控制它何时开始,却无法控制它何时结束,那么如何获取线程的返回值呢?今天就分享一下自己的一些做法. 方法一 使用全局变量的列表,来保存返回值 ret_values = [] def thread_func(*args): ... value = ... ret_values.append(value) 选择列表的一个原因是:列表的 append() 方法是线程安全的,CPython 中,GI
-
python 获取字符串MD5值方法
工作中用到了MD5值来进行对文件校验,MD5本身就是一个很出色的算法,一定程度上解决了hash散列的冲突,关于MD5的内容网上也有很多,这里只要是进行一个实验,验证一下文件校验方面的工作,因为习惯使用了python来完成这样的任务,这里也是使用python,了解到python本身自带有hashlib模块,该模块中就包含了所需的MD5方法,当然python也有专门的MD5模块可以使用,使用得当发的大同小异,但是个人觉得hashlib模块更好用一些,今天就使用python的os.commands还有
-
Python获取文件ssdeep值的方法
本文实例讲述了Python获取文件ssdeep值的方法,分享给大家供大家参考.具体方法如下: 首先,得到ssdeep值,需要先import ssdeep 在ubuntu上安装pyssdeep时 一直出错 后来发现apt-cache search "ssdeep"时把几个全apt-get install 上,但问题依旧. 后来下载到pyssdeep的源文件 ,tar zxvf pyssdeep.tar.zip 然后 apt-get install python-dev 然后 pytho
-
根据tensor的名字获取变量的值方式
需求: 有时候使用slim这种封装好的工具,或者是在做滑动平均时,系统会帮你自动建立一些变量,但是这些变量只有名字,而没有显式的变量名,所以这个时候我们需要使用那个名字来获取其对应的值. 如下: input = np.random.randn(4,3) net = slim.fully_connected(input,2,weights_initializer=tf.ones_initializer(dtype = tf.float32)) 这段代码看似简单,但其实帮你生成了一个w和一个b.如果
-
python 获取字典键值对的实现
获 得 字 典 键. 值 的 函 数 有: items/ iteritems/ keys/ iterkeys/ values/ itervalues 通 过 以 上 这 些 函 数 得 到 的 是 键 或 者 值 的 列 表. 例: a_dict = {" name": "sir", "lang": "python", "email": "sir@ gmail.com", "w
-
python等间距取值方式
目录 等间距取值 对区间进行等间距取数 range函数 numpy.linspace函数 等间距取值 a = np.linspace(float(-pi), float(pi), 100) 从-pi到pi取100个值 对区间进行等间距取数 range函数 range(stop) range(start, stop, step) 参数说明: start: 计数从 start 开始.默认是从 0 开始.例如range(5)等价于range(0, 5); stop: 计数到 stop 结束,但不包括
-
springboot获取properties属性值的多种方式总结
目录 获取properties属性值方式总结 1. 除了默认配置在 application.properties的多环境中添加属性 2. 使用之前在spring中加载的value值形式 3. 也可以使用springboot里面的Environment 直接取值 4. 如果是自己新建的一个properties文件 获取多个自定义属性值 比如在application中自定义属性 获取properties属性值方式总结 spring boot 在多环境情况下我们需要根据不同的获取不一样的值, 我们会配
-
python计算Content-MD5并获取文件的Content-MD5值方式
1.首先计算MD5加密的二进制数组(128位),然后再对这个二进制数组进行base64编码(而不是对32位字符串编码). 例如,用Python计算0123456789的Content-MD5,主要代码如下: import base64, hashlib hash = hashlib.md5() hash.update("0123456789") base64.b64encode(hash.digest()) 这样就生成了 'eB5eJF1ptWaXm4bijSPyxw==' 的Cont
-
Python获取协程返回值的四种方式详解
目录 介绍 源码 依次执行结果 介绍 获取协程返回值的四种方式: 1.通过ensure_future获取,本质是future对象中的result方 2.使用loop自带的create_task, 获取返回值 3.使用callback, 一旦await地方的内容运行完,就会运行callback 4.使用partial这个模块向callback函数中传入值 源码 import asyncio from functools import partial async def talk(name): pr
随机推荐
- jQuery实现的粘性滚动导航栏效果实例【附源码下载】
- AngularJS实现Input格式化的方法
- 批处理文件(bat文件)注册dll批量注册dll
- Python内建数据结构详解
- ASP.Net MVC 布局页、模板页使用方法详细介绍
- PHP实现WebService的简单示例和实现步骤
- c#.net 常用函数和方法集
- javascript 判断页面访问方式电脑或者移动端
- Lua中遍历文件操作代码实例
- 通过T_sql语句向其中一次填入一条数据或一次填入多条数据的方式填充数据
- jquery.validate分组验证代码
- JQuery获取或设置ckeditor的数据(示例代码)
- jQuery+CSS 实现随滚动条增减的汽水瓶中的液体效果
- jquery实现动态画圆
- 浅谈JavaScript的Polymer框架中的事件绑定
- 详解node-ccap模块生成captcha验证码
- Python优先队列实现方法示例
- newasp中下载类
- 养成良好java代码编码规范
- 基于Python安装pyecharts所遇的问题及解决方法