Python中进程、线程与协程的详细解析是怎样的?

2026-05-26 12:441阅读0评论SEO资源
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计10941个文字,预计阅读时间需要44分钟。

Python中进程、线程与协程的详细解析是怎样的?

1、概念【注意】公众微信号轻松学编程了解更多。回复协议获取获取本文字源代码。从计算机硬件角度:计算机的核心CPU承担了所有计算任务。一个CPU在某一时刻只能处理一个任务。


1、概念

从计算机硬件角度:

计算机的核心是CPU,承担了所有的计算任务。

一个CPU,在一个时间切片里只能运行一个程序。

图1. 操作系统

1.1 进程

进程:是CPU对程序的一次执行过程、一次执行任务。各个进程有自己的内存空间、数据栈等。操作系统分配内存的基本单位(打开、执行、保存…)

1.2 线程

线程:是进程中执行运算的最小单位,是进程中的一个实体。(打开、执行、保存…)

一个程序至少有一个进程,一个进程至少有一个线程。

操作系统分配CPU的基本单位

1.3 协程

协程:比线程更小的执行单元,又称微线程,在单线程上执行多个任务,自带CPU上下文

用函数切换,开销极小。不通过操作系统调度,

没有进程、线程的切换开销。(gevent,monkey.patchall)

举例

我们假设把一个进程比作我们实际生活中的一个拉面馆,负责保持拉面馆运行的服务员就是线程,每个餐桌代表要完成的任务。

当我们用多线程完成任务时,模式是这样的:每来一桌的客人,就在那张桌子上安排一个服务员,即有多少桌客人就得对应多少个服务员;

而当我们用协程来完成任务时,模式却有所不同了: 就安排一个服务员,来吃饭得有一个点餐和等菜的过程,当A在点菜,就去B服务,B叫了菜在等待,我就去C,当C也在等菜并且A点菜点完了,赶紧到A来服务… …依次类推。

从上面的例子可以看出,想要使用协程,那么我们的任务必须有等待。当我们要完成的任务有耗时任务,属于IO密集型任务时,我们使用协程来执行任务会节省很多的资源(一个服务员和多个服务员的区别,并且可以极大的利用到系统的资源。

1.4 线程安全

多线程环境中,共享数据同一时间只能有一个线程来操作。

1.5 原子操作

原子操作就是不会因为进程并发或者线程并发而导致被中断的操作。

1.6 并行和并发

串行:单个CPU核心,按顺序执行

并行:多个CPU核心,不同的程序就分配给不同的CPU来运行。可以让多个程序同时执行。(多进程)

并发:单个CPU核心,在一个时间切片里一次只能运行一个程序,如果需要运行多个程序,则串行执行,遇到IO阻塞就切换,即计算机在逻辑上能处理多任务的能力。(多进程,多线程)

1.7 多进程/多线程

表示可以同时执行多个任务,进程和线程的调度是由操作系统自动完成。

进程:每个进程都有自己独立的内存空间,不同进程之间的内存空间不共享。

线程:一个进程可以有多个线程,所有线程共享进程的内存空间,通讯效率高,切换开销小。共享意味着竞争,导致数据不安全,为了保护内存空间的数据安全,引入"互斥锁",“递归锁”,“升序锁”等。

1.8 Python的多线程:

GIL:Global Interpreter Lock, 全局解释器锁,线程的执行权限,在Python的进程里只有一个GIL。

一个线程需要执行任务,必须获取GIL。

​好处​:直接杜绝了多个线程访问内存空间的安全问题。

​坏处​:Python的多线程不是真正多线程,不能充分利用多核CPU的资源。

但是,在I/O阻塞的时候,解释器会释放GIL。

1.9 同步、异步、阻塞、非阻塞

​异步​,异步本质上是单线程的,因为 IO 操作在很多时候会存在阻塞,异步就是在这种阻塞的时候,通过控制权的交换来实现多任务的。即异步本质上是运行过程中的控制权的交换。最典型的例子就是生产者消费者模型。

​同步​,即程序协同进行,遇到阻塞就等待,直到任务完成为止。

2.0 运用场景

​多进程​:密集CPU任务,需要充分使用多核CPU资源(服务器,大量的并行计算)的时候,用多进程。 multiprocessing

缺陷:多个进程之间通信成本高,切换开销大。

​多线程​:密集I/O任务(网络I/O,磁盘I/O,数据库I/O)使用多线程合适。

threading.Thread、multiprocessing.dummy

缺陷:同一个时间切片只能运行一个线程,不能做到高并行,但是可以做到高并发。

​协程​:又称微线程(一种用户态的轻量级线程),在单线程上执行多个任务,用函数切换,由程序自身控制,开销极小。

不通过操作系统调度,没有进程、线程的切换开销。

每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置,不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

当程序中存在大量不需要CPU的操作时(IO),遇到IO操作自动切换到其它协程。

​greenlet​, ​gevent​,​monkey.patchall​,​yield​,​async​

多线程请求返回是无序的,哪个线程有数据返回就处理哪个线程,而协程返回的数据是有序的。

因为协程是一个线程执行,所以想要利用多核CPU,最简单的方法是多进程+协程,这样既充分利用多核,又充分发挥协程的高效率。

缺陷:单线程执行,处理密集CPU和本地磁盘IO的时候,性能较低。处理网络I/O性能还是比较高.

2.1 互斥锁、递归锁、升序锁

Python的GIL只能保证原子操作的线程安全,因此在多线程编程时我们需要通过加锁来保证线程安全。

最简单的锁是​互斥锁(同步锁)​,互斥锁是用来解决IO密集型场景产生的计算错误,即目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。

​递归锁​:就是在一个大锁中再包含子锁

​升序锁​:两个线程想获取到的锁,都被对方线程拿到了,那么我们只需要保证在这两个线程中,获取锁的顺序保持一致就可以了。举个例子,我们有线程thread_a, thread_b, 锁lock_1, lock_2。只要我们规定好了锁的使用顺序,比如先用lock_1,再用lock_2,当线程thread_a获得lock_1时,其他线程如thread_b就无法获得lock_1这个锁,也就无法进行下一步操作(获得lock_2这个锁),也就不会导致互相等待导致的死锁。简言之,解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,这个规则使用上下文管理器 是非常容易实现的。

2.2 代码

创建进程

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 创建进程.py
@time: 2020/3/3 12:22
@author:LDC
'''

import multiprocessing
import time


def func(arg):
pname = multiprocessing.current_process().name # 获取当前进程名称
pid = multiprocessing.current_process().pid # 获取当前进程id
print("当前进程ID=%d,name=%s" % (pid, pname))

for i in range(5):
print(pname, pid, arg)
time.sleep(1)

pass


if __name__ == "__main__":
pname = multiprocessing.current_process().name
pid = multiprocessing.current_process().pid
print("当前进程ID=%d,name=%s" % (pid, pname))

p = multiprocessing.Process(target=func, name='我是子进程', args=("hello",))
p.daemon = True # 设为(随主进程的结束而结束)
p.start()

while True:
print("子进程是否活着?", p.is_alive())
# 知道进程名称,可以通过以下方式检验
processing = list(filter(lambda mp: mp.name == '我是子进程', multiprocessing.active_children()))
if processing:
print('这个进程还活着')
if not p.is_alive():
break
time.sleep(1)
pass

print("main over")

​进程间的通讯​

每个进程都拥有自己的内存空间,因此不同进程间内存是不共享的,要想实现两个进程间的数据交换,有几种常用的方法:

​Queue(队列)​:

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 进程间的通讯.py
@time: 2020/3/3 15:13
@author:LDC
'''

from multiprocessing import Process, Queue, current_process

def start(q):
pname = current_process().name
pid = current_process().pid
print('当前进程是{}_{}'.format(pid, pname))
# 从队列中取出数据,先判断队列 是否为空
if not q.empty():
print(q.get())
# 存数据进队列
q.put('hello from {}_{}'.format(pid, pname))


if __name__ == '__main__':
q = Queue()
p_list = []
for i in range(0, 2):
p = Process(target=start, args=(q,))
p.start()
p_list.append(p)
# 确保所有进程执行完
for p in p_list:
p.join()

​Manager​(实现了进程间真正的数据共享):

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 进程间的通讯(manager).py
@time: 2020/3/3 15:54
@author:LDC
'''

from multiprocessing import Process, Manager, current_process


def start(m_dict, m_list):
pname = current_process().name
pid = current_process().pid
print('当前进程是{}_{}'.format(pid, pname))
print(m_dict)
m_dict[pid] = pname
m_list.append(pid)



if __name__ == '__main__':
manager = Manager()
m_dict = manager.dict() # 通过manager生成一个字典
m_list = manager.list() # 通过manager生成一个列表
p_list = []
for i in range(10):
p = Process(target=start, args=(m_dict, m_list))
p.start()
p_list.append(p)
for res in p_list:
res.join()

print(m_dict)
print(m_list)

​进程池(多进程)​

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

1、​apply​(同步)

2、​apply_async​(异步)

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 进程池.py
@time: 2020/3/3 16:05
@author:LDC
'''

from multiprocessing import Pool, current_process
import time


def Foo(i):
pid = current_process().pid
pname = current_process().name
time.sleep(1)
print('hello','{},{}_{}'.format(i, pid, pname))
return '{},{}_{}'.format(i, pid, pname)


def Bar(arg):
print('number::', arg)


if __name__ == "__main__":
pool = Pool(3) # 定义一个进程池,里面有3个进程
for i in range(10):
# 使用异步线程时,需要定义一个回调函数,当执行完后把结果传给回调函数
pool.apply_async(func=Foo, args=(i,), callback=Bar)
# pool.apply(func=Foo, args=(i,))

pool.close() # 关闭进程池
pool.join() # 进程池中进程执行完毕后再关闭,(必须先close在join)

输出:

hello 0,12776_SpawnPoolWorker-1
number:: 0,12776_SpawnPoolWorker-1
hello 1,8832_SpawnPoolWorker-2
hello 2,4704_SpawnPoolWorker-3
number:: 1,8832_SpawnPoolWorker-2
number:: 2,4704_SpawnPoolWorker-3
hello 5,8832_SpawnPoolWorker-2
hello 4,4704_SpawnPoolWorker-3
number:: 5,8832_SpawnPoolWorker-2
number:: 4,4704_SpawnPoolWorker-3
hello 3,12776_SpawnPoolWorker-1
number:: 3,12776_SpawnPoolWorker-1
hello 6,8832_SpawnPoolWorker-2
hello 7,4704_SpawnPoolWorker-3
number:: 6,8832_SpawnPoolWorker-2
number:: 7,4704_SpawnPoolWorker-3
hello 8,12776_SpawnPoolWorker-1
number:: 8,12776_SpawnPoolWorker-1
hello 9,4704_SpawnPoolWorker-3
number:: 9,4704_SpawnPoolWorker-3

callback是​回调函数​,就是在执行完Foo方法后会自动执行Bar函数,并且自动把Foo函数的返回值作为参数传入Bar函数.

​多进程​:

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 多进程.py
@time: 2020/3/3 16:19
@author:LDC
'''
import time
from concurrent import futures
from multiprocessing import current_process


def foo(i):
pid = current_process().pid
pname = current_process().name
time.sleep(3)
print('hello', '{},{}_{}'.format(i, pid, pname))
return '{},{}_{}'.format(i, pid, pname)


if __name__ == '__main__':
i_list = [1, 2, 3, 4, 5]
with futures.ProcessPoolExecutor(5) as executor:

res = executor.map(foo, i_list)
# to_do = [executor.submit(foo, item) for item in i_list]
# ret = [future.result() for future in futures.as_completed(to_do)]

​注意​:​map​可以保证输出的顺序, ​submit​输出的顺序是乱的

如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()

submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。

​线程​

方法:
  start 线程准备就绪,等待CPU调度
  setName 设置线程名称
  getName 获取线程名称
  setDaemon 把一个主进程设置为Daemon线程后,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论有没执行完成,都会停止
  join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义  
  run 线程被cpu调度后自动执行线程对象的run方法# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 创建线程(常用).py
@time: 2020/3/3 16:49
@author:LDC
'''

import threading
import time

'''直接调用'''


def foo(name):
time.sleep(name)
# 获取当前线程名称与标志号
print(threading.currentThread().name, threading.currentThread().ident)
print("Hello %s" % name)



if __name__ == "__main__":
t_list = []
# 创建了两个线程
for i in range(2):
t = threading.Thread(target=foo, args=(i+2,), name='t_{}'.format(i)) # 生成线程实例
# t.setDaemon(True) # True表示子线程设置为守护线程,主线程死去,子线程也跟着死去,不管是否执行完
# t.setDaemon(False) # False表示子线程设置为非守护线程,主线程死去,子线程依然在执行
t_list.append(t)
t.start()
# 知道线程名,可以通过以下方式检验
threading_list = list(filter(lambda th: th.getName() == 't_1', threading.enumerate()))
if threading_list:
print('这个线程还活着')
# for t in t_list:
# t.join() # 等待子线程执行完,
# print(t.getName()) # 获取线程名

​多线程​

# 可以使用for循环创建多个线程
for i in range3):
t = threading.Thread(target=foo, args=(i+2,), name='t_{}'.format(i))
t.start()

​线程池​

为什么要使用线程池?

Python中进程、线程与协程的详细解析是怎样的?

对于任务数量不断增加的程序,每有一个任务就生成一个线程,最终会导致线程数量的失控,例如,整站爬虫,假设初始只有一个链接a,那么,这个时候只启动一个线程,运行之后,得到这个链接对应页面上的b,c,d,,,等等新的链接,作为新任务,这个时候,就要为这些新的链接生成新的线程,线程数量暴涨。在之后的运行中,线程数量还会不停的增加,完全无法控制。所以,对于任务数量不端增加的程序,固定线程数量的线程池是必要的。

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 线程池.py
@time: 2020/3/3 20:36
@author:LDC
'''
import time
from concurrent.futures import ThreadPoolExecutor


# 任务
def doSth(args):

print("hello", args)
# time.sleep(2)


if __name__ == '__main__':
# max_workers 线程数
argsList = (1, 2, 3, 4, 5, 6)
# 使用sumbit()函数提交任务
with ThreadPoolExecutor(5) as exe:
for a in argsList:
exe.submit(doSth, a)
# 使用map()函数提交任务
print("使用map()提交任务")
with ThreadPoolExecutor(5) as exe:
exe.map(doSth, argsList)

​线程冲突​

当多个线程同时访问同一个变量时就可能造成线程冲突

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 线程冲突.py
@time: 2020/3/3 20:57
@author:LDC
'''

import threading

money = 0


def add_money():
global money
for i in range(10000000):
money += 1


if __name__ == '__main__':
add_money()
add_money()
print('调用两次函数money实际值为:', money)
money = 0
t_list = []
for i in range(2):
t = threading.Thread(target=add_money)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print('使用线程后money实际值为:', money)

输出:

调用两次函数money实际值为: 20000000
使用线程后money实际值为: 11656269

​原因​:当对全局资源存在写操作时,如果不能保证写入过程的原子性,会出现脏读脏写的情况,即线程不安全。Python的GIL只能保证原子操作的线程安全,因此在多线程编程时我们需要通过加锁来保证线程安全。

最简单的锁是​互斥锁(同步锁)​,互斥锁是用来解决IO密集型场景产生的计算错误,即目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。

​互斥锁​

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 互斥锁.py
@time: 2020/3/3 21:08
@author:LDC
'''

import threading

lock = threading.Lock()
money = 0

def add_money():
global money
with lock:
for i in range(10000000):
money += 1

if __name__ == '__main__':
add_money()
add_money()
print('调用两次函数money实际值为:', money)
money = 0
t_list = []
for i in range(2):
t = threading.Thread(target=add_money)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print('使用线程后money实际值为:', money)

锁适用于访问和修改同一个资源的时候,引起资源争用的情况下。

使用锁的注意事项:

1. 少用锁,除非有必要。多线程访问加锁的资源时,由于锁的存在,实际就变成了串行。
2. 加锁时间越短越好,不需要就立即释放锁。
3. 一定要避免死锁,使用with或者try...finally。

第一种死锁:​迭代死锁​

该情况是一个线程“迭代”请求同一个资源,直接就会造成死锁。这种死锁产生的原因是我们标准互斥锁threading.Lock的缺点导致的。

​标准的锁对象(threading.Lock)并不关心当前是哪个线程占有了该锁;如果该锁已经被占有了,那么任何其它尝试获取该锁的线程都会被阻塞,包括已经占有该锁的线程也会被阻塞。​

比如:

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 迭代死锁.py
@time: 2020/3/3 22:10
@author:LDC
'''

import threading
import time

count_list = [0, 0]
lock = threading.Lock()


def change_0():
global count_list
with lock:
tmp = count_list[0]
time.sleep(0.001)
count_list[0] = tmp + 1
time.sleep(2)
print("Done. count_list[0]:%s" % count_list[0])


def change_1():
global count_list
with lock:
tmp = count_list[1]
time.sleep(0.001)
count_list[1] = tmp + 1
time.sleep(2)
print("Done. count_list[1]:%s" % count_list[1])


def change():
with lock:
change_0()
time.sleep(0.001)
change_1()


def verify(sub):
global count_list
thread_list = []
for i in range(5):
t = threading.Thread(target=sub, args=())
t.start()
thread_list.append(t)
for j in thread_list:
j.join()
print(count_list)


if __name__ == "__main__":
verify(change)

示例中,我们有一个共享资源count_list,有两个分别取这个共享资源第一部分和第二部分的数字(count_list[0]和count_list[1])。两个访问函数都使用了锁来确保在获取数据时没有其它线程修改对应的共享数据。

现在,如果我们思考如何添加第三个函数来获取两个部分的数据。一个简单的方法是依次调用这两个函数,然后返回结合的结果。

这里的问题是,如有某个线程在两个函数调用之间修改了共享资源,那么我们最终会得到不一致的数据。

最明显的解决方法是在这个函数中也使用lock。然而,这是不可行的。里面的两个访问函数将会阻塞,因为外层语句已经占有了该锁。

结果是没有任何输出,死锁。

为了解决这个问题,我们可以用递归锁代替互斥锁。

​递归锁​

就是在一个​大锁中再包含子锁​。它相当于一个字典,记录了锁的门与锁的对应值,当开门的时候会根据对应钥匙来开锁。

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 迭代死锁.py
@time: 2020/3/3 22:10
@author:LDC
'''

import threading
import time

count_list = [0, 0]
lock = threading.RLock() # 递归锁


def change_0():
global count_list
with lock: # 小锁
tmp = count_list[0]
time.sleep(0.001)
count_list[0] = tmp + 1
time.sleep(2)
print("Done. count_list[0]:%s" % count_list[0])


def change_1():
global count_list
with lock: # 小锁
tmp = count_list[1]
time.sleep(0.001)
count_list[1] = tmp + 1
time.sleep(2)
print("Done. count_list[1]:%s" % count_list[1])


def change():
with lock: # 大锁
change_0()
time.sleep(0.001)
change_1()


def verify(sub):
global count_list
thread_list = []
for i in range(5):
t = threading.Thread(target=sub, args=())
t.start()
thread_list.append(t)
for j in thread_list:
j.join()
print(count_list)


if __name__ == "__main__":
verify(change)

第二种死锁:​互相等待死锁​

死锁的另外一个原因是两个进程想要获得的锁已经被对方进程获得,只能互相等待又无法释放已经获得的锁,而导致死锁。假设银行系统中,用户a试图转账100块给用户b,与此同时用户b试图转账500块给用户a,则可能产生死锁。

2个线程互相等待对方的锁,互相占用着资源不释放。

下面是一个互相调用导致死锁的例子:

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 互相等待死锁.py
@time: 2020/3/3 22:29
@author:LDC
'''
import threading
import time


class Account(object):
def __init__(self, name, balance, lock):
self.name = name
self.balance = balance
self.lock = lock

def withdraw(self, amount):
# 转账
self.balance -= amount

def deposit(self, amount):
# 存款
self.balance += amount


def transfer(from_account, to_account, amount):
# 转账操作
with from_account.lock:
from_account.withdraw(amount)
time.sleep(1)
print("trying to get %s's lock..." % to_account.name)
with to_account.lock:
to_account.deposit(amount)
print("transfer finish")


if __name__ == "__main__":
a = Account('a', 1000, threading.RLock())
b = Account('b', 1000, threading.RLock())
thread_list = []
thread_list.append(threading.Thread(target=transfer, args=(a, b, 100)))
thread_list.append(threading.Thread(target=transfer, args=(b, a, 500)))
for i in thread_list:
i.start()
for j in thread_list:
j.join()

最终结果是死锁:

trying to get account a's lock...
trying to get account b's lock...

即我们的问题是:

你正在写一个多线程程序,其中线程需要一次获取多个锁,此时如何避免死锁问题。

解决方案:

在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的。

举个例子:一个线程获取了第一个锁,然后在获取第二个锁的时候发生阻塞,那么这个线程就可能阻塞其他线程的执行,从而导致整个程序假死。

其实解决这个问题,核心思想是:目前我们遇到的问题是两个线程想获取到的锁,都被对方线程拿到了,那么我们只需要保证在这两个线程中,获取锁的顺序保持一致就可以了。举个例子,我们有线程thread_a, thread_b, 锁lock_1, lock_2。只要我们规定好了锁的使用顺序,比如先用lock_1,再用lock_2,当线程thread_a获得lock_1时,其他线程如thread_b就无法获得lock_1这个锁,也就无法进行下一步操作(获得lock_2这个锁),也就不会互相等待导致的死锁。

简言之,解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,这个规则使用上下文管理器 是非常容易实现的,

​升序锁​

# encoding: utf-8
'''
@contact: 1257309054@qq.com
@wechat: 1257309054
@Software: PyCharm
@file: 升序锁.py
@time: 2020/3/3 22:39
@author:LDC
'''

import threading
import time
from contextlib import contextmanager

thread_local = threading.local()


@contextmanager
def acquire(*locks):
# sort locks by object identifier
# 根据对象标识符对锁进行排序
locks = sorted(locks, key=lambda x: id(x))

# make sure lock order of previously acquired locks is not violated
# 确保没有违反先前获取的锁的顺序
acquired = getattr(thread_local, 'acquired', [])
if acquired and (max(id(lock) for lock in acquired) >= id(locks[0])):
raise RuntimeError('Lock Order Violation')

# Acquire all the locks
# 获取所有锁
acquired.extend(locks)
thread_local.acquired = acquired

try:
for lock in locks:
lock.acquire()
yield
finally:
for lock in reversed(locks):
lock.release()
del acquired[-len(locks):]


class Account(object):
def __init__(self, name, balance, lock):
self.name = name
self.balance = balance
self.lock = lock

def withdraw(self, amount):
self.balance -= amount

def deposit(self, amount):
self.balance += amount


def transfer(from_account, to_account, amount):
print("%s transfer..." % amount)
with acquire(from_account.lock, to_account.lock):
from_account.withdraw(amount)
time.sleep(1)
to_account.deposit(amount)
print("%s transfer... %s:%s ,%s: %s" % (
amount, from_account.name, from_account.balance, to_account.name, to_account.balance))
print("transfer finish")


if __name__ == "__main__":
a = Account('a', 1000, threading.Lock())
b = Account('b', 1000, threading.Lock())

thread_list = []
thread_list.append(threading.Thread(target=transfer, args=(a, b, 100)))
thread_list.append(threading.Thread(target=transfer, args=(b, a, 500)))
for i in thread_list:
i.start()
for j in thread_list:
j.join()

运行结果:

transfer...
transfer...
transfer... a:900 ,b:1100
transfer finish
transfer... b:600, a:1400
transfer finish

成功的避免了互相等待导致的死锁问题。

在上述代码中,有几点语法需要解释:

  • 装饰器@contextmanager是用来让我们能用with语句调用锁的,从而简化锁的获取和释放过程。关于with语句,大家可以参考浅谈 Python 的 with 语句(www.ibm.com/developerworks/cn/opensource/os-cn-pythonwith/)。简言之,with语句在调用时,先执行 enter()方法,然后执行with结构体内的语句,最后执行exit()语句。有了装饰器@contextmanager. 生成器函数中 yield 之前的语句在 enter() 方法中执行,yield 之后的语句在 exit() 中执行,而 yield 产生的值赋给了 as 子句中的 value 变量。
  • try和finally语句中实现的是锁的获取和释放。
  • try之前的语句,实现的是对锁的排序,以及锁排序是否被破坏的判断。
  • ​线程同步(Event)​

    线程之间经常需要协同工作,通过某种技术,让一个线程访问某些数据时,其它线程不能访问这些数据,直到该线程完成对数据的操作。这些技术包括​临界区(Critical Section)​,​互斥量(Mutex)​,​信号量(Semaphore)​,​事件Event​等。

    ​互斥锁、递归锁、升序锁是实现线程同步的一个方法,其它的还有semaphore 信号量机制,event 事件机制。​

    semaphore 信号量机制在python 里面也很简单就能够实现线程的同步。

    如果对操作系统有一定的了解, 那么对操作系统的PV原语操作应该有印象, 信号量其实就是基于这个机制的.

    ​semaphore​ 类是threading 模块下的一个类, 主要两个函数: acquire 函数, release 函数这和lock 类的函数是一样的, 只不过功能不一样, semaphore 机制的acquire 函数的参数允许你自己设置最大的并发量, 就是说允许多少个线程来操作同一个函数或是变量, 同时执行一次就会递减一次, release 函数则是递增, 如果计数到了0, 则阻塞起线程, 不再允许线程访问该方法或是变量.

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 信号量实现线程同步.py
    @time: 2020/3/4 22:30
    @author:LDC
    '''
    # python 多线程同步 semaphore
    import threading
    import time

    # 初始化信号量数量...当调用acquire 将数量置为 0, 将阻塞线程等待其他线程调用release() 函数
    semaphore = threading.Semaphore(2)


    def func():
    if semaphore.acquire():
    for i in range(2):
    print(threading.currentThread().getName() + ' get semaphore')
    time.sleep(1)
    semaphore.release()
    print(threading.currentThread().getName() + ' release semaphore')


    if __name__ == '__main__':
    for i in range(4):
    t1 = threading.Thread(target=func)
    t1.start()

    输出

    Thread-1 get semaphore
    Thread-2 get semaphore
    Thread-1 get semaphore
    Thread-2 get semaphore
    Thread-1 release semaphore
    Thread-3 get semaphore
    Thread-2 release semaphore
    Thread-4 get semaphore
    Thread-3 get semaphore
    Thread-4 get semaphore
    Thread-3 release semaphore
    Thread-4 release semaphore

    可以看到主体函数一次只允许两个线程访问。

    ​event 机制​不仅能够实现线程间的通信, 也是实现线程同步的一个好方法。

    事件是线程之间通信的最简单的机制之一, 一个线程指示一个事件和其他线程等待它.

    event.py 是threading 模块下的一个类,

    相比较前面两个机制, 这个类提供了四个方法, 分别是:

    is_set() 函数, set() 函数, clear() 函数, wait() 函数. threading库中的event对象通过使用内部一个flag标记,通过flag的True或者False的变化来进行操作。

    名称

    含义

    set( )

    标记设置为True

    clear( )

    标记设置为False

    is_set( )

    标记是否为True

    wait(timeout=None)

    设置等待标记为True的时长,None为无限等待。等到返回True,等不到返回False

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 事件机制实现线程同步.py
    @time: 2020/3/4 22:46
    @author:LDC
    '''

    import logging
    import threading
    import time


    # 打印线程名以及日志信息
    logging.basicConfig(level=logging.DEBUG, format="(%(threadName)-10s : %(message)s", )


    def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.isSet():
    logging.debug("wait_for_event_timeout starting")
    event_is_set = e.wait(t) # 阻塞, 等待设置为true
    logging.debug("event set: %s" % event_is_set)
    if event_is_set:
    logging.debug("processing event")
    else:
    logging.debug("doing other work")


    e = threading.Event() # 初始化为false
    t2 = threading.Thread(name="nonblock", target=wait_for_event_timeout, args=(e, 2))
    t2.start()
    logging.debug("Waiting before calling Event.set()")
    # time.sleep(7)
    e.set() # 唤醒线程, 同时将event 设置为true
    logging.debug("Event is set")

    输出:

    ​协程​

    ​协程,是单线程下的并发​,又称微线程,英文名Coroutine。是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置,当程序中存在大量不需要CPU的操作时(IO),适用于协程。

    协程有极高的执行效率,因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销。

    不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

    因为协程是一个线程执行,所以想要利用多核CPU,最简单的方法是多进程+协程,这样既充分利用多核,又充分发挥协程的高效率。

    那符合什么条件就能称之为协程:

    1、必须在只有一个单线程里实现并发

    2、修改共享数据不需加锁

    3、用户程序里自己保存多个控制流的上下文栈

    4、一个协程遇到IO操作自动切换到其它协程

    python中对于协程有四个模块,​greenlet、gevent、yield和async来实现切换+保存线程​

    ​通过yield来实现任务切换+保存线程​

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: yield_test.py
    @time: 2020/3/5 15:39
    @author:LDC
    '''
    import time


    def func1():
    while True:
    print('func1')
    yield '返回func1'


    def func2():
    g = func1()
    print(next(g))
    for i in range(3):
    print(next(g))
    time.sleep(3)
    print('func2')


    if __name__ == '__main__':
    start = time.time()
    func2()
    stop = time.time()
    print(stop - start)

    yield不能节省IO时间,只是单纯的进行程序切换

    # 基于yield并发执行,多任务之间来回切换,这就是个简单的协程的体现,但是他能够节省I/O时间吗?不能
    import time


    def consumer():
    '''任务1:接收数据,处理数据'''
    while True:
    x = yield
    time.sleep(1) # 发现什么?只是进行了切换,但是并没有节省I/O时间
    print('处理了数据:', x)


    def producer():
    '''任务2:生产数据'''
    g = consumer()
    next(g) # 找到了consumer函数的yield位置
    for i in range(3):
    g.send(i) # 给yield传值,然后再循环给下一个yield传值,并且多了切换的程序,比直接串行执行还多了一些步骤,导致执行效率反而更低了。
    print('发送了数据:', i)


    if __name__ == '__main__':
    start = time.time()
    # 基于yield保存状态,实现两个任务直接来回切换,即并发的效果
    # PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
    producer() # 我在当前线程中只执行了这个函数,但是通过这个函数里面的send切换了另外一个任务
    stop = time.time()
    print(stop - start)
    # 串行执行的方式
    # start = time.time()
    # res = producer()
    # consumer()
    # stop = time.time()
    # print(stop - start)

    yield检测不到IO,无法实现遇到IO自动切换。

    ​greenlet是手动切换​

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: greenlet创建协程.py
    @time: 2020/3/4 23:08
    @author:LDC
    '''

    '''
    使用greenlet + switch实现协程调度
    '''
    from greenlet import greenlet
    import time


    def func1():
    print("开门走进卫生间")
    time.sleep(3)
    gr2.switch() # 把CPU执行权交给gr2

    print("飞流直下三千尺")
    time.sleep(3)
    gr2.switch()
    pass


    def func2():
    print("一看拖把放旁边")
    time.sleep(3)
    gr1.switch()

    print("疑是银河落九天")
    pass


    if __name__ == '__main__':
    gr1 = greenlet(func1)
    gr2 = greenlet(func2)
    gr1.switch() # 把CPU执行权先给gr1
    pass

    greenlet只是提供了一种比yield(生成器)更加便捷的切换方式,当切到一个任务执行时如果遇到IO,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。

    ​Gevent实现自动切换协程(多协程​)

    协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。

    一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果。

    如果是4核的CPU,一般起5个进程,每个进程中20个线程(5倍CPU数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。并发数量=5​20​500从而达到5000个并发,这是一般一个4个CPU的机器最大的并发数。nginx在负载均衡的时候最大承载量是5w个。

    单线程里的这20个任务的代码通常既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。如此,才能提高效率,这就用到了Gevent模块。

    ​Gevent(自动切换,由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成)。​

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: monkey_patch.py
    @time: 2020/3/4 23:40
    @author:LDC
    '''

    '''
    使用gevent + monkey.patch_all()自动调度网络IO协程
    '''
    from gevent import monkey;

    monkey.patch_all() # 将替换为

    4、关键字:async/await

    ​asyncio​ 是用来编写 ​并发​ 代码的库,使用 ​async/await​ 语法。

    asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

    ​asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。​

    ​正常的函数在执行时是不会中断的,所以你要写一个能够中断的函数,就需要添加async关键。​

    ​async 用来声明一个函数为异步函数​,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是asyncio.sleep(5))消失后,也就是5秒到了再回来执行。

    ​await 用来用来声明程序挂起​,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。await 后面只能跟异步程序或有await属性的对象,因为异步程序与一般程序不同。假设有两个异步函数async a,async b,a中的某一步有await,当程序碰到关键字await b()后,异步程序挂起后去执行另一个异步b程序,就是从函数内部跳出去执行其他函数,当挂起条件消失后,不管b是否执行完,要马上从b程序中跳出来,回到原程序执行原来的操作。如果await后面跟的b函数不是异步函数,那么操作就只能等b执行完再返回,无法在b执行的过程中返回。如果要在b执行完才返回,也就不需要用await关键字了,直接调用b函数就行。所以这就需要await后面跟的是异步函数了。在一个异步函数中,可以不止一次挂起,也就是可以用多个await。

    ​可以使用async、await来实现协程的并发​,下面以一个爬虫例子来说明:

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: async_await.py
    @time: 2020/3/5 23:15
    @author:LDC
    '''
    from gevent import monkey;

    monkey.patch_all()
    import gevent

    import asyncio
    from functools import wraps, partial
    import time

    import requests


    # 定义一个查看函数执行时间的装饰器
    def func_use_time(func):
    @wraps(func)
    def inside(*arg, **kwargs):
    start = time.clock()
    res = func(*arg, **kwargs)
    print('***************执行时间*****************', time.clock() - start)
    return res

    return inside


    def get_page_text(url):
    # 爬取网站
    print(url)
    try:
    headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
    }
    resp = requests.get(url, headers=headers) # 发起网络请求,返回需要时间——阻塞IO

    html = resp.text
    return html
    except Exception as e:
    print('{}发生错误,{}'.format(url, e))
    return ''


    class Narmal():
    # 正常爬取
    def __init__(self, urls):
    self.urls = urls
    self.res_dict = {}

    @func_use_time
    def run(self):
    for url in self.urls:
    res = get_page_text(url)
    self.res_dict[url] = len(res)
    print('串行获取结果', self.res_dict)


    class UseAsyncio():
    # 使用async实现协程并发
    def __init__(self, urls):
    self.urls = urls
    self.res_dict = {}

    # 定义一个异步函数,执行爬取任务
    async def task(self, url):
    print(url)
    try:
    headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
    }
    # 利用BaseEventLoop.run_in_executor()可以在coroutine中执行第三方的命令,例如requests.get()
    # 第三方命令的参数与关键字利用functools.partial传入
    future = asyncio.get_event_loop().run_in_executor(None, partial(requests.get, url, headers=headers))
    resp = await future
    html = resp.text
    self.res_dict[url] = len(html)
    return html
    except Exception as e:
    print('{}发生错误,{}'.format(url, e))
    return ''

    @func_use_time
    def run(self):
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(self.task(url)) for url in self.urls]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    # 获取async结果
    # for task in tasks:
    # print(task.result())
    print('async获取结果', self.res_dict)


    class UseGevent():
    # 使用Gevent实现协程并发
    def __init__(self, urls):
    self.urls = urls
    self.res_dict = {}

    def task(self, url):
    res = get_page_text(url)
    self.res_dict[url] = len(res)

    @func_use_time
    def run(self):
    gevent.joinall([gevent.spawn(self.task, url) for url in self.urls])
    print(self.res_dict)


    if __name__ == '__main__':
    urls = ["www.sina.com", "www.qq.com", "www.baidu.com",
    "www.163.com", "www.4399.com", "www.sohu.com",
    "www.youku.com",
    ]
    print("使用正常爬取方式,即串行")
    Narmal(urls).run()
    print("使用Asyncio爬取方式,async实现协程并发")
    UseAsyncio(urls).run()
    print("使用Gevent爬取方式,实现协程并发")
    UseGevent(urls).run()

    输出:

    使用正常爬取方式,即串行
    www.sina.com
    www.qq.com
    www.baidu.com
    www.163.com
    www.4399.com
    www.sohu.com
    www.youku.com
    串行获取结果 {'www.sina.com': 539723, 'www.qq.com': 227753, 'www.baidu.com': 166916, 'www.163.com': 483531, 'www.4399.com': 172837, 'www.sohu.com': 178312, 'www.youku.com': 990760}
    ***************执行时间***************** 1.9352532
    使用Asyncio爬取方式,async实现协程并发
    www.sina.com
    www.qq.com
    www.baidu.com
    www.163.com
    www.4399.com
    www.sohu.com
    www.youku.com
    async获取结果 {'www.4399.com': 172837, 'www.163.com': 483531, 'www.qq.com': 227753, 'www.sohu.com': 178310, 'www.baidu.com': 166625, 'www.sina.com': 539723, 'www.youku.com': 1047892}
    ***************执行时间***************** 0.951011
    使用Gevent爬取方式,实现协程并发
    www.sina.com
    www.qq.com
    www.baidu.com
    www.163.com
    www.4399.com
    www.sohu.com
    www.youku.com
    {'www.163.com': 483531, 'www.4399.com': 172837, 'www.sohu.com': 178312, 'www.qq.com': 227753, 'www.baidu.com': 166760, 'www.sina.com': 539723, 'www.youku.com': 994926}
    ***************执行时间***************** 1.0057508

    相对来说还是使用​async执行效率高些​。

    后记

    为了让大家能够轻松学编程,我创建了一个公众号,里面有让你快速学会编程的文章,当然也有一些干货提高你的编程水平,也有一些编程项目适合做一些课程设计等课题。


    如果文章对您有帮助,请我喝杯咖啡吧!

    公众号


    关注我,我们一起成长~~



    本文共计10941个文字,预计阅读时间需要44分钟。

    Python中进程、线程与协程的详细解析是怎样的?

    1、概念【注意】公众微信号轻松学编程了解更多。回复协议获取获取本文字源代码。从计算机硬件角度:计算机的核心CPU承担了所有计算任务。一个CPU在某一时刻只能处理一个任务。


    1、概念

    从计算机硬件角度:

    计算机的核心是CPU,承担了所有的计算任务。

    一个CPU,在一个时间切片里只能运行一个程序。

    图1. 操作系统

    1.1 进程

    进程:是CPU对程序的一次执行过程、一次执行任务。各个进程有自己的内存空间、数据栈等。操作系统分配内存的基本单位(打开、执行、保存…)

    1.2 线程

    线程:是进程中执行运算的最小单位,是进程中的一个实体。(打开、执行、保存…)

    一个程序至少有一个进程,一个进程至少有一个线程。

    操作系统分配CPU的基本单位

    1.3 协程

    协程:比线程更小的执行单元,又称微线程,在单线程上执行多个任务,自带CPU上下文

    用函数切换,开销极小。不通过操作系统调度,

    没有进程、线程的切换开销。(gevent,monkey.patchall)

    举例

    我们假设把一个进程比作我们实际生活中的一个拉面馆,负责保持拉面馆运行的服务员就是线程,每个餐桌代表要完成的任务。

    当我们用多线程完成任务时,模式是这样的:每来一桌的客人,就在那张桌子上安排一个服务员,即有多少桌客人就得对应多少个服务员;

    而当我们用协程来完成任务时,模式却有所不同了: 就安排一个服务员,来吃饭得有一个点餐和等菜的过程,当A在点菜,就去B服务,B叫了菜在等待,我就去C,当C也在等菜并且A点菜点完了,赶紧到A来服务… …依次类推。

    从上面的例子可以看出,想要使用协程,那么我们的任务必须有等待。当我们要完成的任务有耗时任务,属于IO密集型任务时,我们使用协程来执行任务会节省很多的资源(一个服务员和多个服务员的区别,并且可以极大的利用到系统的资源。

    1.4 线程安全

    多线程环境中,共享数据同一时间只能有一个线程来操作。

    1.5 原子操作

    原子操作就是不会因为进程并发或者线程并发而导致被中断的操作。

    1.6 并行和并发

    串行:单个CPU核心,按顺序执行

    并行:多个CPU核心,不同的程序就分配给不同的CPU来运行。可以让多个程序同时执行。(多进程)

    并发:单个CPU核心,在一个时间切片里一次只能运行一个程序,如果需要运行多个程序,则串行执行,遇到IO阻塞就切换,即计算机在逻辑上能处理多任务的能力。(多进程,多线程)

    1.7 多进程/多线程

    表示可以同时执行多个任务,进程和线程的调度是由操作系统自动完成。

    进程:每个进程都有自己独立的内存空间,不同进程之间的内存空间不共享。

    线程:一个进程可以有多个线程,所有线程共享进程的内存空间,通讯效率高,切换开销小。共享意味着竞争,导致数据不安全,为了保护内存空间的数据安全,引入"互斥锁",“递归锁”,“升序锁”等。

    1.8 Python的多线程:

    GIL:Global Interpreter Lock, 全局解释器锁,线程的执行权限,在Python的进程里只有一个GIL。

    一个线程需要执行任务,必须获取GIL。

    ​好处​:直接杜绝了多个线程访问内存空间的安全问题。

    ​坏处​:Python的多线程不是真正多线程,不能充分利用多核CPU的资源。

    但是,在I/O阻塞的时候,解释器会释放GIL。

    1.9 同步、异步、阻塞、非阻塞

    ​异步​,异步本质上是单线程的,因为 IO 操作在很多时候会存在阻塞,异步就是在这种阻塞的时候,通过控制权的交换来实现多任务的。即异步本质上是运行过程中的控制权的交换。最典型的例子就是生产者消费者模型。

    ​同步​,即程序协同进行,遇到阻塞就等待,直到任务完成为止。

    2.0 运用场景

    ​多进程​:密集CPU任务,需要充分使用多核CPU资源(服务器,大量的并行计算)的时候,用多进程。 multiprocessing

    缺陷:多个进程之间通信成本高,切换开销大。

    ​多线程​:密集I/O任务(网络I/O,磁盘I/O,数据库I/O)使用多线程合适。

    threading.Thread、multiprocessing.dummy

    缺陷:同一个时间切片只能运行一个线程,不能做到高并行,但是可以做到高并发。

    ​协程​:又称微线程(一种用户态的轻量级线程),在单线程上执行多个任务,用函数切换,由程序自身控制,开销极小。

    不通过操作系统调度,没有进程、线程的切换开销。

    每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置,不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

    当程序中存在大量不需要CPU的操作时(IO),遇到IO操作自动切换到其它协程。

    ​greenlet​, ​gevent​,​monkey.patchall​,​yield​,​async​

    多线程请求返回是无序的,哪个线程有数据返回就处理哪个线程,而协程返回的数据是有序的。

    因为协程是一个线程执行,所以想要利用多核CPU,最简单的方法是多进程+协程,这样既充分利用多核,又充分发挥协程的高效率。

    缺陷:单线程执行,处理密集CPU和本地磁盘IO的时候,性能较低。处理网络I/O性能还是比较高.

    2.1 互斥锁、递归锁、升序锁

    Python的GIL只能保证原子操作的线程安全,因此在多线程编程时我们需要通过加锁来保证线程安全。

    最简单的锁是​互斥锁(同步锁)​,互斥锁是用来解决IO密集型场景产生的计算错误,即目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。

    ​递归锁​:就是在一个大锁中再包含子锁

    ​升序锁​:两个线程想获取到的锁,都被对方线程拿到了,那么我们只需要保证在这两个线程中,获取锁的顺序保持一致就可以了。举个例子,我们有线程thread_a, thread_b, 锁lock_1, lock_2。只要我们规定好了锁的使用顺序,比如先用lock_1,再用lock_2,当线程thread_a获得lock_1时,其他线程如thread_b就无法获得lock_1这个锁,也就无法进行下一步操作(获得lock_2这个锁),也就不会导致互相等待导致的死锁。简言之,解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,这个规则使用上下文管理器 是非常容易实现的。

    2.2 代码

    创建进程

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 创建进程.py
    @time: 2020/3/3 12:22
    @author:LDC
    '''

    import multiprocessing
    import time


    def func(arg):
    pname = multiprocessing.current_process().name # 获取当前进程名称
    pid = multiprocessing.current_process().pid # 获取当前进程id
    print("当前进程ID=%d,name=%s" % (pid, pname))

    for i in range(5):
    print(pname, pid, arg)
    time.sleep(1)

    pass


    if __name__ == "__main__":
    pname = multiprocessing.current_process().name
    pid = multiprocessing.current_process().pid
    print("当前进程ID=%d,name=%s" % (pid, pname))

    p = multiprocessing.Process(target=func, name='我是子进程', args=("hello",))
    p.daemon = True # 设为(随主进程的结束而结束)
    p.start()

    while True:
    print("子进程是否活着?", p.is_alive())
    # 知道进程名称,可以通过以下方式检验
    processing = list(filter(lambda mp: mp.name == '我是子进程', multiprocessing.active_children()))
    if processing:
    print('这个进程还活着')
    if not p.is_alive():
    break
    time.sleep(1)
    pass

    print("main over")

    ​进程间的通讯​

    每个进程都拥有自己的内存空间,因此不同进程间内存是不共享的,要想实现两个进程间的数据交换,有几种常用的方法:

    ​Queue(队列)​:

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 进程间的通讯.py
    @time: 2020/3/3 15:13
    @author:LDC
    '''

    from multiprocessing import Process, Queue, current_process

    def start(q):
    pname = current_process().name
    pid = current_process().pid
    print('当前进程是{}_{}'.format(pid, pname))
    # 从队列中取出数据,先判断队列 是否为空
    if not q.empty():
    print(q.get())
    # 存数据进队列
    q.put('hello from {}_{}'.format(pid, pname))


    if __name__ == '__main__':
    q = Queue()
    p_list = []
    for i in range(0, 2):
    p = Process(target=start, args=(q,))
    p.start()
    p_list.append(p)
    # 确保所有进程执行完
    for p in p_list:
    p.join()

    ​Manager​(实现了进程间真正的数据共享):

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 进程间的通讯(manager).py
    @time: 2020/3/3 15:54
    @author:LDC
    '''

    from multiprocessing import Process, Manager, current_process


    def start(m_dict, m_list):
    pname = current_process().name
    pid = current_process().pid
    print('当前进程是{}_{}'.format(pid, pname))
    print(m_dict)
    m_dict[pid] = pname
    m_list.append(pid)



    if __name__ == '__main__':
    manager = Manager()
    m_dict = manager.dict() # 通过manager生成一个字典
    m_list = manager.list() # 通过manager生成一个列表
    p_list = []
    for i in range(10):
    p = Process(target=start, args=(m_dict, m_list))
    p.start()
    p_list.append(p)
    for res in p_list:
    res.join()

    print(m_dict)
    print(m_list)

    ​进程池(多进程)​

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    1、​apply​(同步)

    2、​apply_async​(异步)

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 进程池.py
    @time: 2020/3/3 16:05
    @author:LDC
    '''

    from multiprocessing import Pool, current_process
    import time


    def Foo(i):
    pid = current_process().pid
    pname = current_process().name
    time.sleep(1)
    print('hello','{},{}_{}'.format(i, pid, pname))
    return '{},{}_{}'.format(i, pid, pname)


    def Bar(arg):
    print('number::', arg)


    if __name__ == "__main__":
    pool = Pool(3) # 定义一个进程池,里面有3个进程
    for i in range(10):
    # 使用异步线程时,需要定义一个回调函数,当执行完后把结果传给回调函数
    pool.apply_async(func=Foo, args=(i,), callback=Bar)
    # pool.apply(func=Foo, args=(i,))

    pool.close() # 关闭进程池
    pool.join() # 进程池中进程执行完毕后再关闭,(必须先close在join)

    输出:

    hello 0,12776_SpawnPoolWorker-1
    number:: 0,12776_SpawnPoolWorker-1
    hello 1,8832_SpawnPoolWorker-2
    hello 2,4704_SpawnPoolWorker-3
    number:: 1,8832_SpawnPoolWorker-2
    number:: 2,4704_SpawnPoolWorker-3
    hello 5,8832_SpawnPoolWorker-2
    hello 4,4704_SpawnPoolWorker-3
    number:: 5,8832_SpawnPoolWorker-2
    number:: 4,4704_SpawnPoolWorker-3
    hello 3,12776_SpawnPoolWorker-1
    number:: 3,12776_SpawnPoolWorker-1
    hello 6,8832_SpawnPoolWorker-2
    hello 7,4704_SpawnPoolWorker-3
    number:: 6,8832_SpawnPoolWorker-2
    number:: 7,4704_SpawnPoolWorker-3
    hello 8,12776_SpawnPoolWorker-1
    number:: 8,12776_SpawnPoolWorker-1
    hello 9,4704_SpawnPoolWorker-3
    number:: 9,4704_SpawnPoolWorker-3

    callback是​回调函数​,就是在执行完Foo方法后会自动执行Bar函数,并且自动把Foo函数的返回值作为参数传入Bar函数.

    ​多进程​:

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 多进程.py
    @time: 2020/3/3 16:19
    @author:LDC
    '''
    import time
    from concurrent import futures
    from multiprocessing import current_process


    def foo(i):
    pid = current_process().pid
    pname = current_process().name
    time.sleep(3)
    print('hello', '{},{}_{}'.format(i, pid, pname))
    return '{},{}_{}'.format(i, pid, pname)


    if __name__ == '__main__':
    i_list = [1, 2, 3, 4, 5]
    with futures.ProcessPoolExecutor(5) as executor:

    res = executor.map(foo, i_list)
    # to_do = [executor.submit(foo, item) for item in i_list]
    # ret = [future.result() for future in futures.as_completed(to_do)]

    ​注意​:​map​可以保证输出的顺序, ​submit​输出的顺序是乱的

    如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit()

    submit和map的参数是不同的,submit每次都需要提交一个目标函数和对应的参数,map只需要提交一次目标函数,目标函数的参数放在一个迭代器(列表,字典)里就可以。

    ​线程​

    方法:
      start 线程准备就绪,等待CPU调度
      setName 设置线程名称
      getName 获取线程名称
      setDaemon 把一个主进程设置为Daemon线程后,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论有没执行完成,都会停止
      join 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义  
      run 线程被cpu调度后自动执行线程对象的run方法# encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 创建线程(常用).py
    @time: 2020/3/3 16:49
    @author:LDC
    '''

    import threading
    import time

    '''直接调用'''


    def foo(name):
    time.sleep(name)
    # 获取当前线程名称与标志号
    print(threading.currentThread().name, threading.currentThread().ident)
    print("Hello %s" % name)



    if __name__ == "__main__":
    t_list = []
    # 创建了两个线程
    for i in range(2):
    t = threading.Thread(target=foo, args=(i+2,), name='t_{}'.format(i)) # 生成线程实例
    # t.setDaemon(True) # True表示子线程设置为守护线程,主线程死去,子线程也跟着死去,不管是否执行完
    # t.setDaemon(False) # False表示子线程设置为非守护线程,主线程死去,子线程依然在执行
    t_list.append(t)
    t.start()
    # 知道线程名,可以通过以下方式检验
    threading_list = list(filter(lambda th: th.getName() == 't_1', threading.enumerate()))
    if threading_list:
    print('这个线程还活着')
    # for t in t_list:
    # t.join() # 等待子线程执行完,
    # print(t.getName()) # 获取线程名

    ​多线程​

    # 可以使用for循环创建多个线程
    for i in range3):
    t = threading.Thread(target=foo, args=(i+2,), name='t_{}'.format(i))
    t.start()

    ​线程池​

    为什么要使用线程池?

    Python中进程、线程与协程的详细解析是怎样的?

    对于任务数量不断增加的程序,每有一个任务就生成一个线程,最终会导致线程数量的失控,例如,整站爬虫,假设初始只有一个链接a,那么,这个时候只启动一个线程,运行之后,得到这个链接对应页面上的b,c,d,,,等等新的链接,作为新任务,这个时候,就要为这些新的链接生成新的线程,线程数量暴涨。在之后的运行中,线程数量还会不停的增加,完全无法控制。所以,对于任务数量不端增加的程序,固定线程数量的线程池是必要的。

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 线程池.py
    @time: 2020/3/3 20:36
    @author:LDC
    '''
    import time
    from concurrent.futures import ThreadPoolExecutor


    # 任务
    def doSth(args):

    print("hello", args)
    # time.sleep(2)


    if __name__ == '__main__':
    # max_workers 线程数
    argsList = (1, 2, 3, 4, 5, 6)
    # 使用sumbit()函数提交任务
    with ThreadPoolExecutor(5) as exe:
    for a in argsList:
    exe.submit(doSth, a)
    # 使用map()函数提交任务
    print("使用map()提交任务")
    with ThreadPoolExecutor(5) as exe:
    exe.map(doSth, argsList)

    ​线程冲突​

    当多个线程同时访问同一个变量时就可能造成线程冲突

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 线程冲突.py
    @time: 2020/3/3 20:57
    @author:LDC
    '''

    import threading

    money = 0


    def add_money():
    global money
    for i in range(10000000):
    money += 1


    if __name__ == '__main__':
    add_money()
    add_money()
    print('调用两次函数money实际值为:', money)
    money = 0
    t_list = []
    for i in range(2):
    t = threading.Thread(target=add_money)
    t.start()
    t_list.append(t)
    for t in t_list:
    t.join()
    print('使用线程后money实际值为:', money)

    输出:

    调用两次函数money实际值为: 20000000
    使用线程后money实际值为: 11656269

    ​原因​:当对全局资源存在写操作时,如果不能保证写入过程的原子性,会出现脏读脏写的情况,即线程不安全。Python的GIL只能保证原子操作的线程安全,因此在多线程编程时我们需要通过加锁来保证线程安全。

    最简单的锁是​互斥锁(同步锁)​,互斥锁是用来解决IO密集型场景产生的计算错误,即目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。

    ​互斥锁​

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 互斥锁.py
    @time: 2020/3/3 21:08
    @author:LDC
    '''

    import threading

    lock = threading.Lock()
    money = 0

    def add_money():
    global money
    with lock:
    for i in range(10000000):
    money += 1

    if __name__ == '__main__':
    add_money()
    add_money()
    print('调用两次函数money实际值为:', money)
    money = 0
    t_list = []
    for i in range(2):
    t = threading.Thread(target=add_money)
    t.start()
    t_list.append(t)
    for t in t_list:
    t.join()
    print('使用线程后money实际值为:', money)

    锁适用于访问和修改同一个资源的时候,引起资源争用的情况下。

    使用锁的注意事项:

    1. 少用锁,除非有必要。多线程访问加锁的资源时,由于锁的存在,实际就变成了串行。
    2. 加锁时间越短越好,不需要就立即释放锁。
    3. 一定要避免死锁,使用with或者try...finally。

    第一种死锁:​迭代死锁​

    该情况是一个线程“迭代”请求同一个资源,直接就会造成死锁。这种死锁产生的原因是我们标准互斥锁threading.Lock的缺点导致的。

    ​标准的锁对象(threading.Lock)并不关心当前是哪个线程占有了该锁;如果该锁已经被占有了,那么任何其它尝试获取该锁的线程都会被阻塞,包括已经占有该锁的线程也会被阻塞。​

    比如:

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 迭代死锁.py
    @time: 2020/3/3 22:10
    @author:LDC
    '''

    import threading
    import time

    count_list = [0, 0]
    lock = threading.Lock()


    def change_0():
    global count_list
    with lock:
    tmp = count_list[0]
    time.sleep(0.001)
    count_list[0] = tmp + 1
    time.sleep(2)
    print("Done. count_list[0]:%s" % count_list[0])


    def change_1():
    global count_list
    with lock:
    tmp = count_list[1]
    time.sleep(0.001)
    count_list[1] = tmp + 1
    time.sleep(2)
    print("Done. count_list[1]:%s" % count_list[1])


    def change():
    with lock:
    change_0()
    time.sleep(0.001)
    change_1()


    def verify(sub):
    global count_list
    thread_list = []
    for i in range(5):
    t = threading.Thread(target=sub, args=())
    t.start()
    thread_list.append(t)
    for j in thread_list:
    j.join()
    print(count_list)


    if __name__ == "__main__":
    verify(change)

    示例中,我们有一个共享资源count_list,有两个分别取这个共享资源第一部分和第二部分的数字(count_list[0]和count_list[1])。两个访问函数都使用了锁来确保在获取数据时没有其它线程修改对应的共享数据。

    现在,如果我们思考如何添加第三个函数来获取两个部分的数据。一个简单的方法是依次调用这两个函数,然后返回结合的结果。

    这里的问题是,如有某个线程在两个函数调用之间修改了共享资源,那么我们最终会得到不一致的数据。

    最明显的解决方法是在这个函数中也使用lock。然而,这是不可行的。里面的两个访问函数将会阻塞,因为外层语句已经占有了该锁。

    结果是没有任何输出,死锁。

    为了解决这个问题,我们可以用递归锁代替互斥锁。

    ​递归锁​

    就是在一个​大锁中再包含子锁​。它相当于一个字典,记录了锁的门与锁的对应值,当开门的时候会根据对应钥匙来开锁。

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 迭代死锁.py
    @time: 2020/3/3 22:10
    @author:LDC
    '''

    import threading
    import time

    count_list = [0, 0]
    lock = threading.RLock() # 递归锁


    def change_0():
    global count_list
    with lock: # 小锁
    tmp = count_list[0]
    time.sleep(0.001)
    count_list[0] = tmp + 1
    time.sleep(2)
    print("Done. count_list[0]:%s" % count_list[0])


    def change_1():
    global count_list
    with lock: # 小锁
    tmp = count_list[1]
    time.sleep(0.001)
    count_list[1] = tmp + 1
    time.sleep(2)
    print("Done. count_list[1]:%s" % count_list[1])


    def change():
    with lock: # 大锁
    change_0()
    time.sleep(0.001)
    change_1()


    def verify(sub):
    global count_list
    thread_list = []
    for i in range(5):
    t = threading.Thread(target=sub, args=())
    t.start()
    thread_list.append(t)
    for j in thread_list:
    j.join()
    print(count_list)


    if __name__ == "__main__":
    verify(change)

    第二种死锁:​互相等待死锁​

    死锁的另外一个原因是两个进程想要获得的锁已经被对方进程获得,只能互相等待又无法释放已经获得的锁,而导致死锁。假设银行系统中,用户a试图转账100块给用户b,与此同时用户b试图转账500块给用户a,则可能产生死锁。

    2个线程互相等待对方的锁,互相占用着资源不释放。

    下面是一个互相调用导致死锁的例子:

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 互相等待死锁.py
    @time: 2020/3/3 22:29
    @author:LDC
    '''
    import threading
    import time


    class Account(object):
    def __init__(self, name, balance, lock):
    self.name = name
    self.balance = balance
    self.lock = lock

    def withdraw(self, amount):
    # 转账
    self.balance -= amount

    def deposit(self, amount):
    # 存款
    self.balance += amount


    def transfer(from_account, to_account, amount):
    # 转账操作
    with from_account.lock:
    from_account.withdraw(amount)
    time.sleep(1)
    print("trying to get %s's lock..." % to_account.name)
    with to_account.lock:
    to_account.deposit(amount)
    print("transfer finish")


    if __name__ == "__main__":
    a = Account('a', 1000, threading.RLock())
    b = Account('b', 1000, threading.RLock())
    thread_list = []
    thread_list.append(threading.Thread(target=transfer, args=(a, b, 100)))
    thread_list.append(threading.Thread(target=transfer, args=(b, a, 500)))
    for i in thread_list:
    i.start()
    for j in thread_list:
    j.join()

    最终结果是死锁:

    trying to get account a's lock...
    trying to get account b's lock...

    即我们的问题是:

    你正在写一个多线程程序,其中线程需要一次获取多个锁,此时如何避免死锁问题。

    解决方案:

    在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的。

    举个例子:一个线程获取了第一个锁,然后在获取第二个锁的时候发生阻塞,那么这个线程就可能阻塞其他线程的执行,从而导致整个程序假死。

    其实解决这个问题,核心思想是:目前我们遇到的问题是两个线程想获取到的锁,都被对方线程拿到了,那么我们只需要保证在这两个线程中,获取锁的顺序保持一致就可以了。举个例子,我们有线程thread_a, thread_b, 锁lock_1, lock_2。只要我们规定好了锁的使用顺序,比如先用lock_1,再用lock_2,当线程thread_a获得lock_1时,其他线程如thread_b就无法获得lock_1这个锁,也就无法进行下一步操作(获得lock_2这个锁),也就不会互相等待导致的死锁。

    简言之,解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,这个规则使用上下文管理器 是非常容易实现的,

    ​升序锁​

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 升序锁.py
    @time: 2020/3/3 22:39
    @author:LDC
    '''

    import threading
    import time
    from contextlib import contextmanager

    thread_local = threading.local()


    @contextmanager
    def acquire(*locks):
    # sort locks by object identifier
    # 根据对象标识符对锁进行排序
    locks = sorted(locks, key=lambda x: id(x))

    # make sure lock order of previously acquired locks is not violated
    # 确保没有违反先前获取的锁的顺序
    acquired = getattr(thread_local, 'acquired', [])
    if acquired and (max(id(lock) for lock in acquired) >= id(locks[0])):
    raise RuntimeError('Lock Order Violation')

    # Acquire all the locks
    # 获取所有锁
    acquired.extend(locks)
    thread_local.acquired = acquired

    try:
    for lock in locks:
    lock.acquire()
    yield
    finally:
    for lock in reversed(locks):
    lock.release()
    del acquired[-len(locks):]


    class Account(object):
    def __init__(self, name, balance, lock):
    self.name = name
    self.balance = balance
    self.lock = lock

    def withdraw(self, amount):
    self.balance -= amount

    def deposit(self, amount):
    self.balance += amount


    def transfer(from_account, to_account, amount):
    print("%s transfer..." % amount)
    with acquire(from_account.lock, to_account.lock):
    from_account.withdraw(amount)
    time.sleep(1)
    to_account.deposit(amount)
    print("%s transfer... %s:%s ,%s: %s" % (
    amount, from_account.name, from_account.balance, to_account.name, to_account.balance))
    print("transfer finish")


    if __name__ == "__main__":
    a = Account('a', 1000, threading.Lock())
    b = Account('b', 1000, threading.Lock())

    thread_list = []
    thread_list.append(threading.Thread(target=transfer, args=(a, b, 100)))
    thread_list.append(threading.Thread(target=transfer, args=(b, a, 500)))
    for i in thread_list:
    i.start()
    for j in thread_list:
    j.join()

    运行结果:

    transfer...
    transfer...
    transfer... a:900 ,b:1100
    transfer finish
    transfer... b:600, a:1400
    transfer finish

    成功的避免了互相等待导致的死锁问题。

    在上述代码中,有几点语法需要解释:

  • 装饰器@contextmanager是用来让我们能用with语句调用锁的,从而简化锁的获取和释放过程。关于with语句,大家可以参考浅谈 Python 的 with 语句(www.ibm.com/developerworks/cn/opensource/os-cn-pythonwith/)。简言之,with语句在调用时,先执行 enter()方法,然后执行with结构体内的语句,最后执行exit()语句。有了装饰器@contextmanager. 生成器函数中 yield 之前的语句在 enter() 方法中执行,yield 之后的语句在 exit() 中执行,而 yield 产生的值赋给了 as 子句中的 value 变量。
  • try和finally语句中实现的是锁的获取和释放。
  • try之前的语句,实现的是对锁的排序,以及锁排序是否被破坏的判断。
  • ​线程同步(Event)​

    线程之间经常需要协同工作,通过某种技术,让一个线程访问某些数据时,其它线程不能访问这些数据,直到该线程完成对数据的操作。这些技术包括​临界区(Critical Section)​,​互斥量(Mutex)​,​信号量(Semaphore)​,​事件Event​等。

    ​互斥锁、递归锁、升序锁是实现线程同步的一个方法,其它的还有semaphore 信号量机制,event 事件机制。​

    semaphore 信号量机制在python 里面也很简单就能够实现线程的同步。

    如果对操作系统有一定的了解, 那么对操作系统的PV原语操作应该有印象, 信号量其实就是基于这个机制的.

    ​semaphore​ 类是threading 模块下的一个类, 主要两个函数: acquire 函数, release 函数这和lock 类的函数是一样的, 只不过功能不一样, semaphore 机制的acquire 函数的参数允许你自己设置最大的并发量, 就是说允许多少个线程来操作同一个函数或是变量, 同时执行一次就会递减一次, release 函数则是递增, 如果计数到了0, 则阻塞起线程, 不再允许线程访问该方法或是变量.

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 信号量实现线程同步.py
    @time: 2020/3/4 22:30
    @author:LDC
    '''
    # python 多线程同步 semaphore
    import threading
    import time

    # 初始化信号量数量...当调用acquire 将数量置为 0, 将阻塞线程等待其他线程调用release() 函数
    semaphore = threading.Semaphore(2)


    def func():
    if semaphore.acquire():
    for i in range(2):
    print(threading.currentThread().getName() + ' get semaphore')
    time.sleep(1)
    semaphore.release()
    print(threading.currentThread().getName() + ' release semaphore')


    if __name__ == '__main__':
    for i in range(4):
    t1 = threading.Thread(target=func)
    t1.start()

    输出

    Thread-1 get semaphore
    Thread-2 get semaphore
    Thread-1 get semaphore
    Thread-2 get semaphore
    Thread-1 release semaphore
    Thread-3 get semaphore
    Thread-2 release semaphore
    Thread-4 get semaphore
    Thread-3 get semaphore
    Thread-4 get semaphore
    Thread-3 release semaphore
    Thread-4 release semaphore

    可以看到主体函数一次只允许两个线程访问。

    ​event 机制​不仅能够实现线程间的通信, 也是实现线程同步的一个好方法。

    事件是线程之间通信的最简单的机制之一, 一个线程指示一个事件和其他线程等待它.

    event.py 是threading 模块下的一个类,

    相比较前面两个机制, 这个类提供了四个方法, 分别是:

    is_set() 函数, set() 函数, clear() 函数, wait() 函数. threading库中的event对象通过使用内部一个flag标记,通过flag的True或者False的变化来进行操作。

    名称

    含义

    set( )

    标记设置为True

    clear( )

    标记设置为False

    is_set( )

    标记是否为True

    wait(timeout=None)

    设置等待标记为True的时长,None为无限等待。等到返回True,等不到返回False

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: 事件机制实现线程同步.py
    @time: 2020/3/4 22:46
    @author:LDC
    '''

    import logging
    import threading
    import time


    # 打印线程名以及日志信息
    logging.basicConfig(level=logging.DEBUG, format="(%(threadName)-10s : %(message)s", )


    def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.isSet():
    logging.debug("wait_for_event_timeout starting")
    event_is_set = e.wait(t) # 阻塞, 等待设置为true
    logging.debug("event set: %s" % event_is_set)
    if event_is_set:
    logging.debug("processing event")
    else:
    logging.debug("doing other work")


    e = threading.Event() # 初始化为false
    t2 = threading.Thread(name="nonblock", target=wait_for_event_timeout, args=(e, 2))
    t2.start()
    logging.debug("Waiting before calling Event.set()")
    # time.sleep(7)
    e.set() # 唤醒线程, 同时将event 设置为true
    logging.debug("Event is set")

    输出:

    ​协程​

    ​协程,是单线程下的并发​,又称微线程,英文名Coroutine。是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置,当程序中存在大量不需要CPU的操作时(IO),适用于协程。

    协程有极高的执行效率,因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销。

    不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

    因为协程是一个线程执行,所以想要利用多核CPU,最简单的方法是多进程+协程,这样既充分利用多核,又充分发挥协程的高效率。

    那符合什么条件就能称之为协程:

    1、必须在只有一个单线程里实现并发

    2、修改共享数据不需加锁

    3、用户程序里自己保存多个控制流的上下文栈

    4、一个协程遇到IO操作自动切换到其它协程

    python中对于协程有四个模块,​greenlet、gevent、yield和async来实现切换+保存线程​

    ​通过yield来实现任务切换+保存线程​

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: yield_test.py
    @time: 2020/3/5 15:39
    @author:LDC
    '''
    import time


    def func1():
    while True:
    print('func1')
    yield '返回func1'


    def func2():
    g = func1()
    print(next(g))
    for i in range(3):
    print(next(g))
    time.sleep(3)
    print('func2')


    if __name__ == '__main__':
    start = time.time()
    func2()
    stop = time.time()
    print(stop - start)

    yield不能节省IO时间,只是单纯的进行程序切换

    # 基于yield并发执行,多任务之间来回切换,这就是个简单的协程的体现,但是他能够节省I/O时间吗?不能
    import time


    def consumer():
    '''任务1:接收数据,处理数据'''
    while True:
    x = yield
    time.sleep(1) # 发现什么?只是进行了切换,但是并没有节省I/O时间
    print('处理了数据:', x)


    def producer():
    '''任务2:生产数据'''
    g = consumer()
    next(g) # 找到了consumer函数的yield位置
    for i in range(3):
    g.send(i) # 给yield传值,然后再循环给下一个yield传值,并且多了切换的程序,比直接串行执行还多了一些步骤,导致执行效率反而更低了。
    print('发送了数据:', i)


    if __name__ == '__main__':
    start = time.time()
    # 基于yield保存状态,实现两个任务直接来回切换,即并发的效果
    # PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
    producer() # 我在当前线程中只执行了这个函数,但是通过这个函数里面的send切换了另外一个任务
    stop = time.time()
    print(stop - start)
    # 串行执行的方式
    # start = time.time()
    # res = producer()
    # consumer()
    # stop = time.time()
    # print(stop - start)

    yield检测不到IO,无法实现遇到IO自动切换。

    ​greenlet是手动切换​

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: greenlet创建协程.py
    @time: 2020/3/4 23:08
    @author:LDC
    '''

    '''
    使用greenlet + switch实现协程调度
    '''
    from greenlet import greenlet
    import time


    def func1():
    print("开门走进卫生间")
    time.sleep(3)
    gr2.switch() # 把CPU执行权交给gr2

    print("飞流直下三千尺")
    time.sleep(3)
    gr2.switch()
    pass


    def func2():
    print("一看拖把放旁边")
    time.sleep(3)
    gr1.switch()

    print("疑是银河落九天")
    pass


    if __name__ == '__main__':
    gr1 = greenlet(func1)
    gr2 = greenlet(func2)
    gr1.switch() # 把CPU执行权先给gr1
    pass

    greenlet只是提供了一种比yield(生成器)更加便捷的切换方式,当切到一个任务执行时如果遇到IO,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。

    ​Gevent实现自动切换协程(多协程​)

    协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。

    一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果。

    如果是4核的CPU,一般起5个进程,每个进程中20个线程(5倍CPU数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。并发数量=5​20​500从而达到5000个并发,这是一般一个4个CPU的机器最大的并发数。nginx在负载均衡的时候最大承载量是5w个。

    单线程里的这20个任务的代码通常既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。如此,才能提高效率,这就用到了Gevent模块。

    ​Gevent(自动切换,由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成)。​

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: monkey_patch.py
    @time: 2020/3/4 23:40
    @author:LDC
    '''

    '''
    使用gevent + monkey.patch_all()自动调度网络IO协程
    '''
    from gevent import monkey;

    monkey.patch_all() # 将替换为

    4、关键字:async/await

    ​asyncio​ 是用来编写 ​并发​ 代码的库,使用 ​async/await​ 语法。

    asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

    ​asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。​

    ​正常的函数在执行时是不会中断的,所以你要写一个能够中断的函数,就需要添加async关键。​

    ​async 用来声明一个函数为异步函数​,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件(假设挂起条件是asyncio.sleep(5))消失后,也就是5秒到了再回来执行。

    ​await 用来用来声明程序挂起​,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序。await 后面只能跟异步程序或有await属性的对象,因为异步程序与一般程序不同。假设有两个异步函数async a,async b,a中的某一步有await,当程序碰到关键字await b()后,异步程序挂起后去执行另一个异步b程序,就是从函数内部跳出去执行其他函数,当挂起条件消失后,不管b是否执行完,要马上从b程序中跳出来,回到原程序执行原来的操作。如果await后面跟的b函数不是异步函数,那么操作就只能等b执行完再返回,无法在b执行的过程中返回。如果要在b执行完才返回,也就不需要用await关键字了,直接调用b函数就行。所以这就需要await后面跟的是异步函数了。在一个异步函数中,可以不止一次挂起,也就是可以用多个await。

    ​可以使用async、await来实现协程的并发​,下面以一个爬虫例子来说明:

    # encoding: utf-8
    '''
    @contact: 1257309054@qq.com
    @wechat: 1257309054
    @Software: PyCharm
    @file: async_await.py
    @time: 2020/3/5 23:15
    @author:LDC
    '''
    from gevent import monkey;

    monkey.patch_all()
    import gevent

    import asyncio
    from functools import wraps, partial
    import time

    import requests


    # 定义一个查看函数执行时间的装饰器
    def func_use_time(func):
    @wraps(func)
    def inside(*arg, **kwargs):
    start = time.clock()
    res = func(*arg, **kwargs)
    print('***************执行时间*****************', time.clock() - start)
    return res

    return inside


    def get_page_text(url):
    # 爬取网站
    print(url)
    try:
    headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
    }
    resp = requests.get(url, headers=headers) # 发起网络请求,返回需要时间——阻塞IO

    html = resp.text
    return html
    except Exception as e:
    print('{}发生错误,{}'.format(url, e))
    return ''


    class Narmal():
    # 正常爬取
    def __init__(self, urls):
    self.urls = urls
    self.res_dict = {}

    @func_use_time
    def run(self):
    for url in self.urls:
    res = get_page_text(url)
    self.res_dict[url] = len(res)
    print('串行获取结果', self.res_dict)


    class UseAsyncio():
    # 使用async实现协程并发
    def __init__(self, urls):
    self.urls = urls
    self.res_dict = {}

    # 定义一个异步函数,执行爬取任务
    async def task(self, url):
    print(url)
    try:
    headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36"
    }
    # 利用BaseEventLoop.run_in_executor()可以在coroutine中执行第三方的命令,例如requests.get()
    # 第三方命令的参数与关键字利用functools.partial传入
    future = asyncio.get_event_loop().run_in_executor(None, partial(requests.get, url, headers=headers))
    resp = await future
    html = resp.text
    self.res_dict[url] = len(html)
    return html
    except Exception as e:
    print('{}发生错误,{}'.format(url, e))
    return ''

    @func_use_time
    def run(self):
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(self.task(url)) for url in self.urls]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    # 获取async结果
    # for task in tasks:
    # print(task.result())
    print('async获取结果', self.res_dict)


    class UseGevent():
    # 使用Gevent实现协程并发
    def __init__(self, urls):
    self.urls = urls
    self.res_dict = {}

    def task(self, url):
    res = get_page_text(url)
    self.res_dict[url] = len(res)

    @func_use_time
    def run(self):
    gevent.joinall([gevent.spawn(self.task, url) for url in self.urls])
    print(self.res_dict)


    if __name__ == '__main__':
    urls = ["www.sina.com", "www.qq.com", "www.baidu.com",
    "www.163.com", "www.4399.com", "www.sohu.com",
    "www.youku.com",
    ]
    print("使用正常爬取方式,即串行")
    Narmal(urls).run()
    print("使用Asyncio爬取方式,async实现协程并发")
    UseAsyncio(urls).run()
    print("使用Gevent爬取方式,实现协程并发")
    UseGevent(urls).run()

    输出:

    使用正常爬取方式,即串行
    www.sina.com
    www.qq.com
    www.baidu.com
    www.163.com
    www.4399.com
    www.sohu.com
    www.youku.com
    串行获取结果 {'www.sina.com': 539723, 'www.qq.com': 227753, 'www.baidu.com': 166916, 'www.163.com': 483531, 'www.4399.com': 172837, 'www.sohu.com': 178312, 'www.youku.com': 990760}
    ***************执行时间***************** 1.9352532
    使用Asyncio爬取方式,async实现协程并发
    www.sina.com
    www.qq.com
    www.baidu.com
    www.163.com
    www.4399.com
    www.sohu.com
    www.youku.com
    async获取结果 {'www.4399.com': 172837, 'www.163.com': 483531, 'www.qq.com': 227753, 'www.sohu.com': 178310, 'www.baidu.com': 166625, 'www.sina.com': 539723, 'www.youku.com': 1047892}
    ***************执行时间***************** 0.951011
    使用Gevent爬取方式,实现协程并发
    www.sina.com
    www.qq.com
    www.baidu.com
    www.163.com
    www.4399.com
    www.sohu.com
    www.youku.com
    {'www.163.com': 483531, 'www.4399.com': 172837, 'www.sohu.com': 178312, 'www.qq.com': 227753, 'www.baidu.com': 166760, 'www.sina.com': 539723, 'www.youku.com': 994926}
    ***************执行时间***************** 1.0057508

    相对来说还是使用​async执行效率高些​。

    后记

    为了让大家能够轻松学编程,我创建了一个公众号,里面有让你快速学会编程的文章,当然也有一些干货提高你的编程水平,也有一些编程项目适合做一些课程设计等课题。


    如果文章对您有帮助,请我喝杯咖啡吧!

    公众号


    关注我,我们一起成长~~