[telepathy-gabble/master] file_transfer_helper.py: test bugged CONNECT cmd

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Fri Apr 3 09:26:03 PDT 2009


---
 .../twisted/file-transfer/file_transfer_helper.py  |   55 +++++++++++++++++++-
 1 files changed, 54 insertions(+), 1 deletions(-)

diff --git a/tests/twisted/file-transfer/file_transfer_helper.py b/tests/twisted/file-transfer/file_transfer_helper.py
index b17f132..d452808 100644
--- a/tests/twisted/file-transfer/file_transfer_helper.py
+++ b/tests/twisted/file-transfer/file_transfer_helper.py
@@ -10,10 +10,11 @@ import ns
 from bytestream import parse_si_offer, create_si_reply, parse_ibb_open, parse_ibb_msg_data,\
     create_si_offer, parse_si_reply, send_ibb_open, send_ibb_msg_data, listen_socks5, \
     send_socks5_init, socks5_expect_connection, expect_socks5_init, socks5_connect, \
-    send_socks5_reply
+    send_socks5_reply, S5BFactory
 
 from twisted.words.xish import domish, xpath
 from twisted.words.protocols.jabber.client import IQ
+from twisted.internet import reactor
 
 from dbus import PROPERTIES_IFACE
 
@@ -602,3 +603,55 @@ class BytestreamS5BBugged(BytestreamS5B):
             EventPattern('dbus-signal', signal='FileTransferStateChanged'))
 
         return offset_event, state_event
+
+    def wait_bytestream_open(self, initiator, receiver):
+        """Open the S5B stream while sending a bugged CONNECT command:
+        the domain is a literal IP rather than the SHA-1 hash. The reply is
+        still expected to carry the hash; then the SOCKS5 reply is sent."""
+        init_id, mode, sid, hosts = expect_socks5_init(self.q)
+
+        for jid, host, port in hosts:
+            assert jid == initiator, jid
+
+        assert mode == 'tcp'
+        assert sid == self.stream_id
+        jid, host, port = hosts[0]
+
+        # FIXME: we should share a lot of code with socks5_connect
+        # once the Bytestream test objects have been refactored
+
+        reactor.connectTCP(host, port, S5BFactory(self.q.append))
+
+        transport = self.q.expect('s5b-connected').transport
+        transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
+
+        event = self.q.expect('s5b-data-received')
+        # must be an assert: a bare comparison would check nothing
+        assert event.data == '\x05\x00' # version 5, no auth
+
+        # sha-1(sid + initiator + target), the target being the receiver
+        hashed_domain = sha.new(sid + initiator + receiver).hexdigest()
+
+        # send CONNECT command
+        # version 5, connect, reserved, domain type
+        connect = '\x05\x01\x00\x03'
+        # Pidgin is not *that* bugged but let's pretend it is
+        domain = '127.0.0.1'
+        connect += chr(len(domain)) # len (of the bogus domain, not SHA-1)
+        connect += domain
+        connect += '\x00\x00' # port
+        transport.write(connect)
+
+        # wait for CONNECT reply
+        event = self.q.expect('s5b-data-received')
+        # version 5, succeed, reserved, domain type
+        expected_reply = '\x05\x00\x00\x03'
+        expected_reply += chr(40) # len (SHA-1)
+        expected_reply += hashed_domain
+        expected_reply += '\x00\x00' # port
+        assert event.data == expected_reply
+
+        self.transport = transport
+
+        send_socks5_reply(self.stream, receiver, initiator, init_id, jid)
+
-- 
1.5.6.5




More information about the telepathy-commits mailing list