Python调用飞书发送消息的示例

一、创建飞书机器人  

  自定义飞书机器人操作步骤,具体详见飞书官方文档:《机器人 | 如何在群聊中使用机器人?》

二、调用飞书发送消息

  自定义机器人添加完成后,就能向其 webhook 地址发送 POST 请求,从而在群聊中推送消息了。支持推送的消息格式有文本、富文本、图片消息,也可以分享群名片等。

  参数msg_type代表消息类型,可传入:text(文本)/ post(富文本)/ image(图片)/ share_chat(分享群名片)/ interactive(消息卡片),可参照飞书接口文档:https://open.feishu.cn/document/ukTMukTMukTM/uUjNz4SN2MjL1YzM
发送文本消息
  请求的消息体示例:

{
"open_id":"ou_5ad573a6411d72b8305fda3a9c15c70e",
"root_id":"om_40eb06e7b84dc71c03e009ad3c754195",
"chat_id":"oc_5ad11d72b830411d72b836c20",
"user_id": "92e39a99",
"email":"fanlv@gmail.com",
"msg_type":"text",
"content":{
"text":"text content<at user_id=\"ou_88a56e7e8e9f680b682f6905cc09098e\">test</at>"
}
}

Curl 请求 Demo

curl -X POST \
 https://open.feishu.cn/open-apis/message/v4/send/ \
 -H 'Authorization: Bearer t-fee42159a366c575f2cd2b2acde2ed1e94c89d5f' \
 -H 'Content-Type: application/json' \
 -d '{
  "chat_id": "oc_f5b1a7eb27ae2c7b6adc2a74faf339ff",
  "msg_type": "text",
  "content": {
    "text": "text content<at user_id=\"ou_88a56e7e8e9f680b682f6905cc09098e\">test</at>"
  }
}'

使用Python封装飞书请求

接下来我们以发送文本格式消息类型,进行以下封装,上代码:

# -*- coding:utf-8 -*-
'''
@File   :  feiShuTalk.py
@Time   :  2020/11/9 11:45
@Author  :  DY
@Version  :  V1.0.0
@Desciption:
'''

import requests
import json
import logging
import time
import urllib
import urllib3
urllib3.disable_warnings()

try:
  JSONDecodeError = json.decoder.JSONDecodeError
except AttributeError:
  JSONDecodeError = ValueError

def is_not_null_and_blank_str(content):
  """
  非空字符串
  :param content: 字符串
  :return: 非空 - True,空 - False
  """
  if content and content.strip():
    return True
  else:
    return False

class FeiShutalkChatbot(object):

  def __init__(self, webhook, secret=None, pc_slide=False, fail_notice=False):
    '''
    机器人初始化
    :param webhook: 飞书群自定义机器人webhook地址
    :param secret: 机器人安全设置页面勾选“加签”时需要传入的密钥
    :param pc_slide: 消息链接打开方式,默认False为浏览器打开,设置为True时为PC端侧边栏打开
    :param fail_notice: 消息发送失败提醒,默认为False不提醒,开发者可以根据返回的消息发送结果自行判断和处理
    '''
    super(FeiShutalkChatbot, self).__init__()
    self.headers = {'Content-Type': 'application/json; charset=utf-8'}
    self.webhook = webhook
    self.secret = secret
    self.pc_slide = pc_slide
    self.fail_notice = fail_notice

  def send_text(self, msg, open_id=[]):
    """
    消息类型为text类型
    :param msg: 消息内容
    :return: 返回消息发送结果
    """
    data = {"msg_type": "text", "at": {}}
    if is_not_null_and_blank_str(msg):  # 传入msg非空
      data["content"] = {"text": msg}
    else:
      logging.error("text类型,消息内容不能为空!")
      raise ValueError("text类型,消息内容不能为空!")

    logging.debug('text类型:%s' % data)
    return self.post(data)

  def post(self, data):
    """
    发送消息(内容UTF-8编码)
    :param data: 消息数据(字典)
    :return: 返回消息发送结果
    """
    try:
      post_data = json.dumps(data)
      response = requests.post(self.webhook, headers=self.headers, data=post_data, verify=False)
    except requests.exceptions.HTTPError as exc:
      logging.error("消息发送失败, HTTP error: %d, reason: %s" % (exc.response.status_code, exc.response.reason))
      raise
    except requests.exceptions.ConnectionError:
      logging.error("消息发送失败,HTTP connection error!")
      raise
    except requests.exceptions.Timeout:
      logging.error("消息发送失败,Timeout error!")
      raise
    except requests.exceptions.RequestException:
      logging.error("消息发送失败, Request Exception!")
      raise
    else:
      try:
        result = response.json()
      except JSONDecodeError:
        logging.error("服务器响应异常,状态码:%s,响应内容:%s" % (response.status_code, response.text))
        return {'errcode': 500, 'errmsg': '服务器响应异常'}
      else:
        logging.debug('发送结果:%s' % result)
        # 消息发送失败提醒(errcode 不为 0,表示消息发送异常),默认不提醒,开发者可以根据返回的消息发送结果自行判断和处理
        if self.fail_notice and result.get('errcode', True):
          time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
          error_data = {
            "msgtype": "text",
            "text": {
              "content": "[注意-自动通知]飞书机器人消息发送失败,时间:%s,原因:%s,请及时跟进,谢谢!" % (
                time_now, result['errmsg'] if result.get('errmsg', False) else '未知异常')
            },
            "at": {
              "isAtAll": False
            }
          }
          logging.error("消息发送失败,自动通知:%s" % error_data)
          requests.post(self.webhook, headers=self.headers, data=json.dumps(error_data))
        return result

  封装后我们就可以直接调用封装的类,进行消息代码发送;执行以下代码后,就可以使用飞书发送消息咯,是不是很简单。

  webhook = "https://open.feishu.cn/open-apis/bot/v2/hook/1d7b5d0c-03a5-44a9-8d7a-4d09b24bfea1"
  feishu = FeiShutalkChatbot(webhook)
  feishu.send_text("重庆百货-新世纪鱼胡路店内商品'1000800370-牛心白 约1kg'在商详[8]和榜单[7]中排名不一致")

以上就是Python调用飞书发送消息的示例的详细内容,更多关于python 飞书发送信息的资料请关注我们其它相关文章!

(0)

相关推荐

  • python模仿网页版微信发送消息功能

    这个微信版网页版虽然繁琐,但是不是很难,全程不带加密的.有兴趣的可以试着玩一玩,如果有兴趣的话,可以完善一下,做一些比较有意思的东西. 开发环境:Windows10 开发语言:Python3.6 开发工具:pycharm 抓包工具:fiddler 抓的包如下: import requests import time import re from bs4 import BeautifulSoup import json import random from copyheaders import h

  • 使用python3调用wxpy模块监控linux日志并定时发送消息给群组或好友

    使用python3调用wxpy模块,监控linux日志并定时发送消息给群组或好友,具体代码如下所示: #!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import unicode_literals from threading import Timer from wxpy import * import requests import subprocess import time from PIL import Ima

  • Python 网络编程起步(Socket发送消息)

    一.服务端(Server.py)    服务端要做的事情是:    1. 创建一个Socket对象 Code highlighting produced by Actipro CodeHighlighter (freeware) http://www.CodeHighlighter.com/ -->import sockets = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)    2. 绑定一个端口 Code highlighting pro

  • Python定时发送消息的脚本:每天跟你女朋友说晚安

    首先 你要有个女朋友 效果: 需要安装几个包 pip install wxpy pip install wechat_sender pip install requests 代码如下: from __future__ import unicode_literals from threading import Timer from wxpy import * from wechat_sender import Sender import time,requests bot = Bot(consol

  • python实现的自动发送消息功能详解

    本文实例讲述了python实现的自动发送消息功能.分享给大家供大家参考,具体如下: 一个简单的脚本 #-*- coding:utf-8 -*- from __future__ import unicode_literals from threading import Timer import itchat import requests # 抓取金山毒霸每日一句,英文和翻译 def get_news(): url = "http://open.iciba.com/dsapi/" r =

  • python3+pyqt5+itchat微信定时发送消息的方法

    编这个程序是想过节过年,一些重要的纪念日,给亲戚好友发祝福之类的,但要凌晨0点才显得比较有诚意,可我又比较贪睡,常常忘了,所以就有了编个微信定时发送消息小程序. 运行环境: python 3.x,不支持python2 准备工作 由于我用到了微信的接口,所以引入itchat 界面用了pyqt5 安装命令如下: pip install PyQt5 pip install itchat 代码部分 # -*- coding: utf-8 -*- # @Time : 2018/9/25 11:06 # @

  • python实现给微信公众号发送消息的方法

    本文实例讲述了python实现给微信公众号发送消息的方法.分享给大家供大家参考,具体如下: 现在通过发微信公众号信息来做消息通知和告警已经很普遍了.最常见的就是运维通过zabbix调用shell脚本给微信发消息,起到告警的作用.当要发送的信息较多,而且希望按照指定格式显示的好看一点的时候,shell处理起来,个人感觉不太方便.于是我用Python重写了发微信的功能. #coding:utf-8 import urllib2 import json import sys def getMsg():

  • python实现给微信指定好友定时发送消息

    python有很多有趣的库,其中wxpy是连接微信的接口,具体可以查看官方文档.可以实现自动操作,wxpy 支持 Python 3.4-3.6,以及 2.7 版本. 一.安装 win10环境,直接在cmd中,输入 pip install wxpy 有时网络不稳定,可能出现错误,重新执行操作尝试一下. 二.简单介绍 # 导入模块 from wxpy import * # 初始化机器人,扫码登陆 bot = Bot() # 搜索名称含有 "游否" 的男性深圳好友 my_friend = b

  • Python微信企业号开发之回调模式接收微信端客户端发送消息及被动返回消息示例

    本文实例讲述了Python微信企业号开发之回调模式接收微信端客户端发送消息及被动返回消息.分享给大家供大家参考,具体如下: 说明:此代码用于接收手机微信端发送的消息 #-*- coding:utf-8 -*- from flask import Flask,request from WXBizMsgCrypt import WXBizMsgCrypt import xml.etree.cElementTree as ET import sys app = Flask(__name__) @app

  • python实现微信定时每天和女友发送消息

    但凡有些事情重复时,我就在想怎么可以用程序来自动化.这里想分享如何每天给女友定时微信发送"晚安",如果只是晚安,就略显单调,于是爬取金山词霸每日一句,英文和翻译,借此设定定时器进行发送. 准备: pip install wxpy pip install requests 实现代码: from __future__ import unicode_literals from threading import Timer from wxpy import * import requests

随机推荐