Python 進(jìn)程池

python 進(jìn)程池

可以使用與創(chuàng)建和使用線程池相同的方式創(chuàng)建和使用進(jìn)程池。進(jìn)程池可以定義為預(yù)先實(shí)例化和空閑進(jìn)程的組,它們隨時(shí)可以進(jìn)行工作。當(dāng)我們需要執(zhí)行大量任務(wù)時(shí),創(chuàng)建進(jìn)程池優(yōu)先于為每個(gè)任務(wù)實(shí)例化新進(jìn)程。

 

python模塊 - concurrent.futures

python標(biāo)準(zhǔn)庫有一個(gè)名為 concurrent.futures 的模塊。該模塊是在python 3.2中添加的,用于為開發(fā)人員提供啟動(dòng)異步任務(wù)的高級(jí)接口。它是python的線程和多處理模塊之上的抽象層,用于提供使用線程池或進(jìn)程池運(yùn)行任務(wù)的接口。

在接下來的部分中,我們將查看concurrent.futures模塊的不同子類。

 

執(zhí)行者類

executor 是 concurrent.futures python模塊 的抽象類。它不能直接使用,我們需要使用以下具體子類之一

  • threadpoolexecutor
  • processpoolexecutor

processpoolexecutor - 一個(gè)具體的子類

它是executor類的具體子類之一。它使用多處理,我們獲得了一組用于提交任務(wù)的流程。此池將任務(wù)分配給可用進(jìn)程并安排它們運(yùn)行。

如何創(chuàng)建processpoolexecutor?

在 concurrent.futures 模塊及其具體子類 executor 的幫助下,我們可以輕松創(chuàng)建一個(gè)進(jìn)程池。為此,我們需要構(gòu)建一個(gè) processpoolexecutor ,其中包含我們?cè)诔刂兴璧倪M(jìn)程數(shù)。默認(rèn)情況下,該數(shù)字為5.然后,將任務(wù)提交到流程池。

我們現(xiàn)在將考慮在創(chuàng)建線程池時(shí)使用的相同示例,唯一的區(qū)別是現(xiàn)在我們將使用 processpoolexecutor 而不是 threadpoolexecutor 。

from concurrent.futures import processpoolexecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = processpoolexecutor(5)
   future = executor.submit(task, ("completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

輸出

false
false
completed

在上面的示例中,process poolexecutor 已構(gòu)造為5個(gè)線程。然后,在給出消息之前等待2秒的任務(wù)被提交給進(jìn)程池執(zhí)行器。從輸出中可以看出,任務(wù)直到2秒才完成,因此第一次調(diào)用 done() 將返回false。2秒后,任務(wù)完成,我們通過調(diào)用 result() 方法得到未來的 結(jié)果 。

實(shí)例化processpoolexecutor - 上下文管理器

實(shí)例化processpoolexecutor的另一種方法是在上下文管理器的幫助下。它的工作方式與上例中使用的方法類似。使用上下文管理器的主要優(yōu)點(diǎn)是它在語法上看起來很好。實(shí)例化可以在以下代碼的幫助下完成

with processpoolexecutor(max_workers = 5) as executor

為了更好地理解,我們采用與創(chuàng)建線程池時(shí)使用的相同的示例。在此示例中,我們需要先導(dǎo)入 concurrent.futures 模塊。然后創(chuàng)建一個(gè)名為 load_url() 的函數(shù),它將加載請(qǐng)求的url。該 processpoolexecutor 然后用5號(hào)在池中的線程創(chuàng)建的。process poolexecutor 已被用作上下文管理器。我們可以通過調(diào)用 result() 方法獲得未來的 結(jié)果 。

import concurrent.futures
from concurrent.futures import processpoolexecutor
import urllib.request

urls = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.processpoolexecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

輸出

上面的python腳本將生成以下輸出

'http://some-made-up-domain.com/' generated an exception: 
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

使用executor.map()函數(shù)

python map() 函數(shù)廣泛用于執(zhí)行許多任務(wù)。一個(gè)這樣的任務(wù)是將特定函數(shù)應(yīng)用于迭代中的每個(gè)元素。類似地,我們可以將迭代器的所有元素映射到函數(shù),并將它們作為獨(dú)立的作業(yè)提交給 processpoolexecutor 。請(qǐng)考慮以下python腳本示例來理解這一點(diǎn)。

我們將考慮使用 executor.map() 函數(shù)創(chuàng)建線程池時(shí)使用的相同示例。在下面給出的示例中,map函數(shù)用于將 square() 函數(shù)應(yīng)用于values數(shù)組中的每個(gè)值。

from concurrent.futures import processpoolexecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with processpoolexecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

輸出

上面的python腳本將生成以下輸出

4
9
16
25

何時(shí)使用processpoolexecutor和threadpoolexecutor?

既然我們已經(jīng)研究了executor類 - threadpoolexecutor和processpoolexecutor,我們需要知道何時(shí)使用哪個(gè)執(zhí)行器。我們需要在遇到cpu限制的工作負(fù)載時(shí)選擇processpoolexecutor,在i / o綁定工作負(fù)載的情況下選擇threadpoolexecutor。

如果我們使用 processpoolexecutor ,那么我們不需要擔(dān)心gil,因?yàn)樗褂枚嗵幚怼6?,與 threadpoolexecution 相比,執(zhí)行時(shí)間會(huì)更短。請(qǐng)考慮以下python腳本示例來理解這一點(diǎn)。

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.processpoolexecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('start: {} time taken: {}'.format(number, time_taken))
   print('total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

輸出

start: 8000000 time taken: 1.5509998798370361
start: 7000000 time taken: 1.3259999752044678
total time taken: 2.0840001106262207

example- python script with threadpoolexecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.threadpoolexecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('start: {} time taken: {}'.format(number, time_taken))
      print('total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

輸出

start: 8000000 time taken: 3.8420000076293945
start: 7000000 time taken: 3.6010000705718994
total time taken: 3.8480000495910645

從上述兩個(gè)程序的輸出中,我們可以看到使用 processpoolexecutor 和 threadpoolexecutor時(shí) 執(zhí)行時(shí)間的差異。

下一節(jié):python 多處理器

相關(guān)文章
亚洲国产精品第一区二区,久久免费视频77,99V久久综合狠狠综合久久,国产免费久久九九免费视频