[Telepathy-commits] [telepathy-salut/master] Move the xmpp bits into xmppstream.py

Sjoerd Simons sjoerd.simons at collabora.co.uk
Mon Sep 1 06:33:42 PDT 2008


---
 tests/twisted/saluttest.py  |  291 +------------------------------------------
 tests/twisted/xmppstream.py |  224 +++++++++++++++++++++++++++++++++
 2 files changed, 226 insertions(+), 289 deletions(-)
 create mode 100644 tests/twisted/xmppstream.py

diff --git a/tests/twisted/saluttest.py b/tests/twisted/saluttest.py
index 05d0aaf..8865e94 100644
--- a/tests/twisted/saluttest.py
+++ b/tests/twisted/saluttest.py
@@ -10,204 +10,10 @@ import sys
 
 import servicetest
 import twisted
-from twisted.words.xish import domish, xpath
-from twisted.words.protocols.jabber.client import IQ
-from twisted.words.protocols.jabber import xmlstream
 from twisted.internet import reactor
 
 import dbus
 
-NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
-NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
-
-def make_result_iq(stream, iq):
-    result = IQ(stream, "result")
-    result["id"] = iq["id"]
-    query = iq.firstChildElement()
-
-    if query:
-        result.addElement((query.uri, query.name))
-
-    return result
-
-def acknowledge_iq(stream, iq):
-    stream.send(make_result_iq(stream, iq))
-
-def sync_stream(q, stream):
-    """Used to ensure that Gabble has processed all stanzas sent to it."""
-
-    iq = IQ(stream, "get")
-    iq.addElement(('http://jabber.org/protocol/disco#info', 'query'))
-    stream.send(iq)
-    q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info')
-
-class JabberAuthenticator(xmlstream.Authenticator):
-    "Trivial XML stream authenticator that accepts one username/digest pair."
-
-    def __init__(self, username, password):
-        self.username = username
-        self.password = password
-        xmlstream.Authenticator.__init__(self)
-
-    def streamStarted(self, root=None):
-        if root:
-            self.xmlstream.sid = root.getAttribute('id')
-
-        self.xmlstream.sendHeader()
-        self.xmlstream.addOnetimeObserver(
-            "/iq/query[@xmlns='jabber:iq:auth']", self.initialIq)
-
-    def initialIq(self, iq):
-        result = IQ(self.xmlstream, "result")
-        result["id"] = iq["id"]
-        query = result.addElement('query')
-        query["xmlns"] = "jabber:iq:auth"
-        query.addElement('username', content='test')
-        query.addElement('password')
-        query.addElement('digest')
-        query.addElement('resource')
-        self.xmlstream.addOnetimeObserver('/iq/query/username', self.secondIq)
-        self.xmlstream.send(result)
-
-    def secondIq(self, iq):
-        username = xpath.queryForNodes('/iq/query/username', iq)
-        assert map(str, username) == [self.username]
-
-        digest = xpath.queryForNodes('/iq/query/digest', iq)
-        expect = sha.sha(self.xmlstream.sid + self.password).hexdigest()
-        assert map(str, digest) == [expect]
-
-        resource = xpath.queryForNodes('/iq/query/resource', iq)
-        assert map(str, resource) == ['Resource']
-
-        result = IQ(self.xmlstream, "result")
-        result["id"] = iq["id"]
-        self.xmlstream.send(result)
-        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
-
-
-class XmppAuthenticator(xmlstream.Authenticator):
-    def __init__(self, username, password):
-        xmlstream.Authenticator.__init__(self)
-        self.username = username
-        self.password = password
-        self.authenticated = False
-
-    def streamStarted(self, root=None):
-        if root:
-            self.xmlstream.sid = root.getAttribute('id')
-
-        self.xmlstream.sendHeader()
-
-        if self.authenticated:
-            # Initiator authenticated itself, and has started a new stream.
-
-            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
-            bind = features.addElement((NS_XMPP_BIND, 'bind'))
-            self.xmlstream.send(features)
-
-            self.xmlstream.addOnetimeObserver(
-                "/iq/bind[@xmlns='%s']" % NS_XMPP_BIND, self.bindIq)
-        else:
-            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
-            mechanisms = features.addElement((NS_XMPP_SASL, 'mechanisms'))
-            mechanism = mechanisms.addElement('mechanism', content='PLAIN')
-            self.xmlstream.send(features)
-
-            self.xmlstream.addOnetimeObserver("/auth", self.auth)
-
-    def auth(self, auth):
-        assert (base64.b64decode(str(auth)) ==
-            '\x00%s\x00%s' % (self.username, self.password))
-
-        success = domish.Element((NS_XMPP_SASL, 'success'))
-        self.xmlstream.send(success)
-        self.xmlstream.reset()
-        self.authenticated = True
-
-    def bindIq(self, iq):
-        assert xpath.queryForString('/iq/bind/resource', iq) == 'Resource'
-
-        result = IQ(self.xmlstream, "result")
-        result["id"] = iq["id"]
-        bind = result.addElement((NS_XMPP_BIND, 'bind'))
-        jid = bind.addElement('jid', content='test at localhost/Resource')
-        self.xmlstream.send(result)
-
-        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
-
-def make_stream_event(type, stanza):
-    event = servicetest.Event(type, stanza=stanza)
-    event.to = stanza.getAttribute("to")
-    return event
-
-def make_iq_event(iq):
-    event = make_stream_event('stream-iq', iq)
-    event.iq_type = iq.getAttribute("type")
-    query = iq.firstChildElement()
-
-    if query:
-        event.query = query
-        event.query_ns = query.uri
-        event.query_name = query.name
-
-        if query.getAttribute("node"):
-            event.query_node = query.getAttribute("node")
-
-    return event
-
-def make_presence_event(stanza):
-    event = make_stream_event('stream-presence', stanza)
-    event.presence_type = stanza.getAttribute('type')
-    return event
-
-def make_message_event(stanza):
-    event = make_stream_event('stream-message', stanza)
-    event.message_type = stanza.getAttribute('type')
-    return event
-
-class BaseXmlStream(xmlstream.XmlStream):
-    initiating = False
-    namespace = 'jabber:client'
-
-    def __init__(self, event_func, authenticator):
-        xmlstream.XmlStream.__init__(self, authenticator)
-        self.event_func = event_func
-        self.addObserver('//iq', lambda x: event_func(
-            make_iq_event(x)))
-        self.addObserver('//message', lambda x: event_func(
-            make_message_event(x)))
-        self.addObserver('//presence', lambda x: event_func(
-            make_presence_event(x)))
-        self.addObserver('//event/stream/authd', self._cb_authd)
-
-    def _cb_authd(self, _):
-        # called when stream is authenticated
-        self.addObserver(
-            "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
-            self._cb_disco_iq)
-        self.event_func(servicetest.Event('stream-authenticated'))
-
-    def _cb_disco_iq(self, iq):
-        if iq.getAttribute('to') == 'localhost':
-            # add PEP support
-            nodes = xpath.queryForNodes(
-                "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
-                iq)
-            query = nodes[0]
-            identity = query.addElement('identity')
-            identity['category'] = 'pubsub'
-            identity['type'] = 'pep'
-
-            iq['type'] = 'result'
-            self.send(iq)
-
-class JabberXmlStream(BaseXmlStream):
-    version = (0, 9)
-
-class XmppXmlStream(BaseXmlStream):
-    version = (1, 0)
-
 def make_connection(bus, event_func, params=None):
     default_params = {
         'account': 'test at localhost/Resource',
@@ -220,51 +26,8 @@ def make_connection(bus, event_func, params=None):
     if params:
         default_params.update(params)
 
-    return servicetest.make_connection(bus, event_func, 'gabble', 'jabber',
-        default_params)
-
-def make_stream(event_func, authenticator=None, protocol=None, port=4242):
-    # set up Jabber server
-
-    if authenticator is None:
-        authenticator = JabberAuthenticator('test', 'pass')
-
-    if protocol is None:
-        protocol = JabberXmlStream
-
-    stream = protocol(event_func, authenticator)
-    factory = twisted.internet.protocol.Factory()
-    factory.protocol = lambda *args: stream
-    port = reactor.listenTCP(port, factory)
-    return (stream, port)
-
-def go(params=None, authenticator=None, protocol=None, start=None):
-    # hack to ease debugging
-    domish.Element.__repr__ = domish.Element.toXml
-
-    bus = dbus.SessionBus()
-    handler = servicetest.EventTest()
-    conn = make_connection(bus, handler.handle_event, params)
-    (stream, _) = make_stream(handler.handle_event, authenticator, protocol)
-    handler.data = {
-        'bus': bus,
-        'conn': conn,
-        'conn_iface': dbus.Interface(conn,
-            'org.freedesktop.Telepathy.Connection'),
-        'stream': stream}
-    handler.data['test'] = handler
-    handler.verbose = (os.environ.get('CHECK_TWISTED_VERBOSE', '') != '')
-    map(handler.expect, servicetest.load_event_handlers())
-
-    if '-v' in sys.argv:
-        handler.verbose = True
-
-    if start is None:
-        handler.data['conn'].Connect()
-    else:
-        start(handler.data)
-
-    reactor.run()
+    return servicetest.make_connection(bus, event_func, 'salut',
+        'local-xmpp', default_params)
 
 def install_colourer():
     def red(s):
@@ -292,8 +55,6 @@ def install_colourer():
 
 
 def exec_test_deferred (fun, params, protocol=None, timeout=None):
-    # hack to ease debugging
-    domish.Element.__repr__ = domish.Element.toXml
     colourer = None
 
     if sys.stdout.isatty():
@@ -336,51 +97,3 @@ def exec_test_deferred (fun, params, protocol=None, timeout=None):
 def exec_test(fun, params=None, protocol=None, timeout=None):
   reactor.callWhenRunning (exec_test_deferred, fun, params, protocol, timeout)
   reactor.run()
-
-# Useful routines for server-side vCard handling
-current_vcard = domish.Element(('vcard-temp', 'vCard'))
-
-def handle_get_vcard(event, data):
-    iq = event.stanza
-
-    if iq['type'] != 'get':
-        return False
-
-    if iq.uri != 'jabber:client':
-        return False
-
-    vcard = list(iq.elements())[0]
-
-    if vcard.name != 'vCard':
-        return False
-
-    # Send back current vCard
-    new_iq = IQ(data['stream'], 'result')
-    new_iq['id'] = iq['id']
-    new_iq.addChild(current_vcard)
-    data['stream'].send(new_iq)
-    return True
-
-def handle_set_vcard(event, data):
-    global current_vcard
-    iq = event.stanza
-
-    if iq['type'] != 'set':
-        return False
-
-    if iq.uri != 'jabber:client':
-        return False
-
-    vcard = list(iq.elements())[0]
-
-    if vcard.name != 'vCard':
-        return False
-
-    current_vcard = iq.firstChildElement()
-
-    new_iq = IQ(data['stream'], 'result')
-    new_iq['id'] = iq['id']
-    data['stream'].send(new_iq)
-    return True
-
-
diff --git a/tests/twisted/xmppstream.py b/tests/twisted/xmppstream.py
new file mode 100644
index 0000000..31cebf5
--- /dev/null
+++ b/tests/twisted/xmppstream.py
@@ -0,0 +1,224 @@
+
+"""
+Infrastructure code testing CM by pretending to be a Jabber server.
+"""
+
+import base64
+import os
+import sha
+import sys
+
+import servicetest
+import twisted
+from twisted.words.xish import domish, xpath
+from twisted.words.protocols.jabber.client import IQ
+from twisted.words.protocols.jabber import xmlstream
+from twisted.internet import reactor
+
+import dbus
+
+NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
+NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
+
+def make_result_iq(stream, iq):
+    result = IQ(stream, "result")
+    result["id"] = iq["id"]
+    query = iq.firstChildElement()
+
+    if query:
+        result.addElement((query.uri, query.name))
+
+    return result
+
+def acknowledge_iq(stream, iq):
+    stream.send(make_result_iq(stream, iq))
+
+def sync_stream(q, stream):
+    """Used to ensure that Gabble has processed all stanzas sent to it."""
+
+    iq = IQ(stream, "get")
+    iq.addElement(('http://jabber.org/protocol/disco#info', 'query'))
+    stream.send(iq)
+    q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info')
+
+class JabberAuthenticator(xmlstream.Authenticator):
+    "Trivial XML stream authenticator that accepts one username/digest pair."
+
+    def __init__(self, username, password):
+        self.username = username
+        self.password = password
+        xmlstream.Authenticator.__init__(self)
+
+    def streamStarted(self, root=None):
+        if root:
+            self.xmlstream.sid = root.getAttribute('id')
+
+        self.xmlstream.sendHeader()
+        self.xmlstream.addOnetimeObserver(
+            "/iq/query[@xmlns='jabber:iq:auth']", self.initialIq)
+
+    def initialIq(self, iq):
+        result = IQ(self.xmlstream, "result")
+        result["id"] = iq["id"]
+        query = result.addElement('query')
+        query["xmlns"] = "jabber:iq:auth"
+        query.addElement('username', content='test')
+        query.addElement('password')
+        query.addElement('digest')
+        query.addElement('resource')
+        self.xmlstream.addOnetimeObserver('/iq/query/username', self.secondIq)
+        self.xmlstream.send(result)
+
+    def secondIq(self, iq):
+        username = xpath.queryForNodes('/iq/query/username', iq)
+        assert map(str, username) == [self.username]
+
+        digest = xpath.queryForNodes('/iq/query/digest', iq)
+        expect = sha.sha(self.xmlstream.sid + self.password).hexdigest()
+        assert map(str, digest) == [expect]
+
+        resource = xpath.queryForNodes('/iq/query/resource', iq)
+        assert map(str, resource) == ['Resource']
+
+        result = IQ(self.xmlstream, "result")
+        result["id"] = iq["id"]
+        self.xmlstream.send(result)
+        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
+
+
+class XmppAuthenticator(xmlstream.Authenticator):
+    def __init__(self, username, password):
+        xmlstream.Authenticator.__init__(self)
+        self.username = username
+        self.password = password
+        self.authenticated = False
+
+    def streamStarted(self, root=None):
+        if root:
+            self.xmlstream.sid = root.getAttribute('id')
+
+        self.xmlstream.sendHeader()
+
+        if self.authenticated:
+            # Initiator authenticated itself, and has started a new stream.
+
+            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
+            bind = features.addElement((NS_XMPP_BIND, 'bind'))
+            self.xmlstream.send(features)
+
+            self.xmlstream.addOnetimeObserver(
+                "/iq/bind[@xmlns='%s']" % NS_XMPP_BIND, self.bindIq)
+        else:
+            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
+            mechanisms = features.addElement((NS_XMPP_SASL, 'mechanisms'))
+            mechanism = mechanisms.addElement('mechanism', content='PLAIN')
+            self.xmlstream.send(features)
+
+            self.xmlstream.addOnetimeObserver("/auth", self.auth)
+
+    def auth(self, auth):
+        assert (base64.b64decode(str(auth)) ==
+            '\x00%s\x00%s' % (self.username, self.password))
+
+        success = domish.Element((NS_XMPP_SASL, 'success'))
+        self.xmlstream.send(success)
+        self.xmlstream.reset()
+        self.authenticated = True
+
+    def bindIq(self, iq):
+        assert xpath.queryForString('/iq/bind/resource', iq) == 'Resource'
+
+        result = IQ(self.xmlstream, "result")
+        result["id"] = iq["id"]
+        bind = result.addElement((NS_XMPP_BIND, 'bind'))
+        jid = bind.addElement('jid', content='test at localhost/Resource')
+        self.xmlstream.send(result)
+
+        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
+
+def make_stream_event(type, stanza):
+    event = servicetest.Event(type, stanza=stanza)
+    event.to = stanza.getAttribute("to")
+    return event
+
+def make_iq_event(iq):
+    event = make_stream_event('stream-iq', iq)
+    event.iq_type = iq.getAttribute("type")
+    query = iq.firstChildElement()
+
+    if query:
+        event.query = query
+        event.query_ns = query.uri
+        event.query_name = query.name
+
+        if query.getAttribute("node"):
+            event.query_node = query.getAttribute("node")
+
+    return event
+
+def make_presence_event(stanza):
+    event = make_stream_event('stream-presence', stanza)
+    event.presence_type = stanza.getAttribute('type')
+    return event
+
+def make_message_event(stanza):
+    event = make_stream_event('stream-message', stanza)
+    event.message_type = stanza.getAttribute('type')
+    return event
+
+class BaseXmlStream(xmlstream.XmlStream):
+    initiating = False
+    namespace = 'jabber:client'
+
+    def __init__(self, event_func, authenticator):
+        xmlstream.XmlStream.__init__(self, authenticator)
+        self.event_func = event_func
+        self.addObserver('//iq', lambda x: event_func(
+            make_iq_event(x)))
+        self.addObserver('//message', lambda x: event_func(
+            make_message_event(x)))
+        self.addObserver('//presence', lambda x: event_func(
+            make_presence_event(x)))
+        self.addObserver('//event/stream/authd', self._cb_authd)
+
+    def _cb_authd(self, _):
+        # called when stream is authenticated
+        self.addObserver(
+            "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
+            self._cb_disco_iq)
+        self.event_func(servicetest.Event('stream-authenticated'))
+
+    def _cb_disco_iq(self, iq):
+        if iq.getAttribute('to') == 'localhost':
+            # add PEP support
+            nodes = xpath.queryForNodes(
+                "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
+                iq)
+            query = nodes[0]
+            identity = query.addElement('identity')
+            identity['category'] = 'pubsub'
+            identity['type'] = 'pep'
+
+            iq['type'] = 'result'
+            self.send(iq)
+
+class JabberXmlStream(BaseXmlStream):
+    version = (0, 9)
+
+class XmppXmlStream(BaseXmlStream):
+    version = (1, 0)
+
+def make_stream(event_func, authenticator=None, protocol=None, port=4242):
+    # set up Jabber server
+
+    if authenticator is None:
+        authenticator = JabberAuthenticator('test', 'pass')
+
+    if protocol is None:
+        protocol = JabberXmlStream
+
+    stream = protocol(event_func, authenticator)
+    factory = twisted.internet.protocol.Factory()
+    factory.protocol = lambda *args: stream
+    port = reactor.listenTCP(port, factory)
+    return (stream, port)
-- 
1.5.6.3




More information about the Telepathy-commits mailing list