[Telepathy-commits] [telepathy-gabble/master] Add test on caps cache

Alban Crequy alban.crequy at collabora.co.uk
Tue Aug 19 10:52:37 PDT 2008


20080512171036-a41c0-1a9a28d60c657f7da28f6d48b59fc3c4f0e41f88.gz
---
 tests/twisted/Makefile.am        |    1 +
 tests/twisted/test-caps-cache.py |  188 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 189 insertions(+), 0 deletions(-)
 create mode 100644 tests/twisted/test-caps-cache.py

diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index 207b7cd..e9f4ce3 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -41,6 +41,7 @@ TWISTED_TESTS = \
 	jingle/test-outgoing-call-deprecated2.py \
 	jingle/test-outgoing-call-rejected.py \
 	test-capabilities.py \
+	test-caps-cache.py \
 	test-caps-hash.py \
 	test-connect-fail.py \
 	test-connect.py \
diff --git a/tests/twisted/test-caps-cache.py b/tests/twisted/test-caps-cache.py
new file mode 100644
index 0000000..6066379
--- /dev/null
+++ b/tests/twisted/test-caps-cache.py
@@ -0,0 +1,188 @@
+
+"""
+Test that requesting a caps set 1 time is enough with hash and that we need 5
+confirmation without hash.
+"""
+
+import dbus
+import sys
+
+from twisted.words.xish import domish, xpath
+
+from gabbletest import exec_test, make_result_iq
+
+text = 'org.freedesktop.Telepathy.Channel.Type.Text'
+sm = 'org.freedesktop.Telepathy.Channel.Type.StreamedMedia'
+caps_iface = 'org.freedesktop.Telepathy.Connection.Interface.Capabilities'
+
+caps_changed_flag = 0
+
+def caps_changed_cb(dummy):
+    # Workaround to bug 9980: do not raise an error but use a flag
+    # https://bugs.freedesktop.org/show_bug.cgi?id=9980
+    global caps_changed_flag
+    caps_changed_flag = 1
+
+def make_presence(from_jid, type, status):
+    presence = domish.Element((None, 'presence'))
+
+    if from_jid is not None:
+        presence['from'] = from_jid
+
+    if type is not None:
+        presence['type'] = type
+
+    if status is not None:
+        presence.addElement('status', content=status)
+
+    return presence
+
+def presence_add_caps(presence, ver, client, hash=None):
+    c = presence.addElement(('http://jabber.org/protocol/caps', 'c'))
+    c['node'] = client
+    c['ver'] = ver
+    if hash is not None:
+        c['hash'] = hash
+    return presence
+
+def _test_without_hash(q, bus, conn, stream, contact, contact_handle, client, disco):
+    global caps_changed_flag
+
+    presence = make_presence(contact, None, 'hello')
+    stream.send(presence)
+
+    event = q.expect('dbus-signal', signal='PresenceUpdate',
+        args=[{contact_handle: (0L, {u'available': {'message': 'hello'}})}])
+
+    # no special capabilities
+    basic_caps = [(contact_handle, text, 3, 0)]
+    assert conn.Capabilities.GetCapabilities([contact_handle]) == basic_caps
+
+    # send updated presence with Jingle caps info
+    presence = make_presence(contact, None, 'hello')
+    presence = presence_add_caps(presence, '0.1', client)
+    print str(presence)
+    stream.send(presence)
+
+    if disco:
+        # Gabble looks up our capabilities
+        event = q.expect('stream-iq', to=contact,
+            query_ns='http://jabber.org/protocol/disco#info')
+        query_node = xpath.queryForNodes('/iq/query', event.stanza)[0]
+        assert query_node.attributes['node'] == \
+            client + '#' + '0.1'
+
+        # send good reply
+        result = make_result_iq(stream, event.stanza)
+        query = result.firstChildElement()
+        feature = query.addElement('feature')
+        feature['var'] = 'http://jabber.org/protocol/jingle'
+        feature = query.addElement('feature')
+        feature['var'] = 'http://jabber.org/protocol/jingle/description/audio'
+        feature = query.addElement('feature')
+        feature['var'] = 'http://www.google.com/transport/p2p'
+        stream.send(result)
+
+    # we can now do audio calls
+    event = q.expect('dbus-signal', signal='CapabilitiesChanged')
+    caps_changed_flag = 0
+
+    # don't receive any D-Bus signal
+    assert caps_changed_flag == 0
+
+def _test_with_hash(q, bus, conn, stream, contact, contact_handle, client, disco):
+    global caps_changed_flag
+
+    presence = make_presence(contact, None, 'hello')
+    stream.send(presence)
+
+    event = q.expect('dbus-signal', signal='PresenceUpdate',
+        args=[{contact_handle: (0L, {u'available': {'message': 'hello'}})}])
+
+    # no special capabilities
+    basic_caps = [(contact_handle, text, 3, 0)]
+    assert conn.Capabilities.GetCapabilities([contact_handle]) == basic_caps
+
+    # send updated presence with Jingle caps info
+    presence = make_presence(contact, None, 'hello')
+    c = presence.addElement(('http://jabber.org/protocol/caps', 'c'))
+    c['node'] = client
+    c['ver'] = 'CzO+nkbflbxu1pgzOQSIi8gOyDc=' # good hash
+    c['hash'] = 'sha-1'
+    stream.send(presence)
+
+    if disco:
+        # Gabble looks up our capabilities
+        event = q.expect('stream-iq', to=contact,
+            query_ns='http://jabber.org/protocol/disco#info')
+        query_node = xpath.queryForNodes('/iq/query', event.stanza)[0]
+        assert query_node.attributes['node'] == \
+            client + '#' + c['ver']
+
+        # send good reply
+        result = make_result_iq(stream, event.stanza)
+        query = result.firstChildElement()
+        query['node'] = client + '#' + c['ver']
+        feature = query.addElement('feature')
+        feature['var'] = 'http://jabber.org/protocol/jingle'
+        feature = query.addElement('feature')
+        feature['var'] = 'http://jabber.org/protocol/jingle/description/audio'
+        feature = query.addElement('feature')
+        feature['var'] = 'http://www.google.com/transport/p2p'
+        query.addRawXml("""
+<x type='result' xmlns='jabber:x:data'>
+<field var='FORM_TYPE' type='hidden'>
+<value>urn:xmpp:dataforms:softwareinfo</value>
+</field>
+<field var='software'>
+<value>A Fake Client with Twisted</value>
+</field>
+<field var='software_version'>
+<value>5.11.2-svn-20080512</value>
+</field>
+<field var='os'>
+<value>Debian GNU/Linux unstable (sid) unstable sid</value>
+</field>
+<field var='os_version'>
+<value>2.6.24-1-amd64</value>
+</field>
+</x>
+        """)
+        stream.send(result)
+
+    # we can now do audio calls
+    event = q.expect('dbus-signal', signal='CapabilitiesChanged')
+    caps_changed_flag = 0
+
+    # don't receive any D-Bus signal
+    assert caps_changed_flag == 0
+
+def test(q, bus, conn, stream):
+    conn.Connect()
+    q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])
+
+    # be notified when the signal CapabilitiesChanged is fired
+    conn_caps_iface = dbus.Interface(conn, caps_iface)
+    conn_caps_iface.connect_to_signal('CapabilitiesChanged', caps_changed_cb)
+
+    client = 'http://telepathy.freedesktop.org/fake-client'
+
+    _test_without_hash(q, bus, conn, stream, 'bob1 at foo.com/Foo', 2L, client, 1)
+    _test_without_hash(q, bus, conn, stream, 'bob2 at foo.com/Foo', 3L, client, 1)
+    _test_without_hash(q, bus, conn, stream, 'bob3 at foo.com/Foo', 4L, client, 1)
+    _test_without_hash(q, bus, conn, stream, 'bob4 at foo.com/Foo', 5L, client, 1)
+    _test_without_hash(q, bus, conn, stream, 'bob5 at foo.com/Foo', 6L, client, 1)
+    # we have 5 different contacts that confirm
+    _test_without_hash(q, bus, conn, stream, 'bob6 at foo.com/Foo', 7L, client, 0)
+
+    _test_with_hash(q, bus, conn, stream, 'bilbo1 at foo.com/Foo', 8L, client, 1)
+    # 1 contact is enough with hash
+    _test_with_hash(q, bus, conn, stream, 'bilbo2 at foo.com/Foo', 9L, client, 0)
+
+    conn.Disconnect()
+    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
+
+
+if __name__ == '__main__':
+    exec_test(test)
+
-- 
1.5.6.3




More information about the Telepathy-commits mailing list