[Telepathy-commits] [telepathy-gabble/master] Remove a 'from constants import *'

Will Thompson will.thompson at collabora.co.uk
Wed Feb 18 09:01:58 PST 2009


Also remove a bunch of redundant checking, and clear up other test code.
---
 ...offer-accept-private-dbus-stream-tube-socks5.py |  126 ++++++++------------
 1 files changed, 50 insertions(+), 76 deletions(-)

diff --git a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
index a71078b..cd7c5f3 100644
--- a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
+++ b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
@@ -12,7 +12,7 @@ from dbus.lowlevel import SignalMessage
 from servicetest import call_async, EventPattern, tp_name_prefix, \
      watch_tube_signals, sync_dbus
 from gabbletest import exec_test, acknowledge_iq, sync_stream
-from constants import *
+import constants as cs
 import ns
 import tubetestutil as t
 
@@ -74,6 +74,8 @@ def test(q, bus, conn, stream):
             query_name='vCard'),
         EventPattern('stream-iq', query_ns='jabber:iq:roster'))
 
+    self_handle = conn.GetSelfHandle()
+
     acknowledge_iq(stream, vcard_event.stanza)
 
     roster = roster_event.stanza
@@ -130,15 +132,13 @@ def test(q, bus, conn, stream):
     # Ensure that Joe and Bob's caps have been received
     sync_stream(q, stream)
 
-    # new requestotron
-    requestotron = dbus.Interface(conn,
-            'org.freedesktop.Telepathy.Connection.Interface.Requests')
+    requestotron = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
 
     # Test tubes with Joe. Joe does not have tube capabilities.
     # Gabble does not allow to offer a tube to him.
     joe_handle = conn.RequestHandles(1, ['joe at localhost'])[0]
-    call_async(q, conn, 'RequestChannel',
-            tp_name_prefix + '.Channel.Type.Tubes', 1, joe_handle, True);
+    call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_TUBES,
+            cs.HT_CONTACT, joe_handle, True);
 
     ret, old_sig, new_sig = q.expect_many(
         EventPattern('dbus-return', method='RequestChannel'),
@@ -148,8 +148,7 @@ def test(q, bus, conn, stream):
     joe_chan_path = ret.value[0]
 
     joe_tubes_chan = bus.get_object(conn.bus_name, joe_chan_path)
-    joe_tubes_iface = dbus.Interface(joe_tubes_chan,
-        tp_name_prefix + '.Channel.Type.Tubes')
+    joe_tubes_iface = dbus.Interface(joe_tubes_chan, cs.CHANNEL_TYPE_TUBES)
     path = os.getcwd() + '/stream'
     call_async(q, joe_tubes_iface, 'OfferStreamTube',
         'echo', sample_parameters, 0, dbus.ByteArray(path), 0, "")
@@ -161,8 +160,8 @@ def test(q, bus, conn, stream):
     bob_handle = conn.RequestHandles(1, ['bob at localhost'])[0]
 
     # old requestotron
-    call_async(q, conn, 'RequestChannel',
-            tp_name_prefix + '.Channel.Type.Tubes', 1, bob_handle, True);
+    call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_TUBES, cs.HT_CONTACT,
+            bob_handle, True);
 
     ret, old_sig, new_sig = q.expect_many(
         EventPattern('dbus-return', method='RequestChannel'),
@@ -173,54 +172,48 @@ def test(q, bus, conn, stream):
     assert len(ret.value) == 1
     chan_path = ret.value[0]
 
-    t.check_NewChannel_signal(old_sig.args, CHANNEL_TYPE_TUBES, chan_path,
+    t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
         bob_handle, True)
-    t.check_NewChannels_signal(new_sig.args, CHANNEL_TYPE_TUBES, chan_path,
-            bob_handle, 'bob at localhost', conn.GetSelfHandle())
+    t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
+            bob_handle, 'bob at localhost', self_handle)
     old_tubes_channel_properties = new_sig.args[0][0]
 
     t.check_conn_properties(q, conn, [old_tubes_channel_properties])
     # Try to CreateChannel with correct properties
     # Gabble must succeed
     call_async(q, requestotron, 'CreateChannel',
-            {'org.freedesktop.Telepathy.Channel.ChannelType':
-                'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT',
-             'org.freedesktop.Telepathy.Channel.TargetHandleType':
-                1,
-             'org.freedesktop.Telepathy.Channel.TargetHandle':
-                bob_handle,
-             'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT.Service':
-                "newecho",
-            });
+            {cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
+             cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
+             cs.TARGET_HANDLE: bob_handle,
+             cs.STREAM_TUBE_SERVICE: "newecho",
+            })
     ret, old_sig, new_sig = q.expect_many(
         EventPattern('dbus-return', method='CreateChannel'),
         EventPattern('dbus-signal', signal='NewChannel'),
         EventPattern('dbus-signal', signal='NewChannels'),
         )
 
-    assert len(ret.value) == 2 # CreateChannel returns 2 values: o, a{sv}
-    new_chan_path = ret.value[0]
-    new_chan_prop_asv = ret.value[1]
+    new_chan_path, new_chan_prop_asv = ret.value
+
     assert new_chan_path.find("StreamTube") != -1, new_chan_path
     assert new_chan_path.find("SITubesChannel") == -1, new_chan_path
     # The path of the Channel.Type.Tubes object MUST be different to the path
     # of the Channel.Type.StreamTube object !
     assert chan_path != new_chan_path
 
-    t.check_NewChannel_signal(old_sig.args, CHANNEL_TYPE_STREAM_TUBE,
+    t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
             new_chan_path, bob_handle, True)
-    t.check_NewChannels_signal(new_sig.args, CHANNEL_TYPE_STREAM_TUBE,
-            new_chan_path, bob_handle, 'bob at localhost', conn.GetSelfHandle())
+    t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
+            new_chan_path, bob_handle, 'bob at localhost', self_handle)
     stream_tube_channel_properties = new_sig.args[0][0]
 
     t.check_conn_properties(q, conn,
             [old_tubes_channel_properties, stream_tube_channel_properties])
 
     tubes_chan = bus.get_object(conn.bus_name, chan_path)
-    tubes_iface = dbus.Interface(tubes_chan,
-        tp_name_prefix + '.Channel.Type.Tubes')
+    tubes_iface = dbus.Interface(tubes_chan, cs.CHANNEL_TYPE_TUBES)
 
-    t.check_channel_properties(q, bus, conn, tubes_chan, CHANNEL_TYPE_TUBES,
+    t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
             bob_handle, "bob at localhost")
 
     # Offer the tube, old API
@@ -253,43 +246,29 @@ def test(q, bus, conn, stream):
                      }
 
     # We offered a tube using the old tube API and created one with the new
-    # API, so there is 2 tubes. Check the new tube API works
+    # API, so there are 2 tubes. Check the new tube API works
     assert len(filter(lambda x:
-                  x[1] == "org.freedesktop.Telepathy.Channel.Type.Tubes",
+                  x[1] == cs.CHANNEL_TYPE_TUBES,
                   conn.ListChannels())) == 1
     channels = filter(lambda x:
-      x[1] == "org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT" and
+      x[1] == cs.CHANNEL_TYPE_STREAM_TUBE and
       x[0] == new_chan_path,
       conn.ListChannels())
     assert len(channels) == 1
     assert new_chan_path == channels[0][0]
 
     tube_chan = bus.get_object(conn.bus_name, channels[0][0])
-    tube_iface = dbus.Interface(tube_chan,
-        tp_name_prefix + '.Channel.Type.StreamTube.DRAFT')
+    tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_STREAM_TUBE)
+    tube_prop_iface = dbus.Interface(tube_chan, dbus.PROPERTIES_IFACE)
 
-    self_handle = conn.GetSelfHandle()
-    tube_basic_props = tube_chan.GetAll(
-            'org.freedesktop.Telepathy.Channel',
-            dbus_interface=dbus.PROPERTIES_IFACE)
-    assert tube_basic_props.get("InitiatorHandle") == self_handle
-
-    stream_tube_props = tube_chan.GetAll(
-            'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT',
-            dbus_interface=dbus.PROPERTIES_IFACE)
-    assert stream_tube_props.get("Service") == "newecho", stream_tube_props
-
-    tube_props = tube_chan.GetAll(
-            'org.freedesktop.Telepathy.Channel.Interface.Tube.DRAFT',
-            dbus_interface=dbus.PROPERTIES_IFACE)
-    # 3 == Tube_Channel_State_Not_Offered
-    assert tube_props.get("State") == 3, tube_props
+    service = tube_prop_iface.Get(cs.CHANNEL_TYPE_STREAM_TUBE, 'Service')
+    assert service == "newecho", service
 
-    t.check_channel_properties(q, bus, conn, tubes_chan, CHANNEL_TYPE_TUBES,
+    t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
             bob_handle, "bob at localhost")
     t.check_channel_properties(q, bus, conn, tube_chan,
-            CHANNEL_TYPE_STREAM_TUBE, bob_handle, "bob at localhost",
-            TUBE_STATE_NOT_OFFERED)
+            cs.CHANNEL_TYPE_STREAM_TUBE, bob_handle, "bob at localhost",
+            cs.TUBE_STATE_NOT_OFFERED)
 
     # Offer the tube, new API
     path2 = os.getcwd() + '/stream2'
@@ -322,21 +301,16 @@ def test(q, bus, conn, stream):
                      }
     # The new tube has been offered, the parameters cannot be changed anymore
     # We need to use call_async to check the error
-    tube_prop_iface = dbus.Interface(tube_chan,
-        dbus.PROPERTIES_IFACE)
-    call_async(q, tube_prop_iface, 'Set',
-        'org.freedesktop.Telepathy.Channel.Interface.Tube.DRAFT',
+    call_async(q, tube_prop_iface, 'Set', cs.CHANNEL_IFACE_TUBE,
             'Parameters', dbus.Dictionary(
             {dbus.String(u'foo2'): dbus.String(u'bar2')},
             signature=dbus.Signature('sv')),
             dbus_interface=dbus.PROPERTIES_IFACE)
     set_error = q.expect('dbus-error')
     # check it is *not* correctly changed
-    tube_props = tube_chan.GetAll(
-            'org.freedesktop.Telepathy.Channel.Interface.Tube.DRAFT',
-            dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
-    assert tube_props.get("Parameters") == new_sample_parameters, \
-            tube_props.get("Parameters")
+    params = tube_prop_iface.Get(cs.CHANNEL_IFACE_TUBE, 'Parameters',
+            byte_arrays=True)
+    assert params == new_sample_parameters, (params, new_sample_parameters)
 
     # The CM is the server, so fake a client wanting to talk to it
     # Old API tube
@@ -363,7 +337,7 @@ def test(q, bus, conn, stream):
     si_reply_event, _ = q.expect_many(
             EventPattern('stream-iq', iq_type='result'),
             EventPattern('dbus-signal', signal='TubeStateChanged',
-                args=[stream_tube_id, 2])) # 2 == OPEN
+                args=[stream_tube_id, cs.TUBE_STATE_OPEN]))
     iq = si_reply_event.stanza
     si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % ns.SI,
         iq)[0]
@@ -377,8 +351,8 @@ def test(q, bus, conn, stream):
     q.expect('dbus-signal', signal='StreamTubeNewConnection',
         args=[stream_tube_id, bob_handle])
 
-    expected_tube = (stream_tube_id, self_handle, TUBE_TYPE_STREAM, 'echo',
-        sample_parameters, TUBE_STATE_OPEN)
+    expected_tube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM, 'echo',
+        sample_parameters, cs.TUBE_STATE_OPEN)
     tubes = tubes_iface.ListTubes(byte_arrays=True)
     t.check_tube_in_tubes(expected_tube, tubes)
 
@@ -407,7 +381,7 @@ def test(q, bus, conn, stream):
     si_reply_event, _ = q.expect_many(
             EventPattern('stream-iq', iq_type='result'),
             EventPattern('dbus-signal', signal='TubeChannelStateChanged',
-                args=[2])) # 2 == OPEN
+                args=[cs.TUBE_STATE_OPEN]))
     iq = si_reply_event.stanza
     si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % ns.SI,
         iq)[0]
@@ -428,7 +402,7 @@ def test(q, bus, conn, stream):
         1,      # Unix stream
         'newecho',
         new_sample_parameters,
-        2,      # OPEN
+        cs.TUBE_STATE_OPEN,
         ) in tubes, tubes
 
     reactor.listenTCP(5086, S5BFactory(q.append))
@@ -639,13 +613,13 @@ def test(q, bus, conn, stream):
     stream.send(result)
 
     q.expect('dbus-signal', signal='TubeStateChanged',
-        args=[dbus_tube_id, 2]) # 2 == OPEN
+        args=[dbus_tube_id, cs.TUBE_STATE_OPEN])
 
     tubes = tubes_iface.ListTubes(byte_arrays=True)
-    expected_dtube = (dbus_tube_id, self_handle, TUBE_TYPE_DBUS,
-        'com.example.TestCase', sample_parameters, TUBE_STATE_OPEN)
-    expected_stube = (stream_tube_id, self_handle, TUBE_TYPE_STREAM,
-        'echo', sample_parameters, TUBE_STATE_OPEN)
+    expected_dtube = (dbus_tube_id, self_handle, cs.TUBE_TYPE_DBUS,
+        'com.example.TestCase', sample_parameters, cs.TUBE_STATE_OPEN)
+    expected_stube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM,
+        'echo', sample_parameters, cs.TUBE_STATE_OPEN)
     t.check_tube_in_tubes(expected_dtube, tubes)
     t.check_tube_in_tubes(expected_stube, tubes)
 
@@ -729,10 +703,10 @@ def test(q, bus, conn, stream):
     assert id == 69
     initiator_jid = conn.InspectHandles(1, [initiator])[0]
     assert initiator_jid == 'bob at localhost'
-    assert type == 0 # D-Bus tube
+    assert type == cs.TUBE_TYPE_DBUS
     assert service == 'com.example.TestCase2'
     assert parameters == {'login': 'TEST'}
-    assert state == 0 # local pending
+    assert state == cs.TUBE_STATE_LOCAL_PENDING
 
     # accept the tube
     call_async(q, tubes_iface, 'AcceptDBusTube', id)
-- 
1.5.6.5




More information about the telepathy-commits mailing list