首页 > Script > Python > Http并发测试脚本
2020
03-27

Http并发测试脚本

基于Python3.0写的http并发测试脚本

一、编写代码

!#/usr/bin/python
#-*- encoding:utf-8 -*-
import os
import time
import logging
import requests
import threading
from multiprocessing import Manager
from concurrent import futures
import json

# 这里更换成自己要进行压力测试的URL地址(这里是通过下载链接的方式进行测试)
download_url = 'http://192.168.105.124:9100/metrics' 
cpu_count = os.cpu_count()

session = requests.Session()


def handle(cost, mutex, contain):
'''
计算压测数据,打印日志
:param cost: 该次请求花费时间
:param mutex: 锁
:param contain: 全局统计指标
:return:
'''
# 获取到锁的进程,可以计算并打印压测结果
with mutex:
min_cost = contain['min_cost']
max_cost = contain['max_cost']
hit_count = contain['hit_count']
average_cost = contain['average_cost']
if min_cost == 0:
contain['min_cost'] = cost
if min_cost > cost:
contain['min_cost'] = cost
if max_cost < cost:
contain['max_cost'] = cost
average_cost = (average_cost * hit_count + cost) / (hit_count + 1)
hit_count += 1
contain['average_cost'] = average_cost
contain['hit_count'] = hit_count
logging.info(contain)


def download_one(mutex, contain):
while True:
try:
stime = time.time()
# 发送给服务器的数据
data = {
"image": "sadfasdfasd",
"dfas": "sdfasdf"
}
request = requests.Request(method='POST', url=download_url, json=json.dumps(data))
prep = session.prepare_request(request)
response = session.send(prep, timeout=50)
etime = time.time()
# os.getpid() :当前进程id
# threading.current_thread().ident : 当前线程标识
# response.status_code : 服务器响应代码 200、500 ..
logging.info('process[%s] thread[%s] status[%s] cost[%s]', os.getpid(), threading.current_thread().ident,
response.status_code, etime - stime)
handle(float(etime - stime), mutex, contain)
# time.sleep(1)
except Exception as e:
logging.error(e)
print(e)


# def new_thread_pool(mutex, contain):
# # 创建线程池,创建的线程数量为默认计算机核数*5
# with futures.ThreadPoolExecutor(workers) as executor:
# # 向xiancheng
# for i in range(workers):
# executor.submit(download_one, mutex, contain)


def subprocess():
manager = Manager()
# 创建锁,当某个线程获取到锁时,其它线程等待
mutex = manager.Lock()
# 全局统计指标
contain = manager.dict({'average_cost': 0, 'min_cost': 0, 'max_cost': 0, 'hit_count': 0})

# 创建进程池,创建的进程数量为默认进程数量与cpu数量相等
with futures.ProcessPoolExecutor() as executor:
# 向进程池提交要执行的函数,提交次数与cpu数量相等,去提高并行度
for i in range(cpu_count):
executor.submit(download_one, mutex, contain)


if __name__ == '__main__':
logging.basicConfig(filename="client.log", level=logging.INFO,
format="%(asctime)s [%(filename)s:%(lineno)d] %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]")
subprocess()

二、测试验证

1、脚本测试之前的性能截图

Http并发测试脚本 - 第1张  | 运维手册

Http并发测试脚本 - 第2张  | 运维手册

2、脚本测试之后的性能截图

直接用Pycharm进行调试

Http并发测试脚本 - 第3张  | 运维手册

压力测试后的状态显示

Http并发测试脚本 - 第4张  | 运维手册

Http并发测试脚本 - 第5张  | 运维手册

 

最后编辑:
作者:李国庆
这个作者貌似有点懒,什么都没有留下。
捐 赠如果您觉得这篇文章有用处,请支持作者!鼓励作者写出更好更多的文章!

留下一个回复

你的email不会被公开。