目录
一. 多进程(process)
一. 开启多进程
2. 多个子进程
3.以类的方法开启子进程
4. 进程之间数据隔离
5. 守护进程
二. 锁
三. 信号量(同一时间指定几个可以执行)
四. 事件
五. 队列
六. 生产着消费者 队列
1. 基于queue
2. 基于JoinableQueue
七. 管道, Manager
八. 进程池
1. map 第二个参数要是可迭代的
2. 进程池 异步的apply_async
九. 回调函数
二. 多线程 (threading)
一. 开启多线程
二. 线程之间数据共享
三. join join感知子线程结束,等待子线程结束后继续执行(主等子)
四. 守护线程 子线程等待主线程运行完毕,才会结束被销毁,(子等主)
五. 线程锁
1. 互斥锁(感觉就是普通的锁)
2. 死锁
3. 递归锁 RLock (解决死锁)
六. 条件
七. 信号量Semaphore和BoundedSemaphore
BoundedSemaphore
八. 条件 (和信号量像)
九. 队列和栈
1. 队列(先进先出)
2. 栈(先进后出)
3. import queue 和 from multiprocessing import Queue
十. 进程池和线程池 (依赖于新的内置模块, 新模块的进程池和上面的进程池都可实现)
1. submit提交 可以接受返回值
2. map提交,参数是可迭代的,不能接受返回值
3. 回调函数
三. 协程(greenlet, Gevent)
1. greenlet
2. Gevent(更牛逼)
3. 同步异步
4. 事件
5. 生产者消费者模型
6. 协程的本地变量
四. I/O模型
1. 阻塞I/O模型
2. 同步非阻塞I/O模型
3. 多路复用I/O模型
4. 异步非阻塞I/O模型
五 .双向队列
守护进程会在主进程代码执行结束后就终止, 守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
#1.对主进程来说,运行完毕指的是主进程代码运行完毕 #2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕 # 守护进程, 主进程结束, 子进程也跟着结束 def func(): while True: time.sleep(0.2) print('我是没死的子进程') if __name__ == "__main__": p = Process(target=func) p.daemon = True # 设置子进程为守护进程, 一定要在start前执行 p.start() i = 0 while i < 5: print("我是主进程") time.sleep(1) i += 1 # 守护进程 会 随着 主进程的代码执行 完毕 而 结束锁保证了数据安全,但是却把多线程逼成了单线程,效率降低了
事件实例
# 事件实例 红绿灯 import random import time from multiprocessing import Event, Process def cars(e, i): if not e.is_set(): # 如果是False print('car%i在等待' % i) e.wait() # 阻塞直到得到一个事件变成True的信号 print('\033[0;32;40m car %i通过\033[0m' % i) def light(e): while True: if e.is_set(): e.clear() print('\033[31m红灯亮了\033[0m') else: e.set() print('\033[32m绿灯亮了\033[0m') time.sleep(2) if __name__ == "__main__": e = Event() traffic = Process(target=light, args=(e, )) traffic.start() for i in range(20): car = Process(target=cars, args=(e, i)) car.start() time.sleep(random.random())为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
管道
from multiprocessing import Pipe, Process def func(conn1, conn2): conn2.close() # conn2 用来发送,先关闭 while True: try: msg = conn1.recv() print(msg) except EOFError: conn1.close() break if __name__ == "__main__": conn1, conn2 = Pipe() Process(target=func, args=(conn1, conn2)).start() conn1.close() # conn1 用来接收 先关闭 for i in range(20): conn2.send("are you ok") conn2.close()
manager + 锁 实现子进程修改主进程变量, 不加锁数据不安全
from multiprocessing import Manager, Process, Lock def main(dic, lock): lock.acquire() dic['count'] -= 1 lock.release() if __name__ == "__main__": m = Manager() l = Lock() dic = m.dict({'count': 100}) p_list = [] for i in range(50): p = Process(target=main, args=(dic, l)) p.start() p_list.append(p) for i in p_list: i.join() print('主进程', dic)
定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
map: 异步 自带close和join, 返回结果[]
apply: 同步的,只有当func执行之后,才会继续向下执行其他代码, 返回值是func的return
apply_async: 异步的, 但func被注册进入一个进程之后,程序继续向下执行,返回值,为对象, obj.get()。 要先close()再join()来保持子进程和主进程代码的同步性
1 通过函数
from threading import Thread import time # 开启10个线程 def func(n): time.sleep(1) print(n) for i in range(10): t = Thread(target=func, args=(i,)) t.start()2. 通过类
# init 和 run 是固定不变的 class MyTread(Thread): def __init__(self, arg): super().__init__() # 继承父类的init self.arg = arg # 写init 就可以给run 传参数 def run(self): time.sleep(1) print(self.arg) for i in range(10): t = MyTread(10) t.start()即 子线程会随着主线程结束而结束
1.对主进程来说,运行完毕指的是主进程代码运行完毕 2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
3 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束, 4 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
import time from threading import Thread def func(): while True: print('*'*10) time.sleep(1) def func1(): print('func1') time.sleep(5) if __name__ == "__main__": t = Thread(target=func, ) t.daemon = True # 设置守护线程,子线程会随着主线程结束而结束 t.start() t2 = Thread(target=func1) t2.start() print('主线程') # t2是主线程的一部分,睡了5秒, 这5秒守护线程,也一直执行, 但t2的funct1执行完,主线程结束,t线程结束 # 即线程在其他非守护线程运行完毕后才算运行完毕 """ ********** func1 主线程 ********** ********** ********** ********** """开启十个子线程, 每个子线程对全局变量-1
# 加锁 速度是真TM慢 Lock 互斥锁 import time from threading import Lock, Thread def func(lock): global n lock.acquire() temp = n time.sleep(0.2) n = temp - 1 lock.release() # tem 模拟寄存器操作 if __name__ == "__main__": n = 10 t_lst = [] lock = Lock() for i in range(10): t = Thread(target=func, args=(lock,)) t.start() t_lst.append(t) for t in t_lst: t.join() # 感知每个子线程结束,在打印n print(n) # 0一直干耗着,谁都不肯先解锁
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
from threading import RLock mutexA = RLock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release() # 123递归锁的例子
from threading import RLock, Thread fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s拿到面条' % name) fork_lock.acquire() print('%s拿到叉子' % name) print('%s吃面' % name) fork_lock.release() noodle_lock.release() Thread(target=eat1, args=('aki', )).start() Thread(target=eat1, args=('えなこ',)).start() """ aki拿到面条 aki拿到叉子 aki吃面 えなこ拿到面条 えなこ拿到叉子 えなこ吃面 ""同进程的条件
指定同一时间有几个线程执行代码
Semaphore
# 信号量 # 被 acquire和release 保护起来的代码, 同一时间只能由指定数量的线程执行 import time from threading import Semaphore, Thread def func(sem, a, b): sem.acquire() time.sleep(1) print(a+b) sem.release() sem = Semaphore(4) for i in range(10): t = Thread(target=func, args=(sem, i, i+5)) t.start()它有计数功能, 这个计数器的值永远不会超过它的初始值
import random import time from atexit import register from threading import BoundedSemaphore, Lock, Thread lock = Lock() MAX = 5 candytray = BoundedSemaphore(MAX) # 向机器里加糖果 def refill(): lock.acquire() # 加锁 print('Refilling candy ...') try: candytray.release() # 让信号量计数器+1,加糖 如果超过5个, 则会触发异常 except ValueError: print('full, skipping') else: print('ojbk') lock.release() # 释放锁 # 买糖 def buy(): lock.acquire() print('Buying candy') if candytray.acquire(False): # 有False 说明非阻塞,让信号量—1,if 语句看看能不能拿到糖果 print('ok') else: print('empty skipping') lock.release() def producer(loops): for i in range(loops): refill() time.sleep(random.randrange(3)) def consumer(loops): for i in range(loops): buy() time.sleep(random.randrange(3)) def _main(): print('starting at:', time.ctime()) nloops = random.randrange(2, 6) print('THE CANDY MACHINE (full with %d bars)!' % MAX) Thread(target=consumer, args=(random.randrange(nloops, nloops + MAX + 2),)).start() # 里面的数学操作,是的真正消费的糖果数可能会比生产者放入得多,否则不会出现向空机器要糖的结果 Thread(target=producer, args=(nloops,)).start() @register def _atexit(): print('all DONE at:', time.ctime()) # 让解释器在退出脚本前能执行它 if __name__ == "__main__": _main()
上面的代码一直input,要求输入个int。相当于让notify造了指定的几把钥匙,也就是指定了先运行几个线程,但线程总数超过10会报错。
普通队列(Queue)
# 普通队列 import queue q = queue.Queue() q.put(1) q.get() q.get() # 只put1个,再get会阻塞 # q.get_nowait() # get不到就报错, 如果不写它,get不到就堵塞 # q.put_nowait() # put 类似上面优先队列(PriorityQueue)
# 优先队列 # 优先队列 第一个参数是优先级, 越小优先级越高,越先get到 import queue q = queue.PriorityQueue() q.put((20, 'a')) q.put((1, 'e')) q.put((1, 'c')) # 优先级 相同 按asci码,打印在前面的c q.put((45, 'r')) print(q.get())用法相同但是不是一个玩意儿
Queue.Queue是进程内非阻塞队列,multiprocess.Queue是跨进程通信队列。
1.from queue import Queue 这个是普通的队列模式,类似于普通列表,先进先出模式,get方法会阻塞请求,直到有数据get出来为止
2.from multiprocessing.Queue import Queue(各子进程共有) 这个是多进程并发的Queue队列,用于解决多进程间的通信问题。普通Queue实现不了。例如来跑多进程对一批IP列表进行运算,运算后的结果都存到Queue队列里面,这个就必须使用multiprocessing提供的Queue来实现
注意: 这个模块当需要进程池时只要把ThreadPoolExecutor改为ProcessPoolExecutor就好了
1 介绍 concurrent.futures模块提供了高度封装的异步调用接口 ThreadPoolExecutor:线程池,提供异步调用 ProcessPoolExecutor: 进程池,提供异步调用
2 基本方法 #submit(fn, *args, **kwargs) 异步提交任务 #map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作
#shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前 #result(timeout=None) 取得结果 #add_done_callback(fn) 回调函数 # done() 判断某一个线程是否完成 # cancle() 取消某个任务
协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。 协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。
优点如下:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点如下:
1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
总结协程特点:
必须在只有一个单线程里实现并发修改共享数据不需加锁用户程序里自己保存多个控制流的上下文栈自己的理解:
协程就是线程里的一个或多个函数,只能一个接着一个运行,不能同时执行,相对于进程和线程是操作系统控制的而言,它是用户自己控制的。而且能保存状态,切换回来接着上次切换走的状态接着来。
单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度,
当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。
from greenlet import greenlet def eat(): print('eat start') g2.switch() # 切换到g2的paly() print('eat end') g2.switch() def play(): print('play start') g1.switch() # 切换到g1的eat() print('play end') g1 = greenlet(eat) g2 = greenlet(play) g1.switch() # 切换到eat, 执行eat # eat start # play start # eat end # play end注意:
这里的gevent.sleep()相当于time.sleep()。 因为直接用time.sleep()不识别。
所以要from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前
必须要在前面。那个分号就是分行的意思,也可以协程两行。
event
import gevent from gevent.event import Event evt = Event() def setter(): print('Wait for me') gevent.sleep(3) # 3秒后唤醒所有在evt上等待的协程 print("Ok, I'm done") evt.set() # 唤醒 def waiter(): print("I'll wait for you") evt.wait() # 等待 print('Finish waiting') gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), ]) """ Wait for me I'll wait for you I'll wait for you I'll wait for you Ok, I'm done Finish waiting Finish waiting Finish waiting """ AsyncResult 它可以在唤醒时传递消息 from gevent.event import AsyncResult import gevent aevt = AsyncResult() def setter(): print('Wait for me') gevent.sleep(3) # 3秒后唤醒所有在evt上等待的协程 print("Ok, I'm done") aevt.set('Hello!') # 唤醒,并传递消息 def waiter(): print("I'll wait for you") message = aevt.get() # 等待,并在唤醒时获取消息 print('Got wake up message: %s' % message) # 必须叫message gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), ]) """ Wait for me I'll wait for you I'll wait for you I'll wait for you Ok, I'm done Got wake up message: Hello! Got wake up message: Hello! Got wake up message: Hello! """同线程类似,协程也有本地变量,也就是只在当前协程内可被访问的变量:
通过将变量存放在local对象中,即可将其的作用域限制在当前协程内,当其他协程要访问该变量时,就会抛出异常。不同协程间可以有重名的本地变量,而且互相不影响。因为协程本地变量的实现,就是将其存放在以的”greenlet.getcurrent()”的返回为键值的私有的命名空间内。
import gevent from gevent.local import local data = local() def f1(): data.x = 1 print(data.x) def f2(): try: print(data.x) except AttributeError: print('x is not visible') gevent.joinall([ gevent.spawn(f1), gevent.spawn(f2) ]) # 1 # x is not visible
当一个read操作发生时,该操作会经历两个阶段:
1)等待数据准备 (Waiting for the data to be ready) 2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
同步阻塞I/O模型同步非阻塞I/O模型I/O多路模型异步I/O模型信号驱动I/O模型 (没啥用,不说)同步和异步的概念描述的是用户线程与内核的交互方式:同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;而异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。
阻塞和非阻塞的概念描述的是用户线程调用内核IO操作的方式:阻塞是指IO操作需要彻底完成后才返回到用户空间;而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。
例子:排队买票,排三天买到一张票,在火车站一直等三天,什么也没干。
如图所示,用户线程通过系统调用read发起IO读操作,由用户空间转到内核空间。内核等到数据包到达后,然后将接收的数据拷贝到用户空间,完成read操作。在等待数据和数据拷贝用户线程都被阻塞,一直等。
即用户需要等待read将socket中的数据读取到buffer后,才继续处理接收的数据。整个IO请求的过程中,用户线程是被阻塞的,这导致用户在发起IO请求时,不能做任何事情,对CPU的资源利用率不够。
例子:每隔12小时去火车站问有没有票,三天后买到一张票,期间可以干别的。
同步非阻塞IO是在同步阻塞IO的基础上,将socket设置为NONBLOCK。这样做用户线程可以在发起IO请求后可以立即返回。
如图所示,由于socket是非阻塞的方式,因此用户线程发起IO请求时立即返回。但并未读取到任何数据,用户线程需要不断地发起IO请求,直到数据到达后,才真正读取到数据,继续执行。
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
缺点:1. 循环调用recv()将大幅度推高CPU占用率;
2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
例子:1 select/poll 去火车站买票,委托黄牛,然后每隔6小时打电话问黄牛,黄牛三天内买到票,然后去找黄牛要票
2 epoll 委托黄牛, 黄牛拿到票,主动通知你去取票
IO多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题。
如图所示,用户首先将需要进行IO操作的socket添加到select中,然后阻塞等待select系统调用返回。当数据到达时,socket被激活,select函数返回。用户线程正式发起read请求,读取数据并继续执行。
从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。
# io多路复用 依赖windows, python 提供的select模块只是使用操作系统的功能 import select import socket sk = socket.socket() sk.bind(('127.0.0.1', 9999)) sk.setblocking(False) # 设置成非阻塞, 默认阻塞 sk.listen() read_list = [sk] # 添加conn , 监听两个 while True: r_lst, w_lst, x_lst = select.select(read_list, [], []) # 必须要三个列表参数, for i in r_lst: if i is sk: conn, addr = i.accept() read_list.append(conn) else: ret = i.recv(1024) if ret == b'': i.close() read_list.remove(i) continue print(ret) i.send(b'bai')IO多路复用是最常使用的IO模型,但是其异步程度还不够“彻底”,因为它使用了会阻塞线程的select系统调用。因此IO多路复用只能称为异步阻塞IO,而非真正的异步IO。
例子:去火车站给售票员留下电话,有票的话会通知你,并送货上门。
如图所示,异步IO模型中,用户线程直接使用内核提供的异步IO API发起read请求,且发起后立即返回,继续执行用户线程代码。不过此时用户线程已经将调用的AsynchronousOperation和CompletionHandler注册到内核,然后操作系统开启独立的内核线程去处理IO操作。当read请求的数据到达时,由内核负责读取socket中的数据,并写入用户指定的缓冲区中。最后内核将read的数据和用户线程注册的CompletionHandler分发给内部Proactor,Proactor将IO完成的信息通知给用户线程(一般通过调用用户线程注册的完成事件处理函数),完成异步IO。
# # 异步非阻塞 import socket sk = socket.socket() sk.bind(('127.0.0.1', 8765)) sk.setblocking(False) # 把socket当中所有阻塞方法都改变成非阻塞 sk.listen() conn_l = [] del_conn = [] while True: try: conn, addr = sk.accept() # 不会阻塞, 但是没人连会报 BlockingIOError的错 print('建立连接', addr) conn_l.append(conn) except BlockingIOError: for con in conn_l: try: msg = con.recv(1024) if msg == b'': del_conn.append(con) continue print(msg) con.send(b'see you') except BlockingIOError: pass for con in del_conn: con.close() conn_l.remove(con) del_conn.clear()把socket当中所有阻塞方法都改变成非阻塞。当没连接时,整段代码实际都不运行(conn, addr = sk.accept(), 没人连接,直接跳到except BlockingIOError, 又因为conn_l = [],del_conn = []为空,for 里面都不执行)。当有了一个连接,conn_l.append(conn),conn_l里面有值。当建立连接后,会运行外层的except BlockingIOError(因为只能有一次连接),执行for con in conn_l。后面不解释了。