Package grassyknoll :: Package tests :: Package test_concurrent :: Module test_Worker
[hide private]

Source Code for Module grassyknoll.tests.test_concurrent.test_Worker

  1  from nose.tools import * 
  2  from grassyknoll.lib.meta import Factory 
  3   
  4  from grassyknoll.concurrent.Worker import * 
  5  from grassyknoll.concurrent.Message import * 
  6  from grassyknoll.concurrent.MailBox import * 
  7  from grassyknoll.concurrent.Wrappers import * 
  8  from grassyknoll.concurrent.errors import * 
  9  from grassyknoll.concurrent.Fault import Fault 
 10   
 11  #import logging; logging.basicConfig() 
 12   
 13  TIMEOUT=.1 
 14   
 15  import time 
def sleep():
    """Block the calling thread for ``TIMEOUT`` seconds.

    The tests use this to give the worker thread time to run before they
    inspect its state.
    """
    pause = TIMEOUT
    time.sleep(pause)
# Function executed by the workers under test: divides 100 by ``x``.
#
# The tests in this module rely on three outcomes:
#   * a non-zero number yields the quotient (x=10 gives 10),
#   * x=0 raises ZeroDivisionError, the error type named in the
#     ``errorMessenger`` decorator,
#   * a string argument raises TypeError, which is not named there.
# NOTE(review): how the two decorators turn these outcomes into reply
# messages is defined in grassyknoll.concurrent.Wrappers, not here.
@errorMessenger(ZeroDivisionError)
@functionMessenger()
def myfunc(x):
    quotient = 100/x
    return quotient
23
def test_worker_exit():
    # Sending WorkerExit must produce a reply whose payload is the boolean
    # True and whose re_id matches the request; afterwards the worker must be
    # stopped, hold no context and have its ``newcontext`` flag cleared.
    #
    # (Kept as comments rather than a docstring so the test runner keeps
    # reporting the test under its function name.)
    box = ThreadMailBox()

    worker = Worker(ThreadMailBox("WorkerBox", 1),
                    NullContextManager.factory(func=myfunc))
    worker.start()
    try:
        m = Message(WorkerExit, reply_box=box)
        worker.inbox.sendNow(m)
        r = box.recv(TIMEOUT)
        assert isinstance(r.payload, bool)
        assert r.payload
        assert r.re_id == m.id
        assert not worker.isAlive()
        assert worker.context is None
        assert not worker.newcontext
    finally:
        # If the receive timed out or an assertion failed, the worker may
        # still be running.  Ask it to stop (best effort, same pattern as
        # BaseTestWorker.tearDown) so a failing test does not leave a live
        # worker behind, and never let cleanup mask the original failure.
        if worker.isAlive():
            try:
                worker.inbox.sendNow(Message(WorkerExit))
            except ConcurrentError:
                pass
        # Release the reply box, as tearDown does for its reply box.
        box.close()
39
def test_context_exit():
    # Sending ContextExit must produce a True reply tied to the request while
    # the worker keeps running with a *different*, non-None context and its
    # ``newcontext`` flag set.
    #
    # (Kept as comments rather than a docstring so the test runner keeps
    # reporting the test under its function name.)
    box = ThreadMailBox()

    # Build and start the worker before entering the try block: if
    # construction fails there is nothing to shut down, and the cleanup code
    # must not hit an unbound ``worker`` and hide the real error behind a
    # NameError.
    worker = Worker(ThreadMailBox("WorkerBox", 1),
                    NullContextManager.factory(func=myfunc))
    worker.start()
    try:
        sleep()  # let context get set up
        assert worker.context is not None
        context = worker.context
        m = Message(ContextExit, reply_box=box)
        worker.inbox.sendNow(m)
        r = box.recv(TIMEOUT)
        assert isinstance(r.payload, bool)
        assert r.payload
        assert r.re_id == m.id
        assert worker.isAlive()
        assert worker.context is not None
        assert worker.context is not context
        assert worker.newcontext
    finally:
        # Best-effort shutdown, mirroring BaseTestWorker.tearDown: a failure
        # to deliver WorkerExit must not replace the test's own failure.
        try:
            worker.inbox.sendNow(Message(WorkerExit))
        except ConcurrentError:
            pass
        # Release the reply box, as tearDown does for its reply box.
        box.close()
61
class BaseTestWorker(object):
    """Common fixture and shared test for the worker test classes.

    Each test gets a freshly started ``Worker`` running ``myfunc`` and a
    one-slot reply box.  Subclasses choose the value passed to the worker's
    ``unhandled`` argument through the ``unhandled`` class attribute.
    """

    unhandled = None  # define in subclasses

    def setUp(self):
        # One-slot reply box plus a started worker with a one-slot inbox.
        self.reply_box = ThreadMailBox("ReplyBox", 1)
        worker_inbox = ThreadMailBox("WorkerBox", 1)
        context_factory = NullContextManager.factory(func=myfunc)
        self.worker = Worker(worker_inbox, context_factory,
                             unhandled=self.unhandled)
        self.worker.start()

    def tearDown(self):
        # clear out the inbox
        drained = False
        while not drained:
            try:
                self.worker.inbox.recvNow()
            except ReceiveError:
                drained = True

        # Ask the worker to stop; a failure to deliver the request is
        # deliberately ignored.
        try:
            self.worker.inbox.sendNow(Message(WorkerExit))
        except ConcurrentError:
            pass

        self.reply_box.close()
        del self.reply_box
        del self.worker

    def makeMesg(self, x):
        # Wrap ``x`` in a FunctionCall message that replies to our reply box.
        call = FunctionCall(x)
        return Message(call, reply_box=self.reply_box)

    def test_basic(self):
        # A call with 10 must be answered with 10 (myfunc computes 100/x)
        # and the reply must reference the request's id.
        request = self.makeMesg(10)
        self.worker.inbox.sendNow(request)
        reply = self.reply_box.recv(TIMEOUT)
        assert reply.re_id == request.id
        assert reply.payload == 10
99
class TestWorker(BaseTestWorker):
    """Tests for handled errors and reply delivery, with ``unhandled="die"``."""

    unhandled = "die"

    def test_error(self):
        # Dividing by zero must come back as a Fault carrying
        # ZeroDivisionError, tied to the request.
        request = self.makeMesg(0)
        self.worker.inbox.sendNow(request)
        reply = self.reply_box.recv(TIMEOUT)
        assert reply.re_id == request.id
        assert isinstance(reply.payload, Fault)
        assert reply.payload.type is ZeroDivisionError

    def test_noreply(self):
        # A message without a reply box must produce no reply, and the
        # worker must still be running afterwards.
        request = Message(FunctionCall(10))
        self.worker.inbox.sendNow(request)
        assert_raises(BoxEmptyError, self.reply_box.recv, timeout=TIMEOUT)
        assert self.worker.isAlive()

    def test_reply_box_full(self):
        # fill up reply box
        filler = self.makeMesg(42)
        self.reply_box.sendNow(filler)

        request = self.makeMesg(25)
        self.worker.inbox.sendNow(request)

        sleep()  # so that reply_box is full when worker runs

        # Only the filler message may be in the box, and the worker must
        # still be running.
        received = self.reply_box.recvNow()
        assert received is filler
        assert self.worker.isAlive()

        # No reply to the request may arrive afterwards either.
        assert_raises(BoxEmptyError, self.reply_box.recv, TIMEOUT)
        assert self.worker.isAlive()
133
class TestDieWorker(BaseTestWorker):
    """Unhandled-error behaviour when the worker is built with ``unhandled="die"``."""

    unhandled = "die"

    def test_crash(self):
        # A string argument makes myfunc raise TypeError.  The reply must be
        # a CrashError carrying that type, and the worker must have stopped
        # and dropped its context.
        request = self.makeMesg("aaaaa")
        self.worker.inbox.sendNow(request)
        reply = self.reply_box.recv(TIMEOUT)
        assert reply.re_id == request.id
        assert isinstance(reply.payload, CrashError)
        assert reply.payload.type is TypeError
        assert not self.worker.isAlive()
        assert self.worker.context is None
147
class TestKeepRunningWorker(BaseTestWorker):
    """Unhandled-error behaviour when built with ``unhandled="keeprunning"``."""

    unhandled = "keeprunning"

    def test_crash(self):
        sleep()  # make sure worker gets started
        original_context = self.worker.context

        # A string argument makes myfunc raise TypeError.
        request = self.makeMesg("aaaaa")
        self.worker.inbox.sendNow(request)
        reply = self.reply_box.recv(TIMEOUT)
        assert reply.re_id == request.id
        assert isinstance(reply.payload, CrashError)
        assert reply.payload.type is TypeError

        # The worker must survive and keep the very same context object.
        assert self.worker.isAlive()
        assert self.worker.context is original_context

        # The worker must still answer a normal call afterwards.
        self.test_basic()
165
class TestNewContextWorker(BaseTestWorker):
    """Unhandled-error behaviour when built with ``unhandled="newcontext"``."""

    unhandled = "newcontext"

    def test_crash(self):
        sleep()  # make sure worker gets started
        original_context = self.worker.context

        # A string argument makes myfunc raise TypeError.
        request = self.makeMesg("aaaaa")
        self.worker.inbox.sendNow(request)
        reply = self.reply_box.recv(TIMEOUT)
        assert reply.re_id == request.id
        assert isinstance(reply.payload, CrashError)
        assert reply.payload.type is TypeError

        # The worker must survive, but with a different, non-None context.
        assert self.worker.isAlive()
        assert self.worker.context is not None
        assert self.worker.context is not original_context

        # The worker must still answer a normal call afterwards.
        self.test_basic()
184
class TestPrivateBoxWorker(BaseTestWorker):
    """Runs the base tests against a ``PrivateBoxWorker`` and checks its ``privbox``."""

    def setUp(self):
        # Same fixture as the base class, but with a PrivateBoxWorker built
        # from keyword arguments and a 0.05 inbox timeout.
        self.reply_box = ThreadMailBox("ReplyBox", 1)
        worker_inbox = ThreadMailBox("WorkerBox", 1)
        factory = NullContextManager.factory(func=myfunc)
        self.worker = PrivateBoxWorker(inbox=worker_inbox,
                                       context_factory=factory,
                                       unhandled="die",
                                       inbox_timeout=.05)
        self.worker.start()

    def test_private(self):
        # A call delivered through the worker's private box must be answered
        # exactly like one sent through the regular inbox.
        request = self.makeMesg(10)
        self.worker.privbox.sendNow(request)
        reply = self.reply_box.recv(TIMEOUT)
        assert reply.re_id == request.id
        assert reply.payload == 10
200