消息队列 Queue 通信原理:在内存中建立队列模型,进程通过队列存取消息实现通信, 实现方法:
from multiprocessing import Queue q = Queue(maxsize = 0) 功能:创建消息队列 参数:最多存放多少消息 q.put(data,[block,timeout]) 功能:向队列存入消息 参数:data 要存入的数据 block False为非阻塞 timeout 超时时间 q.get([block,timeout]) 功能:从队列中取出消息 参数:block False为非阻塞 timeout 超时时间 q.full() 判断队列是否为满 q.empty() 判断队列是否为空 q.qsize() 获取队列中消息个数 q.close() 关闭队列代码部分
from multiprocessing import Queue,Process from time import sleep # 创建消息队列 q = Queue(3) def fun1(): for i in range(3): sleep(1) q.put((1,3)) def fun2(): for i in range(4): try: a,b = q.get(timeout = 3) except: return print("sum = ",a+b) p1 = Process(target=fun1) p2 = Process(target=fun2) p1.start() p2.start() p1.join() p2.join()共享内存 Value 通信原理:在内存中开辟一块空间,进程可以写入和读取内容,但是每次写入内容都会覆盖之前内容 实现方法::
from multiprocessing import Value,Array obj = Value(ctype,data) 功能:创建共享内存 参数:ctype 共享内存类型 data 共享内存初始数据 obj.value 对该属性修改查看即共享内存读写 obj = Aeeay(ctype,data) 功能:创建共享内存 参数:ctype 共享内存类型 i整数 c单个字符 data 列表表示共享内存初始数据,整数表示共享内存开辟数据元素个数 obj进行遍历或者索引方式获取值,也可通过索引直接赋值。 obj.value 用于整体打印共享内存中字节串 from multiprocessing import Process,Value import time,random # 创建共享内存 money = Value("i",5000) # 操作共享内存 def man(): for i in range(30): time.sleep(0.2) money.value += random.randint(1,1000) def girl(): for i in range(30): time.sleep(0.18) money.value -= random.randint(100,800) m = Process(target=man) g= Process(target=girl) m.start() g.start() m.join() g.join() print("一月余额:",money.value)信号量(信号灯集) 通信原理:给定一个数量对多个进程可见,多个进程都可以操作数量的增减,并根据数量决定行为, 实现方法
from multiprocessing import Semaphoer sem = semaphoer(num) 功能:创建信号量对象 参数:信号量初始值 返回值:信号量对象 sem.acquire() 消耗一个信号量,当信号量为0时阻塞。 sem.get_value() 获取信号值 sem.release() 增加一个信号量注意:当在父进程中创建对象(文件对象,套接字对象,进程间通信对象), 子进程从父进程中拷贝对象时父子进程对该进程的使用会有属性的相互影响,如果在父子进程中各自创建,则无影响。
from multiprocessing import Semaphore,Process from time import sleep import os # 创建信号量 sem = Semaphore(3) # 系统中最多有3个进程同时执行该事件 def fun(): sem.acquire()#消耗一个信号量 print("%d执行事件"%os.getpid()) sleep(3) print("%d执行完毕"%os.getpid()) sem.release()#增加信号量 jobs = [] for i in range(5): p = Process(target=fun) jobs.append(p) p.start() for i in jobs: i.join() print(sem.get_value())线程Event from threading import Event e = Event() #创建线程Event对象 e.wait([timeout]) #阻塞等待直到e被set e.set() #设置e,使wait结束阻塞 e.clear() #清楚e 的设置,wait会阻塞 e.is_set() #判断e 的状态
from threading import Event,Thread from time import sleep s = None e = Event() #创建event对象 def yang(): # sleep(0.2) print("杨子荣拜山头") global s s = "天王盖地虎" e.set() f = Thread(target = yang) f.start() # 验证口令 print("说对口令就是自己人") # sleep(2) e.wait() #添加阻塞 if s == "天王盖地虎": print("确认") else: print("打死他") f.join()