[telepathy-gabble/master] tubetestutil: add create_server
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Wed Apr 29 10:43:02 PDT 2009
---
tests/twisted/tubes/tubetestutil.py | 29 ++++++++++++++++++++++++++++-
1 files changed, 28 insertions(+), 1 deletions(-)
diff --git a/tests/twisted/tubes/tubetestutil.py b/tests/twisted/tubes/tubetestutil.py
index 326375d..93fa506 100644
--- a/tests/twisted/tubes/tubetestutil.py
+++ b/tests/twisted/tubes/tubetestutil.py
@@ -7,13 +7,14 @@ import os
import dbus
-from servicetest import unwrap, assertContains, EventProtocolClientFactory
+from servicetest import unwrap, assertContains, EventProtocolClientFactory, EventProtocolFactory
from gabbletest import exec_test
import constants as cs
import bytestream
from twisted.internet import reactor
from twisted.internet.protocol import Factory, Protocol
+from twisted.internet.error import CannotListenError
def check_tube_in_tubes(tube, tubes):
"""
@@ -234,6 +235,32 @@ def connect_socket(q, address_type, address):
else:
assert False
+def create_server(q, address_type, factory=None):
+    """
+    Start a local server listening on a socket of the given address type
+    and return the address it is listening on.
+
+    q: event queue, handed to EventProtocolFactory when no factory is given.
+    address_type: cs.SOCKET_ADDRESS_TYPE_UNIX or cs.SOCKET_ADDRESS_TYPE_IPV4.
+    factory: Twisted protocol factory to listen with; defaults to
+        EventProtocolFactory(q).
+
+    Returns a dbus.ByteArray holding the socket path for UNIX sockets, or
+    a ('127.0.0.1', dbus.UInt16(port)) tuple for IPv4.
+
+    Raises AssertionError if address_type is not supported or if no TCP
+    port in the range 5000-5999 can be listened on.
+    """
+    # errno is not among the module imports visible in this patch, so
+    # import it here to guarantee the ENOENT check below can run.
+    import errno
+
+    if factory is None:
+        factory = EventProtocolFactory(q)
+
+    if address_type == cs.SOCKET_ADDRESS_TYPE_UNIX:
+        path = os.getcwd() + '/stream'
+        # Remove a stale socket left over from a previous run; a missing
+        # file is fine, any other failure is propagated.
+        try:
+            os.remove(path)
+        except OSError, e:
+            if e.errno != errno.ENOENT:
+                raise
+        reactor.listenUNIX(path, factory)
+
+        return dbus.ByteArray(path)
+
+    elif address_type == cs.SOCKET_ADDRESS_TYPE_IPV4:
+        # Try each port in turn and keep the first one we can listen on.
+        for port in range(5000, 6000):
+            try:
+                reactor.listenTCP(port, factory)
+            except CannotListenError:
+                continue
+            else:
+                return ('127.0.0.1', dbus.UInt16(port))
+
+        # Every port was busy: fail loudly instead of returning None.
+        raise AssertionError('no free TCP port in range 5000-5999')
+
+    else:
+        raise AssertionError(
+            'unsupported address type: %r' % (address_type,))
+
def exec_tube_test(test, *args):
for bytestream_cls in [
bytestream.BytestreamIBBMsg,
--
1.5.6.5
More information about the telepathy-commits
mailing list