Python多线程多进程编程

There must be a good reason for our suffering. ——《加勒比海盗》

苦尽则甘来。

进程与线程

进程

  计算机程序只是存储在磁盘上的可执行二进制(或其他类型)文件。只有把它们加载到内存中并被操作系统调用,才拥有其生命期。进程(有时称为重量级进程)则是一个执行中的程序。 每个进程都拥有自己的地址空间、 内存、 数据栈以及其他用于跟踪执行的辅助数据。操作系统管理其上所有进程的执行,并为这些进程合理地分配时间。进程也可以通过派生(fork 或 spawn)新的进程来执行其他任务,不过因为每个新进程也都拥有自己的内存和数据栈等,所以只能采用进程间通信(IPC)的方式共享信息。

线程

  线程是程序执行时的最小单位(有时候称为轻量级进程),不过它们是在同一个进程下执行的,它是进程的一个执行流,并共享相同的上下文。可以将它们认为是在一个主进程或"主线程"中并行运行的一些"迷你进程" 。

  线程包括开始、执行顺序和结束三部分。它有一个指令指针,用于记录当前运行的上下文。当其他线程运行时,它可以被抢占(中断)和临时挂起(也称为睡眠)。

  一个进程中的各个线程与主线程之间可以共享同一块数据空间,因此相比于独立的进程而言,线程间的信息共享和通信更加容易。线程一般是以并发方式执行的,正是由于这种并行和数据共享机制,使得多任务间的协作成为可能。当然,在单核 CPU 系统中,因为真正的并发是不可能的,所以线程的执行实际上还是同步执行的,只是系统会根据调度算法在不同的时间安排某一个线程在CPU上执行一小会,然后让其他线程在CPU上再执行一小会,通过这种在多个线程间不断切换的方式让多个线程交替执行。因此,在宏观上看,即使在单核CPU的系统上仍然像多个线程并发运行一样。

GIL即全局解释器锁

  Python之父荷兰人吉多·范罗苏姆(Guido van Rossum)在CPU单核时代为了充分地利用单核CPU的使用率,在解释器层面实现了一把全局互斥锁。

  官方说明:In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

  翻译:Python官方解释器(CPython解释器)的内存管理是不安全的,因此它有一个全局解释器锁(Global Interpreter Lock简称GIL),它使得在任何时刻都只有一个线程在执行Python字节码。这也是使得标准版本的Python并不能实现真正的多线程并发的直接原因。

  不过对于现在的多核处理器时代来说就是:一核有难,多核围观。

Python多线程

Thread类与线程函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
from time import sleep, ctime


def func(name, sec):
"""
:param name: 名字
:param sec: 休眠时间,单位:秒
"""
print("Hello:", name, "时间:", ctime())
sleep(sec)
print("see you", name, ctime())


def main():
# 创建第一个线程对象,通过target关键字参数传入指定函数,通过args传入指定函数所需参数,以元组形式
thread1 = threading.Thread(target=func, args=("xiuxing", 5))
# 启动第一个线程
thread1.start()
# 创建第二个线程对象,通过target关键字参数传入指定函数,通过args传入指定函数所需参数,以元组形式
thread2 = threading.Thread(target=func, args=("Mr.xiuxing", 10))
# 启动第二个线程
thread2.start()
# 等待第一个线程函数执行完毕
thread1.join()
# 等待第二个线程函数执行完毕
thread2.join()


if __name__ == '__main__':
main()

输出结果:

1
2
3
4
>>>Hello: xiuxing 时间: Wed Apr  1 16:35:55 2020
Hello: Mr.xiuxing 时间: Wed Apr 1 16:35:55 2020
see you xiuxing Wed Apr 1 16:36:00 2020
see you Mr.xiuxing Wed Apr 1 16:36:05 2020

Thread类与线程对象

  Thread类构造方法的target关键字参数不仅可以是一个函数,还可以是一个对象,可以称这个对象为线程对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import threading
from time import sleep, ctime


class MyThread(object):
# func 表示线程函数, args表示线程函数的参数
def __init__(self, func, args):
self.func = func
self.args = args

# 当线程启动时会调用该函数
def __call__(self):
self.func(*self.args)


# 线程函数
def func(name, sec):
print("Hello:", name, "时间:", ctime())
sleep(sec)
print("see you", name, ctime())


def main():
print("程序开始时间:", ctime())
# 创建第一个线程对象,通过target关键字参数传入指定对象
thread1 = threading.Thread(target=MyThread(func, ("xiuxing", 5)))
# 启动第一个线程
thread1.start()
# 创建第二个线程对象,通过target关键字参数传入指定对象
thread2 = threading.Thread(target=MyThread(func, ("Mr.xiuxing", 10)))
# 启动第二个线程
thread2.start()
# 创建第三个线程对象,通过target关键字参数传入指定对象
thread3 = threading.Thread(target=MyThread(func, ("Sir", 20)))
# 启动第三个线程
thread3.start()
# 等待第一个线程函数执行完毕
thread1.join()
# 等待第二个线程函数执行完毕
thread2.join()
# 等待第三个线程函数执行完毕
thread3.join()
print("程序结束时间:", ctime())


if __name__ == "__main__":
main()

输出结果:

1
2
3
4
5
6
7
8
>>>程序开始时间: Wed Apr  1 17:11:06 2020
Hello: xiuxing 时间: Wed Apr 1 17:11:06 2020
Hello: Mr.xiuxing 时间: Wed Apr 1 17:11:06 2020
Hello: Sir 时间: Wed Apr 1 17:11:06 2020
see you xiuxing Wed Apr 1 17:11:11 2020
see you Mr.xiuxing Wed Apr 1 17:11:16 2020
see you Sir Wed Apr 1 17:11:26 2020
程序结束时间: Wed Apr 1 17:11:26 2020

继承Thread类

  继承threading.Thread,重构父类的构造方法和 run 方法,但不将线程函数写进run方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import threading
from time import sleep, ctime


class MyThread(threading.Thread):
# 重写父类构造方法, func为线程函数,name为线程名,args为传入线程函数的参数
def __init__(self, func, args, name):
super().__init__(target=func, name=name, args=args)

# 重写run方法
def run(self):
self._target(*self._args)


# 线程函数
def func(name, sec):
print("Hello:", name, "时间:", ctime())
sleep(sec)
print("see you", name, ctime())


def main():
print("程序开始时间:", ctime())
# 创建第一个线程对象,传入线程函数,线程函数参数(以元组形式),线程名
thread1 = MyThread(func, ("xiuxing", 5), '线程1')
# 创建第二个线程对象,传入线程函数,线程函数参数(以元组形式),线程名
thread2 = MyThread(func, ("Mr.xiuxing", 10), '线程2')
# 启动第一个线程
thread1.start()
print(thread1.name)
# 启动第二个线程
thread2.start()
print(thread2.name)
# 等待第一个线程函数执行完毕
thread1.join()
# 等待第二个线程函数执行完毕
thread2.join()
print("程序结束时间:", ctime())


if __name__ == "__main__":
main()

输出结果:

1
2
3
4
5
6
7
8
>>>程序开始时间: Wed Apr  1 17:45:51 2020
Hello: xiuxing 时间: Wed Apr 1 17:45:51 2020
线程1
Hello: Mr.xiuxing 时间: Wed Apr 1 17:45:51 2020
线程2
see you xiuxing Wed Apr 1 17:45:56 2020
see you Mr.xiuxing Wed Apr 1 17:46:01 2020
程序结束时间: Wed Apr 1 17:46:01 2020

  继承threading.Thread,重构父类的构造方法,并将线程函数写进run方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class MyThread(threading.Thread):
# 重写父类构造方法, f_name、sec为线程函数参数,name为线程名
def __init__(self, f_name, sec, name):
super().__init__(name=name)
self.f_name = f_name
self.sec = sec

# 将线程函数写进run方法中
def run(self):
print("Hello:", self.f_name, "时间:", ctime())
sleep(self.sec)
print("see you", self.f_name, ctime())


def main():
print("程序开始时间:", ctime())
# 创建第一个线程对象,传入重构后run方法参数以及线程名
thread1 = MyThread("xiuxing", 5, '线程1')
# 创建第二个线程对象,传入重构后run方法参数以及线程名
thread2 = MyThread("Mr.xiuxing", 10, '线程2')
# 启动第一个线程
thread1.start()
print(thread1.name)
# 启动第二个线程
thread2.start()
print(thread2.name)
# 等待第一个线程函数执行完毕
thread1.join()
# 等待第二个线程函数执行完毕
thread2.join()
print("程序结束时间:", ctime())


if __name__ == "__main__":
main()

输出结果:

1
2
3
4
5
6
7
8
>>>程序开始时间: Wed Apr  1 17:56:02 2020
Hello: xiuxing 时间: Wed Apr 1 17:56:02 2020
线程1
Hello: Mr.xiuxing 时间: Wed Apr 1 17:56:02 2020
线程2
see you xiuxing Wed Apr 1 17:56:07 2020
see you Mr.xiuxing Wed Apr 1 17:56:12 2020
程序结束时间: Wed Apr 1 17:56:12 2020

  从上面两个例子可以看出,run方法不一定要在MyThread类中重写,因为Thread中已经有默认实现的了。如果想扩展也可以重写。

线程锁

  多线程的目的是为了让多个程序并发执行,线程之间是进行随机调度,但是在某些情况下让多个程序同时运行会有很多麻烦,如果这些并发运行的程序还共享数据,很有可能会产生脏数据(当有多个程序同时读写一个或一组变量时,因为读写顺序的问题造成最终结果与期望值不一致)。
  线程锁的目的是将一段代码锁住,一旦获得了锁权限,除非解锁,否则其他任何代码都无法再次获得锁权限。也就是在同一时刻仅允许一个线程执行操作。
  想要使用线程锁,需要先创建Lock类的实例,然后通过Lock对象的acquire方法获得锁权限,当需要完成原子操作的代码执行完后,再使用Lock对象的release方法解锁。锁对象需要放在线程函数的外面作为一个全局变量,这样所有的线程函数实例就可以共享这个变量了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import random
from atexit import register
from time import sleep, ctime
from threading import Thread, Lock, currentThread


# 创建锁对象
lock = Lock()


def func():
# 获取锁权限
lock.acquire()
for i in range(5):
f = open('text.txt', 'a')
print("Thread Name =", currentThread().name, 'i =', i)
f.write("Thread Name = "+currentThread().name+' i = '+str(i)+'\n')
f.close()
sleep((random.randint(1, 5)))
# 释放线程锁
lock.release()


def main():
# 通过循环创建并启动三个线程
for i in range(3):
Thread(target=func).start()


# 当程序结束时会调用这个函数
@register
def end():
print('线程执行完毕:', ctime())


if __name__ == "__main__":
main()

  先将func函数中的 lock.acquire() 和 lock.release() 语句注释,即在不使用线程锁的情况下运行程序,观察输出结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Thread Name = Thread-1 i = 0
Thread Name = Thread-2 i = 0
Thread Name = Thread-3 i = 0
Thread Name = Thread-3 i = 1
Thread Name = Thread-1 i = 1
Thread Name = Thread-2 i = 1
Thread Name = Thread-3 i = 2
Thread Name = Thread-1 i = 2
Thread Name = Thread-1 i = 3
Thread Name = Thread-2 i = 2
Thread Name = Thread-3 i = 3
Thread Name = Thread-2 i = 3
Thread Name = Thread-1 i = 4
Thread Name = Thread-3 i = 4
Thread Name = Thread-2 i = 4
线程执行完毕: Tue Apr 7 15:13:20 2020

  结果显而易见,如果没有使用线程锁,当调用sleep函数让线程休眠时,当前线程会释放CPU计算资源,而其他线程就会趁机抢占CPU计算资源,因此程序在启动的三个线程中交替执行。
  现在为func函数加上线程锁,再次运行程序,观察输出结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Thread Name = Thread-1 i = 0
Thread Name = Thread-1 i = 1
Thread Name = Thread-1 i = 2
Thread Name = Thread-1 i = 3
Thread Name = Thread-1 i = 4
Thread Name = Thread-2 i = 0
Thread Name = Thread-2 i = 1
Thread Name = Thread-2 i = 2
Thread Name = Thread-2 i = 3
Thread Name = Thread-2 i = 4
Thread Name = Thread-3 i = 0
Thread Name = Thread-3 i = 1
Thread Name = Thread-3 i = 2
Thread Name = Thread-3 i = 3
Thread Name = Thread-3 i = 4
线程执行完毕: Tue Apr 7 15:19:50 2020

  如果为func函数加上线程锁,只用当某个线程的线程函数执行完毕,才会运行另一个线程函数。

信号量

  信号量(Semaphore),用于控制获取资源的线程数量。它使用与线程锁同样的方法名消耗和释放资源。acquire方法用于消耗资源,调用该方法计数器 -1,release方法用于释放资源,调用该方法计数器 +1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from threading import BoundedSemaphore, Lock, Thread


# 创建信号量对象,并设置计数器最大值,计数器不能超过这个值
semaphore = BoundedSemaphore(2)
# 输出当前计数器的值,输出结果:2
print(semaphore._value)
# 获取资源,计数器 -1
semaphore.acquire()
# 输出结果:1
print(semaphore._value)
# 获取资源,计数器 -1
semaphore.acquire()
# 输出结果:0
print(semaphore._value)
# 当计数器为0时,不能再获取资源, 所以acquire方法会返回False
# 输出结果:False
print(semaphore.acquire(False))
# 释放资源,计数器 +1
semaphore.release()
# 输出结果 :1
print(semaphore._value)
# 释放资源,计数器 +1
semaphore.release()
# 输出结果 :2
print(semaphore._value)
# 抛出异常,当计数器达到最大值时,不能再释放资源
semaphore.release()

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
Traceback (most recent call last):
File "E:/Code/Test/信号量.py", line 31, in <module>
semaphore.release()
File "c:\program\anaconda\Lib\threading.py", line 482, in release
raise ValueError("Semaphore released too many times")
ValueError: Semaphore released too many times
2
1
0
False
1
2

  当计数器为0时调用acquire方法会有两种结果。第一种:当acquire方法的参数值为True或不指定参数时,acquire会处于阻塞状态,直到release释放资源后。第二种:acquire方法的参数值为False,当计数器为0时调用acquire方法不会发生阻塞,而是返回False,表示未获得资源,如果成功获取资源则会返回True。
  release方法在释放资源时,如果计数器已经达到最大值,会抛出异常,表示没有资源可供释放。

生产者消费者模型

  生产者生产商品不用等待消费者处理,将其放到缓冲队列,消费者不用找生产者要数据,而是从缓冲队列提取商品处理。即生产者和消费者彼此之间不直接通讯,而是通过缓冲队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from queue import Queue
from random import randrange
from time import sleep, ctime
from threading import Lock, Thread


# 创建线程锁
lock = Lock()


# 从Thread派生的子类
class MyThread(Thread):
def __init__(self, func, args):
super().__init__(target=func, args=args)


# 生产者向队列中添加商品
def producerQ(queue):
# 获取线程锁
lock.acquire()
print(ctime(), '生产了一个对象,并将其添加到队列中', end=' ')
# 向队列中添加商品
queue.put('商品')
print('队列大小', queue.qsize())
# 释放线程锁
lock.release()


# 消费者从队列中获取商品
def consumerQ(queue):
# 获取线程锁
lock.acquire()
# 向队列中添加商品
queue.get(1)
print(ctime(), '消费了一个对象,队列大小', queue.qsize())
# 释放线程锁
lock.release()


# 生成多个生产者
def producer(queue, loops):
for i in range(loops):
producerQ(queue)
sleep(randrange(1, 2))


# 生成多个消费者
def consumer(queue, loops):
for i in range(loops):
consumerQ(queue)
sleep(randrange(1, 2))


funcs = [producer, consumer]


def main():
l_loops = randrange(3, 6)
q = Queue(32)
threads = []
# 创建2个线程运行producer函数和consumer
for func in funcs:
t = MyThread(func, (q, l_loops))
threads.append(t)
# 启动线程
for thread in threads:
thread.start()
# 等待线程结束
for thread in threads:
thread.join()
print('所有工作完成')


if __name__ == "__main__":
main()

运行结果:

1
2
3
4
5
6
7
8
9
Tue Apr  7 18:03:20 2020 生产了一个对象,并将其添加到队列中 队列大小 1
Tue Apr 7 18:03:20 2020 消费了一个对象,队列大小 0
Tue Apr 7 18:03:21 2020 生产了一个对象,并将其添加到队列中 队列大小 1
Tue Apr 7 18:03:21 2020 消费了一个对象,队列大小 0
Tue Apr 7 18:03:22 2020 生产了一个对象,并将其添加到队列中 队列大小 1
Tue Apr 7 18:03:22 2020 消费了一个对象,队列大小 0
Tue Apr 7 18:03:23 2020 生产了一个对象,并将其添加到队列中 队列大小 1
Tue Apr 7 18:03:23 2020 消费了一个对象,队列大小 0
所有工作完成

Python多进程

  尽管多线程可以实现并发,但由于多线程之间是共享了当前进程的内存,也就是说:线程可以申请到的资源有限。想要更进一步利用CPU性能,就需要使用多进程。在Python中可以使用multiprocessing 模块的Pool类创建进程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from multiprocessing import Pool
import os, time, random


def func():
print('Run process %s (%s)...' % (current_process().name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (current_process().name, (end - start)))


if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
pool = Pool(processes=4)
for i in range(5):
pool.apply_async(func) # apply_async实现非阻塞模式,apply实现阻塞模式
print('Waiting for all sub-processes done...')
pool.close() # 先调用close()才能调用join()
pool.join() # 等待所有子进程执行完毕
print('All sub-processes done.')

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
Parent process 7120.
Waiting for all sub-processes done...
Run process SpawnPoolWorker-3 (13136)...
Run process SpawnPoolWorker-1 (13868)...
Run process SpawnPoolWorker-2 (4636)...
Run process SpawnPoolWorker-4 (10076)...
Task SpawnPoolWorker-2 runs 0.09 seconds.
Run process SpawnPoolWorker-2 (4636)...
Task SpawnPoolWorker-3 runs 0.28 seconds.
Task SpawnPoolWorker-4 runs 0.42 seconds.
Task SpawnPoolWorker-2 runs 1.81 seconds.
Task SpawnPoolWorker-1 runs 2.70 seconds.
All sub-processes done.

未完待续。。。有时间再补充