python利用微信公众号实现报警功能

微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。

其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效。

而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。

为兼容这两种方式,因此按照订阅号的方式处理。

 处理办法与接口文档中的要求相同:

为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器。

而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。

下面的代码以企业号为例,将access_token储存在sqlite3数据库中,相比储存在文本中,放在数

据库里,可以为后期存放其他数据提供向后兼容。如果放在文本中,则不如放在数据库中灵活。

设计思路和使用方法:

自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据

尽可能的保证Class中每一个可执行的函数单独调用都能成功。

Class中只将真正能被用到的方法和变量设置为public的。

使用时只需要修改此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,并import此文件,使

用WeiXinTokenClass().get()方法即可得到access_token。

#!/usr/bin/python
# encoding: utf-8
# -*- coding: utf8 -*-
#Python学习群125240963每天更新资料,包括2018最新企业级项目案例,同千人一起交流。
import os
import sqlite3
import sys
import urllib
import urllib2
import json
import datetime
# import time
enable_debug = True
def debug(msg, code=None):
  if enable_debug:
    if code is None:
      print "message: %s" % msg
    else:
      print "message: %s, code: %s " % (msg, code)
AUTHOR_MAIL = "uberurey_ups@163.com"
weixin_qy_CorpID = "your_corpid"
weixin_qy_Secret = "your_secret"
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# Database
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases
DATABASES = {
  'default': {
    'ENGINE': 'db.backends.sqlite3',
    'NAME': os.path.join(BASE_DIR, '.odbp_db.sqlite3'),
  }
}
sqlite3_db_file = str(DATABASES['default']['NAME'])
def sqlite3_conn(database):
  try:
    conn = sqlite3.connect(database)
  except sqlite3.Error:
    print >> sys.stderr, """\
  There was a problem connecting to Database:
    %s
  The error leading to this problem was:
    %s
  It's possible that this database is broken or permission denied.
  If you cannot solve this problem yourself, please mail to:
    %s
  """ % (database, sys.exc_value, AUTHOR_MAIL)
    sys.exit(1)
  else:
    return conn
def sqlite3_commit(conn):
  return conn.commit()
def sqlite3_close(conn):
  return conn.close()
def sqlite3_execute(database, sql):
  try:
    sql_conn = sqlite3_conn(database)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute(sql)
    sql_conn.commit()
    sql_conn.close()
  except sqlite3.Error as e:
    print e
    sys.exit(1)
def sqlite3_create_table_token():
  sql_conn = sqlite3_conn(sqlite3_db_file)
  sql_cursor = sql_conn.cursor()
  sql_cursor.execute('''CREATE TABLE "main"."weixin_token" (
        "id" INTEGER ,
        "access_token" TEXT,
        "expires_in" TEXT,
        "expires_on" TEXT,
        "is_expired" INTEGER
        )
        ;
  ''')
  sqlite3_commit(sql_conn)
  sqlite3_close(sql_conn)
def sqlite3_create_table_account():
  sql_conn = sqlite3_conn(sqlite3_db_file)
  sql_cursor = sql_conn.cursor()
  sql_cursor.execute('''CREATE TABLE "main"."weixin_account" (
        "id" INTEGER,
        "name" TEXT,
        "corpid" TEXT,
        "secret" TEXT,
        "current" INTEGER
        )
        ;
  ''')
  sqlite3_commit(sql_conn)
  sqlite3_close(sql_conn)
def sqlite3_create_tables():
  print "sqlite3_create_tables"
  sql_conn = sqlite3_conn(sqlite3_db_file)
  sql_cursor = sql_conn.cursor()
  sql_cursor.execute('''CREATE TABLE "main"."weixin_token" (
        "id" INTEGER ,
        "access_token" TEXT,
        "expires_in" TEXT,
        "expires_on" TEXT,
        "is_expired" INTEGER
        )
        ;
  ''')
  sql_cursor.execute('''CREATE TABLE "main"."weixin_account" (
        "id" INTEGER,
        "name" TEXT,
        "corpid" TEXT,
        "secret" TEXT,
        "current" INTEGER
        )
        ;
  ''')
  sqlite3_commit(sql_conn)
  sqlite3_close(sql_conn)
def sqlite3_set_credential(corpid, secret):
  try:
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute('''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current") VALUES
                (1,
                'odbp',
                ?,
                ?,
                1)
''', (corpid, secret))
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)
  except sqlite3.Error:
    sqlite3_create_table_account()
    sqlite3_set_credential(corpid, secret)
def sqlite3_set_token(access_token, expires_in, expires_on, is_expired):
  try:
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute('''INSERT INTO "weixin_token"
               ("id", "access_token", "expires_in", "expires_on", "is_expired") VALUES
               (
               1,
               ?,
               ?,
               ?,
               ?
               )
''', (access_token, expires_in, expires_on, is_expired))
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)
  except sqlite3.Error:
    sqlite3_create_table_token()
    sqlite3_set_token(access_token, expires_in, expires_on, is_expired)
def sqlite3_get_credential():
  try:
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    credential = sql_cursor.execute('''SELECT "corpid", "secret" FROM weixin_account WHERE current == 1;''')
    result = credential.fetchall()
    sqlite3_close(sql_conn)
  except sqlite3.Error:
    sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret)
    return sqlite3_get_credential()
  else:
    if result is not None and len(result) != 0:
      return result
    else:
      print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL
      sys.exit(1)
def sqlite3_get_token():
  try:
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    credential = sql_cursor.execute(
      '''SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;''')
    result = credential.fetchall()
    sqlite3_close(sql_conn)
  except sqlite3.Error:
    info = sys.exc_info()
    print info[0], ":", info[1]
  else:
    if result is not None and len(result) != 0:
      return result
    else:
      # print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL
      # sys.exit(1)
      return None
def sqlite3_update_token(access_token, expires_on):
  sql_conn = sqlite3_conn(sqlite3_db_file)
  sql_cursor = sql_conn.cursor()
  sql_cursor.execute('''UPDATE "weixin_token" SET
             access_token=?,
             expires_on=?
             WHERE _ROWID_ = 1;''', (access_token, expires_on)
            )
  sqlite3_commit(sql_conn)
  sqlite3_close(sql_conn)
class WeiXinTokenClass(object):
  def __init__(self):
    self.__corpid = None
    self.__corpsecret = None
    self.__use_persistence = True
    self.__access_token = None
    self.__expires_in = None
    self.__expires_on = None
    self.__is_expired = None
    if self.__use_persistence:
      self.__corpid = sqlite3_get_credential()[0][0]
      self.__corpsecret = sqlite3_get_credential()[0][1]
    else:
      self.__corpid = weixin_qy_CorpID
      self.__corpsecret = weixin_qy_Secret
  def __get_token_from_weixin_qy_api(self):
    parameters = {
      "corpid": self.__corpid,
      "corpsecret": self.__corpsecret
    }
    url_parameters = urllib.urlencode(parameters)
    token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?"
    url = token_url + url_parameters
    response = urllib2.urlopen(url)
    result = response.read()
    token_json = json.loads(result)
    if token_json['access_token'] is not None:
      get_time_now = datetime.datetime.now()
      # TODO(Guodong Ding) token will expired ahead of time or not expired after the time
      expire_time = get_time_now + datetime.timedelta(seconds=token_json['expires_in'])
      token_json['expires_on'] = str(expire_time)
      self.__access_token = token_json['access_token']
      self.__expires_in = token_json['expires_in']
      self.__expires_on = token_json['expires_on']
      self.__is_expired = 1
      try:
        token_result_set = sqlite3_get_token()
      except sqlite3.Error:
        token_result_set = None
      if token_result_set is None and len(token_result_set) == 0:
        sqlite3_set_token(self.__access_token, self.__expires_in, self.__expires_on, self.__is_expired)
      else:
        if self.__is_token_expired() is True:
          sqlite3_update_token(self.__access_token, self.__expires_on)
        else:
          debug("pass")
          return
    else:
      if token_json['errcode'] is not None:
        print "errcode is: %s" % token_json['errcode']
        print "errmsg is: %s" % token_json['errmsg']
      else:
        print result
  def __get_token_from_persistence_storage(self):
    try:
      token_result_set = sqlite3_get_token()
    except sqlite3.Error:
      self.__get_token_from_weixin_qy_api()
    finally:
      if token_result_set is None:
        self.__get_token_from_weixin_qy_api()
        token_result_set = sqlite3_get_token()
        access_token = token_result_set[0][0]
        expire_time = token_result_set[0][1]
      else:
        access_token = token_result_set[0][0]
        expire_time = token_result_set[0][1]
    expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f')
    now_time = datetime.datetime.now()
    if now_time < expire_time:
      # print "The token is %s" % access_token
      # print "The token will expire on %s" % expire_time
      return access_token
    else:
      self.__get_token_from_weixin_qy_api()
      return self.__get_token_from_persistence_storage()
  @staticmethod
  def __is_token_expired():
    try:
      token_result_set = sqlite3_get_token()
    except sqlite3.Error as e:
      print e
      sys.exit(1)
    expire_time = token_result_set[0][1]
    expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f')
    now_time = datetime.datetime.now()
    if now_time < expire_time:
      return False
    else:
      return True
  def get(self):
    return self.__get_token_from_persistence_storage()

Python实现通过微信企业号发送文本消息的Class

编程要点和调用方法:

支持发送中文,核心语句“payload = json.dumps(self.data, encoding='utf-8', ensure_ascii=False)”,关键字“python json 中文”

这个Class只有一个公共方法send()。

使用方法:import这个class,然后调用send方法即可,方法参数只需要两个,给谁(多UserID用"|"隔开),内容是什么,例如:

import odbp_sendMessage
msg = odbp_sendMessage.WeiXinSendMsgClass()
msg.send("dingguodong", "Python 大魔王!")
#!/usr/bin/python
# encoding: utf-8
# -*- coding: utf8 -*-
"""
Created by PyCharm.
File:        LinuxBashShellScriptForOps:odbp_sendMessage.py
User:        Guodong
Create Date:    2016/8/12
Create Time:    14:49
 """
import odbp_getToken
class WeiXinSendMsgClass(object):
  def __init__(self):
    self.access_token = odbp_getToken.WeiXinTokenClass().get()
    self.to_user = ""
    self.to_party = ""
    self.to_tag = ""
    self.msg_type = "text"
    self.agent_id = 2
    self.content = ""
    self.safe = 0
    self.data = {
      "touser": self.to_user,
      "toparty": self.to_party,
      "totag": self.to_tag,
      "msgtype": self.msg_type,
      "agentid": self.agent_id,
      "text": {
        "content": self.content
      },
      "safe": self.safe
    }
  def send(self, to_user, content):
    if to_user is not None and content is not None:
      self.data['touser'] = to_user
      self.data['text']['content'] = content
    else:
      print
      raise RuntimeError
    import requests
    import json
    url = "https://qyapi.weixin.qq.com/cgi-bin/message/send"
    querystring = {"access_token": self.access_token}
    payload = json.dumps(self.data, encoding='utf-8', ensure_ascii=False)
    headers = {
      'content-type': "application/json",
      'cache-control': "no-cache",
    }
    response = requests.request("POST", url, data=payload, headers=headers, params=querystring)
    return_content = json.loads(response.content)
    if return_content["errcode"] == 0 and return_content["errmsg"] == "ok":
      print "Send successfully! %s " % return_content
    else:
      print "Send failed! %s " % return_content

python调用mongodb发送微信企业号

python2.x

注意:data变量里, agent_id为刚刚创建的应用id(可在web页面看到)

toparty即为目标部门,或者可以用touser,totag指定目标账户

比较简单的调用,已实测,可以使用。

#coding:utf-8
import sys
import requests
import json
from pymongo import MongoClient
reload(sys)
sys.setdefaultencoding('utf-8')
class Weixin(object):
  def __init__(self, corp_id, corp_secret):
    self.token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s' %(corp_id, corp_secret)
    self.send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='
  def get_token(self):
    try:
      r = requests.get(self.token_url, timeout=10)
    except Exception as e:
      print e
      sys.exit(1)
    if r.status_code == requests.codes.ok:
      data = r.json()
      if data.get('errcode'):
        print data['errmsg']
        sys.exit(1)
      return data['access_token']
    else:
      print r.status_code
      sys.exit(1)
  def send(self,message):
    url = self.send_url + self.get_token()
    data = {
      "touser": "hequan2011",
      "msgtype": "text",
      "agentid": "0",
      "text": {
        "content": message
      },
      "safe":"0"
    }
    send_data = json.dumps(data,ensure_ascii=False)
    try:
      r = requests.post(url, send_data)
    except Exception, e:
      print e
      sys.exit(1)
    if r.status_code == requests.codes.ok:
      print r.json()
    else:
      print r.code
      sys.exit(1)
corpid = 'xxxxxxxxxxx'
corpsecret = 'xxxxxxxxxxxxxxxxx'
client = MongoClient('mongodb://user:password@127.0.0.1:27017/')
db = client.ku
collection = db.biao
a = []
for data in collection.find():
  a.append(data)
l = a[0]
g = l
z = str(g["name"])
z1 = int(g["jg"])
print z
msg = "1:{0}\n 2:{1}\n".format(z,z1)
w = Weixin(corpid,corpsecret)
w.send(msg)

ZABBIX 微信报警 插件

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = '懒懒的天空'
import requests
import sys
import json
from conf.INIFILES import read_config, write_config
import os
import datetime
from conf.BLog import Log
reload(sys)
sys.setdefaultencoding('utf-8')
class WeiXin(object):
  def __init__(self, corpid, corpsecret): # 初始化的时候需要获取corpid和corpsecret,需要从管理后台获取
    self.__params = {
      'corpid': corpid,
      'corpsecret': corpsecret
    }
    self.url_get_token = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
    self.url_send = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?'
    self.__token = self.__get_token()
    self.__token_params = {
      'access_token': self.__token
    }
  def __raise_error(self, res):
    raise Exception('error code: %s,error message: %s' % (res.json()['errcode'], res.json()['errmsg']))
    global senderr
    sendstatus = False
    senderr = 'error code: %s,error message: %s' % (res.json()['errcode'], res.json()['errmsg'])
  def __get_token(self):
    # print self.url_get_token
    headers = {'content-type': 'application/json'}
    res = requests.get(self.url_get_token, headers=headers, params=self.__params)
    try:
      return res.json()['access_token']
    except:
      self.__raise_error(res.content)
  def send_message(self, agentid, messages, userid='', toparty=''):
    payload = {
      'touser': userid,
      'toparty': toparty,
      'agentid': agentid,
      "msgtype": "news",
      "news": messages
    }
    headers = {'content-type': 'application/json'}
    data = json.dumps(payload, ensure_ascii=False).encode('utf-8')
    params = self.__token_params
    res = requests.post(self.url_send, headers=headers, params=params, data=data)
    try:
      return res.json()
    except:
      self.__raise_error(res)
def main(send_to, subject, content):
  try:
    global sendstatus
    global senderr
    data = ''
    messages = {}
    body = {}
    config_file_path = get_path()
    CorpID = read_config(config_file_path, 'wei', "CorpID")
    CorpSecret = read_config(config_file_path, 'wei', "CorpSecret")
    agentid = read_config(config_file_path, 'wei', "agentid")
    web = read_config(config_file_path, 'wei', "web")
    content = json.loads(content)
    messages["message_url"] = web
    body["url"] = web + "history.php?action=showgraph&itemids[]=" + content[u'监控ID']
    warn_message = ''
    if content[u'当前状态'] == 'PROBLEM':
      body["title"] = "服务器故障"
      warn_message += subject + '\n'
      warn_message += '详情:\n'
      warn_message += '告警等级:'+ content[u'告警等级'] + '\n'
      warn_message += '告警时间:'+ content[u'告警时间'] + '\n'
      warn_message += '告警地址:'+ content[u'告警地址'] + '\n'
      warn_message += '持续时间:'+ content[u'持续时间'] + '\n'
      warn_message += '监控项目:'+ content[u'监控项目'] + '\n'
      warn_message += content[u'告警主机'] + '故障(' + content[u'事件ID']+ ')'
    else:
      body["title"] = "服务器恢复"
      warn_message += subject + '\n'
      warn_message += '详情:\n'
      warn_message += '告警等级:'+ content[u'告警等级'] + '\n'
      warn_message += '恢复时间:'+ content[u'恢复时间'] + '\n'
      warn_message += '告警地址:'+ content[u'告警地址'] + '\n'
      warn_message += '持续时间:'+ content[u'持续时间'] + '\n'
      warn_message += '监控项目:'+ content[u'监控项目'] + '\n'
      warn_message += content[u'告警主机'] + '恢复(' + content[u'事件ID']+ ')'
    body['description'] = warn_message
    data = []
    data.append(body)
    messages['articles'] = data
    wx = WeiXin(CorpID, CorpSecret)
    data = wx.send_message(toparty=send_to, agentid=agentid, messages=messages)
    sendstatus = True
  except Exception, e:
    senderr = str(e)
    sendstatus = False
  logwrite(sendstatus, data)
def get_path():
  path = os.path.dirname(os.path.abspath(sys.argv[0]))
  config_path = path + '/config.ini'
  return config_path
def logwrite(sendstatus, content):
  logpath = '/var/log/zabbix/weixin'
  if not sendstatus:
    content = senderr
  t = datetime.datetime.now()
  daytime = t.strftime('%Y-%m-%d')
  daylogfile = logpath+'/'+str(daytime)+'.log'
  logger = Log(daylogfile, level="info", is_console=False, mbs=5, count=5)
  os.system('chown zabbix.zabbix {0}'.format(daylogfile))
  logger.info(content)
if __name__ == "__main__":
  if len(sys.argv) > 1:
    send_to = sys.argv[1]
    subject = sys.argv[2]
    content = sys.argv[3]
    logwrite(True, content)
    main(send_to, subject, content)

python实现微信企业号的文本消息推送

#!/usr/bin/python
# _*_coding:utf-8 _*_
import urllib2
import json
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
def gettoken(corpid, corpsecret):
  gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corpid + '&corpsecret=' + corpsecret
  try:
    token_file = urllib2.urlopen(gettoken_url)
  except urllib2.HTTPError as e:
    print e.code
    print e.read().decode("utf8")
    sys.exit()
  token_data = token_file.read().decode('utf-8')
  token_json = json.loads(token_data)
  token_json.keys()
  token = token_json['access_token']
  return token
def senddata(access_token, user, party, agent, subject, content):
  send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
  send_values = "{\"touser\":\"" + user + "\",\"toparty\":\"" + party + "\",\"totag\":\"\",\"msgtype\":\"text\",\"agentid\":\"" + agent + "\",\"text\":{\"content\":\"" + subject + "\n" + content + "\"},\"safe\":\"0\"}"
  send_request = urllib2.Request(send_url, send_values)
  response = json.loads(urllib2.urlopen(send_request).read())
  print str(response)
if __name__ == '__main__':
  user = str(sys.argv[1]) # 参数1:发送给用户的账号,必须关注企业号,并对企业号有发消息权限
  party = str(sys.argv[2]) # 参数2:发送给组的id号,必须对企业号有权限
  agent = str(sys.argv[3]) # 参数3:企业号中的应用id
  subject = str(sys.argv[4]) # 参数4:标题【消息内容的一部分】
  content = str(sys.argv[5]) # 参数5:文本具体内容
  corpid = 'CorpID' # CorpID是企业号的标识
  corpsecret = 'corpsecretSecret' # corpsecretSecret是管理组凭证密钥
  try:
    accesstoken = gettoken(corpid, corpsecret)
    senddata(accesstoken, user, party, agent, subject, content)
  except Exception, e:
    print str(e) + "Error Please Check \"corpid\" or \"corpsecret\" Config"

Nagios调用Python程序控制微信公众平台发布报警信息

vim Notify-host-by-weixin-party.py
import urllib.request
import json
import sys
#以上是导入模块
#创建获取AccessToken的方法
def gettoken(corp_id,corp_secret):
  gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret
  try:
    token_file = urllib.request.urlopen(gettoken_url)
  except urllib.error.HTTPError as e:
    print(e.code)
    print(e.read().decode("utf8"))
  token_data = token_file.read().decode('utf-8')
  token_json = json.loads(token_data)
  token_json.keys()
  token = token_json['access_token']
  return token
#这里是发送消息的方法
def senddata(access_token,notify_str):
  send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
#我传入的参数是一段字符串每个信息用separator连起来,只要再用字符串的split("separator")方法分开信息就可以了。
  notifydata = notify_str.split("separator")
  party = notifydata[0]
  cationtype = notifydata[1]
  name = notifydata[2]
  state = notifydata[3]
  address = notifydata[4]
  output = notifydata[5]
  datatime = notifydata[6]
#  content = '[擦汗]Host Notification[擦汗]\n\n类型: ' + cationtype + '\n主机名: ' + name + '\n状态: ' + state + '\nIP地址: ' + address + '\n摘要: ' + output + '\n时间: ' + datatime + '\n'
  if cationtype == "RECOVERY":
    content = '[嘘]' + address + ' is ' + state + '[嘘]\n\nIP地址: ' + address + '\n主要用途: ' + name + '\n当前状态: ' + state + '\n\n日志摘要: ' + output + '\n检测时间: ' + datatime + '\n'
  else:
    content = '[擦汗]' + address + ' is ' + state + '[擦汗]\n\nIP地址: ' + address + '\n主要用途: ' + name + '\n当前状态: ' + state + '\n\n日志摘要: ' + output + '\n检测时间: ' + datatime + '\n'
  send_values = {
    "toparty":party,
    "totag":"2",
    "msgtype":"text",
    "agentid":"15",
    "text":{
      "content":content
      },
    "safe":"0"
    }
  send_data = json.dumps(send_values, ensure_ascii=False).encode(encoding='UTF8')
#设置为非ascii解析,使其支持中文
  send_request = urllib.request.Request(send_url, send_data)
  response = urllib.request.urlopen(send_request)
#这个是返回微信公共平台的信息,调试时比较有用
  msg = response.read()
  return msg
default_encoding = 'utf-8'
if sys.getdefaultencoding() != default_encoding:
  reload(sys)
  sys.setdefaultencoding(default_encoding)
#我编辑的脚本是要获取nagios传入的一段参数的(字符串),下面这条代码是获取执行脚本后获取的第一个参数(经测试nagios只能传入一个参进python,所以把所有包括用户名跟报警主机报警信息放进一个字符串里)
notifystr = str(sys.argv[1])
corpid = 'wxb6162862801114c9da602'
corpsecret = '2nCsNcHxepBCV4U9Lcf-23By1RGzU1Zs422tdJpKTQzqjQ1b26IFxP76ydG2rKkchGN6E'
accesstoken = gettoken(corpid,corpsecret)
msg = senddata(accesstoken,notifystr)
print(msg)
[root@localhost python]# vim Notify-service-by-weixin-party.py
import urllib.request
import json
import sys
def gettoken(corp_id,corp_secret):
  gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret
  try:
    token_file = urllib.request.urlopen(gettoken_url)
  except urllib.error.HTTPError as e:
    print(e.code)
    print(e.read().decode("utf8"))
  token_data = token_file.read().decode('utf-8')
  token_json = json.loads(token_data)
  token_json.keys()
  token = token_json['access_token']
  return token
def senddata(access_token,notify_str):
  send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
  notifydata = notify_str.split("separator")
  party = notifydata[0]
  cationtype = notifydata[1]
  desc = notifydata[2]
  alias = notifydata[3]
  address = notifydata[4]
  state = notifydata[5]
  datatime = notifydata[6]
  output = notifydata[7]
#  content ='[擦汗]Service Notification [擦汗]\n\n类型: ' + cationtype + '\n\n服务名: ' + desc + '\n主机名: ' + alias + '\nIP址: ' + address + '\n状态: ' + state + '\n时间: ' + datatime + '\n摘要:\n' + output + '\n'
  if cationtype == "RECOVERY":
    content ='[鼓掌]' + desc + ' is ' + state + '[鼓掌]\n\nIP地址: ' + address + '\n主要用途: ' + alias + '\n服务状态: ' + desc + ' is ' + state + '\n检测时间: ' + datatime + '\n日志摘要: \n' + output + '\n'
  else:
    content ='[擦汗]' + desc + ' is ' + state + '[擦汗]\n\nIP地址: ' + address + '\n主要用途: ' + alias + '\n服务状态: ' + desc + ' is ' + state + '\n检测时间: ' + datatime + '\n日志摘要: \n' + output + '\n'
  send_values = {
    "toparty":party,
    "totag":"2",
    "msgtype":"text",
    "agentid":"15",
    "text":{
      "content":content
      },
    "safe":"0"
    }
  send_data = json.dumps(send_values, ensure_ascii=False).encode(encoding='UTF8')
  send_request = urllib.request.Request(send_url, send_data)
  response = urllib.request.urlopen(send_request)
  msg = response.read()
  return msg
default_encoding = 'utf-8'
if sys.getdefaultencoding() != default_encoding:
  reload(sys)
  sys.setdefaultencoding(default_encoding)
notifystr = str(sys.argv[1])
corpid = 'wxb616286d28ds01114c9da602'
corpsecret = '2nCsNcHxepBCdtgV4U9Lcf-23By1RGzUgh1Zs422tdJpKTQzqjQ1b26IFxP76ydG2rKkchGN6E'
accesstoken = gettoken(corpid,corpsecret)
msg = senddata(accesstoken,notifystr)
print(msg)

shell和Python调用企业微信服务号进行报警

#!/bin/bash
corpid="wxd6b3"
corpsecret="aJTaPaGjW"
access_token=`curl -s "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpid&corpsecret=$corpsecret" |jq '.access_token' | awk -F'"' '{print $2}'`
curl -l -H "Content-type: application/json" -X POST -d '{"touser":"@all","msgtype":"text","toparty":"14","agentid":"14","text":{"content":"测试"} , "safe":"0"}'   "

Python脚本如下:

# coding:utf-8
import sys
import urllib2
import time
import json
import requests
reload(sys)
sys.setdefaultencoding('utf-8')
#title = sys.argv[2]  # 位置参数获取title 适用于zabbix
#content = sys.argv[3] # 位置参数获取content 适用于zabbix
title = "title 测试"  # 位置参数获取title 适用于zabbix
content = "content 测试" # 位置参数获取content 适用于zabbix
class Token(object):
  # 获取token
  def __init__(self, corpid, corpsecret):
    self.baseurl = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={0}&corpsecret={1}'.format(
      corpid, corpsecret)
    self.expire_time = sys.maxint
  def get_token(self):
    if self.expire_time > time.time():
      request = urllib2.Request(self.baseurl)
      response = urllib2.urlopen(request)
      ret = response.read().strip()
      ret = json.loads(ret)
      if 'errcode' in ret.keys():
        print >> ret['errmsg'], sys.stderr
        sys.exit(1)
      self.expire_time = time.time() + ret['expires_in']
      self.access_token = ret['access_token']
    return self.access_token
def send_msg(title, content):
  # 发送消息
  corpid = "" # 填写自己应用的
  corpsecret = "" # 填写自己应用的
  qs_token = Token(corpid=corpid, corpsecret=corpsecret).get_token()
  url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={0}".format(
    qs_token)
  payload = {
    "touser": "@all",
    "msgtype": "text",
    "agentid": "14",
    "text": {
          "content": "标题:{0}\n 内容:{1}".format(title, content)
    },
    "safe": "0"
  }
  ret = requests.post(url, data=json.dumps(payload, ensure_ascii=False))
  print ret.json()
if __name__ == '__main__':
  # print title, content
  send_msg(title, content)

python利用微信订阅号报警

# coding=utf-8
import urllib
import urllib2
import cookielib
import json
import sys
data={'username':'yaokuaile-99',
   'pwd':'f4bb2d8abe7a799ad62495a912ae3363',
   'imgcode':'',
   'f':'json'
   }
cj=cookielib.LWPCookieJar()
opener=urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
urllib2.install_opener(opener)
def getToken():
  headers = {'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Encoding': 'gzip,deflate,sdch',
        'Accept-Language': 'zh-CN,zh;q=0.8',
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Content-Length': '74',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Host': 'mp.weixin.qq.com',
        'Origin': 'https://mp.weixin.qq.com',
        'Referer': 'https://mp.weixin.qq.com/cgi-bin/loginpage?t=wxm2-login&lang=zh_CN',
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest',
       }
  req = urllib2.Request('https://mp.weixin.qq.com/cgi-bin/login?lang=zh_CN',urllib.urlencode(data),headers)
  ret = urllib2.urlopen(req)
  retread= ret.read()
  token = json.loads(retread)
  token=token['redirect_url'][44:]
  return token
### send msg
def sendWeixin(msg,token,tofakeid):
  msg = msg
  token = token
  data1 = {'type':'1','content':msg,'imgcode':'','imgcode':'','tofakeid':tofakeid,'f':'json','token':token,'ajax':'1'}
  headers = {'Accept':'*/*',
        'Accept-Encoding': 'gzip,deflate,sdch',
        'Accept-Language': 'zh-CN,zh;q=0.8',
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Host': 'mp.weixin.qq.com',
        'Origin': 'https://mp.weixin.qq.com',
        'Referer': 'https://mp.weixin.qq.com/cgi-bin/singlemsgpage?msgid=&source=&count=20&t=wxm-singlechat&fromfakeid=150890&token=%s&lang=zh_CN',
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36',
        'X-Requested-With':'XMLHttpRequest',
        }
  req2 = urllib2.Request('https://mp.weixin.qq.com/cgi-bin/singlesend?t=ajax-response&f=json&lang=zh_CN',urllib.urlencode(data1),headers)
  ret2=urllib2.urlopen(req2)
if __name__=='__main__':
  '''
  useage: ./send_wx.py msg
  '''
  token = getToken()
  msg = sys.argv[1:]
  msg = '\n'.join(msg)
  tofakeid = '2443746922'
  sendWeixin(msg, token, tofakeid)
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import urllib2,json
import datetime,time
from config import *
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
class WechatPush():
  def __init__(self,appid,secrect,file_name):
    # 传入appid
    self.appid = appid
    # 传入密码
    self.secrect = secrect
    # 传入记录token和过期时间的文件名
    self.file_name=file_name
  def build_timestamp(self,interval):
    # 传入时间间隔,得到指定interval后的时间 格式为"2015-07-01 14:41:40"
    now = datetime.datetime.now()
    delta = datetime.timedelta(seconds=interval)
    now_interval=now + delta
    return now_interval.strftime('%Y-%m-%d %H:%M:%S')
  def check_token_expires(self):
    # 判断token是否过期
    with open(self.file_name,'r') as f:
      line=f.read()
      if len(line)>0:
        expires_time=line.split(",")[1]
        token=line.split(",")[0]
      else:
        return "","true"
    curr_time=time.strftime('%Y-%m-%d %H:%M:%S')
    # 如果过期返回false
    if curr_time>expires_time:
      return token,"false"
    # 没过期返回true
    else:
      return token,"true"
  def getToken(self):
    # 获取accessToken
    url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid='+self.appid + "&secret="+self.secrect
    try:
      f = urllib2.urlopen(url)
      s = f.read()
      # 读取json数据
      j = json.loads(s)
      j.keys()
      # 从json中获取token
      token = j['access_token']
      # 从json中获取过期时长
      expires_in =j['expires_in']
      # 将得到的过期时长减去300秒然后与当前时间做相加计算然后写入到过期文件
      write_expires=self.build_timestamp(int(expires_in-300))
      content="%s,%s" % (token,write_expires)
      with open(self.file_name,'w') as f:
        f.write(content)
    except Exception,e:
      print e
    return token
  def post_data(self,url,para_dct):
    """触发post请求微信发送最终的模板消息"""
    para_data = para_dct
    f = urllib2.urlopen(url,para_data)
    content = f.read()
    return content
  def do_push(self,touser,template_id,url,topcolor,data):
    '''推送消息 '''
    #获取存入到过期文件中的token,同时判断是否过期
    token,if_token_expires=self.check_token_expires()
    #如果过期了就重新获取token
    if if_token_expires=="false":
      token=self.getToken()
    # 背景色设置,貌似不生效
    if topcolor.strip()=='':
      topcolor = "#7B68EE"
    #最红post的求情数据
    dict_arr = {'touser': touser, 'template_id':template_id, 'url':url, 'topcolor':topcolor,'data':data}
    json_template = json.dumps(dict_arr)
    requst_url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token="+token
    content = self.post_data(requst_url,json_template)
    #读取json数据
    j = json.loads(content)
    j.keys()
    errcode = j['errcode']
    errmsg = j['errmsg']
    #print errmsg
if __name__ == "__main__":
  def alarm(title,hostname,timestap,level,message,state,tail):
    """报警函数"""
    color="#FF0000"
    data={"first":{"value":title},"keyword1":{"value":hostname,"color":color},"keyword2":{"value":timestap,"color":color},"keyword3":{"value":level,"color":color},"keyword4":{"value":message,"color":color},"keyword5":{"value":state,"color":color},"remark":{"value":tail}}
    return data
  def recover(title,message,alarm_time,recover_time,continue_time,tail):
    """恢复函数"""
    re_color="#228B22"
    data={"first":{"value":title},"content":{"value":message,"color":re_color},"occurtime":{"value":alarm_time,"color":re_color},"recovertime":{"value":recover_time,"color":re_color},"lasttime":{"value":continue_time,"color":re_color},"remark":{"value":tail}}
    return data
  # data=alarm("测试的报警消息","8.8.8.8",time.ctime(),"最高级别","然并卵","挂了","大傻路赶紧处理")
  # 实例化类
  webchart=WechatPush(appid,secrect,file_name)
  url="http://www.xiaoniu88.com"
  print len(sys.argv)
  # 发送报警消息
  if len(sys.argv) == 9:
    title=sys.argv[1]
    hostname=sys.argv[2]
    timestap=sys.argv[3]
    level=sys.argv[4]
    message=sys.argv[5]
    state=sys.argv[6]
    tail=sys.argv[7]
    print "sys.argv[1]"+sys.argv[1]
    print "sys.argv[2]"+sys.argv[2]
    print "sys.argv[3]"+sys.argv[3]
    print "sys.argv[4]"+sys.argv[4]
    print "sys.argv[5]"+sys.argv[5]
    print "sys.argv[6]"+sys.argv[6]
    print "sys.argv[7]"+sys.argv[7]
    print "sys.argv[8]"+sys.argv[8]
    with open("/etc/zabbix/moniter_scripts/test.log",'a+') as f:
      f.write(title+"\n")
      f.write(hostname+"\n")
      f.write(timestap+"\n")
      f.write(level+"\n")
      f.write(message+"\n")
      f.write(state+"\n")
      f.write(tail+"\n")
      f.write("%s_%s" % ("group",sys.argv[8])+"\n")
    data=alarm(title,hostname,timestap,level,message,state,tail)
    group_name="%s_%s" % ("group",sys.argv[8])
    for touser in eval("%s_%s" % ("group",sys.argv[8])):
      webchart.do_push(touser,alarm_id,url,"",data)
    for touser in group_super:
      webchart.do_push(touser,alarm_id,url,"",data)
  #发送恢复消息
  elif len(sys.argv) == 8:
    title=sys.argv[1]
    message=sys.argv[2]
    alarm_time=sys.argv[3]
    recover_time=sys.argv[4]
    continue_time=sys.argv[5]
    tail=sys.argv[6]
    print "sys.argv[1]"+sys.argv[1]
    print "sys.argv[2]"+sys.argv[2]
    print "sys.argv[3]"+sys.argv[3]
    print "sys.argv[4]"+sys.argv[4]
    print "sys.argv[5]"+sys.argv[5]
    print "sys.argv[6]"+sys.argv[6]
    print "sys.argv[7]"+sys.argv[7]
    data=recover(title,message,alarm_time,recover_time,continue_time,tail)
    for touser in eval("%s_%s" % ("group",sys.argv[7])):
      webchart.do_push(touser,recover_id,url,"",data)
    for touser in group_super:
      webchart.do_push(touser,recover_id,url,"",data)

zabbix使用微信报警python脚本

#!/usr/bin/python
# coding: utf-8
#python2将zabbix报警信息发送到微信。
#by linwangyi #2016-01-18
import urllib,urllib2
import json
import sys
def gettoken(corpid,corpsecret):
  gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corpid + '&corpsecret=' + corpsecret
  try:
token_file = urllib2.urlopen(gettoken_url)
  except urllib2.HTTPError as e:
    print e.code
    print e.read().decode("utf8")
sys.exit()
  token_data = token_file.read().decode('utf-8')
  token_json = json.loads(token_data)
  token_json.keys()
  token = token_json['access_token']
  return token
def senddata(access_token,user,content):
  send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
  send_values = {
    "touser":user,  #企业号中的用户帐号,在zabbix用户Media中配置,如果配置不正常,将按部门发送。
    "toparty":"1",  #企业号中的部门id
    "msgtype":"text", #企业号中的应用id,消息类型。
    "agentid":"1",
    "text":{
      "content":content
      },
    "safe":"0"
    }
  send_data = json.dumps(send_values, ensure_ascii=False)
  send_request = urllib2.Request(send_url, send_data)
  response = json.loads(urllib2.urlopen(send_request).read())
  print str(response)
if __name__ == '__main__':
  user = str(sys.argv[1])  #zabbix传过来的第一个参数
  content = str(sys.argv[3]) #zabbix传过来的第三个参数
  corpid = 'XXXX'  #CorpID是企业号的标识
  corpsecret = 'XXXXX' #corpsecretSecret是管理组凭证密钥
  accesstoken = gettoken(corpid,corpsecret)
  senddata(accesstoken,user,content)
#!/usr/bin/python3
# coding: utf-8
#python3将zabbix报警信息发送到微信。
#by http://sunday208.blog.51cto.com/ #2016-01-18
import urllib.request
import json
import sys
def gettoken(corp_id,corp_secret):
  gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret
  try:
    token_file = urllib.request.urlopen(gettoken_url)
  except urllib.error.HTTPError as e:
    print(e.code)
    print(e.read().decode("utf8"))
  token_data = token_file.read().decode('utf-8')
  token_json = json.loads(token_data)
  token_json.keys()
  token = token_json['access_token']
  return token
def senddata(access_token,user,content):
  try:
    send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
    send_values = {
      "touser":user, #企业号中的用户帐号,在zabbix用户Media中配置,如果配置不正常,将按部门发送。
      "toparty":"1", #企业号中的部门id
      "msgtype":"text",
      "agentid":"1",  #企业号中的应用id,消息类型。
      "text":{
        "content":content
        },
      "safe":"0"
      }
    send_data = json.dumps(send_values, ensure_ascii=False).encode(encoding='UTF8')
    send_request = urllib.request.Request(send_url, send_data)
    response = urllib.request.urlopen(send_request)
    msg = response.read()
    print("returned value : " + str(msg))
  except:
    print("returned value : " + str(msg))
default_encoding = 'utf-8'
if sys.getdefaultencoding() != default_encoding:
  reload(sys)
  sys.setdefaultencoding(default_encoding)
user = str(sys.argv[1])  #zabbix传过来的第一个参数
content = str(sys.argv[3]) #zabbix传过来的第三个参数
corpid = 'XXXX'  #CorpID是企业号的标识
corpsecret = 'XXXXX' #corpsecretSecret是管理组凭证密钥
accesstoken = gettoken(corpid,corpsecret)
senddata(accesstoken,user,content)

总结

以上所述是小编给大家介绍的python利用微信公众号实现报警功能 ,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!

(0)

相关推荐

  • Python实现个人微信号自动监控告警的示例

    wechat_sender 是基于 wxpy 和 tornado 实现的一个可以将你的网站.爬虫.脚本等其他应用中各种消息 (日志.报警.运行结果等) 发送到微信的工具. 运行环境 Python 2.7 及以上 Python 3 及以上 实现过程 安装 pip 工具 [root@server1 ~]# wget https://bootstrap.pypa.io/get-pip.py [root@server1 ~]# python get-pip.py pip 安装模块 ##安装依赖软件 [r

  • Python+微信接口实现运维报警

    说到运维报警,我觉得都可以写个长篇历史来详细解释了报警的前世来生,比如最早报警都是用邮件,但邮件实时性不高,比如下班回家总不能人一直盯着邮箱吧,所以邮件这种报警方式不适合用来报紧急的故障,日常磁盘利用率监控什么的可以用它来报没问题,网站宕机不能访问这种故障,用它就明显不合适了,那对这种业务稳定性要求比较高的业务,后来就发展成了用短信,就是公司买个短信机,提供一个http接口,然后运维人员写脚本把收集到的异常数据写入文件,然后脚本实时检测如果这个文件不为空,就调用短信机接口把文件里的内容发送出去,

  • Python WXPY实现微信监控报警功能的代码

    概述: 本文主要分享一下博主在学习wxpy 的过程中开发的一个小程序.博主在最近有一个监控报警的需求需要完成,然后刚好在学习wxpy 这个东西,因此很巧妙的将工作和学习联系在一起. 博文中主要使用到的技术设计到Python,Redis,以及Java.涉及到的技术看似很多,但是主要的语言是基于Python进行开发的. 架构涉及主要采用了 生产者消费者的涉及模式,使用Redis作为消息队列进行解耦操作. 主要架构涉及如下: 接下来开始介绍一下程序的实现过程,主要讲解wxpy -> python.re

  • python利用微信公众号实现报警功能

    微信公众号共有三种,服务号.订阅号.企业号.它们在获取AccessToken上各有不同. 其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效. 而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果. 为兼容这两种方式,因此按照订阅号的方式处理.  处理办法与接口文档中的要求相同: 为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器. 而其他业务逻辑服务器所使

  • 如何利用python给微信公众号发消息实例代码

    现在通过发微信公众号信息来做消息通知和告警已经很普遍了.最常见的就是运维通过zabbix调用shell脚本给微信发消息,起到告警的作用.当要发送的信息较多,而且希望按照指定格式显示的好看一点的时候,shell处理起来,个人感觉不太方便.于是我用python重写了发微信的功能. #coding:utf-8 import urllib2 import json import sys def getMsg(): #为了避免发送中文消息报错,使用utf8方式编码 reload(sys) sys.setd

  • python编写微信公众号首图思路详解

    前言 之前一直在美图秀秀调整自己的微信公众号首图,效果也不尽如人意,老是调来调去,最后发出来的图片被裁剪了一大部分,丢失部分关键信息,十分恼火,于是想着用python写一个程序,把微信公众号首图的模式固定下来,方便以后写公众号. 思路 根据微信公众号首图要求,可以上传一个不超过5M的图片,且图片尺寸要是2.35:1的尺寸,换算成像素是900:383,有了这些参数就可以做文章了,这里有两种思路 把今天推文的标题(文字)用图片展示出来,使得文字排列错落有致,简单粗暴,而又不失美感,这里可以利用mat

  • 使用 Python 实现微信公众号粉丝迁移流程

    近日,因公司业务需要,需将原两个公众号合并为一个,即要将其中一个公众号(主要是粉丝)迁移到另一个公众号.按微信规范,同一用户在不同公众号内的 openid 是不同的,我们的业务系统不例外地记录了用户的 openid,因此,涉及到两个公众号的 openid 的转换.幸好,微信公众号平台在账号迁移描述提供了方法和API供调用,详见: http://kf.qq.com/faq/170221aUnmmU170221eUZJNf.html 这里使用 Python 写个程序来完成,简单快捷,主要知识点有:

  • python采集微信公众号文章

    本文实例为大家分享了python采集微信公众号文章的具体代码,供大家参考,具体内容如下 在python一个子目录里存2个文件,分别是:采集公众号文章.py和config.py. 代码如下: 1.采集公众号文章.py from urllib.parse import urlencode import pymongo import requests from lxml.etree import XMLSyntaxError from requests.exceptions import Connec

  • python下载微信公众号相关文章

    本文实例为大家分享了python下载微信公众号相关文章的具体代码,供大家参考,具体内容如下 目的:从零开始学自动化测试公众号中下载"pytest"一系列文档 1.搜索微信号文章关键字搜索 2.对搜索结果前N页进行解析,获取文章标题和对应URL 主要使用的是requests和bs4中的Beautifulsoup Weixin.py import requests from urllib.parse import quote from bs4 import BeautifulSoup im

  • Python微信公众号开发平台

    上大学的时候,对微信公众号开发浅尝辄止的玩了一下,感觉还是挺有意思的. http://www.jb51.net/article/133677.htm后来服务器到期了,也就搁置了.由于发布web程序,使用PHP很顺手,就使用了PHP作为开发语言.但是其实微信公众号的开发和语言关联并不大,流程,原理上都是一致的. 快要做毕设了,想着到时候应该会部署一些代码到服务器上,进行长期的系统构建.所以趁着还是学生,就买了阿里云的学生机.买了之后,就想着玩点什么,于是微信公众号的开发,就又提上了日程.但是这次,

  • python微信公众号开发简单流程

    本文为大家分享了python微信公众号开发的简单过程,供大家参考,具体内容如下 网上有很多微信公众号的开发教程,但是都是好几年前的了,而且很多都是抄袭其他人的,内容几乎一模一样.真的无语了.只好自己总结一下开发的一些简单流程. 一.注册个微信公众号,这个就不详细说了. 二.登录后台,进入开发中的基本配置,配置下服务器 填写url和token,url是服务器的地址,token是自己定义的 三.登录服务器开发 网上很多教程用的什么新浪sae啊,webpy都是很久之前的.现在很多东西都变了,所以我没有

  • python如何导出微信公众号文章方法详解

    1.安装wkhtmltopdf 下载地址:https://wkhtmltopdf.org/downloads.html 我测试用的是windows的,下载安装后结果如下 2 编写python 代码导出微信公众号文章 不能直接使用wkhtmltopdf 导出微信公众号文章,导出的文章会缺失图片,所以需要使用 wechatsogou 将微信公众号文章页面抓取,之后将html文本转化为pdf pip install wechatsogou --upgrade pip install pdfkit 踩坑

  • 用python wxpy管理微信公众号并利用微信获取自己的开源数据

    之前了解到itchat 乃至于 wxpy时 是利用tuling聊天机器人的接口.调用接口并保存双方的问答结果可以作为自己的问答词库的一个数据库累计.这些数据可以用于自己训练. 而最近希望获取一些语音资源,用于卷积神经网络的训练.. -------------------------------------------------------------------------------- 首先wxpy是itchat的升级版,通过wxpy bot.core即可原封不动的调用itchat的指令.

随机推荐