[Telepathy-commits] [telepathy-gabble/master] offer-muc-dbus-tube.py: factor out fire_signal_on_tube
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Wed Feb 4 05:20:01 PST 2009
---
tests/twisted/tubes/offer-muc-dbus-tube.py | 62 +++++++++++++++------------
1 files changed, 34 insertions(+), 28 deletions(-)
diff --git a/tests/twisted/tubes/offer-muc-dbus-tube.py b/tests/twisted/tubes/offer-muc-dbus-tube.py
index 9440eba..c8543fc 100644
--- a/tests/twisted/tubes/offer-muc-dbus-tube.py
+++ b/tests/twisted/tubes/offer-muc-dbus-tube.py
@@ -23,6 +23,33 @@ sample_parameters = dbus.Dictionary({
'i': dbus.Int32(-123),
}, signature='sv')
+def fire_signal_on_tube(q, tube, chatroom, dbus_stream_id, my_bus_name):
+ signal = SignalMessage('/', 'foo.bar', 'baz')
+ signal.append(42, signature='u')
+ tube.send_message(signal)
+
+ event = q.expect('stream-message', to=chatroom,
+ message_type='groupchat')
+ message = event.stanza
+
+ data_nodes = xpath.queryForNodes('/message/data[@xmlns="%s"]' % ns.MUC_BYTESTREAM,
+ message)
+ assert data_nodes is not None
+ assert len(data_nodes) == 1
+ ibb_data = data_nodes[0]
+ assert ibb_data['sid'] == dbus_stream_id
+ binary = base64.b64decode(str(ibb_data))
+ # little and big endian versions of: SIGNAL, NO_REPLY, protocol v1,
+ # 4-byte payload
+ assert binary.startswith('l\x04\x01\x01' '\x04\x00\x00\x00') or \
+ binary.startswith('B\x04\x01\x01' '\x00\x00\x00\x04')
+ # little and big endian versions of the 4-byte payload, UInt32(42)
+ assert (binary[0] == 'l' and binary.endswith('\x2a\x00\x00\x00')) or \
+ (binary[0] == 'B' and binary.endswith('\x00\x00\x00\x2a'))
+ # XXX: verify that it's actually in the "sender" slot, rather than just
+ # being in the message somewhere
+ assert my_bus_name in binary
+
def test(q, bus, conn, stream):
conn.Connect()
@@ -143,32 +170,7 @@ def test(q, bus, conn, stream):
dbus_tube_adr = tubes_iface.GetDBusTubeAddress(dbus_tube_id)
tube = Connection(dbus_tube_adr)
- signal = SignalMessage('/', 'foo.bar', 'baz')
- signal.append(42, signature='u')
- tube.send_message(signal)
-
- event = q.expect('stream-message', to='chat@conf.localhost',
- message_type='groupchat')
- message = event.stanza
-
- data_nodes = xpath.queryForNodes('/message/data[@xmlns="%s"]' % ns.MUC_BYTESTREAM,
- message)
- assert data_nodes is not None
- assert len(data_nodes) == 1
- ibb_data = data_nodes[0]
- assert ibb_data['sid'] == dbus_stream_id
- binary = base64.b64decode(str(ibb_data))
- # little and big endian versions of: SIGNAL, NO_REPLY, protocol v1,
- # 4-byte payload
- assert binary.startswith('l\x04\x01\x01' '\x04\x00\x00\x00') or \
- binary.startswith('B\x04\x01\x01' '\x00\x00\x00\x04')
- # little and big endian versions of the 4-byte payload, UInt32(42)
- assert (binary[0] == 'l' and binary.endswith('\x2a\x00\x00\x00')) or \
- (binary[0] == 'B' and binary.endswith('\x00\x00\x00\x2a'))
- # XXX: verify that it's actually in the "sender" slot, rather than just
- # being in the message somewhere
- assert my_bus_name in binary
-
+ fire_signal_on_tube(q, tube, 'chat@conf.localhost', dbus_stream_id, my_bus_name)
# offer a D-Bus tube to another room using new API
requestotron = dbus.Interface(conn, CONN_IFACE_REQUESTS)
@@ -250,7 +252,7 @@ def test(q, bus, conn, stream):
# offer the tube
call_async(q, dbus_tube_iface, 'OfferDBusTube')
- new_tube_event, presence_event, _, status_event, dbus_changed_event = q.expect_many(
+ new_tube_event, presence_event, return_event, status_event, dbus_changed_event = q.expect_many(
EventPattern('dbus-signal', signal='NewTube'),
EventPattern('stream-presence', to='chat2@conf.localhost/test'),
EventPattern('dbus-return', method='OfferDBusTube'),
@@ -307,6 +309,8 @@ def test(q, bus, conn, stream):
assert added == {tube_self_handle: my_bus_name}
assert removed == []
+ dbus_tube_adr = return_event.value[0]
+
bob_bus_name = ':2.Ym9i'
bob_handle = conn.RequestHandles(HT_CONTACT, ['chat2@conf.localhost/bob'])[0]
@@ -338,7 +342,9 @@ def test(q, bus, conn, stream):
assert added == {bob_handle: bob_bus_name}
assert removed == []
- # TODO: use tube
+ tube = Connection(dbus_tube_adr)
+ fire_signal_on_tube(q, tube, 'chat2@conf.localhost', dbus_stream_id, my_bus_name)
+
# TODO: remove bob
chan_iface.Close()
--
1.5.6.5
More information about the telepathy-commits
mailing list