Python multiprocessing.dummy Pool: thread/process task queues, with a simple HTTP stress-test example
A common scenario at work: say you need to download 100,000 images. You would not write a for loop and download them one at a time. Likewise, a simple HTTP stress test needs many concurrent workers: each request handler takes one parameter, all the parameters form a task queue, and the handler plus the queue are mapped onto a pool of threads or processes. In Python, multiprocessing.Pool (a process pool) and multiprocessing.dummy.Pool are very convenient for this. The former runs multiple processes; the latter exposes the exact same API but is backed by threads, hence the name dummy (a "fake" process pool).
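A minimal sketch of that interchangeable API (a toy `square` function stands in for a real task; swap the import for `from multiprocessing import Pool` to get real processes with no other code changes):

```python
# multiprocessing.dummy.Pool has the same interface as multiprocessing.Pool,
# but its workers are threads instead of processes.
from multiprocessing.dummy import Pool as ThreadPool


def square(x):
    return x * x


if __name__ == '__main__':
    with ThreadPool(4) as pool:
        # map() distributes the task list over the 4 worker threads
        print(pool.map(square, [1, 2, 3, 4]))  # [1, 4, 9, 16]
```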
Below is a simple HTTP stress-test example:
# _*_ coding:utf-8 _*_
"""This file is a sample demo of a simple HTTP stress test."""
import time
import urllib.parse

import requests
from multiprocessing.dummy import Pool as ThreadPool


def get_ret_from_http(url):
    """cited from
    https://stackoverflow.com/questions/645312/what-is-the-quickest-way-to-http-get-in-python
    """
    ret = requests.get(url)
    print(ret.content)
    # e.g. result: {"error":false,"resultMap":{"check_ret":1},"success":true}


def multi_process_stress_test():
    """Start 5 threads, issue 100 HTTP requests to the server
    and measure the elapsed time.
    """
    start = time.time()
    # In practice a URL with parameters would be built by make_url() below;
    # this demo simply hard-codes one.
    url = ('http://127.0.0.1:9325/shortvideo/checkBlack'
           '?url=http%3A%2F%2Fzbasecapture.bs2.yy.com%2F42269159_1499248536403_3.jpg'
           '&serial=abcdddddddd')
    # generate the task queue: 100 identical requests
    lst_url = [url] * 100
    # use 5 threads
    pool = ThreadPool(5)
    # map the handler over the task queue
    ret = pool.map(get_ret_from_http, lst_url)
    pool.close()
    pool.join()
    print('time consume %s' % (time.time() - start))


def make_url():
    """Generate a URL with query parameters, e.g.
    https://xy.com/index.php?url=http%3A//xy.xxx.com/22.jpg&SecretId=xy_123_move
    cited from https://stackoverflow.com/questions/2506379/add-params-to-given-url-in-python
    https://github.com/gruns/furl is a good utility for URL manipulation
    """
    para = {"SecretId": "xy_123_move", "url": "http://xy.xxx.com/22.jpg"}
    print(urllib.parse.urlencode(para))
    # url=http%3A%2F%2Fxy.xxx.com%2F22.jpg&SecretId=xy_123_move
    base_url = 'xy.com/index.php'
    return 'https://%s?%s' % (base_url, '&'.join(
        '%s=%s' % (k, urllib.parse.quote(str(v))) for k, v in para.items()))


if __name__ == '__main__':
    # get_ret_from_http()
    multi_process_stress_test()
    # print(make_url())
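One detail worth knowing: pool.map() blocks until every request has finished. When you want to report results as they come back (e.g. count successes while slow requests are still in flight), imap_unordered yields results in completion order. A sketch of that pattern, where `fetch` is a hypothetical stand-in for the real requests.get call:

```python
from multiprocessing.dummy import Pool as ThreadPool


def fetch(url):
    # stand-in for requests.get(url).status_code so the sketch runs offline;
    # a real handler would perform the HTTP request here
    return (url, 200)


urls = ['http://127.0.0.1:9325/ping?id=%d' % i for i in range(10)]

if __name__ == '__main__':
    with ThreadPool(5) as pool:
        # imap_unordered yields each (url, status) as soon as it completes,
        # so slow requests do not delay reporting of fast ones
        ok = sum(1 for _, status in pool.imap_unordered(fetch, urls)
                 if status == 200)
    print('%d/%d requests succeeded' % (ok, len(urls)))
```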
Below is another simple example. The handler sleeps for a random number of seconds (given by its parameter), and we can choose either processes or threads to work through all the tasks in the queue. As a rule of thumb, CPU-bound work favors a process pool, while IO-bound work can use threads.
# _*_ coding:utf-8 _*_
"""This file is about the thread (dummy) / process pool."""
import logging
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool
from random import randrange
from time import sleep, time

logging.basicConfig(level=logging.DEBUG,
                    format='%(levelname)s %(asctime)s %(processName)s %(message)s',
                    datefmt='%Y-%m-%d %I:%M:%S')


def handler(sec):
    logging.debug('now I will sleep %s S', sec)
    sleep(sec)
    return sec


def get_pool(b_dummy=True, num=4):
    """Return a ThreadPool if b_dummy is True, otherwise a process Pool.

    :param b_dummy: dummy thread pool or process pool
    :param num: thread or process count
    :return: pool object
    """
    if b_dummy:
        pool = ThreadPool(num)
    else:
        pool = ProcessPool(num)
    return pool


def test_dummy_thread_pool():
    start_time = time()
    # generate the task queue: a list of sleep durations
    lst_sleep_sec = [randrange(3, 10) for i in range(10)]
    # pass b_dummy=False here to run the same tasks on a process pool
    pool = get_pool(b_dummy=True)
    results = pool.map(handler, lst_sleep_sec)
    logging.debug(results)
    pool.close()
    pool.join()
    logging.debug('time consume %s', time() - start_time)


if __name__ == '__main__':
    test_dummy_thread_pool()
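The payoff of pooling IO-bound work is easy to measure: with N workers, total wall time drops to roughly ceil(tasks / N) rounds of sleeping instead of the serial sum. A minimal sketch with fixed short sleeps (durations chosen small so it runs quickly; not part of the original script):

```python
from multiprocessing.dummy import Pool as ThreadPool
from time import sleep, time


def io_task(sec):
    # simulates an IO-bound call (network, disk) by sleeping
    sleep(sec)
    return sec


if __name__ == '__main__':
    secs = [0.2] * 8  # 8 tasks of 0.2 s = 1.6 s if run serially
    start = time()
    with ThreadPool(4) as pool:
        pool.map(io_task, secs)
    elapsed = time() - start
    # 8 tasks on 4 threads finish in about 2 rounds, roughly 0.4 s,
    # far below the 1.6 s a serial loop would need
    print('elapsed %.2f s' % elapsed)
```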
I use a few languages at work: C++, Java, some HTML+JS, and Python. Because the use is intermittent (after a few months away from Python, many tricks are forgotten), I keep commonly used Python snippets organized by category in my GitHub repository; whenever I use a technique in practice, I add it to the relevant file so it is easy to look up later.