telepathy-gabble: servicetest: add infrastructure for faking D-Bus services, from MC
Simon McVittie
smcv at kemper.freedesktop.org
Thu Sep 26 06:20:43 PDT 2013
Module: telepathy-gabble
Branch: master
Commit: 1f89abdfd46a3d0979229c6a516ebe425e875e68
URL: http://cgit.freedesktop.org/telepathy/telepathy-gabble/commit/?id=1f89abdfd46a3d0979229c6a516ebe425e875e68
Author: Simon McVittie <simon.mcvittie at collabora.co.uk>
Date: Wed Sep 25 17:13:00 2013 +0100
servicetest: add infrastructure for faking D-Bus services, from MC
I accidentally a small Python D-Bus binding.
Bug: https://bugs.freedesktop.org/show_bug.cgi?id=69822
Reviewed-by: Guillaume Desmottes <guillaume.desmottes at collabora.co.uk>
---
tests/twisted/servicetest.py | 146 +++++++++++++++++++++++++++++++++++++++++-
1 files changed, 145 insertions(+), 1 deletions(-)
diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index a04b72d..b25fe42 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -1,6 +1,23 @@
+# Copyright (C) 2009 Nokia Corporation
+# Copyright (C) 2009-2013 Collabora Ltd.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA
"""
-Infrastructure code for testing connection managers.
+Infrastructure code for testing Telepathy services.
"""
from twisted.internet import glib2reactor
@@ -14,6 +31,7 @@ import pprint
import unittest
import dbus
+import dbus.lowlevel
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True)
@@ -291,6 +309,11 @@ class IteratingEventQueue(BaseEventQueue):
def __init__(self, timeout=None):
BaseEventQueue.__init__(self, timeout)
+ self._dbus_method_impls = []
+ self._buses = []
+ # a message filter which will claim we handled everything
+ self._dbus_dev_null = \
+ lambda bus, message: dbus.lowlevel.HANDLER_RESULT_HANDLED
def wait(self, queues=None):
stop = [False]
@@ -315,6 +338,127 @@ class IteratingEventQueue(BaseEventQueue):
else:
raise TimeoutError
+ def add_dbus_method_impl(self, cb, bus=None, **kwargs):
+ if bus is None:
+ bus = self._buses[0]
+
+ self._dbus_method_impls.append(
+ (EventPattern('dbus-method-call', **kwargs), cb))
+
+ def dbus_emit(self, path, iface, name, *a, **k):
+ bus = k.pop('bus', self._buses[0])
+ assert 'signature' in k, k
+ message = dbus.lowlevel.SignalMessage(path, iface, name)
+ message.append(*a, **k)
+ bus.send_message(message)
+
+ def dbus_return(self, in_reply_to, *a, **k):
+ bus = k.pop('bus', self._buses[0])
+ assert 'signature' in k, k
+ reply = dbus.lowlevel.MethodReturnMessage(in_reply_to)
+ reply.append(*a, **k)
+ bus.send_message(reply)
+
+ def dbus_raise(self, in_reply_to, name, message=None, bus=None):
+ if bus is None:
+ bus = self._buses[0]
+
+ reply = dbus.lowlevel.ErrorMessage(in_reply_to, name, message)
+ bus.send_message(reply)
+
+ def attach_to_bus(self, bus):
+ if not self._buses:
+ # first-time setup
+ self._dbus_filter_bound_method = self._dbus_filter
+
+ self._buses.append(bus)
+
+ # Only subscribe to messages on the first bus connection (assumed to
+ # be the shared session bus connection used by the simulated connection
+ # manager and most of the test suite), not on subsequent bus
+ # connections (assumed to represent extra clients).
+ #
+ # When we receive a method call on the other bus connections, ignore
+ # it - the eavesdropping filter installed on the first bus connection
+ # will see it too.
+ #
+ # This is highly counter-intuitive, but it means our messages are in
+ # a guaranteed order (we don't have races between messages arriving on
+ # various connections).
+ if len(self._buses) > 1:
+ bus.add_message_filter(self._dbus_dev_null)
+ return
+
+ try:
+ # for dbus > 1.5
+ bus.add_match_string("eavesdrop=true,type='signal'")
+ except dbus.DBusException:
+ bus.add_match_string("type='signal'")
+ bus.add_match_string("type='method_call'")
+ else:
+ bus.add_match_string("eavesdrop=true,type='method_call'")
+
+ bus.add_message_filter(self._dbus_filter_bound_method)
+
+ bus.add_signal_receiver(
+ lambda *args, **kw:
+ self.append(
+ Event('dbus-signal',
+ path=unwrap(kw['path']),
+ signal=kw['member'],
+ args=map(unwrap, args),
+ interface=kw['interface'])),
+ None,
+ None,
+ None,
+ path_keyword='path',
+ member_keyword='member',
+ interface_keyword='interface',
+ byte_arrays=True,
+ )
+
+ def cleanup(self):
+ if self._buses:
+ self._buses[0].remove_message_filter(self._dbus_filter_bound_method)
+ for bus in self._buses[1:]:
+ bus.remove_message_filter(self._dbus_dev_null)
+
+ self._buses = []
+ self._dbus_method_impls = []
+
+ def _dbus_filter(self, bus, message):
+ if isinstance(message, dbus.lowlevel.MethodCallMessage):
+
+ destination = message.get_destination()
+ sender = message.get_sender()
+
+ if (destination == 'org.freedesktop.DBus' or
+ sender == self._buses[0].get_unique_name()):
+ # suppress reply and don't make an Event
+ return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+ e = Event('dbus-method-call', message=message,
+ interface=message.get_interface(), path=message.get_path(),
+ raw_args=message.get_args_list(byte_arrays=True),
+ args=map(unwrap, message.get_args_list(byte_arrays=True)),
+ destination=str(destination),
+ method=message.get_member(),
+ sender=message.get_sender(),
+ handled=False)
+
+ for pair in self._dbus_method_impls:
+ pattern, cb = pair
+ if pattern.match(e):
+ cb(e)
+ e.handled = True
+ break
+
+ self.append(e)
+
+ return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+ return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED
+
class TestEventQueue(BaseEventQueue):
def __init__(self, events):
BaseEventQueue.__init__(self)
More information about the telepathy-commits mailing list