[telepathy-salut/master] sync servicetest.py from Gabble

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Fri Jun 26 08:13:11 PDT 2009


The only difference from Gabble's version is the path argument of dbus-signal
events, which here is the full D-Bus path rather than the truncated one.
---
 tests/twisted/servicetest.py |  257 +++++++++++++++---------------------------
 1 files changed, 91 insertions(+), 166 deletions(-)

diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index 69ef0f9..adf6a9b 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -8,7 +8,6 @@ from twisted.internet.protocol import Protocol, Factory, ClientFactory
 glib2reactor.install()
 
 import pprint
-import traceback
 import unittest
 
 import dbus.glib
@@ -18,38 +17,6 @@ from twisted.internet import reactor
 tp_name_prefix = 'org.freedesktop.Telepathy'
 tp_path_prefix = '/org/freedesktop/Telepathy'
 
-class TryNextHandler(Exception):
-    pass
-
-def lazy(func):
-    def handler(event, data):
-        if func(event, data):
-            return True
-        else:
-            raise TryNextHandler()
-    handler.__name__ = func.__name__
-    return handler
-
-def match(type, **kw):
-    def decorate(func):
-        def handler(event, data, *extra, **extra_kw):
-            if event.type != type:
-                return False
-
-            for key, value in kw.iteritems():
-                if not hasattr(event, key):
-                    return False
-
-                if getattr(event, key) != value:
-                    return False
-
-            return func(event, data, *extra, **extra_kw)
-
-        handler.__name__ = func.__name__
-        return handler
-
-    return decorate
-
 class Event:
     def __init__(self, type, **kw):
         self.__dict__.update(kw)
@@ -68,112 +35,13 @@ def format_event(event):
 
     return ret
 
-class EventTest:
-    """Somewhat odd event dispatcher for asynchronous tests.
-
-    Callbacks are kept in a queue. Incoming events are passed to the first
-    callback. If the callback returns True, the callback is removed. If the
-    callback raises AssertionError, the test fails. If there are no more
-    callbacks, the test passes. The reactor is stopped when the test passes.
-    """
-
-    def __init__(self):
-        self.queue = []
-        self.data = {'test': self}
-        self.timeout_delayed_call = reactor.callLater(5, self.timeout_cb)
-        #self.verbose = True
-        self.verbose = False
-        # ugh
-        self.stopping = False
-
-    def timeout_cb(self):
-        print 'timed out waiting for events'
-        print self.queue[0]
-        self.fail()
-
-    def fail(self):
-        # ugh; better way to stop the reactor and exit(1)?
-        import os
-        os._exit(1)
-
-    def expect(self, f):
-        self.queue.append(f)
-
-    def log(self, s):
-        if self.verbose:
-            print s
-
-    def try_stop(self):
-        if self.stopping:
-            return True
-
-        if not self.queue:
-            self.log('no handlers left; stopping')
-            self.stopping = True
-            reactor.stop()
-            return True
-
-        return False
-
-    def call_handlers(self, event):
-        self.log('trying %r' % self.queue[0])
-        handler = self.queue.pop(0)
-
-        try:
-            ret = handler(event, self.data)
-            if not ret:
-                self.queue.insert(0, handler)
-        except TryNextHandler, e:
-            if self.queue:
-                ret = self.call_handlers(event)
-            else:
-                ret = False
-            self.queue.insert(0, handler)
-
-        return ret
-
-    def handle_event(self, event):
-        if self.try_stop():
-            return
-
-        self.log('got event:')
-        self.log('- type: %s' % event.type)
-        map(self.log, format_event(event))
-
-        try:
-            ret = self.call_handlers(event)
-        except SystemExit, e:
-            if e.code:
-                print "Unsuccessful exit:", e
-                self.fail()
-            else:
-                self.queue[:] = []
-                ret = True
-        except AssertionError, e:
-            print 'test failed:'
-            traceback.print_exc()
-            self.fail()
-        except (Exception, KeyboardInterrupt), e:
-            print 'error in handler:'
-            traceback.print_exc()
-            self.fail()
-
-        if ret not in (True, False):
-            print ("warning: %s() returned something other than True or False"
-                % self.queue[0].__name__)
-
-        if ret:
-            self.timeout_delayed_call.reset(5)
-            self.log('event handled')
-        else:
-            self.log('event not handled')
-
-        self.log('')
-        self.try_stop()
-
 class EventPattern:
     def __init__(self, type, **properties):
         self.type = type
+        self.predicate = lambda x: True
+        if 'predicate' in properties:
+            self.predicate = properties['predicate']
+            del properties['predicate']
         self.properties = properties
 
     def match(self, event):
@@ -187,7 +55,11 @@ class EventPattern:
             except AttributeError:
                 return False
 
-        return True
+        if self.predicate(event):
+            return True
+
+        return False
+
 
 class TimeoutError(Exception):
     pass
@@ -197,10 +69,11 @@ class BaseEventQueue:
 
     Implement the wait() method to have something that works.
     """
-    hooks = []
 
     def __init__(self, timeout=None):
         self.verbose = False
+        self.past_events = []
+        self.forbidden_events = set()
 
         if timeout is None:
             self.timeout = 5
@@ -211,12 +84,45 @@ class BaseEventQueue:
         if self.verbose:
             print s
 
-    def hook(self, func, type, **kw):
-        self.hooks.append( (EventPattern(type, **kw), func))
+    def flush_past_events(self):
+        self.past_events = []
+
+    def expect_racy(self, type, **kw):
+        pattern = EventPattern(type, **kw)
+
+        for event in self.past_events:
+            if pattern.match(event):
+                self.log('past event handled')
+                map(self.log, format_event(event))
+                self.log('')
+                self.past_events.remove(event)
+                return event
 
-    def check_hooks(self, event):
-        map (lambda x: x[1](self, event),
-            filter (lambda x: x[0].match(event), self.hooks))
+        return self.expect(type, **kw)
+
+    def forbid_events(self, patterns):
+        """
+        Add patterns (an iterable of EventPattern) to the set of forbidden
+        events. If a forbidden event occurs during an expect or expect_many,
+        the test will fail.
+        """
+        self.forbidden_events.update(set(patterns))
+
+    def unforbid_events(self, patterns):
+        """
+        Remove 'patterns' (an iterable of EventPattern) from the set of
+        forbidden events. These must be the same EventPattern pointers that
+        were passed to forbid_events.
+        """
+        self.forbidden_events.difference_update(set(patterns))
+
+    def _check_forbidden(self, event):
+        for e in self.forbidden_events:
+            if e.match(event):
+                print "forbidden event occurred:"
+                for x in format_event(event):
+                    print x
+                assert False
 
     def expect(self, type, **kw):
         pattern = EventPattern(type, **kw)
@@ -226,11 +132,14 @@ class BaseEventQueue:
             self.log('got event:')
             map(self.log, format_event(event))
 
+            self._check_forbidden(event)
+
             if pattern.match(event):
                 self.log('handled')
                 self.log('')
                 return event
 
+            self.past_events.append(event)
             self.log('not handled')
             self.log('')
 
@@ -242,6 +151,8 @@ class BaseEventQueue:
             self.log('got event:')
             map(self.log, format_event(event))
 
+            self._check_forbidden(event)
+
             for i, pattern in enumerate(patterns):
                 if pattern.match(event):
                     self.log('handled')
@@ -249,6 +160,7 @@ class BaseEventQueue:
                     ret[i] = event
                     break
             else:
+                self.past_events.append(event)
                 self.log('not handled')
                 self.log('')
 
@@ -289,9 +201,7 @@ class IteratingEventQueue(BaseEventQueue):
 
         if self.events:
             delayed_call.cancel()
-            e = self.events.pop(0)
-            self.check_hooks(e)
-            return e
+            return self.events.pop(0)
         else:
             raise TimeoutError
 
@@ -383,6 +293,9 @@ class ProxyWrapper:
     def __init__(self, object, default, others):
         self.object = object
         self.default_interface = dbus.Interface(object, default)
+        self.Properties = dbus.Interface(object, dbus.PROPERTIES_IFACE)
+        self.TpProperties = \
+            dbus.Interface(object, tp_name_prefix + '.Properties')
         self.interfaces = dict([
             (name, dbus.Interface(object, iface))
             for name, iface in others.iteritems()])
@@ -396,6 +309,27 @@ class ProxyWrapper:
 
         return getattr(self.default_interface, name)
 
+def wrap_connection(conn):
+    return ProxyWrapper(conn, tp_name_prefix + '.Connection',
+        dict([
+            (name, tp_name_prefix + '.Connection.Interface.' + name)
+            for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts',
+              'Presence', 'SimplePresence', 'Requests']] +
+        [('Peer', 'org.freedesktop.DBus.Peer')]))
+
+def wrap_channel(chan, type_, extra=None):
+    interfaces = {
+        type_: tp_name_prefix + '.Channel.Type.' + type_,
+        'Group': tp_name_prefix + '.Channel.Interface.Group',
+        }
+
+    if extra:
+        interfaces.update(dict([
+            (name, tp_name_prefix + '.Channel.Interface.' + name)
+            for name in extra]))
+
+    return ProxyWrapper(chan, tp_name_prefix + '.Channel', interfaces)
+
 def make_connection(bus, event_func, name, proto, params):
     cm = bus.get_object(
         tp_name_prefix + '.ConnectionManager.%s' % name,
@@ -404,15 +338,7 @@ def make_connection(bus, event_func, name, proto, params):
 
     connection_name, connection_path = cm_iface.RequestConnection(
         proto, params)
-    conn = bus.get_object(connection_name, connection_path)
-    conn = ProxyWrapper(conn, tp_name_prefix + '.Connection',
-        dict([
-            (name, tp_name_prefix + '.Connection.Interface.' + name)
-            for name in ['Aliasing', 'Avatars', 'Capabilities', 'Contacts',
-              'Presence']] +
-        [  ('ActivityProperties', 'org.laptop.Telepathy.ActivityProperties'),
-           ('BuddyInfo', 'org.laptop.Telepathy.BuddyInfo'),
-           ('Peer', 'org.freedesktop.DBus.Peer')]))
+    conn = wrap_connection(bus.get_object(connection_name, connection_path))
 
     bus.add_signal_receiver(
         lambda *args, **kw:
@@ -438,17 +364,6 @@ def make_channel_proxy(conn, path, iface):
     chan = dbus.Interface(chan, tp_name_prefix + '.' + iface)
     return chan
 
-def load_event_handlers():
-    path, _, _, _ = traceback.extract_stack()[0]
-    import compiler
-    import __main__
-    ast = compiler.parseFile(path)
-    return [
-        getattr(__main__, node.name)
-        for node in ast.node.asList()
-        if node.__class__ == compiler.ast.Function and
-            node.name.startswith('expect_')]
-
 class EventProtocol(Protocol):
     def __init__(self, queue=None):
         self.queue = queue
@@ -485,6 +400,16 @@ def watch_tube_signals(q, tube):
         path_keyword='path', member_keyword='member',
         byte_arrays=True)
 
+def assertEquals(expected, value):
+    assert expected == value, "expected: %r; got: %r" % (expected, value)
+
+def assertContains(element, value):
+    assert element in value, "expected: %r in %r" % (element, value)
+
+def assertLength(length, value):
+    assert len(value) == length, \
+        "expected: length %d, got length %d (%r)" % (length, len(value), value)
+
 if __name__ == '__main__':
     unittest.main()
 
-- 
1.5.6.5




More information about the telepathy-commits mailing list