There must be a good reason for our suffering. ——《加勒比海盗》
苦尽则甘来。
进程与线程
进程
计算机程序只是存储在磁盘上的可执行二进制(或其他类型)文件。只有把它们加载到内存中并被操作系统调用,才拥有其生命期。进程(有时称为重量级进程)则是一个执行中的程序。 每个进程都拥有自己的地址空间、 内存、 数据栈以及其他用于跟踪执行的辅助数据。操作系统管理其上所有进程的执行,并为这些进程合理地分配时间。进程也可以通过派生(fork 或 spawn)新的进程来执行其他任务,不过因为每个新进程也都拥有自己的内存和数据栈等,所以只能采用进程间通信(IPC)的方式共享信息。
线程
线程是程序执行时的最小单位(有时候称为轻量级进程),不过它们是在同一个进程下执行的,它是进程的一个执行流,并共享相同的上下文。可以将它们认为是在一个主进程或"主线程"中并行运行的一些"迷你进程" 。
线程包括开始、执行顺序和结束三部分。它有一个指令指针,用于记录当前运行的上下文。当其他线程运行时,它可以被抢占(中断)和临时挂起(也称为睡眠)。
一个进程中的各个线程与主线程之间可以共享同一块数据空间,因此相比于独立的进程而言,线程间的信息共享和通信更加容易。线程一般是以并发方式执行的,正是由于这种并行和数据共享机制,使得多任务间的协作成为可能。当然,在单核 CPU 系统中,因为真正的并发是不可能的,所以线程的执行实际上还是同步执行的,只是系统会根据调度算法在不同的时间安排某一个线程在CPU上执行一小会,然后让其他线程在CPU上再执行一小会,通过这种在多个线程间不断切换的方式让多个线程交替执行。因此,在宏观上看,即使在单核CPU的系统上仍然像多个线程并发运行一样。
GIL即全局解释器锁
Python之父荷兰人吉多·范罗苏姆(Guido van Rossum)在CPU单核时代为了充分地利用单核CPU的使用率,在解释器层面实现了一把全局互斥锁。
官方说明:In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
翻译:Python官方解释器(CPython解释器)的内存管理是不安全的,因此它有一个全局解释器锁(Global Interpreter Lock简称GIL),它使得在任何时刻都只有一个线程在执行Python字节码。这也是使得标准版本的Python并不能实现真正的多线程并发的直接原因。
不过对于现在的多核处理器时代来说就是:一核有难,多核围观。
Python多线程
Thread类与线程函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import threadingfrom time import sleep, ctimedef func (name, sec ): """ :param name: 名字 :param sec: 休眠时间,单位:秒 """ print ("Hello:" , name, "时间:" , ctime()) sleep(sec) print ("see you" , name, ctime()) def main (): thread1 = threading.Thread(target=func, args=("xiuxing" , 5 )) thread1.start() thread2 = threading.Thread(target=func, args=("Mr.xiuxing" , 10 )) thread2.start() thread1.join() thread2.join() if __name__ == '__main__' : main()
输出结果:
1 2 3 4 >>>Hello: xiuxing 时间: Wed Apr 1 16 :35 :55 2020 Hello: Mr.xiuxing 时间: Wed Apr 1 16 :35 :55 2020 see you xiuxing Wed Apr 1 16 :36 :00 2020 see you Mr.xiuxing Wed Apr 1 16 :36 :05 2020
Thread类与线程对象
Thread类构造方法的target关键字参数不仅可以是一个函数,还可以是一个对象,可以称这个对象为线程对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import threadingfrom time import sleep, ctimeclass MyThread (object ): def __init__ (self, func, args ): self.func = func self.args = args def __call__ (self ): self.func(*self.args) def func (name, sec ): print ("Hello:" , name, "时间:" , ctime()) sleep(sec) print ("see you" , name, ctime()) def main (): print ("程序开始时间:" , ctime()) thread1 = threading.Thread(target=MyThread(func, ("xiuxing" , 5 ))) thread1.start() thread2 = threading.Thread(target=MyThread(func, ("Mr.xiuxing" , 10 ))) thread2.start() thread3 = threading.Thread(target=MyThread(func, ("Sir" , 20 ))) thread3.start() thread1.join() thread2.join() thread3.join() print ("程序结束时间:" , ctime()) if __name__ == "__main__" : main()
输出结果:
1 2 3 4 5 6 7 8 >>>程序开始时间: Wed Apr 1 17 :11 :06 2020 Hello: xiuxing 时间: Wed Apr 1 17 :11 :06 2020 Hello: Mr.xiuxing 时间: Wed Apr 1 17 :11 :06 2020 Hello: Sir 时间: Wed Apr 1 17 :11 :06 2020 see you xiuxing Wed Apr 1 17 :11 :11 2020 see you Mr.xiuxing Wed Apr 1 17 :11 :16 2020 see you Sir Wed Apr 1 17 :11 :26 2020 程序结束时间: Wed Apr 1 17 :11 :26 2020
继承Thread类
继承threading.Thread,重构父类的构造方法和 run 方法,但不将线程函数写进run方法中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import threadingfrom time import sleep, ctimeclass MyThread (threading.Thread): def __init__ (self, func, args, name ): super ().__init__(target=func, name=name, args=args) def run (self ): self._target(*self._args) def func (name, sec ): print ("Hello:" , name, "时间:" , ctime()) sleep(sec) print ("see you" , name, ctime()) def main (): print ("程序开始时间:" , ctime()) thread1 = MyThread(func, ("xiuxing" , 5 ), '线程1' ) thread2 = MyThread(func, ("Mr.xiuxing" , 10 ), '线程2' ) thread1.start() print (thread1.name) thread2.start() print (thread2.name) thread1.join() thread2.join() print ("程序结束时间:" , ctime()) if __name__ == "__main__" : main()
输出结果:
1 2 3 4 5 6 7 8 >>>程序开始时间: Wed Apr 1 17 :45 :51 2020 Hello: xiuxing 时间: Wed Apr 1 17 :45 :51 2020 线程1 Hello: Mr.xiuxing 时间: Wed Apr 1 17 :45 :51 2020 线程2 see you xiuxing Wed Apr 1 17 :45 :56 2020 see you Mr.xiuxing Wed Apr 1 17 :46 :01 2020 程序结束时间: Wed Apr 1 17 :46 :01 2020
继承threading.Thread,重构父类的构造方法,并将线程函数写进run方法中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class MyThread (threading.Thread): def __init__ (self, f_name, sec, name ): super ().__init__(name=name) self.f_name = f_name self.sec = sec def run (self ): print ("Hello:" , self.f_name, "时间:" , ctime()) sleep(self.sec) print ("see you" , self.f_name, ctime()) def main (): print ("程序开始时间:" , ctime()) thread1 = MyThread("xiuxing" , 5 , '线程1' ) thread2 = MyThread("Mr.xiuxing" , 10 , '线程2' ) thread1.start() print (thread1.name) thread2.start() print (thread2.name) thread1.join() thread2.join() print ("程序结束时间:" , ctime()) if __name__ == "__main__" : main()
输出结果:
1 2 3 4 5 6 7 8 >>>程序开始时间: Wed Apr 1 17 :56 :02 2020 Hello: xiuxing 时间: Wed Apr 1 17 :56 :02 2020 线程1 Hello: Mr.xiuxing 时间: Wed Apr 1 17 :56 :02 2020 线程2 see you xiuxing Wed Apr 1 17 :56 :07 2020 see you Mr.xiuxing Wed Apr 1 17 :56 :12 2020 程序结束时间: Wed Apr 1 17 :56 :12 2020
从上面两个例子可以看出,run方法不一定要在MyThread类中重写,因为Thread中已经有默认实现的了。如果想扩展也可以重写。
线程锁
多线程的目的是为了让多个程序并发执行,线程之间是进行随机调度,但是在某些情况下让多个程序同时运行会有很多麻烦,如果这些并发运行的程序还共享数据,很有可能会产生脏数据(当有多个程序同时读写一个或一组变量时,因为读写顺序的问题造成最终结果与期望值不一致)。
线程锁的目的是将一段代码锁住,一旦获得了锁权限,除非解锁,否则其他任何代码都无法再次获得锁权限。也就是在同一时刻仅允许一个线程执行操作。
想要使用线程锁,需要先创建Lock类的实例,然后通过Lock对象的acquire方法获得锁权限,当需要完成原子操作的代码执行完后,再使用Lock对象的release方法解锁。锁对象需要放在线程函数的外面作为一个全局变量,这样所有的线程函数实例就可以共享这个变量了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import randomfrom atexit import registerfrom time import sleep, ctimefrom threading import Thread, Lock, currentThreadlock = Lock() def func (): lock.acquire() for i in range (5 ): f = open ('text.txt' , 'a' ) print ("Thread Name =" , currentThread().name, 'i =' , i) f.write("Thread Name = " +currentThread().name+' i = ' +str (i)+'\n' ) f.close() sleep((random.randint(1 , 5 ))) lock.release() def main (): for i in range (3 ): Thread(target=func).start() @register def end (): print ('线程执行完毕:' , ctime()) if __name__ == "__main__" : main()
先将func函数中的 lock.acquire() 和 lock.release() 语句注释,即在不使用线程锁的情况下运行程序,观察输出结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Thread Name = Thread-1 i = 0 Thread Name = Thread-2 i = 0 Thread Name = Thread-3 i = 0 Thread Name = Thread-3 i = 1 Thread Name = Thread-1 i = 1 Thread Name = Thread-2 i = 1 Thread Name = Thread-3 i = 2 Thread Name = Thread-1 i = 2 Thread Name = Thread-1 i = 3 Thread Name = Thread-2 i = 2 Thread Name = Thread-3 i = 3 Thread Name = Thread-2 i = 3 Thread Name = Thread-1 i = 4 Thread Name = Thread-3 i = 4 Thread Name = Thread-2 i = 4 线程执行完毕: Tue Apr 7 15 :13 :20 2020
结果显而易见,如果没有使用线程锁,当调用sleep函数让线程休眠时,当前线程会释放CPU计算资源,而其他线程就会趁机抢占CPU计算资源,因此程序在启动的三个线程中交替执行。
现在为func函数加上线程锁,再次运行程序,观察输出结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Thread Name = Thread-1 i = 0 Thread Name = Thread-1 i = 1 Thread Name = Thread-1 i = 2 Thread Name = Thread-1 i = 3 Thread Name = Thread-1 i = 4 Thread Name = Thread-2 i = 0 Thread Name = Thread-2 i = 1 Thread Name = Thread-2 i = 2 Thread Name = Thread-2 i = 3 Thread Name = Thread-2 i = 4 Thread Name = Thread-3 i = 0 Thread Name = Thread-3 i = 1 Thread Name = Thread-3 i = 2 Thread Name = Thread-3 i = 3 Thread Name = Thread-3 i = 4 线程执行完毕: Tue Apr 7 15 :19 :50 2020
如果为func函数加上线程锁,只用当某个线程的线程函数执行完毕,才会运行另一个线程函数。
信号量
信号量(Semaphore),用于控制获取资源的线程数量。它使用与线程锁同样的方法名消耗和释放资源。acquire方法用于消耗资源,调用该方法计数器 -1,release方法用于释放资源,调用该方法计数器 +1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 from threading import BoundedSemaphore, Lock, Threadsemaphore = BoundedSemaphore(2 ) print (semaphore._value)semaphore.acquire() print (semaphore._value)semaphore.acquire() print (semaphore._value)print (semaphore.acquire(False ))semaphore.release() print (semaphore._value)semaphore.release() print (semaphore._value)semaphore.release()
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 Traceback (most recent call last): File "E:/Code/Test/信号量.py" , line 31 , in <module> semaphore.release() File "c:\program\anaconda\Lib\threading.py" , line 482 , in release raise ValueError("Semaphore released too many times" ) ValueError: Semaphore released too many times 2 1 0 False 1 2
当计数器为0时调用acquire方法会有两种结果。第一种:当acquire方法的参数值为True或不指定参数时,acquire会处于阻塞状态,直到release释放资源后。第二种:acquire方法的参数值为False,当计数器为0时调用acquire方法不会发生阻塞,而是返回False,表示未获得资源,如果成功获取资源则会返回True。
release方法在释放资源时,如果计数器已经达到最大值,会抛出异常,表示没有资源可供释放。
生产者消费者模型
生产者生产商品不用等待消费者处理,将其放到缓冲队列,消费者不用找生产者要数据,而是从缓冲队列提取商品处理。即生产者和消费者彼此之间不直接通讯,而是通过缓冲队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 from queue import Queuefrom random import randrangefrom time import sleep, ctimefrom threading import Lock, Threadlock = Lock() class MyThread (Thread ): def __init__ (self, func, args ): super ().__init__(target=func, args=args) def producerQ (queue ): lock.acquire() print (ctime(), '生产了一个对象,并将其添加到队列中' , end=' ' ) queue.put('商品' ) print ('队列大小' , queue.qsize()) lock.release() def consumerQ (queue ): lock.acquire() queue.get(1 ) print (ctime(), '消费了一个对象,队列大小' , queue.qsize()) lock.release() def producer (queue, loops ): for i in range (loops): producerQ(queue) sleep(randrange(1 , 2 )) def consumer (queue, loops ): for i in range (loops): consumerQ(queue) sleep(randrange(1 , 2 )) funcs = [producer, consumer] def main (): l_loops = randrange(3 , 6 ) q = Queue(32 ) threads = [] for func in funcs: t = MyThread(func, (q, l_loops)) threads.append(t) for thread in threads: thread.start() for thread in threads: thread.join() print ('所有工作完成' ) if __name__ == "__main__" : main()
运行结果:
1 2 3 4 5 6 7 8 9 Tue Apr 7 18 :03:20 2020 生产了一个对象,并将其添加到队列中 队列大小 1 Tue Apr 7 18 :03:20 2020 消费了一个对象,队列大小 0 Tue Apr 7 18 :03:21 2020 生产了一个对象,并将其添加到队列中 队列大小 1 Tue Apr 7 18 :03:21 2020 消费了一个对象,队列大小 0 Tue Apr 7 18 :03:22 2020 生产了一个对象,并将其添加到队列中 队列大小 1 Tue Apr 7 18 :03:22 2020 消费了一个对象,队列大小 0 Tue Apr 7 18 :03:23 2020 生产了一个对象,并将其添加到队列中 队列大小 1 Tue Apr 7 18 :03:23 2020 消费了一个对象,队列大小 0 所有工作完成
Python多进程
尽管多线程可以实现并发,但由于多线程之间是共享了当前进程的内存,也就是说:线程可以申请到的资源有限。想要更进一步利用CPU性能,就需要使用多进程。在Python中可以使用multiprocessing 模块的Pool类创建进程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 from multiprocessing import Poolimport os, time, randomdef func (): print ('Run process %s (%s)...' % (current_process().name, os.getpid())) start = time.time() time.sleep(random.random() * 3 ) end = time.time() print ('Task %s runs %0.2f seconds.' % (current_process().name, (end - start))) if __name__ == '__main__' : print ('Parent process %s.' % os.getpid()) pool = Pool(processes=4 ) for i in range (5 ): pool.apply_async(func) print ('Waiting for all sub-processes done...' ) pool.close() pool.join() print ('All sub-processes done.' )
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 Parent process 7120. Waiting for all sub-processes done... Run process SpawnPoolWorker-3 (13136 )... Run process SpawnPoolWorker-1 (13868 )... Run process SpawnPoolWorker-2 (4636 )... Run process SpawnPoolWorker-4 (10076 )... Task SpawnPoolWorker-2 runs 0.09 seconds. Run process SpawnPoolWorker-2 (4636 )... Task SpawnPoolWorker-3 runs 0.28 seconds. Task SpawnPoolWorker-4 runs 0.42 seconds. Task SpawnPoolWorker-2 runs 1.81 seconds. Task SpawnPoolWorker-1 runs 2.70 seconds. All sub-processes done.
未完待续。。。有时间再补充