[Telepathy-commits] [telepathy-gabble/master] remove offer-accept-private-dbus-stream-tube-socks5.py as offer-accept-private-dbus-stream-tube.py now test using SOCKS5 too
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Fri Feb 27 04:50:27 PST 2009
---
tests/twisted/Makefile.am | 1 -
...offer-accept-private-dbus-stream-tube-socks5.py | 484 --------------------
2 files changed, 0 insertions(+), 485 deletions(-)
delete mode 100644 tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index 866a917..99fdbb1 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -51,7 +51,6 @@ TWISTED_TESTS = \
tubes/accept-muc-stream-tube.py \
tubes/accept-private-stream-tube.py \
tubes/offer-accept-private-dbus-stream-tube.py \
- tubes/offer-accept-private-dbus-stream-tube-socks5.py \
tubes/offer-accept-private-stream-tube-si-fallback.py \
tubes/offer-muc-dbus-tube.py \
tubes/offer-muc-stream-tube.py \
diff --git a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
deleted file mode 100644
index c0f4446..0000000
--- a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
+++ /dev/null
@@ -1,484 +0,0 @@
-"""Test 1-1 tubes support."""
-
-import os
-
-import dbus
-from dbus.connection import Connection
-from dbus.lowlevel import SignalMessage
-
-from servicetest import call_async, EventPattern, watch_tube_signals, sync_dbus
-from gabbletest import exec_test, acknowledge_iq, sync_stream
-import constants as cs
-import ns
-import tubetestutil as t
-from bytestream import expect_socks5_reply, create_si_offer, parse_si_reply,\
- create_si_reply, parse_si_offer, BytestreamS5B
-
-from twisted.words.xish import domish, xpath
-
-sample_parameters = dbus.Dictionary({
- 's': 'hello',
- 'ay': dbus.ByteArray('hello'),
- 'u': dbus.UInt32(123),
- 'i': dbus.Int32(-123),
- }, signature='sv')
-
-new_sample_parameters = dbus.Dictionary({
- 's': 'newhello',
- 'ay': dbus.ByteArray('newhello'),
- 'u': dbus.UInt32(123),
- 'i': dbus.Int32(-123),
- }, signature='sv')
-
-def test(q, bus, conn, stream):
- t.set_up_echo("")
- t.set_up_echo("2")
-
- t.check_conn_properties(q, conn)
-
- conn.Connect()
-
- _, vcard_event, roster_event = q.expect_many(
- EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
- EventPattern('stream-iq', to=None, query_ns='vcard-temp',
- query_name='vCard'),
- EventPattern('stream-iq', query_ns='jabber:iq:roster'))
-
- self_handle = conn.GetSelfHandle()
-
- acknowledge_iq(stream, vcard_event.stanza)
-
- roster = roster_event.stanza
- roster['type'] = 'result'
- item = roster_event.query.addElement('item')
- item['jid'] = 'bob at localhost' # Bob can do tubes
- item['subscription'] = 'both'
- stream.send(roster)
-
- bob_full_jid = 'bob at localhost/Bob'
- self_full_jid = 'test at localhost/Resource'
-
- # Send Bob presence and his tube caps
- presence = domish.Element(('jabber:client', 'presence'))
- presence['from'] = bob_full_jid
- presence['to'] = self_full_jid
- c = presence.addElement('c')
- c['xmlns'] = 'http://jabber.org/protocol/caps'
- c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
- c['ver'] = '1.2.3'
- stream.send(presence)
-
- event = q.expect('stream-iq', iq_type='get',
- query_ns='http://jabber.org/protocol/disco#info',
- to=bob_full_jid)
- result = event.stanza
- result['type'] = 'result'
- assert event.query['node'] == \
- 'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
- feature = event.query.addElement('feature')
- feature['var'] = ns.TUBES
- stream.send(result)
-
- # A tube request can be done only if the contact has tube capabilities
- # Ensure that Bob's caps have been received
- sync_stream(q, stream)
- # Also ensure that all the new contact list channels have been announced,
- # so that the NewChannel(s) signals we look for after calling
- # RequestChannel are the ones we wanted.
- sync_dbus(bus, q, conn)
-
- requestotron = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
-
- # Test tubes with Bob. Bob does not have tube capabilities.
- bob_handle = conn.RequestHandles(1, ['bob at localhost'])[0]
-
- # old requestotron
- call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_TUBES, cs.HT_CONTACT,
- bob_handle, True);
-
- ret, old_sig, new_sig = q.expect_many(
- EventPattern('dbus-return', method='RequestChannel'),
- EventPattern('dbus-signal', signal='NewChannel'),
- EventPattern('dbus-signal', signal='NewChannels'),
- )
-
- assert len(ret.value) == 1
- chan_path = ret.value[0]
-
- t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
- bob_handle, True)
- t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
- bob_handle, 'bob at localhost', self_handle)
- old_tubes_channel_properties = new_sig.args[0][0]
-
- t.check_conn_properties(q, conn, [old_tubes_channel_properties])
- # Try to CreateChannel with correct properties
- # Gabble must succeed
- call_async(q, requestotron, 'CreateChannel',
- {cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
- cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
- cs.TARGET_HANDLE: bob_handle,
- cs.STREAM_TUBE_SERVICE: "newecho",
- })
- ret, old_sig, new_sig = q.expect_many(
- EventPattern('dbus-return', method='CreateChannel'),
- EventPattern('dbus-signal', signal='NewChannel'),
- EventPattern('dbus-signal', signal='NewChannels'),
- )
-
- new_chan_path, new_chan_prop_asv = ret.value
-
- assert new_chan_path.find("StreamTube") != -1, new_chan_path
- assert new_chan_path.find("SITubesChannel") == -1, new_chan_path
- # The path of the Channel.Type.Tubes object MUST be different to the path
- # of the Channel.Type.StreamTube object !
- assert chan_path != new_chan_path
-
- t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
- new_chan_path, bob_handle, True)
- t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
- new_chan_path, bob_handle, 'bob at localhost', self_handle)
- stream_tube_channel_properties = new_sig.args[0][0]
-
- t.check_conn_properties(q, conn,
- [old_tubes_channel_properties, stream_tube_channel_properties])
-
- tubes_chan = bus.get_object(conn.bus_name, chan_path)
- tubes_iface = dbus.Interface(tubes_chan, cs.CHANNEL_TYPE_TUBES)
-
- t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
- bob_handle, "bob at localhost")
-
- # Offer the tube, old API
- # FIXME: make set_up_echo return this
- path = os.getcwd() + '/stream'
- call_async(q, tubes_iface, 'OfferStreamTube',
- 'echo', sample_parameters, 0, dbus.ByteArray(path), 0, "")
-
- event = q.expect('stream-message')
- message = event.stanza
- assert message['to'] == bob_full_jid # check the resource
- tube_nodes = xpath.queryForNodes('/message/tube[@xmlns="%s"]' % ns.TUBES,
- message)
- assert tube_nodes is not None
- assert len(tube_nodes) == 1
- tube = tube_nodes[0]
-
- assert tube['service'] == 'echo'
- assert tube['type'] == 'stream'
- assert not tube.hasAttribute('initiator')
- stream_tube_id = long(tube['id'])
-
- params = {}
- parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
- for node in parameter_nodes:
- assert node['name'] not in params
- params[node['name']] = (node['type'], str(node))
- assert params == {'ay': ('bytes', 'aGVsbG8='),
- 's': ('str', 'hello'),
- 'i': ('int', '-123'),
- 'u': ('uint', '123'),
- }
-
- # We offered a tube using the old tube API and created one with the new
- # API, so there are 2 tubes. Check the new tube API works
- assert len(filter(lambda x:
- x[1] == cs.CHANNEL_TYPE_TUBES,
- conn.ListChannels())) == 1
- channels = filter(lambda x:
- x[1] == cs.CHANNEL_TYPE_STREAM_TUBE and
- x[0] == new_chan_path,
- conn.ListChannels())
- assert len(channels) == 1
- assert new_chan_path == channels[0][0]
-
- tube_chan = bus.get_object(conn.bus_name, channels[0][0])
- tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_STREAM_TUBE)
- tube_prop_iface = dbus.Interface(tube_chan, dbus.PROPERTIES_IFACE)
-
- service = tube_prop_iface.Get(cs.CHANNEL_TYPE_STREAM_TUBE, 'Service')
- assert service == "newecho", service
-
- t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
- bob_handle, "bob at localhost")
- t.check_channel_properties(q, bus, conn, tube_chan,
- cs.CHANNEL_TYPE_STREAM_TUBE, bob_handle, "bob at localhost",
- cs.TUBE_STATE_NOT_OFFERED)
-
- # Offer the tube, new API
- path2 = os.getcwd() + '/stream2'
- call_async(q, tube_iface, 'OfferStreamTube',
- 0, dbus.ByteArray(path2), 0, "", new_sample_parameters)
-
- event = q.expect('stream-message')
- message = event.stanza
- assert message['to'] == bob_full_jid # check the resource
- tube_nodes = xpath.queryForNodes('/message/tube[@xmlns="%s"]' % ns.TUBES,
- message)
- assert tube_nodes is not None
- assert len(tube_nodes) == 1
- tube = tube_nodes[0]
-
- assert tube['service'] == 'newecho'
- assert tube['type'] == 'stream'
- assert not tube.hasAttribute('initiator')
- new_stream_tube_id = long(tube['id'])
-
- params = {}
- parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
- for node in parameter_nodes:
- assert node['name'] not in params
- params[node['name']] = (node['type'], str(node))
- assert params == {'ay': ('bytes', 'bmV3aGVsbG8='),
- 's': ('str', 'newhello'),
- 'i': ('int', '-123'),
- 'u': ('uint', '123'),
- }
- # The new tube has been offered, the parameters cannot be changed anymore
- # We need to use call_async to check the error
- call_async(q, tube_prop_iface, 'Set', cs.CHANNEL_IFACE_TUBE,
- 'Parameters', dbus.Dictionary(
- {dbus.String(u'foo2'): dbus.String(u'bar2')},
- signature=dbus.Signature('sv')),
- dbus_interface=dbus.PROPERTIES_IFACE)
- set_error = q.expect('dbus-error')
- # check it is *not* correctly changed
- params = tube_prop_iface.Get(cs.CHANNEL_IFACE_TUBE, 'Parameters',
- byte_arrays=True)
- assert params == new_sample_parameters, (params, new_sample_parameters)
-
- # The CM is the server, so fake a client wanting to talk to it
- # Old API tube
-
- bytestream1 = BytestreamS5B(stream, q, 'alpha', bob_full_jid,
- self_full_jid, True)
-
- iq, si = create_si_offer(stream, bytestream1.initiator, bytestream1.target,
- bytestream1.stream_id, ns.TUBES, [bytestream1.get_ns()])
-
- stream_node = si.addElement((ns.TUBES, 'stream'))
- stream_node['tube'] = str(stream_tube_id)
- stream.send(iq)
-
- si_reply_event, _ = q.expect_many(
- EventPattern('stream-iq', iq_type='result'),
- EventPattern('dbus-signal', signal='TubeStateChanged',
- args=[stream_tube_id, cs.TUBE_STATE_OPEN]))
-
- bytestream = parse_si_reply(si_reply_event.stanza)
- assert bytestream == ns.BYTESTREAMS
- tube = xpath.queryForNodes('/iq/si/tube[@xmlns="%s"]' % ns.TUBES, si_reply_event.stanza)
- assert len(tube) == 1
-
- q.expect('dbus-signal', signal='StreamTubeNewConnection',
- args=[stream_tube_id, bob_handle])
-
- expected_tube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM, 'echo',
- sample_parameters, cs.TUBE_STATE_OPEN)
- tubes = tubes_iface.ListTubes(byte_arrays=True)
- t.check_tube_in_tubes(expected_tube, tubes)
-
- # The CM is the server, so fake a client wanting to talk to it
- # New API tube
- bytestream2 = BytestreamS5B(stream, q, 'beta', bob_full_jid,
- self_full_jid, True)
-
- iq, si = create_si_offer(stream, bytestream2.initiator, bytestream2.target,
- bytestream2.stream_id, ns.TUBES, [bytestream2.get_ns()])
-
- stream_node = si.addElement((ns.TUBES, 'stream'))
- stream_node['tube'] = str(new_stream_tube_id)
- stream.send(iq)
-
- si_reply_event, _ = q.expect_many(
- EventPattern('stream-iq', iq_type='result'),
- EventPattern('dbus-signal', signal='TubeChannelStateChanged',
- args=[cs.TUBE_STATE_OPEN]))
-
- bytestream = parse_si_reply(si_reply_event.stanza)
- assert bytestream == ns.BYTESTREAMS
- tube = xpath.queryForNodes('/iq//si/tube[@xmlns="%s"]' % ns.TUBES, si_reply_event.stanza)
- assert len(tube) == 1
-
- q.expect('dbus-signal', signal='StreamTubeNewConnection',
- args=[bob_handle])
-
- tubes = tubes_iface.ListTubes(byte_arrays=True)
- assert (
- new_stream_tube_id,
- self_handle,
- 1, # Unix stream
- 'newecho',
- new_sample_parameters,
- cs.TUBE_STATE_OPEN,
- ) in tubes, tubes
-
-
- bytestream1.open_bytestream()
-
- streamhost_used = expect_socks5_reply(q)
- assert streamhost_used['jid'] == bob_full_jid
-
- bytestream1.send_data("HELLO WORLD")
- event = q.expect('s5b-data-received')
- assert event.data == 'hello world'
-
- # this connection is disconnected
- bytestream1.transport.loseConnection()
-
- bytestream2.open_bytestream()
-
- streamhost_used = expect_socks5_reply(q)
- assert streamhost_used['jid'] == bob_full_jid
-
- bytestream2.send_data("HELLO, NEW WORLD")
- event = q.expect('s5b-data-received')
- assert event.data == 'hello, new world'
-
- # OK, how about D-Bus?
- call_async(q, tubes_iface, 'OfferDBusTube',
- 'com.example.TestCase', sample_parameters)
-
- event = q.expect('stream-iq', iq_type='set', to=bob_full_jid)
- profile, dbus_stream_id, bytestreams = parse_si_offer(event.stanza)
-
- assert profile == ns.TUBES
- assert bytestreams == [ns.BYTESTREAMS, ns.IBB]
-
- tube = xpath.queryForNodes('/iq/si/tube', event.stanza)[0]
- assert tube['initiator'] == 'test at localhost'
- assert tube['service'] == 'com.example.TestCase'
- assert tube['stream-id'] == dbus_stream_id
- assert not tube.hasAttribute('dbus-name')
- assert tube['type'] == 'dbus'
- dbus_tube_id = long(tube['id'])
-
- params = {}
- parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
- for node in parameter_nodes:
- assert node['name'] not in params
- params[node['name']] = (node['type'], str(node))
- assert params == {'ay': ('bytes', 'aGVsbG8='),
- 's': ('str', 'hello'),
- 'i': ('int', '-123'),
- 'u': ('uint', '123'),
- }
-
- bytestream3 = BytestreamS5B(stream, q, dbus_stream_id, self_full_jid,
- event.stanza['to'], False)
-
- result, si = create_si_reply(stream, event.stanza, self_full_jid, bytestream3.get_ns())
- stream.send(result)
-
- bytestream3.wait_bytestream_open()
-
- q.expect('dbus-signal', signal='TubeStateChanged',
- args=[dbus_tube_id, cs.TUBE_STATE_OPEN])
-
- tubes = tubes_iface.ListTubes(byte_arrays=True)
- expected_dtube = (dbus_tube_id, self_handle, cs.TUBE_TYPE_DBUS,
- 'com.example.TestCase', sample_parameters, cs.TUBE_STATE_OPEN)
- expected_stube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM,
- 'echo', sample_parameters, cs.TUBE_STATE_OPEN)
- t.check_tube_in_tubes(expected_dtube, tubes)
- t.check_tube_in_tubes(expected_stube, tubes)
-
- dbus_tube_adr = tubes_iface.GetDBusTubeAddress(dbus_tube_id)
- dbus_tube_conn = Connection(dbus_tube_adr)
-
- signal = SignalMessage('/', 'foo.bar', 'baz')
- my_bus_name = ':123.whatever.you.like'
- signal.set_sender(my_bus_name)
- signal.append(42, signature='u')
- dbus_tube_conn.send_message(signal)
-
- event = q.expect('s5b-data-received')
- dbus_message = event.data
-
- # little and big endian versions of: SIGNAL, NO_REPLY, protocol v1,
- # 4-byte payload
- assert dbus_message.startswith('l\x04\x01\x01' '\x04\x00\x00\x00') or \
- dbus_message.startswith('B\x04\x01\x01' '\x00\x00\x00\x04')
- # little and big endian versions of the 4-byte payload, UInt32(42)
- assert (dbus_message[0] == 'l' and dbus_message.endswith('\x2a\x00\x00\x00')) or \
- (dbus_message[0] == 'B' and dbus_message.endswith('\x00\x00\x00\x2a'))
- # XXX: verify that it's actually in the "sender" slot, rather than just
- # being in the message somewhere
- assert my_bus_name in dbus_message
-
- watch_tube_signals(q, dbus_tube_conn)
-
- # Have the fake client send us a message all in one go...
- bytestream3.send_data(dbus_message)
-
- # ... and a message one byte at a time ...
- for byte in dbus_message:
- bytestream3.send_data(byte)
-
- # ... and two messages in one go
- bytestream3.send_data(dbus_message + dbus_message)
-
- q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
- q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
- q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
- q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
-
- # OK, now let's try to accept a D-Bus tube
- bytestream4 = BytestreamS5B(stream, q, 'beta', bob_full_jid, self_full_jid, True)
-
- iq, si = create_si_offer(stream, bytestream4.initiator, bytestream4.target,
- bytestream4.stream_id, ns.TUBES, [bytestream4.get_ns()])
-
- tube = si.addElement((ns.TUBES, 'tube'))
- tube['type'] = 'dbus'
- tube['service'] = 'com.example.TestCase2'
- tube['id'] = '69'
- parameters = tube.addElement((None, 'parameters'))
- parameter = parameters.addElement((None, 'parameter'))
- parameter['type'] = 'str'
- parameter['name'] = 'login'
- parameter.addContent('TEST')
-
- stream.send(iq)
-
- event = q.expect('dbus-signal', signal='NewTube')
- id = event.args[0]
- initiator = event.args[1]
- type = event.args[2]
- service = event.args[3]
- parameters = event.args[4]
- state = event.args[5]
-
- assert id == 69
- initiator_jid = conn.InspectHandles(1, [initiator])[0]
- assert initiator_jid == 'bob at localhost'
- assert type == cs.TUBE_TYPE_DBUS
- assert service == 'com.example.TestCase2'
- assert parameters == {'login': 'TEST'}
- assert state == cs.TUBE_STATE_LOCAL_PENDING
-
- # accept the tube
- call_async(q, tubes_iface, 'AcceptDBusTube', id)
-
- event = q.expect('stream-iq', iq_type='result')
- bytestream = parse_si_reply(event.stanza)
- assert bytestream == ns.BYTESTREAMS
- tube = xpath.queryForNodes('/iq//si/tube[@xmlns="%s"]' % ns.TUBES, event.stanza)
- assert len(tube) == 1
-
- expected = EventPattern('dbus-return', method='AcceptDBusTube')
-
- # Init the SOCKS5 bytestream
- event = bytestream4.open_bytestream(expected)
-
- address = event.value[0]
- assert len(address) > 0
-
- # OK, we're done
- conn.Disconnect()
-
- q.expect('tube-signal', signal='Disconnected')
- q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
-
-if __name__ == '__main__':
- exec_test(test)
--
1.5.6.5
More information about the telepathy-commits
mailing list