[Telepathy-commits] [telepathy-gabble/master] move test accepting private dbus tube to accept-private-dbus-tube.py
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Fri Feb 27 05:41:42 PST 2009
---
tests/twisted/Makefile.am | 1 +
tests/twisted/tubes/accept-private-dbus-tube.py | 215 ++++++++++++++++++++
.../tubes/offer-accept-private-dbus-stream-tube.py | 118 +-----------
3 files changed, 217 insertions(+), 117 deletions(-)
create mode 100644 tests/twisted/tubes/accept-private-dbus-tube.py
diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index fe87003..defb425 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -43,6 +43,7 @@ TWISTED_TESTS = \
text/test-text-no-body.py \
text/test-text.py \
tubes/accept-muc-dbus-tube.py \
+ tubes/accept-private-dbus-tube.py \
tubes/muc-presence.py \
tubes/close-muc-with-closed-tube.py \
tubes/crash-on-list-channels.py \
diff --git a/tests/twisted/tubes/accept-private-dbus-tube.py b/tests/twisted/tubes/accept-private-dbus-tube.py
new file mode 100644
index 0000000..5649297
--- /dev/null
+++ b/tests/twisted/tubes/accept-private-dbus-tube.py
@@ -0,0 +1,215 @@
+"""Test 1-1 tubes support."""
+
+import dbus
+
+from servicetest import call_async, EventPattern, sync_dbus
+from gabbletest import acknowledge_iq, sync_stream
+import constants as cs
+import ns
+import tubetestutil as t
+from bytestream import parse_si_reply
+
+from twisted.words.xish import domish, xpath
+
+sample_parameters = dbus.Dictionary({
+ 's': 'hello',
+ 'ay': dbus.ByteArray('hello'),
+ 'u': dbus.UInt32(123),
+ 'i': dbus.Int32(-123),
+ }, signature='sv')
+
+new_sample_parameters = dbus.Dictionary({
+ 's': 'newhello',
+ 'ay': dbus.ByteArray('newhello'),
+ 'u': dbus.UInt32(123),
+ 'i': dbus.Int32(-123),
+ }, signature='sv')
+
+def contact_offer_dbus_tube(bytestream, tube_id):
+ iq, si = bytestream.create_si_offer(ns.TUBES)
+
+ tube = si.addElement((ns.TUBES, 'tube'))
+ tube['type'] = 'dbus'
+ tube['service'] = 'com.example.TestCase2'
+ tube['id'] = tube_id
+ parameters = tube.addElement((None, 'parameters'))
+ parameter = parameters.addElement((None, 'parameter'))
+ parameter['type'] = 'str'
+ parameter['name'] = 'login'
+ parameter.addContent('TEST')
+
+ bytestream.stream.send(iq)
+
+def test(q, bus, conn, stream, bytestream_cls):
+ t.check_conn_properties(q, conn)
+
+ conn.Connect()
+
+ _, vcard_event, roster_event = q.expect_many(
+ EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
+ EventPattern('stream-iq', to=None, query_ns='vcard-temp',
+ query_name='vCard'),
+ EventPattern('stream-iq', query_ns='jabber:iq:roster'))
+
+ self_handle = conn.GetSelfHandle()
+
+ acknowledge_iq(stream, vcard_event.stanza)
+
+ roster = roster_event.stanza
+ roster['type'] = 'result'
+ item = roster_event.query.addElement('item')
+ item['jid'] = 'bob@localhost' # Bob can do tubes
+ item['subscription'] = 'both'
+ stream.send(roster)
+
+ bob_full_jid = 'bob@localhost/Bob'
+ self_full_jid = 'test@localhost/Resource'
+
+ # Send Bob presence and his tube caps
+ presence = domish.Element(('jabber:client', 'presence'))
+ presence['from'] = bob_full_jid
+ presence['to'] = self_full_jid
+ c = presence.addElement('c')
+ c['xmlns'] = 'http://jabber.org/protocol/caps'
+ c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
+ c['ver'] = '1.2.3'
+ stream.send(presence)
+
+ event = q.expect('stream-iq', iq_type='get',
+ query_ns='http://jabber.org/protocol/disco#info',
+ to=bob_full_jid)
+ result = event.stanza
+ result['type'] = 'result'
+ assert event.query['node'] == \
+ 'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
+ feature = event.query.addElement('feature')
+ feature['var'] = ns.TUBES
+ stream.send(result)
+
+ # A tube request can be done only if the contact has tube capabilities
+ # Ensure that Bob's caps have been received
+ sync_stream(q, stream)
+
+ # Also ensure that all the new contact list channels have been announced,
+ # so that the NewChannel(s) signals we look for after calling
+ # RequestChannel are the ones we wanted.
+ sync_dbus(bus, q, conn)
+
+ # let's try to accept a D-Bus tube using the old API
+ bytestream4 = bytestream_cls(stream, q, 'beta', bob_full_jid,
+ 'test@localhost/Reource', True)
+
+ contact_offer_dbus_tube(bytestream4, '69')
+
+ # tubes channel is created
+ event = q.expect('dbus-signal', signal='NewChannel')
+
+ bob_handle = conn.RequestHandles(cs.HT_CONTACT, ['bob@localhost'])[0]
+
+ t.check_NewChannel_signal(event.args, cs.CHANNEL_TYPE_TUBES, None,
+ bob_handle, False)
+
+ tubes_chan = bus.get_object(conn.bus_name, event.args[0])
+ tubes_iface = dbus.Interface(tubes_chan, cs.CHANNEL_TYPE_TUBES)
+
+ event = q.expect('dbus-signal', signal='NewTube')
+ id = event.args[0]
+ initiator = event.args[1]
+ type = event.args[2]
+ service = event.args[3]
+ parameters = event.args[4]
+ state = event.args[5]
+
+ assert id == 69
+ initiator_jid = conn.InspectHandles(1, [initiator])[0]
+ assert initiator_jid == 'bob@localhost'
+ assert type == cs.TUBE_TYPE_DBUS
+ assert service == 'com.example.TestCase2'
+ assert parameters == {'login': 'TEST'}
+ assert state == cs.TUBE_STATE_LOCAL_PENDING
+
+ # accept the tube (old API)
+ call_async(q, tubes_iface, 'AcceptDBusTube', id)
+
+ event = q.expect('stream-iq', iq_type='result')
+ bytestream = parse_si_reply (event.stanza)
+ assert bytestream == bytestream4.get_ns()
+ tube = xpath.queryForNodes('/iq/si/tube[@xmlns="%s"]' % ns.TUBES,
+ event.stanza)
+ assert len(tube) == 1
+
+ # Init the bytestream
+ event = bytestream4.open_bytestream(EventPattern('dbus-return', method='AcceptDBusTube'))
+
+ address = event.value[0]
+ assert len(address) > 0
+
+ event = q.expect('dbus-signal', signal='TubeStateChanged',
+ args=[69, 2]) # 2 == OPEN
+ id = event.args[0]
+ state = event.args[1]
+
+ # OK, now let's try to accept a D-Bus tube using the new API
+ bytestream5 = bytestream_cls(stream, q, 'gamma', bob_full_jid,
+ self_full_jid, True)
+
+ contact_offer_dbus_tube(bytestream5, '70')
+
+ e = q.expect('dbus-signal', signal='NewChannels')
+ channels = e.args[0]
+ assert len(channels) == 1
+ path, props = channels[0]
+
+ assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
+ assert props[cs.INITIATOR_HANDLE] == bob_handle
+ assert props[cs.INITIATOR_ID] == 'bob@localhost'
+ assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_TUBE]
+ assert props[cs.REQUESTED] == False
+ assert props[cs.TARGET_HANDLE] == bob_handle
+ assert props[cs.TARGET_ID] == 'bob@localhost'
+ assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase2'
+ assert props[cs.TUBE_PARAMETERS] == {'login': 'TEST'}
+ assert cs.TUBE_STATE not in props
+
+ tube_chan = bus.get_object(conn.bus_name, path)
+ tube_chan_iface = dbus.Interface(tube_chan, cs.CHANNEL)
+ dbus_tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_DBUS_TUBE)
+
+ status = tube_chan.Get(cs.CHANNEL_IFACE_TUBE, 'State', dbus_interface=cs.PROPERTIES_IFACE)
+ assert status == cs.TUBE_STATE_LOCAL_PENDING
+
+ # accept the tube (new API)
+ call_async(q, dbus_tube_iface, 'AcceptDBusTube')
+
+ event = q.expect('stream-iq', iq_type='result')
+ bytestream = parse_si_reply (event.stanza)
+ assert bytestream == bytestream5.get_ns()
+ tube = xpath.queryForNodes('/iq/si/tube[@xmlns="%s"]' % ns.TUBES, event.stanza)
+ assert len(tube) == 1
+
+ # Init the bytestream
+ return_event = bytestream5.open_bytestream(EventPattern('dbus-return', method='AcceptDBusTube'))
+
+ _, state_event = q.expect_many(
+ EventPattern('stream-iq', iq_type='result'),
+ EventPattern('dbus-signal', signal='TubeChannelStateChanged'))
+
+ addr = return_event.value[0]
+ assert len(addr) > 0
+
+ assert state_event.args[0] == cs.TUBE_STATE_OPEN
+
+ # close the tube
+ tube_chan_iface.Close()
+
+ q.expect_many(
+ EventPattern('dbus-signal', signal='Closed'),
+ EventPattern('dbus-signal', signal='ChannelClosed'))
+
+ # OK, we're done
+ conn.Disconnect()
+
+ q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
+
+if __name__ == '__main__':
+ t.exec_tube_test(test)
diff --git a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube.py b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube.py
index 63712d7..9562056 100644
--- a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube.py
+++ b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube.py
@@ -9,9 +9,7 @@ from gabbletest import acknowledge_iq, sync_stream
import constants as cs
import ns
import tubetestutil as t
-from bytestream import parse_si_offer, create_si_reply, parse_si_reply
-
-from dbus import PROPERTIES_IFACE
+from bytestream import parse_si_offer, create_si_reply
from twisted.words.xish import domish, xpath
@@ -29,25 +27,7 @@ new_sample_parameters = dbus.Dictionary({
'i': dbus.Int32(-123),
}, signature='sv')
-def contact_offer_dbus_tube(bytestream, tube_id):
- iq, si = bytestream.create_si_offer(ns.TUBES)
-
- tube = si.addElement((ns.TUBES, 'tube'))
- tube['type'] = 'dbus'
- tube['service'] = 'com.example.TestCase2'
- tube['id'] = tube_id
- parameters = tube.addElement((None, 'parameters'))
- parameter = parameters.addElement((None, 'parameter'))
- parameter['type'] = 'str'
- parameter['name'] = 'login'
- parameter.addContent('TEST')
-
- bytestream.stream.send(iq)
-
def test(q, bus, conn, stream, bytestream_cls):
- t.set_up_echo("")
- t.set_up_echo("2")
-
t.check_conn_properties(q, conn)
conn.Connect()
@@ -199,102 +179,6 @@ def test(q, bus, conn, stream, bytestream_cls):
bytestream4 = bytestream_cls(stream, q, 'beta', bob_full_jid,
'test@localhost/Reource', True)
- contact_offer_dbus_tube(bytestream4, '69')
-
- event = q.expect('dbus-signal', signal='NewTube')
- id = event.args[0]
- initiator = event.args[1]
- type = event.args[2]
- service = event.args[3]
- parameters = event.args[4]
- state = event.args[5]
-
- assert id == 69
- initiator_jid = conn.InspectHandles(1, [initiator])[0]
- assert initiator_jid == 'bob@localhost'
- assert type == cs.TUBE_TYPE_DBUS
- assert service == 'com.example.TestCase2'
- assert parameters == {'login': 'TEST'}
- assert state == cs.TUBE_STATE_LOCAL_PENDING
-
- # accept the tube (old API)
- call_async(q, tubes_iface, 'AcceptDBusTube', id)
-
- event = q.expect('stream-iq', iq_type='result')
- bytestream = parse_si_reply (event.stanza)
- assert bytestream == bytestream4.get_ns()
- tube = xpath.queryForNodes('/iq/si/tube[@xmlns="%s"]' % ns.TUBES,
- event.stanza)
- assert len(tube) == 1
-
- # Init the bytestream
- event = bytestream4.open_bytestream(EventPattern('dbus-return', method='AcceptDBusTube'))
-
- address = event.value[0]
- assert len(address) > 0
-
- event = q.expect('dbus-signal', signal='TubeStateChanged',
- args=[69, 2]) # 2 == OPEN
- id = event.args[0]
- state = event.args[1]
-
- # OK, now let's try to accept a D-Bus tube using the new API
- bytestream5 = bytestream_cls(stream, q, 'gamma', bob_full_jid,
- self_full_jid, True)
-
- contact_offer_dbus_tube(bytestream5, '70')
-
- e = q.expect('dbus-signal', signal='NewChannels')
- channels = e.args[0]
- assert len(channels) == 1
- path, props = channels[0]
-
- assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
- assert props[cs.INITIATOR_HANDLE] == bob_handle
- assert props[cs.INITIATOR_ID] == 'bob@localhost'
- assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_TUBE]
- assert props[cs.REQUESTED] == False
- assert props[cs.TARGET_HANDLE] == bob_handle
- assert props[cs.TARGET_ID] == 'bob@localhost'
- assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase2'
- assert props[cs.TUBE_PARAMETERS] == {'login': 'TEST'}
- assert cs.TUBE_STATE not in props
-
- tube_chan = bus.get_object(conn.bus_name, path)
- tube_chan_iface = dbus.Interface(tube_chan, cs.CHANNEL)
- dbus_tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_DBUS_TUBE)
-
- status = tube_chan.Get(cs.CHANNEL_IFACE_TUBE, 'State', dbus_interface=PROPERTIES_IFACE)
- assert status == cs.TUBE_STATE_LOCAL_PENDING
-
- # accept the tube (new API)
- call_async(q, dbus_tube_iface, 'AcceptDBusTube')
-
- event = q.expect('stream-iq', iq_type='result')
- bytestream = parse_si_reply (event.stanza)
- assert bytestream == bytestream5.get_ns()
- tube = xpath.queryForNodes('/iq/si/tube[@xmlns="%s"]' % ns.TUBES, event.stanza)
- assert len(tube) == 1
-
- # Init the bytestream
- return_event = bytestream5.open_bytestream(EventPattern('dbus-return', method='AcceptDBusTube'))
-
- _, state_event = q.expect_many(
- EventPattern('stream-iq', iq_type='result'),
- EventPattern('dbus-signal', signal='TubeChannelStateChanged'))
-
- addr = return_event.value[0]
- assert len(addr) > 0
-
- assert state_event.args[0] == cs.TUBE_STATE_OPEN
-
- # close the tube
- tube_chan_iface.Close()
-
- q.expect_many(
- EventPattern('dbus-signal', signal='Closed'),
- EventPattern('dbus-signal', signal='ChannelClosed'))
-
# OK, we're done
conn.Disconnect()
--
1.5.6.5
More information about the telepathy-commits mailing list