用Python进行websocket接口测试

我们在做接口测试时,除了常见的http接口,还有一种比较多见,就是socket接口,今天讲解下怎么用Python进行websocket接口测试。

现在大多数用的都是websocket,那我们就先来安装一下websocket的安装包。

pip install websocket-client

安装完之后,我们就开始我们的websocket之旅了。

我们先来看个炒鸡简单的栗子:

import websocket
ws = websocket.WebSocket()
ws.connect("ws://example.com/websocket",
      http_proxy_host="proxy_host_name",
      http_proxy_port=3128)

这个栗子就是创建一个websocket连接,这个模块支持通过http代理访问websocket。代理服务器允许使用connect方法连接到websocket端口。默认的squid设置是“只允许连接HTTPS端口”。

在websocket里,我们有常用的这几个方法:

on_message方法:

def on_message(ws, message):
  print(message)

on_message是用来接受消息的,server发送的所有消息都可以用on_message这个方法来收取。

on_error方法:

def on_error(ws, error):
  print(error)

这个方法是用来处理错误异常的,如果一旦socket的程序出现了通信的问题,就可以被这个方法捕捉到。

on_open方法:

def on_open(ws):
  def run(*args):
    for i in range(30):
      # send the message, then wait
      # so thread doesn't exit and socket
      # isn't closed
      ws.send("Hello %d" % i)
      time.sleep(1)

    time.sleep(1)
    ws.close()
    print("Thread terminating...")

  Thread(target=run).start()

on_open方法是用来保持连接的,上面这样的一个例子,就是保持连接的一个过程,每隔一段时间就会来做一件事,他会在30s内一直发送hello。最后停止。

on_close方法:

def on_close(ws):
  print("### closed ###")

onclose主要就是关闭socket连接的。

如何创建一个websocket应用:

ws = websocket.WebSocketApp("wss://echo.websocket.org")

括号里面就是你要连接的socket的地址,在WebSocketApp这个实例化的方法里面还可以有其他参数,这些参数就是我们刚刚介绍的这些方法。

ws = websocket.WebSocketApp("ws://echo.websocket.org/",
              on_message=on_message,
              on_error=on_error,
              on_close=on_close)

指定了这些参数之后就可以直接进行调用了,例如:

ws.on_open = on_open

这样就是调用了on_open方法

如果我们想让我们的socket保持长连接,一直连接着,就可以使用run_forever方法:

ws.run_forever()

完整代码:

import websocket
from threading import Thread
import time
import sys

def on_message(ws, message):
  print(message)

def on_error(ws, error):
  print(error)

def on_close(ws):
  print("### closed ###")

def on_open(ws):
  def run(*args):
    for i in range(3):
      # send the message, then wait
      # so thread doesn't exit and socket
      # isn't closed
      ws.send("Hello %d" % i)
      time.sleep(1)

    time.sleep(1)
    ws.close()
    print("Thread terminating...")

  Thread(target=run).start()

if __name__ == "__main__":

  websocket.enableTrace(True)
  host = "ws://echo.websocket.org/"
  ws = websocket.WebSocketApp(host,
                on_message=on_message,
                on_error=on_error,
                on_close=on_close)
  ws.on_open = on_open
  ws.run_forever()

如果想要通信一条短消息,并在完成后立即断开连接,我们可以使用短连接:

from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello, World'...")
ws.send("Hello, World")
print("Sent")
print("Receiving...")
result = ws.recv()
print("Received '%s'" % result)
ws.close()

关于websocket的介绍就到这儿了。

以上就是用Python进行websocket接口测试的详细内容,更多关于python 接口测试的资料请关注我们其它相关文章!

(0)

相关推荐

  • 对python自动生成接口测试的示例讲解

    在python中Template可以将字符串的格式固定下来,重复利用. 同一套测试框架为了可以复用,所以我们可以将用例部分做参数化,然后运用到各个项目中. 代码如下: coding=utf-8 ''' 作者:大石 功能:自动生成pyunit框架下的接口测试用例 环境:python2.7.6 用法:将用户给的参数处理成对应格式,然后调用模块类生成函数,并将参数传入即可 ''' from string import Template #动态生成单个测试用例函数字符串 def singleMethod

  • python自动化测试三部曲之request+django实现接口测试

    国庆期间准备写三篇博客,介绍和总结下接口测试,由于国庆期间带娃,没有按照计划完成,今天才完成第二篇,惭愧惭愧. 这里我第一篇博客的地址:https://www.jb51.net/article/197004.htm,主要是介绍unittest框架,有兴趣的同学们可以移步去查阅 这里废话少说,进入正题 我的思路是这样的 1.先用django实现登陆.增加.删除.查看4个接口 2.在excel定义好测试案例.然后读取excel中的案例,然后把案例用unittest框架组装和封装 3.启动django

  • Python requests接口测试实现代码

    1.get方法请求接口 url:显而易见,就是接口的地址url啦 headers:请求头,例如:content-type = application/x-www-form-urlencoded params:用于传递测试接口所要用的参数,这里我们用python中的字典形式(key:value)进行参数的传递. 举个例子: import requests url="http://api.shein.com/login" header={"content-type":&

  • Python3 webservice接口测试代码详解

    一.使用python3做webervice接口测试的第三方库选择suds-jurko库,可以直接pip命令直接下载,也可以在pypi官网下载压缩包进行手动安装 二.安装好后,导入Client:from suds.client import Client.发送一条请求 from suds.client import Client url = 'http://www.webxml.com.cn/WebServices/WeatherWebService.asmx?wsdl' client = Cli

  • Python脚本完成post接口测试的实例

    一个post类型的接口怎么编写脚本实现 1.打开网页,在fiddler上获取到接口的URL 2.用Python的requests库实现 import requests new_url="http://10.31.143.2:8989/system/systemOrgan/list" params = {"access_token": "807ad226-cbcc-4620-9544-8f53e1d51405"} payload = { "

  • python 接口测试response返回数据对比的方法

    背景:之前写的接口测试一直没有支持无限嵌套对比key,上次testerhome逛论坛,有人分享了他的框架,看了一下,有些地方不合适我这边自己修改了一下,部署在jenkins上跑完效果还不错,拿出来分享一下.ps:还是要多看看别人写的,新学了不少python自带的一些常用方法. 这次直接上代码,下面写一下这次我新学一些方法和思路. def check_response_hope_key(self,response={},hope_response={}): temp_data={} for n1

  • Python reques接口测试框架实现代码

    一.框架菜单 1.1 common模块 1.2 其他 二.Excel接口测试案例编写 三.读取Excel测试封装(核心封装) excel_utils.py 读取Excel中的数据 import os import xlrd #内置模块.第三方模块pip install 自定义模块 class ExcelUtils(): def __init__(self,file_path,sheet_name): self.file_path = file_path self.sheet_name = she

  • python利用requests库进行接口测试的方法详解

    前言 之前介绍了接口测试中需要关注得测试点,现在我们来看看如何进行接口测试,现在接口测试工具有很多种,例如:postman,soapui,jemter等等,对于简单接口而言,或者我们只想调试一下,使用工具是非常便捷而且快速得,但是对于更复杂得场景,这些工具虽然也能实现,但是难度要比写代码更大,而且定制化受到工具得功能影响,会 遇到一些障碍,当然我们还要实现自动化等等,鉴于以上因素,我们还是要学会使用代码进行接口测试,便于维护与扩展,或者算是我们知识得补充把~ requests库是python用来

  • Python接口测试数据库封装实现原理

    引言 做接口测试的时候,避免不了操作数据库.因为数据校验需要,测试数据初始化需要.一些参数化场景需要等. 数据库操作框架设计 这里主要操作mysql数据库,整体思路: 封装实现 具体代码实现: import pymysql import json class OperateMysql(object): def __init__(self): # 数据库初始化连接 self.connect_interface_testing = pymysql.connect( "localhost",

  • Python接口测试get请求过程详解

    python 做借口测试用到的是requests模块,首先要导入requests库,pip install requests 1.get直接请求方式 以豆瓣网为例: url = 'https://read.douban.com/' respose = requests.get(url=url) # status_code 为返回的状态码 print(respose.status_code) # text为返回的数据 print(respose.text) 请求结果:返回状态码为200,表明请求的

  • Python+request+unittest实现接口测试框架集成实例

    1.为什么要写代码实现接口自动化 大家知道很多接口测试工具可以实现对接口的测试,如postman.jmeter.fiddler等等,而且使用方便,那么为什么还要写代码实现接口自动化呢?工具虽然方便,但也不足之处: 测试数据不可控制 接口测试本质是对数据的测试,调用接口,输入一些数据,随后,接口返回一些数据.验证接口返回数据的正确性.在用工具运行测试用例之前不得不手动向数据库中插入测试数据.这样我们的接口测试是不是就没有那么"自动化了". 无法测试加密接口 这是接口测试工具的一大硬伤,如

随机推荐