[telepathy-gabble/master] file_transfer_helper.py: move bytestreams at the end of the file

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Fri Apr 3 09:25:54 PDT 2009


---
 .../twisted/file-transfer/file_transfer_helper.py  |  232 ++++++++++----------
 1 files changed, 116 insertions(+), 116 deletions(-)

diff --git a/tests/twisted/file-transfer/file_transfer_helper.py b/tests/twisted/file-transfer/file_transfer_helper.py
index 7f48ebd..15d1f5f 100644
--- a/tests/twisted/file-transfer/file_transfer_helper.py
+++ b/tests/twisted/file-transfer/file_transfer_helper.py
@@ -280,122 +280,6 @@ class ReceiveFileTest(FileTransferTest):
         assert state == FT_STATE_COMPLETED
         assert reason == FT_STATE_CHANGE_REASON_NONE
 
-class Bytestream(object):
-    def __init__(self, stream, q):
-        self.stream = stream
-        self.q = q
-
-        self.stream_id = None
-
-    def open_bytestream(self, from_, to_):
-        # Open the bytestream and return the InitialOffsetDefined and
-        # FileTransferStateChanged events
-        raise NotImplemented
-
-    def send_data(self, from_, to, data):
-        raise NotImplemented
-
-    def get_ns(self):
-        raise NotImplemented
-
-    def wait_bytestream_open(self):
-        raise NotImplemented
-
-    def get_file(self, ft_channel, size):
-        raise NotImplemented
-
-class BytestreamIBB(Bytestream):
-    def __init__(self, stream, q):
-        Bytestream.__init__(self, stream, q)
-
-        self.seq = 0
-
-    def get_ns(self):
-        return ns.IBB
-
-    def open_bytestream(self, from_, to):
-        # open IBB bytestream
-        send_ibb_open(self.stream, from_, to, 'alpha', 4096)
-
-        _, offset_event, state_event = self.q.expect_many(
-            EventPattern('stream-iq', iq_type='result'),
-            EventPattern('dbus-signal', signal='InitialOffsetDefined'),
-            EventPattern('dbus-signal', signal='FileTransferStateChanged'))
-
-        return offset_event, state_event
-
-    def send_data(self, from_, to, data):
-        send_ibb_msg_data(self.stream, from_, to, 'alpha', self.seq, data)
-        sync_stream(self.q, self.stream)
-
-        self.seq += 1
-
-    def wait_bytestream_open(self):
-        # Wait IBB open iq
-        event = self.q.expect('stream-iq', iq_type='set')
-        sid = parse_ibb_open(event.stanza)
-        assert sid == self.stream_id
-
-        # open IBB bytestream
-        acknowledge_iq(self.stream, event.stanza)
-
-    def get_file(self, ft_channel, size):
-        # FIXME: try to share more code with parent class
-        data = ''
-        self.count = 0
-
-        def bytes_changed_cb(bytes):
-            self.count = bytes
-
-        ft_channel.connect_to_signal('TransferredBytesChanged', bytes_changed_cb)
-
-        # wait for IBB stanzas
-        while len(data) < size:
-            ibb_event = self.q.expect('stream-message')
-            sid, binary = parse_ibb_msg_data(ibb_event.stanza)
-            assert sid == self.stream_id
-            data += binary
-
-        # The bytes transferred has been announced using
-        # TransferredBytesChanged
-        assert self.count == size
-
-        # FileTransferStateChanged could have already been fired
-        e, close_event = self.q.expect_many(
-            EventPattern('dbus-signal', signal='FileTransferStateChanged'),
-            EventPattern('stream-iq', iq_type='set', query_name='close', query_ns=ns.IBB))
-
-        state, reason = e.args
-        assert state == FT_STATE_COMPLETED
-        assert reason == FT_STATE_CHANGE_REASON_NONE
-
-        # sender finish to send the file and so close the bytestream
-        acknowledge_iq(self.stream, close_event.stanza)
-
-        return data
-
-class BytestreamS5B(Bytestream):
-    def get_ns(self):
-        return ns.BYTESTREAMS
-
-    def open_bytestream(self, from_, to):
-        port = listen_socks5(self.q)
-
-        send_socks5_init(self.stream, from_, to,
-            'alpha', 'tcp', [(from_, '127.0.0.1', port)])
-
-        self.transport = socks5_expect_connection(self.q, 'alpha',
-            from_, 'test@localhost/Resource')
-
-        offset_event, state_event = self.q.expect_many(
-            EventPattern('dbus-signal', signal='InitialOffsetDefined'),
-            EventPattern('dbus-signal', signal='FileTransferStateChanged'))
-
-        return offset_event, state_event
-
-    def send_data(self, from_, to, data):
-        self.transport.write(data)
-
 class SendFileTest(FileTransferTest):
     def __init__(self, bytestream_cls):
         FileTransferTest.__init__(self, bytestream_cls)
@@ -511,3 +395,119 @@ class SendFileTest(FileTransferTest):
         data = self.bytestream.get_file(self.ft_channel, self.file.size)
 
         assert data == self.file.data
+
+class Bytestream(object):
+    def __init__(self, stream, q):
+        self.stream = stream
+        self.q = q
+
+        self.stream_id = None
+
+    def open_bytestream(self, from_, to_):
+        # Open the bytestream and return the InitialOffsetDefined and
+        # FileTransferStateChanged events
+        raise NotImplemented
+
+    def send_data(self, from_, to, data):
+        raise NotImplemented
+
+    def get_ns(self):
+        raise NotImplemented
+
+    def wait_bytestream_open(self):
+        raise NotImplemented
+
+    def get_file(self, ft_channel, size):
+        raise NotImplemented
+
+class BytestreamIBB(Bytestream):
+    def __init__(self, stream, q):
+        Bytestream.__init__(self, stream, q)
+
+        self.seq = 0
+
+    def get_ns(self):
+        return ns.IBB
+
+    def open_bytestream(self, from_, to):
+        # open IBB bytestream
+        send_ibb_open(self.stream, from_, to, 'alpha', 4096)
+
+        _, offset_event, state_event = self.q.expect_many(
+            EventPattern('stream-iq', iq_type='result'),
+            EventPattern('dbus-signal', signal='InitialOffsetDefined'),
+            EventPattern('dbus-signal', signal='FileTransferStateChanged'))
+
+        return offset_event, state_event
+
+    def send_data(self, from_, to, data):
+        send_ibb_msg_data(self.stream, from_, to, 'alpha', self.seq, data)
+        sync_stream(self.q, self.stream)
+
+        self.seq += 1
+
+    def wait_bytestream_open(self):
+        # Wait IBB open iq
+        event = self.q.expect('stream-iq', iq_type='set')
+        sid = parse_ibb_open(event.stanza)
+        assert sid == self.stream_id
+
+        # open IBB bytestream
+        acknowledge_iq(self.stream, event.stanza)
+
+    def get_file(self, ft_channel, size):
+        # FIXME: try to share more code with parent class
+        data = ''
+        self.count = 0
+
+        def bytes_changed_cb(bytes):
+            self.count = bytes
+
+        ft_channel.connect_to_signal('TransferredBytesChanged', bytes_changed_cb)
+
+        # wait for IBB stanzas
+        while len(data) < size:
+            ibb_event = self.q.expect('stream-message')
+            sid, binary = parse_ibb_msg_data(ibb_event.stanza)
+            assert sid == self.stream_id
+            data += binary
+
+        # The bytes transferred has been announced using
+        # TransferredBytesChanged
+        assert self.count == size
+
+        # FileTransferStateChanged could have already been fired
+        e, close_event = self.q.expect_many(
+            EventPattern('dbus-signal', signal='FileTransferStateChanged'),
+            EventPattern('stream-iq', iq_type='set', query_name='close', query_ns=ns.IBB))
+
+        state, reason = e.args
+        assert state == FT_STATE_COMPLETED
+        assert reason == FT_STATE_CHANGE_REASON_NONE
+
+        # sender finish to send the file and so close the bytestream
+        acknowledge_iq(self.stream, close_event.stanza)
+
+        return data
+
+class BytestreamS5B(Bytestream):
+    def get_ns(self):
+        return ns.BYTESTREAMS
+
+    def open_bytestream(self, from_, to):
+        port = listen_socks5(self.q)
+
+        send_socks5_init(self.stream, from_, to,
+            'alpha', 'tcp', [(from_, '127.0.0.1', port)])
+
+        self.transport = socks5_expect_connection(self.q, 'alpha',
+            from_, 'test@localhost/Resource')
+
+        offset_event, state_event = self.q.expect_many(
+            EventPattern('dbus-signal', signal='InitialOffsetDefined'),
+            EventPattern('dbus-signal', signal='FileTransferStateChanged'))
+
+        return offset_event, state_event
+
+    def send_data(self, from_, to, data):
+        self.transport.write(data)
-- 
1.5.6.5




More information about the telepathy-commits mailing list