1、进程的定义——进程就是一个程序在一个数据集上的一次动态执行过程。进程一般由程序、数据集、进程控制块三部分组成。
举一例说明进程:想象一位有一手好厨艺的计算机科学家正在为他的女儿烘制生日蛋糕。他有做生日蛋糕的食谱,厨房里有所需的原料:面粉、鸡蛋、糖、香草汁等。在这个比喻中,做蛋糕的食谱就是程序(即用适当形式描述的算法)计算机科学家就是处理器(cpu),而做蛋糕的各种原料就是输入数据。进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和。现在假设计算机科学家的儿子哭着跑了进来,说他的头被一只蜜蜂蛰了。计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理蛰伤。这里,我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治),每个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完之后,这位计算机科学家又回来做蛋糕,从他离开时的那一步继续做下去。
2、线程的定义——一个进程有多个任务,线程是cpu的最小执行单元,例如文本编辑器有键盘输入、屏幕显示、保存硬盘
3、进程和线程的关系
(1)一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。(2)资源分配给进程,同一进程的所有线程共享该进程的所有资源。(3)CPU分给线程,即真正在CPU上运行的是线程。
4、并行和并发
二、threading模块
1、线程对象的创建(thread类直接创建)
importthreadingimporttimedefcountNum(n):#定义某个线程要运行的函数print("runningonnumber:%s"%n)time.sleep(3)if__name__=='__main__':t1=threading.Thread(target=countNum,args=(23,))#生成一个线程实例t2=threading.Thread(target=countNum,args=(34,))t1.start()#启动线程t2.start()print("ending!")2、thread类的实例方法
join()和setDaemon()
#继承Thread式创建importthreadingimporttimeclassMyThread(threading.Thread):def__init__(self,num):threading.Thread.__init__(self)self.num=numdefrun(self):print("runningonnumber:%s"%self.num)time.sleep(3)t1=MyThread(56)t2=MyThread(78)t1.start()t2.start()print("ending")4、GIL全局解释器锁
Python中的线程是操作系统的原生线程,Python虚拟机使用一个全局解释器锁(GlobalInterpreterLock)来互斥线程对Python虚拟机的使用。为了支持多线程机制,一个基本的要求就是需要实现不同线程对共享资源访问的互斥,所以引入了GIL。GIL:在一个线程拥有了解释器的访问权之后,其他的所有线程都必须等待它释放解释器的访问权,即使这些线程的下一条指令并不会互相影响。在调用任何PythonCAPI之前,要先获得GILGIL缺点:多处理器退化为单处理器;优点:避免大量的加锁解锁操作
GIL的影响:
无论你启多少个线程,你有多少个cpu,Python在执行一个进程的时候会淡定的在同一时刻只允许一个线程运行。所以,python是无法利用多核CPU实现多线程的。这样,python对于计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
5、死锁和递归锁
所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
importthreadingimporttimeclassMyThread(threading.Thread):def__init__(self):threading.Thread.__init__(self)defrun(self):self.foo()self.bar()deffoo(self):lockA.acquire()print('iam%sgetlockA-----%s'%(self.name,time.time()))lockB.acquire()print('iam%sgetlockB-----%s'%(self.name,time.time()))lockB.release()lockA.release()defbar(self):lockB.acquire()print('iam%sgetlockB-----%s'%(self.name,time.time()))time.sleep(1)lockA.acquire()print('iam%sgetlockA-----%s'%(self.name,time.time()))lockA.release()lockB.release()lockA=threading.Lock()lockB=threading.Lock()foriinrange(10):t=MyThread()t.start()在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
mutex=threading.RLock()
6、信号量(Semaphore)
Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release()时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
#importthreading#importtime#semaphore=threading.Semaphore(5)#deffoo():#semaphore.acquire()#time.sleep(2)#print('>>>>>ok')#semaphore.release()##foriinrange(100):#t=threading.Thread(target=foo,args=())#t.start()7、队列(queue)
7.1get与put方法
'''创建一个“队列”对象importQueueq=Queue.Queue(maxsize=10)Queue.Queue类即是一个队列的同步实现。队列长度可为无限或者有限。可通过Queue的构造函数的可选参数maxsize来设定队列长度。如果maxsize小于1就表示队列长度无限。将一个值放入队列中q.put(10)调用队列对象的put()方法在队尾插入一个项目。put()有两个参数,第一个item为必需的,为插入项目的值;第二个block为可选参数,默认为1。如果队列当前为空且block为1,put()方法就使调用线程暂停,直到空出一个数据单元。如果block为0,put方法将引发Full异常。将一个值从队列中取出q.get()调用队列对象的get()方法从队头删除并返回一个项目。可选参数为block,默认为True。如果队列为空且block为True,get()就使调用线程暂停,直至有项目可用。如果队列为空且block为False,队列将引发Empty异常。'''7.2join和task_done方法
'''join()阻塞进程,直到所有任务完成,需要配合另一个方法task_done。defjoin(self):withself.all_tasks_done:whileself.unfinished_tasks:self.all_tasks_done.wait()task_done()表示某个任务完成。每一条get语句后需要一条task_done。importqueueq=queue.Queue(5)q.put(10)q.put(20)print(q.get())q.task_done()print(q.get())q.task_done()q.join()print("ending!")'''7.3其他的常用方法
'''PythonQueue模块有三种队列及构造函数:1、PythonQueue模块的FIFO队列先进先出。classqueue.Queue(maxsize)2、LIFO类似于堆,即先进后出。classqueue.LifoQueue(maxsize)3、还有一种是优先级队列级别越低越先出来。classqueue.PriorityQueue(maxsize)importqueue#先进后出q=queue.LifoQueue()q.put(34)q.put(56)q.put(12)#优先级q=queue.PriorityQueue()q.put([5,100])q.put([7,200])q.put([3,"hello"])q.put([4,{"name":"alex"}])while1:data=q.get()print(data)'''8、Event对象
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行
event.isSet():返回event的状态值;event.wait():如果event.isSet()==False将阻塞线程;event.set():设置event的状态值为True,所有阻塞池的线程激活进入就绪状态,等待操作系统调度;event.clear():恢复event的状态值为False。mportthreading,timeevent=threading.Event()deffoo():whilenotevent.is_set():print("wait......")event.wait(2)#event.wait()#ifevent对象内的标志位为False,阻塞,为Ture,继续执行print("Connecttoredisserver")foriinrange(5):t=threading.Thread(target=foo,args=())t.start()print("attempttostartredisserver")time.sleep(100)event.set()#设置标志位为True三、multiprocessing模块(进程的调用)
MultiprocessingisapackagethatsupportsspawningprocessesusinganAPIsimilartothethreadingmodule.Themultiprocessingpackageoffersbothlocalandremoteconcurrency,effectivelyside-steppingtheGlobalInterpreterLockbyusingsubprocessesinsteadofthreads.Duetothis,themultiprocessingmoduleallowstheprogrammertofullyleveragemultipleprocessorsonagivenmachine.ItrunsonbothUnixandWindows.
由于GIL的存在,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。
multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(),run(),join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类(这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。
importmultiprocessingimporttimedeffoo(n):ret=0foriinrange(n):ret+=iprint(ret)defbar(n):ret=1foriinrange(1,n):ret*=iprint(ret)if__name__=='__main__':s=time.time()#foo(100000000)#bar(100000)p1=multiprocessing.Process(target=foo,args=(100000000,))p1.start()p2=multiprocessing.Process(target=bar,args=(100000,))p2.start()p1.join()p2.join()print(time.time()-s)#13.591、process类
构造方法:
Process([group[,target[,name[,args[,kwargs]]]]])
group:线程组,目前还没有实现,库引用中提示必须是None;target:要执行的方法;name:进程名;args/kwargs:要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
daemon:和线程的setDeamon功能一样
name:进程名字。
pid:进程号。
2、进程间的通讯
进程队列queue
#frommultiprocessingimportProcess,Queue#importqueue#deffoo(q,n):#q.put(n*n+1)#print("sonprocess",id(q))#if__name__=='__main__':#q=Queue()#print('mainprocess',id(q))#foriinrange(3):#p=Process(target=foo,args=(q,i))#p.start()##print(q.get())#print(q.get())#print(q.get())管道(pipe)
ThePipe()functionreturnsapairofconnectionobjectsconnectedbyapipewhichbydefaultisduplex(two-way).
#frommultiprocessingimportProcess,Pipe##deffoo(conn):#conn.send([12,{'name':'yuan'},'hello'])#response=conn.recv()#print('response',response)#conn.close()#if__name__=='__main__':#parent_conn,child_conn=Pipe()#p=Process(target=foo,args=(child_conn,))#p.start()#print(parent_conn.recv())#parent_conn.send('hellosun')#p.join()manage
Queue和pipe只是实现了数据交互,并没实现数据共享,即一个进程去更改另一个进程的数据。
AmanagerobjectreturnedbyManager()controlsaserverprocesswhichholdsPythonobjectsandallowsotherprocessestomanipulatethemusingproxies.
#frommultiprocessingimportManager,Process#deffoo(l,i):#l.append(i**2)##if__name__=='__main__':#manager=Manager()#Mlist=manager.list([11,22,33])#l=[]#foriinrange(5):#p=Process(target=foo,args=(Mlist,i))#p.start()#l.append(p)##foriinl:#i.join()#print(Mlist)进程池
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
#frommultiprocessingimportPool#importtime#deffoo(args):#time.sleep(1)#print(args)##if__name__=='__main__':#p=Pool(5)#foriinrange(30):#p.apply_async(func=foo,args=(i,))#p.close()#p.join()#print('ending')进程池内部维护一个进程序列,当使用时,去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有以下几个主要方法:
四、协程
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
1、yield与协程
fromgreenletimportgreenletdeftest1():print(12)gr2.switch()print(34)gr2.switch()deftest2():print(56)gr1.switch()print(78)gr1=greenlet(test1)gr2=greenlet(test2)gr1.switch(3、基于greenlet的框架
greenlet模块实现协程
Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkeypatch完成:
importgeventimporttimedeffoo():print("runninginfoo")gevent.sleep(2)print("switchtofooagain")defbar():print("switchtobar")gevent.sleep(5)print("switchtobaragain")start=time.time()gevent.joinall([gevent.spawn(foo),gevent.spawn(bar)])print(time.time()-start)当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下:
协程的好处:
无需线程上下文切换的开销无需原子操作锁定及同步的开销方便切换控制流,简化编程模型高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。缺点:
无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
五、IO模型
同步(synchronous)IO和异步(asynchronous)IO,阻塞(blocking)IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronousIO和non-blockingIO是一个东西。这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同。所以,为了更好的回答这个问题,先限定一下本文的上下文。本文讨论的背景是Linux环境下的networkIO。
由于signaldrivenIO在实际中并不常用,所以我这只提及剩下的四种IOModel。再说一下IO发生时涉及的对象和步骤。对于一个networkIO(这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process(orthread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:
记住这两点很重要,因为这些IOModel的区别就是在两个阶段上各有不同的情况。
1、非阻塞IO(non-blockingIO)
2、blockingIO(阻塞IO)
importsocket,timesock=socket.socket()sock.bind(("127.0.0.1",8080))sock.listen(5)while1:print('serveriswaiting.....')conn,addr=sock.accept()#默认阻塞方式,等待客户端连接print("conn:",conn)while1:data=conn.recv(1024)print(data.decode("utf8"))ifdata.decode("utf8")=="q":breakrespnse=input(">>>>>")conn.send(respnse.encode("utf8"))#exceptExceptionase:#print("yuan")#time.sleep(3)
2、IO多路复用(IOmultiplexing)
importsocketimportselectsock=socket.socket()sock.bind(("0.0.0.0",8080))sock.listen(5)sock.setblocking(False)inp=[sock,]#监听的套接字对象的列表while1:r=select.select(inp,[],[])#[sock,conn1,conn2,conn3]sock.accept()第一件事print("r",r[0])forobjinr[0]:#[conn1,conn3]ifobj==sock:conn,addr=obj.accept()inp.append(conn)else:data=obj.recv(1024)print(data.decode("utf8"))respnse=input(">>>>>")obj.send(respnse.encode("utf8"))
importselectors,socketsel=selectors.DefaultSelector()sock=socket.socket()sock.bind(("0.0.0.0",8080))sock.listen(5)sock.setblocking(False)defread(conn,mask):data=conn.recv(1024)print(data.decode("utf8"))resp=input(">>>>>")conn.send(resp.encode("utf8"))defaccept(sock,mask):conn,addr=sock.accept()sel.register(conn,selectors.EVENT_READ,read)#绑定套接字对象和函数sel.register(sock,selectors.EVENT_READ,accept)#while1:events=sel.select()#监听套接字对象forkey,maskinevents:callback=key.data#readcallback(key.fileobj,mask)