[telepathy-gabble/master] remove v1 test API
Dafydd Harries
daf at rhydd.org
Tue Mar 31 06:33:43 PDT 2009
---
tests/twisted/gabbletest.py | 72 --------------------
tests/twisted/servicetest.py | 147 ------------------------------------------
2 files changed, 0 insertions(+), 219 deletions(-)
diff --git a/tests/twisted/gabbletest.py b/tests/twisted/gabbletest.py
index d4e673b..9c7e9e6 100644
--- a/tests/twisted/gabbletest.py
+++ b/tests/twisted/gabbletest.py
@@ -312,34 +312,6 @@ def make_stream(event_func, authenticator=None, protocol=None, port=4242):
port = reactor.listenTCP(port, factory)
return (stream, port)
-def go(params=None, authenticator=None, protocol=None, start=None):
- # hack to ease debugging
- domish.Element.__repr__ = domish.Element.toXml
-
- bus = dbus.SessionBus()
- handler = servicetest.EventTest()
- conn = make_connection(bus, handler.handle_event, params)
- (stream, _) = make_stream(handler.handle_event, authenticator, protocol)
- handler.data = {
- 'bus': bus,
- 'conn': conn,
- 'conn_iface': dbus.Interface(conn,
- 'org.freedesktop.Telepathy.Connection'),
- 'stream': stream}
- handler.data['test'] = handler
- handler.verbose = (os.environ.get('CHECK_TWISTED_VERBOSE', '') != '')
- map(handler.expect, servicetest.load_event_handlers())
-
- if '-v' in sys.argv:
- handler.verbose = True
-
- if start is None:
- handler.data['conn'].Connect()
- else:
- start(handler.data)
-
- reactor.run()
-
def install_colourer():
def red(s):
return '\x1b[31m%s\x1b[0m' % s
@@ -446,50 +418,6 @@ def expect_and_handle_set_vcard(q, stream, check=None):
stream.send(make_result_iq(stream, iq))
-def handle_get_vcard(event, data):
- iq = event.stanza
-
- if iq['type'] != 'get':
- return False
-
- if iq.uri != 'jabber:client':
- return False
-
- vcard = list(iq.elements())[0]
-
- if vcard.name != 'vCard':
- return False
-
- # Send back current vCard
- new_iq = IQ(data['stream'], 'result')
- new_iq['id'] = iq['id']
- new_iq.addChild(current_vcard)
- data['stream'].send(new_iq)
- return True
-
-def handle_set_vcard(event, data):
- global current_vcard
- iq = event.stanza
-
- if iq['type'] != 'set':
- return False
-
- if iq.uri != 'jabber:client':
- return False
-
- vcard = list(iq.elements())[0]
-
- if vcard.name != 'vCard':
- return False
-
- current_vcard = iq.firstChildElement()
-
- new_iq = IQ(data['stream'], 'result')
- new_iq['id'] = iq['id']
- data['stream'].send(new_iq)
- return True
-
-
def _elem_add(elem, *children):
for child in children:
if isinstance(child, domish.Element):
diff --git a/tests/twisted/servicetest.py b/tests/twisted/servicetest.py
index fad67ca..056cd81 100644
--- a/tests/twisted/servicetest.py
+++ b/tests/twisted/servicetest.py
@@ -8,7 +8,6 @@ from twisted.internet.protocol import Protocol, Factory, ClientFactory
glib2reactor.install()
import pprint
-import traceback
import unittest
import dbus.glib
@@ -18,38 +17,6 @@ from twisted.internet import reactor
tp_name_prefix = 'org.freedesktop.Telepathy'
tp_path_prefix = '/org/freedesktop/Telepathy'
-class TryNextHandler(Exception):
- pass
-
-def lazy(func):
- def handler(event, data):
- if func(event, data):
- return True
- else:
- raise TryNextHandler()
- handler.__name__ = func.__name__
- return handler
-
-def match(type, **kw):
- def decorate(func):
- def handler(event, data, *extra, **extra_kw):
- if event.type != type:
- return False
-
- for key, value in kw.iteritems():
- if not hasattr(event, key):
- return False
-
- if getattr(event, key) != value:
- return False
-
- return func(event, data, *extra, **extra_kw)
-
- handler.__name__ = func.__name__
- return handler
-
- return decorate
-
class Event:
def __init__(self, type, **kw):
self.__dict__.update(kw)
@@ -68,109 +35,6 @@ def format_event(event):
return ret
-class EventTest:
- """Somewhat odd event dispatcher for asynchronous tests.
-
- Callbacks are kept in a queue. Incoming events are passed to the first
- callback. If the callback returns True, the callback is removed. If the
- callback raises AssertionError, the test fails. If there are no more
- callbacks, the test passes. The reactor is stopped when the test passes.
- """
-
- def __init__(self):
- self.queue = []
- self.data = {'test': self}
- self.timeout_delayed_call = reactor.callLater(5, self.timeout_cb)
- #self.verbose = True
- self.verbose = False
- # ugh
- self.stopping = False
-
- def timeout_cb(self):
- print 'timed out waiting for events'
- print self.queue[0]
- self.fail()
-
- def fail(self):
- # ugh; better way to stop the reactor and exit(1)?
- import os
- os._exit(1)
-
- def expect(self, f):
- self.queue.append(f)
-
- def log(self, s):
- if self.verbose:
- print s
-
- def try_stop(self):
- if self.stopping:
- return True
-
- if not self.queue:
- self.log('no handlers left; stopping')
- self.stopping = True
- reactor.stop()
- return True
-
- return False
-
- def call_handlers(self, event):
- self.log('trying %r' % self.queue[0])
- handler = self.queue.pop(0)
-
- try:
- ret = handler(event, self.data)
- if not ret:
- self.queue.insert(0, handler)
- except TryNextHandler, e:
- if self.queue:
- ret = self.call_handlers(event)
- else:
- ret = False
- self.queue.insert(0, handler)
-
- return ret
-
- def handle_event(self, event):
- if self.try_stop():
- return
-
- self.log('got event:')
- self.log('- type: %s' % event.type)
- map(self.log, format_event(event))
-
- try:
- ret = self.call_handlers(event)
- except SystemExit, e:
- if e.code:
- print "Unsuccessful exit:", e
- self.fail()
- else:
- self.queue[:] = []
- ret = True
- except AssertionError, e:
- print 'test failed:'
- traceback.print_exc()
- self.fail()
- except (Exception, KeyboardInterrupt), e:
- print 'error in handler:'
- traceback.print_exc()
- self.fail()
-
- if ret not in (True, False):
- print ("warning: %s() returned something other than True or False"
- % self.queue[0].__name__)
-
- if ret:
- self.timeout_delayed_call.reset(5)
- self.log('event handled')
- else:
- self.log('event not handled')
-
- self.log('')
- self.try_stop()
-
class EventPattern:
def __init__(self, type, **properties):
self.type = type
@@ -471,17 +335,6 @@ def make_channel_proxy(conn, path, iface):
chan = dbus.Interface(chan, tp_name_prefix + '.' + iface)
return chan
-def load_event_handlers():
- path, _, _, _ = traceback.extract_stack()[0]
- import compiler
- import __main__
- ast = compiler.parseFile(path)
- return [
- getattr(__main__, node.name)
- for node in ast.node.asList()
- if node.__class__ == compiler.ast.Function and
- node.name.startswith('expect_')]
-
class EventProtocol(Protocol):
def __init__(self, queue=None):
self.queue = queue
--
1.5.6.5
More information about the telepathy-commits mailing list