[telepathy-gabble/master] bytestream.py: add Bytestream objects

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Fri Apr 3 09:26:06 PDT 2009


---
 tests/twisted/bytestream.py |  155 +++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 155 insertions(+), 0 deletions(-)

diff --git a/tests/twisted/bytestream.py b/tests/twisted/bytestream.py
index a255c60..7b58902 100644
--- a/tests/twisted/bytestream.py
+++ b/tests/twisted/bytestream.py
@@ -8,8 +8,36 @@ from twisted.words.xish import xpath, domish
 from twisted.internet.error import CannotListenError
 
 from servicetest import Event
+from gabbletest import acknowledge_iq, sync_stream
 import ns
 
+class Bytestream(object):  # abstract base class of the bytestream test helpers
+    def __init__(self, stream, q, sid, initiator, target):
+        self.stream = stream  # handed to the stanza-sending helpers by subclasses
+        self.q = q  # event queue; subclasses wait on it with q.expect()
+
+        self.stream_id = sid
+        self.initiator = initiator
+        self.target = target
+
+    def open_bytestream(self):  # subclasses: open the bytestream towards target
+        raise NotImplementedError  # NotImplemented is a value, not an exception
+
+    def send_data(self, data):  # subclasses: send data over the bytestream
+        raise NotImplementedError
+
+    def get_ns(self):  # subclasses: return the namespace of the bytestream method
+        raise NotImplementedError
+
+    def wait_bytestream_open(self):  # subclasses: wait for the peer's open request
+        raise NotImplementedError
+
+    def get_data(self):  # subclasses: wait for incoming data and return it
+        raise NotImplementedError
+
+    def wait_bytestream_closed(self):  # subclasses: wait for the bytestream to close
+        raise NotImplementedError
+
 ##### XEP-0095: Stream Initiation #####
 
 def create_si_offer(stream, from_, to, sid, profile, bytestreams):
@@ -78,6 +106,91 @@ def parse_si_reply(iq):
 
 ##### XEP-0065: SOCKS5 Bytestreams #####
 
+class BytestreamS5B(Bytestream):  # SOCKS5 bytestream (ns.BYTESTREAMS) test helper
+    def get_ns(self):
+        return ns.BYTESTREAMS
+
+    def open_bytestream(self):  # offer ourselves as the only streamhost
+        port = listen_socks5(self.q)
+
+        send_socks5_init(self.stream, self.initiator, self.target,
+            self.stream_id, 'tcp', [(self.initiator, '127.0.0.1', port)])
+
+        self.transport = socks5_expect_connection(self.q, self.stream_id,
+            self.initiator, self.target)
+
+    def send_data(self, data):  # only valid once self.transport has been set
+        self.transport.write(data)
+
+    def wait_bytestream_open(self):
+        iq_id, mode, sid, hosts = expect_socks5_init(self.q)  # don't shadow id()
+
+        for jid, host, port in hosts:  # every offered streamhost must be the initiator
+            assert jid == self.initiator, jid
+
+        assert mode == 'tcp'
+        assert sid == self.stream_id
+        jid, host, port = hosts[0]  # connect to the first offered streamhost
+
+        self.transport = socks5_connect(self.q, host, port, sid, self.initiator,
+            self.target)
+
+        send_socks5_reply(self.stream, self.target, self.initiator, iq_id, jid)
+
+    def get_data(self):  # only data received on our own transport is considered
+        e = self.q.expect('s5b-data-received', transport=self.transport)
+        return e.data
+
+    def wait_bytestream_closed(self):  # NOTE(review): not filtered by transport
+        self.q.expect('s5b-connection-lost')
+
+class BytestreamS5BPidgin(BytestreamS5B):
+    """Simulate a buggy S5B implementation (such as Pidgin's)"""
+    def open_bytestream(self):
+        port = listen_socks5(self.q)
+
+        send_socks5_init(self.stream, self.initiator, self.target,
+            self.stream_id, 'tcp', [(self.initiator, '127.0.0.1', port)])
+
+        # FIXME: we should share a lot of code with socks5_expect_connection
+        # once we have refactored the Bytestream test objects
+        sid = self.stream_id
+        initiator = self.initiator
+        target = self.target
+
+        # wait for the auth request
+        event = self.q.expect('s5b-data-received')
+        assert event.data == '\x05\x01\x00' # version 5, 1 auth method, no auth
+
+        # send the auth reply on the transport the request arrived on
+        transport = event.transport
+        transport.write('\x05\x00') # version 5, no auth
+        event = self.q.expect('s5b-data-received')
+
+        # the expected domain is the hex digest of sha-1(sid + initiator + target)
+        unhashed_domain = sid + initiator + target
+        hashed_domain = sha.new(unhashed_domain).hexdigest() # NOTE(review): needs 'sha' imported
+
+        # check that the data just received is the expected CONNECT command
+        # version 5, connect, reserved, domain type
+        expected_connect = '\x05\x01\x00\x03'
+        expected_connect += chr(40) # length of the hex-encoded SHA-1 digest
+        expected_connect += hashed_domain
+        expected_connect += '\x00\x00' # port
+        assert event.data == expected_connect
+
+        # send CONNECT reply
+        # version 5, ok, reserved, domain type
+        connect_reply = '\x05\x00\x00\x03'
+        # Buggy on purpose (like Pidgin): reply with an IP, not the hashed domain
+        domain = '127.0.0.1'
+        connect_reply += chr(len(domain))
+        connect_reply += domain
+        connect_reply += '\x00\x00' # port
+        transport.write(connect_reply)
+
+        self.transport = transport
+
 class S5BProtocol(Protocol):
     def connectionMade(self):
         self.factory.event_func(Event('s5b-connected',
@@ -231,6 +344,48 @@ def send_socks5_reply(stream, from_, to, id, stream_used):
 
 ##### XEP-0047: In-Band Bytestreams (IBB) #####
 
+class BytestreamIBB(Bytestream):
+    def __init__(self, stream, q, sid, initiator, target):
+        super(BytestreamIBB, self).__init__(stream, q, sid, initiator, target)
+        # sequence number passed along with the next outgoing data stanza
+        self.seq = 0
+
+    def get_ns(self):
+        return ns.IBB
+
+    def open_bytestream(self):
+        # request an IBB bytestream towards the target, with a block size of 4096
+        send_ibb_open(self.stream, self.initiator, self.target,
+            self.stream_id, 4096)
+
+    def send_data(self, data):
+        send_ibb_msg_data(self.stream, self.initiator, self.target,
+            self.stream_id, self.seq, data)
+        sync_stream(self.q, self.stream)
+        # the sequence number is bumped only after the stream has been synced
+        self.seq += 1
+
+    def wait_bytestream_open(self):
+        # the open request arrives as an IQ of type 'set'
+        open_iq = self.q.expect('stream-iq', iq_type='set').stanza
+        assert parse_ibb_open(open_iq) == self.stream_id
+
+        # accept the bytestream by acknowledging the open IQ
+        acknowledge_iq(self.stream, open_iq)
+
+    def get_data(self):
+        # data arrives in message stanzas; return the payload of the next one
+        message = self.q.expect('stream-message').stanza
+        sid, payload = parse_ibb_msg_data(message)
+        assert sid == self.stream_id
+        return payload
+
+    def wait_bytestream_closed(self):
+        # the sender closes the bytestream when it is done; acknowledge the close IQ
+        close_iq = self.q.expect('stream-iq', iq_type='set',
+            query_name='close', query_ns=ns.IBB).stanza
+        acknowledge_iq(self.stream, close_iq)
+
 def send_ibb_open(stream, from_, to, sid, block_size):
     iq = IQ(stream, 'set')
     iq['to'] = to
-- 
1.5.6.5




More information about the telepathy-commits mailing list