telepathy-gabble: servicetest: add infrastructure for faking D-Bus services , from MC

Simon McVittie smcv at kemper.freedesktop.org
Thu Sep 26 06:20:43 PDT 2013


Module: telepathy-gabble
Branch: master
Commit: 1f89abdfd46a3d0979229c6a516ebe425e875e68
URL:    http://cgit.freedesktop.org/telepathy/telepathy-gabble/commit/?id=1f89abdfd46a3d0979229c6a516ebe425e875e68

Author: Simon McVittie <simon.mcvittie at collabora.co.uk>
Date:   Wed Sep 25 17:13:00 2013 +0100

servicetest: add infrastructure for faking D-Bus services, from MC

I accidentally a small Python D-Bus binding.

Bug: https://bugs.freedesktop.org/show_bug.cgi?id=69822
Reviewed-by: Guillaume Desmottes <guillaume.desmottes at collabora.co.uk>

---

 tests/twisted/servicetest.py |  146 +++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 145 insertions(+), 1 deletions(-)

diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index a04b72d..b25fe42 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -1,6 +1,23 @@
+# Copyright (C) 2009 Nokia Corporation
+# Copyright (C) 2009-2013 Collabora Ltd.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA
 
 """
-Infrastructure code for testing connection managers.
+Infrastructure code for testing Telepathy services.
 """
 
 from twisted.internet import glib2reactor
@@ -14,6 +31,7 @@ import pprint
 import unittest
 
 import dbus
+import dbus.lowlevel
 from dbus.mainloop.glib import DBusGMainLoop
 DBusGMainLoop(set_as_default=True)
 
@@ -291,6 +309,11 @@ class IteratingEventQueue(BaseEventQueue):
 
     def __init__(self, timeout=None):
         BaseEventQueue.__init__(self, timeout)
+        self._dbus_method_impls = []
+        self._buses = []
+        # a message filter which will claim we handled everything
+        self._dbus_dev_null = \
+                lambda bus, message: dbus.lowlevel.HANDLER_RESULT_HANDLED
 
     def wait(self, queues=None):
         stop = [False]
@@ -315,6 +338,127 @@ class IteratingEventQueue(BaseEventQueue):
         else:
             raise TimeoutError
 
+    def add_dbus_method_impl(self, cb, bus=None, **kwargs):
+        if bus is None:
+            bus = self._buses[0]
+
+        self._dbus_method_impls.append(
+                (EventPattern('dbus-method-call', **kwargs), cb))
+
+    def dbus_emit(self, path, iface, name, *a, **k):
+        bus = k.pop('bus', self._buses[0])
+        assert 'signature' in k, k
+        message = dbus.lowlevel.SignalMessage(path, iface, name)
+        message.append(*a, **k)
+        bus.send_message(message)
+
+    def dbus_return(self, in_reply_to, *a, **k):
+        bus = k.pop('bus', self._buses[0])
+        assert 'signature' in k, k
+        reply = dbus.lowlevel.MethodReturnMessage(in_reply_to)
+        reply.append(*a, **k)
+        bus.send_message(reply)
+
+    def dbus_raise(self, in_reply_to, name, message=None, bus=None):
+        if bus is None:
+            bus = self._buses[0]
+
+        reply = dbus.lowlevel.ErrorMessage(in_reply_to, name, message)
+        bus.send_message(reply)
+
+    def attach_to_bus(self, bus):
+        if not self._buses:
+            # first-time setup
+            self._dbus_filter_bound_method = self._dbus_filter
+
+        self._buses.append(bus)
+
+        # Only subscribe to messages on the first bus connection (assumed to
+        # be the shared session bus connection used by the simulated connection
+        # manager and most of the test suite), not on subsequent bus
+        # connections (assumed to represent extra clients).
+        #
+        # When we receive a method call on the other bus connections, ignore
+        # it - the eavesdropping filter installed on the first bus connection
+        # will see it too.
+        #
+        # This is highly counter-intuitive, but it means our messages are in
+        # a guaranteed order (we don't have races between messages arriving on
+        # various connections).
+        if len(self._buses) > 1:
+            bus.add_message_filter(self._dbus_dev_null)
+            return
+
+        try:
+            # for dbus > 1.5
+            bus.add_match_string("eavesdrop=true,type='signal'")
+        except dbus.DBusException:
+            bus.add_match_string("type='signal'")
+            bus.add_match_string("type='method_call'")
+        else:
+            bus.add_match_string("eavesdrop=true,type='method_call'")
+
+        bus.add_message_filter(self._dbus_filter_bound_method)
+
+        bus.add_signal_receiver(
+                lambda *args, **kw:
+                    self.append(
+                        Event('dbus-signal',
+                            path=unwrap(kw['path']),
+                            signal=kw['member'],
+                            args=map(unwrap, args),
+                            interface=kw['interface'])),
+                None,
+                None,
+                None,
+                path_keyword='path',
+                member_keyword='member',
+                interface_keyword='interface',
+                byte_arrays=True,
+                )
+
+    def cleanup(self):
+        if self._buses:
+            self._buses[0].remove_message_filter(self._dbus_filter_bound_method)
+        for bus in self._buses[1:]:
+            bus.remove_message_filter(self._dbus_dev_null)
+
+        self._buses = []
+        self._dbus_method_impls = []
+
+    def _dbus_filter(self, bus, message):
+        if isinstance(message, dbus.lowlevel.MethodCallMessage):
+
+            destination = message.get_destination()
+            sender = message.get_sender()
+
+            if (destination == 'org.freedesktop.DBus' or
+                    sender == self._buses[0].get_unique_name()):
+                # suppress reply and don't make an Event
+                return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+            e = Event('dbus-method-call', message=message,
+                interface=message.get_interface(), path=message.get_path(),
+                raw_args=message.get_args_list(byte_arrays=True),
+                args=map(unwrap, message.get_args_list(byte_arrays=True)),
+                destination=str(destination),
+                method=message.get_member(),
+                sender=message.get_sender(),
+                handled=False)
+
+            for pair in self._dbus_method_impls:
+                pattern, cb = pair
+                if pattern.match(e):
+                    cb(e)
+                    e.handled = True
+                    break
+
+            self.append(e)
+
+            return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+        return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED
+
 class TestEventQueue(BaseEventQueue):
     def __init__(self, events):
         BaseEventQueue.__init__(self)



More information about the telepathy-commits mailing list