Package grassyknoll :: Package concurrent :: Module lib
[hide private]

Source Code for Module grassyknoll.concurrent.lib

  1  import threading 
  2  from time import time as _time 
  3   
  4  from grassyknoll.lib.meta import localProperty, LocalState 
  5   
  6  from grassyknoll.concurrent.MailBox import ThreadMailBox, MailBox 
  7  from grassyknoll.concurrent.Message import Message 
  8  from grassyknoll.concurrent.Fault import Fault 
  9  from grassyknoll.concurrent.errors import * 
 10   
 11   
 12  __all__=['thisThread', 'TimeOut', 'chat', 'talk'] 
 13   
 14  # XXX the mailboxen leak - see http://bugs.python.org/issue1868 
class _ThisThread(LocalState):
    """Holder for the state this module keeps about the calling thread.

    @ivar mailbox: a L{ThreadMailBox} named after the thread that was current
        when it was built. NOTE(review): presumably L{localProperty} builds
        and caches one box per thread - confirm in grassyknoll.lib.meta.
    """

    @localProperty
    def mailbox(self):
        """Build a mailbox labelled with the current thread's name.

        @returns: a new box named C{"ThreadLocal-<thread name>"}
        @rtype: L{ThreadMailBox}
        """
        thread_name = threading.currentThread().getName()
        return ThreadMailBox("ThreadLocal-%s" % thread_name)


# Module-level instance; talk() falls back to thisThread.mailbox when the
# caller supplies no reply box.
thisThread = _ThisThread()

class TimeOut(object):
    """A single deadline shared by a series of blocking operations.

    Build it once with the total number of seconds you are willing to wait:

    >>> import time
    >>> timeout=TimeOut(0.5)

    Every call reports how many seconds are left before the deadline:
    >>> time.sleep(.3)
    >>> timeout() # doctest:+SKIP
    0.2

    After the deadline has passed, calls report zero:
    >>> time.sleep(.3)
    >>> timeout()
    0.0

    With a max_time of None there is no deadline and calls report None:
    >>> timeout=TimeOut(None)
    >>> timeout() is None
    True

    @ivar end_time: when time is up
    @type end_time: float or None
    """

    __slots__ = ['end_time']

    def __init__(self, max_time):
        """Fix the deadline relative to the moment of construction.

        @arg max_time: maximum time to wait, in seconds, or None, to wait
            forever
        @type max_time: float or None
        """
        if max_time is None:
            self.end_time = None
        else:
            self.end_time = _time() + max_time

    def __call__(self):
        """Report the time remaining until the deadline.

        @returns: seconds left, never below 0.0; None if there is no deadline
        @rtype: float or None
        """
        deadline = self.end_time
        if deadline is None:
            return None
        return max(0.0, deadline - _time())

60
def talk(payload, to_box, timeout=None, reply_box=None):
    """Send a single payload and receive a single reply.

    @arg payload: the payload to send
    @type payload: object

    @arg to_box: the box to send to
    @type to_box: L{MailBox}

    @arg timeout: how long to block in I{total}, in seconds, before raising
      an exception. The budget is shared by the send and every receive; each
      of those calls is given whatever time is left. None sets no deadline
      (None is handed to the boxes as their timeout). Zero or a negative
      value raises L{TimeoutError} immediately, before anything is sent.
    @type timeout: float or None

    @arg reply_box: the box replies should be sent to. If None, use
      L{thisThread.mailbox}
    @type reply_box: L{MailBox}

    @returns: the reply
    @rtype: L{Message.Message}

    @raises L{TimeoutError}: if C{timeout} is zero or negative. Whatever
      C{to_box.send} or C{reply_box.recv} raise when their time runs out
      propagates unchanged.

    @warning: Any messages in L{reply_box} which are not in response to the
      message sent by a call to this function will be discarded. This may
      include late replies (ie, after a timeout or other receiving error
      occurs).
    """
    if reply_box is None:
        reply_box = thisThread.mailbox

    assert isinstance(reply_box, MailBox)

    ## note that this function *always* blocks for some amount of time, since
    ## it must take non-zero time to calculate a reply. A negative budget is
    ## as hopeless as a zero one: TimeOut would clamp it to 0.0, so without
    ## this check the message would still be sent even though no reply could
    ## ever be waited for.
    if timeout is not None and timeout <= 0.0:
        raise TimeoutError()
    remaining = TimeOut(timeout)

    mesg = Message(payload, reply_box=reply_box)
    to_box.send(mesg, timeout=remaining())

    ## the loop below discards messages from reply_box until we find one
    ## that's in response to mesg. A naive alternative approach would be to
    ## create a new box for each call. Doing so would have the same behavior,
    ## but with much more object allocation.

    reply = reply_box.recv(timeout=remaining())
    while reply.re_id != mesg.id:
        reply = reply_box.recv(timeout=remaining())

    return reply

110
def chat(payload, to_box, timeout=None, reply_box=None):
    """Like L{talk}, but hand back the reply's payload instead of the reply.

    All arguments are passed to L{talk} unchanged.

    @returns: the reply's payload
    @rtype: object

    @raises L{Exception}: if the reply's payload is a L{Fault.Fault}, the
      exception stored in its C{value} attribute is raised.

    @raises L{ConcurrentError}: if the reply's payload is a ConcurrentError,
      it is raised.
    """
    answer = talk(payload, to_box, timeout, reply_box).payload

    # A Fault wraps an exception; raise what it wraps, not the wrapper.
    if isinstance(answer, Fault):
        raise answer.value
    if isinstance(answer, ConcurrentError):
        raise answer
    return answer

131