[Telepathy-commits] [telepathy-gabble/master] Rename the SI test as it also tests IBB
Marco Barisione
marco at barisione.org
Tue Jan 6 08:41:21 PST 2009
---
tests/twisted/tubes/test-si-ibb-tubes.py | 545 ++++++++++++++++++++++++++++++
tests/twisted/tubes/test-si-tubes.py | 545 ------------------------------
2 files changed, 545 insertions(+), 545 deletions(-)
create mode 100644 tests/twisted/tubes/test-si-ibb-tubes.py
delete mode 100644 tests/twisted/tubes/test-si-tubes.py
diff --git a/tests/twisted/tubes/test-si-ibb-tubes.py b/tests/twisted/tubes/test-si-ibb-tubes.py
new file mode 100644
index 0000000..3ef8e6b
--- /dev/null
+++ b/tests/twisted/tubes/test-si-ibb-tubes.py
@@ -0,0 +1,545 @@
+"""Test 1-1 tubes support."""
+
+import base64
+import errno
+import os
+
+import dbus
+from dbus.connection import Connection
+from dbus.lowlevel import SignalMessage
+
+from servicetest import call_async, EventPattern, tp_name_prefix, watch_tube_signals
+from gabbletest import exec_test, acknowledge_iq
+
+from twisted.words.xish import domish, xpath
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet import reactor
+from twisted.words.protocols.jabber.client import IQ
+
+from gabbleconfig import HAVE_DBUS_TUBES
+
+NS_TUBES = 'http://telepathy.freedesktop.org/xmpp/tubes'
+NS_SI = 'http://jabber.org/protocol/si'
+NS_FEATURE_NEG = 'http://jabber.org/protocol/feature-neg'
+NS_IBB = 'http://jabber.org/protocol/ibb'
+NS_X_DATA = 'jabber:x:data'
+
+sample_parameters = dbus.Dictionary({
+    's': 'hello',
+    'ay': dbus.ByteArray('hello'),
+    'u': dbus.UInt32(123),
+    'i': dbus.Int32(-123),
+    }, signature='sv')
+
+
+class Echo(Protocol):
+    def dataReceived(self, data):
+        self.transport.write(data)
+
+def set_up_echo():
+    factory = Factory()
+    factory.protocol = Echo
+    try:
+        os.remove(os.getcwd() + '/stream')
+    except OSError, e:
+        if e.errno != errno.ENOENT:
+            raise
+    reactor.listenUNIX(os.getcwd() + '/stream', factory)
+
+
+def test(q, bus, conn, stream):
+    set_up_echo()
+    conn.Connect()
+
+    properties = conn.GetAll(
+            'org.freedesktop.Telepathy.Connection.Interface.Requests',
+            dbus_interface='org.freedesktop.DBus.Properties')
+    assert properties.get('Channels') == [], properties['Channels']
+    assert ({'org.freedesktop.Telepathy.Channel.ChannelType':
+                'org.freedesktop.Telepathy.Channel.Type.Tubes',
+             'org.freedesktop.Telepathy.Channel.TargetHandleType': 1,
+             },
+             ['org.freedesktop.Telepathy.Channel.TargetHandle',
+              'org.freedesktop.Telepathy.Channel.TargetID',
+             ],
+             ) in properties.get('RequestableChannelClasses'),\
+                     properties['RequestableChannelClasses']
+
+    _, vcard_event, roster_event = q.expect_many(
+        EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
+        EventPattern('stream-iq', to=None, query_ns='vcard-temp',
+            query_name='vCard'),
+        EventPattern('stream-iq', query_ns='jabber:iq:roster'))
+
+    acknowledge_iq(stream, vcard_event.stanza)
+
+    roster = roster_event.stanza
+    roster['type'] = 'result'
+    item = roster_event.query.addElement('item')
+    item['jid'] = 'bob@localhost'
+    item['subscription'] = 'both'
+    stream.send(roster)
+
+    presence = domish.Element(('jabber:client', 'presence'))
+    presence['from'] = 'bob@localhost/Bob'
+    presence['to'] = 'test@localhost/Resource'
+    c = presence.addElement('c')
+    c['xmlns'] = 'http://jabber.org/protocol/caps'
+    c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
+    c['ver'] = '1.2.3'
+    stream.send(presence)
+
+    event = q.expect('stream-iq', iq_type='get',
+        query_ns='http://jabber.org/protocol/disco#info',
+        to='bob@localhost/Bob')
+    result = event.stanza
+    result['type'] = 'result'
+    assert event.query['node'] == \
+        'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
+    feature = event.query.addElement('feature')
+    feature['var'] = NS_TUBES
+    stream.send(result)
+
+    bob_handle = conn.RequestHandles(1, ['bob@localhost'])[0]
+
+    call_async(q, conn, 'RequestChannel',
+            tp_name_prefix + '.Channel.Type.Tubes', 1, bob_handle, True);
+
+    ret, old_sig, new_sig = q.expect_many(
+        EventPattern('dbus-return', method='RequestChannel'),
+        EventPattern('dbus-signal', signal='NewChannel'),
+        EventPattern('dbus-signal', signal='NewChannels'),
+        )
+
+
+    assert len(ret.value) == 1
+    chan_path = ret.value[0]
+
+    assert old_sig.args[0] == chan_path
+    assert old_sig.args[1] == tp_name_prefix + '.Channel.Type.Tubes'
+    assert old_sig.args[2] == 1 # contact handle
+    assert old_sig.args[3] == bob_handle
+    assert old_sig.args[4] == True # suppress handler
+
+    assert len(new_sig.args) == 1
+    assert len(new_sig.args[0]) == 1 # one channel
+    assert len(new_sig.args[0][0]) == 2 # two struct members
+    assert new_sig.args[0][0][0] == ret.value[0]
+    emitted_props = new_sig.args[0][0][1]
+
+    assert emitted_props[tp_name_prefix + '.Channel.ChannelType'] ==\
+            tp_name_prefix + '.Channel.Type.Tubes'
+    assert emitted_props[tp_name_prefix + '.Channel.TargetHandleType'] == 1
+    assert emitted_props[tp_name_prefix + '.Channel.TargetHandle'] ==\
+            bob_handle
+    assert emitted_props[tp_name_prefix + '.Channel.TargetID'] == \
+            'bob@localhost'
+    assert emitted_props[tp_name_prefix + '.Channel.Requested'] == True
+    assert emitted_props[tp_name_prefix + '.Channel.InitiatorHandle'] \
+            == conn.GetSelfHandle()
+    assert emitted_props[tp_name_prefix + '.Channel.InitiatorID'] == \
+            'test@localhost'
+
+    properties = conn.GetAll(
+            'org.freedesktop.Telepathy.Connection.Interface.Requests',
+            dbus_interface='org.freedesktop.DBus.Properties')
+
+    assert new_sig.args[0][0] in properties['Channels'], \
+            (new_sig.args[0][0], properties['Channels'])
+
+    tubes_chan = bus.get_object(conn.bus_name, chan_path)
+    tubes_iface = dbus.Interface(tubes_chan,
+        tp_name_prefix + '.Channel.Type.Tubes')
+
+    # Exercise basic Channel Properties from spec 0.17.7
+    channel_props = tubes_chan.GetAll(
+            'org.freedesktop.Telepathy.Channel',
+            dbus_interface='org.freedesktop.DBus.Properties')
+    assert channel_props.get('TargetHandle') == bob_handle,\
+            (channel_props.get('TargetHandle'), bob_handle)
+    assert channel_props.get('TargetHandleType') == 1,\
+            channel_props.get('TargetHandleType')
+    assert channel_props.get('ChannelType') == \
+            'org.freedesktop.Telepathy.Channel.Type.Tubes',\
+            channel_props.get('ChannelType')
+    assert 'Interfaces' in channel_props, channel_props
+    assert 'org.freedesktop.Telepathy.Channel.Interface.Group' not in \
+            channel_props['Interfaces'], \
+            channel_props['Interfaces']
+    assert channel_props['TargetID'] == 'bob@localhost'
+
+    self_handle = conn.GetSelfHandle()
+
+    assert channel_props['Requested'] == True
+    assert channel_props['InitiatorID'] == 'test@localhost'
+    assert channel_props['InitiatorHandle'] == self_handle
+
+    # Unix socket
+    path = os.getcwd() + '/stream'
+    call_async(q, tubes_iface, 'OfferStreamTube',
+        'echo', sample_parameters, 0, dbus.ByteArray(path), 0, "")
+
+    event = q.expect('stream-message')
+    message = event.stanza
+    tube_nodes = xpath.queryForNodes('/message/tube[@xmlns="%s"]' % NS_TUBES,
+        message)
+    if tube_nodes is None:
+        return False
+
+    assert len(tube_nodes) == 1
+    tube = tube_nodes[0]
+
+    assert tube['service'] == 'echo'
+    assert tube['type'] == 'stream'
+    assert not tube.hasAttribute('initiator')
+    stream_tube_id = long(tube['id'])
+
+    params = {}
+    parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
+    for node in parameter_nodes:
+        assert node['name'] not in params
+        params[node['name']] = (node['type'], str(node))
+    assert params == {'ay': ('bytes', 'aGVsbG8='),
+                      's': ('str', 'hello'),
+                      'i': ('int', '-123'),
+                      'u': ('uint', '123'),
+                     }
+
+    # The CM is the server, so fake a client wanting to talk to it
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    si = iq.addElement((NS_SI, 'si'))
+    si['id'] = 'alpha'
+    si['profile'] = NS_TUBES
+    feature = si.addElement((NS_FEATURE_NEG, 'feature'))
+    x = feature.addElement((NS_X_DATA, 'x'))
+    x['type'] = 'form'
+    field = x.addElement((None, 'field'))
+    field['var'] = 'stream-method'
+    field['type'] = 'list-single'
+    option = field.addElement((None, 'option'))
+    value = option.addElement((None, 'value'))
+    value.addContent(NS_IBB)
+
+    stream_node = si.addElement((NS_TUBES, 'stream'))
+    stream_node['tube'] = str(stream_tube_id)
+    stream.send(iq)
+
+    si_reply_event, _ = q.expect_many(
+            EventPattern('stream-iq', iq_type='result'),
+            EventPattern('dbus-signal', signal='TubeStateChanged',
+                args=[stream_tube_id, 2])) # 2 == OPEN
+    iq = si_reply_event.stanza
+    si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
+        iq)[0]
+    value = xpath.queryForNodes('/si/feature/x/field/value', si)
+    assert len(value) == 1
+    proto = value[0]
+    assert str(proto) == NS_IBB
+    tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
+    assert len(tube) == 1
+
+    q.expect('dbus-signal', signal='StreamTubeNewConnection',
+        args=[stream_tube_id, bob_handle])
+
+    tubes = tubes_iface.ListTubes(byte_arrays=True)
+    assert tubes == [(
+        stream_tube_id,
+        self_handle,
+        1, # Unix stream
+        'echo',
+        sample_parameters,
+        2, # OPEN
+        )]
+
+    # have the fake client open the stream
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    open = iq.addElement((NS_IBB, 'open'))
+    open['sid'] = 'alpha'
+    open['block-size'] = '4096'
+    stream.send(iq)
+
+    q.expect('stream-iq', iq_type='result')
+    # have the fake client send us some data
+    message = domish.Element(('jabber:client', 'message'))
+    message['to'] = 'test@localhost/Resource'
+    message['from'] = 'bob@localhost/Bob'
+    data_node = message.addElement((NS_IBB, 'data'))
+    data_node['sid'] = 'alpha'
+    data_node['seq'] = '0'
+    data_node.addContent(base64.b64encode('hello, world'))
+    stream.send(message)
+
+    event = q.expect('stream-message', to='bob@localhost/Bob')
+    message = event.stanza
+
+    data_nodes = xpath.queryForNodes('/message/data[@xmlns="%s"]' % NS_IBB,
+        message)
+    assert data_nodes is not None
+    assert len(data_nodes) == 1
+    ibb_data = data_nodes[0]
+    assert ibb_data['sid'] == 'alpha'
+    binary = base64.b64decode(str(ibb_data))
+    assert binary == 'hello, world'
+
+    if not HAVE_DBUS_TUBES:
+        return
+
+    # OK, how about D-Bus?
+    call_async(q, tubes_iface, 'OfferDBusTube',
+        'com.example.TestCase', sample_parameters)
+
+    event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
+    iq = event.stanza
+    si_nodes = xpath.queryForNodes('/iq/si', iq)
+    if si_nodes is None:
+        return False
+
+    assert len(si_nodes) == 1
+    si = si_nodes[0]
+    assert si['profile'] == NS_TUBES
+    dbus_stream_id = si['id']
+
+    feature = xpath.queryForNodes('/si/feature', si)[0]
+    x = xpath.queryForNodes('/feature/x', feature)[0]
+    assert x['type'] == 'form'
+    field = xpath.queryForNodes('/x/field', x)[0]
+    assert field['var'] == 'stream-method'
+    assert field['type'] == 'list-single'
+    value = xpath.queryForNodes('/field/option/value', field)[0]
+    assert str(value) == NS_IBB
+
+    tube = xpath.queryForNodes('/si/tube', si)[0]
+    assert tube['initiator'] == 'test@localhost'
+    assert tube['service'] == 'com.example.TestCase'
+    assert tube['stream-id'] == dbus_stream_id
+    assert not tube.hasAttribute('dbus-name')
+    assert tube['type'] == 'dbus'
+    dbus_tube_id = long(tube['id'])
+
+    params = {}
+    parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
+    for node in parameter_nodes:
+        assert node['name'] not in params
+        params[node['name']] = (node['type'], str(node))
+    assert params == {'ay': ('bytes', 'aGVsbG8='),
+                      's': ('str', 'hello'),
+                      'i': ('int', '-123'),
+                      'u': ('uint', '123'),
+                     }
+
+    result = IQ(stream, 'result')
+    result['id'] = iq['id']
+    result['from'] = iq['to']
+    result['to'] = 'test@localhost/Resource'
+    res_si = result.addElement((NS_SI, 'si'))
+    res_feature = res_si.addElement((NS_FEATURE_NEG, 'feature'))
+    res_x = res_feature.addElement((NS_X_DATA, 'x'))
+    res_x['type'] = 'submit'
+    res_field = res_x.addElement((None, 'field'))
+    res_field['var'] = 'stream-method'
+    res_value = res_field.addElement((None, 'value'))
+    res_value.addContent(NS_IBB)
+
+    stream.send(result)
+
+    event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
+    iq = event.stanza
+    open = xpath.queryForNodes('/iq/open', iq)[0]
+    assert open.uri == NS_IBB
+    assert open['sid'] == dbus_stream_id
+
+    result = IQ(stream, 'result')
+    result['id'] = iq['id']
+    result['from'] = iq['to']
+    result['to'] = 'test@localhost/Resource'
+
+    stream.send(result)
+
+    q.expect('dbus-signal', signal='TubeStateChanged',
+        args=[dbus_tube_id, 2]) # 2 == OPEN
+
+    tubes = tubes_iface.ListTubes(byte_arrays=True)
+    assert sorted(tubes) == sorted([(
+        dbus_tube_id,
+        self_handle,
+        0, # DBUS
+        'com.example.TestCase',
+        sample_parameters,
+        2, # OPEN
+        ),(
+        stream_tube_id,
+        self_handle,
+        1, # stream
+        'echo',
+        sample_parameters,
+        2, # OPEN
+        )])
+
+    dbus_tube_adr = tubes_iface.GetDBusTubeAddress(dbus_tube_id)
+    dbus_tube_conn = Connection(dbus_tube_adr)
+
+    signal = SignalMessage('/', 'foo.bar', 'baz')
+    my_bus_name = ':123.whatever.you.like'
+    signal.set_sender(my_bus_name)
+    signal.append(42, signature='u')
+    dbus_tube_conn.send_message(signal)
+
+    event = q.expect('stream-message')
+    message = event.stanza
+
+    assert message['to'] == 'bob@localhost/Bob'
+
+    data_nodes = xpath.queryForNodes('/message/data[@xmlns="%s"]' % NS_IBB,
+        message)
+    assert data_nodes is not None
+    assert len(data_nodes) == 1
+    ibb_data = data_nodes[0]
+    assert ibb_data['sid'] == dbus_stream_id
+    binary = base64.b64decode(str(ibb_data))
+    # little and big endian versions of: SIGNAL, NO_REPLY, protocol v1,
+    # 4-byte payload
+    assert binary.startswith('l\x04\x01\x01' '\x04\x00\x00\x00') or \
+           binary.startswith('B\x04\x01\x01' '\x00\x00\x00\x04')
+    # little and big endian versions of the 4-byte payload, UInt32(42)
+    assert (binary[0] == 'l' and binary.endswith('\x2a\x00\x00\x00')) or \
+           (binary[0] == 'B' and binary.endswith('\x00\x00\x00\x2a'))
+    # XXX: verify that it's actually in the "sender" slot, rather than just
+    # being in the message somewhere
+    assert my_bus_name in binary
+
+    watch_tube_signals(q, dbus_tube_conn)
+
+    dbus_message = binary
+    seq = 0
+
+    # Have the fake client send us a message all in one go...
+    msg = domish.Element(('jabber:client', 'message'))
+    msg['to'] = 'test@localhost/Resource'
+    msg['from'] = 'bob@localhost/Bob'
+    data_node = msg.addElement('data', NS_IBB)
+    data_node['sid'] = dbus_stream_id
+    data_node['seq'] = str(seq)
+    data_node.addContent(base64.b64encode(dbus_message))
+    stream.send(msg)
+    seq += 1
+
+    # ... and a message one byte at a time ...
+
+    for byte in dbus_message:
+        msg = domish.Element(('jabber:client', 'message'))
+        msg['to'] = 'test@localhost/Resource'
+        msg['from'] = 'bob@localhost/Bob'
+        data_node = msg.addElement('data', NS_IBB)
+        data_node['sid'] = dbus_stream_id
+        data_node['seq'] = str(seq)
+        data_node.addContent(base64.b64encode(byte))
+        stream.send(msg)
+        seq += 1
+
+    # ... and two messages in one go
+
+    msg = domish.Element(('jabber:client', 'message'))
+    msg['to'] = 'test@localhost/Resource'
+    msg['from'] = 'bob@localhost/Bob'
+    data_node = msg.addElement('data', NS_IBB)
+    data_node['sid'] = dbus_stream_id
+    data_node['seq'] = str(seq)
+    data_node.addContent(base64.b64encode(dbus_message + dbus_message))
+    stream.send(msg)
+    seq += 1
+
+    q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+    q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+    q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+    q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
+
+    # OK, now let's try to accept a D-Bus tube
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    si = iq.addElement((NS_SI, 'si'))
+    si['id'] = 'beta'
+    si['profile'] = NS_TUBES
+    feature = si.addElement((NS_FEATURE_NEG, 'feature'))
+    x = feature.addElement((NS_X_DATA, 'x'))
+    x['type'] = 'form'
+    field = x.addElement((None, 'field'))
+    field['var'] = 'stream-method'
+    field['type'] = 'list-single'
+    option = field.addElement((None, 'option'))
+    value = option.addElement((None, 'value'))
+    value.addContent(NS_IBB)
+
+    tube = si.addElement((NS_TUBES, 'tube'))
+    tube['type'] = 'dbus'
+    tube['service'] = 'com.example.TestCase2'
+    tube['id'] = '69'
+    parameters = tube.addElement((None, 'parameters'))
+    parameter = parameters.addElement((None, 'parameter'))
+    parameter['type'] = 'str'
+    parameter['name'] = 'login'
+    parameter.addContent('TEST')
+
+    stream.send(iq)
+
+    event = q.expect('dbus-signal', signal='NewTube')
+    id = event.args[0]
+    initiator = event.args[1]
+    type = event.args[2]
+    service = event.args[3]
+    parameters = event.args[4]
+    state = event.args[5]
+
+    assert id == 69
+    initiator_jid = conn.InspectHandles(1, [initiator])[0]
+    assert initiator_jid == 'bob@localhost'
+    assert type == 0 # D-Bus tube
+    assert service == 'com.example.TestCase2'
+    assert parameters == {'login': 'TEST'}
+    assert state == 0 # local pending
+
+    # accept the tube
+    call_async(q, tubes_iface, 'AcceptDBusTube', id)
+
+    event = q.expect('stream-iq', iq_type='result')
+    iq = event.stanza
+    si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
+        iq)[0]
+    value = xpath.queryForNodes('/si/feature/x/field/value', si)
+    assert len(value) == 1
+    proto = value[0]
+    assert str(proto) == NS_IBB
+    tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
+    assert len(tube) == 1
+
+    # Init the IBB bytestream
+    iq = IQ(stream, 'set')
+    iq['to'] = 'test@localhost/Resource'
+    iq['from'] = 'bob@localhost/Bob'
+    open = iq.addElement((NS_IBB, 'open'))
+    open['sid'] = 'beta'
+    open['block-size'] = '4096'
+    stream.send(iq)
+
+    event = q.expect('dbus-return', method='AcceptDBusTube')
+    address = event.value[0]
+    # FIXME: this is currently broken. See FIXME in tubes-channel.c
+    #assert len(address) > 0
+
+    event = q.expect('dbus-signal', signal='TubeStateChanged',
+        args=[69, 2]) # 2 == OPEN
+    id = event.args[0]
+    state = event.args[1]
+
+    # OK, we're done
+    conn.Disconnect()
+
+    q.expect('tube-signal', signal='Disconnected')
+    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
+
+if __name__ == '__main__':
+    exec_test(test)
diff --git a/tests/twisted/tubes/test-si-tubes.py b/tests/twisted/tubes/test-si-tubes.py
deleted file mode 100644
index 3ef8e6b..0000000
--- a/tests/twisted/tubes/test-si-tubes.py
+++ /dev/null
@@ -1,545 +0,0 @@
-"""Test 1-1 tubes support."""
-
-import base64
-import errno
-import os
-
-import dbus
-from dbus.connection import Connection
-from dbus.lowlevel import SignalMessage
-
-from servicetest import call_async, EventPattern, tp_name_prefix, watch_tube_signals
-from gabbletest import exec_test, acknowledge_iq
-
-from twisted.words.xish import domish, xpath
-from twisted.internet.protocol import Factory, Protocol
-from twisted.internet import reactor
-from twisted.words.protocols.jabber.client import IQ
-
-from gabbleconfig import HAVE_DBUS_TUBES
-
-NS_TUBES = 'http://telepathy.freedesktop.org/xmpp/tubes'
-NS_SI = 'http://jabber.org/protocol/si'
-NS_FEATURE_NEG = 'http://jabber.org/protocol/feature-neg'
-NS_IBB = 'http://jabber.org/protocol/ibb'
-NS_X_DATA = 'jabber:x:data'
-
-sample_parameters = dbus.Dictionary({
-    's': 'hello',
-    'ay': dbus.ByteArray('hello'),
-    'u': dbus.UInt32(123),
-    'i': dbus.Int32(-123),
-    }, signature='sv')
-
-
-class Echo(Protocol):
-    def dataReceived(self, data):
-        self.transport.write(data)
-
-def set_up_echo():
-    factory = Factory()
-    factory.protocol = Echo
-    try:
-        os.remove(os.getcwd() + '/stream')
-    except OSError, e:
-        if e.errno != errno.ENOENT:
-            raise
-    reactor.listenUNIX(os.getcwd() + '/stream', factory)
-
-
-def test(q, bus, conn, stream):
- set_up_echo()
- conn.Connect()
-
- properties = conn.GetAll(
- 'org.freedesktop.Telepathy.Connection.Interface.Requests',
- dbus_interface='org.freedesktop.DBus.Properties')
- assert properties.get('Channels') == [], properties['Channels']
- assert ({'org.freedesktop.Telepathy.Channel.ChannelType':
- 'org.freedesktop.Telepathy.Channel.Type.Tubes',
- 'org.freedesktop.Telepathy.Channel.TargetHandleType': 1,
- },
- ['org.freedesktop.Telepathy.Channel.TargetHandle',
- 'org.freedesktop.Telepathy.Channel.TargetID',
- ],
- ) in properties.get('RequestableChannelClasses'),\
- properties['RequestableChannelClasses']
-
- _, vcard_event, roster_event = q.expect_many(
- EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
- EventPattern('stream-iq', to=None, query_ns='vcard-temp',
- query_name='vCard'),
- EventPattern('stream-iq', query_ns='jabber:iq:roster'))
-
- acknowledge_iq(stream, vcard_event.stanza)
-
- roster = roster_event.stanza
- roster['type'] = 'result'
- item = roster_event.query.addElement('item')
- item['jid'] = 'bob at localhost'
- item['subscription'] = 'both'
- stream.send(roster)
-
- presence = domish.Element(('jabber:client', 'presence'))
- presence['from'] = 'bob at localhost/Bob'
- presence['to'] = 'test at localhost/Resource'
- c = presence.addElement('c')
- c['xmlns'] = 'http://jabber.org/protocol/caps'
- c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
- c['ver'] = '1.2.3'
- stream.send(presence)
-
- event = q.expect('stream-iq', iq_type='get',
- query_ns='http://jabber.org/protocol/disco#info',
- to='bob at localhost/Bob')
- result = event.stanza
- result['type'] = 'result'
- assert event.query['node'] == \
- 'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
- feature = event.query.addElement('feature')
- feature['var'] = NS_TUBES
- stream.send(result)
-
- bob_handle = conn.RequestHandles(1, ['bob at localhost'])[0]
-
- call_async(q, conn, 'RequestChannel',
- tp_name_prefix + '.Channel.Type.Tubes', 1, bob_handle, True);
-
- ret, old_sig, new_sig = q.expect_many(
- EventPattern('dbus-return', method='RequestChannel'),
- EventPattern('dbus-signal', signal='NewChannel'),
- EventPattern('dbus-signal', signal='NewChannels'),
- )
-
-
- assert len(ret.value) == 1
- chan_path = ret.value[0]
-
- assert old_sig.args[0] == chan_path
- assert old_sig.args[1] == tp_name_prefix + '.Channel.Type.Tubes'
- assert old_sig.args[2] == 1 # contact handle
- assert old_sig.args[3] == bob_handle
- assert old_sig.args[4] == True # suppress handler
-
- assert len(new_sig.args) == 1
- assert len(new_sig.args[0]) == 1 # one channel
- assert len(new_sig.args[0][0]) == 2 # two struct members
- assert new_sig.args[0][0][0] == ret.value[0]
- emitted_props = new_sig.args[0][0][1]
-
- assert emitted_props[tp_name_prefix + '.Channel.ChannelType'] ==\
- tp_name_prefix + '.Channel.Type.Tubes'
- assert emitted_props[tp_name_prefix + '.Channel.TargetHandleType'] == 1
- assert emitted_props[tp_name_prefix + '.Channel.TargetHandle'] ==\
- bob_handle
- assert emitted_props[tp_name_prefix + '.Channel.TargetID'] == \
- 'bob at localhost'
- assert emitted_props[tp_name_prefix + '.Channel.Requested'] == True
- assert emitted_props[tp_name_prefix + '.Channel.InitiatorHandle'] \
- == conn.GetSelfHandle()
- assert emitted_props[tp_name_prefix + '.Channel.InitiatorID'] == \
- 'test at localhost'
-
- properties = conn.GetAll(
- 'org.freedesktop.Telepathy.Connection.Interface.Requests',
- dbus_interface='org.freedesktop.DBus.Properties')
-
- assert new_sig.args[0][0] in properties['Channels'], \
- (new_sig.args[0][0], properties['Channels'])
-
- tubes_chan = bus.get_object(conn.bus_name, chan_path)
- tubes_iface = dbus.Interface(tubes_chan,
- tp_name_prefix + '.Channel.Type.Tubes')
-
- # Exercise basic Channel Properties from spec 0.17.7
- channel_props = tubes_chan.GetAll(
- 'org.freedesktop.Telepathy.Channel',
- dbus_interface='org.freedesktop.DBus.Properties')
- assert channel_props.get('TargetHandle') == bob_handle,\
- (channel_props.get('TargetHandle'), bob_handle)
- assert channel_props.get('TargetHandleType') == 1,\
- channel_props.get('TargetHandleType')
- assert channel_props.get('ChannelType') == \
- 'org.freedesktop.Telepathy.Channel.Type.Tubes',\
- channel_props.get('ChannelType')
- assert 'Interfaces' in channel_props, channel_props
- assert 'org.freedesktop.Telepathy.Channel.Interface.Group' not in \
- channel_props['Interfaces'], \
- channel_props['Interfaces']
- assert channel_props['TargetID'] == 'bob at localhost'
-
- self_handle = conn.GetSelfHandle()
-
- assert channel_props['Requested'] == True
- assert channel_props['InitiatorID'] == 'test at localhost'
- assert channel_props['InitiatorHandle'] == self_handle
-
- # Unix socket
- path = os.getcwd() + '/stream'
- call_async(q, tubes_iface, 'OfferStreamTube',
- 'echo', sample_parameters, 0, dbus.ByteArray(path), 0, "")
-
- event = q.expect('stream-message')
- message = event.stanza
- tube_nodes = xpath.queryForNodes('/message/tube[@xmlns="%s"]' % NS_TUBES,
- message)
- if tube_nodes is None:
- return False
-
- assert len(tube_nodes) == 1
- tube = tube_nodes[0]
-
- assert tube['service'] == 'echo'
- assert tube['type'] == 'stream'
- assert not tube.hasAttribute('initiator')
- stream_tube_id = long(tube['id'])
-
- params = {}
- parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
- for node in parameter_nodes:
- assert node['name'] not in params
- params[node['name']] = (node['type'], str(node))
- assert params == {'ay': ('bytes', 'aGVsbG8='),
- 's': ('str', 'hello'),
- 'i': ('int', '-123'),
- 'u': ('uint', '123'),
- }
-
- # The CM is the server, so fake a client wanting to talk to it
- iq = IQ(stream, 'set')
- iq['to'] = 'test at localhost/Resource'
- iq['from'] = 'bob at localhost/Bob'
- si = iq.addElement((NS_SI, 'si'))
- si['id'] = 'alpha'
- si['profile'] = NS_TUBES
- feature = si.addElement((NS_FEATURE_NEG, 'feature'))
- x = feature.addElement((NS_X_DATA, 'x'))
- x['type'] = 'form'
- field = x.addElement((None, 'field'))
- field['var'] = 'stream-method'
- field['type'] = 'list-single'
- option = field.addElement((None, 'option'))
- value = option.addElement((None, 'value'))
- value.addContent(NS_IBB)
-
- stream_node = si.addElement((NS_TUBES, 'stream'))
- stream_node['tube'] = str(stream_tube_id)
- stream.send(iq)
-
- si_reply_event, _ = q.expect_many(
- EventPattern('stream-iq', iq_type='result'),
- EventPattern('dbus-signal', signal='TubeStateChanged',
- args=[stream_tube_id, 2])) # 2 == OPEN
- iq = si_reply_event.stanza
- si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
- iq)[0]
- value = xpath.queryForNodes('/si/feature/x/field/value', si)
- assert len(value) == 1
- proto = value[0]
- assert str(proto) == NS_IBB
- tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
- assert len(tube) == 1
-
- q.expect('dbus-signal', signal='StreamTubeNewConnection',
- args=[stream_tube_id, bob_handle])
-
- tubes = tubes_iface.ListTubes(byte_arrays=True)
- assert tubes == [(
- stream_tube_id,
- self_handle,
- 1, # Unix stream
- 'echo',
- sample_parameters,
- 2, # OPEN
- )]
-
- # have the fake client open the stream
- iq = IQ(stream, 'set')
- iq['to'] = 'test at localhost/Resource'
- iq['from'] = 'bob at localhost/Bob'
- open = iq.addElement((NS_IBB, 'open'))
- open['sid'] = 'alpha'
- open['block-size'] = '4096'
- stream.send(iq)
-
- q.expect('stream-iq', iq_type='result')
- # have the fake client send us some data
- message = domish.Element(('jabber:client', 'message'))
- message['to'] = 'test at localhost/Resource'
- message['from'] = 'bob at localhost/Bob'
- data_node = message.addElement((NS_IBB, 'data'))
- data_node['sid'] = 'alpha'
- data_node['seq'] = '0'
- data_node.addContent(base64.b64encode('hello, world'))
- stream.send(message)
-
- event = q.expect('stream-message', to='bob at localhost/Bob')
- message = event.stanza
-
- data_nodes = xpath.queryForNodes('/message/data[@xmlns="%s"]' % NS_IBB,
- message)
- assert data_nodes is not None
- assert len(data_nodes) == 1
- ibb_data = data_nodes[0]
- assert ibb_data['sid'] == 'alpha'
- binary = base64.b64decode(str(ibb_data))
- assert binary == 'hello, world'
-
- if not HAVE_DBUS_TUBES:
- return
-
- # OK, how about D-Bus?
- call_async(q, tubes_iface, 'OfferDBusTube',
- 'com.example.TestCase', sample_parameters)
-
- event = q.expect('stream-iq', iq_type='set', to='bob at localhost/Bob')
- iq = event.stanza
- si_nodes = xpath.queryForNodes('/iq/si', iq)
- if si_nodes is None:
- return False
-
- assert len(si_nodes) == 1
- si = si_nodes[0]
- assert si['profile'] == NS_TUBES
- dbus_stream_id = si['id']
-
- feature = xpath.queryForNodes('/si/feature', si)[0]
- x = xpath.queryForNodes('/feature/x', feature)[0]
- assert x['type'] == 'form'
- field = xpath.queryForNodes('/x/field', x)[0]
- assert field['var'] == 'stream-method'
- assert field['type'] == 'list-single'
- value = xpath.queryForNodes('/field/option/value', field)[0]
- assert str(value) == NS_IBB
-
- tube = xpath.queryForNodes('/si/tube', si)[0]
- assert tube['initiator'] == 'test at localhost'
- assert tube['service'] == 'com.example.TestCase'
- assert tube['stream-id'] == dbus_stream_id
- assert not tube.hasAttribute('dbus-name')
- assert tube['type'] == 'dbus'
- dbus_tube_id = long(tube['id'])
-
- params = {}
- parameter_nodes = xpath.queryForNodes('/tube/parameters/parameter', tube)
- for node in parameter_nodes:
- assert node['name'] not in params
- params[node['name']] = (node['type'], str(node))
- assert params == {'ay': ('bytes', 'aGVsbG8='),
- 's': ('str', 'hello'),
- 'i': ('int', '-123'),
- 'u': ('uint', '123'),
- }
-
- result = IQ(stream, 'result')
- result['id'] = iq['id']
- result['from'] = iq['to']
- result['to'] = 'test at localhost/Resource'
- res_si = result.addElement((NS_SI, 'si'))
- res_feature = res_si.addElement((NS_FEATURE_NEG, 'feature'))
- res_x = res_feature.addElement((NS_X_DATA, 'x'))
- res_x['type'] = 'submit'
- res_field = res_x.addElement((None, 'field'))
- res_field['var'] = 'stream-method'
- res_value = res_field.addElement((None, 'value'))
- res_value.addContent(NS_IBB)
-
- stream.send(result)
-
- event = q.expect('stream-iq', iq_type='set', to='bob at localhost/Bob')
- iq = event.stanza
- open = xpath.queryForNodes('/iq/open', iq)[0]
- assert open.uri == NS_IBB
- assert open['sid'] == dbus_stream_id
-
- result = IQ(stream, 'result')
- result['id'] = iq['id']
- result['from'] = iq['to']
- result['to'] = 'test at localhost/Resource'
-
- stream.send(result)
-
- q.expect('dbus-signal', signal='TubeStateChanged',
- args=[dbus_tube_id, 2]) # 2 == OPEN
-
- tubes = tubes_iface.ListTubes(byte_arrays=True)
- assert sorted(tubes) == sorted([(
- dbus_tube_id,
- self_handle,
- 0, # DBUS
- 'com.example.TestCase',
- sample_parameters,
- 2, # OPEN
- ),(
- stream_tube_id,
- self_handle,
- 1, # stream
- 'echo',
- sample_parameters,
- 2, # OPEN
- )])
-
- dbus_tube_adr = tubes_iface.GetDBusTubeAddress(dbus_tube_id)
- dbus_tube_conn = Connection(dbus_tube_adr)
-
- signal = SignalMessage('/', 'foo.bar', 'baz')
- my_bus_name = ':123.whatever.you.like'
- signal.set_sender(my_bus_name)
- signal.append(42, signature='u')
- dbus_tube_conn.send_message(signal)
-
- event = q.expect('stream-message')
- message = event.stanza
-
- assert message['to'] == 'bob at localhost/Bob'
-
- data_nodes = xpath.queryForNodes('/message/data[@xmlns="%s"]' % NS_IBB,
- message)
- assert data_nodes is not None
- assert len(data_nodes) == 1
- ibb_data = data_nodes[0]
- assert ibb_data['sid'] == dbus_stream_id
- binary = base64.b64decode(str(ibb_data))
- # little and big endian versions of: SIGNAL, NO_REPLY, protocol v1,
- # 4-byte payload
- assert binary.startswith('l\x04\x01\x01' '\x04\x00\x00\x00') or \
- binary.startswith('B\x04\x01\x01' '\x00\x00\x00\x04')
- # little and big endian versions of the 4-byte payload, UInt32(42)
- assert (binary[0] == 'l' and binary.endswith('\x2a\x00\x00\x00')) or \
- (binary[0] == 'B' and binary.endswith('\x00\x00\x00\x2a'))
- # XXX: verify that it's actually in the "sender" slot, rather than just
- # being in the message somewhere
- assert my_bus_name in binary
-
- watch_tube_signals(q, dbus_tube_conn)
-
- dbus_message = binary
- seq = 0
-
- # Have the fake client send us a message all in one go...
- msg = domish.Element(('jabber:client', 'message'))
- msg['to'] = 'test@localhost/Resource'
- msg['from'] = 'bob@localhost/Bob'
- data_node = msg.addElement('data', NS_IBB)
- data_node['sid'] = dbus_stream_id
- data_node['seq'] = str(seq)
- data_node.addContent(base64.b64encode(dbus_message))
- stream.send(msg)
- seq += 1
-
- # ... and a message one byte at a time ...
-
- for byte in dbus_message:
- msg = domish.Element(('jabber:client', 'message'))
- msg['to'] = 'test@localhost/Resource'
- msg['from'] = 'bob@localhost/Bob'
- data_node = msg.addElement('data', NS_IBB)
- data_node['sid'] = dbus_stream_id
- data_node['seq'] = str(seq)
- data_node.addContent(base64.b64encode(byte))
- stream.send(msg)
- seq += 1
-
- # ... and two messages in one go
-
- msg = domish.Element(('jabber:client', 'message'))
- msg['to'] = 'test@localhost/Resource'
- msg['from'] = 'bob@localhost/Bob'
- data_node = msg.addElement('data', NS_IBB)
- data_node['sid'] = dbus_stream_id
- data_node['seq'] = str(seq)
- data_node.addContent(base64.b64encode(dbus_message + dbus_message))
- stream.send(msg)
- seq += 1
-
- q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
- q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
- q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
- q.expect('tube-signal', signal='baz', args=[42], tube=dbus_tube_conn)
-
- # OK, now let's try to accept a D-Bus tube
- iq = IQ(stream, 'set')
- iq['to'] = 'test@localhost/Resource'
- iq['from'] = 'bob@localhost/Bob'
- si = iq.addElement((NS_SI, 'si'))
- si['id'] = 'beta'
- si['profile'] = NS_TUBES
- feature = si.addElement((NS_FEATURE_NEG, 'feature'))
- x = feature.addElement((NS_X_DATA, 'x'))
- x['type'] = 'form'
- field = x.addElement((None, 'field'))
- field['var'] = 'stream-method'
- field['type'] = 'list-single'
- option = field.addElement((None, 'option'))
- value = option.addElement((None, 'value'))
- value.addContent(NS_IBB)
-
- tube = si.addElement((NS_TUBES, 'tube'))
- tube['type'] = 'dbus'
- tube['service'] = 'com.example.TestCase2'
- tube['id'] = '69'
- parameters = tube.addElement((None, 'parameters'))
- parameter = parameters.addElement((None, 'parameter'))
- parameter['type'] = 'str'
- parameter['name'] = 'login'
- parameter.addContent('TEST')
-
- stream.send(iq)
-
- event = q.expect('dbus-signal', signal='NewTube')
- id = event.args[0]
- initiator = event.args[1]
- type = event.args[2]
- service = event.args[3]
- parameters = event.args[4]
- state = event.args[5]
-
- assert id == 69
- initiator_jid = conn.InspectHandles(1, [initiator])[0]
- assert initiator_jid == 'bob@localhost'
- assert type == 0 # D-Bus tube
- assert service == 'com.example.TestCase2'
- assert parameters == {'login': 'TEST'}
- assert state == 0 # local pending
-
- # accept the tube
- call_async(q, tubes_iface, 'AcceptDBusTube', id)
-
- event = q.expect('stream-iq', iq_type='result')
- iq = event.stanza
- si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
- iq)[0]
- value = xpath.queryForNodes('/si/feature/x/field/value', si)
- assert len(value) == 1
- proto = value[0]
- assert str(proto) == NS_IBB
- tube = xpath.queryForNodes('/si/tube[@xmlns="%s"]' % NS_TUBES, si)
- assert len(tube) == 1
-
- # Init the IBB bytestream
- iq = IQ(stream, 'set')
- iq['to'] = 'test@localhost/Resource'
- iq['from'] = 'bob@localhost/Bob'
- open = iq.addElement((NS_IBB, 'open'))
- open['sid'] = 'beta'
- open['block-size'] = '4096'
- stream.send(iq)
-
- event = q.expect('dbus-return', method='AcceptDBusTube')
- address = event.value[0]
- # FIXME: this is currently broken. See FIXME in tubes-channel.c
- #assert len(address) > 0
-
- event = q.expect('dbus-signal', signal='TubeStateChanged',
- args=[69, 2]) # 2 == OPEN
- id = event.args[0]
- state = event.args[1]
-
- # OK, we're done
- conn.Disconnect()
-
- q.expect('tube-signal', signal='Disconnected')
- q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
-
-if __name__ == '__main__':
- exec_test(test)
--
1.5.6.5
More information about the Telepathy-commits
mailing list