[telepathy-gabble/master] tubetestutil: add create_server

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Wed Apr 29 10:43:02 PDT 2009


---
 tests/twisted/tubes/tubetestutil.py |   29 ++++++++++++++++++++++++++++-
 1 files changed, 28 insertions(+), 1 deletions(-)

diff --git a/tests/twisted/tubes/tubetestutil.py b/tests/twisted/tubes/tubetestutil.py
index 326375d..93fa506 100644
--- a/tests/twisted/tubes/tubetestutil.py
+++ b/tests/twisted/tubes/tubetestutil.py
@@ -7,13 +7,14 @@ import os
 
 import dbus
 
-from servicetest import unwrap, assertContains, EventProtocolClientFactory
+from servicetest import unwrap, assertContains, EventProtocolClientFactory, EventProtocolFactory
 from gabbletest import exec_test
 import constants as cs
 import bytestream
 
 from twisted.internet import reactor
 from twisted.internet.protocol import Factory, Protocol
+from twisted.internet.error import CannotListenError
 
 def check_tube_in_tubes(tube, tubes):
     """
@@ -234,6 +235,32 @@ def connect_socket(q, address_type, address):
     else:
         assert False
 
+def create_server(q, address_type, factory=None):
+    if factory is None:
+        factory = EventProtocolFactory(q)
+    if address_type == cs.SOCKET_ADDRESS_TYPE_UNIX:
+        path = os.getcwd() + '/stream'
+        try:
+            os.remove(path)
+        except OSError, e:
+            if e.errno != errno.ENOENT:
+                raise
+        reactor.listenUNIX(path, factory)
+
+        return dbus.ByteArray(path)
+
+    elif address_type == cs.SOCKET_ADDRESS_TYPE_IPV4:
+        for port in range(5000,6000):
+            try:
+                reactor.listenTCP(port, factory)
+            except CannotListenError:
+                continue
+            else:
+                return ('127.0.0.1', dbus.UInt16(port))
+
+    else:
+        assert False
+
 def exec_tube_test(test, *args):
     for bytestream_cls in [
             bytestream.BytestreamIBBMsg,
-- 
1.5.6.5




More information about the telepathy-commits mailing list