[telepathy-mission-control/master] mctest: add a simulated client
Simon McVittie
simon.mcvittie at collabora.co.uk
Wed Apr 1 05:55:15 PDT 2009
---
test/twisted/mctest.py | 94 ++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 94 insertions(+), 0 deletions(-)
diff --git a/test/twisted/mctest.py b/test/twisted/mctest.py
index d43dced..377b6a0 100644
--- a/test/twisted/mctest.py
+++ b/test/twisted/mctest.py
@@ -181,6 +181,100 @@ def aasv(x):
return dbus.Array([dbus.Dictionary(d, signature='sv') for d in x],
signature='a{sv}')
+class SimulatedClient(object):
+ def __init__(self, q, bus, clientname,
+ observe=[], approve=[], handle=[], bypass_approval=False):
+ self.q = q
+ self.bus = bus
+ self.bus_name = '.'.join([cs.tp_name_prefix, 'Client', clientname])
+ self._bus_name_ref = dbus.service.BusName(self.bus_name, self.bus)
+ self.object_path = '/' + self.bus_name.replace('.', '/')
+ self.observe = aasv(observe)
+ self.approve = aasv(approve)
+ self.handle = aasv(handle)
+ self.bypass_approval = bool(bypass_approval)
+ self.handled_channels = dbus.Array([], signature='o')
+
+ q.add_dbus_method_impl(self.Get_Interfaces,
+ path=self.object_path, interface=cs.PROPERTIES_IFACE,
+ method='Get', args=[cs.CLIENT, 'Interfaces'])
+ q.add_dbus_method_impl(self.GetAll_Client, path=self.object_path,
+ interface=cs.PROPERTIES_IFACE, method='GetAll',
+ args=[cs.CLIENT])
+
+ q.add_dbus_method_impl(self.Get_ObserverChannelFilter,
+ path=self.object_path, interface=cs.PROPERTIES_IFACE,
+ method='Get', args=[cs.OBSERVER, 'ObserverChannelFilter'])
+ q.add_dbus_method_impl(self.GetAll_Observer, path=self.object_path,
+ interface=cs.PROPERTIES_IFACE, method='GetAll',
+ args=[cs.OBSERVER])
+
+ q.add_dbus_method_impl(self.Get_ApproverChannelFilter,
+ path=self.object_path, interface=cs.PROPERTIES_IFACE,
+ method='Get', args=[cs.APPROVER, 'ApproverChannelFilter'])
+ q.add_dbus_method_impl(self.GetAll_Approver, path=self.object_path,
+ interface=cs.PROPERTIES_IFACE, method='GetAll',
+ args=[cs.APPROVER])
+
+ q.add_dbus_method_impl(self.Get_HandlerChannelFilter,
+ path=self.object_path, interface=cs.PROPERTIES_IFACE,
+ method='Get', args=[cs.HANDLER, 'HandlerChannelFilter'])
+ q.add_dbus_method_impl(self.GetAll_Handler, path=self.object_path,
+ interface=cs.PROPERTIES_IFACE, method='GetAll',
+ args=[cs.HANDLER])
+
+ def get_interfaces(self):
+ ret = dbus.Array([], signature='s', variant_level=1)
+
+ if self.observe:
+ ret.append(cs.OBSERVER)
+
+ if self.approve:
+ ret.append(cs.APPROVER)
+
+ if self.handle:
+ ret.append(cs.HANDLER)
+
+ return ret
+
+ def Get_Interfaces(self, e):
+ self.q.dbus_return(e.message, self.get_interfaces(), signature='v')
+
+ def GetAll_Client(self, e):
+ self.q.dbus_return(e.message, {'Interfaces': self.get_interfaces()},
+ signature='a{sv}')
+
+ def GetAll_Observer(self, e):
+ assert self.observe
+ self.q.dbus_return(e.message, {'ObserverChannelFilter': self.observe},
+ signature='a{sv}')
+
+ def Get_ObserverChannelFilter(self, e):
+ assert self.observe
+ self.q.dbus_return(e.message, self.observe, signature='v')
+
+ def GetAll_Approver(self, e):
+ assert self.approve
+ self.q.dbus_return(e.message, {'ApproverChannelFilter': self.approve},
+ signature='a{sv}')
+
+ def Get_ApproverChannelFilter(self, e):
+ assert self.approve
+ self.q.dbus_return(e.message, self.approve, signature='v')
+
+ def GetAll_Handler(self, e):
+ assert self.handle
+ self.q.dbus_return(e.message, {
+ 'HandlerChannelFilter': self.handle,
+ 'BypassApproval': self.bypass_approval,
+ 'HandledChannels': self.handled_channels,
+ },
+ signature='a{sv}')
+
+ def Get_HandlerChannelFilter(self, e):
+ assert self.handle
+ self.q.dbus_return(e.message, self.handle, signature='v')
+
def create_fakecm_account(q, bus, mc, params):
"""Create a fake connection manager and an account that uses it.
"""
--
1.5.6.5
More information about the telepathy-commits
mailing list