[Telepathy-commits] [telepathy-gabble/master] rename test-si-accept-tubes.py to accept-private-stream-tube.py
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Mon Jan 12 05:05:09 PST 2009
---
tests/twisted/tubes/accept-private-stream-tube.py | 271 +++++++++++++++++++++
tests/twisted/tubes/test-si-accept-tubes.py | 271 ---------------------
2 files changed, 271 insertions(+), 271 deletions(-)
create mode 100644 tests/twisted/tubes/accept-private-stream-tube.py
delete mode 100644 tests/twisted/tubes/test-si-accept-tubes.py
diff --git a/tests/twisted/tubes/accept-private-stream-tube.py b/tests/twisted/tubes/accept-private-stream-tube.py
new file mode 100644
index 0000000..1f82082
--- /dev/null
+++ b/tests/twisted/tubes/accept-private-stream-tube.py
@@ -0,0 +1,271 @@
+"""
+Receives several tube offers:
+- Test to accept a 1-1 stream tube
+ - using UNIX sockets and IPv4 sockets
+ - using the old tube iface and the new tube iface
+- Test to accept with bad parameters
+- Test to refuse the tube offer
+ - using the old tube iface and the new tube iface
+"""
+
+import dbus
+
+from servicetest import call_async, EventPattern, tp_name_prefix, \
+ EventProtocolClientFactory
+from gabbletest import exec_test, acknowledge_iq, send_error_reply
+
+from twisted.words.xish import domish, xpath
+from twisted.internet import reactor
+
+NS_TUBES = 'http://telepathy.freedesktop.org/xmpp/tubes'
+NS_SI = 'http://jabber.org/protocol/si'
+NS_IBB = 'http://jabber.org/protocol/ibb'
+
+bob_jid = 'bob at localhost/Bob'
+stream_tube_id = 49
+
+def receive_tube_offer(q, bus, conn, stream):
+ message = domish.Element(('jabber:client', 'message'))
+ message['to'] = 'test at localhost/Resource'
+ message['from'] = bob_jid
+ tube_node = message.addElement((NS_TUBES, 'tube'))
+ tube_node['type'] = 'stream'
+ tube_node['service'] = 'http'
+ tube_node['id'] = str(stream_tube_id)
+ stream.send(message)
+
+ old_sig, new_sig = q.expect_many(
+ EventPattern('dbus-signal', signal='NewChannel'),
+ EventPattern('dbus-signal', signal='NewChannels'),
+ )
+ chan_path = old_sig.args[0]
+ assert old_sig.args[1] == \
+ 'org.freedesktop.Telepathy.Channel.Type.Tubes', \
+ old_sig.args[1]
+ assert old_sig.args[2] == 1 # Handle_Type_Contact
+ bob_handle = old_sig.args[3]
+ assert old_sig.args[2] == 1, old_sig.args[2] # Suppress_Handler
+ assert len(new_sig.args) == 1
+ assert len(new_sig.args[0]) == 1
+ assert new_sig.args[0][0][0] == chan_path, new_sig.args[0][0]
+ assert new_sig.args[0][0][1] is not None
+
+ event = q.expect('dbus-signal', signal='NewTube')
+
+ old_sig, new_sig = q.expect_many(
+ EventPattern('dbus-signal', signal='NewChannel'),
+ EventPattern('dbus-signal', signal='NewChannels'),
+ )
+ new_chan_path = old_sig.args[0]
+ assert new_chan_path != chan_path
+ assert old_sig.args[1] == \
+ 'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT', \
+ old_sig.args[1]
+ assert old_sig.args[2] == 1 # Handle_Type_Contact
+ bob_handle = old_sig.args[3]
+ assert old_sig.args[2] == 1, old_sig.args[2] # Suppress_Handler
+ assert len(new_sig.args) == 1
+ assert len(new_sig.args[0]) == 1
+ assert new_sig.args[0][0][0] == new_chan_path, new_sig.args[0][0]
+ assert new_sig.args[0][0][1] is not None
+
+ # create channel proxies
+ tubes_chan = bus.get_object(conn.bus_name, chan_path)
+ tubes_iface = dbus.Interface(tubes_chan,
+ tp_name_prefix + '.Channel.Type.Tubes')
+
+ new_tubes_chan = bus.get_object(conn.bus_name, new_chan_path)
+ new_tubes_iface = dbus.Interface(new_tubes_chan,
+ tp_name_prefix + '.Channel.Type.StreamTube.DRAFT')
+
+ return (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface)
+
+def expect_tube_activity(q, bus, conn, stream):
+ event_socket, event_iq = q.expect_many(
+ EventPattern('socket-connected'),
+ EventPattern('stream-iq', to=bob_jid, query_ns=NS_SI,
+ query_name='si'))
+ protocol = event_socket.protocol
+ protocol.sendData("hello initiator")
+
+ iq = event_iq.stanza
+ si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
+ iq)[0]
+ values = xpath.queryForNodes(
+ '/si/feature[@xmlns="%s"]/x[@xmlns="%s"]/field/option/value'
+ % ('http://jabber.org/protocol/feature-neg', 'jabber:x:data'), si)
+ assert NS_IBB in [str(v) for v in values]
+
+ stream_node = xpath.queryForNodes('/si/stream[@xmlns="%s"]' %
+ NS_TUBES, si)[0]
+ assert stream_node is not None
+ assert stream_node['tube'] == str(stream_tube_id)
+ stream_id = si['id']
+
+ send_error_reply(stream, iq)
+
+def test(q, bus, conn, stream):
+ conn.Connect()
+
+ _, vcard_event, roster_event = q.expect_many(
+ EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
+ EventPattern('stream-iq', to=None, query_ns='vcard-temp',
+ query_name='vCard'),
+ EventPattern('stream-iq', query_ns='jabber:iq:roster'))
+
+ acknowledge_iq(stream, vcard_event.stanza)
+
+ roster = roster_event.stanza
+ roster['type'] = 'result'
+ item = roster_event.query.addElement('item')
+ item['jid'] = 'bob at localhost' # Bob can do tubes
+ item['subscription'] = 'both'
+ stream.send(roster)
+
+ # Send Bob presence and his caps
+ presence = domish.Element(('jabber:client', 'presence'))
+ presence['from'] = 'bob at localhost/Bob'
+ presence['to'] = 'test at localhost/Resource'
+ c = presence.addElement('c')
+ c['xmlns'] = 'http://jabber.org/protocol/caps'
+ c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
+ c['ver'] = '1.2.3'
+ stream.send(presence)
+
+ event = q.expect('stream-iq', iq_type='get',
+ query_ns='http://jabber.org/protocol/disco#info',
+ to='bob at localhost/Bob')
+ result = event.stanza
+ result['type'] = 'result'
+ assert event.query['node'] == \
+ 'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
+ feature = event.query.addElement('feature')
+ feature['var'] = NS_TUBES
+ stream.send(result)
+
+ # Receive a tube offer from Bob
+ (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
+ receive_tube_offer(q, bus, conn, stream)
+
+ # Try bad parameters on the old iface
+ call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id+1, 2, 0, '',
+ byte_arrays=True)
+ q.expect('dbus-error', method='AcceptStreamTube')
+ call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id, 2, 1, '',
+ byte_arrays=True)
+ q.expect('dbus-error', method='AcceptStreamTube')
+ call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id, 20, 0, '',
+ byte_arrays=True)
+ q.expect('dbus-error', method='AcceptStreamTube')
+
+ # Try bad parameters on the new iface
+ call_async(q, new_tubes_iface, 'AcceptStreamTube', 20, 0, '',
+ byte_arrays=True)
+ q.expect('dbus-error', method='AcceptStreamTube')
+ call_async(q, new_tubes_iface, 'AcceptStreamTube', 0, 1, '',
+ byte_arrays=True)
+ q.expect('dbus-error', method='AcceptStreamTube')
+
+ # Accept the tube with old iface, and use IPv4
+ call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id, 2, 0, '',
+ byte_arrays=True)
+
+ accept_return_event, _ = q.expect_many(
+ EventPattern('dbus-return', method='AcceptStreamTube'),
+ EventPattern('dbus-signal', signal='TubeStateChanged',
+ args=[stream_tube_id, 2]))
+
+ ip = accept_return_event.value[0][0]
+ port = accept_return_event.value[0][1]
+ assert port > 0
+
+ factory = EventProtocolClientFactory(q)
+ reactor.connectTCP(ip, port, factory)
+
+ expect_tube_activity(q, bus, conn, stream)
+ tubes_chan.Close()
+
+ # Receive a tube offer from Bob
+ (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
+ receive_tube_offer(q, bus, conn, stream)
+
+ # Accept the tube with old iface, and use UNIX sockets
+ call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id, 0, 0, '',
+ byte_arrays=True)
+
+ accept_return_event, _ = q.expect_many(
+ EventPattern('dbus-return', method='AcceptStreamTube'),
+ EventPattern('dbus-signal', signal='TubeStateChanged',
+ args=[stream_tube_id, 2]))
+
+ socket_address = accept_return_event.value[0]
+
+ factory = EventProtocolClientFactory(q)
+ reactor.connectUNIX(socket_address, factory)
+
+ expect_tube_activity(q, bus, conn, stream)
+ tubes_chan.Close()
+
+ # Receive a tube offer from Bob
+ (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
+ receive_tube_offer(q, bus, conn, stream)
+
+ # Accept the tube with new iface, and use IPv4
+ call_async(q, new_tubes_iface, 'AcceptStreamTube', 2, 0, '',
+ byte_arrays=True)
+
+ accept_return_event, _ = q.expect_many(
+ EventPattern('dbus-return', method='AcceptStreamTube'),
+ EventPattern('dbus-signal', signal='TubeStateChanged',
+ args=[stream_tube_id, 2]))
+
+ ip = accept_return_event.value[0][0]
+ port = accept_return_event.value[0][1]
+ assert port > 0
+
+ factory = EventProtocolClientFactory(q)
+ reactor.connectTCP(ip, port, factory)
+
+ expect_tube_activity(q, bus, conn, stream)
+ tubes_chan.Close()
+
+ # Receive a tube offer from Bob
+ (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
+ receive_tube_offer(q, bus, conn, stream)
+
+ # Accept the tube with new iface, and use UNIX sockets
+ call_async(q, new_tubes_iface, 'AcceptStreamTube', 0, 0, '',
+ byte_arrays=True)
+
+ accept_return_event, _ = q.expect_many(
+ EventPattern('dbus-return', method='AcceptStreamTube'),
+ EventPattern('dbus-signal', signal='TubeStateChanged',
+ args=[stream_tube_id, 2]))
+
+ socket_address = accept_return_event.value[0]
+
+ factory = EventProtocolClientFactory(q)
+ reactor.connectUNIX(socket_address, factory)
+
+ expect_tube_activity(q, bus, conn, stream)
+ tubes_chan.Close()
+
+ # Receive a tube offer from Bob
+ (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
+ receive_tube_offer(q, bus, conn, stream)
+ # Just close the tube
+ tubes_chan.Close()
+
+ # Receive a tube offer from Bob
+ (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
+ receive_tube_offer(q, bus, conn, stream)
+ # Just close the tube
+ new_tubes_chan.Close()
+
+ # OK, we're done
+ conn.Disconnect()
+
+ q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
+
+if __name__ == '__main__':
+ exec_test(test)
diff --git a/tests/twisted/tubes/test-si-accept-tubes.py b/tests/twisted/tubes/test-si-accept-tubes.py
deleted file mode 100644
index 1f82082..0000000
--- a/tests/twisted/tubes/test-si-accept-tubes.py
+++ /dev/null
@@ -1,271 +0,0 @@
-"""
-Receives several tube offers:
-- Test to accept a 1-1 stream tube
- - using UNIX sockets and IPv4 sockets
- - using the old tube iface and the new tube iface
-- Test to accept with bad parameters
-- Test to refuse the tube offer
- - using the old tube iface and the new tube iface
-"""
-
-import dbus
-
-from servicetest import call_async, EventPattern, tp_name_prefix, \
- EventProtocolClientFactory
-from gabbletest import exec_test, acknowledge_iq, send_error_reply
-
-from twisted.words.xish import domish, xpath
-from twisted.internet import reactor
-
-NS_TUBES = 'http://telepathy.freedesktop.org/xmpp/tubes'
-NS_SI = 'http://jabber.org/protocol/si'
-NS_IBB = 'http://jabber.org/protocol/ibb'
-
-bob_jid = 'bob at localhost/Bob'
-stream_tube_id = 49
-
-def receive_tube_offer(q, bus, conn, stream):
- message = domish.Element(('jabber:client', 'message'))
- message['to'] = 'test at localhost/Resource'
- message['from'] = bob_jid
- tube_node = message.addElement((NS_TUBES, 'tube'))
- tube_node['type'] = 'stream'
- tube_node['service'] = 'http'
- tube_node['id'] = str(stream_tube_id)
- stream.send(message)
-
- old_sig, new_sig = q.expect_many(
- EventPattern('dbus-signal', signal='NewChannel'),
- EventPattern('dbus-signal', signal='NewChannels'),
- )
- chan_path = old_sig.args[0]
- assert old_sig.args[1] == \
- 'org.freedesktop.Telepathy.Channel.Type.Tubes', \
- old_sig.args[1]
- assert old_sig.args[2] == 1 # Handle_Type_Contact
- bob_handle = old_sig.args[3]
- assert old_sig.args[2] == 1, old_sig.args[2] # Suppress_Handler
- assert len(new_sig.args) == 1
- assert len(new_sig.args[0]) == 1
- assert new_sig.args[0][0][0] == chan_path, new_sig.args[0][0]
- assert new_sig.args[0][0][1] is not None
-
- event = q.expect('dbus-signal', signal='NewTube')
-
- old_sig, new_sig = q.expect_many(
- EventPattern('dbus-signal', signal='NewChannel'),
- EventPattern('dbus-signal', signal='NewChannels'),
- )
- new_chan_path = old_sig.args[0]
- assert new_chan_path != chan_path
- assert old_sig.args[1] == \
- 'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT', \
- old_sig.args[1]
- assert old_sig.args[2] == 1 # Handle_Type_Contact
- bob_handle = old_sig.args[3]
- assert old_sig.args[2] == 1, old_sig.args[2] # Suppress_Handler
- assert len(new_sig.args) == 1
- assert len(new_sig.args[0]) == 1
- assert new_sig.args[0][0][0] == new_chan_path, new_sig.args[0][0]
- assert new_sig.args[0][0][1] is not None
-
- # create channel proxies
- tubes_chan = bus.get_object(conn.bus_name, chan_path)
- tubes_iface = dbus.Interface(tubes_chan,
- tp_name_prefix + '.Channel.Type.Tubes')
-
- new_tubes_chan = bus.get_object(conn.bus_name, new_chan_path)
- new_tubes_iface = dbus.Interface(new_tubes_chan,
- tp_name_prefix + '.Channel.Type.StreamTube.DRAFT')
-
- return (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface)
-
-def expect_tube_activity(q, bus, conn, stream):
- event_socket, event_iq = q.expect_many(
- EventPattern('socket-connected'),
- EventPattern('stream-iq', to=bob_jid, query_ns=NS_SI,
- query_name='si'))
- protocol = event_socket.protocol
- protocol.sendData("hello initiator")
-
- iq = event_iq.stanza
- si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % NS_SI,
- iq)[0]
- values = xpath.queryForNodes(
- '/si/feature[@xmlns="%s"]/x[@xmlns="%s"]/field/option/value'
- % ('http://jabber.org/protocol/feature-neg', 'jabber:x:data'), si)
- assert NS_IBB in [str(v) for v in values]
-
- stream_node = xpath.queryForNodes('/si/stream[@xmlns="%s"]' %
- NS_TUBES, si)[0]
- assert stream_node is not None
- assert stream_node['tube'] == str(stream_tube_id)
- stream_id = si['id']
-
- send_error_reply(stream, iq)
-
-def test(q, bus, conn, stream):
- conn.Connect()
-
- _, vcard_event, roster_event = q.expect_many(
- EventPattern('dbus-signal', signal='StatusChanged', args=[0, 1]),
- EventPattern('stream-iq', to=None, query_ns='vcard-temp',
- query_name='vCard'),
- EventPattern('stream-iq', query_ns='jabber:iq:roster'))
-
- acknowledge_iq(stream, vcard_event.stanza)
-
- roster = roster_event.stanza
- roster['type'] = 'result'
- item = roster_event.query.addElement('item')
- item['jid'] = 'bob at localhost' # Bob can do tubes
- item['subscription'] = 'both'
- stream.send(roster)
-
- # Send Bob presence and his caps
- presence = domish.Element(('jabber:client', 'presence'))
- presence['from'] = 'bob at localhost/Bob'
- presence['to'] = 'test at localhost/Resource'
- c = presence.addElement('c')
- c['xmlns'] = 'http://jabber.org/protocol/caps'
- c['node'] = 'http://example.com/ICantBelieveItsNotTelepathy'
- c['ver'] = '1.2.3'
- stream.send(presence)
-
- event = q.expect('stream-iq', iq_type='get',
- query_ns='http://jabber.org/protocol/disco#info',
- to='bob at localhost/Bob')
- result = event.stanza
- result['type'] = 'result'
- assert event.query['node'] == \
- 'http://example.com/ICantBelieveItsNotTelepathy#1.2.3'
- feature = event.query.addElement('feature')
- feature['var'] = NS_TUBES
- stream.send(result)
-
- # Receive a tube offer from Bob
- (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
- receive_tube_offer(q, bus, conn, stream)
-
- # Try bad parameters on the old iface
- call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id+1, 2, 0, '',
- byte_arrays=True)
- q.expect('dbus-error', method='AcceptStreamTube')
- call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id, 2, 1, '',
- byte_arrays=True)
- q.expect('dbus-error', method='AcceptStreamTube')
- call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id, 20, 0, '',
- byte_arrays=True)
- q.expect('dbus-error', method='AcceptStreamTube')
-
- # Try bad parameters on the new iface
- call_async(q, new_tubes_iface, 'AcceptStreamTube', 20, 0, '',
- byte_arrays=True)
- q.expect('dbus-error', method='AcceptStreamTube')
- call_async(q, new_tubes_iface, 'AcceptStreamTube', 0, 1, '',
- byte_arrays=True)
- q.expect('dbus-error', method='AcceptStreamTube')
-
- # Accept the tube with old iface, and use IPv4
- call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id, 2, 0, '',
- byte_arrays=True)
-
- accept_return_event, _ = q.expect_many(
- EventPattern('dbus-return', method='AcceptStreamTube'),
- EventPattern('dbus-signal', signal='TubeStateChanged',
- args=[stream_tube_id, 2]))
-
- ip = accept_return_event.value[0][0]
- port = accept_return_event.value[0][1]
- assert port > 0
-
- factory = EventProtocolClientFactory(q)
- reactor.connectTCP(ip, port, factory)
-
- expect_tube_activity(q, bus, conn, stream)
- tubes_chan.Close()
-
- # Receive a tube offer from Bob
- (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
- receive_tube_offer(q, bus, conn, stream)
-
- # Accept the tube with old iface, and use UNIX sockets
- call_async(q, tubes_iface, 'AcceptStreamTube', stream_tube_id, 0, 0, '',
- byte_arrays=True)
-
- accept_return_event, _ = q.expect_many(
- EventPattern('dbus-return', method='AcceptStreamTube'),
- EventPattern('dbus-signal', signal='TubeStateChanged',
- args=[stream_tube_id, 2]))
-
- socket_address = accept_return_event.value[0]
-
- factory = EventProtocolClientFactory(q)
- reactor.connectUNIX(socket_address, factory)
-
- expect_tube_activity(q, bus, conn, stream)
- tubes_chan.Close()
-
- # Receive a tube offer from Bob
- (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
- receive_tube_offer(q, bus, conn, stream)
-
- # Accept the tube with new iface, and use IPv4
- call_async(q, new_tubes_iface, 'AcceptStreamTube', 2, 0, '',
- byte_arrays=True)
-
- accept_return_event, _ = q.expect_many(
- EventPattern('dbus-return', method='AcceptStreamTube'),
- EventPattern('dbus-signal', signal='TubeStateChanged',
- args=[stream_tube_id, 2]))
-
- ip = accept_return_event.value[0][0]
- port = accept_return_event.value[0][1]
- assert port > 0
-
- factory = EventProtocolClientFactory(q)
- reactor.connectTCP(ip, port, factory)
-
- expect_tube_activity(q, bus, conn, stream)
- tubes_chan.Close()
-
- # Receive a tube offer from Bob
- (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
- receive_tube_offer(q, bus, conn, stream)
-
- # Accept the tube with new iface, and use UNIX sockets
- call_async(q, new_tubes_iface, 'AcceptStreamTube', 0, 0, '',
- byte_arrays=True)
-
- accept_return_event, _ = q.expect_many(
- EventPattern('dbus-return', method='AcceptStreamTube'),
- EventPattern('dbus-signal', signal='TubeStateChanged',
- args=[stream_tube_id, 2]))
-
- socket_address = accept_return_event.value[0]
-
- factory = EventProtocolClientFactory(q)
- reactor.connectUNIX(socket_address, factory)
-
- expect_tube_activity(q, bus, conn, stream)
- tubes_chan.Close()
-
- # Receive a tube offer from Bob
- (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
- receive_tube_offer(q, bus, conn, stream)
- # Just close the tube
- tubes_chan.Close()
-
- # Receive a tube offer from Bob
- (tubes_chan, tubes_iface, new_tubes_chan, new_tubes_iface) = \
- receive_tube_offer(q, bus, conn, stream)
- # Just close the tube
- new_tubes_chan.Close()
-
- # OK, we're done
- conn.Disconnect()
-
- q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
-
-if __name__ == '__main__':
- exec_test(test)
--
1.5.6.5
More information about the Telepathy-commits
mailing list