Package grassyknoll :: Package concurrent :: Module Worker
[hide private]

Source Code for Module grassyknoll.concurrent.Worker

  1  """L{Worker} threads 
  2   
  3  I was so tempted to call this module B{WageSlave} it's not even funny. 
  4  """ 
  5  from __future__ import with_statement 
  6   
  7  import threading 
  8  import logging 
  9  import random 
 10  import socket 
 11  import sys 
 12   
 13  from grassyknoll.lib.util import ensureName 
 14  from grassyknoll.lib.meta import AutoLogger 
 15   
 16  from Message import Message 
 17  import MailBox 
 18  from errors import * 
 19   
 20  __all__=['Worker', 'PrivateBoxWorker', 'WorkerExit', 'ContextExit'] 
 21   
 22  ## unhandled errors that aren't disastrous. Include socket.error because of a 
 23  ## bug in python itself: 
 24  ## http://mail.python.org/pipermail/python-list/2007-April/437253.html 
 25  CATCH_ERRORS=(StandardError, socket.error) 
 26   
 27  ## disastruous errors that inherit from StandardError for some reason 
 28  FATAL_ERRORS=(SystemError, MemoryError) 
 29   
30 -class WorkerExit(object):
31 """singleton indicating that a L{Worker} should shutdown 32 33 Do not create instances of WorkerExit, use the class directly instead. 34 """ 35
36 - def __init__(self):
37 raise RuntimeError, "do not create instances of WorkerExit, use class instead"
38
39 -class ContextExit(object):
40 """singleton indicating that a new context should be created 41 42 Do not create instances of ContextExit, use the class directly instead. 43 """ 44
45 - def __init__(self):
46 raise RuntimeError, "do not create instances of ContextExit, use class instead"
47
48 -class Worker(threading.Thread):
49 """a worker L{threading.Thread} with L{MailBox.MailBox} support 50 51 @ivar context_factory: factory returning a context manager. See module 52 docs. 53 @arg context_factory: factory 54 55 @ivar inbox: shared source of L{Message}s 56 @type inbox: L{MailBox.MailBox} 57 58 @ivar unhandled: one of C{die, newcontext, keeprunning}, indicating how 59 uncaught exceptions should be handled. 60 @type unhandled: string 61 62 @ivar context: the in-use context. For internal use/debugging. 63 @type context: context manager 64 65 @ivar newcontext: should new contexts be created? When False, the thread 66 exits. For internal use. 67 @type newcontext: bool 68 69 @note: Using L{ThreadPool} is preferred, even with only a single worker. 70 """ 71 72 logger=AutoLogger() 73
74 - def __init__(self, inbox, context_factory, name=None, verbose=None, 75 daemon=False, unhandled="die"):
76 super(Worker, self).__init__(name=ensureName(name), verbose=verbose) 77 assert isinstance(inbox, MailBox.MailBox) 78 self.inbox=inbox 79 self.setDaemon(daemon) 80 81 self.context_factory=context_factory 82 83 ## these are used in various states of run() and it's implementing 84 ## functions 85 self.context=None 86 self.newcontext=False 87 88 if unhandled in ('die', 'newcontext', 'keeprunning'): 89 self.unhandled=unhandled 90 else: 91 raise ValueError("Unknown unhandled %r"%unhandled)
92
93 - def checkMessages(self):
94 """check for new L{Message}s for this worker 95 96 @returns: a L{Message} and the the L{MailBox.MailBox} it came 97 from. 98 @rtype: tuple 99 """ 100 return self.inbox.recv(), self.inbox
101
102 - def run(self):
103 """do the work; do not call directly - try L{start}() instead""" 104 105 ## as long as newcontext is True, repeatedly create new contexts. The 106 ## worker exits when newcontext is False. 107 self.newcontext = True 108 while self.newcontext: 109 110 ## create a new context 111 assert self.context is None 112 self.context = self.context_factory() 113 114 with self.context as func: 115 ## as long as self.context is not None, repeatedly process new 116 ## messages with the same context. 117 while self.context is not None: 118 119 ## get a mesg 120 try: 121 mesg, from_box = self.checkMessages() 122 except ReceiveError: 123 self.logger.warn("Error checking messages", exc_info=True) 124 continue 125 126 ## check for shutdown/new context 127 if mesg.payload in (WorkerExit, ContextExit): 128 reply = self.doExits(mesg) 129 130 ## normal message, calculate the reply 131 else: 132 reply = self.callFunc(func, mesg) 133 134 ## mark message as finished 135 from_box.done() 136 137 ## send reply 138 if isinstance(reply, Message): 139 self.sendReply(mesg, reply) 140 elif reply is None: 141 continue 142 else: 143 logger.warning("Unknown reply %r", reply) 144 continue 145 146 # while self.context # 147 assert self.context is None 148 # with self.context # 149 # while self.newcontext # 150 assert not self.newcontext
151
152 - def doExits(self, mesg):
153 """handle L{WorkerExit} and L{ContextExit}. For internal use. 154 155 @arg mesg: a L{Message} with L{WorkerExit} or L{ContextExit} 156 as a payload. 157 @type mesg: L{Message} 158 159 @returns: a L{Message} with True as a payload 160 @rtype: L{Message} 161 """ 162 if mesg.payload is WorkerExit: 163 self.newcontext = False 164 self.context = None 165 return Message(True, re_id=mesg.id) 166 elif mesg.payload is ContextExit: 167 self.context = None 168 return Message(True, re_id=mesg.id) 169 else: 170 assert False, "bad payload %r"%mesg.payload
171
172 - def callFunc(self, func, mesg):
173 """Call the current L{func} with a L{mesg}. For internal use. 174 175 @arg mesg: the current L{Message} 176 @type mesg: L{Message} 177 178 @arg func: the function from the current context, that returns a L{Message} 179 @type func: callable 180 181 @returns: the L{Message} returned by L{func}. In the event of 182 an unhandled exception, may return a message with a L{CrashError} as a 183 payload. See L{unhandled}. 184 @rtype: L{Message} 185 """ 186 187 try: 188 reply = func(mesg) 189 190 ## a serious unhandled exception. Re-raise so Worker dies. 191 ## Don't even bother to reply, as the entire program is 192 ## about to go boom. 193 except FATAL_ERRORS, e: 194 print >> sys.stderr, "Fatal exception", e 195 raise 196 197 ## an unhandled exception we can deal with 198 except CATCH_ERRORS, e: 199 self.logger.error("Unhandled exception", exc_info=True) 200 reply = Message(CrashError(*sys.exc_info()), re_id=mesg.id) 201 202 if self.unhandled == 'die': 203 self.newcontext = False 204 self.context = None 205 elif self.unhandled == 'newcontext': 206 self.context = None 207 elif self.unhandled == 'keeprunning': 208 pass 209 else: 210 assert False, "unreachable" 211 212 return reply
213
214 - def sendReply(self, mesg, reply):
215 """send a L{reply} to L{mesg}. For internal use. 216 217 @arg mesg: the original L{Message} 218 @type mesg: L{Message} 219 220 @arg reply: a reply to L{mesg} 221 @type reply: L{Message} 222 """ 223 if mesg.reply_box is not None: 224 try: 225 mesg.reply_box.sendNow(reply) # XXX block here? 226 except SendError, e: 227 self.logger.error("Error sending reply to %r: %r", 228 mesg.reply_box, e) 229 else: 230 self.logger.info("Discarding reply %r", reply)
231
232 -class PrivateBoxWorker(Worker):
233 """A L{Worker} with a private L{MailBox}. 234 235 @ivar privbox: a private box for this thread. 236 @type privbox: L{MailBox.ThreadMailBox} 237 238 @ivar inbox_timeout: how long, in seconds, to block on an empty inbox 239 before trying privbox again. The actual wait time will be +/- 25%. 240 @type inbox_timeout: float 241 """ 242
243 - def __init__(self, inbox_timeout=15, *args, **kwargs):
244 super(PrivateBoxWorker, self).__init__(*args, **kwargs) 245 self.inbox_timeout=inbox_timeout 246 self.privbox=MailBox.ThreadMailBox(name=self.getName())
247
248 - def __timeout(self):
249 return self.inbox_timeout*random.uniform(.75, 1.25)
250
251 - def checkMessages(self):
252 while 1: 253 ## check privbox non-blocking 254 try: 255 return self.privbox.recvNow(), self.privbox 256 257 ## no private mesg 258 except BoxEmptyError, e: 259 260 ## check inbox. Timeout so that we try privbox eventually. 261 ## This has the worst latency under *low* load. 262 try: 263 return self.inbox.recv(timeout=self.__timeout()), self.inbox 264 265 ## no inbox mesg 266 except BoxEmptyError: 267 continue
268