[Telepathy-commits] [telepathy-gabble/master] bytestream.py: turn the SOCKS5 helper functions into BytestreamS5B private methods
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Wed Feb 25 04:33:02 PST 2009
---
tests/twisted/bytestream.py | 203 +++++++++++++++++++++----------------------
1 files changed, 100 insertions(+), 103 deletions(-)
diff --git a/tests/twisted/bytestream.py b/tests/twisted/bytestream.py
index 8d3e5b3..16157a9 100644
--- a/tests/twisted/bytestream.py
+++ b/tests/twisted/bytestream.py
@@ -110,18 +110,61 @@ class BytestreamS5B(Bytestream):
def get_ns(self):
return ns.BYTESTREAMS
+ def _send_socks5_init(self, hosts):
+ iq = IQ(self.stream, 'set')
+ iq['to'] = self.target
+ iq['from'] = self.initiator
+ query = iq.addElement((ns.BYTESTREAMS, 'query'))
+ query['sid'] = self.stream_id
+ query['mode'] = 'tcp'
+ for jid, host, port in hosts:
+ streamhost = query.addElement('streamhost')
+ streamhost['jid'] = jid
+ streamhost['host'] = host
+ streamhost['port'] = str(port)
+ self.stream.send(iq)
+
+ def _socks5_expect_connection(self, q, sid, initiator, target):
+ event = q.expect('s5b-data-received')
+ assert event.data == '\x05\x01\x00' # version 5, 1 auth method, no auth
+ transport = event.transport
+ transport.write('\x05\x00') # version 5, no auth
+ event = q.expect('s5b-data-received')
+ # version 5, connect, reserved, domain type
+ expected_connect = '\x05\x01\x00\x03'
+ expected_connect += chr(40) # len (SHA-1)
+ # sha-1(sid + initiator + target)
+ unhashed_domain = sid + initiator + target
+ expected_connect += sha.new(unhashed_domain).hexdigest()
+ expected_connect += '\x00\x00' # port
+ assert event.data == expected_connect
+
+ transport.write('\x05\x00') #version 5, ok
+
+ return transport
+
+ def _listen_socks5(self):
+ for port in range(5000,5100):
+ try:
+ reactor.listenTCP(port, S5BFactory(self.q.append))
+ except CannotListenError:
+ continue
+ else:
+ return port
+
+ assert False, "Can't find a free port"
+
def open_bytestream(self, expected=None):
- port = listen_socks5(self.q)
+ port = self._listen_socks5()
- send_socks5_init(self.stream, self.initiator, self.target,
- self.stream_id, 'tcp', [(self.initiator, '127.0.0.1', port)])
+ self._send_socks5_init([(self.initiator, '127.0.0.1', port)])
if expected is not None:
event = self.q.expect_many(expected)[0]
else:
event = None
- self.transport = socks5_expect_connection(self.q, self.stream_id,
+ self.transport = self._socks5_expect_connection(self.q, self.stream_id,
self.initiator, self.target)
return event
@@ -129,8 +172,56 @@ class BytestreamS5B(Bytestream):
def send_data(self, data):
self.transport.write(data)
+ def _expect_socks5_init(self):
+ event = self.q.expect('stream-iq', iq_type='set')
+ iq = event.stanza
+ query = xpath.queryForNodes('/iq/query', iq)[0]
+ assert query.uri == ns.BYTESTREAMS
+
+ mode = query['mode']
+ sid = query['sid']
+ hosts = []
+
+ for streamhost in xpath.queryForNodes('/query/streamhost', query):
+ hosts.append((streamhost['jid'], streamhost['host'], int(streamhost['port'])))
+ return iq['id'], mode, sid, hosts
+
+ def _socks5_connect(self, host, port):
+ reactor.connectTCP(host, port, S5BFactory(self.q.append))
+
+ event = self.q.expect('s5b-connected')
+ transport = event.transport
+ transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
+
+ event = self.q.expect('s5b-data-received')
+ event.data == '\x05\x00' # version 5, no auth
+
+ # version 5, connect, reserved, domain type
+ connect = '\x05\x01\x00\x03'
+ connect += chr(40) # len (SHA-1)
+ # sha-1(sid + initiator + target)
+ unhashed_domain = self.stream_id + self.initiator + self.target
+ connect += sha.new(unhashed_domain).hexdigest()
+ connect += '\x00\x00' # port
+ transport.write(connect)
+
+ event = self.q.expect('s5b-data-received')
+ event.data == '\x05\x00' # version 5, ok
+
+ return transport
+
+ def _send_socks5_reply(self, id, stream_used):
+ result = IQ(self.stream, 'result')
+ result['id'] = id
+ result['from'] = self.target
+ result['to'] = self.initiator
+ query = result.addElement((ns.BYTESTREAMS, 'query'))
+ streamhost_used = query.addElement((None, 'streamhost-used'))
+ streamhost_used['jid'] = stream_used
+ result.send()
+
def wait_bytestream_open(self):
- id, mode, sid, hosts = expect_socks5_init(self.q)
+ id, mode, sid, hosts = self._expect_socks5_init()
for jid, host, port in hosts:
pass
@@ -141,10 +232,9 @@ class BytestreamS5B(Bytestream):
assert sid == self.stream_id
jid, host, port = hosts[0]
- self.transport = socks5_connect(self.q, host, port, sid, self.initiator,
- self.target)
+ self.transport = self._socks5_connect(host, port)
- send_socks5_reply(self.stream, self.target, self.initiator, id, jid)
+ self._send_socks5_reply(id, jid)
def get_data(self):
e = self.q.expect('s5b-data-received', transport=self.transport)
@@ -156,10 +246,9 @@ class BytestreamS5B(Bytestream):
class BytestreamS5BPidgin(BytestreamS5B):
"""Simulate buggy S5B implementation (as Pidgin's one)"""
def open_bytestream(self, expected=None):
- port = listen_socks5(self.q)
+ port = self._listen_socks5()
- send_socks5_init(self.stream, self.initiator, self.target,
- self.stream_id, 'tcp', [(self.initiator, '127.0.0.1', port)])
+ self._send_socks5_init([(self.initiator, '127.0.0.1', port)])
# FIXME: we should share lot of code with socks5_expect_connection
# once we'll have refactored Bytestream test objects
@@ -229,88 +318,6 @@ class S5BFactory(Factory):
def clientConnectionFailed(self, connector, reason):
self.event_func(Event('s5b-connection-failed', reason=reason))
-def socks5_expect_connection(q, sid, initiator, target):
- event = q.expect('s5b-data-received')
- assert event.data == '\x05\x01\x00' # version 5, 1 auth method, no auth
- transport = event.transport
- transport.write('\x05\x00') # version 5, no auth
- event = q.expect('s5b-data-received')
- # version 5, connect, reserved, domain type
- expected_connect = '\x05\x01\x00\x03'
- expected_connect += chr(40) # len (SHA-1)
- # sha-1(sid + initiator + target)
- unhashed_domain = sid + initiator + target
- expected_connect += sha.new(unhashed_domain).hexdigest()
- expected_connect += '\x00\x00' # port
- assert event.data == expected_connect
-
- transport.write('\x05\x00') #version 5, ok
-
- return transport
-
-def socks5_connect(q, host, port, sid, initiator, target):
- reactor.connectTCP(host, port, S5BFactory(q.append))
-
- event = q.expect('s5b-connected')
- transport = event.transport
- transport.write('\x05\x01\x00') #version 5, 1 auth method, no auth
-
- event = q.expect('s5b-data-received')
- event.data == '\x05\x00' # version 5, no auth
-
- # version 5, connect, reserved, domain type
- connect = '\x05\x01\x00\x03'
- connect += chr(40) # len (SHA-1)
- # sha-1(sid + initiator + target)
- unhashed_domain = sid + initiator + target
- connect += sha.new(unhashed_domain).hexdigest()
- connect += '\x00\x00' # port
- transport.write(connect)
-
- event = q.expect('s5b-data-received')
- event.data == '\x05\x00' # version 5, ok
-
- return transport
-
-def listen_socks5(q):
- for port in range(5000,5100):
- try:
- reactor.listenTCP(port, S5BFactory(q.append))
- except CannotListenError:
- continue
- else:
- return port
-
- assert False, "Can't find a free port"
-
-def send_socks5_init(stream, from_, to, sid, mode, hosts):
- iq = IQ(stream, 'set')
- iq['to'] = to
- iq['from'] = from_
- query = iq.addElement((ns.BYTESTREAMS, 'query'))
- query['sid'] = sid
- query['mode'] = mode
- for jid, host, port in hosts:
- streamhost = query.addElement('streamhost')
- streamhost['jid'] = jid
- streamhost['host'] = host
- streamhost['port'] = str(port)
- stream.send(iq)
-
-def expect_socks5_init(q):
- event = q.expect('stream-iq', iq_type='set')
- iq = event.stanza
- query = xpath.queryForNodes('/iq/query', iq)[0]
- assert query.uri == ns.BYTESTREAMS
-
- mode = query['mode']
- sid = query['sid']
- hosts = []
-
- for streamhost in xpath.queryForNodes('/query/streamhost', query):
- hosts.append((streamhost['jid'], streamhost['host'], int(streamhost['port'])))
- return iq['id'], mode, sid, hosts
-
def expect_socks5_reply(q):
event = q.expect('stream-iq', iq_type='result')
iq = event.stanza
@@ -319,16 +326,6 @@ def expect_socks5_reply(q):
streamhost_used = xpath.queryForNodes('/query/streamhost-used', query)[0]
return streamhost_used
-def send_socks5_reply(stream, from_, to, id, stream_used):
- result = IQ(stream, 'result')
- result['id'] = id
- result['from'] = from_
- result['to'] = to
- query = result.addElement((ns.BYTESTREAMS, 'query'))
- streamhost_used = query.addElement((None, 'streamhost-used'))
- streamhost_used['jid'] = stream_used
- result.send()
-
##### XEP-0047: In-Band Bytestreams (IBB) #####
class BytestreamIBB(Bytestream):
--
1.5.6.5
More information about the telepathy-commits
mailing list