1 """L{Worker} threads
2
3 I was so tempted to call this module B{WageSlave} it's not even funny.
4 """
5 from __future__ import with_statement
6
7 import threading
8 import logging
9 import random
10 import socket
11 import sys
12
13 from grassyknoll.lib.util import ensureName
14 from grassyknoll.lib.meta import AutoLogger
15
16 from Message import Message
17 import MailBox
18 from errors import *
19
20 __all__=['Worker', 'PrivateBoxWorker', 'WorkerExit', 'ContextExit']
21
22
23
24
25 CATCH_ERRORS=(StandardError, socket.error)
26
27
28 FATAL_ERRORS=(SystemError, MemoryError)
29
31 """singleton indicating that a L{Worker} should shutdown
32
33 Do not create instances of WorkerExit, use the class directly instead.
34 """
35
37 raise RuntimeError, "do not create instances of WorkerExit, use class instead"
38
39 -class ContextExit(object):
40 """singleton indicating that a new context should be created
41
42 Do not create instances of ContextExit, use the class directly instead.
43 """
44
46 raise RuntimeError, "do not create instances of ContextExit, use class instead"
47
48 -class Worker(threading.Thread):
49 """a worker L{threading.Thread} with L{MailBox.MailBox} support
50
51 @ivar context_factory: factory returning a context manager. See module
52 docs.
53 @arg context_factory: factory
54
55 @ivar inbox: shared source of L{Message}s
56 @type inbox: L{MailBox.MailBox}
57
58 @ivar unhandled: one of C{die, newcontext, keeprunning}, indicating how
59 uncaught exceptions should be handled.
60 @type unhandled: string
61
62 @ivar context: the in-use context. For internal use/debugging.
63 @type context: context manager
64
65 @ivar newcontext: should new contexts be created? When False, the thread
66 exits. For internal use.
67 @type newcontext: bool
68
69 @note: Using L{ThreadPool} is preferred, even with only a single worker.
70 """
71
72 logger=AutoLogger()
73
74 - def __init__(self, inbox, context_factory, name=None, verbose=None,
75 daemon=False, unhandled="die"):
76 super(Worker, self).__init__(name=ensureName(name), verbose=verbose)
77 assert isinstance(inbox, MailBox.MailBox)
78 self.inbox=inbox
79 self.setDaemon(daemon)
80
81 self.context_factory=context_factory
82
83
84
85 self.context=None
86 self.newcontext=False
87
88 if unhandled in ('die', 'newcontext', 'keeprunning'):
89 self.unhandled=unhandled
90 else:
91 raise ValueError("Unknown unhandled %r"%unhandled)
92
94 """check for new L{Message}s for this worker
95
96 @returns: a L{Message} and the the L{MailBox.MailBox} it came
97 from.
98 @rtype: tuple
99 """
100 return self.inbox.recv(), self.inbox
101
103 """do the work; do not call directly - try L{start}() instead"""
104
105
106
107 self.newcontext = True
108 while self.newcontext:
109
110
111 assert self.context is None
112 self.context = self.context_factory()
113
114 with self.context as func:
115
116
117 while self.context is not None:
118
119
120 try:
121 mesg, from_box = self.checkMessages()
122 except ReceiveError:
123 self.logger.warn("Error checking messages", exc_info=True)
124 continue
125
126
127 if mesg.payload in (WorkerExit, ContextExit):
128 reply = self.doExits(mesg)
129
130
131 else:
132 reply = self.callFunc(func, mesg)
133
134
135 from_box.done()
136
137
138 if isinstance(reply, Message):
139 self.sendReply(mesg, reply)
140 elif reply is None:
141 continue
142 else:
143 logger.warning("Unknown reply %r", reply)
144 continue
145
146
147 assert self.context is None
148
149
150 assert not self.newcontext
151
153 """handle L{WorkerExit} and L{ContextExit}. For internal use.
154
155 @arg mesg: a L{Message} with L{WorkerExit} or L{ContextExit}
156 as a payload.
157 @type mesg: L{Message}
158
159 @returns: a L{Message} with True as a payload
160 @rtype: L{Message}
161 """
162 if mesg.payload is WorkerExit:
163 self.newcontext = False
164 self.context = None
165 return Message(True, re_id=mesg.id)
166 elif mesg.payload is ContextExit:
167 self.context = None
168 return Message(True, re_id=mesg.id)
169 else:
170 assert False, "bad payload %r"%mesg.payload
171
173 """Call the current L{func} with a L{mesg}. For internal use.
174
175 @arg mesg: the current L{Message}
176 @type mesg: L{Message}
177
178 @arg func: the function from the current context, that returns a L{Message}
179 @type func: callable
180
181 @returns: the L{Message} returned by L{func}. In the event of
182 an unhandled exception, may return a message with a L{CrashError} as a
183 payload. See L{unhandled}.
184 @rtype: L{Message}
185 """
186
187 try:
188 reply = func(mesg)
189
190
191
192
193 except FATAL_ERRORS, e:
194 print >> sys.stderr, "Fatal exception", e
195 raise
196
197
198 except CATCH_ERRORS, e:
199 self.logger.error("Unhandled exception", exc_info=True)
200 reply = Message(CrashError(*sys.exc_info()), re_id=mesg.id)
201
202 if self.unhandled == 'die':
203 self.newcontext = False
204 self.context = None
205 elif self.unhandled == 'newcontext':
206 self.context = None
207 elif self.unhandled == 'keeprunning':
208 pass
209 else:
210 assert False, "unreachable"
211
212 return reply
213
215 """send a L{reply} to L{mesg}. For internal use.
216
217 @arg mesg: the original L{Message}
218 @type mesg: L{Message}
219
220 @arg reply: a reply to L{mesg}
221 @type reply: L{Message}
222 """
223 if mesg.reply_box is not None:
224 try:
225 mesg.reply_box.sendNow(reply)
226 except SendError, e:
227 self.logger.error("Error sending reply to %r: %r",
228 mesg.reply_box, e)
229 else:
230 self.logger.info("Discarding reply %r", reply)
231
233 """A L{Worker} with a private L{MailBox}.
234
235 @ivar privbox: a private box for this thread.
236 @type privbox: L{MailBox.ThreadMailBox}
237
238 @ivar inbox_timeout: how long, in seconds, to block on an empty inbox
239 before trying privbox again. The actual wait time will be +/- 25%.
240 @type inbox_timeout: float
241 """
242
243 - def __init__(self, inbox_timeout=15, *args, **kwargs):
247
249 return self.inbox_timeout*random.uniform(.75, 1.25)
250
252 while 1:
253
254 try:
255 return self.privbox.recvNow(), self.privbox
256
257
258 except BoxEmptyError, e:
259
260
261
262 try:
263 return self.inbox.recv(timeout=self.__timeout()), self.inbox
264
265
266 except BoxEmptyError:
267 continue
268