python多线程http压力测试脚本

本文实例为大家分享了python多线程http压力测试的具体代码,供大家参考,具体内容如下

#coding=utf-8

import sys
import time
import thread
import httplib, urllib
import random
import uuid
import logging
logging.basicConfig(level=logging.DEBUG,
    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
    datefmt='%a, %d %b %Y %H:%M:%S',
    filename='测试脚本日志.log',
    filemode='w')

def log_uncaught_exceptions(exception_type, exception, tb):
 logging.critical(''.join(traceback.format_tb(tb)))
 logging.critical('{0}: {1}'.format(exception_type, exception))
sys.excepthook = log_uncaught_exceptions

#网关地址
addr="172.18.2.4"
port=8080
thread_count = 15 #单次并发数量
requst_interval = 10 #请求间隔(秒)
test_count = sys.maxsize #sys.maxsize # 指定测试次数

#字段说明,必须一一对应
#login为空表示使用随机用户名

param_list=[
{"login":"user1","password":"qweqwe12"},
]

now_count = 0
lock_obj = thread.allocate()
def send_http():
 global now_count
 httpClient = None
 try:
  for user in user_list:
   tmp_user = user["login"]
   if tmp_user.strip() =='':
    tmp_user = str(uuid.uuid1()) + str(random.random())
   print tmp_user
   params = urllib.urlencode({"operationData":
      [{"login": tmp_user,"password":user["password"]}]})
   headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"}

   httpClient = httplib.HTTPConnection(addr, port, timeout=5)
   httpClient.request("POST", "/simple/spider.task.distribute", params, headers)

   response = httpClient.getresponse()
   print '发送数据: ' + params
   print '返回码: ' + str(response.status)
   print '返回数据: ' + response.read()

   logging.info('发送数据: ' + params)
   logging.info('返回码: ' + str(response.status))
   logging.info('返回数据: ' + response.read())
   #print response.getheaders() #获取头信息
   sys.stdout.flush()
   now_count+=1
 except Exception, e:
  print e
  logging.info(e)
 finally:
  if httpClient:
   httpClient.close()

def test_func(run_count):
 global now_count
 global requst_interval
 global lock_obj
 cnt = 0
 while cnt < run_count:
  lock_obj.acquire()
  print ''
  print '***************************请求次数:' + str(now_count) + '*******************************'
  print 'Thread:(%d) Time:%s\n'%(thread.get_ident(), time.ctime())

  logging.info(' ')
  logging.info('***************************请求次数:' + str(now_count) + '*******************************')
  logging.info('Thread:(%d) Time:%s\n'%(thread.get_ident(), time.ctime()))
  cnt+=1
  send_http()
  sys.stdout.flush()
  lock_obj.release()
  time.sleep(requst_interval)

def test(ct):
 global thread_count
 for i in range(thread_count):
  thread.start_new_thread(test_func,(ct,))

if __name__=='__main__':
 global test_count
 test(test_count)
 while True:
  time.sleep(100)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Python实现的多线程http压力测试代码

    本文实例讲述了Python实现的多线程http压力测试代码.分享给大家供大家参考,具体如下: # Python version 3.3 __author__ = 'Toil' import sys, getopt import threading def httpGet(url, file): import http.client conn = http.client.HTTPConnection(url) conn.request("GET", file) r = conn.getr

  • python多线程http压力测试脚本

    本文实例为大家分享了python多线程http压力测试的具体代码,供大家参考,具体内容如下 #coding=utf-8 import sys import time import thread import httplib, urllib import random import uuid import logging logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(filename)s[line:%(lineno)d

  • 对python多线程SSH登录并发脚本详解

    测试系统中有一项记录ssh登录日志,需要对此进行并发压力测试. 于是用多线程进行python并发记录 因为需要安装的一些依赖和模块比较麻烦,脚本完成后再用pyinstaller打成exe包分发给其他测试人员一起使用. 1.脚本编写 # -*- coding: utf-8 -*- import paramiko import threading import time lt = [] def ssh(a,xh,sp): count = 0 for i in range(0,xh): try: ss

  • python多线程并发及测试框架案例

    这篇文章主要介绍了python多线程并发及测试框架案例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.循环创建多个线程,并通过循环启动执行 import threading from datetime import * from time import sleep # 单线程执行 def test(): print('hello world') t = threading.Thread(target=test) t.start() # 多线

  • python+requests接口压力测试500次,查看响应时间的实例

    接口压力测试500次,查看响应时间 import json import requests import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) restime = [] OK=[] class Restime(): def API(self,

  • Python+Pytest实现压力测试详解

    目录 1.程序说明 1.1 设置测试参数 1.2 初始化测试结果 1.3 定义测试函数 1.4 创建线程.执行线程.等待 1.5 计算测试结果 1.6 将测试结果写入文件 2.程序执行 2.1 直接执行 2.2 加个装饰器然后出报告 3.案例缺陷 4 完整源码 在现代Web应用程序中,性能是至关重要的.为了确保应用程序能够在高负载下正常运行,我们需要进行性能测试. 今天,应小伙伴的提问, 田辛老师来写一个Pytest进行压力测试的简单案例. 这个案例的测试网站我们就隐藏了,不过网站的基本情况是:

  • python实现接口并发测试脚本

    常用的网站性能测试指标有:并发数.响应时间.吞吐量.性能计数器等. 1.并发数 并发数是指系统同时能处理的请求数量,这个也是反应了系统的负载能力. 2.响应时间 响应时间是一个系统最重要的指标之一,它的数值大小直接反应了系统的快慢.响应时间是指执行一个请求从开始到最后收到响应数据所花费的总体时间. 3.吞吐量 吞吐量是指单位时间内系统能处理的请求数量,体现系统处理请求的能力,这是目前最常用的性能测试指标. QPS(每秒查询数).TPS(每秒事务数)是吞吐量的常用量化指标,另外还有HPS(每秒HT

  • mysql压力测试脚本实例

    本文实例讲述了mysql压力测试的脚本,分享给大家供大家参考.具体如下: 创建表DEPT 复制代码 代码如下: CREATE TABLE dept( /*部门表*/ deptno MEDIUMINT   UNSIGNED  NOT NULL  DEFAULT 0, dname VARCHAR(20)  NOT NULL  DEFAULT "", loc VARCHAR(13) NOT NULL DEFAULT "" ) ENGINE=MyISAM DEFAULT C

  • python+ffmpeg视频并发直播压力测试

    通过python与ffmpeg结合使用,可生成进行视频点播.直播的压力测试脚本.可支持不同类型的视频流,比如rtmp或者hls形式. 通过如下方式执行脚本:python multiRealPlay.py [rtmp|http] [thread counts] [interval Time] [rtmp | http]:视频播放的不同形式 [thread counts]:并发线程数 [interval Time]:启动每个线程的间隔时间 代码: #!/usr/bin/python # -*- co

  • 使用Python的Treq on Twisted来进行HTTP压力测试

    从事API相关的工作很有挑战性,在高峰期保持系统的稳定及健壮性就是其中之一,这也是我们在Mailgun做很多压力测试的原因. 这么久以来,我们已经尝试了很多种方法,从简单的ApacheBench到复杂些的自定义测试套.但是本贴讲述的,是一种使用python进行"快速粗糙"却非常灵活的压力测试的方法. 使用python写HTTP客户端的时候,我们都很喜欢用 Requests library.这也是我们向我们的API用户们推荐的.Requests 很强大,但有一个缺点,它是一个模块化的每线

随机推荐