[telepathy-salut/master] add two-muc-dbus-tubes.py
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Fri Jun 26 07:28:14 PDT 2009
---
tests/twisted/Makefile.am | 3 +-
tests/twisted/avahi/tubes/two-muc-dbus-tubes.py | 180 +++++++++++++++++++++++
2 files changed, 182 insertions(+), 1 deletions(-)
create mode 100644 tests/twisted/avahi/tubes/two-muc-dbus-tubes.py
diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index 2f7d41a..dc21561 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -49,7 +49,8 @@ TWISTED_AVAHI_TESTS = \
avahi/tubes/test-two-muc-stream-tubes.py \
avahi/tubes/test-two-private-stream-tubes.py \
avahi/tubes/test-tube-close.py \
- avahi/tubes/tubes-to-nonexistant-ids.py
+ avahi/tubes/tubes-to-nonexistant-ids.py \
+ avahi/tubes/two-muc-dbus-tubes.py
TWISTED_AVAHI_OLPC_TESTS = \
avahi/test-olpc-activity-announcements.py
diff --git a/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py b/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py
new file mode 100644
index 0000000..ce76d7b
--- /dev/null
+++ b/tests/twisted/avahi/tubes/two-muc-dbus-tubes.py
@@ -0,0 +1,180 @@
+from saluttest import exec_test
+import dbus
+
+from servicetest import make_channel_proxy, call_async, EventPattern
+
+import constants as cs
+import tubetestutil as t
+
+sample_parameters = dbus.Dictionary({
+ 's': 'hello',
+ 'ay': dbus.ByteArray('hello'),
+ 'u': dbus.UInt32(123),
+ 'i': dbus.Int32(-123),
+ }, signature='sv')
+
+muc_name = "test-two-muc-stream-tubes"
+
+def test(q, bus, conn):
+
+ contact1_name, conn2, contact2_name, contact2_handle_on_conn1,\
+ contact1_handle_on_conn2 = t.connect_two_accounts(q, bus, conn)
+
+ conn1_self_handle = conn.GetSelfHandle()
+
+ # first connection: join muc
+ muc_handle1, group1 = t.join_muc(q, conn, muc_name)
+
+ # Can we request muc D-Bus tubes?
+ properties = conn.GetAll(cs.CONN_IFACE_REQUESTS,
+ dbus_interface=cs.PROPERTIES_IFACE)
+
+ assert ({cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_DBUS_TUBE,
+ cs.TARGET_HANDLE_TYPE: cs.HT_ROOM},
+ [cs.TARGET_HANDLE, cs.TARGET_ID, cs.DBUS_TUBE_SERVICE_NAME]
+ ) in properties.get('RequestableChannelClasses'),\
+ properties['RequestableChannelClasses']
+
+ # request a stream tube channel (new API)
+ requestotron = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
+
+ requestotron.CreateChannel({
+ cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_DBUS_TUBE,
+ cs.TARGET_HANDLE_TYPE: cs.HT_ROOM,
+ cs.TARGET_ID: muc_name,
+ cs.DBUS_TUBE_SERVICE_NAME: 'com.example.TestCase'})
+
+ e = q.expect('dbus-signal', signal='NewChannels')
+ channels = e.args[0]
+ assert len(channels) == 2
+
+ # get the list of all channels to check that newly announced ones are in it
+ all_channels = conn.Get(cs.CONN_IFACE_REQUESTS, 'Channels', dbus_interface=cs.PROPERTIES_IFACE,
+ byte_arrays=True)
+
+ got_tubes, got_tube = False, False
+ for path, props in channels:
+ if props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TUBES:
+ got_tubes = True
+ assert props[cs.REQUESTED] == False
+ assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_GROUP]
+ elif props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE:
+ got_tube = True
+ assert props[cs.REQUESTED] == True
+ assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_GROUP,
+ cs.CHANNEL_IFACE_TUBE]
+ assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase'
+ assert props[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [cs.SOCKET_ACCESS_CONTROL_CREDENTIALS]
+
+ contact1_tube = bus.get_object(conn.bus_name, path)
+ contact1_dbus_tube = make_channel_proxy(conn, path,
+ "Channel.Type.DBusTube")
+ contact1_tube_channel = make_channel_proxy(conn, path, "Channel")
+ tube1_path = path
+ else:
+ assert False
+
+ assert props[cs.INITIATOR_HANDLE] == conn1_self_handle
+ assert props[cs.INITIATOR_ID] == contact1_name
+ assert props[cs.TARGET_ID] == muc_name
+
+ assert (path, props) in all_channels, (path, props)
+
+ assert got_tubes
+ assert got_tube
+
+ state = contact1_dbus_tube.Get(cs.CHANNEL_IFACE_TUBE, 'State',
+ dbus_interface=cs.PROPERTIES_IFACE)
+ assert state == cs.TUBE_CHANNEL_STATE_NOT_OFFERED
+
+ call_async(q, contact1_dbus_tube, 'Offer', sample_parameters,
+ cs.SOCKET_ACCESS_CONTROL_CREDENTIALS)
+
+ q.expect_many(
+ EventPattern('dbus-signal', signal='TubeChannelStateChanged',
+ args=[cs.TUBE_CHANNEL_STATE_OPEN]),
+ EventPattern('dbus-return', method='Offer'))
+
+ state = contact1_dbus_tube.Get(cs.CHANNEL_IFACE_TUBE, 'State',
+ dbus_interface=cs.PROPERTIES_IFACE)
+ assert state == cs.TUBE_CHANNEL_STATE_OPEN
+
+ t.invite_to_muc(q, group1, conn2, contact2_handle_on_conn1, contact1_handle_on_conn2)
+
+ # tubes channel is created
+ e = q.expect('dbus-signal', signal='NewChannels')
+ channels = e.args[0]
+ assert len(channels) == 2
+
+ # get the list of all channels to check that newly announced ones are in it
+ all_channels = conn2.Get(cs.CONN_IFACE_REQUESTS, 'Channels', dbus_interface=cs.PROPERTIES_IFACE,
+ byte_arrays=True)
+
+ got_tubes, got_tube = False, False
+ for path, props in channels:
+ if props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_TUBES:
+ got_tubes = True
+ assert props[cs.REQUESTED] == False
+ assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_GROUP]
+ elif props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE:
+ got_tube = True
+ assert props[cs.REQUESTED] == False
+ assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_GROUP,
+ cs.CHANNEL_IFACE_TUBE]
+ assert props[cs.TUBE_PARAMETERS] == sample_parameters
+ assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase'
+ assert props[cs.DBUS_TUBE_SUPPORTED_ACCESS_CONTROLS] == [cs.SOCKET_ACCESS_CONTROL_CREDENTIALS]
+
+ contact2_tube = bus.get_object(conn.bus_name, path)
+ contact2_stream_tube = make_channel_proxy(conn, path,
+ "Channel.Type.DBusTube")
+ contact2_tube_channel = make_channel_proxy(conn, path, "Channel")
+ tube2_path = path
+ else:
+ assert False
+
+ assert props[cs.INITIATOR_HANDLE] == contact1_handle_on_conn2
+ assert props[cs.INITIATOR_ID] == contact1_name
+ assert props[cs.TARGET_ID] == muc_name
+
+ assert (path, props) in all_channels, (path, props)
+
+ assert got_tubes
+ assert got_tube
+
+ state = contact2_tube.Get(cs.CHANNEL_IFACE_TUBE, 'State',
+ dbus_interface=cs.PROPERTIES_IFACE)
+ assert state == cs.TUBE_CHANNEL_STATE_LOCAL_PENDING
+
+ # second connection: accept the tube (new API)
+ unix_socket_adr = contact2_stream_tube.Accept(cs.SOCKET_ACCESS_CONTROL_CREDENTIALS)
+
+ state = contact2_tube.Get(cs.CHANNEL_IFACE_TUBE, 'State',
+ dbus_interface=cs.PROPERTIES_IFACE)
+ assert state == cs.TUBE_CHANNEL_STATE_OPEN
+
+ e = q.expect('dbus-signal', signal='TubeChannelStateChanged',
+ path=tube2_path, args=[cs.TUBE_CHANNEL_STATE_OPEN])
+
+ # TODO: use the tube
+
+ call_async(q, contact1_tube_channel, 'Close')
+ q.expect_many(
+ EventPattern('dbus-return', method='Close'),
+ EventPattern('dbus-signal', signal='Closed'),
+ EventPattern('dbus-signal', signal='TubeClosed'),
+ EventPattern('dbus-signal', signal='ChannelClosed'))
+
+ call_async(q, contact2_tube_channel, 'Close')
+ q.expect_many(
+ EventPattern('dbus-return', method='Close'),
+ EventPattern('dbus-signal', signal='Closed'),
+ EventPattern('dbus-signal', signal='TubeClosed'),
+ EventPattern('dbus-signal', signal='ChannelClosed'))
+
+ conn.Disconnect()
+ conn2.Disconnect()
+
+if __name__ == '__main__':
+ # increase timer because Clique takes some time to join an existing muc
+ exec_test(test, timeout=60)
--
1.5.6.5
More information about the telepathy-commits
mailing list