[Telepathy-commits] [telepathy-salut/master] Add an incoming and outgoing XMPP stream for testing
Sjoerd Simons
sjoerd.simons at collabora.co.uk
Wed Sep 3 11:41:58 PDT 2008
---
tests/twisted/xmppstream.py | 325 +++++++++++++++++++------------------------
1 files changed, 145 insertions(+), 180 deletions(-)
diff --git a/tests/twisted/xmppstream.py b/tests/twisted/xmppstream.py
index 31cebf5..8d1cb92 100644
--- a/tests/twisted/xmppstream.py
+++ b/tests/twisted/xmppstream.py
@@ -3,142 +3,21 @@
Infrastructure code testing CM by pretending to be a Jabber server.
"""
-import base64
-import os
-import sha
-import sys
-
import servicetest
+from servicetest import Event, EventPattern
import twisted
-from twisted.words.xish import domish, xpath
-from twisted.words.protocols.jabber.client import IQ
-from twisted.words.protocols.jabber import xmlstream
+from twisted.words.xish import domish, xpath, xmlstream
+from twisted.internet.protocol import Factory
from twisted.internet import reactor
-import dbus
-
-NS_XMPP_SASL = 'urn:ietf:params:xml:ns:xmpp-sasl'
-NS_XMPP_BIND = 'urn:ietf:params:xml:ns:xmpp-bind'
-
-def make_result_iq(stream, iq):
- result = IQ(stream, "result")
- result["id"] = iq["id"]
- query = iq.firstChildElement()
-
- if query:
- result.addElement((query.uri, query.name))
-
- return result
-
-def acknowledge_iq(stream, iq):
- stream.send(make_result_iq(stream, iq))
-
-def sync_stream(q, stream):
- """Used to ensure that Gabble has processed all stanzas sent to it."""
-
- iq = IQ(stream, "get")
- iq.addElement(('http://jabber.org/protocol/disco#info', 'query'))
- stream.send(iq)
- q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info')
-
-class JabberAuthenticator(xmlstream.Authenticator):
- "Trivial XML stream authenticator that accepts one username/digest pair."
-
- def __init__(self, username, password):
- self.username = username
- self.password = password
- xmlstream.Authenticator.__init__(self)
-
- def streamStarted(self, root=None):
- if root:
- self.xmlstream.sid = root.getAttribute('id')
-
- self.xmlstream.sendHeader()
- self.xmlstream.addOnetimeObserver(
- "/iq/query[@xmlns='jabber:iq:auth']", self.initialIq)
-
- def initialIq(self, iq):
- result = IQ(self.xmlstream, "result")
- result["id"] = iq["id"]
- query = result.addElement('query')
- query["xmlns"] = "jabber:iq:auth"
- query.addElement('username', content='test')
- query.addElement('password')
- query.addElement('digest')
- query.addElement('resource')
- self.xmlstream.addOnetimeObserver('/iq/query/username', self.secondIq)
- self.xmlstream.send(result)
-
- def secondIq(self, iq):
- username = xpath.queryForNodes('/iq/query/username', iq)
- assert map(str, username) == [self.username]
-
- digest = xpath.queryForNodes('/iq/query/digest', iq)
- expect = sha.sha(self.xmlstream.sid + self.password).hexdigest()
- assert map(str, digest) == [expect]
-
- resource = xpath.queryForNodes('/iq/query/resource', iq)
- assert map(str, resource) == ['Resource']
-
- result = IQ(self.xmlstream, "result")
- result["id"] = iq["id"]
- self.xmlstream.send(result)
- self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
-
-
-class XmppAuthenticator(xmlstream.Authenticator):
- def __init__(self, username, password):
- xmlstream.Authenticator.__init__(self)
- self.username = username
- self.password = password
- self.authenticated = False
-
- def streamStarted(self, root=None):
- if root:
- self.xmlstream.sid = root.getAttribute('id')
-
- self.xmlstream.sendHeader()
-
- if self.authenticated:
- # Initiator authenticated itself, and has started a new stream.
-
- features = domish.Element((xmlstream.NS_STREAMS, 'features'))
- bind = features.addElement((NS_XMPP_BIND, 'bind'))
- self.xmlstream.send(features)
-
- self.xmlstream.addOnetimeObserver(
- "/iq/bind[@xmlns='%s']" % NS_XMPP_BIND, self.bindIq)
- else:
- features = domish.Element((xmlstream.NS_STREAMS, 'features'))
- mechanisms = features.addElement((NS_XMPP_SASL, 'mechanisms'))
- mechanism = mechanisms.addElement('mechanism', content='PLAIN')
- self.xmlstream.send(features)
-
- self.xmlstream.addOnetimeObserver("/auth", self.auth)
-
- def auth(self, auth):
- assert (base64.b64decode(str(auth)) ==
- '\x00%s\x00%s' % (self.username, self.password))
-
- success = domish.Element((NS_XMPP_SASL, 'success'))
- self.xmlstream.send(success)
- self.xmlstream.reset()
- self.authenticated = True
-
- def bindIq(self, iq):
- assert xpath.queryForString('/iq/bind/resource', iq) == 'Resource'
-
- result = IQ(self.xmlstream, "result")
- result["id"] = iq["id"]
- bind = result.addElement((NS_XMPP_BIND, 'bind'))
- jid = bind.addElement('jid', content='test@localhost/Resource')
- self.xmlstream.send(result)
-
- self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
+NS_STREAMS = 'http://etherx.jabber.org/streams'
def make_stream_event(type, stanza):
event = servicetest.Event(type, stanza=stanza)
- event.to = stanza.getAttribute("to")
+ if stanza.hasAttribute("to"):
+ event.name = stanza.getAttribute("to")
+ if stanza.hasAttribute("from"):
+ event.remote_name = stanza.getAttribute("from")
return event
def make_iq_event(iq):
@@ -167,58 +46,144 @@ def make_message_event(stanza):
return event
class BaseXmlStream(xmlstream.XmlStream):
- initiating = False
- namespace = 'jabber:client'
-
- def __init__(self, event_func, authenticator):
- xmlstream.XmlStream.__init__(self, authenticator)
- self.event_func = event_func
- self.addObserver('//iq', lambda x: event_func(
+ prefixes = { NS_STREAMS: 'stream' }
+ version = "1.0"
+
+ def __init__(self, event_function, name = None, remote_name = None):
+ xmlstream.XmlStream.__init__(self)
+
+ self.name = name
+ self.remote_name = remote_name
+ self.event_func = event_function
+
+ self.event_function = event_function
+ self.addObserver(xmlstream.STREAM_START_EVENT,
+ lambda *args: self.event(Event('stream-opened')))
+ self.addObserver('//features', lambda x: self.event(
+ make_stream_event('stream-features', x)))
+ self.addObserver('//iq', lambda x: self.event(
make_iq_event(x)))
- self.addObserver('//message', lambda x: event_func(
+ self.addObserver('//message', lambda x: self.event(
make_message_event(x)))
- self.addObserver('//presence', lambda x: event_func(
+ self.addObserver('//presence', lambda x: self.event(
make_presence_event(x)))
- self.addObserver('//event/stream/authd', self._cb_authd)
-
- def _cb_authd(self, _):
- # called when stream is authenticated
- self.addObserver(
- "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
- self._cb_disco_iq)
- self.event_func(servicetest.Event('stream-authenticated'))
-
- def _cb_disco_iq(self, iq):
- if iq.getAttribute('to') == 'localhost':
- # add PEP support
- nodes = xpath.queryForNodes(
- "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
- iq)
- query = nodes[0]
- identity = query.addElement('identity')
- identity['category'] = 'pubsub'
- identity['type'] = 'pep'
-
- iq['type'] = 'result'
- self.send(iq)
-
-class JabberXmlStream(BaseXmlStream):
- version = (0, 9)
-
-class XmppXmlStream(BaseXmlStream):
- version = (1, 0)
-
-def make_stream(event_func, authenticator=None, protocol=None, port=4242):
- # set up Jabber server
-
- if authenticator is None:
- authenticator = JabberAuthenticator('test', 'pass')
-
- if protocol is None:
- protocol = JabberXmlStream
-
- stream = protocol(event_func, authenticator)
- factory = twisted.internet.protocol.Factory()
- factory.protocol = lambda *args: stream
+
+ def send_header(self):
+ root = domish.Element((NS_STREAMS, 'stream'))
+ root['from'] = self.name
+ root['to'] = self.remote_name
+ root['version'] = self.version
+ self.send(root.toXml(closeElement = 0, prefixes=self.prefixes))
+
+ def event(self, e):
+ e.connection = self
+ self.event_function(e)
+
+ def send(self, obj):
+ if domish.IElement.providedBy(obj):
+ if self.name != None:
+ obj["from"] = self.name
+ if self.remote_name != None:
+ obj["to"] = self.remote_name
+ obj = obj.toXml(prefixes=self.prefixes)
+
+ xmlstream.XmlStream.send(self, obj)
+
+
+class IncomingXmppStream(BaseXmlStream):
+ def __init__(self, event_func, name):
+ BaseXmlStream.__init__(self, event_func)
+ self.name = name
+ self.remote_name = None
+
+ def onDocumentStart(self, rootElement):
+ # Use the fact that it's always salut that connects, so it sends a
+ # proper opening
+ assert rootElement.name == "stream"
+ assert rootElement.uri == NS_STREAMS
+
+ assert rootElement.hasAttribute("from")
+ assert rootElement.hasAttribute("to")
+ assert rootElement["to"] == self.name, self.name
+
+ assert rootElement.hasAttribute("version")
+ assert rootElement["version"] == "1.0"
+
+ self.remote_name = rootElement["from"]
+ self.send_header()
+ self.send_features()
+ BaseXmlStream.onDocumentStart(self, rootElement)
+
+ def send_features(self):
+ features = domish.Element((NS_STREAMS, 'features'))
+ self.send(features)
+
+class IncomingXmppFactory(Factory):
+ def buildProtocol(self, addr):
+ p = self.protocol()
+ p.factory = self
+ e = Event('incoming-connection', listener = self)
+ p.event(e)
+ return p
+
+def setup_stream_listener(queue, name, port = 0, protocol = None):
+ if protocol == None:
+ protocol = IncomingXmppStream
+
+ factory = IncomingXmppFactory()
+ factory.protocol = lambda *args: protocol(queue.append, name)
port = reactor.listenTCP(port, factory)
- return (stream, port)
+
+ return (factory, port.getHost().port)
+
+class OutgoingXmppStream(BaseXmlStream):
+ def __init__(self, event_function, name, remote_name):
+ BaseXmlStream.__init__(self, event_function, name, remote_name)
+ self.addObserver(xmlstream.STREAM_CONNECTED_EVENT, self.connected)
+
+ def connected (self, stream):
+ self.send_header()
+
+def connect_to_stream(queue, name, remote_name, host, port, protocol = None):
+ if protocol == None:
+ protocol = OutgoingXmppStream
+
+ p = protocol(queue.append, name, remote_name)
+
+ factory = twisted.internet.protocol.ClientFactory()
+ factory.protocol = lambda *args: p
+ reactor.connectTCP(host, port, factory)
+
+ return p
+
+if __name__ == '__main__':
+ def run_test():
+ q = servicetest.IteratingEventQueue()
+ # Set verboseness if needed for debugging
+ #q.verbose = True
+
+ (listener, port) = setup_stream_listener(q, "incoming")
+ outbound = connect_to_stream(q, "outgoing",
+ "incoming", "localhost", port)
+
+ inbound = q.expect('incoming-connection',
+ listener = listener).connection
+
+ # inbound stream is opened first, then the outbound stream is opened and
+ # receives features
+ q.expect('stream-opened', connection = inbound)
+ q.expect('stream-opened', connection = outbound)
+ q.expect('stream-features', connection = outbound)
+
+
+ message = domish.Element(('','message'))
+ message.addElement('body', content="test123")
+ outbound.send(message)
+
+ e = q.expect('stream-message', connection=inbound)
+
+ # twisting twisted
+ reactor.stop()
+
+ reactor.callLater(0.1, run_test)
+ reactor.run()
--
1.5.6.3
More information about the Telepathy-commits
mailing list