[telepathy-salut/master] two-muc-dbus-tubes.py: test DBusNames property
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Fri Jun 26 07:28:15 PDT 2009
---
tests/twisted/avahi/tubes/two-muc-dbus-tubes.py | 20 ++++++++++++++++++--
1 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py b/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py
index e2a8504..6c98206 100644
--- a/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py
+++ b/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py
@@ -15,12 +15,18 @@ sample_parameters = dbus.Dictionary({
muc_name = "test-two-muc-stream-tubes"
+def check_dbus_names(tube, members):
+ names = tube.Get(cs.CHANNEL_TYPE_DBUS_TUBE, 'DBusNames',
+ dbus_interface=cs.PROPERTIES_IFACE)
+ assert set(names.keys()) == set(members), names.keys()
+
def test(q, bus, conn):
contact1_name, conn2, contact2_name, contact2_handle_on_conn1,\
contact1_handle_on_conn2 = t.connect_two_accounts(q, bus, conn)
conn1_self_handle = conn.GetSelfHandle()
+ conn2_self_handle = conn2.GetSelfHandle()
# first connection: join muc
muc_handle1, group1 = t.join_muc(q, conn, muc_name)
@@ -99,6 +105,8 @@ def test(q, bus, conn):
dbus_interface=cs.PROPERTIES_IFACE)
assert state == cs.TUBE_CHANNEL_STATE_OPEN
+ check_dbus_names(contact1_dbus_tube, [conn1_self_handle])
+
t.invite_to_muc(q, group1, conn2, contact2_handle_on_conn1, contact1_handle_on_conn2)
# tubes channel is created
@@ -129,7 +137,7 @@ def test(q, bus, conn):
assert props[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [cs.SOCKET_ACCESS_CONTROL_CREDENTIALS]
contact2_tube = bus.get_object(conn.bus_name, path)
- contact2_stream_tube = make_channel_proxy(conn, path,
+ contact2_dbus_tube = make_channel_proxy(conn, path,
"Channel.Type.DBusTube")
contact2_tube_channel = make_channel_proxy(conn, path, "Channel")
tube2_path = path
@@ -155,8 +163,11 @@ def test(q, bus, conn):
dbus_interface=cs.PROPERTIES_IFACE)
assert state == cs.TUBE_CHANNEL_STATE_LOCAL_PENDING
+ # first connection: contact2 is not in the tube yet
+ check_dbus_names(contact1_dbus_tube, [conn1_self_handle])
+
# second connection: accept the tube (new API)
- unix_socket_adr = contact2_stream_tube.Accept(cs.SOCKET_ACCESS_CONTROL_CREDENTIALS)
+ unix_socket_adr = contact2_dbus_tube.Accept(cs.SOCKET_ACCESS_CONTROL_CREDENTIALS)
state = contact2_tube.Get(cs.CHANNEL_IFACE_TUBE, 'State',
dbus_interface=cs.PROPERTIES_IFACE)
@@ -172,6 +183,9 @@ def test(q, bus, conn):
assert added.keys() == [contact2_handle_on_conn1]
assert removed == []
+ check_dbus_names(contact1_dbus_tube, [conn1_self_handle, contact2_handle_on_conn1])
+ check_dbus_names(contact2_dbus_tube, [conn2_self_handle, contact1_handle_on_conn2])
+
# TODO: use the tube
call_async(q, contact1_tube_channel, 'Close')
@@ -188,6 +202,8 @@ def test(q, bus, conn):
assert added == {}
assert removed == [contact1_handle_on_conn2]
+ check_dbus_names(contact2_dbus_tube, [conn2_self_handle])
+
call_async(q, contact2_tube_channel, 'Close')
q.expect_many(
EventPattern('dbus-return', method='Close'),
--
1.5.6.5
More information about the telepathy-commits mailing list