1 import threading
2 from time import time as _time
3
4 from grassyknoll.lib.meta import localProperty, LocalState
5
6 from grassyknoll.concurrent.MailBox import ThreadMailBox, MailBox
7 from grassyknoll.concurrent.Message import Message
8 from grassyknoll.concurrent.Fault import Fault
9 from grassyknoll.concurrent.errors import *
10
11
12 __all__=['thisThread', 'TimeOut', 'chat', 'talk']
13
14
# NOTE(review): the ``class`` and ``def`` header lines of this block were not
# visible in the reviewed source.  They are reconstructed from
# ``thisThread=_ThisThread()`` below, from the ``thisThread.mailbox`` lookup
# in talk(), and from the otherwise unused ``LocalState`` import -- confirm
# the base class and method name against the original file.
class _ThisThread(LocalState):
    """Container for the state exposed through the module-level L{thisThread}.

    @ivar mailbox: default reply box used by L{talk} when the caller does not
        supply one.
    @type mailbox: L{ThreadMailBox}
    """

    @localProperty
    def mailbox(self):
        """Create a L{ThreadMailBox} labelled with the calling thread's name.

        NOTE(review): presumably L{localProperty} evaluates this once per
        thread and caches the result -- confirm in grassyknoll.lib.meta.

        @returns: a new mailbox named C{"ThreadLocal-<thread name>"}
        @rtype: L{ThreadMailBox}
        """
        return ThreadMailBox("ThreadLocal-%s"%threading.currentThread().getName())

# Single shared accessor object; exported through __all__.
thisThread=_ThisThread()
21
# NOTE(review): the ``class``/``def`` header lines of this block were not
# visible in the reviewed source.  They are reconstructed from the doctests
# (``TimeOut(0.5)``, ``timeout()``), the ``@arg max_time`` field and the
# ``__slots__`` declaration -- confirm against the original file.
class TimeOut(object):
    """TimeOut helps manage timeouts that are spread across several operations.

    Create it with the maximum amount of time you want to wait:

    >>> import time
    >>> timeout=TimeOut(0.5)

    Calling the object will return the amount of time left:
    >>> time.sleep(.3)
    >>> timeout() # doctest:+SKIP
    0.2

    If you run out of time, calling a TimeOut will return zero:
    >>> time.sleep(.3)
    >>> timeout()
    0.0

    If you pass None as max_time, you get None back:
    >>> timeout=TimeOut(None)
    >>> timeout() is None
    True

    @ivar end_time: when time is up, as a C{time.time()} timestamp, or None
        when there is no deadline
    @type end_time: float or None
    """

    # Only the deadline is stored, so instances need no per-object __dict__.
    __slots__=['end_time']

    def __init__(self, max_time):
        """Fix the deadline relative to the moment of construction.

        @arg max_time: maximum time to wait, in seconds, or None, to wait forever
        @type max_time: float or None
        """
        if max_time is None:
            # No deadline at all: __call__ will keep answering None.
            self.end_time=None
        else:
            self.end_time=_time()+max_time

    def __call__(self):
        """Report how much of the time budget is left.

        @returns: the seconds remaining until L{end_time}, clamped so it is
            never negative, or None if the TimeOut was created with None
        @rtype: float or None
        """
        if self.end_time is None:
            return None
        # Clamp at zero so an expired deadline never yields a negative timeout.
        return max(0.0, self.end_time-_time())
60
def talk(payload, to_box, timeout=None, reply_box=None):
    """Send one payload to a mailbox and wait for the matching reply.

    The payload is wrapped in a L{Message} whose reply box is C{reply_box},
    sent to C{to_box}, and then C{reply_box} is read until a message arrives
    whose C{re_id} matches the id of the message that was sent.

    @arg payload: the payload to send
    @type payload: object

    @arg to_box: the box to send to
    @type to_box: L{MailBox}

    @arg timeout: how long, in seconds, to block in I{total} (sending and
        receiving together) before an exception is raised. None means no
        time limit.
    @type timeout: float or None

    @arg reply_box: the box replies should be sent to. If None, use
        L{thisThread.mailbox}
    @type reply_box: L{MailBox}

    @returns: the reply
    @rtype: L{Message.Message}

    @raises TimeoutError: immediately, if C{timeout} equals 0.0. Errors
        raised by C{to_box.send} or C{reply_box.recv} are not caught here.

    @warning: Any messages in L{reply_box} which are not in response to the
        message sent by a call to this function will be discarded.  This may
        include late replies (ie, after a timeout or other receiving error
        occurs).
    """
    if reply_box is None:
        reply_box = thisThread.mailbox

    assert isinstance(reply_box, MailBox)

    # A zero budget can never succeed, so fail before sending anything.
    if timeout == 0.0:
        raise TimeoutError()

    # One shared countdown covers the send and every receive attempt.
    remaining = TimeOut(timeout)

    request = Message(payload, reply_box=reply_box)
    to_box.send(request, timeout=remaining())

    while True:
        answer = reply_box.recv(timeout=remaining())
        if answer.re_id != request.id:
            # Not a reply to our request: drop it and keep waiting.
            continue
        return answer
110
def chat(payload, to_box, timeout=None, reply_box=None):
    """Like L{talk}, but hand back the reply's payload instead of the reply.

    Arguments are passed to L{talk} unchanged.

    @returns: the reply's payload
    @rtype: object

    @raises L{Exception}: if the reply's payload is a L{Fault.Fault}, the
        exception stored in its C{value} attribute is raised.

    @raises L{ConcurrentError}: if the reply's payload is a ConcurrentError,
        it is raised.
    """
    result = talk(payload, to_box, timeout, reply_box).payload

    # A Fault carries an exception from the other side; re-raise it here.
    if isinstance(result, Fault):
        raise result.value

    # Concurrency errors travel as plain payloads and are raised directly.
    if isinstance(result, ConcurrentError):
        raise result

    return result
131