上一篇文章介绍了线程内的并发——协程,但有的时候协程也不能解决所有的问题,因为Python自己启用了GIL(全局解释锁,具体是什么还没研究),所以不能发挥多核处理器的优势。所以我想到在进程级别再并发一次好了。

multiprocessing这个库提供了对于多进程的支持,主要就是使用其中的Pool进程池技术。

来看Python Reference中最简单的例子:

1
2
3
4
5
6
7
8
from multiprocessing import Pool

def f(x):
return x*x

if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))

得到的结果是[1, 4, 9],这个是最简单的实现,Pool.map这个函数能够将f这个函数分别作用于列表中的每个元素(参考基本的map内建函数),但其实是并发实现的,而且给出了返回值的结果组成了一个列表。

Pool的构造函数的第一个参数是processes为最大并发数量。

再看一个复杂的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from multiprocessing import Pool
from time import sleep

def f(x):
return x*x

if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:

# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))

# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)

# evaluate "f(10)" asynchronously
res = pool.apply_async(f, [10])
print(res.get(timeout=1)) # prints "100"

# make worker sleep for 10 secs
res = pool.apply_async(sleep, [10])
print(res.get(timeout=1)) # raises multiprocessing.TimeoutError

# exiting the 'with'-block has stopped the pool

apply_async函数为异步作用函数f于参数,如果有多个参数,以元组或者列表的形式给出。然而map也有一个异步的版本map_async,但我还没研究出来怎么作用多个参数的函数……

给出我自己写的一个小例子(其实就是上次协程的例子改了一下….)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import multiprocessing

total_start = 0
total_end = 10000
workload = 200

def generate_tasklist():
start_list = range(total_start, total_end, workload)
return map(lambda x, y: (x, y-1), start_list, start_list[1:] + [total_end+1])

def fronter(id, since, end):
# some code to crawl
print '[%d] Starting ...' % id
response = urllib2.urlopen('https://github.com')
print '[%d] Finish with %d' % response.getcode()

if __name__ == '__main__':
task_list = generate_tasklist()
pool = multiprocessing.Pool(processes=len(tasklist))
for i, task in enumerate(task_list):
pool.apply_async(fronter, args=(i,)+task)
pool.close() # 关闭进程池,不再接受任务提交
pool.join() # 等待所有任务完成
print '[+] Total workers: %d' % len(task_list)

执行apply_async任务就开始运行了不会阻塞,所以要使用join来等待。

友情提醒一下,多进程可是会吃掉很多CPU和内存的,不要把并发数设得跟协程那样高不然肯定会卡死的…….而且其它的进程不太好控制,如果不是非常熟悉进程树操作的话还是别太激进…..

貌似之前看网上的资料有人说windows下出现多个窗口卡死什么的,解决方案是:

1
2
import multiprocessing
multiprocessing.freeze_support()

不过我倒是没碰到过这种情况…..

Python 官方手册:multiprocessing(3和2好像没有多大区别)

闲扯

貌似这个东西与gevent有点冲突,如果之前用了monkey.patch_all的话,这里的多进程会出现卡死的情况,不是系统卡死,而是进程都不工作了,也许是在互相切换……

去掉一些patch就可以解决这个问题,例如我只执行了patch_socket与patch_ssl这就可以正常了,具体是哪个patch出的问题我也不知道,至于原理,呵呵……