[Telepathy-commits] [telepathy-gabble/master] Remove another 'from constants import *'

Will Thompson will.thompson at collabora.co.uk
Wed Feb 18 10:17:02 PST 2009


---
 .../offer-accept-private-dbus-stream-tube-ibb.py   |  144 ++++++++++----------
 1 files changed, 74 insertions(+), 70 deletions(-)

diff --git a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-ibb.py b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-ibb.py
index e4a414d..1d56425 100644
--- a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-ibb.py
+++ b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-ibb.py
@@ -9,7 +9,7 @@ from dbus.lowlevel import SignalMessage
 
 from servicetest import call_async, EventPattern, watch_tube_signals
 from gabbletest import exec_test, acknowledge_iq, sync_stream
-from constants import *
+import constants as cs
 import ns
 import tubetestutil as t
 
@@ -77,6 +77,7 @@ def test(q, bus, conn, stream):
         EventPattern('stream-iq', query_ns='jabber:iq:roster'))
 
     self_handle = conn.GetSelfHandle()
+
     acknowledge_iq(stream, vcard_event.stanza)
 
     roster = roster_event.stanza
@@ -112,13 +113,14 @@ def test(q, bus, conn, stream):
     sync_stream(q, stream)
 
     # new requestotron
-    requestotron = dbus.Interface(conn, CONN_IFACE_REQUESTS)
+    requestotron = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
 
     # Test tubes with Bob. Bob does not have tube capabilities.
     bob_handle = conn.RequestHandles(1, ['bob@localhost'])[0]
 
     # old requestotron
-    call_async(q, conn, 'RequestChannel', CHANNEL_TYPE_TUBES, 1, bob_handle, True)
+    call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_TUBES, cs.HT_CONTACT,
+        bob_handle, True)
 
     ret, old_sig, new_sig = q.expect_many(
         EventPattern('dbus-return', method='RequestChannel'),
@@ -129,9 +131,10 @@ def test(q, bus, conn, stream):
     assert len(ret.value) == 1
     chan_path = ret.value[0]
 
-    t.check_NewChannel_signal(old_sig.args, CHANNEL_TYPE_TUBES, chan_path, bob_handle, True)
-    t.check_NewChannels_signal(new_sig.args, CHANNEL_TYPE_TUBES, chan_path,
-            bob_handle, 'bob@localhost', conn.GetSelfHandle())
+    t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
+            bob_handle, True)
+    t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
+            bob_handle, 'bob@localhost', self_handle)
     old_tubes_channel_properties = new_sig.args[0][0]
 
     t.check_conn_properties(q, conn, [old_tubes_channel_properties])
@@ -139,11 +142,11 @@ def test(q, bus, conn, stream):
     # Try to CreateChannel with correct properties
     # Gabble must succeed
     call_async(q, requestotron, 'CreateChannel',
-            {CHANNEL_TYPE: CHANNEL_TYPE_STREAM_TUBE,
-             TARGET_HANDLE_TYPE: HT_CONTACT,
-             TARGET_HANDLE: bob_handle,
-             STREAM_TUBE_SERVICE: 'newecho',
-            });
+            {cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
+             cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
+             cs.TARGET_HANDLE: bob_handle,
+             cs.STREAM_TUBE_SERVICE: "newecho",
+            })
 
     # the NewTube signal (old API) is not fired now as the tube wasn't offered
     # yet
@@ -157,8 +160,8 @@ def test(q, bus, conn, stream):
     new_chan_path = ret.value[0]
     new_chan_prop_asv = ret.value[1]
     # State and Parameters are mutables so not announced
-    assert (TUBE_STATE) not in new_chan_prop_asv
-    assert (TUBE_PARAMETERS) not in new_chan_prop_asv
+    assert cs.TUBE_STATE not in new_chan_prop_asv
+    assert cs.TUBE_PARAMETERS not in new_chan_prop_asv
     assert new_chan_path.find("StreamTube") != -1, new_chan_path
     assert new_chan_path.find("SITubesChannel") == -1, new_chan_path
     # The path of the Channel.Type.Tubes object MUST be different to the path
@@ -166,31 +169,30 @@ def test(q, bus, conn, stream):
     assert chan_path != new_chan_path
 
     new_tube_chan = bus.get_object(conn.bus_name, new_chan_path)
-    new_tube_iface = dbus.Interface(new_tube_chan, CHANNEL_TYPE_STREAM_TUBE)
+    new_tube_iface = dbus.Interface(new_tube_chan, cs.CHANNEL_TYPE_STREAM_TUBE)
 
     # check State and Parameters
-    new_tube_props = new_tube_chan.GetAll(CHANNEL_IFACE_TUBE,
+    new_tube_props = new_tube_chan.GetAll(cs.CHANNEL_IFACE_TUBE,
             dbus_interface=PROPERTIES_IFACE)
 
     # the tube created using the old API is in the "not offered" state
-    assert new_tube_props['State'] == TUBE_CHANNEL_STATE_NOT_OFFERED
+    assert new_tube_props['State'] == cs.TUBE_CHANNEL_STATE_NOT_OFFERED
 
-    t.check_NewChannel_signal(old_sig.args, CHANNEL_TYPE_STREAM_TUBE,
+    t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
             new_chan_path, bob_handle, True)
-    t.check_NewChannels_signal(new_sig.args, CHANNEL_TYPE_STREAM_TUBE,
-            new_chan_path, bob_handle, 'bob@localhost', conn.GetSelfHandle())
+    t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
+            new_chan_path, bob_handle, 'bob@localhost', self_handle)
     stream_tube_channel_properties = new_sig.args[0][0]
-    assert TUBE_STATE not in stream_tube_channel_properties
-    assert TUBE_PARAMETERS not in stream_tube_channel_properties
+    assert cs.TUBE_STATE not in stream_tube_channel_properties
+    assert cs.TUBE_PARAMETERS not in stream_tube_channel_properties
 
     t.check_conn_properties(q, conn,
             [old_tubes_channel_properties, stream_tube_channel_properties])
 
     tubes_chan = bus.get_object(conn.bus_name, chan_path)
+    tubes_iface = dbus.Interface(tubes_chan, cs.CHANNEL_TYPE_TUBES)
 
-    tubes_iface = dbus.Interface(tubes_chan, CHANNEL_TYPE_TUBES)
-
-    t.check_channel_properties(q, bus, conn, tubes_chan, CHANNEL_TYPE_TUBES,
+    t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
             bob_handle, "bob@localhost")
 
     # Create another tube using old API
@@ -231,22 +233,22 @@ def test(q, bus, conn, stream):
 
 
     # the tube channel (new API) is announced
-    t.check_NewChannel_signal(new_chan.args, CHANNEL_TYPE_STREAM_TUBE,
+    t.check_NewChannel_signal(new_chan.args, cs.CHANNEL_TYPE_STREAM_TUBE,
         None, bob_handle, False)
-    t.check_NewChannels_signal(new_chans.args, CHANNEL_TYPE_STREAM_TUBE,
+    t.check_NewChannels_signal(new_chans.args, cs.CHANNEL_TYPE_STREAM_TUBE,
         new_chan.args[0], bob_handle, "bob@localhost", self_handle)
 
     props = new_chans.args[0][0][1]
-    assert TUBE_STATE not in props
-    assert TUBE_PARAMETERS not in props
+    assert cs.TUBE_STATE not in props
+    assert cs.TUBE_PARAMETERS not in props
 
     # We offered a tube using the old tube API and created one with the new
     # API, so there are 2 tubes. Check the new tube API works
     assert len(filter(lambda x:
-                  x[1] == CHANNEL_TYPE_TUBES,
+                  x[1] == cs.CHANNEL_TYPE_TUBES,
                   conn.ListChannels())) == 1
     channels = filter(lambda x:
-      x[1] == CHANNEL_TYPE_STREAM_TUBE and
+      x[1] == cs.CHANNEL_TYPE_STREAM_TUBE and
       x[0] == new_chan_path,
       conn.ListChannels())
     assert len(channels) == 1
@@ -254,26 +256,26 @@ def test(q, bus, conn, stream):
 
     old_tube_chan = bus.get_object(conn.bus_name, new_chan.args[0])
 
-    tube_basic_props = old_tube_chan.GetAll(CHANNEL,
+    tube_basic_props = old_tube_chan.GetAll(cs.CHANNEL,
             dbus_interface=PROPERTIES_IFACE)
     assert tube_basic_props.get("InitiatorHandle") == self_handle
 
-    stream_tube_props = old_tube_chan.GetAll(CHANNEL_TYPE_STREAM_TUBE,
+    stream_tube_props = old_tube_chan.GetAll(cs.CHANNEL_TYPE_STREAM_TUBE,
             dbus_interface=PROPERTIES_IFACE)
     assert stream_tube_props.get("Service") == "echo", stream_tube_props
 
-    old_tube_props = old_tube_chan.GetAll(CHANNEL_IFACE_TUBE,
+    old_tube_props = old_tube_chan.GetAll(cs.CHANNEL_IFACE_TUBE,
             dbus_interface=PROPERTIES_IFACE, byte_arrays=True)
     assert old_tube_props.get("Parameters") == dbus.Dictionary(sample_parameters)
 
     # Tube have been created using the old API and so is already offered
-    assert old_tube_props['State'] == TUBE_CHANNEL_STATE_REMOTE_PENDING
+    assert old_tube_props['State'] == cs.TUBE_CHANNEL_STATE_REMOTE_PENDING
 
-    t.check_channel_properties(q, bus, conn, tubes_chan, CHANNEL_TYPE_TUBES,
+    t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
             bob_handle, "bob@localhost")
     t.check_channel_properties(q, bus, conn, old_tube_chan,
-            CHANNEL_TYPE_STREAM_TUBE, bob_handle, "bob@localhost",
-            TUBE_CHANNEL_STATE_REMOTE_PENDING)
+            cs.CHANNEL_TYPE_STREAM_TUBE, bob_handle, "bob@localhost",
+            cs.TUBE_CHANNEL_STATE_REMOTE_PENDING)
 
     # Offer the first tube created (new API)
     path2 = os.getcwd() + '/stream2'
@@ -285,7 +287,7 @@ def test(q, bus, conn, stream):
         EventPattern('dbus-signal', signal='NewTube'),
         EventPattern('dbus-signal', signal='TubeChannelStateChanged'))
 
-    assert state_event.args[0] == TUBE_CHANNEL_STATE_REMOTE_PENDING
+    assert state_event.args[0] == cs.TUBE_CHANNEL_STATE_REMOTE_PENDING
 
     message = msg_event.stanza
     assert message['to'] == 'bob@localhost/Bob' # check the resource
@@ -322,14 +324,14 @@ def test(q, bus, conn, stream):
     # The new tube has been offered, the parameters cannot be changed anymore
     # We need to use call_async to check the error
     tube_prop_iface = dbus.Interface(old_tube_chan, PROPERTIES_IFACE)
-    call_async(q, tube_prop_iface, 'Set', CHANNEL_IFACE_TUBE,
+    call_async(q, tube_prop_iface, 'Set', cs.CHANNEL_IFACE_TUBE,
             'Parameters', dbus.Dictionary(
             {dbus.String(u'foo2'): dbus.String(u'bar2')},
             signature=dbus.Signature('sv')),
             dbus_interface=PROPERTIES_IFACE)
     set_error = q.expect('dbus-error')
     # check it is *not* correctly changed
-    new_tube_props = new_tube_chan.GetAll(CHANNEL_IFACE_TUBE,
+    new_tube_props = new_tube_chan.GetAll(cs.CHANNEL_IFACE_TUBE,
             dbus_interface=PROPERTIES_IFACE, byte_arrays=True)
     assert new_tube_props.get("Parameters") == new_sample_parameters, \
             new_tube_props.get("Parameters")
@@ -359,7 +361,7 @@ def test(q, bus, conn, stream):
     si_reply_event, _ = q.expect_many(
             EventPattern('stream-iq', iq_type='result'),
             EventPattern('dbus-signal', signal='TubeStateChanged',
-                args=[stream_tube_id, 2])) # 2 == OPEN
+                args=[stream_tube_id, cs.TUBE_STATE_OPEN]))
     iq = si_reply_event.stanza
     si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % ns.SI,
         iq)[0]
@@ -373,9 +375,10 @@ def test(q, bus, conn, stream):
     q.expect('dbus-signal', signal='StreamTubeNewConnection',
         args=[stream_tube_id, bob_handle])
 
-    expected_tube = (stream_tube_id, self_handle, TUBE_TYPE_STREAM, 'echo',
-        sample_parameters, TUBE_STATE_OPEN)
-    t.check_tube_in_tubes(expected_tube, tubes_iface.ListTubes(byte_arrays=True))
+    expected_tube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM, 'echo',
+        sample_parameters, cs.TUBE_STATE_OPEN)
+    tubes = tubes_iface.ListTubes(byte_arrays=True)
+    t.check_tube_in_tubes(expected_tube, tubes)
 
     # The CM is the server, so fake a client wanting to talk to it
     # New API tube
@@ -402,7 +405,7 @@ def test(q, bus, conn, stream):
     si_reply_event, _ = q.expect_many(
             EventPattern('stream-iq', iq_type='result'),
             EventPattern('dbus-signal', signal='TubeChannelStateChanged',
-                args=[2])) # 2 == OPEN
+                args=[cs.TUBE_STATE_OPEN]))
     iq = si_reply_event.stanza
     si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % ns.SI,
         iq)[0]
@@ -416,8 +419,8 @@ def test(q, bus, conn, stream):
     q.expect('dbus-signal', signal='StreamTubeNewConnection',
         args=[bob_handle])
 
-    expected_tube = (new_stream_tube_id, self_handle, TUBE_TYPE_STREAM,
-        'newecho', new_sample_parameters, TUBE_STATE_OPEN)
+    expected_tube = (new_stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM,
+        'newecho', new_sample_parameters, cs.TUBE_STATE_OPEN)
     t.check_tube_in_tubes (expected_tube, tubes_iface.ListTubes(byte_arrays=True))
 
     # have the fake client open the stream
@@ -487,7 +490,8 @@ def test(q, bus, conn, stream):
     assert binary == 'hello, new world'
 
     # OK, how about D-Bus?
-    call_async(q, tubes_iface, 'OfferDBusTube', 'com.example.TestCase', sample_parameters)
+    call_async(q, tubes_iface, 'OfferDBusTube',
+        'com.example.TestCase', sample_parameters)
 
     event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
     iq = event.stanza
@@ -557,13 +561,13 @@ def test(q, bus, conn, stream):
     stream.send(result)
 
     q.expect('dbus-signal', signal='TubeStateChanged',
-        args=[dbus_tube_id, 2]) # 2 == OPEN
+        args=[dbus_tube_id, cs.TUBE_STATE_OPEN])
 
     tubes = tubes_iface.ListTubes(byte_arrays=True)
-    expected_dtube = (dbus_tube_id, self_handle, TUBE_TYPE_DBUS,
-        'com.example.TestCase', sample_parameters, TUBE_STATE_OPEN)
-    expected_stube = (stream_tube_id, self_handle, TUBE_TYPE_STREAM,
-        'echo', sample_parameters, TUBE_STATE_OPEN)
+    expected_dtube = (dbus_tube_id, self_handle, cs.TUBE_TYPE_DBUS,
+        'com.example.TestCase', sample_parameters, cs.TUBE_STATE_OPEN)
+    expected_stube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM,
+        'echo', sample_parameters, cs.TUBE_STATE_OPEN)
     t.check_tube_in_tubes(expected_dtube, tubes)
     t.check_tube_in_tubes(expected_stube, tubes)
 
@@ -659,10 +663,10 @@ def test(q, bus, conn, stream):
     assert id == 69
     initiator_jid = conn.InspectHandles(1, [initiator])[0]
     assert initiator_jid == 'bob@localhost'
-    assert type == 0 # D-Bus tube
+    assert type == cs.TUBE_TYPE_DBUS
     assert service == 'com.example.TestCase2'
     assert parameters == {'login': 'TEST'}
-    assert state == 0 # local pending
+    assert state == cs.TUBE_STATE_LOCAL_PENDING
 
     # accept the tube (old API)
     call_async(q, tubes_iface, 'AcceptDBusTube', id)
@@ -704,23 +708,23 @@ def test(q, bus, conn, stream):
     assert len(channels) == 1
     path, props = channels[0]
 
-    assert props[CHANNEL_TYPE] == CHANNEL_TYPE_DBUS_TUBE
-    assert props[INITIATOR_HANDLE] == bob_handle
-    assert props[INITIATOR_ID] == 'bob@localhost'
-    assert props[INTERFACES] == [CHANNEL_IFACE_TUBE]
-    assert props[REQUESTED] == False
-    assert props[TARGET_HANDLE] == bob_handle
-    assert props[TARGET_ID] == 'bob@localhost'
-    assert props[DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase2'
-    assert props[TUBE_PARAMETERS] == {'login': 'TEST'}
-    assert TUBE_STATE not in props
+    assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
+    assert props[cs.INITIATOR_HANDLE] == bob_handle
+    assert props[cs.INITIATOR_ID] == 'bob@localhost'
+    assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_TUBE]
+    assert props[cs.REQUESTED] == False
+    assert props[cs.TARGET_HANDLE] == bob_handle
+    assert props[cs.TARGET_ID] == 'bob@localhost'
+    assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase2'
+    assert props[cs.TUBE_PARAMETERS] == {'login': 'TEST'}
+    assert cs.TUBE_STATE not in props
 
     tube_chan = bus.get_object(conn.bus_name, path)
-    tube_chan_iface = dbus.Interface(tube_chan, CHANNEL)
-    dbus_tube_iface = dbus.Interface(tube_chan, CHANNEL_TYPE_DBUS_TUBE)
+    tube_chan_iface = dbus.Interface(tube_chan, cs.CHANNEL)
+    dbus_tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_DBUS_TUBE)
 
-    status = tube_chan.Get(CHANNEL_IFACE_TUBE, 'State', dbus_interface=PROPERTIES_IFACE)
-    assert status == TUBE_STATE_LOCAL_PENDING
+    status = tube_chan.Get(cs.CHANNEL_IFACE_TUBE, 'State', dbus_interface=PROPERTIES_IFACE)
+    assert status == cs.TUBE_STATE_LOCAL_PENDING
 
     # accept the tube (new API)
     call_async(q, dbus_tube_iface, 'AcceptDBusTube')
@@ -753,7 +757,7 @@ def test(q, bus, conn, stream):
     addr = return_event.value[0]
     assert len(addr) > 0
 
-    assert state_event.args[0] == TUBE_STATE_OPEN
+    assert state_event.args[0] == cs.TUBE_STATE_OPEN
 
     # close the tube
     tube_chan_iface.Close()
-- 
1.5.6.5




More information about the telepathy-commits mailing list