[Telepathy-commits] [telepathy-gabble/master] Add a basic test for stream tube capabilities
Alban Crequy
alban.crequy at collabora.co.uk
Fri Dec 5 09:42:28 PST 2008
---
tests/twisted/Makefile.am | 1 +
tests/twisted/test-caps-tubes.py | 159 ++++++++++++++++++++++++++++++++++++++
2 files changed, 160 insertions(+), 0 deletions(-)
create mode 100644 tests/twisted/test-caps-tubes.py
diff --git a/tests/twisted/Makefile.am b/tests/twisted/Makefile.am
index 5bdffe6..4f2e659 100644
--- a/tests/twisted/Makefile.am
+++ b/tests/twisted/Makefile.am
@@ -50,6 +50,7 @@ TWISTED_TESTS = \
test-capabilities.py \
test-caps-cache.py \
test-caps-hash.py \
+ test-caps-tubes.py \
connect/test-fail.py \
connect/test-success.py \
connect/test-twice.py \
diff --git a/tests/twisted/test-caps-tubes.py b/tests/twisted/test-caps-tubes.py
new file mode 100644
index 0000000..e2b0844
--- /dev/null
+++ b/tests/twisted/test-caps-tubes.py
@@ -0,0 +1,159 @@
+
+"""
+Test tubes capabilities with Connection.Interface.ContactCapabilities.DRAFT
+Receive presence and caps from contacts and check that GetContactCapabilities
+works correctly. Test:
+- no tube cap at all
+- one stream tube cap
+- TODO: one D-Bus tube cap
+- TODO: several of them
+TODO: signals
+"""
+
+import dbus
+import sys
+
+from twisted.words.xish import domish, xpath
+
+from servicetest import EventPattern
+from gabbletest import exec_test, make_result_iq, sync_stream
+
+# D-Bus interface names. caps_iface is the draft ContactCapabilities interface
+# whose GetContactCapabilities method is exercised below.
+# NOTE(review): text_iface is not referenced in the visible part of this file.
+text_iface = 'org.freedesktop.Telepathy.Channel.Type.Text'
+caps_iface = 'org.freedesktop.Telepathy.' + \
+ 'Connection.Interface.ContactCapabilities.DRAFT'
+
+# Prefix of the XMPP feature vars that advertise tube support; the stream tube
+# feature sent by the test is ns_tubes + '/stream/daap'.
+ns_tubes = 'http://telepathy.freedesktop.org/xmpp/tubes'
+
+# Channel class (fixed properties + allowed properties) for text channels.
+# The test expects it in the capabilities of the contact both with and
+# without a tube feature.
+# NOTE(review): TargetHandleType 1 is presumably the "contact" handle type --
+# confirm against the Telepathy spec.
+text_fixed_properties = dbus.Dictionary({
+ 'org.freedesktop.Telepathy.Channel.TargetHandleType': 1L,
+ 'org.freedesktop.Telepathy.Channel.ChannelType':
+ 'org.freedesktop.Telepathy.Channel.Type.Text'
+ })
+text_allowed_properties = dbus.Array([
+ 'org.freedesktop.Telepathy.Channel.TargetHandle',
+ ])
+
+# Channel class for a stream tube offering the 'daap' service. The test
+# expects it once the contact has advertised ns_tubes + '/stream/daap'.
+daap_fixed_properties = dbus.Dictionary({
+ 'org.freedesktop.Telepathy.Channel.TargetHandleType': 1L,
+ 'org.freedesktop.Telepathy.Channel.ChannelType':
+ 'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT',
+ 'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT.Service':
+ 'daap'
+ })
+daap_allowed_properties = dbus.Array([
+ 'org.freedesktop.Telepathy.Channel.TargetHandle',
+ ])
+
+
+def make_presence(from_jid, type, status):
+    """Build a <presence/> stanza.
+
+    Each of from_jid, type and status is optional: passing None leaves the
+    corresponding 'from' attribute, 'type' attribute or <status/> child out
+    of the stanza.
+
+    Returns the new domish.Element.
+    """
+    stanza = domish.Element((None, 'presence'))
+
+    # Optional attributes, set in the same order as they are listed here.
+    for attr_name, attr_value in (('from', from_jid), ('type', type)):
+        if attr_value is not None:
+            stanza[attr_name] = attr_value
+
+    # The status text travels as a child element, not as an attribute.
+    if status is not None:
+        stanza.addElement('status', content=status)
+
+    return stanza
+
+def presence_add_caps(presence, ver, client, hash=None):
+    """Attach an entity-capabilities <c/> child to a presence stanza.
+
+    The child is created in the 'http://jabber.org/protocol/caps' namespace
+    with 'node' set to client and 'ver' set to ver. The 'hash' attribute is
+    only written when hash is not None.
+
+    Returns the same presence element that was passed in, now modified.
+    """
+    caps_element = presence.addElement(
+        ('http://jabber.org/protocol/caps', 'c'))
+    caps_element['node'], caps_element['ver'] = client, ver
+    if hash is not None:
+        caps_element['hash'] = hash
+    return presence
+
+def _announce_caps(q, stream, contact, client, ver, features):
+    """Advertise a caps bundle for contact and answer Gabble's disco query.
+
+    Sends a presence from contact carrying a caps <c/> element (node=client,
+    ver=ver, hash='sha-1'), waits for the disco#info IQ addressed to contact,
+    checks that it asks for the node client + '#' + ver, and replies with one
+    <feature/> per entry of features. Finally syncs the stream so the reply
+    has been processed before the caller continues.
+    """
+    presence = make_presence(contact, None, 'hello')
+    presence_add_caps(presence, ver, client, hash='sha-1')
+    stream.send(presence)
+
+    # Gabble looks up our capabilities
+    node = client + '#' + ver
+    event = q.expect('stream-iq', to=contact,
+        query_ns='http://jabber.org/protocol/disco#info')
+    query_node = xpath.queryForNodes('/iq/query', event.stanza)[0]
+    assert query_node.attributes['node'] == node, \
+        query_node.attributes['node']
+
+    # send good reply
+    result = make_result_iq(stream, event.stanza)
+    query = result.firstChildElement()
+    query['node'] = node
+    for var in features:
+        feature = query.addElement('feature')
+        feature['var'] = var
+    stream.send(result)
+    sync_stream(q, stream)
+
+
+def _test_tube_caps(q, bus, conn, stream, contact, contact_handle, client):
+    """Check GetContactCapabilities before and after a stream tube cap.
+
+    First the contact advertises only Jingle/Google features and the
+    connection must report just the text channel class for contact_handle.
+    Then the contact advertises the 'daap' stream tube feature and the
+    connection must report the text class followed by the daap stream tube
+    class.
+    """
+    conn_caps_iface = dbus.Interface(conn, caps_iface)
+
+    # send presence with no tube cap
+    _announce_caps(q, stream, contact, client,
+        'JpaYgiKL0y4fUOCTwN3WLGpaftM=',
+        ['http://jabber.org/protocol/jingle',
+         'http://jabber.org/protocol/jingle/description/audio',
+         'http://www.google.com/transport/p2p'])
+
+    # TODO: expect the 'CapabilitiesChanged' D-Bus signal here
+
+    # no special capabilities
+    basic_caps = [(contact_handle, text_fixed_properties,
+        text_allowed_properties)]
+    caps = conn_caps_iface.GetContactCapabilities([contact_handle])
+    assert caps == basic_caps, caps
+
+    # send presence with 1 stream tube cap
+    _announce_caps(q, stream, contact, client,
+        'njTWnNVMGeDjS8+4TkMuMX6Z/Ug=',
+        [ns_tubes + '/stream/daap'])
+
+    # TODO: expect the 'CapabilitiesChanged' D-Bus signal here
+
+    # daap capabilities
+    daap_caps = [
+        (contact_handle, text_fixed_properties, text_allowed_properties),
+        (contact_handle, daap_fixed_properties, daap_allowed_properties)]
+    caps = conn_caps_iface.GetContactCapabilities([contact_handle])
+    assert caps == daap_caps, caps
+
+
+def test(q, bus, conn, stream):
+    """Connect, run the tube capability checks for one contact, disconnect.
+
+    q, bus, conn and stream are supplied by the exec_test harness.
+    """
+    conn.Connect()
+    # NOTE(review): args [0, 1] presumably mean "connected, requested" --
+    # confirm against the Telepathy Connection spec.
+    q.expect('dbus-signal', signal='StatusChanged', args=[0, 1])
+
+    client = 'http://telepathy.freedesktop.org/fake-client'
+
+    # The contact must be a well-formed full JID (node@domain/resource); the
+    # '@' had been turned into ' at ' by the list archive's address
+    # obfuscation.
+    # NOTE(review): handle 2 is presumably the first handle allocated after
+    # the self handle -- confirm.
+    _test_tube_caps(q, bus, conn, stream, 'bilbo1@foo.com/Foo', 2L, client)
+
+    conn.Disconnect()
+    # NOTE(review): args [2, 1] presumably mean "disconnected, requested".
+    q.expect('dbus-signal', signal='StatusChanged', args=[2, 1])
+
+
+# Script entry point: hand the test callable to gabbletest's exec_test.
+# NOTE(review): exec_test presumably sets up the connection and the fake XMPP
+# stream and then calls test(q, bus, conn, stream) -- confirm in gabbletest.
+if __name__ == '__main__':
+ exec_test(test)
+
--
1.5.6.5
More information about the Telepathy-commits
mailing list