顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。
进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一。操作系统的其他所有内容都是围绕进程的概念展开的。
必备的理论基础:
进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。
举例(单核+多道,实现多个进程的并发执行):
egon备一会课,再去跟李杰的女朋友聊聊天,再去打一会王者荣耀....这就保证了每个任务都在进行中.
程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。
举例:
想象一位有一手好厨艺的计算机科学家egon正在为他的女儿元昊烘制生日蛋糕。
他有做生日蛋糕的食谱,
厨房里有所需的原料:面粉、鸡蛋、韭菜,蒜泥等。
在这个比喻中:
做蛋糕的食谱就是程序(即用适当形式描述的算法)
计算机科学家就是处理器(cpu)
而做蛋糕的各种原料就是输入数据。
进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。
科学家egon想了想,处理儿子alex蛰伤的任务比给女儿元昊做蛋糕的任务更重要,于是
计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理蛰伤。这里,我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治),每个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完之后,这位计算机科学家又回来做蛋糕,从他离开时的那一步继续做下去。
需要强调的是:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,一个可以播放饭岛爱。
无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务
一并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)
单cpu,多进程,并发举例一
你是一个cpu,你同时谈了三个女朋友,每一个都可以是一个恋爱任务,你被这三个任务共享要玩出并发恋爱的效果,应该是你先跟女友1去看电影,看了一会说:不好,我要拉肚子,然后跑去跟第二个女友吃饭,吃了一会说:那啥,我去趟洗手间,然后跑去跟女友3开了个房单cpu,多进程,并发举例二
某天下午,egon,yuanhao,wupeiqi,alex约好了一起去嫖娼,但娼只有一个,cpu只有一个,但是却要‘同时’干四个任务(嫖出并发的效果),那就必须是干一会egon,再干一会yuanhao,再干一会wupeiqi,再干一会alexegon:花了200块钱,因为人美活好yuanhao:500块钱wupeiqi:100块钱,可能是不太行alex:没要钱,为啥???因为大家刚刚嫖的是他女朋友二并行:同时运行,只有具备多个cpu才能实现并行
单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)
而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行
启动一个进程来杀毒(360软件)
启动一个进程来看电影(暴风影音)
所有的这些进程都需被管理,于是一个支持多进程的多道程序系统是至关重要的
多道技术概念回顾:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)
同步:
#阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。#举例:#1.同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);#2.阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。非阻塞:
#非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。小结:
#1.同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步情况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成。#2.阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程五进程的创建(了解)但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。
而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4中形式创建新的进程
1.系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)
2.一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)
3.用户的交互式请求,而创建一个新进程(如用户双击暴风影音)
4.一个批处理作业的初始化(只在大型机的批处理系统中应用)
无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的:
1.在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)
2.在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。
关于创建的子进程,UNIX和windows
1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。
2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。
1.正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
2.出错退出(自愿,pythona.py中a.py不存在)
3.严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)
4.被其他进程杀死(非自愿,如kill-9)
无论UNIX还是windows,进程只有一个父进程,不同的是:
2.在windows中,没有进程层次的概念,所有的进程都是地位相同的,唯一类似于进程层次的暗示,是在创建进程时,父进程得到一个特别的令牌(称为句柄),该句柄可以用来控制子进程,但是父进程有权把该句柄传给其他子进程,这样就没有层次了。
tail-faccess.log|grep'404'
执行程序tail,开启一个子进程,执行程序grep,开启另外一个子进程,两个进程之间基于管道'|'通讯,将tail的结果作为grep的输入。
进程grep在等待输入(即I/O)时的状态称为阻塞,此时grep命令都无法运行
其实在两种情况下会导致一个进程在逻辑上不能运行,
1.进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作
因而一个进程由三种状态
进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,为此,操作系统维护一张表格,即进程表(processtable),每个进程占用一个进程表项(这些表项也称为进程控制块)
该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配状况、所有打开文件的状态、帐号和调度信息,以及其他在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过一样。
python并发编程之多进程
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
创建进程的类:
Process([group[,target[,name[,args[,kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)强调:1.需要使用关键字的方式来指定参数2.args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号参数介绍:
1group参数未使用,值始终为None23target表示调用对象,即子进程要执行的任务45args表示调用对象的位置参数元组,args=(1,2,'egon',)67kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}89name为子进程的名称方法介绍:
1p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置23p.name:进程的名称45p.pid:进程的pid67p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)89p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)三Process类的使用注意:在windows中Process()必须放到#if__name__=='__main__':下
详细解释:
SinceWindowshasnofork,themultiprocessingmodulestartsanewPythonprocessandimportsthecallingmodule.IfProcess()getscalleduponimport,thenthissetsoffaninfinitesuccessionofnewprocesses(oruntilyourmachinerunsoutofresources).ThisisthereasonforhidingcallstoProcess()insideif__name__=="__main__"sincestatementsinsidethisif-statementwillnotgetcalleduponimport.由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。这是隐藏对Process()内部调用的原,使用if__name__==“__main__”,这个if语句中的语句将不会在导入时被调用。创建并开启子进程的两种方式
方法一:
#开进程的方法一:importtimeimportrandomfrommultiprocessingimportProcessdefpiao(name):print('%spiaoing'%name)time.sleep(random.randrange(1,5))print('%spiaoend'%name)p1=Process(target=piao,args=('egon',))#必须加,号p2=Process(target=piao,args=('alex',))p3=Process(target=piao,args=('wupeqi',))p4=Process(target=piao,args=('yuanhao',))p1.start()p2.start()p3.start()p4.start()print('主线程')方法二:
#开进程的方法二:importtimeimportrandomfrommultiprocessingimportProcessclassPiao(Process):def__init__(self,name):super().__init__()self.name=namedefrun(self):print('%spiaoing'%self.name)time.sleep(random.randrange(1,5))print('%spiaoend'%self.name)p1=Piao('egon')p2=Piao('alex')p3=Piao('wupeiqi')p4=Piao('yuanhao')p1.start()#start会自动调用runp2.start()p3.start()p4.start()print('主线程')进程直接的内存空间是隔离的
frommultiprocessingimportProcessn=100#在windows系统中应该把全局变量定义在if__name__=='__main__'之上就可以了defwork():globalnn=0print('子进程内:',n)if__name__=='__main__':p=Process(target=work)p.start()print('主进程内:',n)练习1:把上周所学的socket通信变成并发的形式
server端:
fromsocketimport*frommultiprocessingimportProcessserver=socket(AF_INET,SOCK_STREAM)server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)server.bind(('127.0.0.1',8080))server.listen(5)deftalk(conn,client_addr):whileTrue:try:msg=conn.recv(1024)ifnotmsg:breakconn.send(msg.upper())exceptException:breakif__name__=='__main__':#windows下start进程一定要写到这下面whileTrue:conn,client_addr=server.accept()p=Process(target=talk,args=(conn,client_addr))p.start()多client客户端:
fromsocketimport*client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8080))whileTrue:msg=input('>>:').strip()ifnotmsg:continueclient.send(msg.encode('utf-8'))msg=client.recv(1024)print(msg.decode('utf-8'))这么实现有没有问题???:
每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。解决方法:进程池Process对象的join方法
join:主进程等,等待子进程结束
frommultiprocessingimportProcessimporttimeimportrandomclassPiao(Process):def__init__(self,name):self.name=namesuper().__init__()defrun(self):print('%sispiaoing'%self.name)time.sleep(random.randrange(1,3))print('%sispiaoend'%self.name)p=Piao('egon')p.start()p.join(0.0001)#等待p停止,等0.0001秒就不再等了print('开始')有了join,程序不就是串行了吗???
terminate与is_alive
#进程对象的其他方法一:terminate,is_alivefrommultiprocessingimportProcessimporttimeimportrandomclassPiao(Process):def__init__(self,name):self.name=namesuper().__init__()defrun(self):print('%sispiaoing'%self.name)time.sleep(random.randrange(1,5))print('%sispiaoend'%self.name)p1=Piao('egon1')p1.start()p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活print(p1.is_alive())#结果为Trueprint('开始')print(p1.is_alive())#结果为Falsename与pid
frommultiprocessingimportProcessimporttimeimportrandomclassPiao(Process):def__init__(self,name):#self.name=name#super().__init__()#Process的__init__方法会执行self.name=Piao-1,##所以加到这里,会覆盖我们的self.name=name#为我们开启的进程设置名字的做法super().__init__()self.name=namedefrun(self):print('%sispiaoing'%self.name)time.sleep(random.randrange(1,3))print('%sispiaoend'%self.name)p=Piao('egon')p.start()print('开始')print(p.pid)#查看pid僵尸进程与孤儿进程(了解)
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError:daemonicprocessesarenotallowedtohavechildren
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
ViewCode
frommultiprocessingimportProcessimporttimeimportrandomclassPiao(Process):def__init__(self,name):self.name=namesuper().__init__()defrun(self):print('%sispiaoing'%self.name)time.sleep(random.randrange(1,3))print('%sispiaoend'%self.name)p=Piao('egon')p.daemon=True#一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行p.start()print('主')迷惑人的例子
#主进程代码运行完毕,守护进程就会结束frommultiprocessingimportProcessfromthreadingimportThreadimporttimedeffoo():print(123)time.sleep(1)print("end123")defbar():print(456)time.sleep(3)print("end456")p1=Process(target=foo)p2=Process(target=bar)p1.daemon=Truep1.start()p2.start()print("main-------")#打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止五进程同步(锁)进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,
而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理
part1:多个进程共享同一打印终端
并发运行,效率高,但竞争同一打印终端,带来了打印错乱
#并发运行,效率高,但竞争同一打印终端,带来了打印错乱frommultiprocessingimportProcessimportos,timedefwork():print('%sisrunning'%os.getpid())time.sleep(2)print('%sisdone'%os.getpid())if__name__=='__main__':foriinrange(3):p=Process(target=work)p.start()加锁:由并发变成了串行,牺牲了运行效率,但避免了竞争
#由并发变成了串行,牺牲了运行效率,但避免了竞争frommultiprocessingimportProcess,Lockimportos,timedefwork(lock):lock.acquire()print('%sisrunning'%os.getpid())time.sleep(2)print('%sisdone'%os.getpid())lock.release()if__name__=='__main__':lock=Lock()foriinrange(3):p=Process(target=work,args=(lock,))p.start()part2:多个进程共享同一文件
文件当数据库,模拟抢票
并发运行,效率高,但竞争写同一文件,数据写入错乱
#文件db的内容为:{"count":1}#注意一定要用双引号,不然json无法识别frommultiprocessingimportProcess,Lockimporttime,json,randomdefsearch():dic=json.load(open('db.txt'))print('\033[43m剩余票数%s\033[0m'%dic['count'])defget():dic=json.load(open('db.txt'))time.sleep(0.1)#模拟读数据的网络延迟ifdic['count']>0:dic['count']-=1time.sleep(0.2)#模拟写数据的网络延迟json.dump(dic,open('db.txt','w'))print('\033[43m购票成功\033[0m')deftask(lock):search()get()if__name__=='__main__':lock=Lock()foriinrange(100):#模拟并发100个客户端抢票p=Process(target=task,args=(lock,))p.start()加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
#文件db的内容为:{"count":1}#注意一定要用双引号,不然json无法识别frommultiprocessingimportProcess,Lockimporttime,json,randomdefsearch():dic=json.load(open('db.txt'))print('\033[43m剩余票数%s\033[0m'%dic['count'])defget():dic=json.load(open('db.txt'))time.sleep(0.1)#模拟读数据的网络延迟ifdic['count']>0:dic['count']-=1time.sleep(0.2)#模拟写数据的网络延迟json.dump(dic,open('db.txt','w'))print('\033[43m购票成功\033[0m')deftask(lock):search()lock.acquire()get()lock.release()if__name__=='__main__':lock=Lock()foriinrange(100):#模拟并发100个客户端抢票p=Process(target=task,args=(lock,))p.start()总结:
创建队列的类(底层就是以管道和锁定的方式实现):
1Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。参数介绍:
1maxsize是队列中允许最大项数,省略则无大小限制。方法介绍:
1q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞2q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。3q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为应用:
'''multiprocessing模块支持进程间通信的两种主要形式:管道和队列都是基于消息传递实现的,但是队列接口'''frommultiprocessingimportProcess,Queueimporttimeq=Queue(3)#put,get,put_nowait,get_nowait,full,emptyq.put(3)q.put(3)q.put(3)print(q.full())#满了print(q.get())print(q.get())print(q.get())print(q.empty())#空了生产者消费者模型
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
基于队列实现生产者消费者模型
frommultiprocessingimportProcess,Queueimporttime,random,osdefconsumer(q):whileTrue:res=q.get()time.sleep(random.randint(1,3))print('\033[45m%s吃%s\033[0m'%(os.getpid(),res))defproducer(q):foriinrange(10):time.sleep(random.randint(1,3))res='包子%s'%iq.put(res)print('\033[44m%s生产了%s\033[0m'%(os.getpid(),res))if__name__=='__main__':q=Queue()#生产者们:即厨师们p1=Process(target=producer,args=(q,))#消费者们:即吃货们c1=Process(target=consumer,args=(q,))#开始p1.start()c1.start()print('主')#生产者消费者模型总结#程序中有两类角色一类负责生产数据(生产者)一类负责处理数据(消费者)#引入生产者消费者模型为了解决的问题是:平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度#如何实现:生产者<-->队列<——>消费者#生产者消费者模型实现类程序的解耦和此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。
解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环
生产者在生产完毕后发送结束信号None
frommultiprocessingimportProcess,Queueimporttime,random,osdefconsumer(q):whileTrue:res=q.get()ifresisNone:break#收到结束信号则结束time.sleep(random.randint(1,3))print('\033[45m%s吃%s\033[0m'%(os.getpid(),res))defproducer(q):foriinrange(10):time.sleep(random.randint(1,3))res='包子%s'%iq.put(res)print('\033[44m%s生产了%s\033[0m'%(os.getpid(),res))q.put(None)#发送结束信号if__name__=='__main__':q=Queue()#生产者们:即厨师们p1=Process(target=producer,args=(q,))#消费者们:即吃货们c1=Process(target=consumer,args=(q,))#开始p1.start()c1.start()print('主')注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号
主进程在生产者生产完毕后发送结束信号None
frommultiprocessingimportProcess,Queueimporttime,random,osdefconsumer(q):whileTrue:res=q.get()ifresisNone:break#收到结束信号则结束time.sleep(random.randint(1,3))print('\033[45m%s吃%s\033[0m'%(os.getpid(),res))defproducer(q):foriinrange(2):time.sleep(random.randint(1,3))res='包子%s'%iq.put(res)print('\033[44m%s生产了%s\033[0m'%(os.getpid(),res))if__name__=='__main__':q=Queue()#生产者们:即厨师们p1=Process(target=producer,args=(q,))#消费者们:即吃货们c1=Process(target=consumer,args=(q,))#开始p1.start()c1.start()p1.join()q.put(None)#发送结束信号print('主')但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决
有几个消费者就需要发送几次结束信号:相当low
frommultiprocessingimportProcess,Queueimporttime,random,osdefconsumer(q):whileTrue:res=q.get()ifresisNone:break#收到结束信号则结束time.sleep(random.randint(1,3))print('\033[45m%s吃%s\033[0m'%(os.getpid(),res))defproducer(name,q):foriinrange(2):time.sleep(random.randint(1,3))res='%s%s'%(name,i)q.put(res)print('\033[44m%s生产了%s\033[0m'%(os.getpid(),res))if__name__=='__main__':q=Queue()#生产者们:即厨师们p1=Process(target=producer,args=('包子',q))p2=Process(target=producer,args=('骨头',q))p3=Process(target=producer,args=('泔水',q))#消费者们:即吃货们c1=Process(target=consumer,args=(q,))c2=Process(target=consumer,args=(q,))#开始p1.start()p2.start()p3.start()c1.start()p1.join()#必须保证生产者全部生产完毕,才应该发送结束信号p2.join()p3.join()q.put(None)#有几个消费者就应该发送几次结束信号Noneq.put(None)#发送结束信号print('主')其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制
介绍
frommultiprocessingimportProcess,Pipeimporttime,osdefconsumer(p,name):left,right=pleft.close()whileTrue:try:baozi=right.recv()print('%s收到包子:%s'%(name,baozi))exceptEOFError:right.close()breakdefproducer(seq,p):left,right=pright.close()foriinseq:left.send(i)#time.sleep(1)else:left.close()if__name__=='__main__':left,right=Pipe()c1=Process(target=consumer,args=((left,right),'c1'))c1.start()seq=(iforiinrange(10))producer(seq,(left,right))right.close()left.close()c1.join()print('主进程')注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。因此在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。
管道可以用于双向通信,利用通常在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序
展望未来,基于消息传递的并发编程是大势所趋
即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合
通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,
还可以扩展到分布式系统中
进程间通信应该尽量避免使用本节所讲的共享数据的方式
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此AmanagerobjectreturnedbyManager()controlsaserverprocesswhichholdsPythonobjectsandallowsotherprocessestomanipulatethemusingproxies.AmanagerreturnedbyManager()willsupporttypeslist,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,ValueandArray.Forexample,进程之间操作共享的数据
frommultiprocessingimportManager,Process,Lockimportosdefwork(d,lock):#withlock:#不加锁而操作共享的数据,肯定会出现数据错乱d['count']-=1if__name__=='__main__':lock=Lock()withManager()asm:dic=m.dict({'count':100})p_l=[]foriinrange(100):p=Process(target=work,args=(dic,lock))p_l.append(p)p.start()forpinp_l:p.join()print(dic)#{'count':94}九信号量(了解)信号量Semahpore(同线程一样)
互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为3,那么来一个人获得一把锁,计数加1,当计数等于3时,后面的人均需要等待。一旦释放,就有人可以获得一把锁信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念frommultiprocessingimportProcess,Semaphoreimporttime,randomdefgo_wc(sem,user):sem.acquire()print('%s占到一个茅坑'%user)time.sleep(random.randint(0,3))#模拟每个人拉屎速度不一样,0代表有的人蹲下就起来了sem.release()if__name__=='__main__':sem=Semaphore(5)p_l=[]foriinrange(13):p=Process(target=go_wc,args=(sem,'user%s'%i,))p.start()p_l.append(p)foriinp_l:i.join()print('============》')十事件(了解)Event(同线程一样)
例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
1Pool([numprocess[,initializer[,initargs]]]):创建进程池参数介绍:
1numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值2initializer:是每个工作进程启动时要执行的可调用对象,默认为None3initargs:是要传给initializer的参数组方法介绍:
1p.apply(func[,args[,kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()2p.apply_async(func[,args[,kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。34p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成5P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用其他方法(了解部分)
同步调用apply
frommultiprocessingimportPoolimportos,timedefwork(n):print('%srun'%os.getpid())time.sleep(3)returnn**2if__name__=='__main__':p=Pool(3)#进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]foriinrange(10):res=p.apply(work,args=(i,))#同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限res_l.append(res)print(res_l)异步调用apply_async
frommultiprocessingimportPoolimportos,timedefwork(n):print('%srun'%os.getpid())time.sleep(3)returnn**2if__name__=='__main__':p=Pool(3)#进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务res_l=[]foriinrange(10):res=p.apply_async(work,args=(i,))#同步运行,阻塞、直到本次任务执行完毕拿到resres_l.append(res)#异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了p.close()p.join()forresinres_l:print(res.get())#使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get详解:apply_async与apply
回掉函数:
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程
线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程
车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线
流水线的工作需要电源,电源就相当于cpu
所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。
多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。
例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。
创建进程的开销要远大于线程?
如果我们的软件是一个工厂,该工厂有多条流水线,流水线工作需要电源,电源只有一个即cpu(单核cpu)
一个车间就是一个进程,一个车间至少一条流水线(一个进程至少一个线程)
创建一个进程,就是创建一个车间(申请空间,在该空间内建至少一条流水线)
而建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小
进程之间是竞争关系,线程之间是协作关系?
车间直接是竞争/抢电源的关系,竞争(不同的进程直接是竞争关系,是不同的程序员写的程序运行的,迅雷抢占其他进程的网速,360把其他进程当做病毒干死)一个车间的不同流水线式协同工作的关系(同一个进程的线程之间是合作关系,是同一个程序写的程序内开启动,迅雷内的线程是合作关系,不会自己干自己)
多线程指的是,在一个进程中开启多个线程,简单的讲:如果多个任务共用一块地址空间,那么必须在一个进程内开启多个线程。详细的讲分为4点:
1.多线程共享一个进程的地址空间
2.线程比进程更轻量级,线程比进程更容易创建可撤销,在许多操作系统中,创建一个线程比创建一个进程要快10-100倍,在有大量线程需要动态和快速修改时,这一特性很有用
3.若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量的I/O处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。
4.在多cpu系统中,为了最大限度的利用多核,可以开启多个线程,比开进程开销要小的多。(这一条并不适用于python)
开启一个字处理软件进程,该进程肯定需要办不止一件事情,比如监听键盘输入,处理文字,定时自动将文字保存到硬盘,这三个任务操作的都是同一块数据,因而不能用多进程。只能在一个进程里并发地开启三个线程,如果是单线程,那就只能是,键盘输入时,不能处理文字和自动保存,自动保存时又不能输入和处理文字。
多个线程共享同一个进程的地址空间中的资源,是对一台计算机上多个进程的模拟,有时也称线程为轻量级的进程
而对一台计算机上多个进程,则共享物理内存、磁盘、打印机等其他物理资源。
多线程的运行也多进程的运行类似,是cpu在多个线程之间的快速切换。
类似于进程,每个线程也有自己的堆栈
不同于进程,线程库无法利用时钟中断强制线程让出CPU,可以调用thread_yield运行线程自动放弃cpu,让另外一个线程运行。
线程通常是有益的,但是带来了不小程序设计难度,线程的问题是:
1.父进程有多个线程,那么开启的子线程是否需要同样多的线程
如果是,那么附近中某个线程被阻塞,那么copy到子进程后,copy版的线程也要被阻塞吗,想一想nginx的多线程模式接收用户连接。
2.在同一个进程中,如果一个线程关闭了问题,而另外一个线程正准备往该文件内写内容呢?
如果一个线程注意到没有内存了,并开始分配更多的内存,在工作一半时,发生线程切换,新的线程也发现内存不够用了,又开始分配更多的内存,这样内存就被分配了多次,这些问题都是多线程编程的典型问题,需要仔细思考和设计。
为了实现可移植的线程程序,IEEE在IEEE标准1003.1c中定义了线程标准,它定义的线程包叫Pthread。大部分UNIX系统都支持该标准,简单介绍如下
线程的实现可以分为两类:用户级线程(User-LevelThread)和内核线线程(Kernel-LevelThread),后者又称为内核支持的线程或轻量级进程。在多线程操作系统中,各个系统的实现方式并不相同,在有的系统中实现了用户级线程,有的系统中实现了内核级线程。
用户级线程内核的切换由用户态程序自己控制内核切换,不需要内核干涉,少了进出内核态的消耗,但不能很好的利用多核Cpu,目前Linuxpthread大体是这么做的。
在用户空间模拟操作系统对进程的调度,来调用一个进程中的线程,每个进程中都会有一个运行时系统,用来调度线程。此时当该进程获取cpu时,进程内再调度出一个线程去执行,同一时刻只有一个线程执行。
内核级线程:切换由内核控制,当线程进行切换的时候,由用户态转化为内核态。切换完毕要从内核态返回用户态;可以很好的利用smp,即利用多核cpu。windows线程就是这样的。
一:以下是用户级线程和内核级线程的区别:
二:内核线程的优缺点
优点:
缺点:
三:用户进程的优缺点
用户级与内核级的多路复用,内核同一调度内核线程,每个内核线程对应n个用户线程
python并发编程之多线程
multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍
1谁的开启速度快
fromthreadingimportThreadfrommultiprocessingimportProcessimportosdefwork():print('hello')if__name__=='__main__':#在主进程下开启线程t=Thread(target=work)t.start()print('主线程/主进程')'''打印结果:hello主线程/主进程'''#在主进程下开启子进程t=Process(target=work)t.start()print('主线程/主进程')'''打印结果:主线程/主进程hello'''2瞅一瞅pid
fromthreadingimportThreadfrommultiprocessingimportProcessimportosdefwork():print('hello',os.getpid())if__name__=='__main__':#part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样t1=Thread(target=work)t2=Thread(target=work)t1.start()t2.start()print('主线程/主进程pid',os.getpid())#part2:开多个进程,每个进程都有不同的pidp1=Process(target=work)p2=Process(target=work)p1.start()p2.start()print('主线程/主进程pid',os.getpid())3同一进程内的线程共享该进程的数据?
fromthreadingimportThreadfrommultiprocessingimportProcessimportosdefwork():globalnn=0if__name__=='__main__':#n=100#p=Process(target=work)#p.start()#p.join()#print('主',n)#毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100n=1t=Thread(target=work)t.start()t.join()print('主',n)#查看结果为0,因为同一进程内的线程之间共享进程内的数据四练习练习一:
多线程并发的socket服务端
#_*_coding:utf-8_*_#!/usr/bin/envpythonimportmultiprocessingimportthreadingimportsockets=socket.socket(socket.AF_INET,socket.SOCK_STREAM)s.bind(('127.0.0.1',8080))s.listen(5)defaction(conn):whileTrue:data=conn.recv(1024)print(data)conn.send(data.upper())if__name__=='__main__':whileTrue:conn,addr=s.accept()p=threading.Thread(target=action,args=(conn,))p.start()客户端
#_*_coding:utf-8_*_#!/usr/bin/envpythonimportsockets=socket.socket(socket.AF_INET,socket.SOCK_STREAM)s.connect(('127.0.0.1',8080))whileTrue:msg=input('>>:').strip()ifnotmsg:continues.send(msg.encode('utf-8'))data=s.recv(1024)print(data)练习二:三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件
fromthreadingimportThreadimporttimedefsayhi(name):time.sleep(2)print('%ssayhello'%name)if__name__=='__main__':t=Thread(target=sayhi,args=('egon',))t.start()t.join()print('主线程')print(t.is_alive())'''egonsayhello主线程False'''六守护线程无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
需要强调的是:运行完毕并非终止运行
#1.对主进程来说,运行完毕指的是主进程代码运行完毕#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕详细解释:
#1主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,#2主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。fromthreadingimportThreadimporttimedefsayhi(name):time.sleep(2)print('%ssayhello'%name)if__name__=='__main__':t=Thread(target=sayhi,args=('egon',))t.setDaemon(True)#必须在t.start()之前设置t.start()print('主线程')print(t.is_alive())'''主线程True'''迷惑人的例子
三个需要注意的点:#1.线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来#2.join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高#3.一定要看本小节最后的GIL与互斥锁的经典分析GILVSLock
然后,我们可以得出结论:保护不同的数据就应该加不同的锁。
最后,问题就很明朗了,GIL与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock
过程分析:所有线程抢的是GIL锁,或者说所有线程抢的是执行权限
线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock。。。这就导致了串行运行的效果
既然是串行,那我们执行
t1.start()
t1.join
t2.start()
t2.join()
这也是串行执行啊,为何还要加Lock呢,需知join是等待t1所有的代码执行完,相当于锁住了t1的所有代码,而Lock只是锁住一部分操作共享数据的代码。
importthreadingR=threading.Lock()R.acquire()'''对公共数据的操作'''R.release()#####################################################################################################################################################fromthreadingimportThread,Lockimportos,timedefwork():globalnlock.acquire()temp=ntime.sleep(0.1)n=temp-1lock.release()if__name__=='__main__':lock=Lock()n=100l=[]foriinrange(100):p=Thread(target=work)l.append(p)p.start()forpinl:p.join()print(n)#结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全GIL锁与互斥锁综合分析(重点!!!)
分析:#1.100个线程去抢GIL锁,即抢执行权限#2.肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()#3.极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL#4.直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复234的过程互斥锁与join的区别(重点!!!)
所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
fromthreadingimportThread,LockimporttimemutexA=Lock()mutexB=Lock()classMyThread(Thread):defrun(self):self.func1()self.func2()deffunc1(self):mutexA.acquire()print('\033[41m%s拿到A锁\033[0m'%self.name)mutexB.acquire()print('\033[42m%s拿到B锁\033[0m'%self.name)mutexB.release()mutexA.release()deffunc2(self):mutexB.acquire()print('\033[43m%s拿到B锁\033[0m'%self.name)time.sleep(2)mutexA.acquire()print('\033[44m%s拿到A锁\033[0m'%self.name)mutexA.release()mutexB.release()if__name__=='__main__':foriinrange(10):t=MyThread()t.start()'''Thread-1拿到A锁Thread-1拿到B锁Thread-1拿到B锁Thread-2拿到A锁然后就卡住,死锁了'''解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
mutexA=mutexB=threading.RLock()#一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止十信号量Semaphore同进程的一样
Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release()时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
fromthreadingimportThread,Semaphoreimportthreadingimporttime#deffunc():#ifsm.acquire():#print(threading.currentThread().getName()+'getsemaphore')#time.sleep(2)#sm.release()deffunc():sm.acquire()print('%sgetsm'%threading.current_thread().getName())time.sleep(3)sm.release()if__name__=='__main__':sm=Semaphore(5)foriinrange(23):t=Thread(target=func)t.start()与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程
同进程的一样
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行
例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作
fromthreadingimportThread,Eventimportthreadingimporttime,randomdefconn_mysql():count=1whilenotevent.is_set():ifcount>3:raiseTimeoutError('链接超时')print('<%s>第%s次尝试链接'%(threading.current_thread().getName(),count))event.wait(0.5)count+=1print('<%s>链接成功'%threading.current_thread().getName())defcheck_mysql():print('\033[45m[%s]正在检查mysql\033[0m'%threading.current_thread().getName())time.sleep(random.randint(2,4))event.set()if__name__=='__main__':event=Event()conn1=Thread(target=conn_mysql)conn2=Thread(target=conn_mysql)check=Thread(target=check_mysql)conn1.start()conn2.start()check.start()十二条件Condition(了解)使得线程等待,只有满足某条件时,才释放n个线程
importthreadingdefrun(n):con.acquire()con.wait()print("runthethread:%s"%n)con.release()if__name__=='__main__':con=threading.Condition()foriinrange(10):t=threading.Thread(target=run,args=(i,))t.start()whileTrue:inp=input('>>>')ifinp=='q':breakcon.acquire()con.notify(int(inp))con.release()############################################################################
defcondition_func():ret=Falseinp=input('>>>')ifinp=='1':ret=Truereturnretdefrun(n):con.acquire()con.wait_for(condition_func)print("runthethread:%s"%n)con.release()if__name__=='__main__':con=threading.Condition()foriinrange(10):t=threading.Thread(target=run,args=(i,))t.start()十三定时器定时器,指定n秒后执行某操作
fromthreadingimportTimerdefhello():print("hello,world")t=Timer(1,hello)t.start()#after1seconds,"hello,world"willbeprinted验证码定时器
fromthreadingimportTimerimportrandom,timeclassCode:def__init__(self):self.make_cache()defmake_cache(self,interval=5):self.cache=self.make_code()print(self.cache)self.t=Timer(interval,self.make_cache)self.t.start()defmake_code(self,n=4):res=''foriinrange(n):s1=str(random.randint(0,9))s2=chr(random.randint(65,90))res+=random.choice([s1,s2])returnresdefcheck(self):whileTrue:inp=input('>>:').strip()ifinp.upper()==self.cache:print('验证成功',end='\n')self.t.cancel()breakif__name__=='__main__':obj=Code()obj.check()十四线程queuequeue队列:使用importqueue,用法与进程Queue一样
queueisespeciallyusefulinthreadedprogrammingwheninformationmustbeexchangedsafelybetweenmultiplethreads.
importqueueq=queue.Queue()q.put('first')q.put('second')q.put('third')print(q.get())print(q.get())print(q.get())'''结果(先进先出):firstsecondthird'''classqueue.LifoQueue(maxsize=0)#lastinfisrtout
importqueueq=queue.LifoQueue()q.put('first')q.put('second')q.put('third')print(q.get())print(q.get())print(q.get())'''结果(后进先出):thirdsecondfirst'''classqueue.PriorityQueue(maxsize=0)#存储数据时可设置优先级的队列
importqueueq=queue.PriorityQueue()#put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高q.put((20,'a'))q.put((10,'b'))q.put((30,'c'))print(q.get())print(q.get())print(q.get())'''结果(数字越小优先级越高,优先级高的优先出队):(10,'b')(20,'a')(30,'c')'''其他
#1介绍concurrent.futures模块提供了高度封装的异步调用接口ThreadPoolExecutor:线程池,提供异步调用ProcessPoolExecutor:进程池,提供异步调用Bothimplementthesameinterface,whichisdefinedbytheabstractExecutorclass.#2基本方法#submit(fn,*args,**kwargs)异步提交任务#map(func,*iterables,timeout=None,chunksize=1)取代for循环submit的操作#shutdown(wait=True)相当于进程池的pool.close()+pool.join()操作wait=True,等待池内所有任务执行完毕回收完资源后才继续wait=False,立即返回,并不会等待池内的任务执行完毕但不管wait参数为何值,整个程序都会等到所有任务执行完毕submit和map必须在shutdown之前#result(timeout=None)取得结果#add_done_callback(fn)回调函数ProcessPoolExecutor
#介绍TheProcessPoolExecutorclassisanExecutorsubclassthatusesapoolofprocessestoexecutecallsasynchronously.ProcessPoolExecutorusesthemultiprocessingmodule,whichallowsittoside-steptheGlobalInterpreterLockbutalsomeansthatonlypicklableobjectscanbeexecutedandreturned.classconcurrent.futures.ProcessPoolExecutor(max_workers=None,mp_context=None)AnExecutorsubclassthatexecutescallsasynchronouslyusingapoolofatmostmax_workersprocesses.Ifmax_workersisNoneornotgiven,itwilldefaulttothenumberofprocessorsonthemachine.Ifmax_workersislowerorequalto0,thenaValueErrorwillberaised.#用法fromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutorimportos,time,randomdeftask(n):print('%sisruning'%os.getpid())time.sleep(random.randint(1,3))returnn**2if__name__=='__main__':executor=ProcessPoolExecutor(max_workers=3)futures=[]foriinrange(11):future=executor.submit(task,i)futures.append(future)executor.shutdown(True)print('+++>')forfutureinfutures:print(future.result())ThreadPoolExecutor
#介绍ThreadPoolExecutorisanExecutorsubclassthatusesapoolofthreadstoexecutecallsasynchronously.classconcurrent.futures.ThreadPoolExecutor(max_workers=None,thread_name_prefix='')AnExecutorsubclassthatusesapoolofatmostmax_workersthreadstoexecutecallsasynchronously.Changedinversion3.5:Ifmax_workersisNoneornotgiven,itwilldefaulttothenumberofprocessorsonthemachine,multipliedby5,assumingthatThreadPoolExecutorisoftenusedtooverlapI/OinsteadofCPUworkandthenumberofworkersshouldbehigherthanthenumberofworkersforProcessPoolExecutor.Newinversion3.6:Thethread_name_prefixargumentwasaddedtoallowuserstocontrolthethreading.Threadnamesforworkerthreadscreatedbythepoolforeasierdebugging.#用法与ProcessPoolExecutor相同map的用法
fromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutorimportos,time,randomdeftask(n):print('%sisruning'%os.getpid())time.sleep(random.randint(1,3))returnn**2if__name__=='__main__':executor=ThreadPoolExecutor(max_workers=3)#foriinrange(11):#future=executor.submit(task,i)executor.map(task,range(1,12))#map取代了for+submit回调函数
paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。2.下载安装
pip3installparamiko#在python3中在python2中
pycrypto,由于paramiko模块内部依赖pycrypto,所以先下载安装pycrypto#在python2中pip3installpycryptopip3installparamiko注:如果在安装pycrypto2.0.1时发生如下错误command'gcc'failedwithexitstatus1...可能是缺少python-dev安装包导致如果gcc没有安装,请事先安装gcc3.使用
SSHClient
用于连接远程服务器并执行基本命令
基于用户名密码连接:
importparamiko#创建SSH对象ssh=paramiko.SSHClient()#允许连接不在know_hosts文件中的主机ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())#连接服务器ssh.connect(hostname='120.92.84.249',port=22,username='root',password='xxx')#执行命令stdin,stdout,stderr=ssh.exec_command('df')#获取命令结果result=stdout.read()print(result.decode('utf-8'))#关闭连接ssh.close()SSHClient封装Transport
importparamikotransport=paramiko.Transport(('120.92.84.249',22))transport.connect(username='root',password='xxx')ssh=paramiko.SSHClient()ssh._transport=transportstdin,stdout,stderr=ssh.exec_command('df')res=stdout.read()print(res.decode('utf-8'))transport.close()基于公钥密钥连接:
客户端文件名:id_rsa
服务端必须有文件名:authorized_keys(在用ssh-keygen时,必须制作一个authorized_keys,可以用ssh-copy-id来制作)
importparamikoprivate_key=paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')#创建SSH对象ssh=paramiko.SSHClient()#允许连接不在know_hosts文件中的主机ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())#连接服务器ssh.connect(hostname='120.92.84.249',port=22,username='root',pkey=private_key)#执行命令stdin,stdout,stderr=ssh.exec_command('df')#获取命令结果result=stdout.read()print(result.decode('utf-8'))#关闭连接ssh.close()SSHClient封装Transport
importparamikoprivate_key=paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')transport=paramiko.Transport(('120.92.84.249',22))transport.connect(username='root',pkey=private_key)ssh=paramiko.SSHClient()ssh._transport=transportstdin,stdout,stderr=ssh.exec_command('df')result=stdout.read()print(result.decode('utf-8'))transport.close()基于私钥字符串进行连接
用于连接远程服务器并执行上传下载
基于用户名密码上传下载
importparamikotransport=paramiko.Transport(('120.92.84.249',22))transport.connect(username='root',password='xxx')sftp=paramiko.SFTPClient.from_transport(transport)#将location.py上传至服务器/tmp/test.pysftp.put('/tmp/id_rsa','/etc/test.rsa')#将remove_path下载到本地local_pathsftp.get('remove_path','local_path')transport.close()基于公钥密钥上传下载
importparamikoprivate_key=paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')transport=paramiko.Transport(('120.92.84.249',22))transport.connect(username='root',pkey=private_key)sftp=paramiko.SFTPClient.from_transport(transport)#将location.py上传至服务器/tmp/test.pysftp.put('/tmp/id_rsa','/tmp/a.txt')#将remove_path下载到本地local_pathsftp.get('remove_path','local_path')transport.close()Demo
#!/usr/bin/envpython#-*-coding:utf-8-*-importparamikoimportuuidclassHaproxy(object):def__init__(self):self.host='172.16.103.191'self.port=22self.username='root'self.pwd='123'self.__k=Nonedefcreate_file(self):file_name=str(uuid.uuid4())withopen(file_name,'w')asf:f.write('sb')returnfile_namedefrun(self):self.connect()self.upload()self.rename()self.close()defconnect(self):transport=paramiko.Transport((self.host,self.port))transport.connect(username=self.username,password=self.pwd)self.__transport=transportdefclose(self):self.__transport.close()defupload(self):#连接,上传file_name=self.create_file()sftp=paramiko.SFTPClient.from_transport(self.__transport)#将location.py上传至服务器/tmp/test.pysftp.put(file_name,'/home/root/tttttttttttt.py')defrename(self):ssh=paramiko.SSHClient()ssh._transport=self.__transport#执行命令stdin,stdout,stderr=ssh.exec_command('mv/home/root/tttttttttttt.py/home/root/ooooooooo.py')#获取命令结果result=stdout.read()ha=Haproxy()ha.run()