Background

Without further ado, straight to the code.

Multiprocessing

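With multiple processes, the Manager class can be used to share variables between them. apply_async can pass several arguments to the worker function, while map passes only a single argument per call (Pool.starmap, available since Python 3.3, unpacks argument tuples).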

from multiprocessing import Pool, Manager


def rice(x, results):
    result = "Eat {} bowls of rice...".format(x)
    # print(result)
    results.append(result)


if __name__ == '__main__':

    # Manager().list() is a proxy list that all worker processes can append to
    results = Manager().list()
    pool = Pool(processes=2)
    jobs = []
    for x in range(3):
        job = pool.apply_async(rice, (x, results))
        jobs.append(job)
    pool.close()
    pool.join()
    print(results)

# Output (order may vary between runs):
# ['Eat 0 bowls of rice...', 'Eat 1 bowls of rice...', 'Eat 2 bowls of rice...']
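
To make the difference in argument passing concrete, here is a minimal sketch that runs map, starmap and apply_async on the same pool. The two-argument worker `rice_for` and its inputs are made up for illustration, and `Pool.starmap` assumes Python 3.3 or later.

```python
from multiprocessing import Pool


def rice(x):
    # Single-argument worker: works directly with map
    return "Eat {} bowls of rice...".format(x)


def rice_for(name, x):
    # Two-argument worker (hypothetical): needs starmap or apply_async
    return "{} eats {} bowls of rice...".format(name, x)


def main():
    with Pool(processes=2) as pool:
        # map: one argument per call, results come back in input order
        print(pool.map(rice, range(3)))
        # starmap: each tuple is unpacked into positional arguments
        print(pool.starmap(rice_for, [("A", 0), ("B", 1)]))
        # apply_async: arguments are passed as a tuple, get() waits for the result
        job = pool.apply_async(rice_for, ("C", 2))
        print(job.get())


if __name__ == "__main__":
    main()
```

Returning values this way also avoids the shared Manager list when the results only need to reach the parent process.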

Of course, from Python 3.2 onward you can also use concurrent.futures, which is a little simpler to write.

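from concurrent.futures import ProcessPoolExecutor

def task(i):  # placeholder task for illustration
    return "Eat {} bowls of rice...".format(i)

if __name__ == '__main__':
    with ProcessPoolExecutor(2) as p:
        objs = [p.submit(task, i) for i in range(5)]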
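
As a sketch of how results can be collected with concurrent.futures: each submit call returns a Future, and result() yields the worker's return value. The body of `task` here is a placeholder assumed for illustration.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed


def task(i):
    # Placeholder workload (an assumption for this sketch)
    return "Eat {} bowls of rice...".format(i)


def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        # submit returns a Future immediately; remember which input it belongs to
        futures = {executor.submit(task, i): i for i in range(5)}
        # as_completed yields futures as they finish, not in submission order
        for future in as_completed(futures):
            print(futures[future], "->", future.result())
        # executor.map returns results in input order
        print(list(executor.map(task, range(5))))


if __name__ == "__main__":
    main()
```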

Multithreading

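Outside of special cases, Python multithreading is generally not the first choice, because the GIL prevents threads from executing Python bytecode in parallel. To learn more about the GIL, see another article: "Advanced Python: What Is the Python GIL?"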

from multiprocessing.pool import ThreadPool


def rice(x, results):
    result = "Eat {} bowls of rice...".format(x)
    results.append(result)


if __name__ == '__main__':
    # Threads share memory, so a plain list is enough here
    results = []
    pool = ThreadPool(2)
    jobs = []
    for x in range(3):
        job = pool.apply_async(rice, (x, results))
        jobs.append(job)
    pool.close()
    pool.join()
    print(results)

# Output (order may vary between runs):
# ['Eat 0 bowls of rice...', 'Eat 1 bowls of rice...', 'Eat 2 bowls of rice...']
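
The GIL mainly hurts CPU-bound code; threads that spend their time waiting still overlap. A rough sketch of that, using time.sleep as a stand-in for a blocking I/O call (an assumption, real timings depend on the workload):

```python
import time
from multiprocessing.pool import ThreadPool


def wait_for_rice(x):
    # time.sleep stands in for blocking I/O; the GIL is released while waiting
    time.sleep(0.2)
    return x


def timed(pool_size, n_tasks=4):
    # Run n_tasks waits on a pool of pool_size threads and measure wall time
    start = time.time()
    with ThreadPool(pool_size) as pool:
        results = pool.map(wait_for_rice, range(n_tasks))
    return results, time.time() - start


if __name__ == "__main__":
    _, one_thread = timed(1)
    _, four_threads = timed(4)
    print("1 thread: {:.2f}s, 4 threads: {:.2f}s".format(one_thread, four_threads))
```

With four threads the four waits run side by side, so the total should be close to a single wait instead of four.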

Likewise, from Python 3.2 onward you can use concurrent.futures, which is a little simpler to write.

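from concurrent.futures import ThreadPoolExecutor

def task(i):  # placeholder task for illustration
    return "Eat {} bowls of rice...".format(i)

with ThreadPoolExecutor(2) as p:
    objs = [p.submit(task, i) for i in range(5)]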

Coroutines

Many frameworks could serve as the example for coroutines, such as asyncio and gevent. gevent is used here.

from gevent import monkey, pool
monkey.patch_all()
import gevent


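def rice(x):
    result = "Eat {} bowls of rice...".format(x)
    print(result)
    # gevent.sleep yields control so the other greenlets can run
    gevent.sleep(1)
    print("Rest {} for a while before eating more...".format(x))

if __name__ == '__main__':
    num = 3
    p = pool.Pool(num)  # separate name, so the gevent pool module is not shadowed
    for i in range(num):
        p.spawn(rice, i)
    p.join()

# Output:
Eat 0 bowls of rice...
Eat 1 bowls of rice...
Eat 2 bowls of rice...
Rest 0 for a while before eating more...
Rest 1 for a while before eating more...
Rest 2 for a while before eating more...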
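
For comparison, roughly the same flow can be written with asyncio from the standard library. This is only a sketch: it assumes Python 3.7 or later for asyncio.run, and the `log` list is added here so the ordering can be inspected.

```python
import asyncio


async def rice(x, log):
    log.append("Eat {} bowls of rice...".format(x))
    # await hands control back to the event loop, much like gevent.sleep
    await asyncio.sleep(0.1)
    log.append("Rest {} for a while before eating more...".format(x))
    return x


async def main():
    log = []
    # gather runs the three coroutines concurrently on a single thread
    await asyncio.gather(*(rice(i, log) for i in range(3)))
    return log


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

As in the gevent version, all three "Eat" lines appear before any "Rest" line, because each coroutine gives up control at its sleep.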

Multiprocessing + Multithreading

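This combination is better suited to CPU-bound workloads: the processes supply the real parallelism, while the threads inside each process still share that process's GIL and do not switch cooperatively on I/O the way coroutines do. For large-scale network requests, the multiprocessing + coroutines combination below is the better recommendation.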

import os
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


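def rice(x, y):
    print("{} -- Eat {} bowls of rice...".format(x, y))

def thread_func(x):
    # Runs inside a worker process and fans out to three threads
    pool = ThreadPool(3)
    for y in range(3):
        pool.apply_async(rice, (x, y))
    pool.close()
    pool.join()
    print("{} finished eating.".format(x))

if __name__ == '__main__':
    pool = Pool(processes=3)
    for x in range(3):
        pool.apply_async(thread_func, (x,))
    pool.close()
    pool.join()

# Output (interleaving varies between runs):
0 -- Eat 0 bowls of rice...
1 -- Eat 0 bowls of rice...
0 -- Eat 1 bowls of rice...
0 -- Eat 2 bowls of rice...
1 -- Eat 1 bowls of rice...
1 -- Eat 2 bowls of rice...
2 -- Eat 0 bowls of rice...
2 -- Eat 1 bowls of rice...
2 -- Eat 2 bowls of rice...
0 finished eating.
1 finished eating.
2 finished eating.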

Multiprocessing + Coroutines

Crawlers can get the most efficiency out of this combination, although it still needs tuning for the actual situation.

from gevent import monkey, pool
monkey.patch_all(thread=False)
from multiprocessing import Pool
import gevent, requests, os, time


url_lists = """https://www.0akarma.com/
https://www.0akarma.com/blog
https://www.0akarma.com/archives
https://www.0akarma.com/flicks
https://www.0akarma.com/life
https://www.0akarma.com/about
https://www.0akarma.com/search?q=
http://www.0akarma.com/
https://www.0akarma.com/feed
https://www.0akarma.com/vue-learning.html
https://www.0akarma.com/tags/Vue
https://www.0akarma.com/host-series-vulns.html
https://www.0akarma.com/tags/BlackHat
https://www.0akarma.com/tags/Host-Series
https://www.0akarma.com/tags/%E4%BB%A3%E7%A0%81%E5%AE%A1%E8%AE%A1
https://www.0akarma.com/python-universal-vuln-learning.html
https://www.0akarma.com/blog?page=2
https://www.0akarma.com/tags/Python
https://www.0akarma.com/feedlybot.html
https://www.0akarma.com/about.html
https://www.0akarma.com/tags/SQL%20Injection
https://www.0akarma.com/tags/CTF
https://www.0akarma.com/tags/Spider
https://www.0akarma.com/tags/Scrapy
https://www.0akarma.com/tags/Vulnhub
https://www.0akarma.com/tags/Hexo
https://www.0akarma.com/tags/Docker
https://www.0akarma.com/tags/Mac
https://www.0akarma.com/tags/Mentohust
https://www.0akarma.com/tags/Flask
https://www.0akarma.com/tags/Exploitation
https://www.0akarma.com/tags/Nginx
https://www.0akarma.com/tags/CTFd
https://www.0akarma.com/tags/Coolq
https://www.0akarma.com/junior-flags.html
https://www.0akarma.com/self-talking-of-2018.html
https://www.0akarma.com/ballroom-e-youkoso.html
https://www.0akarma.com/tags/Meditation
https://www.0akarma.com/tags/Flag
https://www.0akarma.com/tags/Movies
https://www.0akarma.com/life?page=2
https://www.0akarma.com/tags/Perception
https://www.0akarma.com/tags/About
https://www.0akarma.com/sccc-2019.html
https://www.0akarma.com/magic-ctfd.html
https://www.0akarma.com/BlueWhale-pwn.html
https://www.0akarma.com/yzmcms-audit.html
https://www.0akarma.com/Nginx-VulnConfig.html
https://www.0akarma.com/code-breaking-2018.html
https://www.0akarma.com/35c3-POST.html
https://www.0akarma.com/fireshell-2019.html
https://www.0akarma.com/lctf-2018%0A.html
https://www.0akarma.com/0ak-blog.html
https://www.0akarma.com/php-dangerfuncs.html
https://www.0akarma.com/hctf-2018.html
https://www.0akarma.com/SSRF.html
https://www.0akarma.com/xxe-learning.html
https://www.0akarma.com/HR-audit.html
https://www.0akarma.com/ArbFileUpload.html
https://www.0akarma.com/hexo-vps.html
https://www.0akarma.com/docker-website.html
https://www.0akarma.com/flicks?page=2
"""

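def fetch_url(url):
    requests.get(url)
    # print("{} -- {}".format(os.getpid(),gevent.getcurrent()))

def gevent_func(x):
    # x is only the worker index; every process fetches the full list
    num = 3
    p = pool.Pool(num)
    # splitlines() plus the filter skips the blank entry after the last URL
    for url in filter(None, url_lists.splitlines()):
        p.spawn(fetch_url, url)
    p.join()  # wait for the remaining greenlets before the worker returns

if __name__ == '__main__':
    start_time = time.time()
    num = 3
    process_pool = Pool(processes=num)  # separate name, so the gevent pool module is not shadowed
    for i in range(num):
        process_pool.apply_async(gevent_func, (i,))
    process_pool.close()
    process_pool.join()
    end_time = time.time()
    run_time = end_time - start_time
    print("Spend: %s" % run_time)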

# Output:
# async
Spend: 7.983597040176392
# sync
Spend: 23.319398164749146
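
In the script above every process walks the whole URL list. One possible way to split the work so that each URL is fetched only once is to hand each process a strided slice. The helper `split_urls` is hypothetical and not part of the original script.

```python
def split_urls(url_text, num_workers):
    """Return num_workers lists that together cover every non-empty line once."""
    urls = [line.strip() for line in url_text.splitlines() if line.strip()]
    # Worker i takes urls[i], urls[i + num_workers], urls[i + 2 * num_workers], ...
    return [urls[i::num_workers] for i in range(num_workers)]


if __name__ == "__main__":
    sample = """https://www.0akarma.com/
https://www.0akarma.com/blog
https://www.0akarma.com/archives
https://www.0akarma.com/about
"""
    for i, chunk in enumerate(split_urls(sample, 3)):
        print(i, chunk)
```

Each chunk could then be passed to gevent_func through apply_async in place of the bare index, so that a process only spawns greenlets for its own share.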

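Of course, if you are feeling lazy, you can use grequests directly, a ready-made wrapper that combines requests with gevent. It is very simple.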

Producer + Consumer

If none of the above meets your needs, then fall back on the tried-and-true producer-consumer model.
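
A minimal sketch of that model, assuming threads and the standard library queue.Queue. The function names, the queue size and the None sentinel are choices made for this example, not a fixed recipe.

```python
import queue
import threading

SENTINEL = None  # tells a consumer that no more work is coming


def producer(q, n_items, n_consumers):
    for x in range(n_items):
        q.put(x)
    # One sentinel per consumer so every consumer thread can exit
    for _ in range(n_consumers):
        q.put(SENTINEL)


def consumer(q, results):
    while True:
        x = q.get()
        if x is SENTINEL:
            break
        results.append("Eat {} bowls of rice...".format(x))


def run(n_items=5, n_consumers=2):
    q = queue.Queue(maxsize=2)  # bounded, so the producer waits when consumers lag
    results = []
    threads = [threading.Thread(target=consumer, args=(q, results))
               for _ in range(n_consumers)]
    for t in threads:
        t.start()
    producer(q, n_items, n_consumers)
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(sorted(run()))
```

The same shape carries over to processes by swapping in multiprocessing.Queue and multiprocessing.Process, with results sent back through a second queue instead of a shared list.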