内容梗概: 1.管道 2.数据共享 3.进程池 4.回调函数 1.管道 进程间通信(IPC)方式二:管道(不推荐使用,了解即可),会导致数据不安全的情况出现 实例: import time from multiprocessing import Process,Pipe def func(conn1,conn2): conn1.close() conn2.close() # msg = conn2.recv() msg = conn2.recv() print(">>",msg) if __name__ == '__main__': conn1,conn2 = Pipe() p1 = Process(target=func,args=(conn1,conn2,)) p1.start() conn1.close() conn2.close() conn1.send("嘿嘿") conn1.send("嘿嘿") conn1.close() p1.join() EOFError:接收端两端都关闭的的时候 OSError:发送端关闭后,仍用该发送端发送 2.数据共享 进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此 import time from multiprocessing import Process,Manager,Lock def func(dic,lock): with lock: #因为数据共享,必须加锁,不认有可能会争抢数据,导致出错 dic["num"] -= 1 if __name__ == '__main__': m = Manager() lock = Lock() dic = m.dict({"num":100}) lis = [] for i in range(100): p = Process(target=func,args=(dic,lock)) p.start() lis.append(p) for el in lis: el.join() print(dic["num"]) 3.进程池 进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。 如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行 进程池实例:(创建多个进程和进程池运算的区别) import time from multiprocessing import Process,Manager,Lock,Pool def func(i): num = 0 for j in range(5): num += i if __name__ == '__main__': pool = Pool(4) #创建多个进程运算 lis = [] start_time = time.time() for i in range(250): p = Process(target=func,args=(i,)) p.start() lis.append(p) for el in lis: el.join() end_time = time.time() dif_time = abs(start_time - end_time) print(dif_time) #进程池运算 s_time = time.time() pool.map(func,range(250)) #此处map所使用的方法,与内置函数用法一致 e_time = time.time() d_time = abs(s_time - e_time) print(d_time) 结果:13.073268413543701 0.0017964839935302734 运算速度差距较大,因为创建进程是很浪费资源的 注意:1.,map是异步执行的,并且自带close和join 2.一般约定俗成的是进程池中的进程数量为CPU的数量,工作中要看具体情况来考量。 进程池中常用方法 1.同步方法 import time from multiprocessing import Process,Pool def func(i): num = 0 for j in range(5): num += i time.sleep(0.5) return num if __name__ == '__main__': pool = Pool(4) lis = [] for i in range(10): res = pool.apply(func,args=(i,)) # print(res) 2.异步方法 import time from multiprocessing import Process,Pool def func(i): num = 0 for j in range(5): num += i time.sleep(0.5) print('>>>>>', num) # return num if __name__ == '__main__': lis = [] pool = Pool(4) for i in range(10): res = pool.apply_async(func,args=(i,)) ## 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了 # lis.append(res) pool.close() # 不是关闭进程池,只是锁定 pool.join() for el in lis: print(el.get()) 需要注意的是,进程池中的三个进程不会同时开启或者同时结束 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 4.回调函数 回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数,这是进程池特有的,普通进程没有这个机制,但是我们也可以通过进程通信来拿到返回值,进程池的这个回调也是进程通信的机制完成的。 我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果 import time,os from multiprocessing import Pool,Process def func(n): print("子进程pid",os.getpid()) return n*2,"约吗?" def call_back_func(x): print("call_back pid",os.getpid()) print(x) #回调函数在写的时候注意一点,回调函数的形参执行有一个,如果你的执行函数有多个返回值,那么也可以被回调函数的这一个形参接收,接收的是一个元祖,包含着你执行函数的所有返回值。 if __name__ == '__main__': pool = Pool(4) pool.apply_async(func,args=(2,),callback=call_back_func) #回调函数只能接受值,不能赋值 print('主进程pid', os.getpid()) pool.close() pool.join() from threading import Thread def func(n): print(">>>") print(n) if __name__ == '__main__': t = Thread(target=func,args=(5,)) t.start() t.join() print("呵呵") import time from threading import Thread class mythread(Thread): def __init__(self,num): super().__init__() self.num = num def run(self): print(">>>heiehi") if __name__ == '__main__': t = mythread(58) t.start() time.sleep(0.5) print("<<>>") import threading,time from threading import Thread,current_thread def func(n): time.sleep(2) print('我是子线程,名字是', current_thread().getName()) print('我是子线程,id是', current_thread().ident) if __name__ == '__main__': t = Thread(target=func,args=(5,)) t.start() print('我是主线程,id是', current_thread().ident) print(threading.enumerate()) print(threading.active_count())