python 解决动态的定义变量名,并给其赋值的方法(大数据处理)

最近消费kafka数据到磁盘的时候遇到了这样的问题:

需求:每天大概有1千万条数据,每条数据包含19个字段信息,需要将数据写到服务器磁盘,以第二个字段作为大类建立目录,第7个字段作为小类配合时间戳作为文件名,临时文件后缀tmp,当每个文件的写入条数(可配置,比如100条)达到要求条数时,将后缀tmp改为out。

问题:大类共有30个,小类不计其数而且未知,比如大类为A,小类为a,时间戳为20180606095835234,则A目录下的文件名为20180606095835234_a.tmp,这样一来需要在此文件写满100条时,更新时间戳生成第二个文件名,如果此时有1000个文件都在写则需要有1000个时间戳,和1000个计数器记录每个文件当前的条数,如果分别定义1000个变量显然是不划算的,

尝试:中间过程想到了动态定义变量名,即

定义第七个字段:seven = data.split('|')[7]

定义文件名:filename = time_stamp + '_' + seven+'.tmp',

定义文件计数器:seven + ‘_num' = 0

定义文件时间戳:seven + '_stamp' = time.time( )

想法其实是没问题的,但是这里用到了一个不常用的语法:用一个变量名和一个字符串拼接出来一个新的变量名,并继续赋值(不知道我的表述是否清楚),试过了用local()函数、global()函数、exec()函数都没有达到预期效果,也许是把问题想的太复杂了

解决:最后使用三个字典将这个问题完美解决,

定义一个字典用来存计数器,字典的每一个键对应一个文件名,值对应当前计数,并实时更新;

定义一个字典用来存时间戳,键对应一个文件名,值对应时间戳,达到100条就更新一次;

定义一个字典用来存大类,键对应代号,值对应分类;

局部功能代码如下:

def kafka_to_disk():
 print('启动前检测上次运行时是否存在意外中断的数据文件......')
 print('搜索最近一次执行脚本产生的时间目录......')
 # 待处理临时文件列表
 tmp_list = []
 try:
  for category_dir in os.listdir(local_file_path):
   if len(os.listdir(local_file_path+os.sep+category_dir)) > 0:
    for file in os.listdir(local_file_path+os.sep+category_dir):
     if suffix in file:
      tmp_list.append(local_file_path+os.sep+category_dir+os.sep+file)
  # print('上次运行程序产生的临时文件有---{}'.format(tmp_list))
 except Exception as e:
  pass
 if len(tmp_list) == 0:
  print('未扫描任何残留临时文件')
 else:
  print('开始修复残留临时文件......')
 tmp_num = 0
 for tmp in tmp_list:
  os.rename(tmp, tmp.split('.')[0]+'.out')
  tmp_num += 1
 print('本次启动共修复残留临时文件★★★★★-----{}个-----★★★★★'.format(tmp_num))

 category_poor = {
  '1': 'news', '2': 'weibo', '3': 'weixin', '4': 'app', '5': 'newspaper', '6': 'luntan',
  '7': 'blog', '8': 'video', '9': 'shangji', '10': 'shangjia', '11': 'gtzy', '12': 'zfztb',
  '13': 'gyfp', '14': 'gjz', '15': 'zfxx', '16': 'ptztb', '17': 'company', '18': 'house',
  '19': 'hospital', '20': 'bank', '21': 'zone', '22': 'express', '23': 'zpgw', '24': 'zscq',
  '25': 'hotel', '26': 'cpws', '27': 'gxqy', '28': 'gpjj', '29': 'dtyy', '30': 'bdbk'}

 time_stamp = utils.get_time_stamp() # 初始化毫秒级时间戳 : 20180509103015125
 consumer = KafkaConsumer(topic, group_id=group_id, auto_offset_reset=auto_offset_reset, bootstrap_servers=eval(bootstrap_servers))
 print('连接kafka成功,数据筛选中......')
 file_poor = {}       # 子类池用于文件计数器
 time_stamp_poor = {}     # 子类时间戳池,用于触发文件切换
 time_stamp = utils.get_time_stamp()  # 初始化毫秒级时间戳 :20180509103015125
 for message in consumer:
  # 提取第8个字段自动匹配目录进行创建
  if message.value.decode().split('|')[1] in category_poor:
   category = category_poor[message.value.decode().split('|')[1]]
  else:
   print(message.value.decode())
   continue
  category_dir = local_file_path + os.sep + category
  if not os.path.exists(category_dir):
   os.makedirs(category_dir)
  # 提取第2个字段,用于生成文件名
  if message.value.decode().split('|')[7] in time_stamp_poor:
   shot_file_name = time_stamp_poor[message.value.decode().split('|')[7]] + '_' + message.value.decode().split('|')[7]
  else:
   shot_file_name = time_stamp + '_' + message.value.decode().split('|')[7]
  file_name = category_dir + os.sep + shot_file_name + '.tmp'

  # 给每一个文件设定一个计数器
  if message.value.decode().split('|')[7] not in file_poor:
   file_poor[message.value.decode().split('|')[7]] = 0

  with open(file_name, 'a', encoding='utf-8')as f1:
   f1.write(message.value.decode())
   file_poor[message.value.decode().split('|')[7]] += 1

  # 触发切换文件的操作,用时间戳生成第二文件名
  if file_poor[message.value.decode().split('|')[7]] == strip_number:
   time_stamp_poor[message.value.decode().split('|')[7]] = utils.get_time_stamp()
   file_poor[message.value.decode().split('|')[7]] = 0

以上这篇python 解决动态的定义变量名,并给其赋值的方法(大数据处理)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 详解Python列表赋值复制深拷贝及5种浅拷贝

    概述 在列表复制这个问题,看似简单的复制却有着许多的学问,尤其是对新手来说,理所当然的事情却并不如意,比如列表的赋值.复制.浅拷贝.深拷贝等绕口的名词到底有什么区别和作用呢? 列表赋值 # 定义一个新列表 l1 = [1, 2, 3, 4, 5] # 对l2赋值 l2 = l1 print(l1) l2[0] = 100 print(l1) 示例结果: [1, 2, 3, 4, 5] [100, 2, 3, 4, 5] 可以看到,更改赋值后的L2后L1同样也会被更改,看似简单的"复制"

  • Python二元赋值实用技巧解析

    这篇文章主要介绍了Python二元赋值实用技巧解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 python支持类似于a += 3这种二元表达式.比如: a += 3 -> a = a + 3 a -= 3 -> a = a - 3 a *= 3 -> a = a * 3 ... 在python中的某些情况下,这种二元赋值表达式可能比普通的赋值方式效率更高些.原因有二: 二元赋值表达式中,a可能会是一个表达式,它只需计算评估一次,而a

  • 对Python中列表和数组的赋值,浅拷贝和深拷贝的实例讲解

    对Python中列表和数组的赋值,浅拷贝和深拷贝的实例讲解 列表赋值: >>> a = [1, 2, 3] >>> b = a >>> print b [1, 2, 3] >>> a[0] = 0 >>> print b [0, 2, 3] 解释:[1, 2, 3]被视作一个对象,a,b均为这个对象的引用,因此,改变a[0],b也随之改变 如果希望b不改变,可以用到切片 >>> b = a[:] &

  • 浅谈Python 列表字典赋值的陷阱

    今天在用python刷leetcode 3Sum problem时,调入到了一个大坑中,检查半天并没有任何逻辑错误,但输出结果却总是不对,最终通过调试发现原来python中list和dict类型直接赋值竟然是浅拷贝!!!因此,在实际实验中,若要实现深拷贝,建立新list或dict,使新建的list或dict变量和以前的变量只是具有相同的值,但是却具有不同的存储地址,保证在改变以前的list变量的时候,不会对新的list产生任何影响. python中的深拷贝的实现需要通过copy.deepcopy

  • python如何给字典的键对应的值为字典项的字典赋值

    问题 1:需要得到一个类似{"demo":{"key":"value"}}这样格式的字典dic. dic = dict() dic_temp = dict() dic_temp = {"key":"value"} dic["demo"] = dic_temp 问题 2:创建一个多值映射字典.. d = {} for key, value in pairs: if key not in d

  • 解决python字典对值(值为列表)赋值出现重复的问题

    可能很少有人遇到这个问题,网上也没找到,这里记录一下,希望也可以帮到其他人. 问题描述:假设有一个字典data,其键不定,可能随时添加键(这不是关键),某一个键下面对应的值为一个长度为10的list,初始化为0,然后我想修改某些键下面的列表中的某一个值,比如data有一个键'k',对应的值为[0,0,0,0,0,0,0,0,0,0],现在我想把键'k'对应的列表的第三个数改成3,即[0,0,3,0,0,0,0,0,0,0],可是意外的事情发生了,如果data还有一个键'k1',假设其值为[0,0

  • python变量赋值方法(可变与不可变)

    python中不存在所谓的传值调用,一切传递的都是对象的引用,也可以认为是传址. 一.可变对象和不可变对象 Python在heap中分配的对象分成两类:可变对象和不可变对象.所谓可变对象是指,对象的内容可变,而不可变对象是指对象内容不可变. 不可变(immutable):int.字符串(string).float.(数值型number).元组(tuple) 可变(mutable):字典型(dictionary).列表型(list) 不可变类型特点: 看下面的例子(例1) i = 73 i +=

  • Python创建一个空的dataframe,并循环赋值的方法

    如下所示: # 创建一个空的 DataFrame df_empty = pd.DataFrame() #或者 df_empty = pd.DataFrame(columns=['A', 'B', 'C', 'D']) #添加数据 a为一个新的dataframe df_empty = df_empty.append(a) 以上这篇Python创建一个空的dataframe,并循环赋值的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们.

  • python 解决动态的定义变量名,并给其赋值的方法(大数据处理)

    最近消费kafka数据到磁盘的时候遇到了这样的问题: 需求:每天大概有1千万条数据,每条数据包含19个字段信息,需要将数据写到服务器磁盘,以第二个字段作为大类建立目录,第7个字段作为小类配合时间戳作为文件名,临时文件后缀tmp,当每个文件的写入条数(可配置,比如100条)达到要求条数时,将后缀tmp改为out. 问题:大类共有30个,小类不计其数而且未知,比如大类为A,小类为a,时间戳为20180606095835234,则A目录下的文件名为20180606095835234_a.tmp,这样一

  • python 给DataFrame增加index行名和columns列名的实现方法

    在工作中遇到需要对DataFrame加上列名和行名,不然会报错 开始的数据是这样的 需要的格式是这样的: 其实,需要做的就是添加行名和列名,下面开始操作下. # a是DataFrame格式的数据集 a.index.name = 'date' a.columns.name = 'code' 这样就可以修改过来. 以上这篇python 给DataFrame增加index行名和columns列名的实现方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们.

  • django框架模板中定义变量(set variable in django template)的方法分析

    本文实例讲述了django框架模板中定义变量的方法.分享给大家供大家参考,具体如下: 总有一些情况,你会想在django template中设置临时变量,但是django 对在模板中对临时变量的赋值没有很好的开箱即用的tag 或者filter.但是还是能通过一些其他方法实现的. 1. 利用 django 自带的 with 标签实现 2. 利用自定义 tag  实现,应该灵活很多. 利用 django 自带的 with 标签实现对变量赋值 好像在django 1.3 之后才支持这种做法 从cont

  • asp程序定义变量比不定义变量速度快一倍

    因此,在我接触那么多种语言当中,asp是最不严格的一种,是对程序员要求最低的一种. 昨天测试了asp.net.php和asp的运行速度比较,今天又来冲动,想看看定义变量与不定义变量对asp运行效率的影响有多大,结果令人惊讶,asp程序定义变量比不定义变量速度快一倍! 测试程序还是昨天那个,运行一千万次for循环,获得执行时间. 1.程序没有定义变量(dim i) 复制代码 代码如下: <% dim startime startime=timer() for i = 1 to 10000000 n

  • python 动态生成变量名以及动态获取变量的变量名方法

    前言 需求: 必须现在需要动态创建16个list,每个list的名字不一样,但是是有规律可循,比如第一个list的名字叫: arriage_list_0=[],第二个叫arriage_list_1=[]--..依次类推,但是我又不想手动的去写16个这样的名字,太累了,而且增加了代码的冗余性,灵活性也不强,所以有没有一种方法是能动态创建list名称的呢?答案是有的!而与之对应,既然要对上面的列表动态操作,肯定是少不了动态去解析list名称.所以下面开始介绍方法. python 动态生成变量名 lo

  • python动态加载变量示例分享

    众所周知,程序在启动后,各个程序文件都会被加载到内存中,这样如果程序文本再次变化,对当前程序的运行没有影响,这对程序是一种保护. 但是,对于像python这样解释执行的语言,我们有时候会用到"from 模块 import 变量名"这样的形式,如果这个变量直接被定义在文件当中,那么这些变量在程序开始时就会被定义.赋值,运行过程中值不变.如果打算在运行过程中对这个模块进行重写,那么更改后的变量值是无法被使用的. 对于这个问题,可以换一种思路,将这个模块中的变量定义在函数里,而函数是在程序运

  • asp.net中eval不能定义变量的问题的解决方法

    复制代码 代码如下: eval.asp <%@ LANGUAGE='JAVASCRIPT'%> <script Language="javascript" runat=server> eval("var f1=1,f2=2,f3=3;"); Response.Write(f1+"<br/>"); Response.Write(f2+"<br/>"); Response.Write

  • Python中实现输入超时及如何通过变量获取变量名

    背景介绍 开发中遇到了一个需求:程序运行到某处时需要用户确认, 但不能一直傻等, 后面的程序不能被一直阻塞, 需要有个超时限制, 也就是这个程序如果在一段时间后还没有得到用户输入就执行默认操作. 解决思路 – 多线程法 我就想到了用多线程的方式, 开启一个子线程用stdin(比如python的input函数)获取用户输入, 主线程里设置线程启动和超时. 创建线程 Python中使用多线程很方便, threading.Threaded(函数, 参数表)然后thread.start就好了. 只是有一

  • Python自动打印被调用函数变量名及对应值 

    目录 1.软件环境 2.问题描述 3.解决方法 4.结果预览 1.软件环境 Windows10 教育版64位Python 3.6.3 2.问题描述 我们在定义一个函数或者是调用一个函数的时候,总是希望能够知道传入该被调用函数的具体值是多少?是否符合我们的预期?因此我们往往会将我们关心的值给打印出来(当然debug也可以,但不能每次都debug吧?),如下,我们创建了一个initial_printer示例函数: def initial_printer(variable_a, variable_b,

  • 详解C++ 动态库导出函数名乱码及解决

    刚接触C++,在尝试从 dll 中导出函数时,发现导出的函数名都"乱码"了. 导出过程如下: 新建一个Win32项目: 新建的解决方案里有几个导出的示例: // 下列 ifdef 块是创建使从 DLL 导出更简单的 // 宏的标准方法.此 DLL 中的所有文件都是用命令行上定义的 DLLEXPORT_EXPORTS // 符号编译的.在使用此 DLL 的 // 任何其他项目上不应定义此符号.这样,源文件中包含此文件的任何其他项目都会将 // DLLEXPORT_API 函数视为是从 D

随机推荐