Core Python Programming, Chapter 4: Multithreaded Programming (Notes)

    xiaoxiao, 2025-08-04

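    Contents

    Introduction
        General concepts
        Python-specific concepts
    Ways to implement multithreading
        Two modules
        Three alternatives
        Three main ways to create threads with the Thread class
        Related modules
    Code (Python 3)
        The simplest threads: waiting with a timed sleep
        Waiting with locks
        Creating threads with the Thread class, method 1: create a Thread instance and pass it a function
        Creating threads with the Thread class, method 2: create a Thread instance and pass it a callable class instance
        Creating threads with the Thread class, method 3: subclass Thread and create an instance of the subclass
        Comparing multithreaded and single-threaded execution: base definitions
        I/O-bound example: Amazon book rankings (bookrank.py)
        An enhanced Amazon book-ranking example using the thread pool in concurrent.futures
        Semaphore example
        Queue + Thread: the producer-consumer problem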

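    Introduction

    General concepts

    Computer program: an executable binary (or other type of) file stored on disk.

    Process (heavyweight process): a program in execution. It has a life cycle, and each process has its own address space, memory, data stack, and other auxiliary data used to track execution. Processes share information through interprocess communication (IPC).

    Thread (lightweight process): runs inside a process and shares the same context with the other threads of that process.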

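    Critical section: multithreaded code usually contains certain functions or blocks that should not be executed by several threads at the same time. Typical cases are modifying a database, updating a file, or anything else that could produce a race condition.

    Synchronization: needed when any number of threads may reach a critical section but only one of them may pass through it at any given moment.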
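As a minimal sketch of a critical section, the snippet below guards the update of a shared counter with `threading.Lock`, so only one thread at a time performs the read-modify-write. The names `Counter` and `run_workers` are hypothetical and chosen only for illustration.

```python
import threading


class Counter:
    """A shared counter whose update is treated as a critical section."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        # Only one thread at a time may run the read-modify-write below.
        with self._lock:
            self.value += 1


def run_workers(counter, nthreads=4, nincrements=10000):
    """Start nthreads threads that each increment the counter nincrements times."""
    def work():
        for _ in range(nincrements):
            counter.increment()

    threads = [threading.Thread(target=work) for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


if __name__ == '__main__':
    print(run_workers(Counter()))  # 4 threads x 10000 increments
```

With the lock in place the final value always equals `nthreads * nincrements`; without it, increments from different threads could interleave and some updates could be lost.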

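    Semaphore: one of the oldest synchronization primitives. It is a counter that is decremented when a resource is consumed and incremented when the resource is released.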
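The counter behaviour can be sketched with `threading.Semaphore`: the semaphore starts at `limit`, each `acquire` decrements it, and each `release` increments it, so at most `limit` threads hold the resource at once. `run_with_limit` and its bookkeeping dictionary are hypothetical names used only for this illustration.

```python
import threading
import time


def run_with_limit(nworkers=6, limit=2):
    """Run nworkers threads, letting at most `limit` hold the resource at once.

    Returns the highest number of threads observed inside the guarded region.
    """
    slots = threading.Semaphore(limit)  # counter starts at `limit`
    state_lock = threading.Lock()       # protects the bookkeeping below
    state = {'active': 0, 'peak': 0}

    def worker():
        with slots:  # acquire decrements the counter; leaving the block releases
            with state_lock:
                state['active'] += 1
                state['peak'] = max(state['peak'], state['active'])
            time.sleep(0.05)  # pretend to use the resource
            with state_lock:
                state['active'] -= 1

    threads = [threading.Thread(target=worker) for _ in range(nworkers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state['peak']


if __name__ == '__main__':
    print('peak concurrent holders:', run_with_limit())
```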

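    Purpose of multithreading: to run mutually independent tasks with no causal relationship between them concurrently, which can markedly improve the performance of the overall job.

    Characteristics of suitable tasks

    - They are asynchronous in nature.
    - They require multiple concurrent activities.
    - The processing order of each activity may be nondeterministic, random, or unpredictable.

    Such a task can be organized or divided into multiple streams of execution, each with a specific job to complete. Depending on the application, these subtasks may compute intermediate results that are then merged into the final output.

    Two typical kinds of task

    - Compute-bound tasks.
    - A single-threaded process with multiple external input sources, which can be divided into three tasks:
        UserRequestThread: reads client input, which may arrive over an I/O channel. The program creates several of these threads, one per client, and each client's requests are placed in a queue.
        RequestProcessor: takes requests off the queue, processes them, and produces output for the third thread.
        ReplyThread: delivers output to the user, either by sending the result back (for a networked application) or by writing data to the local file system or a database.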
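The three-task split can be sketched with two queues connecting the stages. This is an illustration under stated assumptions, not the book's code: the function names mirror the roles above, upper-casing the text stands in for real request processing, replies are collected in a list instead of being sent anywhere, and the `STOP` sentinel is a hypothetical shutdown convention.

```python
import queue
import threading

STOP = object()  # hypothetical sentinel telling a stage to shut down


def user_request_thread(client_id, inputs, requests):
    """Read one client's input and put each request on the shared queue."""
    for text in inputs:
        requests.put((client_id, text))


def request_processor(requests, replies):
    """Take requests off the queue, process them, and feed the reply stage."""
    while True:
        item = requests.get()
        if item is STOP:
            replies.put(STOP)  # pass the shutdown signal downstream
            break
        client_id, text = item
        replies.put((client_id, text.upper()))  # stand-in for real processing


def reply_thread(replies, out):
    """Deliver results; here they are simply collected in a list."""
    while True:
        item = replies.get()
        if item is STOP:
            break
        out.append(item)


def run_pipeline(clients):
    """clients maps a client id to the list of inputs that client sends."""
    requests, replies, out = queue.Queue(), queue.Queue(), []
    readers = [
        threading.Thread(target=user_request_thread, args=(cid, inputs, requests))
        for cid, inputs in clients.items()
    ]
    processor = threading.Thread(target=request_processor, args=(requests, replies))
    replier = threading.Thread(target=reply_thread, args=(replies, out))
    for t in readers + [processor, replier]:
        t.start()
    for t in readers:
        t.join()
    requests.put(STOP)  # all clients are done, so no more requests will arrive
    processor.join()
    replier.join()
    return out


if __name__ == '__main__':
    print(sorted(run_pipeline({'a': ['hi', 'bye'], 'b': ['yo']})))
```

The order in which replies arrive depends on thread scheduling, which is why the demo sorts them before printing.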

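    Python-specific concepts

    Execution model: Python code is controlled by the Python virtual machine (also known as the interpreter main loop). Only one thread of control can execute in the main loop at a time, so at any given moment only one thread is being run by the interpreter. The global interpreter lock (GIL) controls access to the Python virtual machine.

    How the GIL is used

    1. Set the GIL.
    2. Switch in a thread to run.
    3. Execute a specified number of bytecode instructions, or run until the thread voluntarily yields control.
    4. Put the thread to sleep (switch the thread out).
    5. Unlock the GIL.
    6. Repeat the steps above.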
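The cycle above counts bytecode instructions. In CPython 3.2 and later, the interpreter instead asks the running thread to release the GIL after a time interval, which can be read with `sys.getswitchinterval()` and changed with `sys.setswitchinterval()`. A minimal sketch follows; `current_switch_interval` and `call_with_switch_interval` are hypothetical helper names.

```python
import sys


def current_switch_interval():
    """Return the interval, in seconds, after which a thread switch is requested."""
    return sys.getswitchinterval()


def call_with_switch_interval(seconds, func):
    """Call func() with a temporary switch interval, then restore the old one."""
    old = sys.getswitchinterval()
    sys.setswitchinterval(seconds)
    try:
        return func()
    finally:
        sys.setswitchinterval(old)


if __name__ == '__main__':
    print('switch interval: {} s'.format(current_switch_interval()))
```

A thread can also give up control voluntarily, for example by blocking on I/O or calling `time.sleep()`, which corresponds to the second half of step 3.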

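    Daemon threads: the Python program as a whole exits only after all non-daemon threads have exited. If the main thread finishes first, the remaining non-daemon threads keep running until they are done. The program does not wait for daemon threads; they are stopped when the program exits.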
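A small sketch of how the daemon flag is set with the `threading` module. `background_task` and `make_threads` are hypothetical names; the `Event` is only there so the demo can stop both threads cleanly.

```python
import threading
import time


def background_task(stop_event):
    """Loop until asked to stop; stands in for a service-style thread."""
    while not stop_event.is_set():
        time.sleep(0.01)


def make_threads():
    """Create one daemon thread and one non-daemon thread sharing a stop event."""
    stop = threading.Event()
    daemon = threading.Thread(target=background_task, args=(stop,), daemon=True)
    worker = threading.Thread(target=background_task, args=(stop,))  # non-daemon
    return stop, daemon, worker


if __name__ == '__main__':
    stop, daemon, worker = make_threads()
    daemon.start()
    worker.start()
    print('daemon flags:', daemon.daemon, worker.daemon)
    # Without this, the non-daemon worker would keep the program alive forever;
    # the daemon thread alone would not.
    stop.set()
    worker.join()
```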

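    Ways to implement multithreading

    Two modules

    Module    | Primitives provided                            | Daemon threads | Recommended
    thread    | acquire, release, locked: fairly basic         | Not supported  | Generally no
    threading | Lock, Condition, Semaphore, and others: richer | Supported      | Yes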
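To make the comparison concrete, the sketch below runs the same acquire/release sequence with a low-level `_thread` lock (the Python 3 name of the `thread` module) and with `threading.Lock` used as a context manager. The function names are hypothetical.

```python
import _thread
import threading


def low_level_states():
    """Exercise acquire/release/locked on a _thread lock, recording locked() each step."""
    lock = _thread.allocate_lock()
    states = [lock.locked()]      # freshly allocated: unlocked
    lock.acquire()
    states.append(lock.locked())  # held
    lock.release()
    states.append(lock.locked())  # released again
    return states


def high_level_states():
    """The same sequence with threading.Lock as a context manager."""
    lock = threading.Lock()
    states = [lock.locked()]
    with lock:                    # acquire on entry, release on exit
        states.append(lock.locked())
    states.append(lock.locked())
    return states


if __name__ == '__main__':
    print(low_level_states())
    print(high_level_states())
```

Both functions return `[False, True, False]`; the `with` form also releases the lock if the guarded code raises an exception.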

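    Three alternatives

    Name               | Purpose
    subprocess         | Mainly used for interprocess communication over the standard streams (stdin, stdout, stderr)
    multiprocessing    | Spawns processes to make use of multiple cores or CPUs; its interface is similar to threading
    concurrent.futures | A newer high-level library that works at the level of "tasks"; provides thread pools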
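As an illustration of the first alternative, this sketch sends text to a child Python process over stdin and reads the reply from stdout using `subprocess.run`. The helper name `echo_upper` and the one-line child program are hypothetical, and `capture_output` assumes Python 3.7 or later.

```python
import subprocess
import sys


def echo_upper(text):
    """Send text to a child Python process over stdin and return its stdout."""
    child_code = 'import sys; sys.stdout.write(sys.stdin.read().upper())'
    result = subprocess.run(
        [sys.executable, '-c', child_code],
        input=text,           # written to the child's stdin
        capture_output=True,  # collect the child's stdout and stderr
        text=True,            # exchange str instead of bytes
        check=True,           # raise CalledProcessError on a non-zero exit status
    )
    return result.stdout


if __name__ == '__main__':
    print(echo_upper('hello from the parent'))
```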

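    Three main ways to create threads with the Thread class

    Method                                                         | Recommendation
    Create a Thread instance and pass it a function                | Simple and direct; recommended
    Create a Thread instance and pass it a callable class instance | Hard to read; not recommended
    Subclass Thread and create an instance of the subclass         | Use when a more object-oriented interface fits; recommended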

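    Related modules

    Modules that may be useful in multithreaded application programming

    Module             | Description
    thread             | Basic, low-level thread module; renamed _thread in Python 3
    threading          | High-level threading and synchronization objects
    multiprocessing    | Spawns and uses subprocesses through a "threading"-style interface
    subprocess         | Skips threads altogether and runs processes instead
    Queue              | Synchronized first-in, first-out queue for use by multiple threads; named queue in Python 3
    mutex              | Mutual exclusion objects; removed in Python 3.0
    concurrent.futures | High-level library for asynchronous execution
    SocketServer       | Creates and manages threaded TCP and UDP servers; named socketserver in Python 3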

    Code (Python 3)

    The simplest threads: waiting with a timed sleep

    #!/usr/bin/env python
    import _thread as thread
    from time import sleep, ctime


    def loop0():
        print('start loop 0 at: {}'.format(ctime()))
        sleep(4)
        print('loop 0 done at: {}'.format(ctime()))


    def loop1():
        print('start loop 1 at: {}'.format(ctime()))
        sleep(2)
        print('loop 1 done at: {}'.format(ctime()))


    def main():
        print('starting at: {}'.format(ctime()))
        thread.start_new_thread(loop0, ())
        thread.start_new_thread(loop1, ())
        # Pause for 6 seconds so both loops can finish. Without this line,
        # "all DONE" is printed before the loops report, and the loop threads
        # may be cut off when the main thread exits.
        sleep(6)
        print('all DONE at: {}'.format(ctime()))


    if __name__ == '__main__':
        main()

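    Waiting with locks

    #!/usr/bin/env python
    import _thread as thread
    from time import sleep, ctime

    loops = [4, 2]  # seconds each loop sleeps


    def loop(nloop, nsec, lock):
        print('start loop {} at: {}'.format(nloop, ctime()))
        sleep(nsec)
        print('loop {} done at: {}'.format(nloop, ctime()))
        lock.release()  # tell the main thread that this loop is done


    def main():
        print('starting at: {}'.format(ctime()))
        locks = []
        nloops = list(range(len(loops)))

        for i in nloops:
            lock = thread.allocate_lock()
            lock.acquire()  # hold each lock until its thread releases it
            locks.append(lock)

        for i in nloops:
            thread.start_new_thread(loop, (i, loops[i], locks[i]))

        for i in nloops:
            while locks[i].locked():  # wait until thread i releases its lock
                pass

        print('all DONE at: {}'.format(ctime()))


    if __name__ == '__main__':
        main()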

    Creating threads with the Thread class, method 1: create a Thread instance and pass it a function

    #!/usr/bin/env python
    # coding:utf-8
    import threading
    from time import sleep, ctime

    loops = [4, 2]  # seconds each loop sleeps


    def loop(nloop, nsec):
        print('start loop {} at: {}'.format(nloop, ctime()))
        sleep(nsec)
        print('loop {} done at: {}'.format(nloop, ctime()))


    def main():
        print('starting at: {}'.format(ctime()))
        threads = []
        nloops = list(range(len(loops)))

        for i in nloops:
            # Create a Thread object from a function and its arguments
            t = threading.Thread(target=loop, args=(i, loops[i]))
            threads.append(t)

        for i in nloops:
            threads[i].start()  # start the threads

        for i in nloops:
            threads[i].join()  # wait for all threads to finish

        print('all DONE at: {}'.format(ctime()))


    if __name__ == '__main__':
        main()

    Output

    starting at: Sat May 25 11:37:11 2019
    start loop 0 at: Sat May 25 11:37:11 2019
    start loop 1 at: Sat May 25 11:37:11 2019
    loop 1 done at: Sat May 25 11:37:13 2019
    loop 0 done at: Sat May 25 11:37:15 2019
    all DONE at: Sat May 25 11:37:15 2019

    Creating threads with the Thread class, method 2: create a Thread instance and pass it a callable class instance

    #!/usr/bin/env python
    import threading
    from time import sleep, ctime

    loops = [4, 2]  # seconds each loop sleeps


    class ThreadFunc(object):
        """Callable wrapper that stores a function and its arguments."""

        def __init__(self, func, args, name=''):
            self.name = name
            self.func = func
            self.args = args

        def __call__(self):
            self.func(*self.args)


    def loop(nloop, nsec):
        print('start loop {} at: {}'.format(nloop, ctime()))
        sleep(nsec)
        print('loop {} done at: {}'.format(nloop, ctime()))


    def main():
        print('starting at: {}'.format(ctime()))
        threads = []
        nloops = list(range(len(loops)))

        for i in nloops:
            # Pass a callable ThreadFunc instance as the thread target
            t = threading.Thread(
                target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
            threads.append(t)

        for i in nloops:
            threads[i].start()  # start the threads

        for i in nloops:
            threads[i].join()  # wait for all threads to finish

        print('all DONE at: {}'.format(ctime()))


    if __name__ == '__main__':
        main()

    Creating threads with the Thread class, method 3: subclass Thread and create an instance of the subclass

    #!/usr/bin/env python
    # coding:utf-8
    import threading
    from time import sleep, ctime

    loops = [4, 2]  # seconds each loop sleeps


    class MyThread(threading.Thread):  # inherit from Thread
        def __init__(self, func, args, name=''):
            threading.Thread.__init__(self)
            self.name = name
            self.func = func
            self.args = args

        def run(self):
            self.func(*self.args)


    def loop(nloop, nsec):
        print('start loop {} at: {}'.format(nloop, ctime()))
        sleep(nsec)
        print('loop {} done at: {}'.format(nloop, ctime()))


    def main():
        print('starting at: {}'.format(ctime()))
        threads = []
        nloops = list(range(len(loops)))

        for i in nloops:
            # Create an instance of the derived class
            t = MyThread(loop, (i, loops[i]), loop.__name__)
            threads.append(t)

        for i in nloops:
            threads[i].start()  # start the threads

        for i in nloops:
            threads[i].join()  # wait for all threads to finish

        print('all DONE at: {}'.format(ctime()))


    if __name__ == '__main__':
        main()

    Comparing multithreaded and single-threaded execution: base definitions (myThread.py)

    #!/usr/bin/env python
    import threading
    from time import ctime


    class MyThread(threading.Thread):
        def __init__(self, func, args, name=''):
            threading.Thread.__init__(self)
            self.name = name
            self.func = func
            self.args = args

        def getResult(self):
            # Only valid after run() has finished, i.e. after join()
            return self.res

        def run(self):
            print('starting {} at: {}'.format(self.name, ctime()))
            self.res = self.func(*self.args)
            print('{} finished at: {}'.format(self.name, ctime()))

    Comparing multithreaded and single-threaded execution

    #!/usr/bin/env python
    from myThread import MyThread
    from time import ctime, sleep


    def fib(x):
        sleep(0.005)
        if x < 2:
            return 1
        return (fib(x-2) + fib(x-1))


    def fac(x):
        sleep(0.1)
        if x < 2:
            return 1
        return (x * fac(x-1))


    def sum(x):  # note: shadows the built-in sum() inside this script
        sleep(0.1)
        if x < 2:
            return 1
        return (x + sum(x-1))


    funcs = [fib, fac, sum]
    n = 12


    def main():
        nfuncs = list(range(len(funcs)))

        print('*** SINGLE THREAD')
        for i in nfuncs:
            print('starting', funcs[i].__name__, 'at:', ctime())
            print(funcs[i](n))
            print(funcs[i].__name__, 'finished at:', ctime())

        print('\n*** MULTIPLE THREADS')
        threads = []
        for i in nfuncs:
            t = MyThread(funcs[i], (n,), funcs[i].__name__)
            threads.append(t)

        for i in nfuncs:
            threads[i].start()

        for i in nfuncs:
            threads[i].join()
            print(threads[i].getResult())

        print('all DONE')


    if __name__ == '__main__':
        main()

    Output of the comparison: about 5 s single-threaded vs. about 2 s multithreaded

    *** SINGLE THREAD
    starting fib at: Sat May 25 12:08:06 2019
    233
    fib finished at: Sat May 25 12:08:08 2019
    starting fac at: Sat May 25 12:08:08 2019
    479001600
    fac finished at: Sat May 25 12:08:09 2019
    starting sum at: Sat May 25 12:08:09 2019
    78
    sum finished at: Sat May 25 12:08:11 2019

    *** MULTIPLE THREADS
    starting fib at: Sat May 25 12:08:11 2019
    starting fac at: Sat May 25 12:08:11 2019
    starting sum at: Sat May 25 12:08:11 2019
    sum finished at: Sat May 25 12:08:12 2019
    fac finished at: Sat May 25 12:08:12 2019
    fib finished at: Sat May 25 12:08:13 2019
    233
    479001600
    78
    all DONE

    I/O-bound example: Amazon book rankings (bookrank.py)

    #!/usr/bin/env python
    # coding:utf-8
    from atexit import register
    from re import compile
    from threading import Thread
    from time import ctime

    import requests

    REGEX = compile(r'#([\d,]+) in Books')  # pattern that captures the sales rank
    AMZN = 'https://amazon.com/dp/'
    ISBNs = {
        '0132269937': 'Core Python Programming',
        '0132356139': 'Python Web Development with Django',
        '0137143419': 'Python Fundamentals',
    }


    def getRanking(isbn):
        _url = '%s%s' % (AMZN, isbn)
        user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
        headers = {'User-Agent': user_agent}
        page = requests.get(_url, headers=headers)
        if page.status_code != 200:
            return 'unknown'
        matches = REGEX.findall(page.text)
        # Guard against pages that contain no ranking at all
        return matches[0] if matches else 'unknown'


    def _showRanking(isbn):
        print('- %r ranked %s' % (ISBNs[isbn], getRanking(isbn)))


    def _main():
        print('At', ctime(), 'on Amazon...')
        for isbn in ISBNs:
            # _showRanking(isbn)  # single-threaded version
            Thread(target=_showRanking, args=(isbn,)).start()


    # atexit.register registers a function to run just before the program exits.
    # Note: it had no effect when run in Jupyter Notebook (reason unknown).
    @register
    def _atexit():
        print('all DONE at:', ctime())


    if __name__ == '__main__':
        _main()

    Output

    At Sun May 26 22:33:05 2019 on Amazon...
    - 'Python Web Development with Django' ranked 451,395
    - 'Python Fundamentals' ranked 5,301,299
    - 'Core Python Programming' ranked 794,988
    all DONE at: Sun May 26 22:33:10 2019

    An enhanced Amazon book-ranking example using the thread pool in concurrent.futures

    #!/usr/bin/env python
    from concurrent.futures import ThreadPoolExecutor
    from re import compile
    from time import ctime
    from urllib.request import urlopen as uopen, Request

    REGEX = compile(rb'#([\d,]+) in Books ')
    AMZN = 'http://amazon.com/dp/'
    ISBNs = {
        '0132269937': 'Core Python Programming',
        '0132356139': 'Python Web Development with Django',
        '0137143419': 'Python Fundamentals',
    }


    def getRanking(isbn):
        # Present a browser User-Agent; a bare request is refused
        myUrl = '{0}{1}'.format(AMZN, isbn)
        user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
        headers = {'User-Agent': user_agent}
        # Build the request
        req = Request(myUrl, headers=headers)
        with uopen(req) as page:
            return str(REGEX.findall(page.read())[0], 'utf-8')


    def _main():
        print('At', ctime(), 'on Amazon...')
        with ThreadPoolExecutor(3) as executor:
            for isbn, ranking in zip(
                    ISBNs, executor.map(getRanking, ISBNs)):
                print('- %r ranked %s' % (ISBNs[isbn], ranking))
        print('all DONE at:', ctime())


    if __name__ == '__main__':
        _main()

    Output

    At Sun May 26 22:36:40 2019 on Amazon...
    - 'Core Python Programming' ranked 794,988
    - 'Python Web Development with Django' ranked 451,395
    - 'Python Fundamentals' ranked 5,301,299
    all DONE at: Sun May 26 22:36:57 2019
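`executor.map` yields results in the order of its inputs, which is why the rankings above are printed in dictionary order even though the requests run concurrently. A sketch of the alternative, `submit` plus `as_completed`, which hands results back as they finish, is shown below. `slow_square` is a hypothetical stand-in for the network call, so the example runs without Internet access.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def slow_square(x, delay):
    """Hypothetical stand-in for an I/O-bound call such as a web request."""
    sleep(delay)
    return x * x


def in_input_order(jobs):
    """jobs is a list of (x, delay) pairs; results follow the input order."""
    xs = [x for x, _ in jobs]
    delays = [d for _, d in jobs]
    with ThreadPoolExecutor(max_workers=len(jobs)) as executor:
        return list(executor.map(slow_square, xs, delays))


def in_completion_order(jobs):
    """Same work, but results are collected as each task finishes."""
    with ThreadPoolExecutor(max_workers=len(jobs)) as executor:
        futures = [executor.submit(slow_square, x, d) for x, d in jobs]
        return [f.result() for f in as_completed(futures)]


if __name__ == '__main__':
    jobs = [(1, 0.3), (2, 0.1), (3, 0.2)]
    print(in_input_order(jobs))       # always matches the order of jobs
    print(in_completion_order(jobs))  # typically shortest delay first
```

Which form fits depends on whether the output must line up with the inputs, as in the ranking list, or should appear as soon as each result is ready.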

    Semaphore example

    #!/usr/bin/env python
    from atexit import register
    from random import randrange
    from threading import BoundedSemaphore, Lock, Thread
    from time import sleep, ctime

    lock = Lock()
    MAX = 5
    candytray = BoundedSemaphore(MAX)


    def refill():
        with lock:
            print('Refilling candy...', end=' ')
            try:
                candytray.release()
            except ValueError:  # a BoundedSemaphore cannot exceed its initial value
                print('full, skipping')
            else:
                print('OK')


    def buy():
        with lock:
            print('Buying candy...', end=' ')
            if candytray.acquire(False):  # non-blocking acquire
                print('OK')
            else:
                print('empty, skipping')


    def producer(loops):
        for i in range(loops):
            refill()
            sleep(randrange(3))


    def consumer(loops):
        for i in range(loops):
            buy()
            sleep(randrange(3))


    def _main():
        print('starting at:', ctime())
        nloops = randrange(2, 6)
        print('THE CANDY MACHINE (full with %d bars)!' % MAX)
        Thread(target=consumer,
               args=(randrange(nloops, nloops+MAX+2),)).start()  # buyer
        Thread(target=producer, args=(nloops,)).start()  # vendor


    @register
    def _atexit():
        print('all DONE at:', ctime())


    if __name__ == '__main__':
        _main()

    Output

    starting at: Sun May 26 22:39:31 2019
    THE CANDY MACHINE (full with 5 bars)!
    Buying candy... OK
    Buying candy... OK
    Refilling candy... OK
    Refilling candy... OK
    Buying candy... OK
    Refilling candy... OK
    Refilling candy... full, skipping
    Refilling candy... full, skipping
    Buying candy... OK
    Buying candy... OK
    Buying candy... OK
    Buying candy... OK
    Buying candy... OK
    Buying candy... empty, skipping
    Buying candy... empty, skipping
    Buying candy... empty, skipping
    all DONE at: Sun May 26 22:39:45 2019

    Queue + Thread: the producer-consumer problem

    #!/usr/bin/env python
    from random import randint
    from time import sleep
    from queue import Queue
    from myThread import MyThread


    def writeQ(queue):
        print('producing object for Q...', end=' ')
        queue.put('xxx', block=True)  # wait if the queue is full
        print('size now', queue.qsize())


    def readQ(queue):
        val = queue.get(block=True)  # wait until an item is available
        print('consumed object from Q... size now', queue.qsize())


    def writer(queue, loops):
        for i in range(loops):
            writeQ(queue)
            sleep(randint(1, 3))


    def reader(queue, loops):
        for i in range(loops):
            readQ(queue)
            sleep(randint(2, 5))


    funcs = [writer, reader]
    nfuncs = list(range(len(funcs)))


    def main():
        nloops = randint(2, 5)
        q = Queue(32)

        threads = []
        for i in nfuncs:
            t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
            threads.append(t)

        for i in nfuncs:
            threads[i].start()

        for i in nfuncs:
            threads[i].join()


    if __name__ == '__main__':
        main()

    Output

    starting writer at: Sun May 26 22:41:42 2019
    producing object for Q... size now 1
    starting reader at: Sun May 26 22:41:42 2019
    consumed object from Q... size now 0
    producing object for Q... size now 1
    producing object for Q... size now 2
    consumed object from Q... size now 1
    writer finished at: Sun May 26 22:41:45 2019
    consumed object from Q... size now 0
    reader finished at: Sun May 26 22:41:52 2019
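The example above works because the writer and the reader run the same number of loops. When the consumer does not know the count in advance, one common pattern is to send a sentinel value and track completion with `task_done()` and `Queue.join()`. The sketch below assumes that pattern; `DONE`, `producer`, `consumer`, and `run` are hypothetical names.

```python
import threading
from queue import Queue

DONE = object()  # hypothetical sentinel meaning "no more items"


def producer(q, items):
    for item in items:
        q.put(item)  # blocks while the queue is full
    q.put(DONE)


def consumer(q, consumed):
    while True:
        item = q.get()  # blocks until an item is available
        try:
            if item is DONE:
                break
            consumed.append(item)
        finally:
            q.task_done()  # mark this item (or the sentinel) as handled


def run(items, maxsize=2):
    """Pass items through a bounded queue and return them in arrival order."""
    q = Queue(maxsize)
    consumed = []
    threads = [
        threading.Thread(target=producer, args=(q, items)),
        threading.Thread(target=consumer, args=(q, consumed)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    q.join()  # returns immediately: every put() was matched by a task_done()
    return consumed


if __name__ == '__main__':
    print(run(['a', 'b', 'c', 'd']))
```

With a single producer and a single consumer, the FIFO queue preserves the order of the items.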

    Main references:

    《Python核心编程》 (Core Python Programming), 3rd edition
    https://blog.csdn.net/zhou8201/article/details/72848452