[Telepathy-commits] [telepathy-gabble/master] use bytestream.py in offer-accept-private-dbus-stream-tube-socks5.py

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Thu Feb 19 03:38:19 PST 2009


---
 ...offer-accept-private-dbus-stream-tube-socks5.py |  110 +-------------------
 1 files changed, 2 insertions(+), 108 deletions(-)

diff --git a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
index 277f1c1..86cd856 100644
--- a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
+++ b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
@@ -15,6 +15,8 @@ from gabbletest import exec_test, acknowledge_iq, sync_stream
 import constants as cs
 import ns
 import tubetestutil as t
+from bytestream import S5BFactory, socks5_expect_connection, socks5_connect, \
+    send_socks5_init, expect_socks5_init, expect_socks5_reply
 
 from twisted.words.xish import domish, xpath
 from twisted.internet.protocol import Factory, Protocol
@@ -35,114 +37,6 @@ new_sample_parameters = dbus.Dictionary({
     'i': dbus.Int32(-123),
     }, signature='sv')
 
-class S5BProtocol(Protocol):
-    def connectionMade(self):
-        self.factory.event_func(Event('s5b-connected',
-            transport=self.transport))
-
-    def dataReceived(self, data):
-        self.factory.event_func(Event('s5b-data-received', data=data,
-            transport=self.transport))
-
-class S5BFactory(Factory):
-    protocol = S5BProtocol
-
-    def __init__(self, event_func):
-        self.event_func = event_func
-
-    def buildProtocol(self, addr):
-        protocol = Factory.buildProtocol(self, addr)
-        return protocol
-
-    def startedConnecting(self, connector):
-        self.event_func(Event('s5b-started-connecting', connector=connector))
-
-    def clientConnectionLost(self, connector, reason):
-        self.event_func(Event('s5b-connection-lost', connector=connector,
-            reason=reason))
-
-    def clientConnectionFailed(self, connector, reason):
-        self.event_func(Event('s5b-connection-failed', reason=reason))
-
-def socks5_expect_connection(q, sid, initiator, target):
-    event = q.expect('s5b-data-received')
-    assert event.data == '\x05\x01\x00' # version 5, 1 auth method, no auth
-    transport = event.transport
-    transport.write('\x05\x00') # version 5, no auth
-    event = q.expect('s5b-data-received')
-    # version 5, connect, reserved, domain type
-    expected_connect = '\x05\x01\x00\x03'
-    expected_connect += chr(40) # len (SHA-1)
-    # sha-1(sid + initiator + target)
-    unhashed_domain = sid + initiator + target
-    expected_connect += sha.new(unhashed_domain).hexdigest()
-    expected_connect += '\x00\x00' # port
-    assert event.data == expected_connect
-
-    transport.write('\x05\x00') #version 5, ok
-
-    return transport
-
-def socks5_connect(q, host, port, sid,  initiator, target):
-    reactor.connectTCP(host, port, S5BFactory(q.append))
-
-    event = q.expect('s5b-connected')
-    transport = event.transport
-    transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
-
-    event = q.expect('s5b-data-received')
-    event.data == '\x05\x00' # version 5, no auth
-
-    # version 5, connect, reserved, domain type
-    connect = '\x05\x01\x00\x03'
-    connect += chr(40) # len (SHA-1)
-    # sha-1(sid + initiator + target)
-    unhashed_domain = sid + initiator + target
-    connect += sha.new(unhashed_domain).hexdigest()
-    connect += '\x00\x00' # port
-    transport.write(connect)
-
-    event = q.expect('s5b-data-received')
-    event.data == '\x05\x00' # version 5, ok
-
-    return transport
-
-def send_socks5_init(stream, from_, to, sid, mode, hosts):
-    iq = IQ(stream, 'set')
-    iq['to'] = to
-    iq['from'] = from_
-    query = iq.addElement((ns.BYTESTREAMS, 'query'))
-    query['sid'] = sid
-    query['mode'] = mode
-    for jid, host, port in hosts:
-        streamhost = query.addElement('streamhost')
-        streamhost['jid'] = jid
-        streamhost['host'] = host
-        streamhost['port'] = port
-    stream.send(iq)
-
-def expect_socks5_init(q):
-    event = q.expect('stream-iq', iq_type='set')
-    iq = event.stanza
-    query = xpath.queryForNodes('/iq/query', iq)[0]
-    assert query.uri == ns.BYTESTREAMS
-
-    mode = query['mode']
-    sid = query['sid']
-    hosts = []
-
-    for streamhost in xpath.queryForNodes('/query/streamhost', query):
-        hosts.append((streamhost['jid'], streamhost['host'], int(streamhost['port'])))
-    return iq['id'], mode, sid, hosts
-
-def expect_socks5_reply(q):
-    event = q.expect('stream-iq', iq_type='result')
-    iq = event.stanza
-    query = xpath.queryForNodes('/iq/query', iq)[0]
-    assert query.uri == ns.BYTESTREAMS
-    streamhost_used = xpath.queryForNodes('/query/streamhost-used', query)[0]
-    return streamhost_used
-
 def test(q, bus, conn, stream):
     t.set_up_echo("")
     t.set_up_echo("2")
-- 
1.5.6.5




More information about the telepathy-commits mailing list