[telepathy-salut/master] sync servicetest.py from Gabble
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Fri Jun 26 08:13:11 PDT 2009
The only change is the path argument of dbus-signal events which is the
full D-Bus path and not the truncated one as in Gabble.
---
tests/twisted/servicetest.py | 257 +++++++++++++++---------------------------
1 files changed, 91 insertions(+), 166 deletions(-)
diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index 69ef0f9..adf6a9b 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -8,7 +8,6 @@ from twisted.internet.protocol import Protocol, Factory, ClientFactory
glib2reactor.install()
import pprint
-import traceback
import unittest
import dbus.glib
@@ -18,38 +17,6 @@ from twisted.internet import reactor
tp_name_prefix = 'org.freedesktop.Telepathy'
tp_path_prefix = '/org/freedesktop/Telepathy'
-class TryNextHandler(Exception):
- pass
-
-def lazy(func):
- def handler(event, data):
- if func(event, data):
- return True
- else:
- raise TryNextHandler()
- handler.__name__ = func.__name__
- return handler
-
-def match(type, **kw):
- def decorate(func):
- def handler(event, data, *extra, **extra_kw):
- if event.type != type:
- return False
-
- for key, value in kw.iteritems():
- if not hasattr(event, key):
- return False
-
- if getattr(event, key) != value:
- return False
-
- return func(event, data, *extra, **extra_kw)
-
- handler.__name__ = func.__name__
- return handler
-
- return decorate
-
class Event:
def __init__(self, type, **kw):
self.__dict__.update(kw)
@@ -68,112 +35,13 @@ def format_event(event):
return ret
-class EventTest:
- """Somewhat odd event dispatcher for asynchronous tests.
-
- Callbacks are kept in a queue. Incoming events are passed to the first
- callback. If the callback returns True, the callback is removed. If the
- callback raises AssertionError, the test fails. If there are no more
- callbacks, the test passes. The reactor is stopped when the test passes.
- """
-
- def __init__(self):
- self.queue = []
- self.data = {'test': self}
- self.timeout_delayed_call = reactor.callLater(5, self.timeout_cb)
- #self.verbose = True
- self.verbose = False
- # ugh
- self.stopping = False
-
- def timeout_cb(self):
- print 'timed out waiting for events'
- print self.queue[0]
- self.fail()
-
- def fail(self):
- # ugh; better way to stop the reactor and exit(1)?
- import os
- os._exit(1)
-
- def expect(self, f):
- self.queue.append(f)
-
- def log(self, s):
- if self.verbose:
- print s
-
- def try_stop(self):
- if self.stopping:
- return True
-
- if not self.queue:
- self.log('no handlers left; stopping')
- self.stopping = True
- reactor.stop()
- return True
-
- return False
-
- def call_handlers(self, event):
- self.log('trying %r' % self.queue[0])
- handler = self.queue.pop(0)
-
- try:
- ret = handler(event, self.data)
- if not ret:
- self.queue.insert(0, handler)
- except TryNextHandler, e:
- if self.queue:
- ret = self.call_handlers(event)
- else:
- ret = False
- self.queue.insert(0, handler)
-
- return ret
-
- def handle_event(self, event):
- if self.try_stop():
- return
-
- self.log('got event:')
- self.log('- type: %s' % event.type)
- map(self.log, format_event(event))
-
- try:
- ret = self.call_handlers(event)
- except SystemExit, e:
- if e.code:
- print "Unsuccessful exit:", e
- self.fail()
- else:
- self.queue[:] = []
- ret = True
- except AssertionError, e:
- print 'test failed:'
- traceback.print_exc()
- self.fail()
- except (Exception, KeyboardInterrupt), e:
- print 'error in handler:'
- traceback.print_exc()
- self.fail()
-
- if ret not in (True, False):
- print ("warning: %s() returned something other than True or False"
- % self.queue[0].__name__)
-
- if ret:
- self.timeout_delayed_call.reset(5)
- self.log('event handled')
- else:
- self.log('event not handled')
-
- self.log('')
- self.try_stop()
-
class EventPattern:
def __init__(self, type, **properties):
self.type = type
+ self.predicate = lambda x: True
+ if 'predicate' in properties:
+ self.predicate = properties['predicate']
+ del properties['predicate']
self.properties = properties
def match(self, event):
@@ -187,7 +55,11 @@ class EventPattern:
except AttributeError:
return False
- return True
+ if self.predicate(event):
+ return True
+
+ return False
+
class TimeoutError(Exception):
pass
@@ -197,10 +69,11 @@ class BaseEventQueue:
Implement the wait() method to have something that works.
"""
- hooks = []
def __init__(self, timeout=None):
self.verbose = False
+ self.past_events = []
+ self.forbidden_events = set()
if timeout is None:
self.timeout = 5
@@ -211,12 +84,45 @@ class BaseEventQueue:
if self.verbose:
print s
- def hook(self, func, type, **kw):
- self.hooks.append( (EventPattern(type, **kw), func))
+ def flush_past_events(self):
+ self.past_events = []
+
+ def expect_racy(self, type, **kw):
+ pattern = EventPattern(type, **kw)
+
+ for event in self.past_events:
+ if pattern.match(event):
+ self.log('past event handled')
+ map(self.log, format_event(event))
+ self.log('')
+ self.past_events.remove(event)
+ return event
- def check_hooks(self, event):
- map (lambda x: x[1](self, event),
- filter (lambda x: x[0].match(event), self.hooks))
+ return self.expect(type, **kw)
+
+ def forbid_events(self, patterns):
+ """
+ Add patterns (an iterable of EventPattern) to the set of forbidden
+ events. If a forbidden event occurs during an expect or expect_many,
+ the test will fail.
+ """
+ self.forbidden_events.update(set(patterns))
+
+ def unforbid_events(self, patterns):
+ """
+ Remove 'patterns' (an iterable of EventPattern) from the set of
+ forbidden events. These must be the same EventPattern pointers that
+ were passed to forbid_events.
+ """
+ self.forbidden_events.difference_update(set(patterns))
+
+ def _check_forbidden(self, event):
+ for e in self.forbidden_events:
+ if e.match(event):
+ print "forbidden event occurred:"
+ for x in format_event(event):
+ print x
+ assert False
def expect(self, type, **kw):
pattern = EventPattern(type, **kw)
@@ -226,11 +132,14 @@ class BaseEventQueue:
self.log('got event:')
map(self.log, format_event(event))
+ self._check_forbidden(event)
+
if pattern.match(event):
self.log('handled')
self.log('')
return event
+ self.past_events.append(event)
self.log('not handled')
self.log('')
@@ -242,6 +151,8 @@ class BaseEventQueue:
self.log('got event:')
map(self.log, format_event(event))
+ self._check_forbidden(event)
+
for i, pattern in enumerate(patterns):
if pattern.match(event):
self.log('handled')
@@ -249,6 +160,7 @@ class BaseEventQueue:
ret[i] = event
break
else:
+ self.past_events.append(event)
self.log('not handled')
self.log('')
@@ -289,9 +201,7 @@ class IteratingEventQueue(BaseEventQueue):
if self.events:
delayed_call.cancel()
- e = self.events.pop(0)
- self.check_hooks(e)
- return e
+ return self.events.pop(0)
else:
raise TimeoutError
@@ -383,6 +293,9 @@ class ProxyWrapper:
def __init__(self, object, default, others):
self.object = object
self.default_interface = dbus.Interface(object, default)
+ self.Properties = dbus.Interface(object, dbus.PROPERTIES_IFACE)
+ self.TpProperties = \
+ dbus.Interface(object, tp_name_prefix + '.Properties')
self.interfaces = dict([
(name, dbus.Interface(object, iface))
for name, iface in others.iteritems()])
@@ -396,6 +309,27 @@ class ProxyWrapper:
return getattr(self.default_interface, name)
+def wrap_connection(conn):
+ return ProxyWrapper(conn, tp_name_prefix + '.Connection',
+ dict([
+ (name, tp_name_prefix + '.Connection.Interface.' + name)
+ for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts',
+ 'Presence', 'SimplePresence', 'Requests']] +
+ [('Peer', 'org.freedesktop.DBus.Peer')]))
+
+def wrap_channel(chan, type_, extra=None):
+ interfaces = {
+ type_: tp_name_prefix + '.Channel.Type.' + type_,
+ 'Group': tp_name_prefix + '.Channel.Interface.Group',
+ }
+
+ if extra:
+ interfaces.update(dict([
+ (name, tp_name_prefix + '.Channel.Interface.' + name)
+ for name in extra]))
+
+ return ProxyWrapper(chan, tp_name_prefix + '.Channel', interfaces)
+
def make_connection(bus, event_func, name, proto, params):
cm = bus.get_object(
tp_name_prefix + '.ConnectionManager.%s' % name,
@@ -404,15 +338,7 @@ def make_connection(bus, event_func, name, proto, params):
connection_name, connection_path = cm_iface.RequestConnection(
proto, params)
- conn = bus.get_object(connection_name, connection_path)
- conn = ProxyWrapper(conn, tp_name_prefix + '.Connection',
- dict([
- (name, tp_name_prefix + '.Connection.Interface.' + name)
- for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts',
- 'Presence']] +
- [ ('ActivityProperties', 'org.laptop.Telepathy.ActivityProperties'),
- ('BuddyInfo', 'org.laptop.Telepathy.BuddyInfo'),
- ('Peer', 'org.freedesktop.DBus.Peer')]))
+ conn = wrap_connection(bus.get_object(connection_name, connection_path))
bus.add_signal_receiver(
lambda *args, **kw:
@@ -438,17 +364,6 @@ def make_channel_proxy(conn, path, iface):
chan = dbus.Interface(chan, tp_name_prefix + '.' + iface)
return chan
-def load_event_handlers():
- path, _, _, _ = traceback.extract_stack()[0]
- import compiler
- import __main__
- ast = compiler.parseFile(path)
- return [
- getattr(__main__, node.name)
- for node in ast.node.asList()
- if node.__class__ == compiler.ast.Function and
- node.name.startswith('expect_')]
-
class EventProtocol(Protocol):
def __init__(self, queue=None):
self.queue = queue
@@ -485,6 +400,16 @@ def watch_tube_signals(q, tube):
path_keyword='path', member_keyword='member',
byte_arrays=True)
+def assertEquals(expected, value):
+ assert expected == value, "expected: %r; got: %r" % (expected, value)
+
+def assertContains(element, value):
+ assert element in value, "expected: %r in %r" % (element, value)
+
+def assertLength(length, value):
+ assert len(value) == length, \
+ "expected: length %d, got length %d (%r)" % (length, len(value), value)
+
if __name__ == '__main__':
unittest.main()
--
1.5.6.5
More information about the telepathy-commits
mailing list