
A Brief Look at Python's threadpool: How the Thread Pool Is Implemented

Published: 2020-05-24 16:14:04  Category: Python  Source: Internet

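First, the terms used in this article:

Worker thread (worker): when the pool is created, the specified number of worker threads is started. Each one waits to get requests from the request queue.

Request (requests): a unit of work processed by a worker thread. There may be thousands of requests but only a handful of workers. Requests are created with makeRequests.

Request queue (request_queue): the queue holding pending requests, implemented with the standard Queue module (queue in Python 3). Worker threads get requests from this queue and process them.

Request handler (callable): once a worker has got a request, it calls the request's handler function (request.callable) to do the actual work. The handler returns the result.

Results queue (result_queue): when a request has been processed, the returned result is put into the results queue. If the handler raised, the exception is put there instead.

Exception callback (exc_callback): when a result taken from the results queue carries an exception, this callback is called to handle it.

Result callback (callback): when a result is taken from the results queue, this callback does further processing on it.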
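Here is a minimal single-threaded sketch that makes these terms concrete. It shows a request object and what happens to it on each side of the results queue.

- `Request`, `run_request` and `dispatch_result` are hypothetical names.
- The routing rule is a simplification assumed for this sketch: failed requests go to `exc_callback` and successful ones go to `callback`.
- The attribute names follow the terms above.

```python
import queue
import sys


class Request:
    """Hypothetical request record built from the terms above."""

    def __init__(self, func, args=(), kwds=None, callback=None, exc_callback=None):
        self.callable = func              # request handler
        self.args = args
        self.kwds = kwds or {}
        self.callback = callback          # called as callback(request, result)
        self.exc_callback = exc_callback  # called as exc_callback(request, exc_info)
        self.exception = False            # set by the worker if the handler raised
        self.requestID = id(self)         # unique while the object is alive


def run_request(request, result_queue):
    """Worker side: call the handler and queue the outcome."""
    try:
        result = request.callable(*request.args, **request.kwds)
        result_queue.put((request, result))
    except Exception:
        request.exception = True
        result_queue.put((request, sys.exc_info()))


def dispatch_result(request, result):
    """Main-thread side (simplified, assumed rule): route one pair to a callback."""
    if request.exception:
        if request.exc_callback:
            request.exc_callback(request, result)
    elif request.callback:
        request.callback(request, result)


result_queue = queue.Queue()
seen = []
ok = Request(pow, (2, 10), callback=lambda req, res: seen.append(("ok", res)))
bad = Request(int, ("not a number",),
              exc_callback=lambda req, info: seen.append(("error", info[0].__name__)))
for req in (ok, bad):
    run_request(req, result_queue)  # normally done by a worker thread
while not result_queue.empty():
    dispatch_result(*result_queue.get())
print(seen)  # [('ok', 1024), ('error', 'ValueError')]
```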

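The previous installment covered installing and using the threadpool package. This one walks through the main stages of how the pool works:

(1) Creating the thread pool
(2) Starting the worker threads
(3) Creating requests
(4) Pushing requests into the thread pool
(5) Worker threads processing requests
(6) Handling finished requests
(7) Worker threads exiting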
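The seven stages can be traced in a deliberately small, self-contained pool built only on the standard library (`threading` and `queue`). `MiniPool`, `submit`, `poll_all` and `close` are hypothetical names for this sketch, not the threadpool API. The real library adds several features this sketch leaves out:

- queue size limits
- requestIDs
- callbacks
- exception routing

This sketch has no error handling, so a failing task would stop its worker.

```python
import queue
import threading


class MiniPool:
    def __init__(self, num_workers, poll_timeout=0.1):
        # (1) create the pool: a request queue, a results queue, a worker list
        self.requests = queue.Queue()
        self.results = queue.Queue()
        self.poll_timeout = poll_timeout
        self.dismissed = threading.Event()
        self.pending = 0
        # (2) start the workers
        self.workers = [threading.Thread(target=self._run, daemon=True)
                        for _ in range(num_workers)]
        for w in self.workers:
            w.start()

    def _run(self):
        # (5) each worker loops: get a request, run it, queue the result
        while not self.dismissed.is_set():
            try:
                func, arg = self.requests.get(timeout=self.poll_timeout)
            except queue.Empty:
                continue  # nothing to do; re-check the dismissal flag
            self.results.put((arg, func(arg)))

    def submit(self, func, arg):
        # (3) + (4) create a request and push it onto the request queue
        self.requests.put((func, arg))
        self.pending += 1

    def poll_all(self):
        # (6) collect every outstanding result, blocking until each arrives
        collected = []
        while self.pending:
            collected.append(self.results.get())
            self.pending -= 1
        return collected

    def close(self):
        # (7) tell the workers to exit and wait for them
        self.dismissed.set()
        for w in self.workers:
            w.join()


pool = MiniPool(num_workers=3)
for n in range(5):
    pool.submit(lambda x: x * x, n)
squares = sorted(pool.poll_all())
pool.close()
print(squares)  # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]
```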

Here is the definition of the ThreadPool class (method bodies omitted):

class ThreadPool:
  """A thread pool, distributing work requests and collecting results.

  See the module docstring for more information.

  """
  def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
    pass
  def createWorkers(self, num_workers, poll_timeout=5):
    pass
  def dismissWorkers(self, num_workers, do_join=False):
    pass
  def joinAllDismissedWorkers(self):
    pass
  def putRequest(self, request, block=True, timeout=None):
    pass
  def poll(self, block=False):
    pass
  def wait(self):
    pass

1. Creating the thread pool (ThreadPool(args))

task_pool = threadpool.ThreadPool(num_workers)

  def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
    """Set up the thread pool and start num_workers worker threads.

    ``num_workers`` is the number of worker threads to start initially.

    If ``q_size > 0`` the size of the work *request queue* is limited and
    the thread pool blocks when the queue is full and it tries to put
    more work requests in it (see ``putRequest`` method), unless you also
    use a positive ``timeout`` value for ``putRequest``.

    If ``resq_size > 0`` the size of the *results queue* is limited and the
    worker threads will block when the queue is full and they try to put
    new results in it.

    .. warning:
      If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
      the possibility of a deadlock, when the results queue is not pulled
      regularly and too many jobs are put in the work requests queue.
      To prevent this, always set ``timeout > 0`` when calling
      ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

    """
    # request queue: requests created by threadpool.makeRequests(args) and pushed into the pool
    self._requests_queue = Queue.Queue(q_size)
    # results queue: workers put a (request, result) tuple here for each finished request
    self._results_queue = Queue.Queue(resq_size)
    # worker thread list: threads created by self.createWorkers() are appended here
    self.workers = []
    # workers whose dismiss event has been set but that have not been joined yet
    self.dismissedWorkers = []
    # dict requestID -> request for every request pushed into the pool
    self.workRequests = {}
    self.createWorkers(num_workers, poll_timeout)

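The constructor parameters are:

num_workers: the number of worker threads in the pool.

q_size: the maximum length of the request queue. If it is limited, putRequest() blocks once the queue is full, until a worker takes a request off the queue. There are two exceptions. With a positive timeout, putRequest() raises Queue.Full when the timeout expires. With block=False, it raises Queue.Full immediately.

resq_size: the maximum length of the results queue. When it is full, worker threads block while trying to put new results into it.

poll_timeout: the longest time, in seconds, a worker waits for a request from the request queue. If no request arrives in that time, the get() call gives up and the worker goes back to the top of its loop, where it checks whether it has been dismissed.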
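One way to see what q_size means in practice is to exercise a bounded queue.Queue directly. The module is `queue` in Python 3 and `Queue` in Python 2. The sketch assumes, as its signature `putRequest(request, block=True, timeout=None)` suggests, that putRequest passes block and timeout straight through to Queue.put. It does not use threadpool itself.

```python
import queue
import time

# A bounded request queue, like the one ThreadPool(num_workers, q_size=2) would create
requests_queue = queue.Queue(maxsize=2)
requests_queue.put("request-1")
requests_queue.put("request-2")  # the queue is now full

# Non-blocking put: fails at once with queue.Full
try:
    requests_queue.put("request-3", block=False)
    rejected_immediately = False
except queue.Full:
    rejected_immediately = True

# Blocking put with a timeout: waits up to 0.2 s for a free slot, then gives up
start = time.monotonic()
try:
    requests_queue.put("request-3", block=True, timeout=0.2)
    timed_out = False
except queue.Full:
    timed_out = True
waited = time.monotonic() - start

# Once a worker takes a request off the queue, there is room again
requests_queue.get()
requests_queue.put("request-3", block=False)
print(rejected_immediately, timed_out, requests_queue.qsize())  # True True 2
```

With the defaults (block=True, timeout=None), the last put on a full queue would wait indefinitely for a worker to free a slot. That is why the docstring's warning recommends a positive timeout when both q_size and resq_size are non-zero. If nobody drains a full results queue, the sequence is:

1. The workers block while putting results into it.
2. They stop taking requests off the request queue.
3. The producer then blocks forever on the full request queue.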

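Member variables:

self._requests_queue: the request queue. Every request created by threadpool.makeRequests(args) and pushed into the pool ends up here.
self._results_queue: the results queue. For every finished request, a worker puts a (request, result) tuple here, including requests that raised an exception.
self.workers: the list of worker threads. Threads created by self.createWorkers() are appended to it.
self.dismissedWorkers: worker threads whose dismiss event has been set but that have not yet been joined.
self.workRequests: a dictionary of the requests pushed into the pool, mapping requestID to request. requestID is the request's unique identifier and is covered later.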
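Why keep workRequests at all? It is how the pool knows whether any work is still outstanding. The sketch below is an assumption-based model, not threadpool's code. `Task`, `put_request`, `poll_once` and the local `NoResultsPending` exception are hypothetical names. Each request is registered under `requestID = id(request)` when it is queued and removed once its result has been collected. An empty dictionary therefore means there is nothing left to wait for.

```python
import queue
import threading


class NoResultsPending(Exception):
    """Raised when no submitted request is still waiting for its result."""


class Task:
    def __init__(self, func, *args):
        self.callable, self.args = func, args
        self.requestID = id(self)  # unique while the object is alive


requests_queue, results_queue = queue.Queue(), queue.Queue()
work_requests = {}  # requestID -> request, like ThreadPool.workRequests


def put_request(task):
    requests_queue.put(task)
    work_requests[task.requestID] = task  # remember it until its result is in


def worker():
    while True:
        task = requests_queue.get()
        if task is None:  # stop sentinel, used only in this sketch
            break
        results_queue.put((task, task.callable(*task.args)))


def poll_once():
    if not work_requests:
        raise NoResultsPending
    task, result = results_queue.get()  # block until some result arrives
    del work_requests[task.requestID]   # this request is finished
    return result


threading.Thread(target=worker, daemon=True).start()
for n in (3, 4, 5):
    put_request(Task(sum, range(n)))

collected = []
while True:  # roughly what a wait() loop does
    try:
        collected.append(poll_once())
    except NoResultsPending:
        break
requests_queue.put(None)
print(sorted(collected), work_requests)  # [3, 6, 10] {}
```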

2. Starting the worker threads (self.createWorkers(args))

Function definition:

def createWorkers(self, num_workers, poll_timeout=5):
  """Add num_workers worker threads to the pool.

  ``poll_timeout`` sets the interval in seconds (int or float) for how
  often threads should check whether they are dismissed, while waiting for
  requests.

  """
  for _ in range(num_workers):
    # each WorkerThread shares the pool's two queues and starts itself in __init__
    self.workers.append(WorkerThread(self._requests_queue, self._results_queue, poll_timeout=poll_timeout))

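WorkerThread subclasses threading.Thread, Python's built-in thread class. Each WorkerThread object created here is appended to the self.workers list. Here is the definition of the WorkerThread class; its __init__ method shows how each worker is set up: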

import sys
import threading

try:
  import Queue  # Python 2
except ImportError:
  import queue as Queue  # Python 3

class WorkerThread(threading.Thread):
  """Background thread connected to the requests/results queues.

  A worker thread sits in the background and picks up work requests from
  one queue and puts the results in another until it is dismissed.

  """

  def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
    """Set up thread in daemonic mode and start it immediately.

    ``requests_queue`` and ``results_queue`` are instances of
    ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
    new worker thread.

    """
    threading.Thread.__init__(self, **kwds)
    self.daemon = True  # daemon thread: does not keep the interpreter alive
    self._requests_queue = requests_queue  # request queue
    self._results_queue = results_queue  # results queue
    # timeout for get() in run(); on timeout the while loop simply starts over
    self._poll_timeout = poll_timeout
    # dismissal flag: once set, run() breaks out of its loop and the thread exits
    self._dismissed = threading.Event()
    self.start()

  def run(self):
    """Repeatedly process the job queue until told to exit."""
    while True:
      if self._dismissed.is_set():
        # we are dismissed, break out of loop
        break
      # get next work request. If we don't get a new request from the
      # queue after self._poll_timeout seconds, we jump to the start of
      # the while loop again, to give the thread a chance to exit.
      try:
        request = self._requests_queue.get(True, self._poll_timeout)
      except Queue.Empty:
        # the request queue stayed empty: go round the loop again
        continue
      else:
        if self._dismissed.is_set():
          # we are dismissed, put back request in queue and exit loop
          self._requests_queue.put(request)
          break
        try:
          # not dismissed: run the handler and push (request, result) onto the results queue
          result = request.callable(*request.args, **request.kwds)
          self._results_queue.put((request, result))
        except:
          # the handler raised: flag the request and queue the exception info instead
          request.exception = True
          self._results_queue.put((request, sys.exc_info()))

  def dismiss(self):
    """Sets a flag to tell the thread to exit when done with current job.
    """
    self._dismissed.set()

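Variables initialized in __init__:

self._requests_queue: the request queue.
self._results_queue: the results queue.
self._poll_timeout: the timeout used when run() gets a request from the request queue. On timeout, the while(True) loop simply continues.
self._dismissed: a threading.Event used as the dismissal flag. Once it is set, run() breaks out of its loop and the worker thread exits.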
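The _dismissed / _poll_timeout pair is what lets an idle worker notice that it has been dismissed. The blocking get() wakes up at least once every poll_timeout seconds to re-check the flag.

The class below is a hypothetical, stripped-down Python 3 version of WorkerThread. It has no results queue, and requests are plain zero-argument functions. It is meant only to show two things:

- an idle worker exits roughly one poll interval after dismiss();
- no request is lost in the process.

```python
import queue
import threading
import time


class IdleWorker(threading.Thread):
    """Hypothetical, stripped-down WorkerThread (no results queue)."""

    def __init__(self, requests_queue, poll_timeout):
        super().__init__(daemon=True)
        self._requests_queue = requests_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()  # the dismissal flag
        self.handled = []
        self.start()

    def run(self):
        while not self._dismissed.is_set():
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except queue.Empty:
                continue  # timed out: loop back and re-check the flag
            if self._dismissed.is_set():
                self._requests_queue.put(request)  # hand the request back
                break
            self.handled.append(request())

    def dismiss(self):
        self._dismissed.set()


requests_queue = queue.Queue()
worker = IdleWorker(requests_queue, poll_timeout=0.2)
requests_queue.put(lambda: "done")
time.sleep(0.1)  # let the worker pick the request up, then sit idle
worker.dismiss()
start = time.monotonic()
worker.join()  # returns within about one poll_timeout
elapsed = time.monotonic() - start
print(worker.handled, round(elapsed, 1))
```

A smaller poll_timeout makes dismissal faster at the cost of more wake-ups while idle. The library's default is 5 seconds.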

Finally, __init__ calls self.start() to launch the thread, which then executes run(), defined above.

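Each pass of the while loop in run() goes as follows:

(1) If self._dismissed is set, the worker thread exits. Otherwise go to step 2.
(2) Try to get a request from the request queue self._requests_queue, waiting at most poll_timeout seconds. If the queue stays empty, continue with the next iteration of the while loop. Otherwise go to step 3.
(3) Check whether this worker's dismissal event has been set. If it has, the worker is being shut down: it puts the request it just took back into the request queue and exits. Otherwise it calls the request handler request.callable and pushes the result onto the results queue. If the handler raises, the worker sets request.exception to True and pushes the exception info (sys.exc_info()) onto the results queue instead. Then go to step 4.
(4) Loop again, back to step 1.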
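The error path in step (3) is worth a closer look. If an exception escaped run(), the worker thread would die: Python would only print the traceback to stderr, and the main thread would never learn which request failed. By catching the exception and queueing sys.exc_info(), a (type, value, traceback) tuple, the worker carries the failure across the thread boundary. The sketch below uses plain threading and queue, and `risky_request` and `worker` are hypothetical names. It shows what a consumer such as an exc_callback could then do with that tuple.

```python
import queue
import sys
import threading
import traceback

results_queue = queue.Queue()


def risky_request():
    return 1 / 0  # hypothetical handler that always fails


def worker():
    # Same pattern as the try/except in WorkerThread.run(): the exception
    # never escapes the thread; its sys.exc_info() tuple is queued instead.
    try:
        results_queue.put(("ok", risky_request()))
    except Exception:
        results_queue.put(("error", sys.exc_info()))


thread = threading.Thread(target=worker)
thread.start()
thread.join()

status, payload = results_queue.get()
caught = None
if status == "error":
    exc_type, exc_value, tb = payload
    # The traceback captured in the worker can be printed from the main thread...
    print("".join(traceback.format_exception(exc_type, exc_value, tb)))
    # ...or the exception can be re-raised here with its original traceback
    try:
        raise exc_value.with_traceback(tb)
    except ZeroDivisionError as exc:
        caught = exc
```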

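At this point the worker threads are up. The pool creates as many workers as requested, and each worker gets requests from the request queue, processes them, and pushes the results onto the results queue.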

3. Creating requests (makeRequests)


