博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
快速了解Python并发编程的工程实现(下)
阅读量:6489 次
发布时间:2019-06-24

本文共 4221 字,大约阅读时间需要 14 分钟。

关于我

编程界的一名小程序猿,目前在一个创业团队任team lead,技术栈涉及Android、Python、Java和Go,这个也是我们团队的主要技术栈。 联系:hylinux1024@gmail.com

0x00 使用进程实现并发

介绍了线程的使用。然而Python中由于(全局解释锁GIL)的存在,每个线程在在执行时需要获取到这个GIL,在同一时刻中只有一个线程得到解释锁的执行,Python中的线程并没有真正意义上的并发执行,多线程的执行效率也不一定比单线程的效率更高。 如果要充分利用现代多核CPU的并发能力,就要使用multipleprocessing模块了。

0x01 multipleprocessing

与使用线程的threading模块类似,multipleprocessing模块提供许多高级API。最常见的是Pool对象了,使用它的接口能很方便地写出并发执行的代码。

from multiprocessing import Pooldef f(x):    return x * xif __name__ == '__main__':    with Pool(5) as p:        # map方法的作用是将f()方法并发地映射到列表中的每个元素        print(p.map(f, [1, 2, 3]))# 执行结果# [1, 4, 9]复制代码

关于Pool下文中还会提到,这里我们先来看Process

Process

要创建一个进程可以使用Process类,使用start()方法启动进程。

from multiprocessing import Processimport osdef echo(text):    # 父进程ID    print("Process Parent ID : ", os.getppid())    # 进程ID    print("Process PID : ", os.getpid())    print('echo : ', text)if __name__ == '__main__':    p = Process(target=echo, args=('hello process',))    p.start()    p.join()    # 执行结果# Process Parent ID :  27382# Process PID :  27383# echo :  hello process复制代码
进程池

正如开篇提到的multiprocessing模块提供了Pool类可以很方便地实现一些简单多进程场景。 它主要有以下接口

  • apply(func[, args[, kwds]])
    执行func(args,kwds)方法,在方法结束返回前会阻塞。
  • apply_async(func[, args[, kwds[, callback[, error_callback]]]])
    异步执行func(args,kwds),会立即返回一个result对象,如果指定了callback参数,结果会通过回调方法返回,还可以指定执行出错的回调方法error_callback()
  • map(func, iterable[, chunksize])
    类似内置函数map(),可以并发执行func,是同步方法
  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])
    异步版本的map
  • close()
    关闭进程池。当池中的所有工作进程都执行完毕时,进程会退出。
  • terminate()
    终止进程池
  • join()
    等待工作进程执行完,必需先调用close()或者terminate()
from multiprocessing import Pooldef f(x):    return x * xif __name__ == '__main__':    with Pool(5) as p:        # map方法的作用是将f()方法并发地映射到列表中的每个元素        a = p.map(f, [1, 2, 3])        print(a)        # 异步执行map        b = p.map_async(f, [3, 5, 7, 11])        # b 是一个result对象,代表方法的执行结果        print(b)        # 为了拿到结果,使用join方法等待池中工作进程退出        p.close()        # 调用join方法前,需先执行close或terminate方法        p.join()        # 获取执行结果        print(b.get())# 执行结果# [1, 4, 9]# 
# [9, 25, 49, 121]复制代码

map_async()apply_async()执行后会返回一个class multiprocessing.pool.AsyncResult对象,通过它的get()可以获取到执行结果,ready()可以判断AsyncResult的结果是否准备好。

进程间数据的传输

multiprocessing模块提供了两种方式用于进程间的数据共享:队列(Queue)和管道(Pipe)

Queue是线程安全,也是进程安全的。使用Queue可以实现进程间的数据共享,例如下面的demo中子进程put一个对象,在主进程中就能get到这个对象。 任何可以序列化的对象都可以通过Queue来传输。

from multiprocessing import Process, Queuedef f(q):    q.put([42, None, 'hello'])if __name__ == '__main__':    # 使用Queue进行数据通信    q = Queue()    p = Process(target=f, args=(q,))    p.start()    # 主进程取得子进程中的数据    print(q.get())  # prints "[42, None, 'hello']"    p.join()# 执行结果# [42, None, 'hello']复制代码

Pipe()返回一对通过管道连接的Connection对象。这两个对象可以理解为管道的两端,它们通过send()recv()发送和接收数据。

from multiprocessing import Process, Pipedef write(conn):    # 子进程中发送一个对象    conn.send([42, None, 'hello'])    conn.close()def read(conn):    # 在读的进程中通过recv接收对象    data = conn.recv()    print(data)if __name__ == '__main__':    # Pipe()方法返回一对连接对象    w_conn, r_conn = Pipe()    wp = Process(target=write, args=(w_conn,))    rp = Process(target=read, args=(r_conn,))    wp.start()    rp.start()# 执行结果# [42, None, 'hello']复制代码

需要注意的是,两个进程不能同时对一个连接对象进行sendrecv操作。

同步

我们知道线程间的同步是通过锁机制来实现的,进程也一样。

from multiprocessing import Process, Lockimport timedef print_with_lock(l, i):    l.acquire()    try:        time.sleep(1)        print('hello world', i)    finally:        l.release()def print_without_lock(i):    time.sleep(1)    print('hello world', i)if __name__ == '__main__':    lock = Lock()    # 先执行有锁的    for num in range(5):        Process(target=print_with_lock, args=(lock, num)).start()    # 再执行无锁的    # for num in range(5):    #     Process(target=print_without_lock, args=(num,)).start()复制代码

有锁的代码将每秒依次打印

hello world 0hello world 1hello world 2hello world 3hello world 4复制代码

如果执行无锁的代码,则在我的电脑上执行结果是这样的

hello worldhello world  01hello world 2hello world 3hello world 4复制代码

除了Lock,还包括RLockConditionSemaphoreEvent等进程间的同步原语。其用法也与线程间的同步原语很类似。API使用可以参考文末中引用的文档链接。

在工程中实现进程间的数据共享应当优先使用队列或管道。

0x02 总结

本文对multiprocessing模块中常见的API作了简单的介绍。讲述了ProcessPool的常见用法,同时介绍了进程间的数据方式:队列和管道。最后简单了解了进程间的同步原语。

通过与的对比学习,本文的内容应该是更加容易掌握的。

0x03 引用

转载于:https://juejin.im/post/5cefdc60f265da1bca51c0cf

你可能感兴趣的文章
YY博客园UML时序图之博客模块
查看>>
《深入浅出 Java Concurrency》—锁紧机构(一)Lock与ReentrantLock
查看>>
Nginx+Keepalived主备切换(包含nginx服务停止)
查看>>
【linux高级程序设计】(第十三章)Linux Socket网络编程基础 4
查看>>
android中画文字的换行 办法(对于遇到canvas.drawText(String s )无法实现换行问题的解决)...
查看>>
Android IOS WebRTC 音视频开发总结(三九)-- win10升级为何要p2p
查看>>
树莓派的rc.local档(设置开机)
查看>>
chrome打开本地文件目录
查看>>
mysql ODBC 在64位下提示找不到odbc驱动问题
查看>>
MySQL的事务处理及隔离级别
查看>>
一个测试SQL2005数据库连接JSP档
查看>>
JspContext对象与PageContext对象
查看>>
java中间==、equals和hashCode差额
查看>>
TextureView+SurfaceTexture+OpenGL ES来播放视频(一)
查看>>
才一年,H5的发展就成这样了......
查看>>
McBsp接口使用和概念
查看>>
关于WEB Service&WCF&WebApi实现身份验证之WCF篇(1)
查看>>
类是公共,它应该被命名为.java文件声明
查看>>
介绍一个超好用的HICHARTS扩展插件
查看>>
中断相关一【转】
查看>>