[telepathy-mission-control/master] servicetest: add support for treating D-Bus method calls like any other event

Simon McVittie simon.mcvittie at collabora.co.uk
Fri Mar 27 10:24:55 PDT 2009


---
 test/twisted/mctest.py      |    1 +
 test/twisted/servicetest.py |   51 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 52 insertions(+), 0 deletions(-)

diff --git a/test/twisted/mctest.py b/test/twisted/mctest.py
index 88fd35b..1c2e065 100644
--- a/test/twisted/mctest.py
+++ b/test/twisted/mctest.py
@@ -60,6 +60,7 @@ def exec_test_deferred (fun, params, protocol=None, timeout=None):
         or '-v' in sys.argv)
 
     bus = dbus.SessionBus()
+    queue.attach_to_bus(bus)
     mc = make_mc(bus, queue.append, params)
     error = None
 
diff --git a/test/twisted/servicetest.py b/test/twisted/servicetest.py
index e08ef02..5d00d0b 100644
--- a/test/twisted/servicetest.py
+++ b/test/twisted/servicetest.py
@@ -11,6 +11,8 @@ import pprint
 import traceback
 import unittest
 
+import dbus
+import dbus.lowlevel
 import dbus.glib
 
 from twisted.internet import reactor
@@ -294,6 +296,8 @@ class IteratingEventQueue(BaseEventQueue):
     def __init__(self, timeout=None):
         BaseEventQueue.__init__(self, timeout)
         self.events = []
+        self._dbus_method_impls = []
+        self._bus = None
 
     def wait(self):
         stop = [False]
@@ -318,6 +322,53 @@ class IteratingEventQueue(BaseEventQueue):
     # compatibility
     handle_event = append
 
+    def add_dbus_method_impl(self, cb, **kwargs):
+        self._dbus_method_impls.append(
+                (EventPattern('dbus-method-call', **kwargs), cb))
+
+    def dbus_emit(self, path, iface, name, *a, **k):
+        message = dbus.lowlevel.SignalMessage(path, iface, name)
+        message.append(*a, **k)
+        self._bus.send_message(message)
+
+    def dbus_return(self, in_reply_to, *a, **k):
+        reply = dbus.lowlevel.MethodReturnMessage(in_reply_to)
+        reply.append(*a, **k)
+        self._bus.send_message(reply)
+
+    def dbus_raise(self, in_reply_to, name, message=None):
+        reply = dbus.lowlevel.ErrorMessage(in_reply_to, name, message)
+        self._bus.send_message(reply)
+
+    def attach_to_bus(self, bus):
+        assert self._bus is None, self._bus
+        self._bus = bus
+        self._bus.add_message_filter(self._dbus_filter)
+
+    def _dbus_filter(self, bus, message):
+        if isinstance(message, dbus.lowlevel.MethodCallMessage):
+
+            e = Event('dbus-method-call', message=message,
+                interface=message.get_interface(), path=message.get_path(),
+                args=map(unwrap, message.get_args_list(byte_arrays=True)),
+                destination=message.get_destination(),
+                method=message.get_member(),
+                sender=message.get_sender(),
+                handled=False)
+
+            for pair in self._dbus_method_impls:
+                pattern, cb = pair
+                if pattern.match(e):
+                    cb(e)
+                    e.handled = True
+                    break
+
+            self.append(e)
+
+            return dbus.lowlevel.HANDLER_RESULT_HANDLED
+
+        return dbus.lowlevel.HANDLER_RESULT_NOT_YET_HANDLED
+
 class TestEventQueue(BaseEventQueue):
     def __init__(self, events):
         BaseEventQueue.__init__(self)
-- 
1.5.6.5




More information about the telepathy-commits mailing list