[telepathy-gabble/master] file_transfer_helper.py: test bugged CONNECT cmd
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Fri Apr 3 09:26:03 PDT 2009
---
.../twisted/file-transfer/file_transfer_helper.py | 55 +++++++++++++++++++-
1 files changed, 54 insertions(+), 1 deletions(-)
diff --git a/tests/twisted/file-transfer/file_transfer_helper.py b/tests/twisted/file-transfer/file_transfer_helper.py
index b17f132..d452808 100644
--- a/tests/twisted/file-transfer/file_transfer_helper.py
+++ b/tests/twisted/file-transfer/file_transfer_helper.py
@@ -10,10 +10,11 @@ import ns
from bytestream import parse_si_offer, create_si_reply, parse_ibb_open, parse_ibb_msg_data,\
create_si_offer, parse_si_reply, send_ibb_open, send_ibb_msg_data, listen_socks5, \
send_socks5_init, socks5_expect_connection, expect_socks5_init, socks5_connect, \
- send_socks5_reply
+ send_socks5_reply, S5BFactory
from twisted.words.xish import domish, xpath
from twisted.words.protocols.jabber.client import IQ
+from twisted.internet import reactor
from dbus import PROPERTIES_IFACE
@@ -602,3 +603,55 @@ class BytestreamS5BBugged(BytestreamS5B):
EventPattern('dbus-signal', signal='FileTransferStateChanged'))
return offset_event, state_event
+
+ def wait_bytestream_open(self, initiator, receiver):
+ id, mode, sid, hosts = expect_socks5_init(self.q)
+
+ for jid, host, port in hosts:
+ assert jid == initiator, jid
+
+ assert mode == 'tcp'
+ assert sid == self.stream_id
+ jid, host, port = hosts[0]
+
+ # FIXME: we should share lot of code with socks5_connect
+ # once we'll have refactored Bytestream test objects
+
+ target = receiver
+
+ reactor.connectTCP(host, port, S5BFactory(self.q.append))
+
+ event = self.q.expect('s5b-connected')
+ transport = event.transport
+ transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
+
+ event = self.q.expect('s5b-data-received')
+ event.data == '\x05\x00' # version 5, no auth
+
+ # sha-1(sid + initiator + target)
+ unhashed_domain = sid + initiator + target
+ hashed_domain = sha.new(unhashed_domain).hexdigest()
+
+ # send CONNECT command
+ # version 5, connect, reserved, domain type
+ connect = '\x05\x01\x00\x03'
+ # Pidgin is not *that* bugged but let's pretend it is
+ domain = '127.0.0.1'
+ connect += chr(len(domain)) # len (SHA-1)
+ connect += domain
+ connect += '\x00\x00' # port
+ transport.write(connect)
+
+ # wait for CONNECT reply
+ event = self.q.expect('s5b-data-received')
+ # version 5, succeed, reserved, domain type
+ expected_reply = '\x05\x00\x00\x03'
+ expected_reply += chr(40) # len (SHA-1)
+ expected_reply += hashed_domain
+ expected_reply += '\x00\x00' # port
+ assert event.data == expected_reply
+
+ self.transport = transport
+
+ send_socks5_reply(self.stream, receiver, initiator, id, jid)
+
--
1.5.6.5
More information about the telepathy-commits
mailing list