[telepathy-gabble/master] Tidy up test-outgoing-call-ensure

Will Thompson will.thompson at collabora.co.uk
Tue Apr 21 10:40:08 PDT 2009


---
 tests/twisted/jingle/test-outgoing-call-ensure.py |  123 ++++++++-------------
 1 files changed, 47 insertions(+), 76 deletions(-)

diff --git a/tests/twisted/jingle/test-outgoing-call-ensure.py b/tests/twisted/jingle/test-outgoing-call-ensure.py
index 8a5b0a3..9a330f0 100644
--- a/tests/twisted/jingle/test-outgoing-call-ensure.py
+++ b/tests/twisted/jingle/test-outgoing-call-ensure.py
@@ -1,46 +1,25 @@
-
 """
 Test making outgoing calls using EnsureChannel, and retrieving existing calls
 using EnsureChannel.
 """
 
-from gabbletest import exec_test, sync_stream
-from servicetest import make_channel_proxy, call_async, EventPattern
+from gabbletest import exec_test
+from servicetest import (
+    wrap_channel,
+    call_async, EventPattern,
+    assertEquals, assertLength,
+    )
 import constants as cs
-import jingletest
+from jingletest2 import JingleProtocol031, JingleTest2
 
 
 def test(q, bus, conn, stream):
-    jt = jingletest.JingleTest(stream, 'test at localhost', 'foo at bar.com/Foo')
-
-    # If we need to override remote caps, feats, codecs or caps,
-    # this is a good time to do it
-
-    # Connecting
-    conn.Connect()
-
-    q.expect('dbus-signal', signal='StatusChanged', args=[1, 1])
-
-    q.expect('stream-authenticated')
-    q.expect('dbus-signal', signal='PresenceUpdate',
-        args=[{1L: (0L, {u'available': {}})}])
-    q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])
+    jt = JingleTest2(JingleProtocol031(), conn, q, stream, 'test at localhost',
+        'foo at bar.com/Foo')
+    jt.prepare()
 
     self_handle = conn.GetSelfHandle()
-
-    # We need remote end's presence for capabilities
-    jt.send_remote_presence()
-
-    # Gabble doesn't trust it, so makes a disco
-    event = q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info',
-             to='foo at bar.com/Foo')
-
-    jt.send_remote_disco_reply(event.stanza)
-
-    # Force Gabble to process the caps before calling EnsureChannel
-    sync_stream(q, stream)
-
-    handle = conn.RequestHandles(1, [jt.remote_jid])[0]
+    handle = conn.RequestHandles(cs.HT_CONTACT, [jt.peer])[0]
 
     # Ensure a channel that doesn't exist yet.
     call_async(q, conn.Requests, 'EnsureChannel',
@@ -63,25 +42,25 @@ def test(q, bus, conn, stream):
 
     sig_path, sig_ct, sig_ht, sig_h, sig_sh = old_sig.args
 
-    assert sig_path == path, (sig_path, path)
-    assert sig_ct == cs.CHANNEL_TYPE_STREAMED_MEDIA, sig_ct
-    assert sig_ht == cs.HT_CONTACT, sig_ht
-    assert sig_h == handle, sig_h
-    assert sig_sh == True           # suppress handler
+    assertEquals(sig_path, path)
+    assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, sig_ct)
+    assertEquals(cs.HT_CONTACT, sig_ht)
+    assertEquals(handle, sig_h)
+    assert sig_sh # suppress handler
 
-    assert len(new_sig.args) == 1
-    assert len(new_sig.args[0]) == 1        # one channel
-    assert len(new_sig.args[0][0]) == 2     # two struct members
-    assert new_sig.args[0][0][0] == path
+    assertLength(1, new_sig.args)
+    assertLength(1, new_sig.args[0])        # one channel
+    assertLength(2, new_sig.args[0][0])     # two struct members
+    assertEquals(path, new_sig.args[0][0][0])
     emitted_props = new_sig.args[0][0][1]
 
-    assert emitted_props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_STREAMED_MEDIA
-    assert emitted_props[cs.TARGET_HANDLE_TYPE] == cs.HT_CONTACT
-    assert emitted_props[cs.TARGET_HANDLE] == handle
-    assert emitted_props[cs.TARGET_ID] == 'foo at bar.com', emitted_props
-    assert emitted_props[cs.REQUESTED] == True
-    assert emitted_props[cs.INITIATOR_HANDLE] == self_handle
-    assert emitted_props[cs.INITIATOR_ID] == 'test at localhost'
+    assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, emitted_props[cs.CHANNEL_TYPE])
+    assertEquals(cs.HT_CONTACT, emitted_props[cs.TARGET_HANDLE_TYPE])
+    assertEquals(handle, emitted_props[cs.TARGET_HANDLE])
+    assertEquals('foo at bar.com', emitted_props[cs.TARGET_ID])
+    assert emitted_props[cs.REQUESTED]
+    assertEquals(self_handle, emitted_props[cs.INITIATOR_HANDLE])
+    assertEquals('test at localhost', emitted_props[cs.INITIATOR_ID])
 
     # Now ensure a media channel with the same contact, and check it's the
     # same.
@@ -95,13 +74,13 @@ def test(q, bus, conn, stream):
     yours2, path2, props2 = event.value
 
     # We should have got back the same channel we created a page or so ago.
-    assert path == path2, (path, path2)
+    assertEquals(path2, path)
     # It's not been created for this call, so Yours should be False.
     assert not yours2
 
     # Time passes ... afterwards we close the chan
 
-    chan = bus.get_object(conn.bus_name, path)
+    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'StreamedMedia')
     chan.Close()
 
 
@@ -118,31 +97,29 @@ def test(q, bus, conn, stream):
         )
 
     path = ret.value[0]
-    assert old_sig.args[0] == path, (old_sig.args[0], path)
-    assert old_sig.args[1] == cs.CHANNEL_TYPE_STREAMED_MEDIA, old_sig.args[1]
-    assert old_sig.args[2] == 0, old_sig.args[2]
-    assert old_sig.args[3] == 0, old_sig.args[3]
-    assert old_sig.args[4] == True      # suppress handler
-
-    assert len(new_sig.args) == 1
-    assert len(new_sig.args[0]) == 1        # one channel
-    assert len(new_sig.args[0][0]) == 2     # two struct members
-    assert new_sig.args[0][0][0] == path
+    assertEquals(
+        [path, cs.CHANNEL_TYPE_STREAMED_MEDIA, cs.HT_NONE, 0, True],
+        old_sig.args)
+
+    assertLength(1, new_sig.args)
+    assertLength(1, new_sig.args[0])        # one channel
+    assertLength(2, new_sig.args[0][0])     # two struct members
+    assertEquals(path, new_sig.args[0][0][0])
     emitted_props = new_sig.args[0][0][1]
 
-    assert emitted_props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_STREAMED_MEDIA
-    assert emitted_props[cs.TARGET_HANDLE_TYPE] == 0
-    assert emitted_props[cs.TARGET_HANDLE] == 0
-    assert emitted_props[cs.TARGET_ID] == ''
-    assert emitted_props[cs.REQUESTED] == True
-    assert emitted_props[cs.INITIATOR_HANDLE] == self_handle
-    assert emitted_props[cs.INITIATOR_ID] == 'test at localhost'
+    assertEquals(cs.CHANNEL_TYPE_STREAMED_MEDIA, emitted_props[cs.CHANNEL_TYPE])
+    assertEquals(cs.HT_NONE, emitted_props[cs.TARGET_HANDLE_TYPE])
+    assertEquals(0, emitted_props[cs.TARGET_HANDLE])
+    assertEquals('', emitted_props[cs.TARGET_ID])
+    assert emitted_props[cs.REQUESTED]
+    assertEquals(self_handle, emitted_props[cs.INITIATOR_HANDLE])
+    assertEquals('test at localhost', emitted_props[cs.INITIATOR_ID])
 
-    media_iface = make_channel_proxy(conn, path, 'Channel.Type.StreamedMedia')
+    chan = wrap_channel(bus.get_object(conn.bus_name, path), 'StreamedMedia')
 
     # Request streams with the other person.  This should make them the
     # channel's "peer" property.
-    media_iface.RequestStreams(handle, [cs.MEDIA_STREAM_TYPE_AUDIO])
+    chan.StreamedMedia.RequestStreams(handle, [cs.MEDIA_STREAM_TYPE_AUDIO])
 
     # Now, Ensuring a media channel with handle should yield the channel just
     # created.
@@ -158,21 +135,15 @@ def test(q, bus, conn, stream):
 
     # we should have got back the anonymous channel we got with requestchannel
     # and called RequestStreams(handle) on.
-    assert path == path2, (path, path2)
+    assertEquals(path2, path)
     # It's not been created for this call, so Yours should be False.
     assert not yours
 
-    # Time passes ... afterwards we close the chan
-
-    chan = bus.get_object(conn.bus_name, path)
     chan.Close()
 
     conn.Disconnect()
     q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
 
-    return True
-
-
 if __name__ == '__main__':
     exec_test(test)
 
-- 
1.5.6.5




More information about the telepathy-commits mailing list