[telepathy-mission-control/master] mctest: add a simulated client

Simon McVittie simon.mcvittie at collabora.co.uk
Wed Apr 1 05:55:15 PDT 2009


---
 test/twisted/mctest.py |   94 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 94 insertions(+), 0 deletions(-)

diff --git a/test/twisted/mctest.py b/test/twisted/mctest.py
index d43dced..377b6a0 100644
--- a/test/twisted/mctest.py
+++ b/test/twisted/mctest.py
@@ -181,6 +181,100 @@ def aasv(x):
     return dbus.Array([dbus.Dictionary(d, signature='sv') for d in x],
             signature='a{sv}')
 
+class SimulatedClient(object):
+    def __init__(self, q, bus, clientname,
+            observe=[], approve=[], handle=[], bypass_approval=False):
+        self.q = q
+        self.bus = bus
+        self.bus_name = '.'.join([cs.tp_name_prefix, 'Client', clientname])
+        self._bus_name_ref = dbus.service.BusName(self.bus_name, self.bus)
+        self.object_path = '/' + self.bus_name.replace('.', '/')
+        self.observe = aasv(observe)
+        self.approve = aasv(approve)
+        self.handle = aasv(handle)
+        self.bypass_approval = bool(bypass_approval)
+        self.handled_channels = dbus.Array([], signature='o')
+
+        q.add_dbus_method_impl(self.Get_Interfaces,
+                path=self.object_path, interface=cs.PROPERTIES_IFACE,
+                method='Get', args=[cs.CLIENT, 'Interfaces'])
+        q.add_dbus_method_impl(self.GetAll_Client, path=self.object_path,
+                interface=cs.PROPERTIES_IFACE, method='GetAll',
+                args=[cs.CLIENT])
+
+        q.add_dbus_method_impl(self.Get_ObserverChannelFilter,
+                path=self.object_path, interface=cs.PROPERTIES_IFACE,
+                method='Get', args=[cs.OBSERVER, 'ObserverChannelFilter'])
+        q.add_dbus_method_impl(self.GetAll_Observer, path=self.object_path,
+                interface=cs.PROPERTIES_IFACE, method='GetAll',
+                args=[cs.OBSERVER])
+
+        q.add_dbus_method_impl(self.Get_ApproverChannelFilter,
+                path=self.object_path, interface=cs.PROPERTIES_IFACE,
+                method='Get', args=[cs.APPROVER, 'ApproverChannelFilter'])
+        q.add_dbus_method_impl(self.GetAll_Approver, path=self.object_path,
+                interface=cs.PROPERTIES_IFACE, method='GetAll',
+                args=[cs.APPROVER])
+
+        q.add_dbus_method_impl(self.Get_HandlerChannelFilter,
+                path=self.object_path, interface=cs.PROPERTIES_IFACE,
+                method='Get', args=[cs.HANDLER, 'HandlerChannelFilter'])
+        q.add_dbus_method_impl(self.GetAll_Handler, path=self.object_path,
+                interface=cs.PROPERTIES_IFACE, method='GetAll',
+                args=[cs.HANDLER])
+
+    def get_interfaces(self):
+        ret = dbus.Array([], signature='s', variant_level=1)
+
+        if self.observe:
+            ret.append(cs.OBSERVER)
+
+        if self.approve:
+            ret.append(cs.APPROVER)
+
+        if self.handle:
+            ret.append(cs.HANDLER)
+
+        return ret
+
+    def Get_Interfaces(self, e):
+        self.q.dbus_return(e.message, self.get_interfaces(), signature='v')
+
+    def GetAll_Client(self, e):
+        self.q.dbus_return(e.message, {'Interfaces': self.get_interfaces()},
+                signature='a{sv}')
+
+    def GetAll_Observer(self, e):
+        assert self.observe
+        self.q.dbus_return(e.message, {'ObserverChannelFilter': self.observe},
+                signature='a{sv}')
+
+    def Get_ObserverChannelFilter(self, e):
+        assert self.observe
+        self.q.dbus_return(e.message, self.observe, signature='v')
+
+    def GetAll_Approver(self, e):
+        assert self.approve
+        self.q.dbus_return(e.message, {'ApproverChannelFilter': self.approve},
+                signature='a{sv}')
+
+    def Get_ApproverChannelFilter(self, e):
+        assert self.approve
+        self.q.dbus_return(e.message, self.approve, signature='v')
+
+    def GetAll_Handler(self, e):
+        assert self.handle
+        self.q.dbus_return(e.message, {
+            'HandlerChannelFilter': self.handle,
+            'BypassApproval': self.bypass_approval,
+            'HandledChannels': self.handled_channels,
+            },
+                signature='a{sv}')
+
+    def Get_HandlerChannelFilter(self, e):
+        assert self.handle
+        self.q.dbus_return(e.message, self.handle, signature='v')
+
 def create_fakecm_account(q, bus, mc, params):
     """Create a fake connection manager and an account that uses it.
     """
-- 
1.5.6.5




More information about the telepathy-commits mailing list