基于Python3.0写的http并发测试脚本
一、编写代码
!#/usr/bin/python #-*- encoding:utf-8 -*-
import os import time import logging import requests import threading from multiprocessing import Manager from concurrent import futures import json # 这里更换成自己要进行压力测试的URL地址(这里是通过下载链接的方式进行测试) download_url = 'http://192.168.105.124:9100/metrics' cpu_count = os.cpu_count() session = requests.Session() def handle(cost, mutex, contain): ''' 计算压测数据,打印日志 :param cost: 该次请求花费时间 :param mutex: 锁 :param contain: 全局统计指标 :return: ''' # 获取到锁的进程,可以计算并打印压测结果 with mutex: min_cost = contain['min_cost'] max_cost = contain['max_cost'] hit_count = contain['hit_count'] average_cost = contain['average_cost'] if min_cost == 0: contain['min_cost'] = cost if min_cost > cost: contain['min_cost'] = cost if max_cost < cost: contain['max_cost'] = cost average_cost = (average_cost * hit_count + cost) / (hit_count + 1) hit_count += 1 contain['average_cost'] = average_cost contain['hit_count'] = hit_count logging.info(contain) def download_one(mutex, contain): while True: try: stime = time.time() # 发送给服务器的数据 data = { "image": "sadfasdfasd", "dfas": "sdfasdf" } request = requests.Request(method='POST', url=download_url, json=json.dumps(data)) prep = session.prepare_request(request) response = session.send(prep, timeout=50) etime = time.time() # os.getpid() :当前进程id # threading.current_thread().ident : 当前线程标识 # response.status_code : 服务器响应代码 200、500 .. logging.info('process[%s] thread[%s] status[%s] cost[%s]', os.getpid(), threading.current_thread().ident, response.status_code, etime - stime) handle(float(etime - stime), mutex, contain) # time.sleep(1) except Exception as e: logging.error(e) print(e) # def new_thread_pool(mutex, contain): # # 创建线程池,创建的线程数量为默认计算机核数*5 # with futures.ThreadPoolExecutor(workers) as executor: # # 向xiancheng # for i in range(workers): # executor.submit(download_one, mutex, contain) def subprocess(): manager = Manager() # 创建锁,当某个线程获取到锁时,其它线程等待 mutex = manager.Lock() # 全局统计指标 contain = manager.dict({'average_cost': 0, 'min_cost': 0, 'max_cost': 0, 'hit_count': 0}) # 创建进程池,创建的进程数量为默认进程数量与cpu数量相等 with futures.ProcessPoolExecutor() as executor: # 向进程池提交要执行的函数,提交次数与cpu数量相等,去提高并行度 for i in range(cpu_count): executor.submit(download_one, mutex, contain) if __name__ == '__main__': logging.basicConfig(filename="client.log", level=logging.INFO, format="%(asctime)s [%(filename)s:%(lineno)d] %(message)s", datefmt="%m/%d/%Y %H:%M:%S [%A]") subprocess()
二、测试验证
1、脚本测试之前的性能截图
2、脚本测试之后的性能截图
直接用Pycharm进行调试
压力测试后的状态显示
- 本文固定链接: https://www.gayj.cn/?p=702
- 转载请注明: https://www.gayj.cn/
捐 赠如果您觉得这篇文章有用处,请支持作者!鼓励作者写出更好更多的文章!