Python 多线程与多进程与协程

Author Avatar
Tr0y 3月 13, 2018 21:24:03 本文共 11.2k 字
  • 文为知己者书
  • 在其它设备中阅读本文章

虽然 Python Multi 一直时不时地在用,但是最近发现对 multi 这块的知识点还是有点零碎,于是专门花时间去学了一下,做个笔记(仅 py3.x,有可能不适用于 2.x,但是方法基本上是一样的)。资料来自于 CSDN,博客园,知乎等,加上了我自己的一些理解. 花费了我 6h 整理

进程

  1. 进程:程序是指令、数据及其组织形式的描述,进程是程序的实体。

  2. 子进程:子进程指的是由另一进程(对应称之为父进程)所创建的进程。

  3. fork():Unix/Linux 操作系统提供了一个 fork() 系统调用,它非常特殊。普通的函数,调用一次,返回一次,但是fork() 调用一次,返回两次,因为操作系统自动把当前进程(父进程)复制了一份(子进程),然后,分别在父进程和子进程内返回。子进程永远返回 0,而父进程返回子进程的 ID。这样做的理由是,一个父进程可以 fork 出很多子进程,所以,父进程要记下每个子进程的 ID,而子进程只需要调用 getpid() 就可以拿到父进程的 ID。

罗嗦了这么多,一点都不直观,看栗子吧

fork

Python 的 os 模块封装了常见的系统调用,其中就包括 fork,可以在 Python 程序中轻松创建子进程:

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

结果:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由于 Windows 没有 fork 调用,上面的代码在 Windows 上无法运行。

有了 fork 调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的 Apache 服务器就是由父进程监听端口,每当有新的 http 请求时,就 fork 出子进程来处理新的 http 请求。

如果你打算编写多进程的服务程序,Unix/Linux 无疑是正确的选择。

由于 Windows 没有 fork 调用,难道在 Windows 上无法用 Python 编写多进程的程序?

由于 Python 是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing 模块就是跨平台版本的多进程模块。在 Unix/Linux 下,multiprocessing 模块封装了 fork() 调用,使我们不需要关注 fork() 的细节。由于 Windows 没有 fork 调用,因此,multiprocessing 需要“模拟”出 fork 的效果,父进程所有 Python 对象都必须通过 pickle 序列化再传到子进程去,所有,如果 multiprocessing 在 Windows 下调用失败了,要先考虑是不是 pickle 失败了。

multiprocessing

Process 类的介绍

multiprocessing 模块提供了一个 Process 类来代表一个进程对象

  1. 创建进程的类

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

    强调:

    1. 需要使用关键字的方式来指定参数
    2. args 指定的为传给 target 函数的位置参数,是一个元组形式,必须有逗号
  2. 参数介绍

    group 参数未使用,值始终为 None

    target 表示调用对象,即子进程要执行的任务

    args 表示调用对象的位置参数元组,args=(1,2,’hexin’,)

    kwargs 表示调用对象的字典,kwargs={‘name’:’hexin’,’age’:18}

    name 为子进程的名称

  3. 方法介绍

    p.start():启动进程,并调用该子进程中的 p.run()
    p.run():进程启动时运行的方法,正是它去调用 target 指定的函数,我们自定义类的类中一定要实现该方法

    p.terminate():强制终止进程 p,不会进行任何清理操作,如果 p 创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果 p 还保存了一个锁那么也将不会被释放,进而导致死锁
    p.is_alive():如果 p 仍然运行,返回 True

    p.join([timeout]):主线程等待 p 终止(强调:是主线程处于等的状态,而 p 是处于运行的状态)。timeout 是可选的超时时间(超过这个时间,父线程不再等待子线程,继续往下执行),需要强调的是,p.join 只能 join 住 start 开启的进程,而不能 join 住 run 开启的进程

  4. 属性介绍

    p.daemon:默认值为 False,如果设为 True,代表 p 为后台运行的守护进程;当 p 的父进程终止时,p 也随之终止,并且设定为 True 后,p 不能创建自己的新进程;必须在 p.start()之前设置

    p.name:进程的名称

    p.pid:进程的 pid

    p.exitcode:进程在运行时为 None、如果为– N,表示被信号 N 结束(了解即可)

    p.authkey:进程的身份验证键,默认是由 os.urandom()随机生成的 32 字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

Process 类的使用

注意:在 windows 中 Process()必须放到 if __name__ == '__main__':

下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))


print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个 Process 实例,用 start() 方法启动,这样创建进程比 fork() 还要简单。

join() 方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

当然,也可以使用类,继承 Process 来创建子进程

import time
import random
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self,name):
        super().__init__()
        self.name = name
    def run(self):
        print('%s running' %self.name)

        time.sleep(random.randrange(1,5))
        print('%s stop' %self.name)

p1 = MyProcess('1')
p2 = MyProcess('2')
p3 = MyProcess('3')
p4 = MyProcess('4')

p1.start() #start 会自动调用 run
p2.start()
p3.start()
p4.start()
print('主线程')

结果

1 running
2 running
主线程
3 running
4 running
1 stop
4 stop
2 stop
3 stop

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。subprocess 模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

下面的例子演示了如何在 Python 代码中运行命令 nslookup www.python.org,这和命令行直接运行的效果是一样的:

import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

结果

$ nslookup www.python.org
Server:        192.168.19.4
Address:    192.168.19.4#53

Non-authoritative answer:
www.python.org    canonical name = python.map.fastly.net.
Name:    python.map.fastly.net
Address: 199.27.79.223

Exit code: 0

Pool

开多进程的目的是为了并发,如果有多核,通常有几个核就开几个进程,进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行),但很明显需要并发执行的任务要远大于核数,这时我们就可以通过维护一个进程池来控制进程数目,比如 httpd 的进程模式,规定最小进程数和最大进程数等。

当被操作对象数目不大时,可以直接利用 multiprocessing 中的 Process 动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

而且对于远程过程调用的高级应用程序而言,应该使用进程池,Pool 可以提供指定数量的进程,供用户调用,当有新的请求提交到 pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

p = Pool([numprocess [,initializer [, initargs]]]) :创建进程池

  1. 参数

    1. numprocess:要创建的进程数,如果省略,将默认使用 cpu_count()的值
    1. initializer:是每个工作进程启动时要执行的可调用对象,默认为 None
    2. initargs:是要传给 initializer 的参数组
  2. 方法

    主要方法

    1. p.apply(func[, args[, kwargs]]):在一个池工作进程中执行 func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行 func 函数。如果要通过不同参数并发地执行 func 函数,必须从不同线程调用 p.apply()函数或者使用 p.apply_async()

      apply 很少使用

    2. apply_async(func[, arg[, kwds={}[, callback=None]]]):在一个池工作进程中执行 func(*args,**kwargs),然后返回结果。此方法的结果是 AsyncResult 类的实例,callback 是可调用对象,接收输入参数。当 func 的结果变为可用时,将理解传递给 callback。callback 禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。

    3. p.map(func, iterable[, chunksize=None]):Pool 类中的 map 方法,与内置的 map 函数用法行为基本一致,它会使进程阻塞直到返回结果。 注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

    4. map_async 与 map 的关系同 apply 与 apply_async

    5. p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成

    6. p.join():等待所有工作进程退出。此方法只能在 close()或 teminate()之后调用,让其不再接受新的 Process。

    7. p.terminate():结束工作进程,不再处理未处理的任务。

    其他方法

    方法 apply_async()map_async() 的返回值是 AsyncResul 的实例 obj。实例具有以下方法

    1. obj.get():返回结果,如果有必要则等待结果到达。timeout 是可选的。如果在指定时间内还没有到达,将引发异常。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
    2. obj.ready():如果调用完成,返回 True
    3. obj.successful():如果调用完成且没有引发异常,返回 True,如果在结果就绪之前调用此方法,引发异常
    4. obj.wait([timeout]):等待结果变为可用。
    5. obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果 p 被垃圾回收,将自动调用此函数

例子

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))


print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
    p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

结果

Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

请注意输出的结果,task 0123 是立刻执行的,而 task 4 要等待前面某个 task 完成后才执行,这是因为 Pool 的默认大小在我的电脑上是 4,因此,最多同时执行 4 个进程。这是 Pool 有意设计的限制,并不是操作系统的限制。如果改成:p = Pool(5) 就可以同时跑 5 个进程。

由于 Pool 的默认大小是 CPU 的核数,如果你不幸(逃)拥有 8 核 CPU,你要提交至少 9 个子进程才能看到上面的等待效果。

又一个例子:

提交任务,并在主进程中拿到结果

(之前的 Process 是执行任务,结果放到队列里,现在可以在主进程中直接拿到结果)

from multiprocessing import Pool
import time

def work(n):
    print('开工啦...')
    time.sleep(3)
    return n ** 2

q = Pool()
#异步 apply_async 用法:如果使用异步提交的任务,主进程需要使用 jion,等待进程池内任务都处理完,然后可以用 get 收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
res = q.apply_async(work, args=(2,))
q.close()
q.join() #join 在 close 之后调用
print(res.get())

#同步 apply 用法:主进程一直等 apply 提交的任务结束后才继续执行后续代码
# res=q.apply(work,args=(2,))
# print(res)

结果

开工啦...
4

Pool 对象调用 join() 方法会等待所有子进程执行完毕,调用 join() 之前必须先调用 close(),调用 close() 之后就不能继续添加新的 Process 了。

进程间通信(IPC)

注意,进程之间默认是不能共享全局变量的(子进程不能改变主进程中全局变量的值)。如果要共享全局变量需要用:

  • multiprocessing.Value("d",10.0):数值
  • multiprocessing.Array("i",[1,2,3,4,5]):列表
  1. 外部进程的通信

    有时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

    subprocess 模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

    在上面的用 Python 模拟命令 nslookup www.python.org 的例子中,如果子进程还需要输入,则可以通过 communicate() 方法输入:

    import subprocess
    
    print('$ nslookup')
    p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
    print(output.decode('utf-8'))
    print('Exit code:', p.returncode)
    

    上面的代码相当于在命令行执行命令 nslookup,然后手动输入:

    set q=mx
    python.org
    exit
    
  2. 进程间通信

    Process 之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python 的 multiprocessing 模块包装了底层的机制,提供了 QueuePipes 等多种方式来交换数据。

    Queue

    创建队列的类(底层就是以管道和锁定的方式实现):

    Queue([maxsize]) :创建共享的进程队列,Queue 是多进程安全的队列,可以使用 Queue 实现多进程之间的数据传递。

    1. 参数:

      maxsize 是队列中允许最大项数,省略则无大小限制。

    2. 方法

      1. q.put():用以插入数据到队列。put 方法还有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,该方法会阻塞 timeout 指定的时间,直到该队列有剩余的空间。如果超时,会抛出 Queue.Full 异常。如果 blocked 为 False,但该 Queue 已满,会立即抛出 Queue.Full 异常。
      2. q.get():可以从队列读取并且删除一个元素。get 方法有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,那么在等待时间内没有取到任何元素,会抛出 Queue.Empty 异常。如果 blocked 为 False,有两种情况存在,如果 Queue 有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出 Queue.Empty 异常。若不希望在 empty 的时候抛出异常,令 blocked 为 True 或者参数全部置空即可。
      3. q.get_nowait():同 q.get(False)
      4. q.put_nowait():同 q.put(False)
      5. q.empty():调用此方法时 q 为空则返回 True,该结果不可靠,比如在返回 True 的过程中,如果队列中又加入了项目。
      6. q.full():调用此方法时 q 已满则返回 True,该结果不可靠,比如在返回 True 的过程中,如果队列中的项目被取走。
      7. q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同 q.empty()和 q.full()一样

    在父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据:

    from multiprocessing import Process, Queue
    import os, time, random
    
    # 写数据进程执行的代码:
    def write(q):
        print('Process to write: %s' % os.getpid())
        for value in ['A', 'B', 'C']:
            print('Put %s to queue...' % value)
            q.put(value)
            time.sleep(random.random())
    
    # 读数据进程执行的代码:
    def read(q):
        print('Process to read: %s' % os.getpid())
        while True:
            value = q.get(True)
            print('Get %s from queue.' % value)
    
    # 父进程创建 Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程 pw,写入:
    pw.start()
    # 启动子进程 pr,读取:
    pr.start()
    # 等待 pw 结束:
    pw.join()
    # pr 进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()
    

    结果如下:

    Process to write: 50563
    Put A to queue...
    Process to read: 50564
    Get A from queue.
    Put B to queue...
    Get B from queue.
    Put C to queue...
    Get C from queue.
    

    Queue 通信,相当于父进程赋值了一个 Queue 给子进程,子进程在这个 Queue 放好数据后,序列化一个中间翻译,然后在反序列化返回给父进程,因为进程之间内存独立,不能传递传递的其实就是 序列化 的数据

    JoinableQueue

    JoinableQueue([maxsize]) :这就像是一个 Queue 对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

    maxsize:队列中允许最大项数,省略则无大小限制。

    JoinableQueue 的实例 p 除了与 Queue 对象相同的方法之外还具有:

    1. q.task_done():使用者使用此方法发出信号,表示 q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发 ValueError 异常
    2. q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用 q.task_done()方法为止
    from multiprocessing import Process,JoinableQueue
    import time,random
    
    def consumer(q):
        while True:
            res = q.get()
            print('消费者拿到了 %s' %res)
            q.task_done()
    
    def producer(seq, q):
        for item in seq:
            #time.sleep(random.randrange(1,2))
            q.put(item)
            print('生产者做好了 %s' %item)
        q.join()
    
    q = JoinableQueue()
    seq = ('包子 %s' %i for i in range(5))
    
    p = Process(target=consumer, args=(q, ))
    p.daemon = True #设置为守护进程,在主线程停止时 p 也停止,但是不用担心,producer 内调用 q.join 保证了 consumer 已经处理完队列中的所有元素
    p.start()
    
    producer(seq, q)
    
    print('主线程')
    

    结果

    生产者做好了 包子 0
    生产者做好了 包子 1
    消费者拿到了 包子 0
    消费者拿到了 包子 1
    生产者做好了 包子 2
    消费者拿到了 包子 2
    生产者做好了 包子 3
    消费者拿到了 包子 3
    生产者做好了 包子 4
    消费者拿到了 包子 4
    主线程
    

    Condition

    Condition([lock/rlock])

    1. 参数

      可以传递一个 Lock/RLock 实例给构造方法,否则它将自己生成一个 RLock 实例。

    2. 方法:

      1. acquire([timeout]):首先进行 acquire,然后判断一些条件。如果条件不满足则 wait
      2. release():释放 Lock
      3. wait([timeout]): 调用这个方法将使线程进入 Condition 的等待池等待通知,并释放锁。使用前线程必须已获得锁定,否则将抛出异常。 处于 wait 状态的线程接到通知后会重新判断条件。
      4. notify(): 调用这个方法将从等待池挑选一个线程并通知,收到通知的线程将自动调用 acquire()尝试获得锁定(进入锁定池);其他线程仍然在等待池中。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
      5. notifyAll(): 调用这个方法将通知等待池中所有的线程,这些线程都将进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。

    Pipe

    多进程还有一种数据传递方式叫 管道 原理和 Queue 相同

    Pipe([duplex]) :在进程之间创建一条管道,并返回元组(conn1,conn2),其中 conn1,conn2 表示管道两端的连接对象,强调一点:必须在产生 Process 对象之前产生管道

    1. 参数

      dumplex:默认管道是全双工的,如果将 duplex 射成 False,conn1 只能用于接收,conn2 只能用于发送。

    2. 方法

      主要:

      1. conn1.send(obj):通过连接发送对象。obj 是与序列化兼容的任意对象
      2. conn1.recv():接收 conn2.send(obj)发送的对象。如果没有消息可接收,recv 方法会一直阻塞。如果连接的另外一端已经关闭,那么 recv 方法会抛出 EOFError。

      其他:

      1. conn1.close():关闭连接。如果 conn1 被垃圾回收,将自动调用此方法
      2. conn1.fileno():返回连接使用的整数文件描述符
      3. conn1.poll([timeout]):如果连接上的数据可用,返回 True。timeout 指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将 timeout 射成 None,操作将无限期地等待数据到达。
      4. conn1.recv_bytes([maxlength]):接收 c.send_bytes()方法发送的一条完整的字节消息。maxlength 指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发 IOError 异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发 EOFError 异常。
      5. conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer 是支持缓冲区接口的任意对象,offset 是缓冲区中的字节偏移量,而 size 是要发送字节数。结果数据以单条消息的形式发出,然后调用 c.recv_bytes()函数进行接收
      6. conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在 buffer 对象中,该对象支持可写入的缓冲区接口(即 bytearray 对象或类似的对象)。offset 指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发 BufferTooShort 异常。

    举例:

    from  multiprocessing import Process, Pipe
    import time
    
    #子进程执行方法
    def f(Subconn):
        time.sleep(1)
        Subconn.send("吃了吗")
        print("来自父亲的问候:", Subconn.recv())
        Subconn.close()
    
    #创建管道两端
    parent_conn, child_conn = Pipe()
    
    #创建子进程
    p = Process(target=f, args=(child_conn,))
    p.start()
    print("来自儿子的问候:", parent_conn.recv())
    parent_conn.send("嗯")
    

    结果

    来自儿子的问候: 吃了吗
    来自父亲的问候: 嗯
    

    因为 recv 会阻塞当前进程,所以上面的结果在运行 1s 后才出现

    再来看一个更复杂点的例子

    from multiprocessing import Process, Pipe
    import time
    
    def consumer(p, name):
        left, right=p
        left.close()
        while True:
            try:
                baozi = right.recv()
                print('%s 收到包子:%s' %(name, baozi))
            except EOFError:
                #right.close()
                print('break')
                break
    
    def producer(seq, p):
        left, right = p
        #right.close()
        for i in seq:
            left.send(i)
            print('Send')
            time.sleep(0.01)
        left.close()
    
    left,right = Pipe()
    c1 = Process(target=consumer, args=((left, right), 'c1'))
    c1.start()
    
    seq = (i for i in range(10))
    producer(seq, (left, right))
    
    #right.close()
    #left.close()
    
    c1.join()
    print('主进程')
    

    注意:要正确使用 close(),否则可能会在 recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产 EOFError 异常。因此在生产者中关闭管道不会有任何效果,除非消费者中也关闭了相同的管道端点。

    进程锁

    有些资源并不能同时使用,比如屏幕。虽然内存独立,但是为了防止进程间抢屏幕打印输出造成混乱,可以使用进程锁。加锁保证了多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全。

    无锁:

    from multiprocessing import  Process, Lock
    from time import sleep
    import sys
    
    #子进程执行方法
    def f(lock, num):
        #with lock:
        for i in range(20):
            print(num, end=', ')
    
        sys.stdout.flush()
        sleep(2)
        print()
    
    lock = Lock()
    pList = []
    #循环创建 100 个子进程
    for num in range(4):
        p = Process(target=f, args=(lock, num))
        pList.append(p)
        p.start()
    
    for p in pList:
        p.join()
    
    print('[!]All done!')
    

结果

0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,



[!]All done!

有锁:

from multiprocessing import  Process, Lock
from time import sleep
import sys

#子进程执行方法
def f(lock, num):
    with lock:
        for i in range(20):
            print(num, end=', ')

        sys.stdout.flush()
        sleep(2)
        print()

lock = Lock()
pList = []

#循环创建 4 个子进程
for num in range(4):
    p = Process(target=f, args=(lock, num))
    pList.append(p)
    p.start()

for p in pList:
    p.join()

print('[!]All done!')

结果:

0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3,
[!]All done!

注意:这里使用上下文管理器来管理锁,也可以手动管理:先 lock.acquire(),再 lock.release()

死锁

是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程.

解决方法,递归锁,在 Python 中为了支持在同一进程/线程中多次请求同一资源,python 提供了可重入锁 RLock。这个 RLock 内部维护着一个 Lock 和一个 counter 变量,counter 记录了 acquire 的次数,从而使得资源可以被多次 require。直到一个线程所有的 acquire 都被 release,其他的线程才能获得资源。

信号量

即 Semahpore

互斥锁 同时只允许一个线程更改数据,而 Semaphore 是同时允许一定数量的线程更改数据 ,比如厕所有 3 个坑,那最多只允许 3 个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为 3,那么来一个人获得一把锁,计数加 1,当计数等于 3 时,后面的人均需要等待。一旦释放,就有人可以获得一把锁。

from multiprocessing import Process, Semaphore
import time, random

def go_wc(sem, user):
    sem.acquire()
    print('%s 占到一个茅坑' %user)
    time.sleep(random.randint(0, 3))
    sem.release()
    print(user, 'OK')

sem = Semaphore(2)
p_l=[]
for i in range(5):
    p = Process(target=go_wc, args=(sem, 'user%s' %i,))
    p.start()
    p_l.append(p)

for i in p_l:
    i.join()

结果

user0 占到一个茅坑
user1 占到一个茅坑
user0 OK
user2 占到一个茅坑
user1 OK
user3 占到一个茅坑
user3 OK
user4 占到一个茅坑
user2 OK
user4 OK

Event

事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为 True,那么 event.wait 方法时便不再阻塞。

clear:将“Flag”设置为 False
set:将“Flag”设置为 True

  1. 结语

    在 Unix/Linux 下,可以使用 fork() 调用实现多进程。

    要实现跨平台的多进程,可以使用 multiprocessing 模块。

    进程间通信是通过 QueuePipes 等实现的。

线程

  1. 线程:轻量级进程。线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源。一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行。由于线程之间的相互制约,致使线程在运行中呈现出间断性。线程也有就绪、阻塞和运行三种基本状态。就绪状态是指线程具备运行的所有条件,逻辑上可以运行,在等待处理机;运行状态是指线程占有处理机正在运行;阻塞状态是指线程在等待一个事件(如某个信号量),逻辑上不可执行。每一个程序都至少有一个线程,若程序只有一个线程,那就是程序本身。

threading

multiprocessing 模仿了 threading 模块,所以,只要理解了 multiprocessing,threading 也就懂了

一些区别

  1. 开启速度,主进程下开启线程速度较快。

  2. 在主进程下开启多个线程,每个线程都跟主进程的 pid 一样,开多个进程,每个进程都有不同的 pid

  3. 定时器:

    定时器,指定 n 秒后执行某操作

from threading import Timer


def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed

GIL

关于 GIL

  1. GIL,即全局解释器锁。在 Cpython 解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势。
  2. GIL 并不是 Python 的特性,它是在实现 Python 解析器(CPython)时所引入的一个概念。JPython 就没有 GIL。

GIL 示意图

GIL vs Lock

Python 已经有一个 GIL 来保证同一时间只能有一个线程来执行了,为什么还需要 Lock?

锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据

所以,保护不同的数据就应该加不同的锁。

GIL 与 Lock 是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显 GIL 不负责这件事,只能用户自定义加锁处理,即 Lock。Python 解释器帮你自动定期进行内存回收,你可以理解为 python 解释器里有一个独立的线程,每过一段时间它起 wake up 做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py 解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py 解释器的垃圾回收线程在清空这个变量的过程中的 clearing 时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python 解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这可以说是 Python 早期版本的遗留问题。 

协程

协程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

  1. python 的线程属于内核级别的,即由操作系统控制调度(如单线程一旦遇到 io 就被迫交出 cpu 执行权限,切换其他线程运行)

  2. 单线程内开启协程,一旦遇到 io,从应用程序级别(而非操作系统)控制切换

    对比操作系统控制线程的切换,用户在单线程内控制协程的切换,优点如下:

    1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

    2. 单线程内就可以实现并发的效果,最大限度地利用 cpu

    缺点:

    协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程

    协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

Python 中的协程经历了很长的一段发展历程。其大概经历了如下三个阶段:

  1. 最初的生成器变形 yield/send
  2. 引入@asyncio.coroutine 和 yield from
  3. 在最近的 Python3.5 版本中引入 async/await 关键字

计算斐波那契数列

以计算斐波那契数列为例:

普通版:

def old_fib(n):
    res = [0] * n
    index = 0
    a = 0
    b = 1
    while index < n:
        res[index] = b
        a, b = b, a + b
        index += 1
    return res

print('-'*10 + 'test old fib' + '-'*10)
for fib_res in old_fib(20):
    print(fib_res)

如果我们仅仅是需要拿到斐波那契序列的第 n 位,或者仅仅是希望依此产生斐波那契序列,那么上面这种传统方式就会比较耗费内存。举个例子,要计算第 100 位,需要储存 1-99 位。

从 yield 说起

这时,yield 就派上用场了。

def fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        yield b
        a, b = b, a + b
        index += 1

print('-'*10 + 'test yield fib' + '-'*10)
for fib_res in fib(20):
    print(fib_res)

当一个函数中包含 yield 语句时,python 会自动将其识别为一个生成器。这时 fib(20)并不会真正调用函数体,而是以函数体生成了一个生成器对象实例。

yield 在这里可以保留 fib 函数的计算现场,暂停 fib 的计算并将 b 返回。而将 fib 放入 for…in 循环中时,每次循环都会调用next(fib(20)),唤醒生成器,执行到下一个 yield 语句处,直到抛出 StopIteration 异常。此异常会被 for 循环捕获,导致跳出循环。

从上面的程序中可以看到,目前只有数据从 fib(20)中通过 yield 流向外面的 for 循环;如果可以向 fib(20)发送数据,那不是就可以在 Python 中实现协程了嘛。

Send 来了

于是,Python 中的生成器有了 send 函数,yield 也拥有了返回值

我们用这个特性,模拟一个慢速斐波那契数列的计算:

import time
import random

def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_cnt = yield b
        print('let me think {0} secs'.format(sleep_cnt))
        time.sleep(sleep_cnt)
        a, b = b, a + b
        index += 1

print('-'*10 + 'test yield send' + '-'*10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib)

while True:
    print(fib_res)
    try:
        fib_res = sfib.send(random.uniform(0, 0.5))
    except StopIteration:
        break

其中 next(sfib) 相当于 sfib.send(None),可以使得 sfib 运行至第一个 yield 处返回。后续的 sfib.send(random.uniform(0, 0.5)) 则将一个随机的秒数发送给 sfib,作为当前中断的 yield 表达式的返回值。这样,我们可以从“主”程序中控制协程计算斐波那契数列时的思考时间,协程可以返回给“主”程序计算结果,Perfect!

yield from 横空出世

yield from 是 Python3.3 后新加的语言结构,用于重构生成器,可以这么使用:

def fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        yield b
        a, b = b, a + b
        index += 1

def copy_fib(n):
    print('I am copy from fib')
    yield from fib(n)
    print('Copy end')

print('-'*10 + 'test yield from' + '-'*10)
for fib_res in fib(20):
    #print(fib_res)
    pass

这种使用方式很简单,但远远不是 yield from 的全部。yield from 的作用还体现在可以像一个管道一样将 send 信息传递给内层协程,并且处理好了各种异常情况,因此,对于 stupid_fib 也可以这样包装和使用:

import time
import random

def stupid_fib(n):
    index = 0
    a = 0
    b = 1
    while index < n:
        sleep_cnt = yield b
        print('let me think {0} secs'.format(sleep_cnt))
        time.sleep(sleep_cnt)
        a, b = b, a + b
        index += 1

def copy_stupid_fib(n):
    print('I am copy from stupid fib')
    yield from stupid_fib(n)
    print('Copy end')

print('-'*10 + 'test yield from and send' + '-'*10)
N = 20
csfib = copy_stupid_fib(N)
fib_res = next(csfib)
while True:
    print(fib_res)
    try:
        fib_res = csfib.send(random.uniform(0, 0.5))
    except StopIteration:
        break

这是一个生成器嵌套生成器的例子。yeild from 看上去似乎没啥卵用,但实际上,它帮我们做了很多事情。比如,要是我们自己实现一个生成器嵌套生成器,那么要注意哪些事情呢?PEP-380 规定了 yield from 的语义,或者说嵌套的 generator 应该 有的行为模式。假设我们呢自己实现的时候也按照这个规定来。

总的来说,外层的 generator 要负责为里面的 generator 做消息传递

以下假设 a 为外层,a 中包含着 b 这个生成器

  1. b 迭代产生的每个值都直接传递给 a 的调用者。

  2. 所有通过 send 方法发送到 a 的值都被直接传递给 b。如果发送的值是 None,则调用 b 的next()方法,否则调用 b 的 send 方法。如果对 b 的方法调用产生 StopIteration 异常,a 会继续 执行 yield from 后面的语句,而其他异常则会传播到 a 中,导致 a 在执行 yield from 的时候抛出异常。

  3. 如果有除 GeneratorExit 以外的异常被 throw 到 a 中的话,该异常会被直接 throw 到 b 中。如果 b 的 throw 方法抛出 StopIteration, a 会继续执行;其他异常则会导致 a 也抛出异常。

  4. 如果一个 GeneratorExit 异常被 throw 到 a 中,或者 a 的 close 方法被调用了,并且 b 也有 close 方法的话,b 的 close 方法也 会被调用。如果 b 的这个方法抛出了异常,则会导致 a 也抛出异常。 反之,如果 b 成功 close 掉了,a 也会抛出异常,但是是特定的 GeneratorExit 异常。

  5. a 中 yield from 表达式的求值结果是 b 迭代结束时抛出的 StopIteration 异常的第一个参数。

  6. b 中的 return <expr> 语句实际上会抛出 StopIteration(<expr>) 异常,所以 b 中 return 的值会成为中 yield from 表达式的返回值。

5、6 点,体现了 yield from 可以使用 return 来返回值而 yield 只能使用 try … except StopIteration … 来捕获异常的 value 值。

综上,每次自己去构造符合这些要求的嵌套生成器显然是吃力不讨好的。

yeild from 语法就是将生成器函数中包括 yield 语句的部分逻辑封装到一个子生成器函数中。然后在子生成器函数外面可以做一些其他的业务逻辑。整个生成器函数(包括子生成器函数)对外就是一个生成器函数。

asyncio.coroutine 和 yield from

yield from 在 asyncio 模块中得以发扬光大。

asyncio 是 Python 3.4 版本引入的标准库,直接内置了对异步 IO 的支持。

asyncio 的编程模型就是一个消息循环。

我们从 asyncio 模块中直接获取一个 EventLoop 的引用
然后把需要执行的协程扔到 EventLoop 中执行,就实现了异步 IO。

  1. asyncio 提供了完善的异步 IO 支持;

  2. 异步操作需要在 coroutine 中通过 yield from 完成;

  3. 多个 coroutine 可以封装成一组 Task 然后并发执行。

举个例子

import asyncio
import threading
from datetime import datetime

@asyncio.coroutine
def hello():
    print("hello!")
    #异步调用 asyncio.sleep(1)
    r = yield from asyncio.sleep(1)
    print("等待结束,返回:", r)


#get EventLoop
loop = asyncio.get_event_loop()
#exec coroutine
loop.run_until_complete(hello())
loop.close()

`@asyncio.coroutine把一个 generator 标记为 coroutine 类型,然后,我们就把这个coroutine扔到EventLoop` 中执行。

hello() 会首先打印出 Hello world!,然后,yield from 语法可以让我们方便地调用另一个 generator。由于 asyncio.sleep() 也是一个 coroutine,所以线程不会等待 asyncio.sleep(),而是直接中断并执行下一个消息循环。当 asyncio.sleep() 返回时,线程就可以从 yield from 拿到返回值(此处是 None),然后接着执行下一行语句。

asyncio.sleep(1) 看成是一个耗时 1 秒的 IO 操作,在此期间,主线程并未等待,而是去执行 EventLoop 中其他可以执行的 coroutine 了,因此可以实现并发执行。

我们用 Task 封装两个 coroutine 试试:

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

观察执行过程:

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暂停约 1 秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

由打印的当前线程名称可以看出,两个 coroutine 是由同一个线程并发执行的。

如果把 asyncio.sleep() 换成真正的 IO 操作,则多个 coroutine 就可以由一个线程并发执行。

我们用 asyncio 的异步网络连接来获取 sina、sohu 和 163 的网站首页:

import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
        line = yield from reader.readline()
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()

loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

执行结果如下:

wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段时间)
(打印出 sohu 的 header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(打印出 sina 的 header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(打印出 163 的 header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...

可见 3 个连接由一个线程通过 coroutine 并发完成。

注意:yield from 后面接协程的时候才会发生中断,仅仅接生成器是不会中断的,比如 line = yield from reader.readline() 这句。

那么问题来了,如果我自己实现了一个 IO 密集型的操作(requests or others),要怎么接到 asyncio 中呢?

例子

import threading
import asyncio
import random
import time

@asyncio.coroutine
def hello():
    t = random.randint(1, 3)
    print(t)
    loop = asyncio.get_event_loop()
    yield from loop.run_in_executor(None, time.sleep, t)
    print('6666')

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

结果

Hello world! (<_MainThread(MainThread, started 140194883168064)>)
1
Hello world! (<_MainThread(MainThread, started 140194883168064)>)
3
6666
Hello again! (<_MainThread(MainThread, started 140194883168064)>)
6666
Hello again! (<_MainThread(MainThread, started 140194883168064)>)

这个问题困扰了我很久,最后在 stackoverflow 找到了例子。

asyncio 一些重要的类/参数/方法:

Task 对象是 Future 的子类,它将 coroutine 和 Future 联系在一起,将 coroutine 封装成一个 Future 对象。

一般会看到两种任务启动方法:

tasks = asyncio.gather(
  asyncio.ensure_future(func1()),
  asyncio.ensure_future(func2())
)

loop.run_until_complete(tasks)
tasks = [
  asyncio.ensure_future(func1()),
  asyncio.ensure_future(func2())
  ]
loop.run_until_complete(asyncio.wait(tasks))

BaseEventLoop.create_task(coro)
这个方法排定协程的执行时间,返回一个 asyncio.Task 对象。

asyncio 包中有多个函数会自动把参数指定的协程包装在 asyncio.Task 对象中,例如 BaseEventLoop.run_until_complete(…) 方法。

在 asyncio 包中, BaseEventLoop.create_task(…) 方法接收一个协程,排定它的运行时间,然后返回一个 asyncio.Task 实例——也是 asyncio.Future 类的实例,因为 Task 是 Future 的子类,用于包装协程。

ensure_future 可以将 coroutine 封装成 Task。这个函数统一了协程和期物(Future):第一个参数可以是二者中的任何一个。如果是 Future 或 Task 对象,那就原封不动地返回。如果是协程,那么 async 函数会调用 loop.create_task(…) 方法创建 Task 对象。 loop= 关键字参数是可选的,用于传入事件循环;如果没有传入,那么 async 函数会通过调用 asyncio.get_event_loop() 函数获取循环对象。asyncio.gather 将一些 Future 和 coroutine 封装成一个 Future。

asyncio.wait 则本身就是 coroutine,run_until_complete 既可以接收 Future 对象,也可以是 coroutine 对象。

End

What do you think?

本文标题: Python 多线程与多进程与协程
原始链接: http://www.tr0y.wang/2018/03/13/pymulti/
发布时间: 2018.03.13-21:24
最后更新: 2018.11.23-16:10
版权声明: 本站文章均采用CC BY-NC-SA 4.0协议进行许可。转载请注明出处!