kafka-python 获取topic lag值方式

说真,这个问题看上去很简单,但“得益”与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码。

直接上代码吧

from kafka import SimpleClient, KafkaConsumer
from kafka.common import OffsetRequestPayload, TopicPartition

def get_topic_offset(brokers, topic):
  """
  获取一个topic的offset值的和
  """
  client = SimpleClient(brokers)
  partitions = client.topic_partitions[topic]
  offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
  offsets_responses = client.send_offset_request(offset_requests)
  return sum([r.offsets[0] for r in offsets_responses])

def get_group_offset(brokers, group_id, topic):
  """
  获取一个topic特定group已经消费的offset值的和
  """
  consumer = KafkaConsumer(bootstrap_servers=brokers,
               group_id=group_id,
               )
  pts = [TopicPartition(topic=topic, partition=i) for i in
      consumer.partitions_for_topic(topic)]
  result = consumer._coordinator.fetch_committed_offsets(pts)
  return sum([r.offset for r in result.values()])

if __name__ == '__main__':
  topic_offset = get_topic_offset("brokers", "topic")
  group_offset = get_group_offset("brokers", "group_id", "topic")
  lag = topic_offset - group_offset

以上这篇kafka-python 获取topic lag值方式就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • python每5分钟从kafka中提取数据的例子

    我就废话不多说了,直接上代码吧! import sys sys.path.append("..") from datetime import datetime from utils.kafka2file import KafkaDownloader import os """ 实现取kafka数据,文件按照取数据的间隔命名 如每5分钟从kafka取数据写入文件中,文件名为当前时间加5 """ TOPIC = "rtz

  • python操作kafka实践的示例代码

    1.先看最简单的场景,生产者生产消息,消费者接收消息,下面是生产者的简单代码. #!/usr/bin/env python # -*- coding: utf-8 -*- import json from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='xxxx:x') msg_dict = { "sleep_time": 10, "db_config": { "

  • python3连接kafka模块pykafka生产者简单封装代码

    1.1安装模块 pip install pykafka 1.2基本使用 # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in ran

  • python读取Kafka实例

    1. 新建.py文件 # pip install kafka-python from kafka import KafkaConsumer import setting conf = setting.luyang_kafka_setting consumer = KafkaConsumer(bootstrap_servers=conf['host'], group_id=conf['groupid']) print('consumer start to consuming...') consum

  • python kafka 多线程消费者&手动提交实例

    官方文档:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html import threading import os import sys from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata from consumers.db_util import * from consumers.json_dispose import *

  • python hbase读取数据发送kafka的方法

    本例子实现从hbase获取数据,并发送kafka. 使用 #!/usr/bin/env python #coding=utf-8 import sys import time import json sys.path.append('/usr/local/lib/python3.5/site-packages') from thrift import Thrift from thrift.transport import TSocket from thrift.transport import

  • Python测试Kafka集群(pykafka)实例

    生产者代码: # -* coding:utf8 *- from pykafka import KafkaClient host = 'IP:9092, IP:9092, IP:9092' client = KafkaClient(hosts = host) print client.topics # 生产者 topicdocu = client.topics['my-topic'] producer = topicdocu.get_producer() for i in range(100):

  • kafka-python 获取topic lag值方式

    说真,这个问题看上去很简单,但"得益"与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码. 直接上代码吧 from kafka import SimpleClient, KafkaConsumer from kafka.common import OffsetRequestPayload, TopicPartition def get_topic_offset(brokers, topic): """ 获取一个topic的o

  • 详解Python获取线程返回值的三种方式

    目录 方法一 方法二 方法三 最后的话 提到线程,你的大脑应该有这样的印象:我们可以控制它何时开始,却无法控制它何时结束,那么如何获取线程的返回值呢?今天就分享一下自己的一些做法. 方法一 使用全局变量的列表,来保存返回值 ret_values = [] def thread_func(*args):     ...     value = ...     ret_values.append(value) 选择列表的一个原因是:列表的 append() 方法是线程安全的,CPython 中,GI

  • python 获取字符串MD5值方法

    工作中用到了MD5值来进行对文件校验,MD5本身就是一个很出色的算法,一定程度上解决了hash散列的冲突,关于MD5的内容网上也有很多,这里只要是进行一个实验,验证一下文件校验方面的工作,因为习惯使用了python来完成这样的任务,这里也是使用python,了解到python本身自带有hashlib模块,该模块中就包含了所需的MD5方法,当然python也有专门的MD5模块可以使用,使用得当发的大同小异,但是个人觉得hashlib模块更好用一些,今天就使用python的os.commands还有

  • Python获取文件ssdeep值的方法

    本文实例讲述了Python获取文件ssdeep值的方法,分享给大家供大家参考.具体方法如下: 首先,得到ssdeep值,需要先import ssdeep 在ubuntu上安装pyssdeep时 一直出错  后来发现apt-cache search "ssdeep"时把几个全apt-get install 上,但问题依旧. 后来下载到pyssdeep的源文件 ,tar zxvf pyssdeep.tar.zip 然后 apt-get install python-dev 然后 pytho

  • 根据tensor的名字获取变量的值方式

    需求: 有时候使用slim这种封装好的工具,或者是在做滑动平均时,系统会帮你自动建立一些变量,但是这些变量只有名字,而没有显式的变量名,所以这个时候我们需要使用那个名字来获取其对应的值. 如下: input = np.random.randn(4,3) net = slim.fully_connected(input,2,weights_initializer=tf.ones_initializer(dtype = tf.float32)) 这段代码看似简单,但其实帮你生成了一个w和一个b.如果

  • python 获取字典键值对的实现

    获 得 字 典 键. 值 的 函 数 有: items/ iteritems/ keys/ iterkeys/ values/ itervalues 通 过 以 上 这 些 函 数 得 到 的 是 键 或 者 值 的 列 表. 例: a_dict = {" name": "sir", "lang": "python", "email": "sir@ gmail.com", "w

  • python等间距取值方式

    目录 等间距取值 对区间进行等间距取数 range函数 numpy.linspace函数 等间距取值 a = np.linspace(float(-pi), float(pi), 100) 从-pi到pi取100个值 对区间进行等间距取数 range函数 range(stop) range(start, stop, step) 参数说明: start: 计数从 start 开始.默认是从 0 开始.例如range(5)等价于range(0, 5); stop: 计数到 stop 结束,但不包括

  • springboot获取properties属性值的多种方式总结

    目录 获取properties属性值方式总结 1. 除了默认配置在 application.properties的多环境中添加属性 2. 使用之前在spring中加载的value值形式 3. 也可以使用springboot里面的Environment 直接取值 4. 如果是自己新建的一个properties文件 获取多个自定义属性值 比如在application中自定义属性 获取properties属性值方式总结 spring boot 在多环境情况下我们需要根据不同的获取不一样的值, 我们会配

  • python计算Content-MD5并获取文件的Content-MD5值方式

    1.首先计算MD5加密的二进制数组(128位),然后再对这个二进制数组进行base64编码(而不是对32位字符串编码). 例如,用Python计算0123456789的Content-MD5,主要代码如下: import base64, hashlib hash = hashlib.md5() hash.update("0123456789") base64.b64encode(hash.digest()) 这样就生成了 'eB5eJF1ptWaXm4bijSPyxw==' 的Cont

  • Python获取协程返回值的四种方式详解

    目录 介绍 源码 依次执行结果 介绍 获取协程返回值的四种方式: 1.通过ensure_future获取,本质是future对象中的result方 2.使用loop自带的create_task, 获取返回值 3.使用callback, 一旦await地方的内容运行完,就会运行callback 4.使用partial这个模块向callback函数中传入值 源码 import asyncio from functools import partial async def talk(name): pr

随机推荐