Package grassyknoll :: Package tests :: Package test_concurrent :: Module test_ThreadPool
[hide private]

Source Code for Module grassyknoll.tests.test_concurrent.test_ThreadPool

  1  from nose.tools import * 
  2  from grassyknoll.tests import assert_sorted_equals 
  3   
  4  from grassyknoll.lib.meta import Factory 
  5   
  6  from grassyknoll.concurrent.ThreadPool import * 
  7  from grassyknoll.concurrent.Worker import * 
  8  from grassyknoll.concurrent.Message import * 
  9  from grassyknoll.concurrent.MailBox import * 
 10  from grassyknoll.concurrent.Wrappers import * 
 11  from grassyknoll.concurrent.errors import * 
 12  from grassyknoll.concurrent.Fault import Fault 
 13   
 14  import threading 
 15   
 16  #import logging; 
 17  #logging.basicConfig(level=logging.WARN, 
 18                      #format="%(levelname)s:%(name)s:%(threadName)s:%(message)s") 
 19   
 20  TIMEOUT=.1 
 21   
 22  import time 
23 -def sleep():
24 time.sleep(TIMEOUT)
25 26 @errorMessenger(ZeroDivisionError) 27 @functionMessenger()
28 -def myfunc(x):
29 return 100/x
30
31 -def cleanup_pool(pool):
32 # clear out the inbox 33 while 1: 34 try: 35 pool.inbox.recvNow() 36 except ReceiveError: 37 break 38 39 try: 40 pool.stop() 41 except ConcurrentError: 42 pass 43 else: 44 pool.join(TIMEOUT)
45
46 -def test_pool_start_stop():
47 pool=ThreadPool(NullContextManager.factory(func=myfunc), 48 num_workers=3) 49 50 assert len(pool.workers) == 3 51 assert pool.aliveCount() == 0 52 pool.start() 53 assert pool.aliveCount() == 3 54 55 pool.stop() 56 pool.join(TIMEOUT) 57 assert pool.aliveCount() == 0 58 59 ## stopping twice should be cool 60 pool.stop() 61 pool.join(TIMEOUT) 62 assert pool.aliveCount() == 0
63
64 -def test_pool_start_stop_with_dead():
65 pool=ThreadPool(NullContextManager.factory(func=myfunc), 66 num_workers=3) 67 68 assert len(pool.workers) == 3 69 assert pool.aliveCount() == 0 70 pool.start() 71 assert pool.aliveCount() == 3 72 73 # kill a thread 74 pool.inbox.sendNow(Message(WorkerExit)) 75 sleep() 76 assert len(pool.workers) == 3 77 assert pool.aliveCount() == 2 78 79 pool.stop() 80 pool.join(TIMEOUT) 81 assert pool.aliveCount() == 0
82
83 -class BaseTestPool(object):
84 85 num_workers=1 86 box_factor=2 87 kwargs={} 88 89 _ThreadPoolClass=ThreadPool 90 91 @property
92 - def box_size(self):
93 return self.num_workers * self.box_factor
94
95 - def getFunc(self):
96 return myfunc
97
98 - def setUp(self):
99 self.reply_box=ThreadMailBox("ReplyBox", self.box_size) 100 self.pool=self._ThreadPoolClass(NullContextManager.factory(func=self.getFunc()), 101 inbox=ThreadMailBox("PoolBox", self.box_size), 102 num_workers=self.num_workers, 103 **self.kwargs) 104 self.pool.start()
105
106 - def tearDown(self):
107 cleanup_pool(self.pool) 108 109 self.reply_box.close() 110 del self.reply_box 111 del self.pool
112
113 -class TestPool(BaseTestPool):
114
115 - def makeMesg(self, x):
116 return Message(FunctionCall(x), reply_box=self.reply_box)
117
118 - def test_basic(self):
119 m=self.makeMesg(10) 120 self.pool.inbox.sendNow(m) 121 r=self.reply_box.recv(TIMEOUT) 122 assert r.re_id == m.id 123 assert r.payload == 10
124
125 - def test_several_messages(self):
126 mesgs=[self.makeMesg(10) for i in xrange(self.box_size)] 127 for m in mesgs: 128 self.pool.inbox.sendNow(m) 129 130 replies=[self.reply_box.recv(TIMEOUT) for i in xrange(self.box_size)] 131 132 for r in replies: 133 assert r.payload == 10 134 135 assert len(set(r.id for r in replies)) == len(replies) 136 137 mesg_ids=[m.id for m in mesgs] 138 reply_ids=[r.re_id for r in replies] 139 assert_sorted_equals(mesg_ids, reply_ids)
140
141 - def test_error(self):
142 m=self.makeMesg(0) 143 self.pool.inbox.sendNow(m) 144 r=self.reply_box.recv(TIMEOUT) 145 assert r.re_id == m.id 146 assert isinstance(r.payload, Fault) 147 assert r.payload.type is ZeroDivisionError
148
149 - def test_noreply(self):
150 m=Message(FunctionCall(10)) 151 self.pool.inbox.sendNow(m) 152 assert_raises(BoxEmptyError, self.reply_box.recv, timeout=TIMEOUT) 153 assert self.pool.aliveCount() == self.num_workers
154
155 - def test_reply_box_full(self):
156 # fill up reply box 157 junks=[self.makeMesg(42) for i in xrange(self.box_size)] 158 159 for j in junks: 160 self.reply_box.sendNow(j) 161 162 m1=self.makeMesg(25) 163 self.pool.inbox.sendNow(m1) 164 165 sleep() # so that reply_box is full when worker runs 166 167 for j in junks: 168 r=self.reply_box.recvNow() 169 assert r is j 170 assert self.pool.aliveCount() == self.num_workers 171 172 assert_raises(BoxEmptyError, self.reply_box.recv, TIMEOUT) 173 assert self.pool.aliveCount() == self.num_workers
174
175 -class TestSeveralWorkersPool(TestPool):
176 num_workers=3
177
178 -class TestLotsWorkersPool(TestPool):
179 num_workers=100
180
181 -class TestBroadcastPool(BaseTestPool):
182 _ThreadPoolClass=BroadcastThreadPool 183 184 num_workers=10 185 kwargs={'inbox_timeout':.05} 186
187 - def setUp(self):
188 self.mylist=[] 189 super(TestBroadcastPool, self).setUp()
190 191 @methodMessenger()
192 - def appendList(self, x):
193 self.mylist.append(threading.currentThread().getName()) 194 return True
195
196 - def getFunc(self):
197 return self.appendList
198
199 - def test_parallel_broadcast(self):
200 self.pool.parallelBroadcast(FunctionCall(True), TIMEOUT) 201 sleep() 202 assert_sorted_equals(self.mylist, [w.getName() for w in self.pool.workers])
203
204 - def test_serial_broadcast(self):
205 self.pool.serialBroadcast(FunctionCall(True), 206 TIMEOUT * self.num_workers, 207 self.reply_box) 208 sleep() 209 assert_sorted_equals(self.mylist, [w.getName() for w in self.pool.workers])
210
211 -def test_speed():
212 N=20 213 214 @errorMessenger(ZeroDivisionError) 215 @functionMessenger() 216 def slowfunc(x): 217 sleep() 218 return 100/x
219 220 def doit(pool, reply_box): 221 for i in xrange(N): 222 pool.inbox.sendNow(Message(FunctionCall(float(i)), reply_box=reply_box)) 223 224 t=time.time() 225 for i in xrange(N): 226 reply_box.recv(60) 227 t=time.time()-t 228 return t 229 230 reply_box1=ThreadMailBox("ReplyBox") 231 pool1=ThreadPool(NullContextManager.factory(func=slowfunc), 232 inbox=ThreadMailBox("PoolBox"), 233 num_workers=2) 234 pool1.start() 235 236 t1=doit(pool1, reply_box1) 237 238 reply_box2=ThreadMailBox("ReplyBox") 239 pool2=ThreadPool(NullContextManager.factory(func=slowfunc), 240 inbox=ThreadMailBox("PoolBox"), 241 num_workers=10) 242 pool2.start() 243 244 t2=doit(pool2, reply_box2) 245 246 assert t1 / t2 > 4 # allow some overhead 247 248 cleanup_pool(pool1) 249 cleanup_pool(pool2) 250