Package grassyknoll :: Package concurrent :: Module ThreadPool
[hide private]

Source Code for Module grassyknoll.concurrent.ThreadPool

  1  """Local L{ThreadPool}""" 
  2   
  3  from grassyknoll.lib.util import ensureName 
  4  from grassyknoll.lib.meta import AutoLogger, FactoryMixin 
  5  import copy 
  6   
  7  from lib import TimeOut, chat 
  8  import MailBox 
  9  import Message 
 10  import Worker 
 11   
 12  __all__=['ThreadPool', 'BroadcastThreadPool'] 
 13   
class ThreadPool(FactoryMixin):
    """a threadpool with L{MailBox} support

    @ivar name: name of the threadpool.  This is used for mailboxes, etc.
    @type name: string

    @ivar workers: the pool's L{Worker.Worker}s, in creation order
    @type workers: list

    @ivar inbox: a shared inbox for the pool; a L{MailBox.ThreadMailBox}
                 named after the pool unless one is supplied
    @type inbox: L{MailBox.MailBox}

    @cvar _workerClass: class of L{Worker.Worker} to create
    @type _workerClass: class
    """

    logger = AutoLogger()
    _workerClass = Worker.Worker

    def __init__(self, context_factory, inbox=None, num_workers=1, name=None, **kwargs):
        """
        Other arguments are passed to L{_workerClass}

        @arg context_factory: forwarded unchanged to every worker

        @arg inbox: mailbox shared by all workers; when C{None} a
                    L{MailBox.ThreadMailBox} named after the pool is created
        @type inbox: L{MailBox.MailBox}

        @arg num_workers: how many L{Worker.Worker}s to create
        @type num_workers: int

        @arg name: name of the pool, passed through L{ensureName}; workers
                   are named C{"<name>-<index>"}
        @type name: string

        @raise ValueError: if C{num_workers} is not positive
        @raise TypeError: if C{inbox} is given but is not a L{MailBox.MailBox}
        """
        name = self.name = ensureName(name)

        if num_workers <= 0:
            raise ValueError("num_workers must be positive")

        if inbox is None:
            inbox = MailBox.ThreadMailBox(name)
        elif not isinstance(inbox, MailBox.MailBox):
            # wrap in a 1-tuple so that a tuple passed as inbox is reported
            # with its repr instead of breaking the %-formatting itself
            raise TypeError("inbox must be MailBox, not %r" % (inbox,))
        self.inbox = inbox

        self.workers = []
        for i in xrange(num_workers):
            w = self._workerClass(context_factory=context_factory,
                                  inbox=self.inbox,
                                  name="%s-%d" % (self.name, i),
                                  **kwargs)
            self.workers.append(w)

    def __repr__(self):
        return "<%s(%s)>" % (self.__class__.__name__, self.name)

    def start(self):
        """Start the ThreadPool by starting every worker in L{workers}"""
        for w in self.workers:
            w.start()

    def aliveCount(self):
        """
        @returns: number of workers still alive
        @rtype: integer
        """
        # count lazily instead of materialising a list only to measure it
        return sum(1 for w in self.workers if w.isAlive())

    def stop(self):
        """Shutdown the threadpool and close its inbox, preventing future messages

        One L{Worker.WorkerExit} message is posted to the shared inbox for
        every worker that is alive at the time it is checked; dead workers
        are logged and skipped.  The inbox is closed afterwards.
        """
        for w in self.workers:
            if w.isAlive():
                self.inbox.sendNow(Message.Message(Worker.WorkerExit))  # XXX block?
            else:
                self.logger.warn("Skipped stop to dead %r", w)
        # XXX race condition!  (original author's note; presumably a worker
        # can die, or the inbox can close, between the liveness check and the
        # delivery of its exit message -- TODO confirm against MailBox)
        self.inbox.close()

    def join(self, timeout=None):
        """block until the pool terminates.

        This function has the same behavior as L{threading.Thread.join}, see
        its documentation.  In particular, this function always returns None,
        so you should use L{aliveCount} to see if the pool has terminated.

        @arg timeout: wrapped in a single L{TimeOut} that is queried afresh
                      before each worker is joined (presumably yielding the
                      time left of one shared budget -- TODO confirm)
        @type timeout: float
        """
        remaining = TimeOut(timeout)
        for w in self.workers:
            w.join(remaining())
92
class BroadcastThreadPool(ThreadPool):
    """a L{ThreadPool} whose workers can each be sent the same payload

    Workers are built from L{Worker.PrivateBoxWorker}; both broadcast
    methods reach a worker through its C{privbox} attribute.
    """

    _workerClass = Worker.PrivateBoxWorker

    def parallelBroadcast(self, payload, timeout=None):
        """send L{payload} to all workers in parallel L{Message.Message}s

        @note: this function does not check for replies

        @arg payload: an object to be wrapped in a L{Message.Message} and sent
                      to all workers.  The object will be L{copy.copy}'d for
                      each message.
        @type payload: object

        @arg timeout: wrapped in one L{TimeOut} whose current value is handed
                      to each C{send} call (presumably the time left of a
                      shared budget -- TODO confirm)
        @type timeout: float
        """
        remaining = TimeOut(timeout)
        for worker in self.workers:
            # dead workers are logged and skipped rather than messaged
            if not worker.isAlive():
                self.logger.warn("Skipped parallel broadcast to dead %r", worker)
                continue
            worker.privbox.send(Message.Message(copy.copy(payload)),
                                timeout=remaining())

    def serialBroadcast(self, payload, timeout=None, reply_box=None):
        """send L{payload} to all workers in serial L{Message.Message}s

        The function will wait for confirmation of each message before
        sending the next one.  Replies will be discarded, though exceptions
        will be raised.  Not all workers may receive a message if an
        exception occurs.

        @arg payload: an object to be wrapped in a L{Message.Message} and sent
                      to all workers.  The object will be L{copy.copy}'d for
                      each message.
        @type payload: object

        @arg timeout: how long to wait for all replies, in seconds
        @type timeout: float

        @arg reply_box: forwarded unchanged to L{chat} for every worker
        """
        remaining = TimeOut(timeout)
        for worker in self.workers:
            # dead workers are logged and skipped rather than messaged
            if not worker.isAlive():
                self.logger.warn("Skipped serial broadcast to dead %r", worker)
                continue
            chat(copy.copy(payload), worker.privbox,
                 timeout=remaining(), reply_box=reply_box)
134