[telepathy-salut/master] test-receive-file-ipv6.py: check if our IPv6 service has been actually announced and skip test if it doesn't
Guillaume Desmottes
guillaume.desmottes at collabora.co.uk
Wed Apr 1 08:09:47 PDT 2009
---
tests/twisted/avahi/test-receive-file-ipv6.py | 34 ++++++++++--------------
1 files changed, 14 insertions(+), 20 deletions(-)
diff --git a/tests/twisted/avahi/test-receive-file-ipv6.py b/tests/twisted/avahi/test-receive-file-ipv6.py
index 0567d50..0e1e9d2 100644
--- a/tests/twisted/avahi/test-receive-file-ipv6.py
+++ b/tests/twisted/avahi/test-receive-file-ipv6.py
@@ -7,12 +7,12 @@ import socket
from saluttest import exec_test
from file_transfer_helper import ReceiveFileTest
-from avahitest import AvahiAnnouncer, get_host_name, AvahiListener
+from avahitest import AvahiAnnouncer, get_host_name, AvahiListener,\
+ AvahiService, get_domain_name
from xmppstream import connect_to_stream6, setup_stream_listener6
from servicetest import TimeoutError
from twisted.words.xish import domish
-import constants as cs
class TestReceiveFileIPv6(ReceiveFileTest):
CONTACT_NAME = 'test-ft'
@@ -26,25 +26,19 @@ class TestReceiveFileIPv6(ReceiveFileTest):
self.contact_service = AvahiAnnouncer(self.contact_name, "_presence._tcp", port,
basic_txt, proto=avahi.PROTO_INET6)
- def wait_for_contact(self, name=CONTACT_NAME):
- publish_handle = self.conn.RequestHandles(cs.HT_CONTACT_LIST, ["publish"])[0]
- publish = self.conn.RequestChannel(
- "org.freedesktop.Telepathy.Channel.Type.ContactList",
- cs.HT_CONTACT_LIST, publish_handle, False)
-
- self.handle = 0
- # Wait until the record shows up in publish
- while self.handle == 0:
- try:
- e = self.q.expect('dbus-signal', signal='MembersChanged', path=publish)
- except TimeoutError:
- print "skip test as IPv6 doesn't seem to be enabled in Avahi"
- return True
-
- for h in e.args[1]:
- name = self.conn.InspectHandles(cs.HT_CONTACT, [h])[0]
- if name == self.contact_name:
- self.handle = h
+ # Avahi doesn't complain if we try to announce an IPv6 service with a
+ # not IPv6 enabled Avahi (http://www.avahi.org/ticket/264) so we try to
+ # resolve our own service to check if it has been actually announced.
+ service = AvahiService(self.q, self.contact_service.bus, self.contact_service.server,
+ avahi.IF_UNSPEC, self.contact_service.proto, self.contact_service.name,
+ self.contact_service.type, get_domain_name(), avahi.PROTO_INET6, 0)
+ service.resolve()
+
+ try:
+ self.q.expect('service-resolved', service=service)
+ except TimeoutError:
+ print "skip test as IPv6 doesn't seem to be enabled in Avahi"
+ return True
def _resolve_salut_presence(self):
AvahiListener(self.q).listen_for_service("_presence._tcp")
--
1.5.6.5
More information about the telepathy-commits
mailing list