Python中使用matplotlib绘制mqtt数据实时图像功能

目录
  • 效果图
  • mqtt发布
  • mqtt订阅
  • matplotlib绘制动态图
  • matplotlib绘制mqtt数据实时图像

效果图

mqtt发布

本代码中publish是一个死循环,数据一直往外发送。

import random
import time
from paho.mqtt import client as mqtt_client
import json
from datetime import datetime

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"
client_id = f'python-mqtt-{random.randint(0, 1000)}'  # 随机生成客户端id

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def publish(client):
    while True:
        time.sleep(0.01)
        msg = json.dumps({"MAC": "0123456789",
                          "samplerate": 12,
                          "sampletime": str(datetime.utcnow().strftime('%Y/%m/%d-%H:%M:%S.%f')[:-3]),
                          "battery": 0.5,
                          "acc": [
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                              [random.randint(200, 350), -random.randint(200, 350), -random.randint(200, 350), random.randint(200, 350), random.randint(200, 350), random.randint(200, 350)],
                          ]})
        result = client.publish(topic, msg)
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")

def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)

if __name__ == '__main__':
    run()

mqtt订阅

from paho.mqtt import client as mqtt_client
import time
import os

broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt/li"

def connect_mqtt(client_id):
    """    MQTT 连接函数。    """
    def on_connect(client, userdata, flags, rc):
        """
        连接回调函数
        在客户端连接后被调用,在该函数中可以依据 rc 来判断客户端是否连接成功。
        """
        if rc == 0:
            print("Connected to MQTT Broker! return code %d" % rc)
        else:
            print("Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.username_pw_set('uname', 'upwd')  # 链接mqtt所需的用户名和密码,没有可不写
    client.on_connect = on_connect
    client.connect(broker , port)
    return client

def subscribe(client: mqtt_client, a_topic):
    """     订阅消息       """
    def on_message(client, userdata, msg):
        """
        消息回调函数
        在客户端从 MQTT Broker 收到消息后被调用,在该函数中我们将打印出订阅的 topic 名称以及接收到的消息内容。
         * 这里可添加自定义数据处理程序
        """
        print('From topic : %s\n\tmsg : %s' % (msg.topic, msg.payload.decode()))

    client.subscribe(topic)
    client.on_message = on_message

def run(client_id, topic):
    client = connect_mqtt(client_id)
    subscribe(client, topic)
    client.loop_forever()

if __name__ == '__main__':
    run('test_eartag-003-python-li', 'zk100/gw/#')

matplotlib绘制动态图

import matplotlib.pyplot as plt
import numpy as np

count = 100  # 图中最多数据量

ax = list(range(count))  # 保存图1数据
ay = [0] * 100
bx = list(range(count))  # 保存图2数据
by = [0] * 100
num = count  # 计数

plt.ion()  # 开启一个画图的窗口进入交互模式,用于实时更新数据
plt.rcParams['figure.figsize'] = (10, 10)  # 图像显示大小
plt.rcParams['font.sans-serif'] = ['SimHei']  # 防止中文标签乱码,还有通过导入字体文件的方法
plt.rcParams['axes.unicode_minus'] = False
plt.rcParams['lines.linewidth'] = 0.5  # 设置曲线线条宽度
plt.tight_layout()
while True:
    plt.clf()  # 清除刷新前的图表,防止数据量过大消耗内存
    plt.suptitle("总标题", fontsize=30)  # 添加总标题,并设置文字大小
    g1 = np.random.random()  # 生成随机数画图
    # 图表1
    ax.append(num)  # 追加x坐标值
    ay.append(g1)  # 追加y坐标值
    agraphic = plt.subplot(2, 1, 1)
    agraphic.set_title('子图表标题1')  # 添加子标题
    agraphic.set_xlabel('x轴', fontsize=10)  # 添加轴标签
    agraphic.set_ylabel('y轴', fontsize=20)
    plt.plot(ax[-count:], ay[-count:], 'g-')  # 等于agraghic.plot(ax,ay,'g-')
    # 图表2
    bx.append(num)
    by.append(g1)
    bgraghic = plt.subplot(2, 1, 2)
    bgraghic.set_title('子图表标题2')
    bgraghic.plot(bx[-count:], by[-count:], 'r^')

    plt.pause(0.001)  # 设置暂停时间,太快图表无法正常显示
    num = num + 1

matplotlib绘制mqtt数据实时图像

  • 单线程

先启动mqtt订阅服务
mqtt订阅中有阻塞,更新数据后因订阅服务没有结束,导致绘图程序无法绘图
先启动绘图程序
绘图程序本身也是个循环,拿不到mqtt的实时数据,图像无法更新

  • 两个服务加入协程,也不行。具体原因还不知道,容后补充。
  • mqtt作为线程启动,可解决上述问题
import json
import random
from paho.mqtt import client as mqtt_client
import time
import datetime
from math import ceil, floor
import matplotlib.pyplot as plt
import _thread

# 公共变量
broker = 'broker.emqx.io'
topic = "/python/mqtt/li"
port = 1883
client_id = f'python-mqtt-li-{random.randint(0, 100)}'

show_num = 300

x_num = [-1]  # 计数
acc1 = []
acc2 = []
acc3 = []
acc4 = []
acc5 = []
acc6 = []
stime = []

"""mqtt subscribe topic"""
def str_microsecond_datetime2int_13timestamp(str_microsecond_datetime):
    """将字符串型【毫秒级】格式化时间 转为 【13位】整型时间戳"""
    datetime_obj = datetime.datetime.strptime(str_microsecond_datetime, "%Y/%m/%d-%H:%M:%S.%f")
    obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) / 1000.0
    return obj_stamp

def int2datetime(int_float_timestamp):
    """
    有小数点:分离小数点,整数转为格式化时间,小数点直接跟在后面
    无小数点:从第10位进行分离,
    所以本函数只适用于时间戳整数位数大于9且小于11.
    """
    if '.' in str(int_float_timestamp):
        int_float = str(int_float_timestamp).split('.')
        date = time.localtime(int(int_float[0]))
        tempDate = time.strftime("%Y/%m/%d-%H:%M:%S", date)
        secondafter = '.' + str(int_float[1])
        return str(tempDate) + secondafter

def parse_mqttmsg(msg):
    """解析mqt头数据   MAC samplerate sampletime battery acc"""
    content = json.loads(msg.payload.decode())
    span = 1000 / content['samplerate'] * 10
    time_span = [ceil(span) / 10 / 1000, floor(span) / 10 / 1000]
    sampletime = content['sampletime']
    sampletime_int = str_microsecond_datetime2int_13timestamp(sampletime)
    acc = content['acc']
    for i in range(len(acc)):
        x_num.append(x_num[-1] + 1)
        acc1.append(acc[i][0])
        acc2.append(acc[i][1])
        acc3.append(acc[i][2])
        acc4.append(acc[i][3])
        acc5.append(acc[i][4])
        acc6.append(acc[i][5])
        if i != 0:
            sampletime_int += time_span[i % 2]
            stime.append(int2datetime(round(sampletime_int * 1000, 0) / 1000))
        else:
            stime.append(sampletime)
        print(x_num[-1], stime[-1], acc1[-1], acc2[-1], acc3[-1], acc4[-1], acc5[-1], acc6[-1])

def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)
            pass

    client = mqtt_client.Client(client_id)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        # print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        parse_mqttmsg(msg)

    client.subscribe(topic)
    client.on_message = on_message

def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()

""" draw figures """
def draw_figure():
    plt.ion()  # 开启一个画图的窗口进入交互模式,用于实时更新数据
    plt.rcParams['figure.figsize'] = (10, 10)  # 图像显示大小
    plt.rcParams['font.sans-serif'] = ['SimHei']  # 防止中文标签乱码,还有通过导入字体文件的方法
    plt.rcParams['axes.unicode_minus'] = False
    plt.rcParams['lines.linewidth'] = 0.5  # 设置曲线线条宽度

    count = 0
    while True:
        plt.clf()  # 清除刷新前的图表,防止数据量过大消耗内存
        plt.suptitle("总标题", fontsize=30)  # 添加总标题,并设置文字大小
        plt.tight_layout()

        # 图表1
        agraphic = plt.subplot(2, 1, 1)
        agraphic.set_title('子图表标题1')  # 添加子标题
        agraphic.set_xlabel('x轴', fontsize=10)  # 添加轴标签
        agraphic.set_ylabel('y轴', fontsize=20)
        plt.plot(x_num[1:][-show_num:], acc1[-show_num:], 'g-')
        try:
            xtricks = list(range(len(acc1) - show_num, len(acc1), 10))  # **1**
            xlabels = [stime[i] for i in xtricks]  # **2**
            plt.xticks(xtricks, xlabels, rotation=15)
        except:
            pass

        # 图表2
        bgraghic = plt.subplot(2, 1, 2)
        bgraghic.set_title('子图表标题2')
        bgraghic.set_xlabel('x轴', fontsize=10)  # 添加轴标签
        bgraghic.set_ylabel('y轴', fontsize=20)
        bgraghic.plot(x_num[1:][-show_num:], acc2[-show_num:], 'r^')

        plt.pause(0.001)  # 设置暂停时间,太快图表无法正常显示
        count = count + 1

if __name__ == '__main__':
    # 多线程
    _thread.start_new_thread(run, ())
    draw_figure()

到此这篇关于Python中使用matplotlib绘制mqtt数据实时图像的文章就介绍到这了,更多相关matplotlib绘制mqtt数据实时图像内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python使用matplotlib显示图像失真的解决方案

    在python显示图象时,我们用matplotlib模块时会遇到图像色彩失真问题,究竟是什么原因呢,下面就来看看究竟. 待显示图像为: import cv2 from matplotlib import pyplot as plt img = cv2.imread('demo_2.jpg',0) plt.imshow(img, cmap = 'gray', interpolation = 'bicubic') plt.xticks([]), plt.yticks([]) # to hide ti

  • Python 用matplotlib画以时间日期为x轴的图像

    1.效果展示 主要效果就是,x轴 显示时间单位. 下图展示的就是想要到达的效果. 其实主要是运用了datetime.date这个类型的变量作为x轴坐标的数据输入. 2. 源码 将data.txt中的数据读入,用matplotlib中的pyplot画出,x轴为时间. 数据文本 data.txt,除了第一行表头外,每一列都用制表符Tab(\t)隔开. 原创 粉丝 喜欢 评论 等级 访问 积分 排名 2018/06/01 69 134 266 64 5 309132 3345 12956 2018/0

  • Python使用matplotlib模块绘制图像并设置标题与坐标轴等信息示例

    本文实例讲述了Python使用matplotlib模块绘制图像并设置标题与坐标轴等信息.分享给大家供大家参考,具体如下: 进行图像绘制有时候需要设定坐标轴以及图像标题等信息,示例代码如下: #-*- coding: utf-8 -*- #!/usr/bin/python import matplotlib.pyplot as plt from numpy.random import randn x = range(100) y = randn(100) fig = plt.figure() ax

  • python matplotlib包图像配色方案分享

    可选的配色方案: Accent, Accent_r, Blues, Blues_r, BrBG, BrBG_r, BuGn, BuGn_r, BuPu, BuPu_r, CMRmap, CMRmap_r, Dark2, Dark2_r, GnBu, GnBu_r, Greens, Greens_r, Greys, Greys_r, OrRd, OrRd_r, Oranges, Oranges_r, PRGn, PRGn_r, Paired, Paired_r, Pastel1, Pastel1_

  • python/Matplotlib绘制复变函数图像教程

    今天发现sympy依赖的库mpmath里也有很多数学函数,其中也有在复平面绘制二维图的函数cplot,具体例子如下 from mpmath import * def f1(z): return z def f2(z): return z**3 def f3(z): return (z**4-1)**(1/4) def f4(z): return 1/z def f5(z): return atan(z) def f6(z): return sqrt(z) cplot(f1) cplot(f2)

  • Python matplotlib画图时图例说明(legend)放到图像外侧详解

    用python的matplotlib画图时,往往需要加图例说明.如果不设置任何参数,默认是加到图像的内侧的最佳位置. import matplotlib.pyplot as plt import numpy as np x = np.arange(10) fig = plt.figure() ax = plt.subplot(111) for i in xrange(5): ax.plot(x, i * x, label='$y = %ix$' % i) plt.legend() plt.sho

  • Python中使用matplotlib绘制mqtt数据实时图像功能

    目录 效果图 mqtt发布 mqtt订阅 matplotlib绘制动态图 matplotlib绘制mqtt数据实时图像 效果图 mqtt发布 本代码中publish是一个死循环,数据一直往外发送. import random import time from paho.mqtt import client as mqtt_client import json from datetime import datetime broker = 'broker.emqx.io' port = 1883 t

  • 用python中的matplotlib绘制方程图像代码

    import numpy as np import matplotlib.pyplot as plt def main(): # 设置x和y的坐标范围 x=np.arange(-2,2,0.01) y=np.arange(-2,2,0.01) # 转化为网格 x,y=np.meshgrid(x,y) z=np.power(x,2)+np.power(y,2)-1 plt.contour(x,y,z,0) plt.show() main() 绘制的时候要保证x,y,z的维度相同 结果如下: 以上这

  • Python中使用matplotlib模块errorbar函数绘制误差棒图实例代码

    目录 1.基本参数 2.代码实现 3.结果显示 4.更多参数请参考matplotlib官网 总结 Python的matplotlib模块中的errorbar函数可以绘制误差棒图,本次主要绘制不带折线的误差棒图. 1.基本参数 errorbar函数的基本参数主要有: x,y:主要定于二维数据的横纵坐标值 yerr :定义y轴方向的误差棒的大小,可以是一个数,也可以是二维数组(分别传递平均值与最小值的差和最大值与平均值的差). xerr:定义y轴方向的误差棒的大小,同样也可以是一个数,也可以是二维数

  • Python实现在tkinter中使用matplotlib绘制图形的方法示例

    本文实例讲述了Python实现在tkinter中使用matplotlib绘制图形的方法.分享给大家供大家参考,具体如下: 一. 代码: # coding=utf-8 import sys import Tkinter as Tk import matplotlib from numpy import arange, sin, pi from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg,NavigationToolbar2T

  • python学习之matplotlib绘制散点图实例

    要绘制单个点,可使用函数scatter(),并向其传递一对x和y坐标,它将在指定位置绘制一个点: """使用scatter()绘制散点图""" import matplotlib.pyplot as plt plt.scatter(2, 4) plt.show() 下面来设置输出的样式:添加标题,给轴加上标签,并确保所有文本都大到能够看清.并使用scatter()绘制一系列点 """使用scatter()绘制散点图&

  • Python中ROC曲线绘制

    首先以支持向量机模型为例 先导入需要使用的包,我们将使用roc_curve这个函数绘制ROC曲线! from sklearn.svm import SVC from sklearn.metrics import roc_curve from sklearn.datasets import make_blobs from sklearn. model_selection import train_test_split import matplotlib.pyplot as plt %matplot

  • Python中的图形绘制简单动画实操

    目录 前言: 1.画螺旋曲线代码 2.输出​​ 3​.代码的部分解释 前言: Matplotlib 是一个非常广泛的库,它也支持图形动画. 动画工具以 matplotlib.animation 基类为中心,它提供了一个框架,围绕该框架构建动画功能. 主要接口有TimedAnimation和FuncAnimation,两者中FuncAnimation是最方便使用的. 1.画螺旋曲线代码 import matplotlib.pyplot as plt import matplotlib.animat

  • 如何在Python中利用matplotlib.pyplot画出函数图详解

    目录 0.引言 1.绘图 (1)导入所需库 (2)设置函数 (3)plt.figure() (4)plt.plot(),plt.axhline(),plt.axvline(),plt.axhspan(),plt.axvspan() (5)设置 x,y 轴的数值范围 (6)设置 x,y 轴的标题文本 (7)设置图例和标题 (8)plt.show() 2运行结果 总结 0.引言 为了让用户能够使用python时,方便地绘制 2D 图表,PYTHON的模块中提供Matplotlib模块中所含的子库py

  • 在python中,使用scatter绘制散点图的实例

    如下所示: # coding=utf-8 import matplotlib.pyplot as plt x_values=[1,2,3,4,5] y_values=[1,4,9,16,25] # s为点的大小 plt.scatter(x_values,y_values,s=100) # 设置图表标题并给坐标轴加上标签 plt.title("Scatter pic",fontsize=24) plt.xlabel("Value",fontsize=14) plt.y

  • Python 中pandas索引切片读取数据缺失数据处理问题

    引入 numpy已经能够帮助我们处理数据,能够结合matplotlib解决我们数据分析的问题,那么pandas学习的目的在什么地方呢? numpy能够帮我们处理处理数值型数据,但是这还不够 很多时候,我们的数据除了数值之外,还有字符串,还有时间序列等 比如:我们通过爬虫获取到了存储在数据库中的数据 比如:之前youtube的例子中除了数值之外还有国家的信息,视频的分类(tag)信息,标题信息等 所以,numpy能够帮助我们处理数值,但是pandas除了处理数值之外(基于numpy),还能够帮助我

随机推荐