Python实现日志实时监测的示例详解

目录
  • 介绍
  • 观察者模式类图
  • 观察者模式示例
    • 1、创建订阅者类
    • 2、创建发布者类
    • 3、应用客户端-Map_server_client.py
    • 4、测试

介绍

观察者模式:是一种行为型设计模式。主要关注的是对象的责任,允许你定义一种订阅机制,可在对象事件发生时通知多个"观察"该对象的其他对象。用来处理对象之间彼此交互。

观察者模式也叫发布-订阅模式,定义了对象之间一对多依赖,当一个对象改变状态时,这个对象的所有依赖者都会收到通知并按照自己的方式进行更新。

观察者设计模式是最简单的行为模式之一。在观察者设计模式中,对象维护了一个依赖(观察者)列表,以便主题可以使用观察者定义的任何方法通知所有观察者它所发生的变化。

可使用观察者模式应用场景

在广播或者发布订阅系统的情形中,你会看到观察者设计模式的用法,它的主要使用场景如下:

1、分布式系统中实现事件服务。

2、广播或发布/阅系统情形中。

2、用作新闻机器的框架。

3、股票监测机器人。

观察者模式类图

观察者模式类图

1、发布者Publisher:向其他对象发送值得关注的事件。事件会在发布者自身状态改变或执行特定行为后发生。发布者中包含一个允许新订阅者加入和当前订阅者离开列表的订阅机制。

2、订阅者Subscriber:定义通知接口。一般情况下,该接口仅包含一个update()更新方法。方法中可以有多个参数,使发布者能在更新时传递事件详细信息。

3、客户端Client:分别创建发布者和订阅者对象,然后为订阅者注册,发布者更新。

观察者模式示例

假如我们对应用函数运行状态进行监测,当发生异常时报警记录,可通过观察者模式进行信息订阅:1、短信 2、日志 3、邮件

代码实现---subscription_model.py

1、创建订阅者类

Subscriber订阅者:所有希望关注发布者状态变化的其他对象。

这里提供了三个主要的订阅者(观察者)接口,跟踪着同一个发布者类的事件。主要包括:

1)、每个具体订阅者__init()方法使用attach()方法向发布者进行注册以获取信息更新。

2)、具体订阅者的update()更新消息。

#抽象订阅者
from abc import ABCMeta,abstractmethod
class Subscriber(metaclass=ABCMeta):
    #向具体订阅者发送消息的方法
    @abstractmethod
    def update(self):
        pass

#具体订阅者
#1、短信订阅者
class SMSSubscriber(Subscriber):
    def __init__(self,publisher):
        self.publisher = publisher
        self.publisher.attach(self)

    def update(self):
        print(type(self).__name__,self.publisher.getNews())

#2、邮件订阅者
class EmailSubscriber(Subscriber):
    def __init__(self, publisher):
        self.publisher = publisher
        self.publisher.attach(self)

    def update(self):
        print(type(self).__name__,self.publisher.getNews())
        info = self.publisher.getNews()
        # 发送邮件
        Sender_mail(info).sender_mail()

#3、日志订阅(文件存储)
class LoggerSubscriber(Subscriber):

    def __init__(self, publisher):
        log_dir = os.path.expanduser(r".\apps\Mapview\logs")
        log_file = os.path.join(log_dir, "file_{time}.log")
        logger.add(log_file, rotation="100KB", retention=2)
        self.publisher = publisher
        self.publisher.attach(self)

    def update(self):
        print(type(self).__name__,self.publisher.getNews())
        info=self.publisher.getNews()
        logger.info(f"{info}")

2、创建发布者类

Publisher发布者:将自身的状态改变通知其他对象,为发布者添加订阅机制,每个对象都能订阅或取消订阅者事件流。

主要包括:

1)self.__subscribers = []:一个用于存储订阅对象列表

2)供订阅者来注册NewsPublisher或删除订阅用户。

3)几个用于添加、删除或查看列表中订阅者的公有方法。

4)notifySubscribers(self):用于通知所有订阅者出现新的信息,发送者会遍历订阅列表并通过内部调用具体订阅者实现的update()方法来实现。

5)创建新消息和返回最新消息。

#创建发布者
class NewsPublisher:
    def __init__(self):
        self.__subscribers = []
        self.__latestNews = None
    
    #将订阅者添加到队列中
    def attach(self,subscriber):
        self.__subscribers.append(subscriber)
    
    #从订阅的主题里面移除
    def detach(self):
        return self.__subscribers.pop()

    #生成观察者列表
    def subscribers(self):
        return [type(x).__name__ for x in self.__subscribers]

    #发送通知给相关的主题订阅者
    def notifySubscribers(self):
        for sub in self.__subscribers:
            #update()方法由具体的观察者或订阅者实现的
            sub.update()  #推送更新
    
    #创建新消息
    def addNews(self,news):
        self.__latestNews = news
    
    #返回最新消息,并通知观察者
    def getNews(self):
        return "Got News:",self.__latestNews

3、应用客户端-Map_server_client.py

订阅者通常需要一些上下文信息正确处理更新。因此,发布者通常会将一些上下文数据作为通知方法的参数传递。

这里给第一篇文章留下的尾巴补充一下,客户端实例化get_Map_model方法添加带参数装饰器,@fail_data(msg='地图加载失败')添加接口调用失败处理机制,追加日志记录。这里可以进一步将更多细节参数添加到日志中,装饰器传参并在接口中声明通知方法及参数,这样发布者在发出通知时传递一些上下文数据。

from apps.tools.subscription_model import NewsPublisher,LoggerSubscriber,EmailSubscriber
import functools

#如果加载失败,调用订阅者
def publisher(info):
    news_publisher = NewsPublisher()
    # for Subscribers in [EmailSubscriber, LoggerSubscriber]:
    for Subscribers in [LoggerSubscriber]:
        # eval(LoggerSubscriber)(news_publisher)
        Subscribers(news_publisher)
        print("\nSubscribers", news_publisher.subscribers())
        news_publisher.addNews(f"{info}")
        news_publisher.notifySubscribers()

#处理异常的装饰器
def fail_data(msg='地图加载失败'):
    def catch_exception(origin_func):
        @functools.wraps(origin_func)
        def wrapper(*args, **kwargs):
            try:
                u = origin_func(*args, **kwargs)
                print("这个函数正常执行:%s" % origin_func.__name__)
                return u

            except Exception as e:
                info = f"{msg}:{e.__doc__}"
                """
                接口调用失败处理机制,追加日志
                """
                print(info)
                publisher(info)
                # news_publisher = NewsPublisher()
                # LoggerSubscriber(news_publisher)
                # print("\nSubscribers", news_publisher.subscribers())
                # news_publisher.addNews(f"{info}")
                # news_publisher.notifySubscribers()
        return wrapper
    return catch_exception

4、测试

if __name__ == '__main__':

    from loguru import logger
    from apps.tools.Sender_Email import Sender_mail

    news_publisher =NewsPublisher()
    for Subscribers in [LoggerSubscriber]:
        print(Subscribers)
        Subscribers(news_publisher)

    print("\nSubscribers",news_publisher.subscribers())

    news_publisher.addNews("地图加载失败!")
    news_publisher.notifySubscribers()

结果

class '__main__.LoggerSubscriber';

Subscribers ['LoggerSubscriber']
LoggerSubscriber ('Got News:', '地图加载失败!')
2022-04-05 16:38:00.667 | INFO     | __main__:update:81 - ('Got News:', '地图加载失败!')

以上就是实现了一个简单的发布订阅模式,发布者与订阅者之间是松耦合的,添加新订阅者无需修改发布者。所有具体订阅者类都实现了同样接口。

到此这篇关于Python实现日志实时监测的示例详解的文章就介绍到这了,更多相关Python日志监测内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python实时监控logstash日志代码

    实时读取logstash日志,有异常错误keywork即触发报警. # /usr/bin/env python3 # -*- coding: utf-8 -*- # __author__ = caozhi # create_time 2018-11-12,update_time 2018-11-15 # version = 1.0 # 录像高可用报警 # 1 读取日志 使用游标移动 # 2 线上业务日志文件会切割,切割后,读取上一个切割的日志 import os import sys impor

  • Python实现监控程序执行时间并将其写入日志的方法

    本文实例讲述了Python实现监控程序执行时间并将其写入日志的方法.分享给大家供大家参考.具体实现方法如下: # /usr/bin/python # -*- coding:utf-8 -*- from time import time def logged(when): def log(f,*args,**kargs): print ''' called: functions:%s args: %r kargs: %r ''' % (f,args,kargs) def pre_logged(f)

  • Python设计模式之观察者模式简单示例

    本文实例讲述了Python设计模式之观察者模式.分享给大家供大家参考,具体如下: 观察者模式是一个软件设计模式,一个主题对象包涵一系列依赖他的观察者,自动通知观察者的主题对象的改变,通常会调用每个观察者的一个方法.这个设计模式非常适用于分布式事件处理系统. 典型的在观察者模式下: 1.发布者类应该包涵如下方法: 注册能够接收通知的对象 从主对象到注册对象,通知任何变化 未注册对象不能够接收任何通知信息 2.订购者类应该包涵如下: 发布者会调用一个订购者提供的方法,将任何改变告知注册对象. 3.当

  • Python设计模式之观察者模式实例

    关于设计模式中的观察者模式,定义如下(维基百科): 觀察者模式(有時又被稱為發布/訂閱模式)是軟體設計模式的一種.在此種模式中,一個目標物件管理所有相依於它的觀察者物件,並且在它本身的狀態改變時主動發出通知.這通常透過呼叫各觀察者所提供的方法來實現.此種模式通常被用來實作事件處理系統.简单来说,一个被观察者有很多观察者,被观察者的状态的改变会引起所有观察者的响应操作. 那么我们用Python2.7来实现观察者模式. Python中的集合set 集合(set),类似于列表(list),但是它没有重

  • python动态监控日志内容的示例

    日志文件一般是按天产生,则通过在程序中判断文件的产生日期与当前时间,更换监控的日志文件程序只是简单的示例一下,监控test1.log 10秒,转向监控test2.log 程序监控使用是linux的命令tail -f来动态监控新追加的日志 复制代码 代码如下: #!/usr/bin/python# encoding=utf-8# Filename: monitorLog.pyimport osimport signalimport subprocessimport time logFile1 =

  • Python实现日志实时监测的示例详解

    目录 介绍 观察者模式类图 观察者模式示例 1.创建订阅者类 2.创建发布者类 3.应用客户端-Map_server_client.py 4.测试 介绍 观察者模式:是一种行为型设计模式.主要关注的是对象的责任,允许你定义一种订阅机制,可在对象事件发生时通知多个"观察"该对象的其他对象.用来处理对象之间彼此交互. 观察者模式也叫发布-订阅模式,定义了对象之间一对多依赖,当一个对象改变状态时,这个对象的所有依赖者都会收到通知并按照自己的方式进行更新. 观察者设计模式是最简单的行为模式之一

  • Python实现监控远程主机实时数据的示例详解

    目录 0 简述 1 程序说明文档 1.1 服务端 1.2 客户端 2 代码 0 简述 实时监控应用程序,使用Python的Socket库和相应的第三方库来监控远程主机的实时数据,比如CPU使用率.内存使用率.网络带宽等信息.可以允许多个用户同时访问服务端.注:部分指令响应较慢,请耐心等待. 1 程序说明文档 1.1 服务端 本程序为一个基于TCP协议的服务端程序,可以接收客户端发送的指令并执行相应的操作,最终将操作结果返回给客户端.程序运行在localhost(即本机)的8888端口. 主要功能

  • python可视化大屏库big_screen示例详解

    目录 big_screen 特点 安装环境 输入数据 本地运行 在线部署 对于从事数据领域的小伙伴来说,当需要阐述自己观点.展示项目成果时,我们需要在最短时间内让别人知道你的想法.我相信单调乏味的语言很难让别人快速理解.最直接有效的方式就是将数据如上图所示这样,进行可视化展现. 具体如下: big_screen 特点 便利性工具, 结构简单, 你只需传数据就可以实现数据大屏展示. 安装环境 pip install -i https://pypi.tuna.tsinghua.edu.cn/simp

  • python进阶collections标准库使用示例详解

    目录 前言 namedtuple namedtuple的由来 namedtuple的格式 namedtuple声明以及实例化 namedtuple的方法和属性 OrderedDict popitem(last=True) move_to_end(key, last=True) 支持reversed 相等测试敏感 defaultdict 小例子1 小例子2 小例子3 Counter对象 创建方式 elements() most_common([n]) 应用场景 deque([iterable[,

  • Python使用自定义装饰器的示例详解

    在Python自动化测试中,使用自定义的装饰器来给测试方法传递测试数据: reader.py import csv import json from openpyxl import load_workbook from setting import DATA_DIR from os import path class Reader: @classmethod def read_excel(cls,xlname, min_row, max_row, min_col, max_col): xlnam

  • python案例中Flask全局配置示例详解

    目录 WEB服务全局配置 Flask全局配置 before_request after_request Flask自定义中间件 WEB服务全局配置 在目前的开发过市场当中,有很多WEB服务框架,Flask只是其中之一,但是总体上来看,所有的WEB框架都是依据HTTP协议的逻辑从请求到响应设计的.固然有很多功能是独立的,但是也有一部分功能需要全局设定,比如安全校验,比如埋点日志,那么这里就用到了全局配置. 所谓的全局配置,就是在框架全局,请求前后,响应前后,设置的全局配置,比如登录校验,这个功能并

  • 对python 生成拼接xml报文的示例详解

    最近临时工作要生成xml报名,通过MQ接口发送.简单小程序. 自增长拼成xml报文 Test_001.py # encoding=utf-8 import time orderId = '' s1= "\n" # for ID in range(1,5): item1 = "<item>" + \ "<orderID>" + str(ID) + "</orderID>" + \ "

  • Python中bisect的用法及示例详解

    bisect是python内置模块,用于有序序列的插入和查找. 查找: bisect(array, item) 插入: insort(array,item) 查找 import bisect a = [1,4,6,8,12,15,20] position = bisect.bisect(a,13) print(position) # 用可变序列内置的insert方法插入 a.insert(position,13) print(a) 输出: 5 [1, 4, 6, 8, 12, 13, 15, 2

  • python实现三壶谜题的示例详解

    前言 有一个充满水的8品脱的水壶和两个空水壶(容积分别是5品脱和3品脱).通过将水壶完全倒满水和将水壶的水完全倒空这两种方式,在其中的一个水壶中得到4品脱的水. 一.算法思想 算法分析 采用的算法思想是将某个时刻水壶中水的数量看作一个状态,用一个长度为3的数组表示. 初始状态便为[8,0,0],再拓展他的下一结点的可能结构. 若下一结点的结构已经被拓展过了便放弃,若没有拓展过则加入拓展列表(open_list)中.然后递归上述操作. 直到拓展列表(open_list)为空或者找到目标为止. 思想

  • Python机器学习从ResNet到DenseNet示例详解

    目录 从ResNet到DenseNet 稠密块体 过渡层 DenseNet模型 训练模型 从ResNet到DenseNet 上图中,左边是ResNet,右边是DenseNet,它们在跨层上的主要区别是:使用相加和使用连结. 最后,将这些展开式结合到多层感知机中,再次减少特征的数量.实现起来非常简单:我们不需要添加术语,而是将它们连接起来.DenseNet这个名字由变量之间的"稠密连接"而得来,最后一层与之前的所有层紧密相连.稠密连接如下图所示: 稠密网络主要由2部分构成:稠密块(den

随机推荐