[telepathy-salut/master] add test-caps-file-transfer.py

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Wed Apr 1 04:10:40 PDT 2009


---
 tests/twisted/avahi/test-caps-file-transfer.py |  174 ++++++++++++++++++++++++
 1 files changed, 174 insertions(+), 0 deletions(-)
 create mode 100644 tests/twisted/avahi/test-caps-file-transfer.py

diff --git a/tests/twisted/avahi/test-caps-file-transfer.py b/tests/twisted/avahi/test-caps-file-transfer.py
new file mode 100644
index 0000000..54ce954
--- /dev/null
+++ b/tests/twisted/avahi/test-caps-file-transfer.py
@@ -0,0 +1,174 @@
+
+"""
+Test tubes capabilities with Connection.Interface.ContactCapabilities.DRAFT
+
+1. Check if Salut advertise the OOB caps
+
+2. Receive presence and caps from contacts and check that
+GetContactCapabilities works correctly and that ContactCapabilitiesChanged is
+correctly received. Also check that GetContactAttributes gives the same
+results.
+
+- capa announced with FT
+- capa announced without FT
+- no capabilites announced (assume FT is supported)
+"""
+
+import dbus
+
+from avahitest import AvahiAnnouncer
+from avahitest import get_host_name
+from avahitest import txt_get_key
+
+from twisted.words.xish import xpath
+
+from servicetest import EventPattern
+from saluttest import exec_test, make_result_iq
+from xmppstream import setup_stream_listener
+import ns
+from constants import *
+
+from caps_helper import compute_caps_hash
+from config import PACKAGE_STRING
+
+# last value of the "ver" key we resolved. We use it to be sure that the
+# modified caps has already be announced.
+old_ver = ''
+
+def receive_presence_and_ask_caps(q, stream, service):
+    global old_ver
+
+    event_avahi, event_dbus = q.expect_many(
+            EventPattern('service-resolved', service=service),
+            EventPattern('dbus-signal', signal='ContactCapabilitiesChanged')
+        )
+    signaled_caps = event_dbus.args[0][1]
+
+    ver = txt_get_key(event_avahi.txt, "ver")
+    while ver == old_ver:
+        # be sure that the announced caps actually changes
+        event_avahi = q.expect('service-resolved', service=service)
+        ver = txt_get_key(event_avahi.txt, "ver")
+    old_ver = ver
+
+    hash = txt_get_key(event_avahi.txt, "hash")
+    node = txt_get_key(event_avahi.txt, "node")
+    assert hash == 'sha-1'
+
+    # ask caps
+    request = """
+<iq from='fake_contact at jabber.org/resource' 
+    id='disco1'
+    to='salut at jabber.org/resource' 
+    type='get'>
+  <query xmlns='http://jabber.org/protocol/disco#info'
+         node='""" + node + '#' + ver + """'/>
+</iq>
+"""
+    stream.send(request)
+
+    # receive caps
+    event = q.expect('stream-iq',
+        query_ns='http://jabber.org/protocol/disco#info')
+    caps_str = str(xpath.queryForNodes('/iq/query/feature', event.stanza))
+
+    features = []
+    for feature in xpath.queryForNodes('/iq/query/feature', event.stanza):
+        features.append(feature['var'])
+
+    # Check if the hash matches the announced capabilities
+    assert ver == compute_caps_hash(['client/pc//%s' % PACKAGE_STRING], features, [])
+
+    return (event, caps_str, signaled_caps)
+
+def caps_contain(event, cap):
+    node = xpath.queryForNodes('/iq/query/feature[@var="%s"]'
+            % cap,
+            event.stanza)
+    if node is None:
+        return False
+    if len(node) != 1:
+        return False
+    var = node[0].attributes['var']
+    if var is None:
+        return False
+    return var == cap
+
+def test_ft_caps_from_contact(q, bus, conn, client):
+
+    conn_caps_iface = dbus.Interface(conn, CONN_IFACE_CONTACT_CAPA)
+    conn_contacts_iface = dbus.Interface(conn, CONN_IFACE_CONTACTS)
+
+    # send presence with FT capa
+    ver = compute_caps_hash([], [ns.IQ_OOB], [])
+    txt_record = { "txtvers": "1", "status": "avail",
+        "node": client, "ver": ver, "hash": "sha-1"}
+    contact_name = "test-caps-tube@" + get_host_name()
+    listener, port = setup_stream_listener(q, contact_name)
+    announcer = AvahiAnnouncer(contact_name, "_presence._tcp", port,
+            txt_record)
+
+    # this is the first presence, Salut connects to the contact
+    e = q.expect('incoming-connection', listener = listener)
+    incoming = e.connection
+
+    # Salut looks up our capabilities
+    event = q.expect('stream-iq', connection = incoming,
+        query_ns='http://jabber.org/protocol/disco#info')
+    query_node = xpath.queryForNodes('/iq/query', event.stanza)[0]
+    assert query_node.attributes['node'] == \
+        client + '#' + ver, (query_node.attributes['node'], client, ver)
+
+    contact_handle = conn.RequestHandles(HT_CONTACT, [contact_name])[0]
+
+    # send good reply
+    result = make_result_iq(event.stanza)
+    query = result.firstChildElement()
+    query['node'] = client + '#' + ver
+
+    feature = query.addElement('feature')
+    feature['var'] = ns.IQ_OOB
+    incoming.send(result)
+
+    # FT capa is announced
+    e = q.expect('dbus-signal', signal='ContactCapabilitiesChanged')
+    caps = e.args[0][contact_handle]
+    assert ({CHANNEL_TYPE: CHANNEL_TYPE_FILE_TRANSFER,
+             TARGET_HANDLE_TYPE: HT_CONTACT},
+            [TARGET_HANDLE, TARGET_ID, FT_CONTENT_TYPE, FT_FILENAME, FT_SIZE,
+                FT_CONTENT_HASH_TYPE, FT_CONTENT_HASH, FT_DESCRIPTION,
+                FT_DATE, FT_INITIAL_OFFSET]) in caps
+
+    caps_get = conn_caps_iface.GetContactCapabilities([contact_handle])[contact_handle]
+    assert caps == caps_get
+
+    # check the Contacts interface give the same caps
+    caps_via_contacts_iface = conn_contacts_iface.GetContactAttributes(
+            [contact_handle], [CONN_IFACE_CONTACT_CAPA], False) \
+            [contact_handle][CONN_IFACE_CONTACT_CAPA + '/caps']
+    assert caps_via_contacts_iface == caps, caps_via_contacts_iface
+
+    # TODO: capa announced without FT
+    # TODO: no capabilites announced (assume FT is supported)
+
+def test(q, bus, conn):
+    # last value of the "ver" key we resolved. We use it to be sure that the
+    # modified caps has already be announced.
+    old_ver = None
+
+    conn.Connect()
+    q.expect('dbus-signal', signal='StatusChanged', args=[0, 0])
+
+    # TODO: check if Salut advertise the OOB caps
+
+    client = 'http://telepathy.freedesktop.org/fake-client'
+
+    test_ft_caps_from_contact(q, bus, conn, client)
+
+    conn.Disconnect()
+    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
+
+
+if __name__ == '__main__':
+    exec_test(test)
+
-- 
1.5.6.5




More information about the telepathy-commits mailing list