[Telepathy-commits] [telepathy-gabble/master] add bytestream.py

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Thu Feb 19 03:37:16 PST 2009


---
 tests/twisted/Makefile.am   |    1 +
 tests/twisted/bytestream.py |  118 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 119 insertions(+), 0 deletions(-)
 create mode 100644 tests/twisted/bytestream.py

diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index 5a2d1f0..aadd620 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -141,6 +141,7 @@ BUILT_SOURCES = config.py
 
 EXTRA_DIST = \
 	$(TWISTED_TESTS) \
+	bytestream.py \
 	config.py \
 	constants.py \
 	gabbletest.py \
diff --git a/tests/twisted/bytestream.py b/tests/twisted/bytestream.py
new file mode 100644
index 0000000..98befa9
--- /dev/null
+++ b/tests/twisted/bytestream.py
@@ -0,0 +1,118 @@
+import sha
+
+from twisted.internet.protocol import Factory, Protocol
+from twisted.internet import reactor
+from twisted.words.protocols.jabber.client import IQ
+from twisted.words.xish import xpath
+
+from servicetest import Event
+import ns
+
+
+class S5BProtocol(Protocol):
+    def connectionMade(self):
+        self.factory.event_func(Event('s5b-connected',
+            transport=self.transport))
+
+    def dataReceived(self, data):
+        self.factory.event_func(Event('s5b-data-received', data=data,
+            transport=self.transport))
+
+class S5BFactory(Factory):
+    protocol = S5BProtocol
+
+    def __init__(self, event_func):
+        self.event_func = event_func
+
+    def buildProtocol(self, addr):
+        protocol = Factory.buildProtocol(self, addr)
+        return protocol
+
+    def startedConnecting(self, connector):
+        self.event_func(Event('s5b-started-connecting', connector=connector))
+
+    def clientConnectionLost(self, connector, reason):
+        self.event_func(Event('s5b-connection-lost', connector=connector,
+            reason=reason))
+
+    def clientConnectionFailed(self, connector, reason):
+        self.event_func(Event('s5b-connection-failed', reason=reason))
+
+def socks5_expect_connection(q, sid, initiator, target):
+    event = q.expect('s5b-data-received')
+    assert event.data == '\x05\x01\x00' # version 5, 1 auth method, no auth
+    transport = event.transport
+    transport.write('\x05\x00') # version 5, no auth
+    event = q.expect('s5b-data-received')
+    # version 5, connect, reserved, domain type
+    expected_connect = '\x05\x01\x00\x03'
+    expected_connect += chr(40) # len (SHA-1)
+    # sha-1(sid + initiator + target)
+    unhashed_domain = sid + initiator + target
+    expected_connect += sha.new(unhashed_domain).hexdigest()
+    expected_connect += '\x00\x00' # port
+    assert event.data == expected_connect
+
+    transport.write('\x05\x00') #version 5, ok
+
+    return transport
+
+def socks5_connect(q, host, port, sid,  initiator, target):
+    reactor.connectTCP(host, port, S5BFactory(q.append))
+
+    event = q.expect('s5b-connected')
+    transport = event.transport
+    transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
+
+    event = q.expect('s5b-data-received')
+    event.data == '\x05\x00' # version 5, no auth
+
+    # version 5, connect, reserved, domain type
+    connect = '\x05\x01\x00\x03'
+    connect += chr(40) # len (SHA-1)
+    # sha-1(sid + initiator + target)
+    unhashed_domain = sid + initiator + target
+    connect += sha.new(unhashed_domain).hexdigest()
+    connect += '\x00\x00' # port
+    transport.write(connect)
+
+    event = q.expect('s5b-data-received')
+    event.data == '\x05\x00' # version 5, ok
+
+    return transport
+
+def send_socks5_init(stream, from_, to, sid, mode, hosts):
+    iq = IQ(stream, 'set')
+    iq['to'] = to
+    iq['from'] = from_
+    query = iq.addElement((ns.BYTESTREAMS, 'query'))
+    query['sid'] = sid
+    query['mode'] = mode
+    for jid, host, port in hosts:
+        streamhost = query.addElement('streamhost')
+        streamhost['jid'] = jid
+        streamhost['host'] = host
+        streamhost['port'] = port
+    stream.send(iq)
+
+def expect_socks5_init(q):
+    event = q.expect('stream-iq', iq_type='set')
+    iq = event.stanza
+    query = xpath.queryForNodes('/iq/query', iq)[0]
+    assert query.uri == ns.BYTESTREAMS
+
+    mode = query['mode']
+    sid = query['sid']
+    hosts = []
+
+    for streamhost in xpath.queryForNodes('/query/streamhost', query):
+        hosts.append((streamhost['jid'], streamhost['host'], int(streamhost['port'])))
+    return iq['id'], mode, sid, hosts
+
+def expect_socks5_reply(q):
+    event = q.expect('stream-iq', iq_type='result')
+    iq = event.stanza
+    query = xpath.queryForNodes('/iq/query', iq)[0]
+    assert query.uri == ns.BYTESTREAMS
+    streamhost_used = xpath.queryForNodes('/query/streamhost-used', query)[0]
+    return streamhost_used
-- 
1.5.6.5




More information about the telepathy-commits mailing list