本文主要讲述进程线程协程在python中的使用。主要说明各自的创建、通信及联系与区别,了解各自的适用场景,能更好的利用并发实现多任务开发。
第一部分:
线程
2.1、线程的创建
2.1.1 使用target目标函数创建
def job(name):
print('子线程{}--{}'.format(name,threading.current_thread().getName()))
time.sleep(1)
print("do something1 %s" % (name))
def job1(name):
print('子线程{}--{}'.format(name, threading.current_thread().getName()))
time.sleep(2)
print("do something2 %s" % (name))
if __name__ == "__main__":
t1 = threading.Thread(target=job, args=('job',))
t2 = threading.Thread(target=job1, args=('job1',))
# t2.setDaemon(True) #守护线程
t1.start()
t2.start()
# t1.join()
# t2.join()
print('主线程{}'.format(os.getpid()))
2.1.2 使用threading类创建
class myThread(threading.Thread):
def __init__(self, ti):
threading.Thread.__init__(self)
self.ti = ti
def run(self):
for i in range(self.ti):
msg = 'I am' + self.name + '@' + str(i)
time.sleep(1)
print(msg)
if __name__ == "__main__":
t = myThread(3)
t.start()
t.join()
print('主线程')
2.2、线程通信
线程通信的常见方式是共享变量和queue消息队列。共享变量通信,是线程间通信最简单的方式,但也是最容易出问题的方式。消息队列通信也就是使用queue这个类来表示变量,从而达到线程安全,由于queue这个类内部封装了deque,也就是python中的双端队列。双端对列本身就是安全界别很高的一种类型,实现线程间的安全操作。
class SafeQueue(threading.Thread):
# 退出队列的信号
SIG_QUIT = 'sig_quit'
def __init__(self,recv_calback):
threading.Thread.__init__(self)
## 构造线程安全队列
self.Q = queue.Queue()
self.recv_calback =recv_calback
self.start()
#放入队列
def put(self,datas):
threadName =threading.currentThread().name
# print(threadName)
self.Q.put(datas)
#关闭队列
def close(self):
self.put(SafeQueue.SIG_QUIT)
##主循环,处理队列接收
def run(self):
while True:
try:
print('等待中消息队列任务……')
datas = self.Q.get()
if datas == SafeQueue.SIG_QUIT: #收到退出队列信号
break
#回调客户端
self.recv_calback(datas)
except: # 抛出打断异常
break
##队列回调函数
def queue_callback(datas):
print("接收到数据:",datas)
## 将子任务结果加入 全局集合
try:
array_mutex.acquire()#锁定
datas_array.append(datas)
if len(datas_array) == 4:
safeQueue.close()
print("=======大任务计算结束=========== result:", datas_array)
finally:
array_mutex.release()#释放
## 子任务计算函数
def calclulate(num):
threadName =threading.currentThread().name
print(threadName , ' 正在计算')
time.sleep(2)
print(threadName, ' 计算完成,加入队列')
#将结果放入队列
safeQueue.put(threadName+"' result" + str(num))
#### ---------- main start ----------
#创建锁
array_mutex = threading.Lock()
## 存储 子任务计算结果的 集合
datas_array = []
##构造安全队列
safeQueue = SafeQueue(queue_callback) #开辟子线程队列,在get处阻塞
# safeQueue.join()
for i in range(1,5):
threading.Thread(target=calclulate, args=(i,)).start()
2.3、线程同步安全
多个线程操作一个资源的情况下,导致资源数据前后不一致。这样就需要协调线程的调度,即线程同步。解决多个线程使用共同资源的方法是:线程操作资源时独占资源,其他线程不能访问资源。
线程同步常见方式:1、threading.Event;2、Semaphores:信号量,实际上也是一种锁,该锁用于限制线程的并发量;3、Condition(条件变量);4、互斥锁(Lock和RLock)。
Event内部包含了一个标志位,初始的时候为false。可以使用使用set()来将其设置为true;或者使用clear()将其从新设置为false;可以使用is_set()来检查标志位的状态;另一个最重要的函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超时。如果内部标志位为true则wait()函数理解返回。e.is_set()初始时false,e.clear()时false。e.set设置为True。
def wait_for_event(e):
print('event通信开始等待……')
e.wait()
print('event通信时标志位是{}'.format(e.is_set()))
def wait_for_event_timeout(e,t):
print('event超时通信开始等待……')
e.wait(t)
print('event超时通信时标志位是{}'.format(e.is_set()))
if __name__ == '__main__':
e = threading.Event()
w1 = threading.Thread(name='block', target=wait_for_event,args=(e,))
w1.start()
w2 = threading.Thread(name='non-block', target=wait_for_event_timeout,
args=(e, 2))
w2.start()
time.sleep(3) #等待3秒后超时通信在2秒时就会往下继续执行,此时标志位还未开始设置仍然时false,3s后设置标志位w1是True
e.set()
print('标志位已经被设置为True')
Semaphores:信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。当信号量被获取的时候,计数器减小;当信号量被释放的时候,计数器增大。当获取信号量的时候,如果计数器值为0,则该进程将阻塞。当某一信号量被释放,counter值增加为1时,被阻塞的线程(如果有的话)中会有一个得以继续运行。如果你不传任何初始化参数,计数器的值会被初始化为1.
# 信号量
sema = Semaphore(3) # 限制同时能访问资源的数量为3,设置为1时可以防止资源竞争
num = 0
def foo(tid):
sema.acquire()
# with sema:
global num
for i in range(1000000):
num +=1
print('{} acquire sema'.format(tid))
print(num)
time.sleep(1)
sema.release()
print('{} release sema'.format(tid))
threads = []
for i in range(5):
t = Thread(target=foo, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 注意:在多线程编程中,为了防止不同的线程同时对一个公用的资源(比如全部变量)进行修改,
# 需要进行同时访问的数量(通常是1)的限制。
Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
con = threading.Condition()
num = 0
# 生产者
class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
# 锁定线程
global num
con.acquire()
while True:
print ("{}开始添加!!!".format(threading.current_thread().getName()))
num += 1
print ("火锅里面鱼丸个数:%s" % str(num))
time.sleep(1)
if num >= 5:
print ("火锅里面里面鱼丸数量已经到达5个,无法添加了!")
# 唤醒等待的线程
con.notify() # 唤醒小伙伴开吃啦
# 等待通知
con.wait() #调用这个方法将使线程进入Condition的等待池等待通知,并释放锁
# 释放锁
# con.release()
# 消费者
class Consumers(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
print('等待中……')
con.acquire()
print(45555)
global num
while True:
print ("{}开始吃啦!!!".format(threading.current_thread().getName()))
num -= 1
print ("火锅里面剩余鱼丸数量:%s" %str(num))
time.sleep(2)
if num <= 0:
print ("锅底没货了,赶紧加鱼丸吧!")
con.notify() # 唤醒其它线程
# 等待通知
con.wait()
# con.release()
p = Producer()
p.start()
c = Consumers()
c.start()
# condition应用场景:线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。
线程之间共享全局变量,方便简单。但是坏处就是共享变量容易出现数据竞争,不是线程安全的,解决方法就是使用互斥锁。
锁主要分为Lock和RLock两种。为了支持在同一线程中多次请求同一资源,python提供了“可重入锁”:threading.RLock。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
threading.RLock和threading.Lock 的区别:
lock = threading.Lock() #Lock对象
lock.acquire()
lock.acquire() #产生了死琐。
lock.release()
lock.release()
rLock = threading.RLock() #RLock对象
rLock.acquire()
rLock.acquire() #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()
globals_num=0
deffunc1(lock):
global globals_num
lock.acquire()#获得锁
foriinrange(1000000):
try:
globals_num=globals_num+1
except:
pass
lock.release()#释放锁
print('线程1结果{}'.format(globals_num))
def func2(lock):
globalglobals_num
withlock:
foriinrange(1000000):
globals_num=globals_num+1
print('线程2结果{}'.format(globals_num))
if __name__ == '__main__':
lock=threading.Lock()
#lock=threading.Semaphore(1)
t1=threading.Thread(target=func1,args=(lock,))
t2 = threading.Thread(target=func2, args=(lock,))
t1.start()
t2.start()'''
结果:
线程1结果1000000
线程2结果2000000
'''
死锁:在任一时刻,一个锁对象可能被一个线程获取,或者不被任何线程获取。如果一个线程尝试去获取一个已经被另一个线程获取到的锁对象,那么这时候就会产生死锁现象。
解决死锁办法:1、对于多个线程分别要获取多个锁时,可以把多把锁按锁id进行排序,再依次获取。2、获取锁时发生死锁时,释放本身已经获取的锁对象。3、银行家算法。4、一个线程只使用一把锁(RLock),也可以实现多次上锁。
#对多把锁按锁id进行排序上锁
_local = threading.local() #每个线程开辟一个独立的空间进行数据存储@contextmanager
defacquire(flag,*locks,**kwargs):
print(flag,'开始获取锁......................')
locks = sorted(locks, key=lambda x: id(x)) #按照锁ID进行排序
acquired=getattr(_local,'acquired',[])#getattr()函数用于返回一个对象属性值。getattr(a, 'bar2', 3)#属性 bar2 不存在,但设置了默认值,返回3
if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
raise RuntimeError('Lock Order Violation')
acquired.extend(locks)
_local.acquired=acquired
try:
forlockinlocks:
lock.acquire()
iflock==acquired[0]:
print('{}获取第一把锁{}后,执行{}里面的任务代码:'.format(flag,lock,flag))
else:
print('{}获取第二把锁{}后,执行{}里面的任务代码'.format(flag,lock,flag))
yield
finally:
forlockinreversed(locks):
lock.release()
delacquired[-len(locks):]
print('{}执行完任务逻辑后释放锁'.format(flag))
x_lock=threading.Lock()
y_lock = threading.Lock()
g_num = 0
defthread_1():
globalg_num
foriinrange(1):
withacquire('thread1',y_lock,x_lock):
print('thread1获取锁并执行完任务代码......................')
g_num += 1
defthread_2():
globalg_num
foriinrange(1):
withacquire('thread2',y_lock,x_lock):
print('thread2获取锁并执行完任务代码......................')
g_num += 1
t1=threading.Thread(target=thread_1)
t1.start()
t2=threading.Thread(target=thread_2)
t2.start()'''
结果:
thread1 开始获取锁......................
thread1获取第一把锁<locked _thread.lock object at 0x10c34b3c8>后,执行thread1里面的任务代码:
thread1获取第二把锁<locked _thread.lock object at 0x10c34b8a0>后,执行thread1里面的任务代码
thread1获取锁并执行完任务代码......................
thread2 开始获取锁......................
thread1执行完任务逻辑后释放锁
thread2获取第一把锁<locked _thread.lock object at 0x10c34b3c8>后,执行thread2里面的任务代码:
thread2获取第二把锁<locked _thread.lock object at 0x10c34b8a0>后,执行thread2里面的任务代码
thread2获取锁并执行完任务代码......................
thread2执行完任务逻辑后释放锁
'''
2.4、线程池
使用线程池原因:线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。
2.4.1 内置线程池ThreadPool和threadpool方法
def sayhello (a):
print("hello: "+a)
def main():
global result
seed=["a","b","c"]
start=time.time()
task_pool = threadpool.ThreadPool(3)
requests =threadpool.makeRequests(sayhello, seed)
for ret in requests:
task_pool.putRequest(ret)
task_pool.wait()
end=time.time()
time_m = end-start
print("threadpool花费时间time: "+str(time_m))
time.sleep(1)
start2=time.time()
p = ThreadPool(3) # 直接使用内置的
for i in seed:
p.apply_async(sayhello, args=(i,))
p.close()
p.join()
end2=time.time()
print("内置线程池花费时间time2: "+str(end2-start2))
if __name__ == '__main__':
main()
2.4.2 ThreadPoolExecutor
ef sayhello(a):
print("hello: "+a)
def main():
seed=["a","b","c"]
with ThreadPoolExecutor(3) as executor:
executor.submit(sayhello,str(seed)) #submit
time.sleep(2)
with ThreadPoolExecutor(3) as executor1:
executor1.map(sayhello,seed) #map
if __name__ == '__main__':
main()
concurrent.futures.ThreadPoolExecutor,在提交任务的时候,有两种方式,一种是submit()函数,另一种是map()函数,两者的主要区别在于:
1)map可以保证输出的顺序, submit输出的顺序是乱的
2)如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()
3)submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。
submit方法:
#参数times用来模拟网络请求的时间defget_html(times):
time.sleep(times)
print("getpage{}sfinished".format(times))
returntimes
executor=ThreadPoolExecutor(max_workers=2)
#通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞task1=executor.submit(get_html,(3))
task2=executor.submit(get_html,(2))
# done方法用于判定某个任务是否完成,完成返回True,未执行也返回True
print('任务是否完成{}'.format(task1.done()))
#cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功print('取消某个任务{}'.format(task2.cancel()))
time.sleep(4)
print(task1.done())
print(task2.done())
# result方法可以获取task的执行结果print('执行结果{}'.format(task1.result()))
print('='*50)
"""
说明:
ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程池中最多能同时运行的线程数目。使用submit函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。上面的例子可以看出,由于任务有2s的延时,在task1提交后立刻判断,task1还未完成,而在延时4s之后判断,task1就完成了。使用cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。使用result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。
"""defget_html(times):
time.sleep(times)
print("getpage{}sfinished".format(times))
return times
executor=ThreadPoolExecutor(max_workers=2)
urls=[3,2,4]#并不是真的url
all_task = [executor.submit(get_html, (url)) for url in urls] #每次都需要提交一个目标函数和对应的参数print('所有任务{}'.format(all_task))
forfutureinas_completed(all_task):#在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务
data=future.result()
print("inmain:getpage{}ssuccess".format(data))
print('='*50)
"""说明:上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断。有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用as_completed方法一次取出所有任务的结果。
as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程.
"""
defget_html(times):
time.sleep(times)
print("getpage{}sfinished".format(times))
returntimes
executor=ThreadPoolExecutor(max_workers=2)
urls=[3,2,4]#并不是真的url
all_task=[executor.submit(get_html,(url))forurlinurls]
wait(all_task, timeout=3, return_when=ALL_COMPLETED)
print("main")
print('='*50)
"""
说明:
wait方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件return_when默认为ALL_COMPLETED,表明要等待所有的任务都结束。可以看到运行结果中,确实是所有任务都完成了,主线程才打印出main。等待条件还可以设置为FIRST_COMPLETED,表示第一个任务完成就停止等待
"""
map方法:
fget_html(times):
time.sleep(times)
print("getpage{}sfinished".format(times))
return times
executor=ThreadPoolExecutor(max_workers=2)
urls=[3,2,4]#并不是真的url
for data in executor.map(get_html, urls): #函数的参数放在一个迭代器(列表,字典)里就可以
print("in main: get page {}s success".format(data)) #按照提交的顺序打印 """
说明:使用map方法,无需提前使用submit方法,map方法与python标准库中的map含义相同,都是将序列中的每个元素都执行同一个函数。上面的代码就是对urls的每个元素都执行get_html函数,并分配各线程池。可以看到执行结果与上面的as_completed方法的结果不同,输出顺序和urls列表的顺序相同,就算2s的任务先执行完成,也会先打印出3s的任务先完成,再打印2s的任务完成。
"""
合理设置工作线程数:
N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。一般为了简化,会简单认为计算和等待时间各占50%,
1)假设此时是单核,则设置为2个工作线程就可以把CPU充分利用起来,让CPU跑到100%,
2)假设此时是N核,则设置为2N个工作现场就可以把CPU充分利用起来,让CPU跑到N*100%。
但考虑到线程任务的执行类型,要分以下几种情况:
1)高并发、任务执行时间短的业务,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换
2)并发不高、任务执行时间长的业务要区分开看:
a)假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以加大线程池中的线程数目,让CPU处理更多的业务
b)假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,和(1)一样,线程池中的线程数设置得少一些,减少线程上下文的切换。
2.5、Gil全局解释锁( Global Interpreter Lock)
GIL全局解释器锁。当我们使用多线程的时候,每一个进程中只有一个GIL锁,那么这多个线程中谁拿到GIL锁,谁就可以使用cpu(ps:多个进程有多个Gil锁,但每个进程中只有一个GIL),所以当python用cpython作为解释器的时候,多线程就不是真正意义上的多线程,属于伪并发的多线程。
在python中,提供的线程是内核级的,python的线程切换主要有两种方式1.一个线程当进行sleep,i/o操作时这是别的线程就有机会获得GIL,还有一种是,在py2中,当一个线程无中断的运行了1000个字节(py3中是15毫秒)那么他会被强制放弃GIL,在python3中,GIL不使用ticks计数,改为使用计时器,改善了一些,但是依然效率不尽如人意。
既然线程切换是不可控的,那么如果要实现线程安全,就得保证操作是原子性的。什么是原子操作呢,就是不会因为线程并发或者进程并发而中断操作,要么全执行,要么全不执行,执行过程中不会被终断!
互斥锁和Gil锁的关系:
Gil锁 :保证同一时刻只有一个线程能使用到cpu
互斥锁 : 多线程时,保证修改共享数据时有序的修改,不会产生数据修改混乱
-首先假设只有一个进程,这个进程中有两个线程 Thread1,Thread2, 要修改共享的数据date, 并且有互斥锁:
执行以下步骤:
1、多线程运行,假设Thread1获得GIL可以使用cpu,这时Thread1获得互斥锁lock,Thread1可以改date数据(但并没有开始修改数据);
2、Thread1线程在修改date数据前发生了 i/o操作或者 ticks计数满100((注意就是没有运行到修改data数据),这个时候 Thread1 让出了Gil,Gil锁可以被竞争);
3、Thread1 和 Thread2 开始竞争Gil (注意:如果Thread1是因为i/o 阻塞让出的Gil,Thread2必定拿到Gil,如果Thread1是因为ticks计数满100让出Gil这个时候Thread1 和 Thread2 公平竞争); 解释了线程为什么适用于io操作
4、假设 Thread2正好获得了GIL, 运行代码去修改共享数据date,由于Thread1有互斥锁lock,所以Thread2无法更改共享数据date,这时Thread2让出Gil锁, GIL锁再次发生竞争; 解释了线程上锁后为什么可以防止资源竞争
5、假设Thread1又抢到GIL,由于其有互斥锁Lock所以其可以继续修改共享数据data,当Thread1修改完数据释放互斥锁lock,Thread2在获得GIL与lock后才可对data进行修改。
2.6、使用线程原因:
既然在python中线程并不能实现真正意义上的并行,属于伪并发,那为什么还要使用线程?
1、针对于I/O操作而言,相对于进程而言线程更为使用,因为遇到I/O阻塞时,GIL全局解释锁会自动释放,让别的线程获取锁而区使用cpu,不会因一直阻塞导致cpu空闲,进程遇到阻塞时会一直阻塞下去,导致cpu空闲不会得到充分利用。
2、我们常说进程时资源分配的单位(开辟4G虚拟内存),而线程是执行的基本单元(即进程调度器能给线程分配cpu时间片)。所以针对同一任务使用多线程即使不能实现并行,但也会增大获取cpu执行任务的概率从而导致任务执行的速度更快。如果使用多进程实现同一任务,进程之间切换效率远低于线程之间切换。
2.7、进程、线程关系:
进程是系统进行资源分配和调度的一个独立单位。每个进程都有自己的独立内存空间,不同进程通过进程间通信来通信。由于进程比较重量,占据独立的内存,所以上下文进程间的切换开销(栈、寄存器、虚拟内存、文件句柄等)比较大,但相对比较稳定安全。
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。线程间通信主要通过共享内存,上下文切换很快,资源开销较少,但相比进程不够稳定容易丢失数据。
0)线程是指进程内的一个执行单元,也是进程内的可调度实体。
1) 地址空间:线程是进程内的一个执行单元,进程内至少有一个线程,它们共享进程的地址空间,而进程有自己独立的地址空间
2) 资源拥有:进程是资源分配和拥有的单位,同一个进程内的线程共享进程的资源
3) 线程是处理器调度的基本单位,但进程不是
4) 二者均可并发执行
5) 每个独立的进程有一个程序运行的入口、顺序执行序列和程序的出口,但是线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制
协程
3.1、创建
# greenlet创建def test1(gr, g):
foriinrange(100):
print("---A--")
gr.switch(g,gr)#切换到另一个协程执行
time.sleep(0.5)
deftest2(gr,g):
foriinrange(100):
print("---B--")
gr.switch(g,gr)
time.sleep(0.5)
if __name__ == '__main__':
# 创建协程
gr1=greenlet(test1)
gr2 = greenlet(test2)
# 启动切换协程
gr1.switch(gr2, gr1) # gevent创建
monkey.patch_all() # 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块,time.sleep()等效与gevent.sleep()
def coroutine_work(coroutine_name):
for i in range(10):
print('当前协程是{}'.format(coroutine_name))
time.sleep(1)
gevent.joinall([
gevent.spawn(coroutine_work, 'work1'),
gevent.spawn(coroutine_work, 'work2'),
])
3.2、通信同步
协程间的异步通讯可以使用事件(Event)对象。该对象的”wait()”方法可以阻塞当前协程,而”set()”方法可以唤醒之前阻塞的协程。
evt = Event()
def setter():
print('Wait for me')
gevent.sleep(2) # 2秒后唤醒所有在evt上等待的协程
print("Ok, I'm done")
evt.set() # 唤醒
def waiter():
print("I'll wait for you")
evt.wait() # 等待
print('Finish waiting')
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
])
除了Event事件外,gevent还提供了AsyncResult事件,它可以在唤醒时传递消息。
evt = AsyncResult()
def setter():
print('Wait for me')
gevent.sleep(2) # 2秒后唤醒所有在evt上等待的协程
print("Ok, I'm done")
evt.set() # 唤醒
def waiter():
print("I'll wait for you")
evt.wait() # 等待
print('Finishwaiting')
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent的队列对象Queue可以实现协程之间安全的访问。
q = Queue()
def consumer(name):
while not q.empty():
print('%s got product %s' % (name, q.get()))
gevent.sleep(0)
def producer():
for i in range(1, 4):
q.put(i)
gevent.joinall([
gevent.spawn(producer),
gevent.spawn(consumer, 'steve'),
gevent.spawn(consumer, 'john'),
gevent.spawn(consumer, 'nancy'),
])
信号量可以用来限制协程并发的个数。它有两个方法,acquire和release。顾名思义,acquire就是获取信号量,而release就是释放。当所有信号量都已被获取,那剩余的协程就只能等待任一协程释放信号量后才能得以运行。
sem = BoundedSemaphore(2)
def worker(n):
sem.acquire()
print('Worker %i acquired semaphore' % n)
gevent.sleep(1)
sem.release()
print('Worker %i released semaphore' % n)
gevent.joinall([gevent.spawn(worker, i) for i in range(0, 4)])
协程本地变量,同线程类似,协程也有本地变量,也就是只在当前协程内可被访问的变量。
data=local()
def f1():
data.x = 1
print('协程1:{}'.format(data.x))
def f2():
data.x = 2
try:
print('协程2:{}'.format(data.x))
except AttributeError:
print('xisnotvisible')
gevent.joinall([
gevent.spawn(f1),
gevent.spawn(f2)
])
3.3、线程、协程关系
1) 线程进程都是同步机制,而协程则是异步
2) 协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态
3) 协程是用户级的任务调度,线程是内核级的任务调度。 协程需要操作员单独写调度逻辑,对CPU来说,协程也就是单线程,因此CPU 不需要考虑怎么调度、切换上下文,省去了CPU开销,所以,协程又在一定程度上好于多线程。
4) 线程是被动调度的,协程是主动调度的。
协程是一种编程组件,可以在不陷入内核的情况进行上下文切换,而进程、线程的上下文切换都需要和内核进行交互,协程是在用户态保存寄存器状态的,也就是协程的上下文切换发生在内存的用户空间而不是内核空间。
线程和协同程序的主要不同在于:在多处理器情况下,从概念上来讲多线程程序同时运行多个线程;而协同程序是通过协作来完成,在任一指定时刻只有一个协同程序在运行,并且这个正在运行的协同程序只在必要时才会被挂起。
协程适合对某任务进行分时协同处理,线程适合多任务同时处理。
缺点:
无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上。最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用