[telepathy-gabble/master] remove v1 test API

Dafydd Harries daf at rhydd.org
Tue Mar 31 06:33:43 PDT 2009


---
 tests/twisted/gabbletest.py  |   72 --------------------
 tests/twisted/servicetest.py |  147 ------------------------------------------
 2 files changed, 0 insertions(+), 219 deletions(-)

diff --git a/tests/twisted/gabbletest.py b/tests/twisted/gabbletest.py
index d4e673b..9c7e9e6 100644
--- a/tests/twisted/gabbletest.py
+++ b/tests/twisted/gabbletest.py
@@ -312,34 +312,6 @@ def make_stream(event_func, authenticator=None, protocol=None, port=4242):
     port = reactor.listenTCP(port, factory)
     return (stream, port)
 
-def go(params=None, authenticator=None, protocol=None, start=None):
-    # hack to ease debugging
-    domish.Element.__repr__ = domish.Element.toXml
-
-    bus = dbus.SessionBus()
-    handler = servicetest.EventTest()
-    conn = make_connection(bus, handler.handle_event, params)
-    (stream, _) = make_stream(handler.handle_event, authenticator, protocol)
-    handler.data = {
-        'bus': bus,
-        'conn': conn,
-        'conn_iface': dbus.Interface(conn,
-            'org.freedesktop.Telepathy.Connection'),
-        'stream': stream}
-    handler.data['test'] = handler
-    handler.verbose = (os.environ.get('CHECK_TWISTED_VERBOSE', '') != '')
-    map(handler.expect, servicetest.load_event_handlers())
-
-    if '-v' in sys.argv:
-        handler.verbose = True
-
-    if start is None:
-        handler.data['conn'].Connect()
-    else:
-        start(handler.data)
-
-    reactor.run()
-
 def install_colourer():
     def red(s):
         return '\x1b[31m%s\x1b[0m' % s
@@ -446,50 +418,6 @@ def expect_and_handle_set_vcard(q, stream, check=None):
 
     stream.send(make_result_iq(stream, iq))
 
-def handle_get_vcard(event, data):
-    iq = event.stanza
-
-    if iq['type'] != 'get':
-        return False
-
-    if iq.uri != 'jabber:client':
-        return False
-
-    vcard = list(iq.elements())[0]
-
-    if vcard.name != 'vCard':
-        return False
-
-    # Send back current vCard
-    new_iq = IQ(data['stream'], 'result')
-    new_iq['id'] = iq['id']
-    new_iq.addChild(current_vcard)
-    data['stream'].send(new_iq)
-    return True
-
-def handle_set_vcard(event, data):
-    global current_vcard
-    iq = event.stanza
-
-    if iq['type'] != 'set':
-        return False
-
-    if iq.uri != 'jabber:client':
-        return False
-
-    vcard = list(iq.elements())[0]
-
-    if vcard.name != 'vCard':
-        return False
-
-    current_vcard = iq.firstChildElement()
-
-    new_iq = IQ(data['stream'], 'result')
-    new_iq['id'] = iq['id']
-    data['stream'].send(new_iq)
-    return True
-
-
 def _elem_add(elem, *children):
     for child in children:
         if isinstance(child, domish.Element):
diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index fad67ca..056cd81 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -8,7 +8,6 @@ from twisted.internet.protocol import Protocol, Factory, ClientFactory
 glib2reactor.install()
 
 import pprint
-import traceback
 import unittest
 
 import dbus.glib
@@ -18,38 +17,6 @@ from twisted.internet import reactor
 tp_name_prefix = 'org.freedesktop.Telepathy'
 tp_path_prefix = '/org/freedesktop/Telepathy'
 
-class TryNextHandler(Exception):
-    pass
-
-def lazy(func):
-    def handler(event, data):
-        if func(event, data):
-            return True
-        else:
-            raise TryNextHandler()
-    handler.__name__ = func.__name__
-    return handler
-
-def match(type, **kw):
-    def decorate(func):
-        def handler(event, data, *extra, **extra_kw):
-            if event.type != type:
-                return False
-
-            for key, value in kw.iteritems():
-                if not hasattr(event, key):
-                    return False
-
-                if getattr(event, key) != value:
-                    return False
-
-            return func(event, data, *extra, **extra_kw)
-
-        handler.__name__ = func.__name__
-        return handler
-
-    return decorate
-
 class Event:
     def __init__(self, type, **kw):
         self.__dict__.update(kw)
@@ -68,109 +35,6 @@ def format_event(event):
 
     return ret
 
-class EventTest:
-    """Somewhat odd event dispatcher for asynchronous tests.
-
-    Callbacks are kept in a queue. Incoming events are passed to the first
-    callback. If the callback returns True, the callback is removed. If the
-    callback raises AssertionError, the test fails. If there are no more
-    callbacks, the test passes. The reactor is stopped when the test passes.
-    """
-
-    def __init__(self):
-        self.queue = []
-        self.data = {'test': self}
-        self.timeout_delayed_call = reactor.callLater(5, self.timeout_cb)
-        #self.verbose = True
-        self.verbose = False
-        # ugh
-        self.stopping = False
-
-    def timeout_cb(self):
-        print 'timed out waiting for events'
-        print self.queue[0]
-        self.fail()
-
-    def fail(self):
-        # ugh; better way to stop the reactor and exit(1)?
-        import os
-        os._exit(1)
-
-    def expect(self, f):
-        self.queue.append(f)
-
-    def log(self, s):
-        if self.verbose:
-            print s
-
-    def try_stop(self):
-        if self.stopping:
-            return True
-
-        if not self.queue:
-            self.log('no handlers left; stopping')
-            self.stopping = True
-            reactor.stop()
-            return True
-
-        return False
-
-    def call_handlers(self, event):
-        self.log('trying %r' % self.queue[0])
-        handler = self.queue.pop(0)
-
-        try:
-            ret = handler(event, self.data)
-            if not ret:
-                self.queue.insert(0, handler)
-        except TryNextHandler, e:
-            if self.queue:
-                ret = self.call_handlers(event)
-            else:
-                ret = False
-            self.queue.insert(0, handler)
-
-        return ret
-
-    def handle_event(self, event):
-        if self.try_stop():
-            return
-
-        self.log('got event:')
-        self.log('- type: %s' % event.type)
-        map(self.log, format_event(event))
-
-        try:
-            ret = self.call_handlers(event)
-        except SystemExit, e:
-            if e.code:
-                print "Unsuccessful exit:", e
-                self.fail()
-            else:
-                self.queue[:] = []
-                ret = True
-        except AssertionError, e:
-            print 'test failed:'
-            traceback.print_exc()
-            self.fail()
-        except (Exception, KeyboardInterrupt), e:
-            print 'error in handler:'
-            traceback.print_exc()
-            self.fail()
-
-        if ret not in (True, False):
-            print ("warning: %s() returned something other than True or False"
-                % self.queue[0].__name__)
-
-        if ret:
-            self.timeout_delayed_call.reset(5)
-            self.log('event handled')
-        else:
-            self.log('event not handled')
-
-        self.log('')
-        self.try_stop()
-
 class EventPattern:
     def __init__(self, type, **properties):
         self.type = type
@@ -471,17 +335,6 @@ def make_channel_proxy(conn, path, iface):
     chan = dbus.Interface(chan, tp_name_prefix + '.' + iface)
     return chan
 
-def load_event_handlers():
-    path, _, _, _ = traceback.extract_stack()[0]
-    import compiler
-    import __main__
-    ast = compiler.parseFile(path)
-    return [
-        getattr(__main__, node.name)
-        for node in ast.node.asList()
-        if node.__class__ == compiler.ast.Function and
-            node.name.startswith('expect_')]
-
 class EventProtocol(Protocol):
     def __init__(self, queue=None):
         self.queue = queue
-- 
1.5.6.5




More information about the telepathy-commits mailing list