Files
Phoenix/unittests/test_lib_pubsub_topicmgr.py
2013-05-04 02:00:34 +00:00

430 lines
16 KiB
Python

"""
:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.
"""
import imp_unittest, unittest
import wtc
from wx.lib.pubsub.pub import \
ALL_TOPICS, \
ListenerSpecInvalid, \
ITopicDefnProvider, \
TopicTreeTraverser, \
UndefinedTopic, \
UndefinedSubtopic, \
ListenerNotValidatable
from wx.lib.pubsub.core.topicmgr import \
ArgSpecGiven
from wx.lib.pubsub.core.topicutils import \
TopicNameInvalid, \
validateName
from wx.lib.pubsub.utils.topictreeprinter import \
printTreeDocs, ITopicTreeVisitor
class lib_pubsub_TopicMgr0_Basic(wtc.PubsubTestCase):
"""
Only tests TopicMgr methods. This must use some query methods on
topic objects to validate that TopicMgr did it's job properly.
"""
def failTopicName(self, name):
self.assertRaises(TopicNameInvalid, validateName, name)
def test10_GoodTopicNames(self):
#
# Test that valid topic names are accepted by pubsub'
#
validateName('test.asdf')
validateName('test.a')
validateName('test.a.b')
def test10_BadTopicNames(self):
#
# Test that invalid topic names are rejected by pubsub
#
# parts of topic name are 'empty'
self.failTopicName( '' )
self.failTopicName( ('',) )
self.failTopicName( ('test','asdf','') )
self.failTopicName( ('test','a', None) )
# parts of topic name have invalid char
self.failTopicName( ('test','a','b','_') )
self.failTopicName( ('(aa',) )
self.failTopicName( (ALL_TOPICS,) )
class lib_pubsub_TopicMgr1_GetOrCreate_NoDefnProv(wtc.PubsubTestCase):
"""
Only tests TopicMgr methods. This must use some query methods on
topic objects to validate that TopicMgr did it's job properly.
"""
def test10_NoProtoListener(self):
#
# Test the getOrCreateTopic without proto listener
#
topicMgr = self.pub.getDefaultTopicMgr()
def verifyNonSendable(topicObj, nameTuple, parent):
"""Any non-sendable topic will satisfy these conditions:"""
self.assertEqual(0, topicMgr.isTopicSpecified(nameTuple))
assert not topicObj.isSendable()
assert topicObj.getListeners() == []
assert topicObj.getNameTuple() == nameTuple
assert topicObj.getNumListeners() == 0
assert topicObj.getParent() is parent
assert topicObj.getNodeName() == topicObj.getNameTuple()[-1]
def foobar():
pass
assert not topicObj.hasListener(foobar)
assert not topicObj.hasListeners()
assert not topicObj.hasSubtopic('asdfafs')
assert not topicObj.isAll()
self.assertRaises(ListenerNotValidatable, topicObj.isValid, foobar)
self.assertRaises(ListenerNotValidatable, topicObj.validate, foobar)
# check that getTopic and getOrCreateTopic won't create again:
assert topicMgr.getOrCreateTopic(nameTuple) is topicObj
assert topicMgr.getTopic(nameTuple) is topicObj
# test with a root topic
rootName = 'GetOrCreate_NoProtoListener'
tName = rootName
# verify doesn't exist yet
assert topicMgr.getTopic(tName, True) is None
# ok create it, unsendable
rootTopic = topicMgr.getOrCreateTopic(tName)
verifyNonSendable(rootTopic, (rootName,), topicMgr.getRootTopic())
DESC_NO_SPEC = 'UNDOCUMENTED: created without spec'
assert rootTopic.getDescription() == DESC_NO_SPEC
assert rootTopic.isRoot()
assert rootTopic.getSubtopics() == []
assert not rootTopic.isAll()
assert not rootTopic.hasSubtopic()
# test with a subtopic
tName1 = (rootName, 'stB')
tName2 = tName1 + ('sstC',)
assert topicMgr.getTopic(tName1, True) is None
assert topicMgr.getTopic(tName2, True) is None
subsubTopic = topicMgr.getOrCreateTopic(tName2)
# verify that parent was created implicitly
subTopic = topicMgr.getTopic(tName1)
verifyNonSendable(subTopic, tName1, rootTopic)
verifyNonSendable(subsubTopic, tName2, subTopic)
assert subsubTopic.getDescription() == DESC_NO_SPEC
DESC_PARENT_NO_SPEC = 'UNDOCUMENTED: created as parent without specification'
assert subTopic.getDescription() == DESC_PARENT_NO_SPEC
assert rootTopic.getSubtopics() == [subTopic]
assert rootTopic.hasSubtopic()
assert subTopic.getSubtopics() == [subsubTopic]
assert subTopic.hasSubtopic()
assert subsubTopic.getSubtopics() == []
assert not subsubTopic.hasSubtopic()
# check that getTopic raises expected exception when undefined topic:
tName = 'Undefined'
self.assertRaises(UndefinedTopic, topicMgr.getTopic, tName)
tName = rootName + '.Undefined'
self.assertRaises(UndefinedSubtopic, topicMgr.getTopic, tName)
def test20_WithProtoListener(self):
#
# Test the getOrCreateTopic with proto listener
#
topicMgr = self.pub.getDefaultTopicMgr()
rootName = 'GetOrCreate_WithProtoListener'
tName = rootName
# verify doesn't exist yet
assert topicMgr.getTopic(tName, True) is None
def protoListener(arg1, arg2=None):
pass
# ok create it, sendable
rootTopic = topicMgr.getOrCreateTopic(tName, protoListener)
# check that getTopic and getOrCreateTopic won't create again:
assert topicMgr.getOrCreateTopic(tName) is rootTopic
assert topicMgr.getTopic(tName) is rootTopic
assert rootTopic.isSendable()
assert topicMgr.isTopicSpecified(tName)
expectDesc = 'UNDOCUMENTED: created from protoListener "protoListener" in module test_lib_pubsub_topicmgr'
assert rootTopic.getDescription() == expectDesc
# check that topic created can discern between good and bad listener
assert rootTopic.isValid(protoListener)
def badListener1():
pass # missing required arg
def badListener2(arg2):
pass # opt arg is required
def badListener3(arg1, arg3):
pass # extra required arg
assert not rootTopic.isValid(badListener1)
assert not rootTopic.isValid(badListener2)
assert not rootTopic.isValid(badListener3)
# verify that missing parent created is not sendable, child is
def protoListener2(arg1, arg2=None):
pass
tName = (tName, 'stA', 'sstB')
subsubTopic = topicMgr.getOrCreateTopic(tName, protoListener2)
subTopic = topicMgr.getTopic( tName[:-1] )
assert not topicMgr.isTopicSpecified( tName[:-1] )
assert topicMgr.isTopicSpecified( tName )
assert subsubTopic.isValid(protoListener2)
class lib_pubsub_TopicMgr2_GetOrCreate_DefnProv(wtc.PubsubTestCase):
"""
Test TopicManager when one or more definition providers
can provide for some topic definitions.
"""
def test10_DefnProvider(self):
#
# Test the addition and clearing of definition providers
#
topicMgr = self.pub.getDefaultTopicMgr()
class DefnProvider:
pass
dp1 = DefnProvider()
dp2 = DefnProvider()
assert 1 == topicMgr.addDefnProvider(dp1)
assert 1 == topicMgr.addDefnProvider(dp1)
assert 2 == topicMgr.addDefnProvider(dp2)
assert 2 == topicMgr.addDefnProvider(dp2)
assert 2 == topicMgr.addDefnProvider(dp1)
topicMgr.clearDefnProviders()
assert 0 == topicMgr.getNumDefnProviders()
assert 1 == topicMgr.addDefnProvider(dp1)
topicMgr.clearDefnProviders()
def test20_UseProvider(self):
#
# Test the use of definition providers for topics. We create
# two so we can check that more than one can work together.
# One provides good definitions, one provides some with errors.
#
topicMgr = self.pub.getDefaultTopicMgr()
class DefnProvider(ITopicDefnProvider):
"""
Provide definitions for a root topic, subtopic, and
one subtopic whose parent is not defined here. It is easier
to use sub-only definitions.
"""
def __init__(self):
self.defns = {
('a',) : (dict(arg1='arg1 desc', arg2='arg2 desc'),
('arg1',) ),
('a', 'b') : (dict(arg1='arg1 desc', arg2='arg2 desc',
arg3='arg3 desc', arg4='arg2 desc'),
('arg1', 'arg3',) ),
# parent doesn't have defn
('a', 'c', 'd') : (
dict(arg1='arg1 desc', arg2='arg2 desc',
arg3='arg3 desc', arg4='arg4 desc',
arg5='arg5 desc', arg6='arg6 desc'),
('arg1', 'arg3', 'arg5',)),
}
def getDefn(self, topicNameTuple):
if topicNameTuple not in self.defns:
return None, None
defn = ArgSpecGiven()
defn.setAll( * self.defns[topicNameTuple] )
desc = '%s desc' % '.'.join(topicNameTuple)
return desc, defn
class DefnProviderErr(ITopicDefnProvider):
"""
Provide some definitions that have wrong arg spec. It is
easier to use the 'all-spec' for definitions, which provides
an opportunity for a different method of ArgSpecGiven.
"""
def __init__(self):
self.defns = {
('a', 'err1') : (# missing arg2
dict(arg1=''),
('arg1',) ),
('a', 'err2') : (# missing arg1
dict(arg2=''), ),
('a', 'err3') : (# arg1 is no longer required
dict(arg1='', arg2=''), ),
}
def getDefn(self, topicNameTuple):
if topicNameTuple not in self.defns:
return None, None
defn = ArgSpecGiven()
defn.setAll( * self.defns[topicNameTuple] )
desc = '%s desc' % '.'.join(topicNameTuple)
return desc, defn
topicMgr.addDefnProvider( DefnProvider() )
topicMgr.addDefnProvider( DefnProviderErr() )
# create some topics that will use defn provider
topic = topicMgr.getOrCreateTopic('a')
assert topic.getDescription() == 'a desc'
assert topic.isSendable()
topic = topicMgr.getOrCreateTopic('a.b')
assert topic.getDescription() == 'a.b desc'
assert topic.isSendable()
topic = topicMgr.getOrCreateTopic('a.c.d')
assert topic.getDescription() == 'a.c.d desc'
assert topic.isSendable()
assert not topicMgr.isTopicSpecified('a.c')
# check
parent = topicMgr.getTopic('a.c')
assert not parent.isSendable()
def protoListener(arg1, arg3, arg2=None, arg4=None): pass
parent = topicMgr.getOrCreateTopic('a.c', protoListener)
assert parent.isSendable()
assert topic.isSendable()
# now the erroneous ones:
def testRaises(topicName, expectMsg):
self.assertRaises(ListenerSpecInvalid, topicMgr.getOrCreateTopic,
topicName)
try:
assert topicMgr.getOrCreateTopic(topicName) is None
except ListenerSpecInvalid, exc:
# ok, did raise but is it correct message?
try:
str(exc).index(expectMsg)
except ValueError:
msg = 'Wrong message, expected \n "%s", got \n "%s"'
raise RuntimeError(msg % (expectMsg, str(exc)) )
testRaises('a.err1', 'Params [arg1] missing inherited [arg2] for topic "a.err1"')
testRaises('a.err2', 'Params [arg2] missing inherited [arg1] for topic "a.err2"')
testRaises('a.err3', 'Params [] missing inherited [arg1] for topic "a.err3" required args')
def test30_DelTopic(self):
#
# Test topic deletion
#
topicMgr = self.pub.getDefaultTopicMgr()
topicMgr.getOrCreateTopic('delTopic.b.c.d.e')
assert topicMgr.getTopic('delTopic.b.c.d.e') is not None
assert topicMgr.getTopic('delTopic.b.c.d').hasSubtopic('e')
assert topicMgr.getTopic('delTopic.b').hasSubtopic('c')
topicMgr.delTopic('delTopic.b.c')
assert not topicMgr.getTopic('delTopic.b').hasSubtopic('c')
assert topicMgr.getTopic('delTopic.b.c.d.e', okIfNone=True) is None
assert topicMgr.getTopic('delTopic.b.c.d', okIfNone=True) is None
assert topicMgr.getTopic('delTopic.b.c', okIfNone=True) is None
class lib_pubsub_TopicMgr3_TreeTraverser(wtc.PubsubTestCase):
expectedOutput = '''\
\-- Topic "a2"
\-- Topic "a"
\-- Topic "a"
\-- Topic "b"
\-- Topic "b"
\-- Topic "a"
\-- Topic "b"'''
def test1(self):
#
# Test printing of topic tree
#
topicMgr = self.pub.getDefaultTopicMgr()
root = topicMgr.getOrCreateTopic('a2')
topicMgr.getOrCreateTopic('a2.a.a')
topicMgr.getOrCreateTopic('a2.a.b')
topicMgr.getOrCreateTopic('a2.b.a')
topicMgr.getOrCreateTopic('a2.b.b')
from StringIO import StringIO
buffer = StringIO()
printTreeDocs(rootTopic=root, width=70, fileObj=buffer)
self.assertEqual( buffer.getvalue(), self.expectedOutput )
def test2(self):
#
# Test traversing with and without filtering, breadth and depth
#
topicMgr = self.pub.getDefaultTopicMgr()
class MyTraverser(ITopicTreeVisitor):
def __init__(self, pub):
self.traverser = pub.TopicTreeTraverser(self)
self.calls = ''
self.topics = []
def traverse(self, rootTopic, **kwargs):
self.traverser.traverse(rootTopic, **kwargs)
def __append(self, val):
self.calls = self.calls + str(val)
def _startTraversal(self):
self.__append(1)
def _accept(self, topicObj):
self.__append(2)
# only accept topics at root or second level tree, or if tailName() is 'A'
return len(topicObj.getNameTuple()) <= 2 or topicObj.getNodeName() == 'A'
def _onTopic(self, topicObj):
self.__append(3)
self.topics.append(topicObj.getNodeName())
def _startChildren(self):
self.__append(4)
def _endChildren(self):
self.__append(5)
def _doneTraversal(self):
self.__append(6)
root = topicMgr.getOrCreateTopic('traversal')
topicMgr.getOrCreateTopic('traversal.a.A')
topicMgr.getOrCreateTopic('traversal.a.B.foo')
topicMgr.getOrCreateTopic('traversal.b.C')
topicMgr.getOrCreateTopic('traversal.b.D.bar')
def exe(expectCalls, expectTopics, **kwargs):
traverser = MyTraverser(self.pub)
traverser.traverse(root, **kwargs)
self.assertEqual(traverser.topics, expectTopics)
self.assertEqual(traverser.calls, expectCalls)
exe(expectCalls = '13434345343455534345343455556',
expectTopics = ['traversal', 'a', 'A', 'B', 'foo', 'b', 'C', 'D', 'bar'],
onlyFiltered = False)
exe(expectCalls = '13433543354335454354543545456',
expectTopics = ['traversal', 'a', 'b', 'A', 'B', 'C', 'D', 'foo', 'bar'],
how = TopicTreeTraverser.BREADTH, onlyFiltered = False)
exe(expectCalls = '123423423452523422556',
expectTopics = ['traversal','a','A','b'])
exe(expectCalls = '123423235423254225456',
expectTopics = ['traversal','a','b','A'],
how = TopicTreeTraverser.BREADTH)
#---------------------------------------------------------------------------
if __name__ == '__main__':
unittest.main()