Package grassyknoll :: Package concurrent :: Module MailBox
[hide private]

Source Code for Module grassyknoll.concurrent.MailBox

  1  """contains L{MailBox} and implementations""" 
  2  from grassyknoll.lib.util import ensureName 
  3  from grassyknoll.lib import ClosingQueue 
  4   
  5  from errors import * 
  6  import Message 
  7  import Fault 
  8   
  9  __all__=['MailBox', 'ThreadMailBox'] 
 10   
11 -class MailBox(object):
12 """abstract base class representing a destination for L{Message.Message}s 13 14 Timeouts are interpreted as follows: 15 16 - C{None}: No timeout. Block indefinitely. 17 - C{0.0}: Non-blocking. Return or raise immediately. 18 - C{XX}: Block for up to XX seconds. 19 20 @ivar name: a human-readable name for this box. May be None. 21 @type name: string 22 """ 23 __slots__=['name'] 24
25 - def __init__(self, name=None):
26 self.name=ensureName(name)
27
28 - def __repr__(self):
29 return "<%s(%r)>" % (self.__class__.__name__, self.name)
30
31 - def send(self, mesg, timeout=None):
32 """send message to this box 33 34 @arg mesg: the message to send 35 @type mesg: L{Message.Message} 36 37 @arg timeout: how long to block sending before raising an exception, 38 in seconds. See class docstring. 39 @type timeout: float 40 41 @raises SendError: on any problem sending mesg 42 @raises BoxFullError: when the box has no more room for new messages. 43 """ 44 raise NotImplementedError
45
46 - def recv(self, timeout=None):
47 """receive a message from this box 48 49 @returns: a message that was sent to this box 50 @rtype mesg: L{Message.Message} 51 52 @arg timeout: how long to block receiving before raising an exception, 53 in seconds. See class docstring. 54 @type timeout: float 55 56 @raises RecieveError: on any problem recieving a message 57 @raises BoxEmptyError: when the box has no new messages. 58 """ 59 raise NotImplementedError
60
61 - def sendNow(self, mesg):
62 """send message non-blocking (ie, timeout=None). See L{send}""" 63 return self.send(mesg, timeout=0.0)
64
65 - def recvNow(self):
66 """receive message non-blocking (ie, timeout=None). See L{send}""" 67 return self.recv(timeout=0.0)
68
69 - def __len__(self):
70 raise NotImplementedError
71
72 -class ThreadMailBox(MailBox):
73 """a L{MailBox} for use with threads 74 75 @ivar queue: a queue of L{Message.Message}s 76 @type queue: L{ClosingQueue.ClosingQueue} 77 """ 78 79 __slots__=['queue'] 80
81 - def __init__(self, name=None, qsize=None):
82 super(ThreadMailBox, self).__init__(name) 83 self.queue=ClosingQueue.ClosingQueue(qsize)
84
85 - def send(self, mesg, timeout=None):
86 if not isinstance(mesg, Message.Message): 87 raise TypeError("mesg must be Message, not %r"%mesg) 88 89 block = not timeout == 0.0 90 try: 91 self.queue.put(mesg, block, timeout) 92 except ClosingQueue.Full: 93 raise BoxFullError(mesg) 94 except ClosingQueue.Closed: 95 raise BoxClosedError(mesg)
96
97 - def recv(self, timeout=None):
98 block = not timeout == 0.0 99 try: 100 mesg=self.queue.get(block, timeout) 101 except ClosingQueue.Empty: 102 raise BoxEmptyError 103 else: 104 assert isinstance(mesg, Message.Message) 105 return mesg
106
107 - def done(self):
108 self.queue.task_done()
109
110 - def close(self):
111 self.queue.close()
112
113 - def join(self):
114 self.queue.join()
115
116 - def __len__(self):
117 return self.queue.qsize()
118