[Telepathy-commits] [telepathy-gabble/master] Add a test for SOCKS5 tubes
Marco Barisione
marco at barisione.org
Tue Jan 6 08:41:21 PST 2009
---
tests/twisted/tubes/test-si-socks5-tubes.py | 552 +++++++++++++++++++++++++++
1 files changed, 552 insertions(+), 0 deletions(-)
create mode 100644 tests/twisted/tubes/test-si-socks5-tubes.py
diff --git a/tests/twisted/tubes/test-si-socks5-tubes.py b/tests/twisted/tubes/test-si-socks5-tubes.py
new file mode 100644
index 0000000..8583bbf
--- /dev/null
+++ b/tests/twisted/tubes/test-si-socks5-tubes.py
@@ -0,0 +1,552 @@
+"""Test 1-1 tubes support over SOCKS5 bytestreams."""
+
+import base64
+import errno
+import os
+
+import dbus
+from dbus.connection import Connection
+from dbus.lowlevel import SignalMessage
+
+from servicetest import call_async, EventPattern, tp_name_prefix, watch_tube_signals
+from gabbletest import exec_test, acknowledge_iq
+
+from twisted.words.xish import domish, xpath
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet import reactor
+from twisted.words.protocols.jabber.client import IQ
+
+from gabbleconfig import HAVE_DBUS_TUBES
+
+# XML namespaces used to build and match the stanzas exchanged in this test.
+NS_TUBES = 'http://telepathy.freedesktop.org/xmpp/tubes'
+NS_SI = 'http://jabber.org/protocol/si'
+NS_FEATURE_NEG = 'http://jabber.org/protocol/feature-neg'
+NS_IBB = 'http://jabber.org/protocol/ibb'
+NS_X_DATA = 'jabber:x:data'
+NS_BYTESTREAMS = 'http://jabber.org/protocol/bytestreams'
+
+# Parameters attached to every tube offered by the test: one entry each for a
+# string, a byte array, an unsigned and a signed 32-bit integer, keyed by the
+# D-Bus signature of the value.
+sample_parameters = dbus.Dictionary({
+ 's': 'hello',
+ 'ay': dbus.ByteArray('hello'),
+ 'u': dbus.UInt32(123),
+ 'i': dbus.Int32(-123),
+ }, signature='sv')
+
+
+class Echo(Protocol):
+ def dataReceived(self, data):
+ self.transport.write(data.lower())
+
+def set_up_echo():
+ factory = Factory()
+ factory.protocol = Echo
+ try:
+ os.remove(os.getcwd() + '/stream')
+ except OSError, e:
+ if e.errno != errno.ENOENT:
+ raise
+ reactor.listenUNIX(os.getcwd() + '/stream', factory)
+
+class S5BProtocol(Protocol):
+ def connectionMade(self):
+ self.factory.event_func(EventPattern('s5b-connected',
+ transport=self.transport))
+
+ def dataReceived(self, data):
+ self.factory.event_func(EventPattern('s5b-data-received', data=data,
+ transport=self.transport))
+
+class S5BFactory(Factory):
+ protocol = S5BProtocol
+
+ def __init__(self, event_func):
+ self.event_func = event_func
+
+ def buildProtocol(self, addr):
+ protocol = Factory.buildProtocol(self, addr)
+ return protocol
+
+ def startedConnecting(self, connector):
+ pass
+
+def test(q, bus, conn, stream):
+ set_up_echo()
+ conn.Connect()
+
+ properties = conn.GetAll(
+ 'org.freedesktop.Telepathy.Connection.Interface.Requests',
+ dbus_interface='org.freedesktop.DBus.Properties')
+ assert properties.get('Channels') == [], properties['Channels']
+ assert ({'org.freedesktop.Telepathy.Channel.ChannelType':
+ 'org.freedesktop.Telepathy.Channel.Type.Tubes',
+ 'org.freedesktop.Telepathy.Channel.TargetHandleType': 1,
+ },
+ ['org.freedesktop.Telepathy.Channel.TargetHandle',
+ 'org.freedesktop.Telepathy.Channel.TargetID',
+ ],
+ ) in properties.get('RequestableChannelClasses'),\
+ properties['RequestableChannelClasses']
+
+ _, vcard_event, roster_event = q.expect_many(
+ EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
+ EventPattern('stream-iq', to=None, query_ns='vcard-temp',
+ query_name='vCard'),
+ EventPattern('stream-iq', query_ns='jabber:iq:roster'))
+
+ acknowledge_iq(stream, vcard_event.stanza)
+
+ roster = roster_event.stanza
+ roster['type'] = 'result'
+ item = roster_event.query.addElement('item')
+ item['jid'] = 'bob at localhost'
+ item['subscription'] = 'both'
+ stream.send(roster)
+
+ presence = domish.Element(('jabber:client', 'presence'))
+ presence['from'] = 'bob at localhost/Bob'
+ presence['to'] = 'test at localhost/Resource'
+ c = presence.addElement('c')
+ c['xmlns'] = 'http://jabber.org/protocol/caps'
+ c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
+ c['ver'] = '1.2.3'
+ stream.send(presence)
+
+ event = q.expect('stream-iq', iq_type='get',
+ query_ns='http://jabber.org/protocol/disco#info',
+ to='bob at localhost/Bob')
+ result = event.stanza
+ result['type'] = 'result'
+ assert event.query['node'] == \
+ 'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
+ feature = event.query.addElement('feature')
+ feature['var'] = NS_TUBES
+ stream.send(result)
+
+ bob_handle = conn.RequestHandles(1, ['bob at localhost'])[0]
+
+ call_async(q, conn, 'RequestChannel',
+ tp_name_prefix + '.Channel.Type.Tubes', 1, bob_handle, True);
+
+ ret, old_sig, new_sig = q.expect_many(
+ EventPattern('dbus-return', method='RequestChannel'),
+ EventPattern('dbus-signal', signal='NewChannel'),
+ EventPattern('dbus-signal', signal='NewChannels'),
+ )
+
+
+ assert len(ret.value) == 1
+ chan_path = ret.value[0]
+
+ assert old_sig.args[0] == chan_path
+ assert old_sig.args[1] == tp_name_prefix + '.Channel.Type.Tubes'
+ assert old_sig.args[2] == 1 # contact handle
+ assert old_sig.args[3] == bob_handle
+ assert old_sig.args[4] == True # suppress handler
+
+ assert len(new_sig.args) == 1
+ assert len(new_sig.args[0]) == 1 # one channel
+ assert len(new_sig.args[0][0]) == 2 # two struct members
+ assert new_sig.args[0][0][0] == ret.value[0]
+ emitted_props = new_sig.args[0][0][1]
+
+ assert emitted_props[tp_name_prefix + '.Channel.ChannelType'] ==\
+ tp_name_prefix + '.Channel.Type.Tubes'
+ assert emitted_props[tp_name_prefix + '.Channel.TargetHandleType'] == 1
+ assert emitted_props[tp_name_prefix + '.Channel.TargetHandle'] ==\
+ bob_handle
+ assert emitted_props[tp_name_prefix + '.Channel.TargetID'] == \
+ 'bob at localhost'
+ assert emitted_props[tp_name_prefix + '.Channel.Requested'] == True
+ assert emitted_props[tp_name_prefix + '.Channel.InitiatorHandle'] \
+ == conn.GetSelfHandle()
+ assert emitted_props[tp_name_prefix + '.Channel.InitiatorID'] == \
+ 'test at localhost'
+
+ properties = conn.GetAll(
+ 'org.freedesktop.Telepathy.Connection.Interface.Requests',
+ dbus_interface='org.freedesktop.DBus.Properties')
+
+ assert new_sig.args[0][0] in properties['Channels'], \
+ (new_sig.args[0][0], properties['Channels'])
+
+ tubes_chan = bus.get_object(conn.bus_name, chan_path)
+ tubes_iface = dbus.Interface(tubes_chan,
+ tp_name_prefix + '.Channel.Type.Tubes')
+
+ # Exercise basic Channel Properties from spec 0.17.7
+ channel_props = tubes_chan.GetAll(
+ 'org.freedesktop.Telepathy.Channel',
+ dbus_interface='org.freedesktop.DBus.Properties')
+ assert channel_props.get('TargetHandle') == bob_handle,\
+ (channel_props.get('TargetHandle'), bob_handle)
+ assert channel_props.get('TargetHandleType') == 1,\
+ channel_props.get('TargetHandleType')
+ assert channel_props.get('ChannelType') == \
+ 'org.freedesktop.Telepathy.Channel.Type.Tubes',\
+ channel_props.get('ChannelType')
+ assert 'Interfaces' in channel_props, channel_props
+ assert 'org.freedesktop.Telepathy.Channel.Interface.Group' not in \
+ channel_props['Interfaces'], \
+ channel_props['Interfaces']
+ assert channel_props['TargetID'] == 'bob at localhost'
+
+ self_handle = conn.GetSelfHandle()
+
+ assert channel_props['Requested'] == True
+ assert channel_props['InitiatorID'] == 'test at localhost'
+ assert channel_props['InitiatorHandle'] == self_handle
+
+ # Unix socket
+ path = os.getcwd() + '/stream'
+ call_async(q, tubes_iface, 'OfferStreamTube',
+ 'echo', sample_parameters, 0, dbus.ByteArray(path), 0, "")
+
+ event = q.expect('stream-message')
+ message = event.stanza
+ tube_nodes = xpath.queryForNodes('/message/tube[@xmlns="%s"]' % NS_TUBES,
+ message)
+ if tube_nodes is None:
+ return False
+
+ assert len(tube_nodes) == 1
+ tube = tube_nodes[0]
+
+ assert tube['service'] == 'echo'
+ assert tube['type'] == 'stream'
+ assert not tube.hasAttribute('initiator')
+ stream_tube_id = long(tube['id'])
+
+ params = {}
+ parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
+ for node in parameter_nodes:
+ assert node['name'] not in params
+ params[node['name']] = (node['type'], str(node))
+ assert params == {'ay': ('bytes', 'aGVsbG8='),
+ 's': ('str', 'hello'),
+ 'i': ('int', '-123'),
+ 'u': ('uint', '123'),
+ }
+
+ # The CM is the server, so fake a client wanting to talk to it
+ iq = IQ(stream, 'set')
+ iq['to'] = 'test at localhost/Resource'
+ iq['from'] = 'bob at localhost/Bob'
+ si = iq.addElement((NS_SI, 'si'))
+ si['id'] = 'alpha'
+ si['profile'] = NS_TUBES
+ feature = si.addElement((NS_FEATURE_NEG, 'feature'))
+ x = feature.addElement((NS_X_DATA, 'x'))
+ x['type'] = 'form'
+ field = x.addElement((None, 'field'))
+ field['var'] = 'stream-method'
+ field['type'] = 'list-single'
+ option = field.addElement((None, 'option'))
+ value = option.addElement((None, 'value'))
+ value.addContent(NS_BYTESTREAMS)
+
+ stream_node = si.addElement((NS_TUBES, 'stream'))
+ stream_node['tube'] = str(stream_tube_id)
+ stream.send(iq)
+
+ si_reply_event, _ = q.expect_many(
+ EventPattern('stream-iq', iq_type='result'),
+ EventPattern('dbus-signal', signal='TubeStateChanged',
+ args=[stream_tube_id, 2])) # 2 == OPEN
+ iq = si_reply_event.stanza
+ si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
+ iq)[0]
+ value = xpath.queryForNodes('/si/feature/x/field/value', si)
+ assert len(value) == 1
+ proto = value[0]
+ assert str(proto) == NS_BYTESTREAMS
+ tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
+ assert len(tube) == 1
+
+ q.expect('dbus-signal', signal='StreamTubeNewConnection',
+ args=[stream_tube_id, bob_handle])
+
+ tubes = tubes_iface.ListTubes(byte_arrays=True)
+ assert tubes == [(
+ stream_tube_id,
+ self_handle,
+ 1, # Unix stream
+ 'echo',
+ sample_parameters,
+ 2, # OPEN
+ )]
+
+ reactor.listenTCP(5086, S5BFactory(q.append))
+
+ # have the fake client open the stream
+ iq = IQ(stream, 'set')
+ iq['to'] = 'test at localhost/Resource'
+ iq['from'] = 'bob at localhost/Bob'
+ query = iq.addElement((NS_BYTESTREAMS, 'query'))
+ query['sid'] = 'alpha'
+ query['mode'] = 'tcp'
+ streamhost = query.addElement('streamhost')
+ # FIXME test with a non-working streamhost
+ streamhost['jid'] = 'bob at localhost/Bob'
+ streamhost['host'] = '127.0.0.1'
+ streamhost['port'] = '5086'
+ stream.send(iq)
+
+ event = q.expect('s5b-data-received')
+ print event.properties['data']
+ assert event.properties['data'] == '\x05\x01\x00' # version 5, 1 auth method, no auth
+ event.properties['transport'].write('\x05\x00') # version 5, no auth
+ event = q.expect('s5b-data-received')
+ # ver + cmd + rsv + atyp + len + SHA1 (40) + dst.port (2) = 47
+ assert len(event.properties['data']) == 47
+ # version 5, connect, reserved, domain type, len(SHA-1)
+ assert event.properties['data'].startswith('\x05\x01\x00\x03' + chr(40))
+ # port
+ assert event.properties['data'].endswith('\x00\x00')
+
+ event.properties['transport'].write('\x05\x00') #version 5, ok
+ q.expect('stream-iq', iq_type='result')
+
+ event.properties['transport'].write("HELLO WORLD")
+ event = q.expect('s5b-data-received')
+ assert event.properties['data'] == 'hello world'
+
+ if not HAVE_DBUS_TUBES:
+ return
+
+ # OK, how about D-Bus?
+ call_async(q, tubes_iface, 'OfferDBusTube',
+ 'com.example.TestCase', sample_parameters)
+
+ event = q.expect('stream-iq', iq_type='set', to='bob at localhost/Bob')
+ iq = event.stanza
+ si_nodes = xpath.queryForNodes('/iq/si', iq)
+ if si_nodes is None:
+ return False
+
+ assert len(si_nodes) == 1
+ si = si_nodes[0]
+ assert si['profile'] == NS_TUBES
+ dbus_stream_id = si['id']
+
+ feature = xpath.queryForNodes('/si/feature', si)[0]
+ x = xpath.queryForNodes('/feature/x', feature)[0]
+ assert x['type'] == 'form'
+ field = xpath.queryForNodes('/x/field', x)[0]
+ assert field['var'] == 'stream-method'
+ assert field['type'] == 'list-single'
+ value = xpath.queryForNodes('/field/option/value', field)[0]
+ assert str(value) == NS_BYTESTREAMS
+ value = xpath.queryForNodes('/field/option/value', field)[1]
+ assert str(value) == NS_IBB
+
+ tube = xpath.queryForNodes('/si/tube', si)[0]
+ assert tube['initiator'] == 'test at localhost'
+ assert tube['service'] == 'com.example.TestCase'
+ assert tube['stream-id'] == dbus_stream_id
+ assert not tube.hasAttribute('dbus-name')
+ assert tube['type'] == 'dbus'
+ dbus_tube_id = long(tube['id'])
+
+ params = {}
+ parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
+ for node in parameter_nodes:
+ assert node['name'] not in params
+ params[node['name']] = (node['type'], str(node))
+ assert params == {'ay': ('bytes', 'aGVsbG8='),
+ 's': ('str', 'hello'),
+ 'i': ('int', '-123'),
+ 'u': ('uint', '123'),
+ }
+
+ result = IQ(stream, 'result')
+ result['id'] = iq['id']
+ result['from'] = iq['to']
+ result['to'] = 'test at localhost/Resource'
+ res_si = result.addElement((NS_SI, 'si'))
+ res_feature = res_si.addElement((NS_FEATURE_NEG, 'feature'))
+ res_x = res_feature.addElement((NS_X_DATA, 'x'))
+ res_x['type'] = 'submit'
+ res_field = res_x.addElement((None, 'field'))
+ res_field['var'] = 'stream-method'
+ res_value = res_field.addElement((None, 'value'))
+ res_value.addContent(NS_BYTESTREAMS)
+
+ stream.send(result)
+
+ event = q.expect('stream-iq', iq_type='set', to='bob at localhost/Bob')
+ iq = event.stanza
+ query = xpath.queryForNodes('/iq/query', iq)[0]
+ assert query.uri == NS_BYTESTREAMS
+ assert query['mode'] == 'tcp'
+ assert query['sid'] == dbus_stream_id
+ streamhost = xpath.queryForNodes('/query/streamhost', query)[0]
+ reactor.connectTCP(streamhost['host'], int(streamhost['port']),
+ S5BFactory(q.append))
+
+ event = q.expect('s5b-connected')
+ transport = event.properties['transport']
+ transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
+
+ event = q.expect('s5b-data-received')
+ event.properties['data'] == '\x05\x00' # version 5, no auth
+
+ # FIXME: send the correct data
+ transport.write('X' * 47)
+
+ event = q.expect('s5b-data-received')
+ event.properties['data'] == '\x05\x00' # version 5, ok
+ transport = event.properties['transport']
+
+ result = IQ(stream, 'result')
+ result['id'] = iq['id']
+ result['from'] = iq['to']
+ result['to'] = 'test at localhost/Resource'
+
+ stream.send(result)
+
+ q.expect('dbus-signal', signal='TubeStateChanged',
+ args=[dbus_tube_id, 2]) # 2 == OPEN
+
+ tubes = tubes_iface.ListTubes(byte_arrays=True)
+ assert sorted(tubes) == sorted([(
+ dbus_tube_id,
+ self_handle,
+ 0, # DBUS
+ 'com.example.TestCase',
+ sample_parameters,
+ 2, # OPEN
+ ),(
+ stream_tube_id,
+ self_handle,
+ 1, # stream
+ 'echo',
+ sample_parameters,
+ 2, # OPEN
+ )])
+
+ dbus_tube_adr = tubes_iface.GetDBusTubeAddress(dbus_tube_id)
+ dbus_tube_conn = Connection(dbus_tube_adr)
+
+ signal = SignalMessage('/', 'foo.bar', 'baz')
+ my_bus_name = ':123.whatever.you.like'
+ signal.set_sender(my_bus_name)
+ signal.append(42, signature='u')
+ dbus_tube_conn.send_message(signal)
+
+ event = q.expect('s5b-data-received')
+ dbus_message = event.properties['data']
+
+ # little and big endian versions of: SIGNAL, NO_REPLY, protocol v1,
+ # 4-byte payload
+ assert dbus_message.startswith('l\x04\x01\x01' '\x04\x00\x00\x00') or \
+ dbus_message.startswith('B\x04\x01\x01' '\x00\x00\x00\x04')
+ # little and big endian versions of the 4-byte payload, UInt32(42)
+ assert (dbus_message[0] == 'l' and dbus_message.endswith('\x2a\x00\x00\x00')) or \
+ (dbus_message[0] == 'B' and dbus_message.endswith('\x00\x00\x00\x2a'))
+ # XXX: verify that it's actually in the "sender" slot, rather than just
+ # being in the message somewhere
+ assert my_bus_name in dbus_message
+
+ watch_tube_signals(q, dbus_tube_conn)
+
+ # Have the fake client send us a message all in one go...
+ transport.write(dbus_message)
+
+ # ... and a message one byte at a time ...
+ for byte in dbus_message:
+ transport.write(byte)
+
+ # ... and two messages in one go
+ transport.write(dbus_message + dbus_message)
+
+ q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+ q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+ q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+ q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+
+ # OK, now let's try to accept a D-Bus tube
+ iq = IQ(stream, 'set')
+ iq['to'] = 'test at localhost/Resource'
+ iq['from'] = 'bob at localhost/Bob'
+ si = iq.addElement((NS_SI, 'si'))
+ si['id'] = 'beta'
+ si['profile'] = NS_TUBES
+ feature = si.addElement((NS_FEATURE_NEG, 'feature'))
+ x = feature.addElement((NS_X_DATA, 'x'))
+ x['type'] = 'form'
+ field = x.addElement((None, 'field'))
+ field['var'] = 'stream-method'
+ field['type'] = 'list-single'
+ option = field.addElement((None, 'option'))
+ value = option.addElement((None, 'value'))
+ value.addContent(NS_IBB)
+
+ tube = si.addElement((NS_TUBES, 'tube'))
+ tube['type'] = 'dbus'
+ tube['service'] = 'com.example.TestCase2'
+ tube['id'] = '69'
+ parameters = tube.addElement((None, 'parameters'))
+ parameter = parameters.addElement((None, 'parameter'))
+ parameter['type'] = 'str'
+ parameter['name'] = 'login'
+ parameter.addContent('TEST')
+
+ stream.send(iq)
+
+ event = q.expect('dbus-signal', signal='NewTube')
+ id = event.args[0]
+ initiator = event.args[1]
+ type = event.args[2]
+ service = event.args[3]
+ parameters = event.args[4]
+ state = event.args[5]
+
+ assert id == 69
+ initiator_jid = conn.InspectHandles(1, [initiator])[0]
+ assert initiator_jid == 'bob at localhost'
+ assert type == 0 # D-Bus tube
+ assert service == 'com.example.TestCase2'
+ assert parameters == {'login': 'TEST'}
+ assert state == 0 # local pending
+
+ # accept the tube
+ call_async(q, tubes_iface, 'AcceptDBusTube', id)
+
+ event = q.expect('stream-iq', iq_type='result')
+ iq = event.stanza
+ si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
+ iq)[0]
+ value = xpath.queryForNodes('/si/feature/x/field/value', si)
+ assert len(value) == 1
+ proto = value[0]
+ assert str(proto) == NS_IBB
+ tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
+ assert len(tube) == 1
+
+ # Init the IBB bytestream
+ iq = IQ(stream, 'set')
+ iq['to'] = 'test at localhost/Resource'
+ iq['from'] = 'bob at localhost/Bob'
+ open = iq.addElement((NS_IBB, 'open'))
+ open['sid'] = 'beta'
+ open['block-size'] = '4096'
+ stream.send(iq)
+
+ event = q.expect('dbus-return', method='AcceptDBusTube')
+ address = event.value[0]
+ # FIXME: this is currently broken. See FIXME in tubes-channel.c
+ #assert len(address) > 0
+
+ event = q.expect('dbus-signal', signal='TubeStateChanged',
+ args=[69, 2]) # 2 == OPEN
+ id = event.args[0]
+ state = event.args[1]
+
+ # OK, we're done
+ conn.Disconnect()
+
+ q.expect('tube-signal', signal='Disconnected')
+ q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
+
+if __name__ == '__main__':
+ exec_test(test)
--
1.5.6.5
More information about the Telepathy-commits
mailing list