[telepathy-salut/master] add two-muc-dbus-tubes.py

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Fri Jun 26 07:28:14 PDT 2009


---
 tests/twisted/Makefile.am                       |    3 +-
 tests/twisted/avahi/tubes/two-muc-dbus-tubes.py |  180 +++++++++++++++++++++++
 2 files changed, 182 insertions(+), 1 deletions(-)
 create mode 100644 tests/twisted/avahi/tubes/two-muc-dbus-tubes.py

diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index 2f7d41a..dc21561 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -49,7 +49,8 @@ TWISTED_AVAHI_TESTS = \
 	avahi/tubes/test-two-muc-stream-tubes.py \
 	avahi/tubes/test-two-private-stream-tubes.py \
 	avahi/tubes/test-tube-close.py \
-	avahi/tubes/tubes-to-nonexistant-ids.py
+	avahi/tubes/tubes-to-nonexistant-ids.py \
+	avahi/tubes/two-muc-dbus-tubes.py
 
 TWISTED_AVAHI_OLPC_TESTS = \
 	avahi/test-olpc-activity-announcements.py
diff --git a/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py b/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py
new file mode 100644
index 0000000..ce76d7b
--- /dev/null
+++ b/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py
@@ -0,0 +1,180 @@
+from saluttest import exec_test
+import dbus
+
+from servicetest import make_channel_proxy, call_async, EventPattern
+
+import constants as cs
+import tubetestutil as t
+
+sample_parameters = dbus.Dictionary({
+    's': 'hello',
+    'ay': dbus.ByteArray('hello'),
+    'u': dbus.UInt32(123),
+    'i': dbus.Int32(-123),
+    }, signature='sv')
+
+muc_name = "test-two-muc-stream-tubes"
+
+def test(q, bus, conn):
+
+    contact1_name, conn2, contact2_name, contact2_handle_on_conn1,\
+        contact1_handle_on_conn2 = t.connect_two_accounts(q, bus, conn)
+
+    conn1_self_handle = conn.GetSelfHandle()
+
+    # first connection: join muc
+    muc_handle1, group1 = t.join_muc(q, conn, muc_name)
+
+    # Can we request muc D-Bus tubes?
+    properties = conn.GetAll(cs.CONN_IFACE_REQUESTS,
+        dbus_interface=cs.PROPERTIES_IFACE)
+
+    assert ({cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_DBUS_TUBE,
+             cs.TARGET_HANDLE_TYPE: cs.HT_ROOM},
+         [cs.TARGET_HANDLE, cs.TARGET_ID, cs.DBUS_TUBE_SERVICE_NAME]
+        ) in properties.get('RequestableChannelClasses'),\
+                 properties['RequestableChannelClasses']
+
+    # request a stream tube channel (new API)
+    requestotron = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
+
+    requestotron.CreateChannel({
+        cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_DBUS_TUBE,
+        cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
+        cs.TARGET_ID: muc_name,
+        cs.DBUS_TUBE_SERVICE_NAME: 'com.example.TestCase'})
+
+    e = q.expect('dbus-signal', signal='NewChannels')
+    channels = e.args[0]
+    assert len(channels) == 2
+
+    # get the list of all channels to check that newly announced ones are in it
+    all_channels = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels', dbus_interface=cs.PROPERTIES_IFACE,
+        byte_arrays=True)
+
+    got_tubes, got_tube = False, False
+    for path, props in channels:
+        if props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TUBES:
+            got_tubes = True
+            assert props[cs.REQUESTED] == False
+            assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_GROUP]
+        elif props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE:
+            got_tube = True
+            assert props[cs.REQUESTED] == True
+            assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_GROUP,
+                cs.CHANNEL_IFACE_TUBE]
+            assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase'
+            assert props[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [cs.SOCKET_ACCESS_CONTROL_CREDENTIALS]
+
+            contact1_tube = bus.get_object(conn.bus_name, path)
+            contact1_dbus_tube = make_channel_proxy(conn, path,
+                "Channel.Type.DBusTube")
+            contact1_tube_channel = make_channel_proxy(conn, path, "Channel")
+            tube1_path = path
+        else:
+            assert False
+
+        assert props[cs.INITIATOR_HANDLE] == conn1_self_handle
+        assert props[cs.INITIATOR_ID] == contact1_name
+        assert props[cs.TARGET_ID] == muc_name
+
+        assert (path, props) in all_channels, (path, props)
+
+    assert got_tubes
+    assert got_tube
+
+    state = contact1_dbus_tube.Get(cs.CHANNEL_IFACE_TUBE, 'State',
+        dbus_interface=cs.PROPERTIES_IFACE)
+    assert state == cs.TUBE_CHANNEL_STATE_NOT_OFFERED
+
+    call_async(q, contact1_dbus_tube, 'Offer', sample_parameters,
+        cs.SOCKET_ACCESS_CONTROL_CREDENTIALS)
+
+    q.expect_many(
+        EventPattern('dbus-signal', signal='TubeChannelStateChanged',
+            args=[cs.TUBE_CHANNEL_STATE_OPEN]),
+        EventPattern('dbus-return', method='Offer'))
+
+    state = contact1_dbus_tube.Get(cs.CHANNEL_IFACE_TUBE, 'State',
+        dbus_interface=cs.PROPERTIES_IFACE)
+    assert state == cs.TUBE_CHANNEL_STATE_OPEN
+
+    t.invite_to_muc(q, group1, conn2, contact2_handle_on_conn1, contact1_handle_on_conn2)
+
+    # tubes channel is created
+    e = q.expect('dbus-signal', signal='NewChannels')
+    channels = e.args[0]
+    assert len(channels) == 2
+
+    # get the list of all channels to check that newly announced ones are in it
+    all_channels = conn2.Get(cs.CONN_IFACE_REQUESTS, 'Channels', dbus_interface=cs.PROPERTIES_IFACE,
+        byte_arrays=True)
+
+    got_tubes, got_tube = False, False
+    for path, props in channels:
+        if props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TUBES:
+            got_tubes = True
+            assert props[cs.REQUESTED] == False
+            assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_GROUP]
+        elif props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE:
+            got_tube = True
+            assert props[cs.REQUESTED] == False
+            assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_GROUP,
+                cs.CHANNEL_IFACE_TUBE]
+            assert props[cs.TUBE_PARAMETERS] == sample_parameters
+            assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase'
+            assert props[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [cs.SOCKET_ACCESS_CONTROL_CREDENTIALS]
+
+            contact2_tube = bus.get_object(conn.bus_name, path)
+            contact2_stream_tube = make_channel_proxy(conn, path,
+                "Channel.Type.DBusTube")
+            contact2_tube_channel = make_channel_proxy(conn, path, "Channel")
+            tube2_path = path
+        else:
+            assert False
+
+        assert props[cs.INITIATOR_HANDLE] == contact1_handle_on_conn2
+        assert props[cs.INITIATOR_ID] == contact1_name
+        assert props[cs.TARGET_ID] == muc_name
+
+        assert (path, props) in all_channels, (path, props)
+
+    assert got_tubes
+    assert got_tube
+
+    state = contact2_tube.Get(cs.CHANNEL_IFACE_TUBE, 'State',
+        dbus_interface=cs.PROPERTIES_IFACE)
+    assert state == cs.TUBE_CHANNEL_STATE_LOCAL_PENDING
+
+    # second connection: accept the tube (new API)
+    unix_socket_adr = contact2_stream_tube.Accept(cs.SOCKET_ACCESS_CONTROL_CREDENTIALS)
+
+    state = contact2_tube.Get(cs.CHANNEL_IFACE_TUBE, 'State',
+        dbus_interface=cs.PROPERTIES_IFACE)
+    assert state == cs.TUBE_CHANNEL_STATE_OPEN
+
+    e = q.expect('dbus-signal', signal='TubeChannelStateChanged',
+        path=tube2_path, args=[cs.TUBE_CHANNEL_STATE_OPEN])
+
+    # TODO: use the tube
+
+    call_async(q, contact1_tube_channel, 'Close')
+    q.expect_many(
+        EventPattern('dbus-return', method='Close'),
+        EventPattern('dbus-signal', signal='Closed'),
+        EventPattern('dbus-signal', signal='TubeClosed'),
+        EventPattern('dbus-signal', signal='ChannelClosed'))
+
+    call_async(q, contact2_tube_channel, 'Close')
+    q.expect_many(
+        EventPattern('dbus-return', method='Close'),
+        EventPattern('dbus-signal', signal='Closed'),
+        EventPattern('dbus-signal', signal='TubeClosed'),
+        EventPattern('dbus-signal', signal='ChannelClosed'))
+
+    conn.Disconnect()
+    conn2.Disconnect()
+
+if __name__ == '__main__':
+    # increase timer because Clique takes some time to join an existing muc
+    exec_test(test, timeout=60)
-- 
1.5.6.5




More information about the telepathy-commits mailing list