[telepathy-gabble/master] Tidy up test-outgoing-call-ensure
Will Thompson
will.thompson at collabora.co.uk
Tue Apr 21 10:40:08 PDT 2009
---
tests/twisted/jingle/test-outgoing-call-ensure.py | 123 ++++++++-------------
1 files changed, 47 insertions(+), 76 deletions(-)
diff --git a/tests/twisted/jingle/test-outgoing-call-ensure.py b/tests/twisted/jingle/test-outgoing-call-ensure.py
index 8a5b0a3..9a330f0 100644
--- a/tests/twisted/jingle/test-outgoing-call-ensure.py
+++ b/tests/twisted/jingle/test-outgoing-call-ensure.py
@@ -1,46 +1,25 @@
-
"""
Test making outgoing calls using EnsureChannel, and retrieving existing calls
using EnsureChannel.
"""
-from gabbletest import exec_test, sync_stream
-from servicetest import make_channel_proxy, call_async, EventPattern
+from gabbletest import exec_test
+from servicetest import (
+ wrap_channel,
+ call_async, EventPattern,
+ assertEquals, assertLength,
+ )
import constants as cs
-import jingletest
+from jingletest2 import JingleProtocol031, JingleTest2
def test(q, bus, conn, stream):
- jt = jingletest.JingleTest(stream, 'test at localhost', 'foo at bar.com/Foo')
-
- # If we need to override remote caps, feats, codecs or caps,
- # this is a good time to do it
-
- # Connecting
- conn.Connect()
-
- q.expect('dbus-signal', signal='StatusChanged', args=[1, 1])
-
- q.expect('stream-authenticated')
- q.expect('dbus-signal', signal='PresenceUpdate',
- args=[{1L: (0L, {u'available': {}})}])
- q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])
+ jt = JingleTest2(JingleProtocol031(), conn, q, stream, 'test at localhost',
+ 'foo at bar.com/Foo')
+ jt.prepare()
self_handle = conn.GetSelfHandle()
-
- # We need remote end's presence for capabilities
- jt.send_remote_presence()
-
- # Gabble doesn't trust it, so makes a disco
- event = q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info',
- to='foo at bar.com/Foo')
-
- jt.send_remote_disco_reply(event.stanza)
-
- # Force Gabble to process the caps before calling EnsureChannel
- sync_stream(q, stream)
-
- handle = conn.RequestHandles(1, [jt.remote_jid])[0]
+ handle = conn.RequestHandles(cs.HT_CONTACT, [jt.peer])[0]
# Ensure a channel that doesn't exist yet.
call_async(q, conn.Requests, 'EnsureChannel',
@@ -63,25 +42,25 @@ def test(q, bus, conn, stream):
sig_path, sig_ct, sig_ht, sig_h, sig_sh = old_sig.args
- assert sig_path == path, (sig_path, path)
- assert sig_ct == cs.CHANNEL_TYPE_STREAMED_MEDIA, sig_ct
- assert sig_ht == cs.HT_CONTACT, sig_ht
- assert sig_h == handle, sig_h
- assert sig_sh == True # suppress handler
+ assertEquals(sig_path, path)
+ assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, sig_ct)
+ assertEquals(cs.HT_CONTACT, sig_ht)
+ assertEquals(handle, sig_h)
+ assert sig_sh # suppress handler
- assert len(new_sig.args) == 1
- assert len(new_sig.args[0]) == 1 # one channel
- assert len(new_sig.args[0][0]) == 2 # two struct members
- assert new_sig.args[0][0][0] == path
+ assertLength(1, new_sig.args)
+ assertLength(1, new_sig.args[0]) # one channel
+ assertLength(2, new_sig.args[0][0]) # two struct members
+ assertEquals(path, new_sig.args[0][0][0])
emitted_props = new_sig.args[0][0][1]
- assert emitted_props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_STREAMED_MEDIA
- assert emitted_props[cs.TARGET_HANDLE_TYPE] == cs.HT_CONTACT
- assert emitted_props[cs.TARGET_HANDLE] == handle
- assert emitted_props[cs.TARGET_ID] == 'foo at bar.com', emitted_props
- assert emitted_props[cs.REQUESTED] == True
- assert emitted_props[cs.INITIATOR_HANDLE] == self_handle
- assert emitted_props[cs.INITIATOR_ID] == 'test at localhost'
+ assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, emitted_props[cs.CHANNEL_TYPE])
+ assertEquals(cs.HT_CONTACT, emitted_props[cs.TARGET_HANDLE_TYPE])
+ assertEquals(handle, emitted_props[cs.TARGET_HANDLE])
+ assertEquals('foo at bar.com', emitted_props[cs.TARGET_ID])
+ assert emitted_props[cs.REQUESTED]
+ assertEquals(self_handle, emitted_props[cs.INITIATOR_HANDLE])
+ assertEquals('test at localhost', emitted_props[cs.INITIATOR_ID])
# Now ensure a media channel with the same contact, and check it's the
# same.
@@ -95,13 +74,13 @@ def test(q, bus, conn, stream):
yours2, path2, props2 = event.value
# We should have got back the same channel we created a page or so ago.
- assert path == path2, (path, path2)
+ assertEquals(path2, path)
# It's not been created for this call, so Yours should be False.
assert not yours2
# Time passes ... afterwards we close the chan
- chan = bus.get_object(conn.bus_name, path)
+ chan = wrap_channel(bus.get_object(conn.bus_name, path), 'StreamedMedia')
chan.Close()
@@ -118,31 +97,29 @@ def test(q, bus, conn, stream):
)
path = ret.value[0]
- assert old_sig.args[0] == path, (old_sig.args[0], path)
- assert old_sig.args[1] == cs.CHANNEL_TYPE_STREAMED_MEDIA, old_sig.args[1]
- assert old_sig.args[2] == 0, old_sig.args[2]
- assert old_sig.args[3] == 0, old_sig.args[3]
- assert old_sig.args[4] == True # suppress handler
-
- assert len(new_sig.args) == 1
- assert len(new_sig.args[0]) == 1 # one channel
- assert len(new_sig.args[0][0]) == 2 # two struct members
- assert new_sig.args[0][0][0] == path
+ assertEquals(
+ [path, cs.CHANNEL_TYPE_STREAMED_MEDIA, cs.HT_NONE, 0, True],
+ old_sig.args)
+
+ assertLength(1, new_sig.args)
+ assertLength(1, new_sig.args[0]) # one channel
+ assertLength(2, new_sig.args[0][0]) # two struct members
+ assertEquals(path, new_sig.args[0][0][0])
emitted_props = new_sig.args[0][0][1]
- assert emitted_props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_STREAMED_MEDIA
- assert emitted_props[cs.TARGET_HANDLE_TYPE] == 0
- assert emitted_props[cs.TARGET_HANDLE] == 0
- assert emitted_props[cs.TARGET_ID] == ''
- assert emitted_props[cs.REQUESTED] == True
- assert emitted_props[cs.INITIATOR_HANDLE] == self_handle
- assert emitted_props[cs.INITIATOR_ID] == 'test at localhost'
+ assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, emitted_props[cs.CHANNEL_TYPE])
+ assertEquals(cs.HT_NONE, emitted_props[cs.TARGET_HANDLE_TYPE])
+ assertEquals(0, emitted_props[cs.TARGET_HANDLE])
+ assertEquals('', emitted_props[cs.TARGET_ID])
+ assert emitted_props[cs.REQUESTED]
+ assertEquals(self_handle, emitted_props[cs.INITIATOR_HANDLE])
+ assertEquals('test at localhost', emitted_props[cs.INITIATOR_ID])
- media_iface = make_channel_proxy(conn, path, 'Channel.Type.StreamedMedia')
+ chan = wrap_channel(bus.get_object(conn.bus_name, path), 'StreamedMedia')
# Request streams with the other person. This should make them the
# channel's "peer" property.
- media_iface.RequestStreams(handle, [cs.MEDIA_STREAM_TYPE_AUDIO])
+ chan.StreamedMedia.RequestStreams(handle, [cs.MEDIA_STREAM_TYPE_AUDIO])
# Now, Ensuring a media channel with handle should yield the channel just
# created.
@@ -158,21 +135,15 @@ def test(q, bus, conn, stream):
# we should have got back the anonymous channel we got with requestchannel
# and called RequestStreams(handle) on.
- assert path == path2, (path, path2)
+ assertEquals(path2, path)
# It's not been created for this call, so Yours should be False.
assert not yours
- # Time passes ... afterwards we close the chan
-
- chan = bus.get_object(conn.bus_name, path)
chan.Close()
conn.Disconnect()
q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
- return True
-
-
if __name__ == '__main__':
exec_test(test)
--
1.5.6.5
More information about the telepathy-commits
mailing list