[Telepathy-commits] [telepathy-gabble/master] move test accepting private dbus tube to accept-private-dbus-tube.py

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Fri Feb 27 05:41:42 PST 2009


---
 tests/twisted/Makefile.am                          |    1 +
 tests/twisted/tubes/accept-private-dbus-tube.py    |  215 ++++++++++++++++++++
 .../tubes/offer-accept-private-dbus-stream-tube.py |  118 +-----------
 3 files changed, 217 insertions(+), 117 deletions(-)
 create mode 100644 tests/twisted/tubes/accept-private-dbus-tube.py

diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index fe87003..defb425 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -43,6 +43,7 @@ TWISTED_TESTS = \
 	text/test-text-no-body.py \
 	text/test-text.py \
 	tubes/accept-muc-dbus-tube.py \
+	tubes/accept-private-dbus-tube.py \
 	tubes/muc-presence.py \
 	tubes/close-muc-with-closed-tube.py \
 	tubes/crash-on-list-channels.py \
diff --git a/tests/twisted/tubes/accept-private-dbus-tube.py b/tests/twisted/tubes/accept-private-dbus-tube.py
new file mode 100644
index 0000000..5649297
--- /dev/null
+++ b/tests/twisted/tubes/accept-private-dbus-tube.py
@@ -0,0 +1,215 @@
+"""Test 1-1 tubes support."""
+
+import dbus
+
+from servicetest import call_async, EventPattern, sync_dbus
+from gabbletest import acknowledge_iq, sync_stream
+import constants as cs
+import ns
+import tubetestutil as t
+from bytestream import parse_si_reply
+
+from twisted.words.xish import domish, xpath
+
+sample_parameters = dbus.Dictionary({
+    's': 'hello',
+    'ay': dbus.ByteArray('hello'),
+    'u': dbus.UInt32(123),
+    'i': dbus.Int32(-123),
+    }, signature='sv')
+
+new_sample_parameters = dbus.Dictionary({
+    's': 'newhello',
+    'ay': dbus.ByteArray('newhello'),
+    'u': dbus.UInt32(123),
+    'i': dbus.Int32(-123),
+    }, signature='sv')
+
+def contact_offer_dbus_tube(bytestream, tube_id):
+    iq, si = bytestream.create_si_offer(ns.TUBES)
+
+    tube = si.addElement((ns.TUBES, 'tube'))
+    tube['type'] = 'dbus'
+    tube['service'] = 'com.example.TestCase2'
+    tube['id'] = tube_id
+    parameters = tube.addElement((None, 'parameters'))
+    parameter = parameters.addElement((None, 'parameter'))
+    parameter['type'] = 'str'
+    parameter['name'] = 'login'
+    parameter.addContent('TEST')
+
+    bytestream.stream.send(iq)
+
+def test(q, bus, conn, stream, bytestream_cls):
+    t.check_conn_properties(q, conn)
+
+    conn.Connect()
+
+    _, vcard_event, roster_event = q.expect_many(
+        EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
+        EventPattern('stream-iq', to=None, query_ns='vcard-temp',
+            query_name='vCard'),
+        EventPattern('stream-iq', query_ns='jabber:iq:roster'))
+
+    self_handle = conn.GetSelfHandle()
+
+    acknowledge_iq(stream, vcard_event.stanza)
+
+    roster = roster_event.stanza
+    roster['type'] = 'result'
+    item = roster_event.query.addElement('item')
+    item['jid'] = 'bob@localhost' # Bob can do tubes
+    item['subscription'] = 'both'
+    stream.send(roster)
+
+    bob_full_jid = 'bob@localhost/Bob'
+    self_full_jid = 'test@localhost/Resource'
+
+    # Send Bob presence and his tube caps
+    presence = domish.Element(('jabber:client', 'presence'))
+    presence['from'] = bob_full_jid
+    presence['to'] = self_full_jid
+    c = presence.addElement('c')
+    c['xmlns'] = 'http://jabber.org/protocol/caps'
+    c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
+    c['ver'] = '1.2.3'
+    stream.send(presence)
+
+    event = q.expect('stream-iq', iq_type='get',
+        query_ns='http://jabber.org/protocol/disco#info',
+        to=bob_full_jid)
+    result = event.stanza
+    result['type'] = 'result'
+    assert event.query['node'] == \
+        'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
+    feature = event.query.addElement('feature')
+    feature['var'] = ns.TUBES
+    stream.send(result)
+
+    # A tube request can be done only if the contact has tube capabilities
+    # Ensure that Bob's caps have been received
+    sync_stream(q, stream)
+
+    # Also ensure that all the new contact list channels have been announced,
+    # so that the NewChannel(s) signals we look for after calling
+    # RequestChannel are the ones we wanted.
+    sync_dbus(bus, q, conn)
+
+    # let's try to accept a D-Bus tube using the old API
+    bytestream4 = bytestream_cls(stream, q, 'beta', bob_full_jid,
+        'test@localhost/Reource', True)
+
+    contact_offer_dbus_tube(bytestream4, '69')
+
+    # tubes channel is created
+    event = q.expect('dbus-signal', signal='NewChannel')
+
+    bob_handle = conn.RequestHandles(cs.HT_CONTACT, ['bob@localhost'])[0]
+
+    t.check_NewChannel_signal(event.args, cs.CHANNEL_TYPE_TUBES, None,
+        bob_handle, False)
+
+    tubes_chan = bus.get_object(conn.bus_name, event.args[0])
+    tubes_iface = dbus.Interface(tubes_chan, cs.CHANNEL_TYPE_TUBES)
+
+    event = q.expect('dbus-signal', signal='NewTube')
+    id = event.args[0]
+    initiator = event.args[1]
+    type = event.args[2]
+    service = event.args[3]
+    parameters = event.args[4]
+    state = event.args[5]
+
+    assert id == 69
+    initiator_jid = conn.InspectHandles(1, [initiator])[0]
+    assert initiator_jid == 'bob@localhost'
+    assert type == cs.TUBE_TYPE_DBUS
+    assert service == 'com.example.TestCase2'
+    assert parameters == {'login': 'TEST'}
+    assert state == cs.TUBE_STATE_LOCAL_PENDING
+
+    # accept the tube (old API)
+    call_async(q, tubes_iface, 'AcceptDBusTube', id)
+
+    event = q.expect('stream-iq', iq_type='result')
+    bytestream = parse_si_reply (event.stanza)
+    assert bytestream == bytestream4.get_ns()
+    tube = xpath.queryForNodes('/iq/si/tube[@xmlns="%s"]' % ns.TUBES,
+        event.stanza)
+    assert len(tube) == 1
+
+    # Init the bytestream
+    event = bytestream4.open_bytestream(EventPattern('dbus-return', method='AcceptDBusTube'))
+
+    address = event.value[0]
+    assert len(address) > 0
+
+    event = q.expect('dbus-signal', signal='TubeStateChanged',
+        args=[69, 2]) # 2 == OPEN
+    id = event.args[0]
+    state = event.args[1]
+
+    # OK, now let's try to accept a D-Bus tube using the new API
+    bytestream5 = bytestream_cls(stream, q, 'gamma', bob_full_jid,
+        self_full_jid, True)
+
+    contact_offer_dbus_tube(bytestream5, '70')
+
+    e = q.expect('dbus-signal', signal='NewChannels')
+    channels = e.args[0]
+    assert len(channels) == 1
+    path, props = channels[0]
+
+    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
+    assert props[cs.INITIATOR_HANDLE] == bob_handle
+    assert props[cs.INITIATOR_ID] == 'bob@localhost'
+    assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_TUBE]
+    assert props[cs.REQUESTED] == False
+    assert props[cs.TARGET_HANDLE] == bob_handle
+    assert props[cs.TARGET_ID] == 'bob@localhost'
+    assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase2'
+    assert props[cs.TUBE_PARAMETERS] == {'login': 'TEST'}
+    assert cs.TUBE_STATE not in props
+
+    tube_chan = bus.get_object(conn.bus_name, path)
+    tube_chan_iface = dbus.Interface(tube_chan, cs.CHANNEL)
+    dbus_tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_DBUS_TUBE)
+
+    status = tube_chan.Get(cs.CHANNEL_IFACE_TUBE, 'State', dbus_interface=cs.PROPERTIES_IFACE)
+    assert status == cs.TUBE_STATE_LOCAL_PENDING
+
+    # accept the tube (new API)
+    call_async(q, dbus_tube_iface, 'AcceptDBusTube')
+
+    event = q.expect('stream-iq', iq_type='result')
+    bytestream = parse_si_reply (event.stanza)
+    assert bytestream == bytestream5.get_ns()
+    tube = xpath.queryForNodes('/iq/si/tube[@xmlns="%s"]' % ns.TUBES, event.stanza)
+    assert len(tube) == 1
+
+    # Init the bytestream
+    return_event = bytestream5.open_bytestream(EventPattern('dbus-return', method='AcceptDBusTube'))
+
+    _, state_event = q.expect_many(
+        EventPattern('stream-iq', iq_type='result'),
+        EventPattern('dbus-signal', signal='TubeChannelStateChanged'))
+
+    addr = return_event.value[0]
+    assert len(addr) > 0
+
+    assert state_event.args[0] == cs.TUBE_STATE_OPEN
+
+    # close the tube
+    tube_chan_iface.Close()
+
+    q.expect_many(
+        EventPattern('dbus-signal', signal='Closed'),
+        EventPattern('dbus-signal', signal='ChannelClosed'))
+
+    # OK, we're done
+    conn.Disconnect()
+
+    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
+
+if __name__ == '__main__':
+    t.exec_tube_test(test)
diff --git a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube.py b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube.py
index 63712d7..9562056 100644
--- a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube.py
+++ b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube.py
@@ -9,9 +9,7 @@ from gabbletest import acknowledge_iq, sync_stream
 import constants as cs
 import ns
 import tubetestutil as t
-from bytestream import parse_si_offer, create_si_reply, parse_si_reply
-
-from dbus import PROPERTIES_IFACE
+from bytestream import parse_si_offer, create_si_reply
 
 from twisted.words.xish import domish, xpath
 
@@ -29,25 +27,7 @@ new_sample_parameters = dbus.Dictionary({
     'i': dbus.Int32(-123),
     }, signature='sv')
 
-def contact_offer_dbus_tube(bytestream, tube_id):
-    iq, si = bytestream.create_si_offer(ns.TUBES)
-
-    tube = si.addElement((ns.TUBES, 'tube'))
-    tube['type'] = 'dbus'
-    tube['service'] = 'com.example.TestCase2'
-    tube['id'] = tube_id
-    parameters = tube.addElement((None, 'parameters'))
-    parameter = parameters.addElement((None, 'parameter'))
-    parameter['type'] = 'str'
-    parameter['name'] = 'login'
-    parameter.addContent('TEST')
-
-    bytestream.stream.send(iq)
-
 def test(q, bus, conn, stream, bytestream_cls):
-    t.set_up_echo("")
-    t.set_up_echo("2")
-
     t.check_conn_properties(q, conn)
 
     conn.Connect()
@@ -199,102 +179,6 @@ def test(q, bus, conn, stream, bytestream_cls):
     bytestream4 = bytestream_cls(stream, q, 'beta', bob_full_jid,
         'test@localhost/Reource', True)
 
-    contact_offer_dbus_tube(bytestream4, '69')
-
-    event = q.expect('dbus-signal', signal='NewTube')
-    id = event.args[0]
-    initiator = event.args[1]
-    type = event.args[2]
-    service = event.args[3]
-    parameters = event.args[4]
-    state = event.args[5]
-
-    assert id == 69
-    initiator_jid = conn.InspectHandles(1, [initiator])[0]
-    assert initiator_jid == 'bob@localhost'
-    assert type == cs.TUBE_TYPE_DBUS
-    assert service == 'com.example.TestCase2'
-    assert parameters == {'login': 'TEST'}
-    assert state == cs.TUBE_STATE_LOCAL_PENDING
-
-    # accept the tube (old API)
-    call_async(q, tubes_iface, 'AcceptDBusTube', id)
-
-    event = q.expect('stream-iq', iq_type='result')
-    bytestream = parse_si_reply (event.stanza)
-    assert bytestream == bytestream4.get_ns()
-    tube = xpath.queryForNodes('/iq/si/tube[@xmlns="%s"]' % ns.TUBES,
-        event.stanza)
-    assert len(tube) == 1
-
-    # Init the bytestream
-    event = bytestream4.open_bytestream(EventPattern('dbus-return', method='AcceptDBusTube'))
-
-    address = event.value[0]
-    assert len(address) > 0
-
-    event = q.expect('dbus-signal', signal='TubeStateChanged',
-        args=[69, 2]) # 2 == OPEN
-    id = event.args[0]
-    state = event.args[1]
-
-    # OK, now let's try to accept a D-Bus tube using the new API
-    bytestream5 = bytestream_cls(stream, q, 'gamma', bob_full_jid,
-        self_full_jid, True)
-
-    contact_offer_dbus_tube(bytestream5, '70')
-
-    e = q.expect('dbus-signal', signal='NewChannels')
-    channels = e.args[0]
-    assert len(channels) == 1
-    path, props = channels[0]
-
-    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
-    assert props[cs.INITIATOR_HANDLE] == bob_handle
-    assert props[cs.INITIATOR_ID] == 'bob@localhost'
-    assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_TUBE]
-    assert props[cs.REQUESTED] == False
-    assert props[cs.TARGET_HANDLE] == bob_handle
-    assert props[cs.TARGET_ID] == 'bob@localhost'
-    assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase2'
-    assert props[cs.TUBE_PARAMETERS] == {'login': 'TEST'}
-    assert cs.TUBE_STATE not in props
-
-    tube_chan = bus.get_object(conn.bus_name, path)
-    tube_chan_iface = dbus.Interface(tube_chan, cs.CHANNEL)
-    dbus_tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_DBUS_TUBE)
-
-    status = tube_chan.Get(cs.CHANNEL_IFACE_TUBE, 'State', dbus_interface=PROPERTIES_IFACE)
-    assert status == cs.TUBE_STATE_LOCAL_PENDING
-
-    # accept the tube (new API)
-    call_async(q, dbus_tube_iface, 'AcceptDBusTube')
-
-    event = q.expect('stream-iq', iq_type='result')
-    bytestream = parse_si_reply (event.stanza)
-    assert bytestream == bytestream5.get_ns()
-    tube = xpath.queryForNodes('/iq/si/tube[@xmlns="%s"]' % ns.TUBES, event.stanza)
-    assert len(tube) == 1
-
-    # Init the bytestream
-    return_event = bytestream5.open_bytestream(EventPattern('dbus-return', method='AcceptDBusTube'))
-
-    _, state_event = q.expect_many(
-        EventPattern('stream-iq', iq_type='result'),
-        EventPattern('dbus-signal', signal='TubeChannelStateChanged'))
-
-    addr = return_event.value[0]
-    assert len(addr) > 0
-
-    assert state_event.args[0] == cs.TUBE_STATE_OPEN
-
-    # close the tube
-    tube_chan_iface.Close()
-
-    q.expect_many(
-        EventPattern('dbus-signal', signal='Closed'),
-        EventPattern('dbus-signal', signal='ChannelClosed'))
-
     # OK, we're done
     conn.Disconnect()
 
-- 
1.5.6.5




More information about the telepathy-commits mailing list