[Telepathy-commits] [telepathy-gabble/master] bytestream.py: add Bytestream objects
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Wed Feb 25 03:05:12 PST 2009
---
tests/twisted/bytestream.py | 155 +++++++++++++++++++++++++++++++++++++++++++
1 files changed, 155 insertions(+), 0 deletions(-)
diff --git a/tests/twisted/bytestream.py b/tests/twisted/bytestream.py
index 1d2d454..48d4d56 100644
--- a/tests/twisted/bytestream.py
+++ b/tests/twisted/bytestream.py
@@ -8,8 +8,36 @@ from twisted.words.xish import xpath, domish
from twisted.internet.error import CannotListenError
from servicetest import Event
+from gabbletest import acknowledge_iq, sync_stream
import ns
class Bytestream(object):
    """Abstract base class for the bytestream helpers used by the tests.

    An instance stores the XMPP stream, the event queue and the identity of
    one bytestream (stream id, initiator JID and target JID).  Subclasses
    provide the transport-specific behaviour; every operation defined here
    raises NotImplementedError until it is overridden.
    """

    def __init__(self, stream, q, sid, initiator, target):
        """Remember the stream, the event queue and the bytestream identity.

        stream    -- XMPP stream the stanzas are sent on
        q         -- event queue used to wait for events
        sid       -- bytestream (stream) identifier, stored as stream_id
        initiator -- JID of the side opening the bytestream
        target    -- JID of the side receiving the bytestream
        """
        self.stream = stream
        self.q = q

        self.stream_id = sid
        self.initiator = initiator
        self.target = target

    # NotImplemented is the sentinel returned by binary special methods, not
    # an exception: raising it fails with a TypeError.  NotImplementedError
    # is the exception meant for abstract methods.

    def open_bytestream(self):
        """Open the bytestream as its initiator (subclass hook)."""
        raise NotImplementedError

    def send_data(self, data):
        """Send data through the bytestream (subclass hook)."""
        raise NotImplementedError

    def get_ns(self):
        """Return the XML namespace of the bytestream method (subclass hook)."""
        raise NotImplementedError

    def wait_bytestream_open(self):
        """Wait for the peer to open the bytestream (subclass hook)."""
        raise NotImplementedError

    def get_data(self):
        """Wait for data from the peer and return it (subclass hook)."""
        raise NotImplementedError

    def wait_bytestream_closed(self):
        """Wait for the bytestream to be closed (subclass hook)."""
        raise NotImplementedError
+
##### XEP-0095: Stream Initiation #####
def create_si_offer(stream, from_, to, sid, profile, bytestreams):
@@ -78,6 +106,91 @@ def parse_si_reply(iq):
##### XEP-0065: SOCKS5 Bytestreams #####
class BytestreamS5B(Bytestream):
    """SOCKS5 bytestream (XEP-0065) driven from the test's side."""

    def get_ns(self):
        """Return the SOCKS5 bytestreams namespace."""
        return ns.BYTESTREAMS

    def open_bytestream(self):
        """Offer a local streamhost and wait for the SOCKS5 connection.

        The resulting transport is stored in self.transport.
        """
        listening_port = listen_socks5(self.q)
        # a single streamhost: ourselves, listening on the loopback address
        streamhosts = [(self.initiator, '127.0.0.1', listening_port)]

        send_socks5_init(self.stream, self.initiator, self.target,
            self.stream_id, 'tcp', streamhosts)

        self.transport = socks5_expect_connection(
            self.q, self.stream_id, self.initiator, self.target)

    def send_data(self, data):
        """Write data on the SOCKS5 transport."""
        self.transport.write(data)

    def wait_bytestream_open(self):
        """Wait for the peer's SOCKS5 offer, connect to it and reply.

        Checks that every offered streamhost belongs to the initiator, that
        the mode is 'tcp' and that the stream id is ours, then connects to
        the first streamhost and tells the initiator which one was used.
        The resulting transport is stored in self.transport.
        """
        iq_id, mode, sid, hosts = expect_socks5_init(self.q)

        for host_jid, _host, _port in hosts:
            assert host_jid == self.initiator, host_jid

        assert mode == 'tcp'
        assert sid == self.stream_id

        # only the first offered streamhost is used
        used_jid, host, port = hosts[0]

        self.transport = socks5_connect(
            self.q, host, port, sid, self.initiator, self.target)

        send_socks5_reply(self.stream, self.target, self.initiator, iq_id,
            used_jid)

    def get_data(self):
        """Wait for data received on our transport and return it."""
        received = self.q.expect('s5b-data-received', transport=self.transport)
        return received.data

    def wait_bytestream_closed(self):
        """Wait for a SOCKS5 connection to be lost."""
        self.q.expect('s5b-connection-lost')
+
class BytestreamS5BPidgin(BytestreamS5B):
    """Simulate a buggy S5B implementation (such as Pidgin's).

    Only open_bytestream() differs from BytestreamS5B: the SOCKS5 CONNECT
    reply carries the literal address '127.0.0.1' as its domain.
    """

    def open_bytestream(self):
        """Offer a local streamhost and play the SOCKS5 server by hand.

        Waits for the client's authentication and CONNECT requests, checks
        them, answers with the non-conforming CONNECT reply and stores the
        transport in self.transport.
        """
        # The standalone sha module is deprecated since Python 2.5; hashlib
        # provides the same SHA-1 digest.  Imported locally so this method
        # does not depend on the file's import block.
        import hashlib

        port = listen_socks5(self.q)

        send_socks5_init(self.stream, self.initiator, self.target,
            self.stream_id, 'tcp', [(self.initiator, '127.0.0.1', port)])

        # FIXME: we should share a lot of code with socks5_expect_connection
        # once the Bytestream test objects have been refactored

        # wait auth
        event = self.q.expect('s5b-data-received')
        assert event.data == '\x05\x01\x00' # version 5, 1 auth method, no auth

        # send auth reply
        transport = event.transport
        transport.write('\x05\x00') # version 5, no auth
        event = self.q.expect('s5b-data-received')

        # sha-1(sid + initiator + target)
        unhashed_domain = self.stream_id + self.initiator + self.target
        hashed_domain = hashlib.sha1(unhashed_domain).hexdigest()

        # wait CONNECT cmd
        # version 5, connect, reserved, domain type
        expected_connect = '\x05\x01\x00\x03'
        expected_connect += chr(40) # len (SHA-1)
        expected_connect += hashed_domain
        expected_connect += '\x00\x00' # port
        assert event.data == expected_connect

        # send CONNECT reply
        # version 5, ok, reserved, domain type
        connect_reply = '\x05\x00\x00\x03'
        # I'm Pidgin, why should I respect SOCKS5 XEP?
        domain = '127.0.0.1'
        connect_reply += chr(len(domain))
        connect_reply += domain
        connect_reply += '\x00\x00' # port
        transport.write(connect_reply)

        self.transport = transport
+
class S5BProtocol(Protocol):
def connectionMade(self):
self.factory.event_func(Event('s5b-connected',
@@ -209,6 +322,48 @@ def send_socks5_reply(stream, from_, to, id, stream_used):
##### XEP-0047: In-Band Bytestreams (IBB) #####
class BytestreamIBB(Bytestream):
    """In-Band Bytestream (XEP-0047) driven from the test's side."""

    def __init__(self, stream, q, sid, initiator, target):
        """Set up the bytestream and start the data sequence counter at 0."""
        super(BytestreamIBB, self).__init__(stream, q, sid, initiator, target)

        # sequence number passed with the next data message we send
        self.seq = 0

    def get_ns(self):
        """Return the IBB namespace."""
        return ns.IBB

    def open_bytestream(self):
        """Send the IBB open request to the target, with a block size of 4096."""
        send_ibb_open(self.stream, self.initiator, self.target,
            self.stream_id, 4096)

    def send_data(self, data):
        """Send data in one IBB message, sync the stream, then bump seq."""
        current_seq = self.seq
        send_ibb_msg_data(self.stream, self.initiator, self.target,
            self.stream_id, current_seq, data)
        sync_stream(self.q, self.stream)

        self.seq = current_seq + 1

    def wait_bytestream_open(self):
        """Wait for an IQ 'set', check it opens our stream and acknowledge it."""
        open_iq = self.q.expect('stream-iq', iq_type='set').stanza
        opened_sid = parse_ibb_open(open_iq)
        assert opened_sid == self.stream_id

        # accept the IBB bytestream
        acknowledge_iq(self.stream, open_iq)

    def get_data(self):
        """Wait for an IBB data message for our stream and return its payload."""
        message = self.q.expect('stream-message').stanza
        sid, payload = parse_ibb_msg_data(message)
        assert sid == self.stream_id
        return payload

    def wait_bytestream_closed(self):
        """Wait for the IBB close request and acknowledge it."""
        close_iq = self.q.expect('stream-iq', iq_type='set',
            query_name='close', query_ns=ns.IBB).stanza

        # the sender has finished sending the file and closes the bytestream
        acknowledge_iq(self.stream, close_iq)
+
def send_ibb_open(stream, from_, to, sid, block_size):
iq = IQ(stream, 'set')
iq['to'] = to
--
1.5.6.5
More information about the telepathy-commits
mailing list