[Telepathy-commits] [telepathy-salut/master] Add an incoming and outgoing XMPP stream for testing

Sjoerd Simons sjoerd.simons at collabora.co.uk
Wed Sep 3 11:41:58 PDT 2008


---
 tests/twisted/xmppstream.py |  325 +++++++++++++++++++------------------------
 1 files changed, 145 insertions(+), 180 deletions(-)

diff --git a/tests/twisted/xmppstream.py b/tests/twisted/xmppstream.py
index 31cebf5..8d1cb92 100644
--- a/tests/twisted/xmppstream.py
+++ b/tests/twisted/xmppstream.py
@@ -3,142 +3,21 @@
 Infrastructure code testing CM by pretending to be a Jabber server.
 """
 
-import base64
-import os
-import sha
-import sys
-
 import servicetest
+from servicetest import Event, EventPattern
 import twisted
-from twisted.words.xish import domish, xpath
-from twisted.words.protocols.jabber.client import IQ
-from twisted.words.protocols.jabber import xmlstream
+from twisted.words.xish import domish, xpath, xmlstream
+from twisted.internet.protocol import Factory
 from twisted.internet import reactor
 
-import dbus
-
-NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
-NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
-
-def make_result_iq(stream, iq):
-    result = IQ(stream, "result")
-    result["id"] = iq["id"]
-    query = iq.firstChildElement()
-
-    if query:
-        result.addElement((query.uri, query.name))
-
-    return result
-
-def acknowledge_iq(stream, iq):
-    stream.send(make_result_iq(stream, iq))
-
-def sync_stream(q, stream):
-    """Used to ensure that Gabble has processed all stanzas sent to it."""
-
-    iq = IQ(stream, "get")
-    iq.addElement(('http://jabber.org/protocol/disco#info', 'query'))
-    stream.send(iq)
-    q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info')
-
-class JabberAuthenticator(xmlstream.Authenticator):
-    "Trivial XML stream authenticator that accepts one username/digest pair."
-
-    def __init__(self, username, password):
-        self.username = username
-        self.password = password
-        xmlstream.Authenticator.__init__(self)
-
-    def streamStarted(self, root=None):
-        if root:
-            self.xmlstream.sid = root.getAttribute('id')
-
-        self.xmlstream.sendHeader()
-        self.xmlstream.addOnetimeObserver(
-            "/iq/query[@xmlns='jabber:iq:auth']", self.initialIq)
-
-    def initialIq(self, iq):
-        result = IQ(self.xmlstream, "result")
-        result["id"] = iq["id"]
-        query = result.addElement('query')
-        query["xmlns"] = "jabber:iq:auth"
-        query.addElement('username', content='test')
-        query.addElement('password')
-        query.addElement('digest')
-        query.addElement('resource')
-        self.xmlstream.addOnetimeObserver('/iq/query/username', self.secondIq)
-        self.xmlstream.send(result)
-
-    def secondIq(self, iq):
-        username = xpath.queryForNodes('/iq/query/username', iq)
-        assert map(str, username) == [self.username]
-
-        digest = xpath.queryForNodes('/iq/query/digest', iq)
-        expect = sha.sha(self.xmlstream.sid + self.password).hexdigest()
-        assert map(str, digest) == [expect]
-
-        resource = xpath.queryForNodes('/iq/query/resource', iq)
-        assert map(str, resource) == ['Resource']
-
-        result = IQ(self.xmlstream, "result")
-        result["id"] = iq["id"]
-        self.xmlstream.send(result)
-        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
-
-
-class XmppAuthenticator(xmlstream.Authenticator):
-    def __init__(self, username, password):
-        xmlstream.Authenticator.__init__(self)
-        self.username = username
-        self.password = password
-        self.authenticated = False
-
-    def streamStarted(self, root=None):
-        if root:
-            self.xmlstream.sid = root.getAttribute('id')
-
-        self.xmlstream.sendHeader()
-
-        if self.authenticated:
-            # Initiator authenticated itself, and has started a new stream.
-
-            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
-            bind = features.addElement((NS_XMPP_BIND, 'bind'))
-            self.xmlstream.send(features)
-
-            self.xmlstream.addOnetimeObserver(
-                "/iq/bind[@xmlns='%s']" % NS_XMPP_BIND, self.bindIq)
-        else:
-            features = domish.Element((xmlstream.NS_STREAMS, 'features'))
-            mechanisms = features.addElement((NS_XMPP_SASL, 'mechanisms'))
-            mechanism = mechanisms.addElement('mechanism', content='PLAIN')
-            self.xmlstream.send(features)
-
-            self.xmlstream.addOnetimeObserver("/auth", self.auth)
-
-    def auth(self, auth):
-        assert (base64.b64decode(str(auth)) ==
-            '\x00%s\x00%s' % (self.username, self.password))
-
-        success = domish.Element((NS_XMPP_SASL, 'success'))
-        self.xmlstream.send(success)
-        self.xmlstream.reset()
-        self.authenticated = True
-
-    def bindIq(self, iq):
-        assert xpath.queryForString('/iq/bind/resource', iq) == 'Resource'
-
-        result = IQ(self.xmlstream, "result")
-        result["id"] = iq["id"]
-        bind = result.addElement((NS_XMPP_BIND, 'bind'))
-        jid = bind.addElement('jid', content='test@localhost/Resource')
-        self.xmlstream.send(result)
-
-        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
+NS_STREAMS = 'http://etherx.jabber.org/streams'
 
 def make_stream_event(type, stanza):
     event = servicetest.Event(type, stanza=stanza)
-    event.to = stanza.getAttribute("to")
+    if stanza.hasAttribute("to"):
+        event.name = stanza.getAttribute("to")
+    if stanza.hasAttribute("from"):
+        event.remote_name = stanza.getAttribute("from")
     return event
 
 def make_iq_event(iq):
@@ -167,58 +46,144 @@ def make_message_event(stanza):
     return event
 
 class BaseXmlStream(xmlstream.XmlStream):
-    initiating = False
-    namespace = 'jabber:client'
-
-    def __init__(self, event_func, authenticator):
-        xmlstream.XmlStream.__init__(self, authenticator)
-        self.event_func = event_func
-        self.addObserver('//iq', lambda x: event_func(
+    prefixes = { NS_STREAMS: 'stream' }
+    version = "1.0"
+
+    def __init__(self, event_function, name = None, remote_name = None):
+        xmlstream.XmlStream.__init__(self)
+
+        self.name = name
+        self.remote_name = remote_name
+        self.event_func = event_function
+
+        self.event_function = event_function
+        self.addObserver(xmlstream.STREAM_START_EVENT,
+            lambda *args: self.event(Event('stream-opened')))
+        self.addObserver('//features', lambda x: self.event(
+            make_stream_event('stream-features', x)))
+        self.addObserver('//iq', lambda x: self.event(
             make_iq_event(x)))
-        self.addObserver('//message', lambda x: event_func(
+        self.addObserver('//message', lambda x: self.event(
             make_message_event(x)))
-        self.addObserver('//presence', lambda x: event_func(
+        self.addObserver('//presence', lambda x: self.event(
             make_presence_event(x)))
-        self.addObserver('//event/stream/authd', self._cb_authd)
-
-    def _cb_authd(self, _):
-        # called when stream is authenticated
-        self.addObserver(
-            "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
-            self._cb_disco_iq)
-        self.event_func(servicetest.Event('stream-authenticated'))
-
-    def _cb_disco_iq(self, iq):
-        if iq.getAttribute('to') == 'localhost':
-            # add PEP support
-            nodes = xpath.queryForNodes(
-                "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
-                iq)
-            query = nodes[0]
-            identity = query.addElement('identity')
-            identity['category'] = 'pubsub'
-            identity['type'] = 'pep'
-
-            iq['type'] = 'result'
-            self.send(iq)
-
-class JabberXmlStream(BaseXmlStream):
-    version = (0, 9)
-
-class XmppXmlStream(BaseXmlStream):
-    version = (1, 0)
-
-def make_stream(event_func, authenticator=None, protocol=None, port=4242):
-    # set up Jabber server
-
-    if authenticator is None:
-        authenticator = JabberAuthenticator('test', 'pass')
-
-    if protocol is None:
-        protocol = JabberXmlStream
-
-    stream = protocol(event_func, authenticator)
-    factory = twisted.internet.protocol.Factory()
-    factory.protocol = lambda *args: stream
+
+    def send_header(self):
+        root = domish.Element((NS_STREAMS, 'stream'))
+        root['from'] = self.name
+        root['to'] = self.remote_name
+        root['version'] = self.version
+        self.send(root.toXml(closeElement = 0, prefixes=self.prefixes))
+
+    def event(self, e):
+        e.connection = self
+        self.event_function(e)
+
+    def send(self, obj):
+        if domish.IElement.providedBy(obj):
+            if self.name != None:
+                obj["from"] = self.name
+            if self.remote_name != None:
+                obj["to"] = self.remote_name
+            obj = obj.toXml(prefixes=self.prefixes)
+
+        xmlstream.XmlStream.send(self, obj)
+
+
+class IncomingXmppStream(BaseXmlStream):
+    def __init__(self, event_func, name):
+        BaseXmlStream.__init__(self, event_func)
+        self.name = name
+        self.remote_name = None
+
+    def onDocumentStart(self, rootElement):
+        # Use the fact that it's always salut that connects, so it sends a
+        # proper opening
+        assert rootElement.name == "stream"
+        assert rootElement.uri == NS_STREAMS
+
+        assert rootElement.hasAttribute("from")
+        assert rootElement.hasAttribute("to")
+        assert rootElement["to"] == self.name, self.name
+
+        assert rootElement.hasAttribute("version")
+        assert rootElement["version"] == "1.0"
+
+        self.remote_name = rootElement["from"]
+        self.send_header()
+        self.send_features()
+        BaseXmlStream.onDocumentStart(self, rootElement)
+
+    def send_features(self):
+        features = domish.Element((NS_STREAMS, 'features'))
+        self.send(features)
+
+class IncomingXmppFactory(Factory):
+    def buildProtocol(self, addr):
+        p = self.protocol()
+        p.factory = self
+        e = Event('incoming-connection', listener = self)
+        p.event(e)
+        return p
+
+def setup_stream_listener(queue, name, port = 0, protocol = None):
+    if protocol == None:
+        protocol = IncomingXmppStream
+
+    factory = IncomingXmppFactory()
+    factory.protocol = lambda *args: protocol(queue.append, name)
     port = reactor.listenTCP(port, factory)
-    return (stream, port)
+
+    return (factory, port.getHost().port)
+
+class OutgoingXmppStream(BaseXmlStream):
+    def __init__(self, event_function, name, remote_name):
+        BaseXmlStream.__init__(self, event_function, name, remote_name)
+        self.addObserver(xmlstream.STREAM_CONNECTED_EVENT, self.connected)
+
+    def connected (self, stream):
+        self.send_header()
+
+def connect_to_stream(queue, name, remote_name, host, port, protocol = None):
+    if protocol == None:
+        protocol = OutgoingXmppStream
+
+    p = protocol(queue.append, name, remote_name)
+
+    factory = twisted.internet.protocol.ClientFactory()
+    factory.protocol = lambda *args: p
+    reactor.connectTCP(host, port, factory)
+
+    return p
+
+if __name__ == '__main__':
+    def run_test():
+        q = servicetest.IteratingEventQueue()
+        # Set verboseness if needed for debugging
+        #q.verbose = True
+
+        (listener, port) = setup_stream_listener(q, "incoming")
+        outbound = connect_to_stream(q, "outgoing",
+            "incoming", "localhost", port)
+
+        inbound = q.expect('incoming-connection',
+            listener = listener).connection
+
+        # The inbound stream opens first, then the outbound stream opens and
+        # receives the stream features.
+        q.expect('stream-opened', connection = inbound)
+        q.expect('stream-opened', connection = outbound)
+        q.expect('stream-features', connection = outbound)
+
+
+        message = domish.Element(('','message'))
+        message.addElement('body', content="test123")
+        outbound.send(message)
+
+        e = q.expect('stream-message', connection=inbound)
+
+        # twisting twisted
+        reactor.stop()
+
+    reactor.callLater(0.1, run_test)
+    reactor.run()
-- 
1.5.6.3




More information about the Telepathy-commits mailing list