1 from nose.tools import *
2 from grassyknoll.tests import assert_sorted_equals
3
4 from grassyknoll.lib.meta import Factory
5
6 from grassyknoll.concurrent.ThreadPool import *
7 from grassyknoll.concurrent.Worker import *
8 from grassyknoll.concurrent.Message import *
9 from grassyknoll.concurrent.MailBox import *
10 from grassyknoll.concurrent.Wrappers import *
11 from grassyknoll.concurrent.errors import *
12 from grassyknoll.concurrent.Fault import Fault
13
14 import threading
15
16
17
18
19
20 TIMEOUT=.1
21
22 import time
25
26 @errorMessenger(ZeroDivisionError)
27 @functionMessenger()
30
45
63
82
84
85 num_workers=1
86 box_factor=2
87 kwargs={}
88
89 _ThreadPoolClass=ThreadPool
90
91 @property
94
97
105
107 cleanup_pool(self.pool)
108
109 self.reply_box.close()
110 del self.reply_box
111 del self.pool
112
114
117
119 m=self.makeMesg(10)
120 self.pool.inbox.sendNow(m)
121 r=self.reply_box.recv(TIMEOUT)
122 assert r.re_id == m.id
123 assert r.payload == 10
124
126 mesgs=[self.makeMesg(10) for i in xrange(self.box_size)]
127 for m in mesgs:
128 self.pool.inbox.sendNow(m)
129
130 replies=[self.reply_box.recv(TIMEOUT) for i in xrange(self.box_size)]
131
132 for r in replies:
133 assert r.payload == 10
134
135 assert len(set(r.id for r in replies)) == len(replies)
136
137 mesg_ids=[m.id for m in mesgs]
138 reply_ids=[r.re_id for r in replies]
139 assert_sorted_equals(mesg_ids, reply_ids)
140
142 m=self.makeMesg(0)
143 self.pool.inbox.sendNow(m)
144 r=self.reply_box.recv(TIMEOUT)
145 assert r.re_id == m.id
146 assert isinstance(r.payload, Fault)
147 assert r.payload.type is ZeroDivisionError
148
154
174
177
180
182 _ThreadPoolClass=BroadcastThreadPool
183
184 num_workers=10
185 kwargs={'inbox_timeout':.05}
186
190
191 @methodMessenger()
193 self.mylist.append(threading.currentThread().getName())
194 return True
195
198
203
210
212 N=20
213
@errorMessenger(ZeroDivisionError)
@functionMessenger()
def slowfunc(x):
    """Pause via sleep(), then return 100 divided by x.

    NOTE(review): the errorMessenger(ZeroDivisionError) decorator presumably
    turns a division by zero into an error reply instead of letting it
    propagate -- confirm against the Wrappers module.
    """
    sleep()
    quotient = 100/x
    return quotient
219
def doit(pool, reply_box):
    """Queue N function calls on the pool and time the collection of N replies.

    One message is sent per value in xrange(N), each carrying float(i) as
    the call argument and reply_box as its reply destination. The clock is
    started only after every message has been sent, so the returned value
    (time.time() difference, in seconds) covers the receive loop alone.
    """
    for arg in xrange(N):
        call = FunctionCall(float(arg))
        pool.inbox.sendNow(Message(call, reply_box=reply_box))

    started = time.time()
    for _ in xrange(N):
        # 60 is presumably a per-reply timeout; the replies are discarded.
        reply_box.recv(60)
    return time.time() - started
229
# Run the same workload on a 2-worker pool and on a 10-worker pool and
# require the larger pool to be more than 4x faster.
#
# Every pool that was successfully started is recorded, and cleanup_pool()
# is called on each of them in the finally clause. That way a failed
# assertion or an exception raised inside doit() still reaches cleanup_pool()
# instead of skipping it, as the straight-line version did.
started_pools = []
try:
    reply_box1 = ThreadMailBox("ReplyBox")
    pool1 = ThreadPool(NullContextManager.factory(func=slowfunc),
                       inbox=ThreadMailBox("PoolBox"),
                       num_workers=2)
    pool1.start()
    started_pools.append(pool1)

    t1 = doit(pool1, reply_box1)

    reply_box2 = ThreadMailBox("ReplyBox")
    pool2 = ThreadPool(NullContextManager.factory(func=slowfunc),
                       inbox=ThreadMailBox("PoolBox"),
                       num_workers=10)
    pool2.start()
    started_pools.append(pool2)

    t2 = doit(pool2, reply_box2)

    # 10 workers versus 2 gives an ideal ratio of 5; 4 leaves some slack.
    assert t1 / t2 > 4
finally:
    # Same order as before: pool1 first, then pool2.
    for pool in started_pools:
        cleanup_pool(pool)
250