[Telepathy-commits] [telepathy-gabble/master] Add a test for SOCKS5 tubes

Marco Barisione marco at barisione.org
Tue Jan 6 08:41:21 PST 2009


---
 tests/twisted/tubes/test-si-socks5-tubes.py |  552 +++++++++++++++++++++++++++
 1 files changed, 552 insertions(+), 0 deletions(-)
 create mode 100644 tests/twisted/tubes/test-si-socks5-tubes.py

diff --git a/tests/twisted/tubes/test-si-socks5-tubes.py b/tests/twisted/tubes/test-si-socks5-tubes.py
new file mode 100644
index 0000000..8583bbf
--- /dev/null
+++ b/tests/twisted/tubes/test-si-socks5-tubes.py
@@ -0,0 +1,552 @@
+"""Test 1-1 tubes support."""
+
+import base64
+import errno
+import os
+
+import dbus
+from dbus.connection import Connection
+from dbus.lowlevel import SignalMessage
+
+from servicetest import call_async, EventPattern, tp_name_prefix, watch_tube_signals
+from gabbletest import exec_test, acknowledge_iq
+
+from twisted.words.xish import domish, xpath
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet import reactor
+from twisted.words.protocols.jabber.client import IQ
+
+from gabbleconfig import HAVE_DBUS_TUBES
+
+NS_TUBES = 'http://telepathy.freedesktop.org/xmpp/tubes'
+NS_SI = 'http://jabber.org/protocol/si'
+NS_FEATURE_NEG = 'http://jabber.org/protocol/feature-neg'
+NS_IBB = 'http://jabber.org/protocol/ibb'
+NS_X_DATA = 'jabber:x:data'
+NS_BYTESTREAMS = 'http://jabber.org/protocol/bytestreams'
+
+sample_parameters = dbus.Dictionary({
+    's': 'hello',
+    'ay': dbus.ByteArray('hello'),
+    'u': dbus.UInt32(123),
+    'i': dbus.Int32(-123),
+    }, signature='sv')
+
+
+class Echo(Protocol):
+    def dataReceived(self, data):
+        self.transport.write(data.lower())
+
+def set_up_echo():
+    factory = Factory()
+    factory.protocol = Echo
+    try:
+        os.remove(os.getcwd() + '/stream')
+    except OSError, e:
+        if e.errno != errno.ENOENT:
+            raise
+    reactor.listenUNIX(os.getcwd() + '/stream', factory)
+
+class S5BProtocol(Protocol):
+    def connectionMade(self):
+        self.factory.event_func(EventPattern('s5b-connected',
+            transport=self.transport))
+
+    def dataReceived(self, data):
+        self.factory.event_func(EventPattern('s5b-data-received', data=data,
+            transport=self.transport))
+
+class S5BFactory(Factory):
+    protocol = S5BProtocol
+
+    def __init__(self, event_func):
+        self.event_func = event_func
+
+    def buildProtocol(self, addr):
+        protocol = Factory.buildProtocol(self, addr)
+        return protocol
+
+    def startedConnecting(self, connector):
+        pass
+
+def test(q, bus, conn, stream):
+    set_up_echo()
+    conn.Connect()
+
+    properties = conn.GetAll(
+            'org.freedesktop.Telepathy.Connection.Interface.Requests',
+            dbus_interface='org.freedesktop.DBus.Properties')
+    assert properties.get('Channels') == [], properties['Channels']
+    assert ({'org.freedesktop.Telepathy.Channel.ChannelType':
+                'org.freedesktop.Telepathy.Channel.Type.Tubes',
+             'org.freedesktop.Telepathy.Channel.TargetHandleType': 1,
+             },
+             ['org.freedesktop.Telepathy.Channel.TargetHandle',
+              'org.freedesktop.Telepathy.Channel.TargetID',
+             ],
+             ) in properties.get('RequestableChannelClasses'),\
+                     properties['RequestableChannelClasses']
+
+    _, vcard_event, roster_event = q.expect_many(
+        EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
+        EventPattern('stream-iq', to=None, query_ns='vcard-temp',
+            query_name='vCard'),
+        EventPattern('stream-iq', query_ns='jabber:iq:roster'))
+
+    acknowledge_iq(stream, vcard_event.stanza)
+
+    roster = roster_event.stanza
+    roster['type'] = 'result'
+    item = roster_event.query.addElement('item')
+    item['jid'] = 'bob at localhost'
+    item['subscription'] = 'both'
+    stream.send(roster)
+
+    presence = domish.Element(('jabber:client', 'presence'))
+    presence['from'] = 'bob at localhost/Bob'
+    presence['to'] = 'test at localhost/Resource'
+    c = presence.addElement('c')
+    c['xmlns'] = 'http://jabber.org/protocol/caps'
+    c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
+    c['ver'] = '1.2.3'
+    stream.send(presence)
+
+    event = q.expect('stream-iq', iq_type='get',
+        query_ns='http://jabber.org/protocol/disco#info',
+        to='bob at localhost/Bob')
+    result = event.stanza
+    result['type'] = 'result'
+    assert event.query['node'] == \
+        'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
+    feature = event.query.addElement('feature')
+    feature['var'] = NS_TUBES
+    stream.send(result)
+
+    bob_handle = conn.RequestHandles(1, ['bob at localhost'])[0]
+
+    call_async(q, conn, 'RequestChannel',
+            tp_name_prefix + '.Channel.Type.Tubes', 1, bob_handle, True);
+
+    ret, old_sig, new_sig = q.expect_many(
+        EventPattern('dbus-return', method='RequestChannel'),
+        EventPattern('dbus-signal', signal='NewChannel'),
+        EventPattern('dbus-signal', signal='NewChannels'),
+        )
+
+
+    assert len(ret.value) == 1
+    chan_path = ret.value[0]
+
+    assert old_sig.args[0] == chan_path
+    assert old_sig.args[1] == tp_name_prefix + '.Channel.Type.Tubes'
+    assert old_sig.args[2] == 1         # contact handle
+    assert old_sig.args[3] == bob_handle
+    assert old_sig.args[4] == True      # suppress handler
+
+    assert len(new_sig.args) == 1
+    assert len(new_sig.args[0]) == 1        # one channel
+    assert len(new_sig.args[0][0]) == 2     # two struct members
+    assert new_sig.args[0][0][0] == ret.value[0]
+    emitted_props = new_sig.args[0][0][1]
+
+    assert emitted_props[tp_name_prefix + '.Channel.ChannelType'] ==\
+            tp_name_prefix + '.Channel.Type.Tubes'
+    assert emitted_props[tp_name_prefix + '.Channel.TargetHandleType'] == 1
+    assert emitted_props[tp_name_prefix + '.Channel.TargetHandle'] ==\
+            bob_handle
+    assert emitted_props[tp_name_prefix + '.Channel.TargetID'] == \
+            'bob at localhost'
+    assert emitted_props[tp_name_prefix + '.Channel.Requested'] == True
+    assert emitted_props[tp_name_prefix + '.Channel.InitiatorHandle'] \
+            == conn.GetSelfHandle()
+    assert emitted_props[tp_name_prefix + '.Channel.InitiatorID'] == \
+            'test at localhost'
+
+    properties = conn.GetAll(
+            'org.freedesktop.Telepathy.Connection.Interface.Requests',
+            dbus_interface='org.freedesktop.DBus.Properties')
+
+    assert new_sig.args[0][0] in properties['Channels'], \
+            (new_sig.args[0][0], properties['Channels'])
+
+    tubes_chan = bus.get_object(conn.bus_name, chan_path)
+    tubes_iface = dbus.Interface(tubes_chan,
+        tp_name_prefix + '.Channel.Type.Tubes')
+
+    # Exercise basic Channel Properties from spec 0.17.7
+    channel_props = tubes_chan.GetAll(
+            'org.freedesktop.Telepathy.Channel',
+            dbus_interface='org.freedesktop.DBus.Properties')
+    assert channel_props.get('TargetHandle') == bob_handle,\
+            (channel_props.get('TargetHandle'), bob_handle)
+    assert channel_props.get('TargetHandleType') == 1,\
+            channel_props.get('TargetHandleType')
+    assert channel_props.get('ChannelType') == \
+            'org.freedesktop.Telepathy.Channel.Type.Tubes',\
+            channel_props.get('ChannelType')
+    assert 'Interfaces' in channel_props, channel_props
+    assert 'org.freedesktop.Telepathy.Channel.Interface.Group' not in \
+            channel_props['Interfaces'], \
+            channel_props['Interfaces']
+    assert channel_props['TargetID'] == 'bob at localhost'
+
+    self_handle = conn.GetSelfHandle()
+
+    assert channel_props['Requested'] == True
+    assert channel_props['InitiatorID'] == 'test at localhost'
+    assert channel_props['InitiatorHandle'] == self_handle
+
+    # Unix socket
+    path = os.getcwd() + '/stream'
+    call_async(q, tubes_iface, 'OfferStreamTube',
+        'echo', sample_parameters, 0, dbus.ByteArray(path), 0, "")
+
+    event = q.expect('stream-message')
+    message = event.stanza
+    tube_nodes = xpath.queryForNodes('/message/tube[@xmlns="%s"]' % NS_TUBES,
+        message)
+    if tube_nodes is None:
+        return False
+
+    assert len(tube_nodes) == 1
+    tube = tube_nodes[0]
+
+    assert tube['service'] == 'echo'
+    assert tube['type'] == 'stream'
+    assert not tube.hasAttribute('initiator')
+    stream_tube_id = long(tube['id'])
+
+    params = {}
+    parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
+    for node in parameter_nodes:
+        assert node['name'] not in params
+        params[node['name']] = (node['type'], str(node))
+    assert params == {'ay': ('bytes', 'aGVsbG8='),
+                      's': ('str', 'hello'),
+                      'i': ('int', '-123'),
+                      'u': ('uint', '123'),
+                     }
+
+    # The CM is the server, so fake a client wanting to talk to it
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test at localhost/Resource'
+    iq['from'] = 'bob at localhost/Bob'
+    si = iq.addElement((NS_SI, 'si'))
+    si['id'] = 'alpha'
+    si['profile'] = NS_TUBES
+    feature = si.addElement((NS_FEATURE_NEG, 'feature'))
+    x = feature.addElement((NS_X_DATA, 'x'))
+    x['type'] = 'form'
+    field = x.addElement((None, 'field'))
+    field['var'] = 'stream-method'
+    field['type'] = 'list-single'
+    option = field.addElement((None, 'option'))
+    value = option.addElement((None, 'value'))
+    value.addContent(NS_BYTESTREAMS)
+
+    stream_node = si.addElement((NS_TUBES, 'stream'))
+    stream_node['tube'] = str(stream_tube_id)
+    stream.send(iq)
+
+    si_reply_event, _ = q.expect_many(
+            EventPattern('stream-iq', iq_type='result'),
+            EventPattern('dbus-signal', signal='TubeStateChanged',
+                args=[stream_tube_id, 2])) # 2 == OPEN
+    iq = si_reply_event.stanza
+    si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
+        iq)[0]
+    value = xpath.queryForNodes('/si/feature/x/field/value', si)
+    assert len(value) == 1
+    proto = value[0]
+    assert str(proto) == NS_BYTESTREAMS
+    tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
+    assert len(tube) == 1
+
+    q.expect('dbus-signal', signal='StreamTubeNewConnection',
+        args=[stream_tube_id, bob_handle])
+
+    tubes = tubes_iface.ListTubes(byte_arrays=True)
+    assert tubes == [(
+        stream_tube_id,
+        self_handle,
+        1,      # Unix stream
+        'echo',
+        sample_parameters,
+        2,      # OPEN
+        )]
+
+    reactor.listenTCP(5086, S5BFactory(q.append))
+
+    # have the fake client open the stream
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test at localhost/Resource'
+    iq['from'] = 'bob at localhost/Bob'
+    query = iq.addElement((NS_BYTESTREAMS, 'query'))
+    query['sid'] = 'alpha'
+    query['mode'] = 'tcp'
+    streamhost = query.addElement('streamhost')
+    # FIXME test with a non-working streamhost
+    streamhost['jid'] = 'bob at localhost/Bob'
+    streamhost['host'] = '127.0.0.1'
+    streamhost['port'] = '5086'
+    stream.send(iq)
+
+    event = q.expect('s5b-data-received')
+    print event.properties['data']
+    assert event.properties['data'] == '\x05\x01\x00' # version 5, 1 auth method, no auth
+    event.properties['transport'].write('\x05\x00') # version 5, no auth
+    event = q.expect('s5b-data-received')
+    # ver + cmd + rsv + atyp + len + SHA1 (40) + dst.port (2) = 47
+    assert len(event.properties['data']) == 47
+    # version 5, connect, reserved, domain type, len(SHA-1)
+    assert event.properties['data'].startswith('\x05\x01\x00\x03' + chr(40))
+    # port
+    assert event.properties['data'].endswith('\x00\x00')
+
+    event.properties['transport'].write('\x05\x00') #version 5, ok
+    q.expect('stream-iq', iq_type='result')
+
+    event.properties['transport'].write("HELLO WORLD")
+    event = q.expect('s5b-data-received')
+    assert event.properties['data'] == 'hello world'
+
+    if not HAVE_DBUS_TUBES:
+        return
+
+    # OK, how about D-Bus?
+    call_async(q, tubes_iface, 'OfferDBusTube',
+        'com.example.TestCase', sample_parameters)
+
+    event = q.expect('stream-iq', iq_type='set', to='bob at localhost/Bob')
+    iq = event.stanza
+    si_nodes = xpath.queryForNodes('/iq/si', iq)
+    if si_nodes is None:
+        return False
+
+    assert len(si_nodes) == 1
+    si = si_nodes[0]
+    assert si['profile'] == NS_TUBES
+    dbus_stream_id = si['id']
+
+    feature = xpath.queryForNodes('/si/feature', si)[0]
+    x = xpath.queryForNodes('/feature/x', feature)[0]
+    assert x['type'] == 'form'
+    field = xpath.queryForNodes('/x/field', x)[0]
+    assert field['var'] == 'stream-method'
+    assert field['type'] == 'list-single'
+    value = xpath.queryForNodes('/field/option/value', field)[0]
+    assert str(value) == NS_BYTESTREAMS
+    value = xpath.queryForNodes('/field/option/value', field)[1]
+    assert str(value) == NS_IBB
+
+    tube = xpath.queryForNodes('/si/tube', si)[0]
+    assert tube['initiator'] == 'test at localhost'
+    assert tube['service'] == 'com.example.TestCase'
+    assert tube['stream-id'] == dbus_stream_id
+    assert not tube.hasAttribute('dbus-name')
+    assert tube['type'] == 'dbus'
+    dbus_tube_id = long(tube['id'])
+
+    params = {}
+    parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
+    for node in parameter_nodes:
+        assert node['name'] not in params
+        params[node['name']] = (node['type'], str(node))
+    assert params == {'ay': ('bytes', 'aGVsbG8='),
+                      's': ('str', 'hello'),
+                      'i': ('int', '-123'),
+                      'u': ('uint', '123'),
+                     }
+
+    result = IQ(stream, 'result')
+    result['id'] = iq['id']
+    result['from'] = iq['to']
+    result['to'] = 'test at localhost/Resource'
+    res_si = result.addElement((NS_SI, 'si'))
+    res_feature = res_si.addElement((NS_FEATURE_NEG, 'feature'))
+    res_x = res_feature.addElement((NS_X_DATA, 'x'))
+    res_x['type'] = 'submit'
+    res_field = res_x.addElement((None, 'field'))
+    res_field['var'] = 'stream-method'
+    res_value = res_field.addElement((None, 'value'))
+    res_value.addContent(NS_BYTESTREAMS)
+
+    stream.send(result)
+
+    event = q.expect('stream-iq', iq_type='set', to='bob at localhost/Bob')
+    iq = event.stanza
+    query = xpath.queryForNodes('/iq/query', iq)[0]
+    assert query.uri == NS_BYTESTREAMS
+    assert query['mode'] == 'tcp'
+    assert query['sid'] == dbus_stream_id
+    streamhost = xpath.queryForNodes('/query/streamhost', query)[0]
+    reactor.connectTCP(streamhost['host'], int(streamhost['port']),
+        S5BFactory(q.append))
+
+    event = q.expect('s5b-connected')
+    transport = event.properties['transport']
+    transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
+
+    event = q.expect('s5b-data-received')
+    event.properties['data'] == '\x05\x00' # version 5, no auth
+
+    # FIXME: send the correct data
+    transport.write('X' * 47)
+
+    event = q.expect('s5b-data-received')
+    event.properties['data'] == '\x05\x00' # version 5, ok
+    transport = event.properties['transport']
+
+    result = IQ(stream, 'result')
+    result['id'] = iq['id']
+    result['from'] = iq['to']
+    result['to'] = 'test at localhost/Resource'
+
+    stream.send(result)
+
+    q.expect('dbus-signal', signal='TubeStateChanged',
+        args=[dbus_tube_id, 2]) # 2 == OPEN
+
+    tubes = tubes_iface.ListTubes(byte_arrays=True)
+    assert sorted(tubes) == sorted([(
+        dbus_tube_id,
+        self_handle,
+        0,      # DBUS
+        'com.example.TestCase',
+        sample_parameters,
+        2,      # OPEN
+        ),(
+        stream_tube_id,
+        self_handle,
+        1,      # stream
+        'echo',
+        sample_parameters,
+        2,      # OPEN
+        )])
+
+    dbus_tube_adr = tubes_iface.GetDBusTubeAddress(dbus_tube_id)
+    dbus_tube_conn = Connection(dbus_tube_adr)
+
+    signal = SignalMessage('/', 'foo.bar', 'baz')
+    my_bus_name = ':123.whatever.you.like'
+    signal.set_sender(my_bus_name)
+    signal.append(42, signature='u')
+    dbus_tube_conn.send_message(signal)
+
+    event = q.expect('s5b-data-received')
+    dbus_message = event.properties['data']
+
+    # little and big endian versions of: SIGNAL, NO_REPLY, protocol v1,
+    # 4-byte payload
+    assert dbus_message.startswith('l\x04\x01\x01' '\x04\x00\x00\x00') or \
+           dbus_message.startswith('B\x04\x01\x01' '\x00\x00\x00\x04')
+    # little and big endian versions of the 4-byte payload, UInt32(42)
+    assert (dbus_message[0] == 'l' and dbus_message.endswith('\x2a\x00\x00\x00')) or \
+           (dbus_message[0] == 'B' and dbus_message.endswith('\x00\x00\x00\x2a'))
+    # XXX: verify that it's actually in the "sender" slot, rather than just
+    # being in the message somewhere
+    assert my_bus_name in dbus_message
+
+    watch_tube_signals(q, dbus_tube_conn)
+
+    # Have the fake client send us a message all in one go...
+    transport.write(dbus_message)
+
+    # ... and a message one byte at a time ...
+    for byte in dbus_message:
+        transport.write(byte)
+
+    # ... and two messages in one go
+    transport.write(dbus_message + dbus_message)
+
+    q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+    q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+    q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+    q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+
+    # OK, now let's try to accept a D-Bus tube
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test at localhost/Resource'
+    iq['from'] = 'bob at localhost/Bob'
+    si = iq.addElement((NS_SI, 'si'))
+    si['id'] = 'beta'
+    si['profile'] = NS_TUBES
+    feature = si.addElement((NS_FEATURE_NEG, 'feature'))
+    x = feature.addElement((NS_X_DATA, 'x'))
+    x['type'] = 'form'
+    field = x.addElement((None, 'field'))
+    field['var'] = 'stream-method'
+    field['type'] = 'list-single'
+    option = field.addElement((None, 'option'))
+    value = option.addElement((None, 'value'))
+    value.addContent(NS_IBB)
+
+    tube = si.addElement((NS_TUBES, 'tube'))
+    tube['type'] = 'dbus'
+    tube['service'] = 'com.example.TestCase2'
+    tube['id'] = '69'
+    parameters = tube.addElement((None, 'parameters'))
+    parameter = parameters.addElement((None, 'parameter'))
+    parameter['type'] = 'str'
+    parameter['name'] = 'login'
+    parameter.addContent('TEST')
+
+    stream.send(iq)
+
+    event = q.expect('dbus-signal', signal='NewTube')
+    id = event.args[0]
+    initiator = event.args[1]
+    type = event.args[2]
+    service = event.args[3]
+    parameters = event.args[4]
+    state = event.args[5]
+
+    assert id == 69
+    initiator_jid = conn.InspectHandles(1, [initiator])[0]
+    assert initiator_jid == 'bob at localhost'
+    assert type == 0 # D-Bus tube
+    assert service == 'com.example.TestCase2'
+    assert parameters == {'login': 'TEST'}
+    assert state == 0 # local pending
+
+    # accept the tube
+    call_async(q, tubes_iface, 'AcceptDBusTube', id)
+
+    event = q.expect('stream-iq', iq_type='result')
+    iq = event.stanza
+    si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
+        iq)[0]
+    value = xpath.queryForNodes('/si/feature/x/field/value', si)
+    assert len(value) == 1
+    proto = value[0]
+    assert str(proto) == NS_IBB
+    tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
+    assert len(tube) == 1
+
+    # Init the IBB bytestream
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test at localhost/Resource'
+    iq['from'] = 'bob at localhost/Bob'
+    open = iq.addElement((NS_IBB, 'open'))
+    open['sid'] = 'beta'
+    open['block-size'] = '4096'
+    stream.send(iq)
+
+    event = q.expect('dbus-return', method='AcceptDBusTube')
+    address = event.value[0]
+    # FIXME: this is currently broken. See FIXME in tubes-channel.c
+    #assert len(address) > 0
+
+    event = q.expect('dbus-signal', signal='TubeStateChanged',
+        args=[69, 2]) # 2 == OPEN
+    id = event.args[0]
+    state = event.args[1]
+
+    # OK, we're done
+    conn.Disconnect()
+
+    q.expect('tube-signal', signal='Disconnected')
+    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
+
+if __name__ == '__main__':
+    exec_test(test)
-- 
1.5.6.5




More information about the Telepathy-commits mailing list