上一篇文章介绍了线程内的并发——协程,但有的时候协程也不能解决所有的问题,因为Python自己启用了GIL(全局解释锁,具体是什么还没研究),所以不能发挥多核处理器的优势。所以我想到在进程级别再并发一次好了。
multiprocessing这个库提供了对于多进程的支持,主要就是使用其中的Pool进程池技术。
来看Python Reference中最简单的例子:
1 2 3 4 5 6 7 8
| from multiprocessing import Pool
def f(x): return x*x
if __name__ == '__main__': with Pool(5) as p: print(p.map(f, [1, 2, 3]))
|
得到的结果是[1, 4, 9],这个是最简单的实现,Pool.map这个函数能够将f这个函数分别作用于列表中的每个元素(参考基本的map内建函数),但其实是并发实现的,而且给出了返回值的结果组成了一个列表。
Pool的构造函数的第一个参数是processes为最大并发数量。
再看一个复杂的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| from multiprocessing import Pool from time import sleep
def f(x): return x*x
if __name__ == '__main__': with Pool(processes=4) as pool:
print(pool.map(f, range(10)))
for i in pool.imap_unordered(f, range(10)): print(i)
res = pool.apply_async(f, [10]) print(res.get(timeout=1))
res = pool.apply_async(sleep, [10]) print(res.get(timeout=1))
|
apply_async函数为异步作用函数f于参数,如果有多个参数,以元组或者列表的形式给出。然而map也有一个异步的版本map_async,但我还没研究出来怎么作用多个参数的函数……
给出我自己写的一个小例子(其实就是上次协程的例子改了一下….)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import multiprocessing
total_start = 0 total_end = 10000 workload = 200
def generate_tasklist(): start_list = range(total_start, total_end, workload) return map(lambda x, y: (x, y-1), start_list, start_list[1:] + [total_end+1])
def fronter(id, since, end): print '[%d] Starting ...' % id response = urllib2.urlopen('https://github.com') print '[%d] Finish with %d' % response.getcode()
if __name__ == '__main__': task_list = generate_tasklist() pool = multiprocessing.Pool(processes=len(tasklist)) for i, task in enumerate(task_list): pool.apply_async(fronter, args=(i,)+task) pool.close() pool.join() print '[+] Total workers: %d' % len(task_list)
|
执行apply_async任务就开始运行了不会阻塞,所以要使用join来等待。
友情提醒一下,多进程可是会吃掉很多CPU和内存的,不要把并发数设得跟协程那样高不然肯定会卡死的…….而且其它的进程不太好控制,如果不是非常熟悉进程树操作的话还是别太激进…..
貌似之前看网上的资料有人说windows下出现多个窗口卡死什么的,解决方案是:
1 2
| import multiprocessing multiprocessing.freeze_support()
|
不过我倒是没碰到过这种情况…..
Python 官方手册:multiprocessing(3和2好像没有多大区别)
闲扯
貌似这个东西与gevent有点冲突,如果之前用了monkey.patch_all的话,这里的多进程会出现卡死的情况,不是系统卡死,而是进程都不工作了,也许是在互相切换……
去掉一些patch就可以解决这个问题,例如我只执行了patch_socket与patch_ssl这就可以正常了,具体是哪个patch出的问题我也不知道,至于原理,呵呵……