[Telepathy-commits] [telepathy-gabble/master] bytestream.py: turn SOCKS5 helper functions into BytestreamS5B private methods

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Wed Feb 25 04:33:02 PST 2009


---
 tests/twisted/bytestream.py |  203 +++++++++++++++++++++----------------------
 1 files changed, 100 insertions(+), 103 deletions(-)

diff --git a/tests/twisted/bytestream.py b/tests/twisted/bytestream.py
index 8d3e5b3..16157a9 100644
--- a/tests/twisted/bytestream.py
+++ b/tests/twisted/bytestream.py
@@ -110,18 +110,61 @@ class BytestreamS5B(Bytestream):
     def get_ns(self):
         return ns.BYTESTREAMS
 
+    def _send_socks5_init(self, hosts):
+        iq = IQ(self.stream, 'set')
+        iq['to'] = self.target
+        iq['from'] = self.initiator
+        query = iq.addElement((ns.BYTESTREAMS, 'query'))
+        query['sid'] = self.stream_id
+        query['mode'] = 'tcp'
+        for jid, host, port in hosts:
+            streamhost = query.addElement('streamhost')
+            streamhost['jid'] = jid
+            streamhost['host'] = host
+            streamhost['port'] = str(port)
+        self.stream.send(iq)
+
+    def _socks5_expect_connection(self, q, sid, initiator, target):
+        event = q.expect('s5b-data-received')
+        assert event.data == '\x05\x01\x00' # version 5, 1 auth method, no auth
+        transport = event.transport
+        transport.write('\x05\x00') # version 5, no auth
+        event = q.expect('s5b-data-received')
+        # version 5, connect, reserved, domain type
+        expected_connect = '\x05\x01\x00\x03'
+        expected_connect += chr(40) # len (SHA-1)
+        # sha-1(sid + initiator + target)
+        unhashed_domain = sid + initiator + target
+        expected_connect += sha.new(unhashed_domain).hexdigest()
+        expected_connect += '\x00\x00' # port
+        assert event.data == expected_connect
+
+        transport.write('\x05\x00') #version 5, ok
+
+        return transport
+
+    def _listen_socks5(self):
+        for port in range(5000,5100):
+            try:
+                reactor.listenTCP(port, S5BFactory(self.q.append))
+            except CannotListenError:
+                continue
+            else:
+                return port
+
+        assert False, "Can't find a free port"
+
     def open_bytestream(self, expected=None):
-        port = listen_socks5(self.q)
+        port = self._listen_socks5()
 
-        send_socks5_init(self.stream, self.initiator, self.target,
-            self.stream_id, 'tcp', [(self.initiator, '127.0.0.1', port)])
+        self._send_socks5_init([(self.initiator, '127.0.0.1', port)])
 
         if expected is not None:
             event = self.q.expect_many(expected)[0]
         else:
             event = None
 
-        self.transport = socks5_expect_connection(self.q, self.stream_id,
+        self.transport = self._socks5_expect_connection(self.q, self.stream_id,
             self.initiator, self.target)
 
         return event
@@ -129,8 +172,56 @@ class BytestreamS5B(Bytestream):
     def send_data(self, data):
         self.transport.write(data)
 
+    def _expect_socks5_init(self):
+        event = self.q.expect('stream-iq', iq_type='set')
+        iq = event.stanza
+        query = xpath.queryForNodes('/iq/query', iq)[0]
+        assert query.uri == ns.BYTESTREAMS
+
+        mode = query['mode']
+        sid = query['sid']
+        hosts = []
+
+        for streamhost in xpath.queryForNodes('/query/streamhost', query):
+            hosts.append((streamhost['jid'], streamhost['host'], int(streamhost['port'])))
+        return iq['id'], mode, sid, hosts
+
+    def _socks5_connect(self, host, port):
+        reactor.connectTCP(host, port, S5BFactory(self.q.append))
+
+        event = self.q.expect('s5b-connected')
+        transport = event.transport
+        transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
+
+        event = self.q.expect('s5b-data-received')
+        event.data == '\x05\x00' # version 5, no auth
+
+        # version 5, connect, reserved, domain type
+        connect = '\x05\x01\x00\x03'
+        connect += chr(40) # len (SHA-1)
+        # sha-1(sid + initiator + target)
+        unhashed_domain = self.stream_id + self.initiator + self.target
+        connect += sha.new(unhashed_domain).hexdigest()
+        connect += '\x00\x00' # port
+        transport.write(connect)
+
+        event = self.q.expect('s5b-data-received')
+        event.data == '\x05\x00' # version 5, ok
+
+        return transport
+
+    def _send_socks5_reply(self, id, stream_used):
+        result = IQ(self.stream, 'result')
+        result['id'] = id
+        result['from'] = self.target
+        result['to'] = self.initiator
+        query = result.addElement((ns.BYTESTREAMS, 'query'))
+        streamhost_used = query.addElement((None, 'streamhost-used'))
+        streamhost_used['jid'] = stream_used
+        result.send()
+
     def wait_bytestream_open(self):
-        id, mode, sid, hosts = expect_socks5_init(self.q)
+        id, mode, sid, hosts = self._expect_socks5_init()
 
         for jid, host, port in hosts:
             pass
@@ -141,10 +232,9 @@ class BytestreamS5B(Bytestream):
         assert sid == self.stream_id
         jid, host, port = hosts[0]
 
-        self.transport = socks5_connect(self.q, host, port, sid, self.initiator,
-            self.target)
+        self.transport = self._socks5_connect(host, port)
 
-        send_socks5_reply(self.stream, self.target, self.initiator, id, jid)
+        self._send_socks5_reply(id, jid)
 
     def get_data(self):
        e = self.q.expect('s5b-data-received', transport=self.transport)
@@ -156,10 +246,9 @@ class BytestreamS5B(Bytestream):
 class BytestreamS5BPidgin(BytestreamS5B):
     """Simulate buggy S5B implementation (as Pidgin's one)"""
     def open_bytestream(self, expected=None):
-        port = listen_socks5(self.q)
+        port = self._listen_socks5()
 
-        send_socks5_init(self.stream, self.initiator, self.target,
-            self.stream_id, 'tcp', [(self.initiator, '127.0.0.1', port)])
+        self._send_socks5_init([(self.initiator, '127.0.0.1', port)])
 
         # FIXME: we should share lot of code with socks5_expect_connection
         # once we'll have refactored Bytestream test objects
@@ -229,88 +318,6 @@ class S5BFactory(Factory):
     def clientConnectionFailed(self, connector, reason):
         self.event_func(Event('s5b-connection-failed', reason=reason))
 
-def socks5_expect_connection(q, sid, initiator, target):
-    event = q.expect('s5b-data-received')
-    assert event.data == '\x05\x01\x00' # version 5, 1 auth method, no auth
-    transport = event.transport
-    transport.write('\x05\x00') # version 5, no auth
-    event = q.expect('s5b-data-received')
-    # version 5, connect, reserved, domain type
-    expected_connect = '\x05\x01\x00\x03'
-    expected_connect += chr(40) # len (SHA-1)
-    # sha-1(sid + initiator + target)
-    unhashed_domain = sid + initiator + target
-    expected_connect += sha.new(unhashed_domain).hexdigest()
-    expected_connect += '\x00\x00' # port
-    assert event.data == expected_connect
-
-    transport.write('\x05\x00') #version 5, ok
-
-    return transport
-
-def socks5_connect(q, host, port, sid,  initiator, target):
-    reactor.connectTCP(host, port, S5BFactory(q.append))
-
-    event = q.expect('s5b-connected')
-    transport = event.transport
-    transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
-
-    event = q.expect('s5b-data-received')
-    event.data == '\x05\x00' # version 5, no auth
-
-    # version 5, connect, reserved, domain type
-    connect = '\x05\x01\x00\x03'
-    connect += chr(40) # len (SHA-1)
-    # sha-1(sid + initiator + target)
-    unhashed_domain = sid + initiator + target
-    connect += sha.new(unhashed_domain).hexdigest()
-    connect += '\x00\x00' # port
-    transport.write(connect)
-
-    event = q.expect('s5b-data-received')
-    event.data == '\x05\x00' # version 5, ok
-
-    return transport
-
-def listen_socks5(q):
-    for port in range(5000,5100):
-        try:
-            reactor.listenTCP(port, S5BFactory(q.append))
-        except CannotListenError:
-            continue
-        else:
-            return port
-
-    assert False, "Can't find a free port"
-
-def send_socks5_init(stream, from_, to, sid, mode, hosts):
-    iq = IQ(stream, 'set')
-    iq['to'] = to
-    iq['from'] = from_
-    query = iq.addElement((ns.BYTESTREAMS, 'query'))
-    query['sid'] = sid
-    query['mode'] = mode
-    for jid, host, port in hosts:
-        streamhost = query.addElement('streamhost')
-        streamhost['jid'] = jid
-        streamhost['host'] = host
-        streamhost['port'] = str(port)
-    stream.send(iq)
-
-def expect_socks5_init(q):
-    event = q.expect('stream-iq', iq_type='set')
-    iq = event.stanza
-    query = xpath.queryForNodes('/iq/query', iq)[0]
-    assert query.uri == ns.BYTESTREAMS
-
-    mode = query['mode']
-    sid = query['sid']
-    hosts = []
-
-    for streamhost in xpath.queryForNodes('/query/streamhost', query):
-        hosts.append((streamhost['jid'], streamhost['host'], int(streamhost['port'])))
-    return iq['id'], mode, sid, hosts
-
 def expect_socks5_reply(q):
     event = q.expect('stream-iq', iq_type='result')
     iq = event.stanza
@@ -319,16 +326,6 @@ def expect_socks5_reply(q):
     streamhost_used = xpath.queryForNodes('/query/streamhost-used', query)[0]
     return streamhost_used
 
-def send_socks5_reply(stream, from_, to, id, stream_used):
-    result = IQ(stream, 'result')
-    result['id'] = id
-    result['from'] = from_
-    result['to'] = to
-    query = result.addElement((ns.BYTESTREAMS, 'query'))
-    streamhost_used = query.addElement((None, 'streamhost-used'))
-    streamhost_used['jid'] = stream_used
-    result.send()
-
 ##### XEP-0047: In-Band Bytestreams (IBB) #####
 
 class BytestreamIBB(Bytestream):
-- 
1.5.6.5




More information about the telepathy-commits mailing list