Package grassyknoll :: Package tests :: Package test_concurrent :: Module test_talk
[hide private]

Source Code for Module grassyknoll.tests.test_concurrent.test_talk

  1  from nose.tools import * 
  2  from grassyknoll.concurrent.lib import * 
  3  from grassyknoll.concurrent.Message import Message 
  4  from grassyknoll.concurrent.MailBox import * 
  5  from grassyknoll.concurrent.Worker import Worker, WorkerExit 
  6  from grassyknoll.concurrent.Fault import Fault 
  7  from grassyknoll.concurrent.errors import * 
  8  from grassyknoll.concurrent.Wrappers import * 
  9   
 10   
 11  #import logging; logging.basicConfig() 
 12   
 13  TIMEOUT=.1 
 14   
 15  import time 
def sleep():
    """Pause the calling thread for one ``TIMEOUT`` interval."""
    delay = TIMEOUT
    time.sleep(delay)
@errorMessenger(ZeroDivisionError)
@functionMessenger()
def myfunc(x):
    """Return 100 divided by ``x``.

    A zero ``x`` raises ``ZeroDivisionError``, which is the exception type
    passed to the ``errorMessenger`` decorator.

    NOTE(review): what ``errorMessenger`` and ``functionMessenger`` do with
    the call and its errors is defined in one of the star-imported modules;
    confirm there.
    """
    quotient = 100 / x
    return quotient
@errorMessenger(ZeroDivisionError)
@functionMessenger()
def slowfunc(x):
    """Return 100 divided by ``x`` after calling ``sleep()`` twice.

    Each ``sleep()`` call waits one ``TIMEOUT``, so the result is only
    produced after two ``TIMEOUT`` intervals have elapsed.
    """
    for _ in range(2):
        sleep()
    return 100 / x
29
class BaseTestTalk(object):
    """Shared fixture: a reply mailbox plus a started ``Worker``.

    Subclasses supply ``test_basic`` (used by ``test_spurious_mesg``) and may
    override ``getWorkerFunc`` to change the function the worker runs.
    """

    def getWorkerFunc(self):
        # Function handed to the worker's context factory in setUp().
        return myfunc

    def getFunc(self):
        # Placeholder hook; nothing in this module calls it.
        raise NotImplementedError

    def setUp(self):
        # Mailbox that the tests pass to talk()/chat() as the reply box.
        self.reply_box = ThreadMailBox("ReplyBox", 1)

        # Worker with its own inbox, running whatever getWorkerFunc() returns.
        worker_inbox = ThreadMailBox("WorkerBox", 1)
        context_factory = NullContextManager.factory(func=self.getWorkerFunc())
        self.worker = Worker(worker_inbox, context_factory, unhandled="die")
        self.worker.start()

    def tearDown(self):
        # Best-effort shutdown request: a ConcurrentError is ignored
        # (presumably raised when a test has already closed the inbox --
        # TODO confirm).
        try:
            self.worker.inbox.sendNow(Message(WorkerExit))
        except ConcurrentError:
            pass

        self.reply_box.close()
        del self.reply_box, self.worker

    def test_spurious_mesg(self):
        # Put an unrelated message in the reply box, then rerun the
        # subclass's test_basic with that message already present.
        stray = Message("pants")
        self.reply_box.sendNow(stray)
        self.test_basic()
58
class TestTalk(BaseTestTalk):
    """Exercise ``talk`` against the worker built by ``BaseTestTalk.setUp``."""

    def test_basic(self):
        # A normal call comes back as a Message whose payload equals 10.
        request = FunctionCall(10)
        reply = talk(request, self.worker.inbox, TIMEOUT, self.reply_box)
        assert isinstance(reply, Message)
        assert reply.payload == 10

    def test_error(self):
        # Argument 0: the reply is a Message carrying a Fault whose type is
        # ZeroDivisionError.
        request = FunctionCall(0)
        reply = talk(request, self.worker.inbox, TIMEOUT, self.reply_box)
        assert isinstance(reply, Message)
        fault = reply.payload
        assert isinstance(fault, Fault)
        assert fault.type is ZeroDivisionError

    def test_crash(self):
        # A string argument: the reply payload is a CrashError whose type is
        # TypeError.
        request = FunctionCall("oops")
        reply = talk(request, self.worker.inbox, TIMEOUT, self.reply_box)
        crash = reply.payload
        assert isinstance(crash, CrashError)
        assert crash.type is TypeError

    def test_bad_timeout(self):
        # A timeout of 0.0 is expected to raise TimeoutError.
        call_args = (FunctionCall(10), self.worker.inbox, 0.0, self.reply_box)
        assert_raises(TimeoutError, talk, *call_args)

    def test_closed_send(self):
        # With the worker's inbox closed, talk is expected to raise
        # BoxClosedError.
        self.worker.inbox.close()
        call_args = (FunctionCall(10), self.worker.inbox, TIMEOUT, self.reply_box)
        assert_raises(BoxClosedError, talk, *call_args)

    def test_closed_recv(self):
        # With the reply box closed, talk is expected to raise BoxEmptyError.
        self.reply_box.close()
        call_args = (FunctionCall(10), self.worker.inbox, TIMEOUT, self.reply_box)
        assert_raises(BoxEmptyError, talk, *call_args)
86 87
class TestChat(BaseTestTalk):
    """Exercise ``chat`` against the worker built by ``BaseTestTalk.setUp``."""

    def test_basic(self):
        # chat hands back a value equal to 10 directly.
        request = FunctionCall(10)
        result = chat(request, self.worker.inbox, TIMEOUT, self.reply_box)
        assert result == 10

    def test_error(self):
        # Argument 0: chat is expected to raise ZeroDivisionError itself.
        call_args = (FunctionCall(0), self.worker.inbox, TIMEOUT, self.reply_box)
        assert_raises(ZeroDivisionError, chat, *call_args)

    def test_crash(self):
        # A string argument: chat is expected to raise CrashError.
        call_args = (FunctionCall("oops"), self.worker.inbox, TIMEOUT, self.reply_box)
        assert_raises(CrashError, chat, *call_args)

    def test_closed_send(self):
        # With the worker's inbox closed, chat is expected to raise
        # BoxClosedError.
        self.worker.inbox.close()
        call_args = (FunctionCall(10), self.worker.inbox, TIMEOUT, self.reply_box)
        assert_raises(BoxClosedError, chat, *call_args)

    def test_closed_recv(self):
        # With the reply box closed, chat is expected to raise BoxEmptyError.
        self.reply_box.close()
        call_args = (FunctionCall(10), self.worker.inbox, TIMEOUT, self.reply_box)
        assert_raises(BoxEmptyError, chat, *call_args)
108
class BaseSlowTest(BaseTestTalk):
    """Fixture variant whose worker runs ``slowfunc`` instead of ``myfunc``."""

    def getWorkerFunc(self):
        # slowfunc calls sleep() twice before computing its result.
        worker_func = slowfunc
        return worker_func
113
class TestSlowTalk(BaseSlowTest):
    """``talk`` against the slow worker."""

    def test_basic(self):
        # The worker sleeps for two TIMEOUT intervals while talk is given a
        # single TIMEOUT; the expected outcome is BoxEmptyError.
        call_args = (FunctionCall(10), self.worker.inbox, TIMEOUT, self.reply_box)
        assert_raises(BoxEmptyError, talk, *call_args)
118
class TestSlowChat(BaseSlowTest):
    """``chat`` against the slow worker."""

    # NOTE(review): nothing in this module reads ``func``; the worker function
    # comes from BaseSlowTest.getWorkerFunc(). Kept as in the original.
    func = staticmethod(slowfunc)

    def test_basic(self):
        # The worker sleeps for two TIMEOUT intervals while chat is given a
        # single TIMEOUT; the expected outcome is BoxEmptyError.
        call_args = (FunctionCall(10), self.worker.inbox, TIMEOUT, self.reply_box)
        assert_raises(BoxEmptyError, chat, *call_args)
125