流程互通是指流程之間的數(shù)據(jù)交換。有必要在進(jìn)程之間交換數(shù)據(jù)以開發(fā)并行應(yīng)用程序。下圖顯示了多個(gè)子進(jìn)程之間同步的各種通信機(jī)制
各種溝通機(jī)制
在本節(jié)中,我們將了解各種通信機(jī)制。機(jī)制如下所述 -
隊(duì)列
隊(duì)列可以與多進(jìn)程程序一起使用。 多處理 模塊的queue類與 queue.queue 類相似。因此,可以使用相同的api。 多處理 .queue為我們提供了一個(gè)線程和進(jìn)程安全的fifo(先進(jìn)先出)進(jìn)程之間的通信機(jī)制。
例
下面是一個(gè)簡(jiǎn)單的例子,它來(lái)自python官方文檔的多處理,以理解queue類的多處理的概念。
from multiprocessing import process, queue import queue import random def f(q): q.put([42, none, 'hello']) def main(): q = queue() p = process(target = f, args = (q,)) p.start() print (q.get()) if __name__ == '__main__': main()
輸出
[42, none, 'hello']
管道
它是一種數(shù)據(jù)結(jié)構(gòu),用于在多進(jìn)程程序中的進(jìn)程之間進(jìn)行通信。pipe()函數(shù)返回一個(gè)由管道連接的連接對(duì)象,默認(rèn)情況下是雙工(雙向)。它以下列方式工作 -
- 它返回一對(duì)連接對(duì)象,表示管道的兩端。
- 每個(gè)對(duì)象都有兩個(gè)方法 - send() 和 recv() ,以便在進(jìn)程之間進(jìn)行通信。
例
下面是一個(gè)簡(jiǎn)單的例子,它取自python官方文檔的多處理,以理解多處理的 pipe() 函數(shù)的概念。
from multiprocessing import process, pipe def f(conn): conn.send([42, none, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = pipe() p = process(target = f, args = (child_conn,)) p.start() print (parent_conn.recv()) p.join()
產(chǎn)量
[42, none, 'hello']
manager
manager是一類多處理模塊,它提供了一種在所有用戶之間協(xié)調(diào)共享信息的方法。管理器對(duì)象控制服務(wù)器進(jìn)程,該進(jìn)程管理共享對(duì)象并允許其他進(jìn)程操作它們。換句話說,管理者提供了一種創(chuàng)建可在不同流程之間共享的數(shù)據(jù)的方法。以下是經(jīng)理對(duì)象的不同屬性
- 管理器的主要屬性是控制管理共享對(duì)象的服務(wù)器進(jìn)程。
- 另一個(gè)重要屬性是在任何進(jìn)程修改它時(shí)更新所有共享對(duì)象。
例
下面是一個(gè)示例,它使用manager對(duì)象在服務(wù)器進(jìn)程中創(chuàng)建列表記錄,然后在該列表中添加新記錄。
import multiprocessing def print_records(records): for record in records: print("name: {0}\nscore: {1}\n".format(record[0], record[1])) def insert_record(record, records): records.append(record) print("a new record is added\n") if __name__ == '__main__': with multiprocessing.manager() as manager: records = manager.list([('computers', 1), ('histoty', 5), ('hindi',9)]) new_record = ('english', 3) p1 = multiprocessing.process(target = insert_record, args = (new_record, records)) p2 = multiprocessing.process(target = print_records, args = (records,)) p1.start() p1.join() p2.start() p2.join()
輸出
a new record is added name: computers score: 1 name: histoty score: 5 name: hindi score: 9 name: english score: 3
管理器中命名空間的概念
manager class帶有命名空間的概念,這是一種跨多個(gè)進(jìn)程共享多個(gè)屬性的快捷方法。命名空間沒有任何可以調(diào)用的公共方法,但它們具有可寫屬性。
例
以下python腳本示例幫助我們利用名稱空間在主進(jìn)程和子進(jìn)程之間共享數(shù)據(jù) -
import multiprocessing def mng_nasp(using_ns): using_ns.x +=5 using_ns.y *= 10 if __name__ == '__main__': manager = multiprocessing.manager() using_ns = manager.namespace() using_ns.x = 1 using_ns.y = 1 print ('before', using_ns) p = multiprocessing.process(target = mng_nasp, args = (using_ns,)) p.start() p.join() print ('after', using_ns)
產(chǎn)量
before namespace(x = 1, y = 1) after namespace(x = 6, y = 10)
ctypes-array和value
多處理模塊提供array和value對(duì)象,用于將數(shù)據(jù)存儲(chǔ)在共享內(nèi)存映射中。 array 是從共享內(nèi)存分配的ctypes數(shù)組, value 是從共享內(nèi)存分配的ctypes對(duì)象。
為了使用,從多處理導(dǎo)入process,value,array。
例
以下python腳本是一個(gè)從python文檔中獲取的示例,它利用ctypes array和value在進(jìn)程之間共享一些數(shù)據(jù)。
def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = value('d', 0.0) arr = array('i', range(10)) p = process(target = f, args = (num, arr)) p.start() p.join() print (num.value) print (arr[:])
輸出
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
通信順序進(jìn)程(csp)
csp用于說明系統(tǒng)與具有并發(fā)模型的其他系統(tǒng)的交互。csp是一個(gè)通過消息傳遞編寫并發(fā)或程序的框架,因此它對(duì)描述并發(fā)性很有效。
python庫(kù) - pycsp
為了實(shí)現(xiàn)csp中的核心原語(yǔ),python有一個(gè)名為pycsp的庫(kù)。它使實(shí)現(xiàn)非常簡(jiǎn)短和可讀,因此可以非常容易地理解它。以下是pycsp的基本流程網(wǎng)絡(luò)
在上面的pycsp過程網(wǎng)絡(luò)中,有兩個(gè)過程 - 過程1和過程2.這些過程通過兩個(gè)通道傳遞消息進(jìn)行通信 - 通道1和通道2。
安裝pycsp
借助以下命令,我們可以安裝python庫(kù)pycsp
pip install pycsp
例
以下python腳本是一個(gè)相互并行運(yùn)行兩個(gè)進(jìn)程的簡(jiǎn)單示例。它是在pycsp python libabary的幫助下完成的
from pycsp.parallel import * import time @process def p1(): time.sleep(1) print('p1 exiting') @process def p2(): time.sleep(1) print('p2 exiting') def main(): parallel(p1(), p2()) print('terminating') if __name__ == '__main__': main()
在上面的腳本中,創(chuàng)建了兩個(gè)函數(shù),即 p1 和 p2 ,然后使用 @process 進(jìn)行修飾,以將它們轉(zhuǎn)換為進(jìn)程。
輸出
p2 exiting p1 exiting terminating