[Telepathy-commits] [telepathy-salut/master] test-two-muc-stream-tubes: use constants.py and check if the tubes are properly closed
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Wed Mar 11 09:15:20 PDT 2009
---
tests/twisted/avahi/test-two-muc-stream-tubes.py | 31 ++++++++-------------
1 files changed, 12 insertions(+), 19 deletions(-)
diff --git a/tests/twisted/avahi/test-two-muc-stream-tubes.py b/tests/twisted/avahi/test-two-muc-stream-tubes.py
index 54e600f..bb7f249 100644
--- a/tests/twisted/avahi/test-two-muc-stream-tubes.py
+++ b/tests/twisted/avahi/test-two-muc-stream-tubes.py
@@ -13,20 +13,7 @@ from servicetest import make_channel_proxy, Event
from twisted.words.xish import xpath, domish
from twisted.internet.protocol import Factory, Protocol, ClientCreator
from twisted.internet import reactor
-
-CHANNEL_TYPE_TUBES = "org.freedesktop.Telepathy.Channel.Type.Tubes"
-CHANNEL_TYPE_TEXT = "org.freedesktop.Telepathy.Channel.Type.Text"
-HT_CONTACT = 1
-HT_ROOM = 2
-HT_CONTACT_LIST = 3
-TEXT_MESSAGE_TYPE_NORMAL = dbus.UInt32(0)
-SOCKET_ADDRESS_TYPE_UNIX = dbus.UInt32(0)
-SOCKET_ADDRESS_TYPE_IPV4 = dbus.UInt32(2)
-SOCKET_ACCESS_CONTROL_LOCALHOST = dbus.UInt32(0)
-
-TUBE_STATE_LOCAL_PENDING = 0
-TUBE_STATE_REMOTE_PENDING = 1
-TUBE_STATE_OPEN = 2
+from constants import *
sample_parameters = dbus.Dictionary({
's': 'hello',
@@ -195,7 +182,7 @@ def test(q, bus, conn):
q.expect('dbus-signal', signal='MembersChanged', path=path,
args=['', [conn2_self_handle], [], [], [], conn2_self_handle, 0])
- # first connection: offer a muc stream tube
+ # first connection: offer a muc stream tube (old API)
tubes1_path = conn.RequestChannel(CHANNEL_TYPE_TUBES, HT_ROOM, muc_handle1, True)
contact1_tubes_channel = make_channel_proxy(conn, tubes1_path, "Channel.Type.Tubes")
@@ -212,7 +199,7 @@ def test(q, bus, conn):
assert tube[2] == 1 # type = stream tube
assert tube[3] == 'http' # service
assert tube[4] == sample_parameters # paramaters
- assert tube[5] == TUBE_STATE_OPEN
+ assert tube[5] == TUBE_CHANNEL_STATE_OPEN
contact2_channeltype = None
while contact2_channeltype == None:
@@ -234,14 +221,14 @@ def test(q, bus, conn):
assert contact2_tube[4] is not None # parameters
assert contact2_tube[5] == 0, contact2_tube[5] # status = local pending
- # second connection: accept the tube
+ # second connection: accept the tube (old API)
unix_socket_adr = contact2_tubes_channel.AcceptStreamTube(
contact2_tube[0], 0, 0, '', byte_arrays=True)
e = q.expect('dbus-signal', signal='TubeStateChanged', path=tubes2_path)
id, state = e.args
assert id == conn2_tube_id
- assert state == TUBE_STATE_OPEN
+ assert state == TUBE_CHANNEL_STATE_OPEN
client = ClientCreator(reactor, ClientGreeter)
client.connectUNIX(unix_socket_adr).addCallback(client_connected_cb)
@@ -269,8 +256,14 @@ def test(q, bus, conn):
e = q.expect('client-data-received')
assert e.data == string.swapcase(test_string)
- # Close the tube propertly
+ # contact1 closes the tube
contact1_tubes_channel.CloseTube(conn1_tube_id)
+ q.expect('dbus-signal', signal='TubeClosed', args=[conn1_tube_id])
+
+ # contact2 closes the tube
+ contact2_tubes_channel.CloseTube(conn2_tube_id)
+ q.expect('dbus-signal', signal='TubeClosed', args=[conn2_tube_id])
+
conn.Disconnect()
conn2.Disconnect()
--
1.5.6.5
More information about the telepathy-commits mailing list