1 """Local L{ThreadPool}"""
2
3 from grassyknoll.lib.util import ensureName
4 from grassyknoll.lib.meta import AutoLogger, FactoryMixin
5 import copy
6
7 from lib import TimeOut, chat
8 import MailBox
9 import Message
10 import Worker
11
12 __all__=['ThreadPool', 'BroadcastThreadPool']
13
15 """a threadpool with L{MailBox} support
16
17 @ivar name: name of the threadpool. This is used for mailboxes, etc.
18 @type name: string
19
20 @ivar workers: the pool's L{Worker.Worker}s
21 @type workers: list
22
23 @ivar inbox: a shared inbox for the pool
24 @type inbox: L{MailBox.ThreadMailBox}
25
26 @cvar _workerClass: class of L{Worker.Worker} to create
27 @type _workerClass: class
28 """
29
30 logger=AutoLogger()
31 _workerClass=Worker.Worker
32
def __init__(self, context_factory, inbox=None, num_workers=1, name=None, **kwargs):
    """
    Build the pool's workers around one shared inbox.

    Workers are only constructed here; this method does not start them.
    Other keyword arguments are passed to L{_workerClass}.

    @arg context_factory: forwarded unchanged to every worker as its
        C{context_factory} argument

    @arg inbox: inbox shared by all workers. When None, a new
        L{MailBox.ThreadMailBox} is created from the pool's name.
    @type inbox: L{MailBox.MailBox} or None

    @arg num_workers: how many L{Worker.Worker}s to create
    @type num_workers: int

    @arg name: name of the pool; it is passed through L{ensureName} before
        use. Each worker is named C{"<name>-<index>"}.
    @type name: string (or None, the default)

    @raise ValueError: if C{num_workers} is not positive
    @raise TypeError: if C{inbox} is given but is not a L{MailBox.MailBox}
    """
    name = self.name = ensureName(name)

    if num_workers <= 0:
        # Call form of raise: valid on both Python 2 and Python 3.
        raise ValueError("num_workers must be positive")

    if inbox is None:
        inbox = MailBox.ThreadMailBox(name)
    elif not isinstance(inbox, MailBox.MailBox):
        # Wrap in a 1-tuple so a tuple-valued inbox is reported verbatim
        # instead of being unpacked as the format arguments.
        raise TypeError("inbox must be MailBox, not %r" % (inbox,))
    self.inbox = inbox

    self.workers = []
    # range (not xrange) so the loop runs on either major Python version;
    # the pool size is small, so the list built on Python 2 is negligible.
    for i in range(num_workers):
        w = self._workerClass(context_factory=context_factory,
                              inbox=self.inbox,
                              name="%s-%d" % (self.name, i),
                              **kwargs)
        self.workers.append(w)
57
59 return "<%s(%s)>" % (self.__class__.__name__, self.name)
60
62 """Start the ThreadPool"""
63 for w in self.workers:
64 w.start()
65
67 """
68 @returns: number of workers still alive
69 @rtype: integer
70 """
71 return len([w for w in self.workers if w.isAlive()])
72
74 """Shutdown the threadpool and close its inbox, preventing future messages"""
75 for w in self.workers:
76 if w.isAlive():
77 self.inbox.sendNow(Message.Message(Worker.WorkerExit))
78 else:
79 self.logger.warn("Skipped stop to dead %r", w)
80 self.inbox.close()
81
def join(self, timeout=None):
    """Wait for the pool's workers to terminate.

    Behaves like L{threading.Thread.join} (see its documentation): the
    return value is always None, so call L{aliveCount} afterwards to find
    out whether the pool has actually terminated.

    @arg timeout: wrapped in a L{TimeOut}; the value that object yields is
        handed to each worker's C{join} in turn
    """
    # NOTE(review): TimeOut presumably tracks one shared deadline so that
    # each call yields the time still remaining -- confirm in lib.
    remaining = TimeOut(timeout)
    for worker in self.workers:
        worker.join(remaining())
92
94 """a L{ThreadPool} with support for broadcasting a message to all workers"""
95
96 _workerClass=Worker.PrivateBoxWorker
97
99 """send L{payload} to all workers in parallel L{Message.Message}s
100
101 @note: this function does not check for replies
102
103 @arg payload: an object to be wrapped in a L{Message.Message} and sent
104 to all workers. The object will be L{copy.copy}'d for each message.
105 @type payload: object
106 """
107 timeout=TimeOut(timeout)
108 for w in self.workers:
109 if w.isAlive():
110 w.privbox.send(Message.Message(copy.copy(payload)), timeout=timeout())
111 else:
112 self.logger.warn("Skipped parallel broadcast to dead %r", w)
113
115 """send L{payload} to all workers in serial L{Message.Message}s
116
117 The function will wait for confirmation of each message before sending
118 the next one. Replies will be discarded, though exceptions will be
119 raised. Not all workers may receive a message if an exception occurs.
120
121 @arg payload: an object to be wrapped in a L{Message.Message} and sent
122 to all workers. The object will be L{copy.copy}'d for each message.
123 @type payload: object
124
125 @arg timeout: how long to wait for all replies, in seconds
126 @type timeout: float
127 """
128 timeout=TimeOut(timeout)
129 for w in self.workers:
130 if w.isAlive():
131 chat(copy.copy(payload), w.privbox, timeout=timeout(), reply_box=reply_box)
132 else:
133 self.logger.warn("Skipped serial broadcast to dead %r", w)
134