[Telepathy-commits] [telepathy-gabble/master] add BytestreamIBBIQ
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Mon Mar 9 08:57:07 PDT 2009
---
tests/twisted/bytestream.py | 14 +++++++++++++-
tests/twisted/tubes/tubetestutil.py | 10 +++++++---
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/tests/twisted/bytestream.py b/tests/twisted/bytestream.py
index 7c32292..00ff624 100644
--- a/tests/twisted/bytestream.py
+++ b/tests/twisted/bytestream.py
@@ -1,5 +1,7 @@
import base64
import sha
+import sys
+import random
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor
@@ -8,7 +10,7 @@ from twisted.words.xish import xpath, domish
from twisted.internet.error import CannotListenError
from servicetest import Event, EventPattern
-from gabbletest import acknowledge_iq, sync_stream, make_result_iq
+from gabbletest import acknowledge_iq, sync_stream, make_result_iq, elem, elem_iq
import ns
def wait_events(q, expected, my_event):
@@ -443,6 +445,16 @@ class BytestreamIBBMsg(BytestreamIBB):
assert ibb_data['sid'] == self.stream_id
return str(ibb_data), ibb_data['sid']
+class BytestreamIBBIQ(BytestreamIBB):  # IBB variant: _send() wraps each chunk via elem_iq(..., 'set')
+    def _send(self, from_, to, data):  # base64-encodes data and sends it on self.stream
+        iq_id = random.randint(0, sys.maxint)  # random stanza id; not named 'id' (builtin)
+
+        iq = elem_iq(self.stream, 'set', to=to, from_=from_, id=str(iq_id))(  # 'to' given once: a repeated keyword is an error
+            elem('data', xmlns=ns.IBB, sid=self.stream_id, seq=str(self.seq))(
+                (unicode(base64.b64encode(data)))))
+
+        self.stream.send(iq)
+
##### SI Fallback (Gabble specific extension) #####
class BytestreamSIFallback(Bytestream):
diff --git a/tests/twisted/tubes/tubetestutil.py b/tests/twisted/tubes/tubetestutil.py
index 4ac075b..31f572d 100644
--- a/tests/twisted/tubes/tubetestutil.py
+++ b/tests/twisted/tubes/tubetestutil.py
@@ -11,7 +11,7 @@ from dbus import PROPERTIES_IFACE
from servicetest import unwrap
from gabbletest import exec_test
from constants import *
-from bytestream import BytestreamIBBMsg, BytestreamS5B, BytestreamSIFallback
+from bytestream import BytestreamIBBMsg, BytestreamS5B, BytestreamSIFallback, BytestreamIBBIQ
from twisted.internet import reactor
from twisted.internet.protocol import Factory, Protocol
@@ -220,15 +220,19 @@ def set_up_echo(name):
return full_path
def exec_tube_test(test):
- def test_ibb(q, bus, conn, stream):
+ def test_ibb_msg(q, bus, conn, stream):
test(q, bus, conn, stream, BytestreamIBBMsg)
+ def test_ibb_iq(q, bus, conn, stream):
+ test(q, bus, conn, stream, BytestreamIBBIQ)
+
def test_socks5(q, bus, conn, stream):
test(q, bus, conn, stream, BytestreamS5B)
def test_si_fallback(q, bus, conn, stream):
test(q, bus, conn, stream, BytestreamSIFallback)
- exec_test(test_ibb)
+ exec_test(test_ibb_msg)
+ exec_test(test_ibb_iq)
exec_test(test_socks5)
exec_test(test_si_fallback)
--
1.5.6.5
More information about the telepathy-commits
mailing list