python实现磁盘日志清理的示例

一、描述:

以module的方式组件python代码,在磁盘文件清理上复用性更好

二、达到目标:

清空过期日志文件,清理掉超过自定大小日志文件

三、原码

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import commands
import os
import time
import re
import getopt
import sys

# commands.getstatusoutput 返回两个元素的元组tuple(status, result),status为int类型,result为string类型
def execute_local_shell_cmd(cmd):
  status, result = commands.getstatusoutput(cmd)

  result = result.split("\n")

  return status, result

def send_alert_mail():
  pass

'''
获取某一磁盘的空间使用率
'''
def get_disk_used(disk_name):
  status, result = execute_local_shell_cmd("df | grep %s | awk '{print $5}'" % disk_name)
  return status, result[0]

#print(get_disk_used('/data0'))

'''
判断文件是否在指定时间内修改过
'''

def file_modify_in(file_path,time_interval='1d'):
  current_time = time.time()
  # os.path.getmtime 返回最后修改时间。返回从unix纪元开始的跳秒数
  if current_time - os.path.getmtime(file_path) < translate_time_interval_to_second(time_interval):
    return True
  return False

def translate_file_size_to_kb(file_size):
  # 将字符串所有大写字符转为小写
  file_size = str(file_size.lower())
  # 创建匹配数字1次或多次的数字且小数点出现一次或者不出现的;小数点后数字重复0次或多次模式对象
  pattern = re.compile(r'\d+\.?\d*')
  match = pattern.match(file_size)
  file_size_number = None
  if match:
    # 使用Match获得分组信息
    #print(match.group())
    file_size_number = float(match.group())
  else:
    raise IOError("Input {0} can't translate to byte."
           "Current support g(gb)/m(mb)/k(kb)/b(byte)".format(file_size))
  # endswith() 方法用于判断字符串是否以指定后缀结尾,如果以指定后缀结尾返回True,否则返回False。
  # 可选参数"start"与"end"为检索字符串的开始与结束位置。
  if file_size.endswith("g") or file_size.endswith("gb"):
    return file_size_number * 1024 * 1024 * 1024
  elif file_size.endswith("m") or file_size.endswith("mb"):
    return file_size_number * 1024 * 1024
  elif file_size.endswith("k") or file_size.endswith("kb"):
    return file_size_number * 1024
  elif file_size.endswith("b") or file_size.endswith("byte"):
    return file_size_number
  else:
    raise IOError("Input {0} can't translate to byte."
            "Current support g(gb)/m(mb)/k(kb)/b(byte)".format(file_size))
#print(translate_file_size_to_kb('10g'))

def translate_time_interval_to_second(time_interval):
  date_interval = str(time_interval.lower())
  pattern = re.compile(r'\d+')
  match = pattern.match(date_interval)
  date_interval_number = None
  if match:
    date_interval_number = int(match.group())
  else:
    raise IOError("Input {0} can't translate to second."
           "Current support d(day)/h(hour)/m(min)/s(sec)".format(date_interval))
  if date_interval.endswith('d') or date_interval.endswith('day'):
    return date_interval_number * 24 * 3600
  elif date_interval.endswith('h') or date_interval.endswith('hour'):
    return date_interval_number * 3600
  elif date_interval.endswith('m') or date_interval.endswith('min'):
    return date_interval_number * 60
  elif date_interval.endswith('s') or date_interval.endswith('sec'):
    return date_interval_number
  else:
    raise IOError("Input {0} cant't translate to second."
           "Current support d(day)/h(hour)/m(min)/s(second)".format(date_interval))

#print(translate_time_interval_to_second('7d'))
'''
关断文件是否可能是当前log文件
1) 修改改时间1天内
2) 以pattern结尾
'''
def probable_current_log_file(file_path,pattern='log',modify_in='1d'):
  if file_modify_in(file_path,time_interval=modify_in):
    return True
  return str(file_path).endswith(pattern)

'''
获取超过天数设置log,注意不会返回可能是当前正在修改的文件,查看probable_current_log_file
确定如何做该判断
'''
def get_clean_log_list_by_date(target_dir,before_days_remove='7d',pattern="log"):
  before_seconds_remove = translate_time_interval_to_second(before_days_remove)
  current_time = time.time()
  # os.listdir 返回指定文件夹包含文件或文件夹的名字列表
  for candidate_file in os.listdir(target_dir):
    candidate_file_fullpath = "%s/%s" %(target_dir,candidate_file)
    # 是否存在一个普通文件
    if os.path.isfile(candidate_file_fullpath):
      candidate_file_mtime = os.path.getmtime(candidate_file_fullpath)

      # find\(\)根据是否包含字符串,如果包含有,返回开始的索引值,否则返回-1
      if current_time - candidate_file_mtime > before_seconds_remove \
        and candidate_file.find(pattern) != -1 \
        and not probable_current_log_file(candidate_file_fullpath):
        # yield 就是return一个值,并且记住这个返回值的位置,下次迭代就从这个位置后开始
        yield candidate_file_fullpath

'''
获取超过大小的日志文件(注意默认不会返回修改时间小于1天的文件)
'''
def get_clean_log_list_by_size(target_dir,file_size_limit='10g',pattern="log"):
  file_size_limit_byte = translate_file_size_to_kb(file_size_limit)
  for candidate_file in os.listdir(target_dir):
    candidate_file_fullpath = "%s/%s" %(target_dir,candidate_file)
    if os.path.isfile(candidate_file_fullpath):
      # stat返回相关文件的系统状态信息
      file_stat = os.stat(candidate_file_fullpath)
      if candidate_file.find(pattern) != -1 and \
              file_stat.st_size >= file_size_limit_byte:
        yield candidate_file_fullpath
      # 如果文件在modify_in之内修改过,则不返回
      # if not (modify_in and file_modify_in(candidate_file_fullpath, time_interval=modify_in)) and \
      #   not probable_current_log_file(candidate_file_fullpath):
      #    yield candidate_file_fullpath

'''
remove文件列表
'''
def remove_file_list(file_list,pattern='log',roll_back=False):
  for file_item in file_list:
    if roll_back or probable_current_log_file(file_item,pattern=pattern,modify_in='1d'):
      print('roll back file %s' % file_item)
      execute_local_shell_cmd("cat /dev/null > {0}".format(file_item))
    else:
      print('remove file %s' % file_item)
      # os.remove 删除指定路径文件。如果指定的路径是一个目录,将抛出OSError
      os.remove(file_item)

'''
清理掉超过日期的日志文件
'''
def remove_files_by_date(target_dir,before_days_remove='7d',pattern='log'):
  file_list = get_clean_log_list_by_date(target_dir,before_days_remove,pattern)
  remove_file_list(file_list)

'''
清理掉超过大小的日志文件
'''
def remove_files_by_size(target_dir,file_size_limit='10g',pattern='log'):
  file_list = get_clean_log_list_by_size(target_dir,file_size_limit,pattern)
  remove_file_list(file_list)

'''
清空当前的日志文件,使用cat /dev/null > {log_file}方式
'''

def clean_curren_log_file(target_dir,file_size_limit='10g',pattern='log'):
  for candidate_file in os.listdir(target_dir):
    candidate_file_fullpath = '%s/%s' % (target_dir,candidate_file)
    if candidate_file.endswith(pattern) and os.path.isfile(candidate_file_fullpath):
      file_stat = os.stat(candidate_file_fullpath)
      if file_stat.st_size >= translate_file_size_to_kb(file_size_limit):
        remove_file_list([candidate_file_fullpath],roll_back=True)

def clean_data_release_disk(disk_name, target_dir, disk_used_limit='80%', before_days_remove='7d',
              file_size_limit='10g', pattern='log'):
  disk_used_limit = disk_used_limit.replace('%', '')
  # 第一步执行按时间的日志清理
  print('Step one remove files {0} ago.'.format(before_days_remove))
  remove_files_by_date(target_dir, before_days_remove=before_days_remove, pattern=pattern)

  # 如果磁盘空间还是没有充分释放,则执行按大小的日志清理
  current_disk_used = int(get_disk_used(disk_name)[1].replace('%', ''))
  if current_disk_used > int(disk_used_limit):
    print("Disk {0}'s current used {1}% great than input used limit {2}%,"
       "so we will remove files bigger than {3}".
       format(disk_name, current_disk_used, disk_used_limit, file_size_limit))
    remove_files_by_size(target_dir, file_size_limit=file_size_limit, pattern=pattern)

  # 如果磁盘空间开没有释放,清空当前正在写的log文件,并alert
  current_disk_used = int(get_disk_used(disk_name)[1].replace('%', ''))
  if current_disk_used > int(disk_used_limit):
    print("Disk {0}'s current used {1}% great than input used limit {2}%,"
       "so we will roll back current log file".
       format(disk_name, current_disk_used, disk_used_limit, file_size_limit))
    clean_curren_log_file(target_dir, file_size_limit=file_size_limit, pattern=pattern)

  # 如果还是没有,alert mail
  if int(get_disk_used(disk_name)[1].replace('%', '')) > int(disk_used_limit):
    send_alert_mail()

def usage():
  print('clean.py -d <target_disk> -r <target_dirctory -u <diskUsedLimit(default 80%)> '
     '-f <fileSizeLimit(default 10gb,gb/mb/kb)> -p <filePattern(default log)> '
     '-t <beforeDaysRemove(default 7d,d)> ')
if __name__ == "__main__":
  target_disk_input = '/data0'
  target_dir_input = '/data0/hadoop2/logs'
  disk_used_limit_input = '80%'
  file_size_limit_input = '10g'
  pattern_input = 'log'
  before_days_remove_input = '7d'
  try:
    # getopt 命令解析,有短选项和长选项
    # getopt 返回两人个参数:一个对应参数选项和value元组,另一个一般为空
    opts,args = getopt.getopt(sys.argv[1:], 'hd:r:u:f:p:t:', ['help' 'disk=', 'directory=',
                                  'diskUsedLimit=', 'fileSizeLimit=',
                                  'filePattern=', 'beforeDaysRemove='])
  # getopt模块函数异常错误,捕获异常并打印错误
  except getopt.GetoptError as err:
    print err
    usage()
    sys.exit(2)

  if len(opts) < 6:
    usage()
    sys.exit(2)

  for opt,arg in opts:
    if opt == '-h':
      usage()
      sys.exit()
    elif opt in ("-d","--disk"):
      target_disk_input = arg.replace('/','')
    elif opt in ("-r","--directory"):
      target_dir_input = arg
    elif opt in ("-u","--diskUsedLimit"):
      disk_used_limit_input = arg
    elif opt in ("-f","--fileSizeLimit"):
      file_size_limit_input = arg
      translate_file_size_to_kb(file_size_limit_input)
    elif opt in ("-p","filePattern"):
      pattern_input = arg
    elif opt in ("-t","--beforeDaysRemove"):
      before_days_remove_input = arg
      translate_time_interval_to_second(before_days_remove_input)

  print ("{0} Start clean job.target_disk:{1},target_directory:{2},disk_used_limit:{3},"
      "file_size_limit:{4},pattern:{5},before_days_remove:{6}".format(time.ctime(time.time()),
                                      target_disk_input, target_dir_input,
                                      disk_used_limit_input, file_size_limit_input,
                                      pattern_input, before_days_remove_input))
  clean_data_release_disk(target_disk_input, target_dir_input,
              disk_used_limit=disk_used_limit_input, file_size_limit=file_size_limit_input,
              pattern=pattern_input, before_days_remove=before_days_remove_input)

四、统一调用目录定时删除

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os

# 遍历目录
def Lisdir(targetdir):
  list_dirs = os.walk(targetdir)
  for root,list_dirs,files in list_dirs:
    for d in list_dirs:
      yield os.path.join(root,d)

def log_dir(targetdir):
  list_dirs = os.listdir(targetdir)
  for ph in list_dirs:
    if os.path.isdir(os.path.join(targetdir,ph)):
      yield Lisdir(os.path.join(targetdir,ph))
for path in log_dir('/data0/backup_log-bin'):
  for ppp in path:
    # 以log-bin结尾 为假
    if ppp.endswith('log-bin') is False:
      os.system("db_script/clean_robo.py -d /data0 -r {0} -u 75% -f 501M -p bin -t 5d".format(ppp))

以上就是python实现磁盘日志清理的示例的详细内容,更多关于python 磁盘日志清理的资料请关注我们其它相关文章!

(0)

相关推荐

  • python获取磁盘号下盘符步骤详解

    这次主要教的是如何通过Python 获取Windows系统下的所有的磁盘盘符,以列表的形式展示出来,获取磁盘号下的盘符包括能够获取到我们正在插在电脑上的U盘,也可以读取到,希望能够对你们在学习过程中有所收获和有所灵感. 第一步:打开我们的pycharm软件,然后新建一个Python文件,输入我们的文件名 第二步:在编辑界面输入以下代码: import string import os def get_disklist(): disk_list = [] for c in string.ascii

  • python3实现磁盘空间监控

    本文实例为大家分享了python3磁盘空间监控的具体代码,供大家参考,具体内容如下 软硬件环境 python3 apscheduler 前言 在做频繁操作磁盘的python项目时,经常会碰到磁盘空间不足的情况,这个时候,工程应该要有自己的处理模块,当磁盘利用率到达某个点时,发出警告并停止程序的运行.本文就利用Python3中的apscheduler模块来处理这样的问题. 代码实践 import os import sys import signal import logging from aps

  • Python实现测试磁盘性能的方法

    本文实例讲述了Python实现测试磁盘性能的方法.分享给大家供大家参考.具体如下: 该代码做了如下工作: create 300000 files (512B to 1536B) with data from /dev/urandom rewrite 30000 random files and change the size read 30000 sequential files read 30000 random files delete all files sync and drop cac

  • 磁盘垃圾文件清理器python代码实现

    本文假设某些特定类型的文件和大小为0的文件为垃圾文件,可以自由扩展代码的列表,也就是垃圾文件的类型. from os.path import isdir, join, splitext from os import remove, listdir, chmod, stat import sys #指定要删除的文件类型 filetypes = ['.tmp', '.log', '.obj', '.txt'] def delCertainFiles(directory): for filename

  • Python实现获取磁盘剩余空间的2种方法

    本文实例讲述了Python实现获取磁盘剩余空间的2种方法.分享给大家供大家参考,具体如下: 方法1: import ctypes import os import platform import sys def get_free_space_mb(folder): """ Return folder/drive free space (in bytes) """ if platform.system() == 'Windows': free_by

  • python3监控CentOS磁盘空间脚本

    Python脚本监控CentOS磁盘空间,任何一个分区空间使用大于80%即发邮件给到指定邮箱. monitor.py #-*- coding: utf-8 -*- import socket import subprocess import smtplib from email.mime.text import MIMEText sender="xxx.xx@xxx.com" receiver= ["xxx.xx@xxx.com"] smtpHost="1

  • Linux 发邮件磁盘空间监控(python)

    核心代码: #!/usr/bin/python # -*- coding: UTF-8 -*- import smtplib import os import commands,time from email.mime.text import MIMEText #from email import MIMEText disk_free=os.popen('df -lh') list_disk=disk_free.read() mailto_list=["2880329185@qq.com&quo

  • 使用python获取电脑的磁盘信息方法

    使用Python获取电脑的磁盘信息需要借助于第三方的模块psutil,这个模块需要自己安装,纯粹的CPython下面不具备这个功能. 在iPython交互界面中进行如下演示: 查看电脑的磁盘分区: In [1]: import psutil In [2]: psutil.disk_partitions() Out[2]: [sdiskpart(device='/dev/disk2', mountpoint='/', fstype='hfs', opts='rw,local,rootfs,dovo

  • python实现磁盘日志清理的示例

    一.描述: 以module的方式组件python代码,在磁盘文件清理上复用性更好 二.达到目标: 清空过期日志文件,清理掉超过自定大小日志文件 三.原码 #!/usr/bin/env python # -*- coding: utf-8 -*- import commands import os import time import re import getopt import sys # commands.getstatusoutput 返回两个元素的元组tuple(status, resu

  • python实现扫描日志关键字的示例

    我们在压力测试过程会收集到很多log,怎样快速从中找到有用信息呢?让python脚本帮我们做这部分工作吧! 废话不说,上代码 环境:win10 + python2.7.14 #-*- encoding: utf-8 -*- #author : beihuijie #version 1.1 import re import sys import os import countTime def getParameters(): ''' get parameters from console comm

  • python动态监控日志内容的示例

    日志文件一般是按天产生,则通过在程序中判断文件的产生日期与当前时间,更换监控的日志文件程序只是简单的示例一下,监控test1.log 10秒,转向监控test2.log 程序监控使用是linux的命令tail -f来动态监控新追加的日志 复制代码 代码如下: #!/usr/bin/python# encoding=utf-8# Filename: monitorLog.pyimport osimport signalimport subprocessimport time logFile1 =

  • python实现log日志的示例代码

    源代码: # coding=utf-8 import logging import os import time LEVELS={'debug':logging.DEBUG,\ 'info':logging.INFO,\ 'warning':logging.WARNING,\ 'error':logging.ERROR,\ 'critical':logging.CRITICAL,} logger=logging.getLogger() level='default' def createFile

  • Python 根据日志级别打印不同颜色的日志的方法示例

    本文介绍了Python 根据日志级别打印不同颜色的日志的方法示例,分享给大家,具体如下: # -*-coding:UTF-8-*- import logging import os import time class logger(object): """ 终端打印不同颜色的日志,在pycharm中如果强行规定了日志的颜色, 这个方法不会起作用, 但是 对于终端,这个方法是可以打印不同颜色的日志的. """ #在这里定义StreamHandler

  • python实现简单日志记录库glog的使用

    这篇文章主要介绍了python实现简单日志记录库glog的使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一. glog的简介 glog所记录的日志信息总是记录到标准的stderr中,即控制台终端. 每一行日志记录总是会添加一个谷歌风格的前缀,即google-style log prefix, 它的形式如下: E0924 22:19:15.123456 19552 filename.py:87] some message 上面红色部分加粗的

  • Python中logging日志的四个等级和使用

    1. logging日志的介绍 在现实生活中,记录日志非常重要,比如:银行转账时会有转账记录:飞机飞行过程中,会有个黑盒子(飞行数据记录器)记录着飞机的飞行过程,那在咱们python程序中想要记录程序在运行时所产生的日志信息,怎么做呢? 可以使用 logging 这个包来完成 记录程序日志信息的目的是: 1. 可以很方便的了解程序的运行情况 2. 可以分析用户的操作行为.喜好等信息 3. 方便开发人员检查bug 2. logging日志级别介绍 日志等级可以分为5个,从低到高分别是: 1. DE

  • Python常用工具类之adbtool示例代码

    1.adb常用命令 关闭adb服务:adb kill-server 启动adb服务  adb start-server 查询当前运行的所有设备  adb devices 可能在adb中存在多个虚拟设备运行 可以指定虚拟设备运行  -s 虚拟设备名称 重启设备 adb reboot  --指定虚拟设备   adb -s 设备名称 reboot 查看日志  adb logcat  清除日志 adb logcat -c 进入linux shell下  adb shell 其中常用的linux命令  c

  • 详解python之配置日志的几种方式

    作为开发者,我们可以通过以下3中方式来配置logging: 1)使用Python代码显式的创建loggers, handlers和formatters并分别调用它们的配置函数: 2)创建一个日志配置文件,然后使用fileConfig()函数来读取该文件的内容: 3)创建一个包含配置信息的dict,然后把它传递个dictConfig()函数: 需要说明的是,logging.basicConfig()也属于第一种方式,它只是对loggers, handlers和formatters的配置函数进行了封

  • python使用logging模块发送邮件代码示例

    logging模块不只是能记录log,还能发送邮件,使用起来非常简单方便 #coding=utf-8 ''''' Created on 2016-3-21 @author: Administrator ''' import logging, logging.handlers class EncodingFormatter(logging.Formatter): def __init__(self, fmt, datefmt=None, encoding=None): logging.Format

随机推荐