[Telepathy-commits] [telepathy-gabble/master] Remove another 'from constants import *'
Will Thompson
will.thompson at collabora.co.uk
Wed Feb 18 10:17:02 PST 2009
---
.../offer-accept-private-dbus-stream-tube-ibb.py | 144 ++++++++++----------
1 files changed, 74 insertions(+), 70 deletions(-)
diff --git a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-ibb.py b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-ibb.py
index e4a414d..1d56425 100644
--- a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-ibb.py
+++ b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-ibb.py
@@ -9,7 +9,7 @@ from dbus.lowlevel import SignalMessage
from servicetest import call_async, EventPattern, watch_tube_signals
from gabbletest import exec_test, acknowledge_iq, sync_stream
-from constants import *
+import constants as cs
import ns
import tubetestutil as t
@@ -77,6 +77,7 @@ def test(q, bus, conn, stream):
EventPattern('stream-iq', query_ns='jabber:iq:roster'))
self_handle = conn.GetSelfHandle()
+
acknowledge_iq(stream, vcard_event.stanza)
roster = roster_event.stanza
@@ -112,13 +113,14 @@ def test(q, bus, conn, stream):
sync_stream(q, stream)
# new requestotron
- requestotron = dbus.Interface(conn, CONN_IFACE_REQUESTS)
+ requestotron = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
# Test tubes with Bob. Bob does not have tube capabilities.
bob_handle = conn.RequestHandles(1, ['bob@localhost'])[0]
# old requestotron
- call_async(q, conn, 'RequestChannel', CHANNEL_TYPE_TUBES, 1, bob_handle, True)
+ call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_TUBES, cs.HT_CONTACT,
+ bob_handle, True)
ret, old_sig, new_sig = q.expect_many(
EventPattern('dbus-return', method='RequestChannel'),
@@ -129,9 +131,10 @@ def test(q, bus, conn, stream):
assert len(ret.value) == 1
chan_path = ret.value[0]
- t.check_NewChannel_signal(old_sig.args, CHANNEL_TYPE_TUBES, chan_path, bob_handle, True)
- t.check_NewChannels_signal(new_sig.args, CHANNEL_TYPE_TUBES, chan_path,
- bob_handle, 'bob@localhost', conn.GetSelfHandle())
+ t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
+ bob_handle, True)
+ t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
+ bob_handle, 'bob@localhost', self_handle)
old_tubes_channel_properties = new_sig.args[0][0]
t.check_conn_properties(q, conn, [old_tubes_channel_properties])
@@ -139,11 +142,11 @@ def test(q, bus, conn, stream):
# Try to CreateChannel with correct properties
# Gabble must succeed
call_async(q, requestotron, 'CreateChannel',
- {CHANNEL_TYPE: CHANNEL_TYPE_STREAM_TUBE,
- TARGET_HANDLE_TYPE: HT_CONTACT,
- TARGET_HANDLE: bob_handle,
- STREAM_TUBE_SERVICE: 'newecho',
- });
+ {cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
+ cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
+ cs.TARGET_HANDLE: bob_handle,
+ cs.STREAM_TUBE_SERVICE: "newecho",
+ })
# the NewTube signal (old API) is not fired now as the tube wasn't offered
# yet
@@ -157,8 +160,8 @@ def test(q, bus, conn, stream):
new_chan_path = ret.value[0]
new_chan_prop_asv = ret.value[1]
# State and Parameters are mutables so not announced
- assert (TUBE_STATE) not in new_chan_prop_asv
- assert (TUBE_PARAMETERS) not in new_chan_prop_asv
+ assert cs.TUBE_STATE not in new_chan_prop_asv
+ assert cs.TUBE_PARAMETERS not in new_chan_prop_asv
assert new_chan_path.find("StreamTube") != -1, new_chan_path
assert new_chan_path.find("SITubesChannel") == -1, new_chan_path
# The path of the Channel.Type.Tubes object MUST be different to the path
@@ -166,31 +169,30 @@ def test(q, bus, conn, stream):
assert chan_path != new_chan_path
new_tube_chan = bus.get_object(conn.bus_name, new_chan_path)
- new_tube_iface = dbus.Interface(new_tube_chan, CHANNEL_TYPE_STREAM_TUBE)
+ new_tube_iface = dbus.Interface(new_tube_chan, cs.CHANNEL_TYPE_STREAM_TUBE)
# check State and Parameters
- new_tube_props = new_tube_chan.GetAll(CHANNEL_IFACE_TUBE,
+ new_tube_props = new_tube_chan.GetAll(cs.CHANNEL_IFACE_TUBE,
dbus_interface=PROPERTIES_IFACE)
# the tube created using the old API is in the "not offered" state
- assert new_tube_props['State'] == TUBE_CHANNEL_STATE_NOT_OFFERED
+ assert new_tube_props['State'] == cs.TUBE_CHANNEL_STATE_NOT_OFFERED
- t.check_NewChannel_signal(old_sig.args, CHANNEL_TYPE_STREAM_TUBE,
+ t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
new_chan_path, bob_handle, True)
- t.check_NewChannels_signal(new_sig.args, CHANNEL_TYPE_STREAM_TUBE,
- new_chan_path, bob_handle, 'bob@localhost', conn.GetSelfHandle())
+ t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
+ new_chan_path, bob_handle, 'bob@localhost', self_handle)
stream_tube_channel_properties = new_sig.args[0][0]
- assert TUBE_STATE not in stream_tube_channel_properties
- assert TUBE_PARAMETERS not in stream_tube_channel_properties
+ assert cs.TUBE_STATE not in stream_tube_channel_properties
+ assert cs.TUBE_PARAMETERS not in stream_tube_channel_properties
t.check_conn_properties(q, conn,
[old_tubes_channel_properties, stream_tube_channel_properties])
tubes_chan = bus.get_object(conn.bus_name, chan_path)
+ tubes_iface = dbus.Interface(tubes_chan, cs.CHANNEL_TYPE_TUBES)
- tubes_iface = dbus.Interface(tubes_chan, CHANNEL_TYPE_TUBES)
-
- t.check_channel_properties(q, bus, conn, tubes_chan, CHANNEL_TYPE_TUBES,
+ t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
bob_handle, "bob@localhost")
# Create another tube using old API
@@ -231,22 +233,22 @@ def test(q, bus, conn, stream):
# the tube channel (new API) is announced
- t.check_NewChannel_signal(new_chan.args, CHANNEL_TYPE_STREAM_TUBE,
+ t.check_NewChannel_signal(new_chan.args, cs.CHANNEL_TYPE_STREAM_TUBE,
None, bob_handle, False)
- t.check_NewChannels_signal(new_chans.args, CHANNEL_TYPE_STREAM_TUBE,
+ t.check_NewChannels_signal(new_chans.args, cs.CHANNEL_TYPE_STREAM_TUBE,
new_chan.args[0], bob_handle, "bob@localhost", self_handle)
props = new_chans.args[0][0][1]
- assert TUBE_STATE not in props
- assert TUBE_PARAMETERS not in props
+ assert cs.TUBE_STATE not in props
+ assert cs.TUBE_PARAMETERS not in props
# We offered a tube using the old tube API and created one with the new
# API, so there are 2 tubes. Check the new tube API works
assert len(filter(lambda x:
- x[1] == CHANNEL_TYPE_TUBES,
+ x[1] == cs.CHANNEL_TYPE_TUBES,
conn.ListChannels())) == 1
channels = filter(lambda x:
- x[1] == CHANNEL_TYPE_STREAM_TUBE and
+ x[1] == cs.CHANNEL_TYPE_STREAM_TUBE and
x[0] == new_chan_path,
conn.ListChannels())
assert len(channels) == 1
@@ -254,26 +256,26 @@ def test(q, bus, conn, stream):
old_tube_chan = bus.get_object(conn.bus_name, new_chan.args[0])
- tube_basic_props = old_tube_chan.GetAll(CHANNEL,
+ tube_basic_props = old_tube_chan.GetAll(cs.CHANNEL,
dbus_interface=PROPERTIES_IFACE)
assert tube_basic_props.get("InitiatorHandle") == self_handle
- stream_tube_props = old_tube_chan.GetAll(CHANNEL_TYPE_STREAM_TUBE,
+ stream_tube_props = old_tube_chan.GetAll(cs.CHANNEL_TYPE_STREAM_TUBE,
dbus_interface=PROPERTIES_IFACE)
assert stream_tube_props.get("Service") == "echo", stream_tube_props
- old_tube_props = old_tube_chan.GetAll(CHANNEL_IFACE_TUBE,
+ old_tube_props = old_tube_chan.GetAll(cs.CHANNEL_IFACE_TUBE,
dbus_interface=PROPERTIES_IFACE, byte_arrays=True)
assert old_tube_props.get("Parameters") == dbus.Dictionary(sample_parameters)
# Tube have been created using the old API and so is already offered
- assert old_tube_props['State'] == TUBE_CHANNEL_STATE_REMOTE_PENDING
+ assert old_tube_props['State'] == cs.TUBE_CHANNEL_STATE_REMOTE_PENDING
- t.check_channel_properties(q, bus, conn, tubes_chan, CHANNEL_TYPE_TUBES,
+ t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
bob_handle, "bob@localhost")
t.check_channel_properties(q, bus, conn, old_tube_chan,
- CHANNEL_TYPE_STREAM_TUBE, bob_handle, "bob@localhost",
- TUBE_CHANNEL_STATE_REMOTE_PENDING)
+ cs.CHANNEL_TYPE_STREAM_TUBE, bob_handle, "bob@localhost",
+ cs.TUBE_CHANNEL_STATE_REMOTE_PENDING)
# Offer the first tube created (new API)
path2 = os.getcwd() + '/stream2'
@@ -285,7 +287,7 @@ def test(q, bus, conn, stream):
EventPattern('dbus-signal', signal='NewTube'),
EventPattern('dbus-signal', signal='TubeChannelStateChanged'))
- assert state_event.args[0] == TUBE_CHANNEL_STATE_REMOTE_PENDING
+ assert state_event.args[0] == cs.TUBE_CHANNEL_STATE_REMOTE_PENDING
message = msg_event.stanza
assert message['to'] == 'bob@localhost/Bob' # check the resource
@@ -322,14 +324,14 @@ def test(q, bus, conn, stream):
# The new tube has been offered, the parameters cannot be changed anymore
# We need to use call_async to check the error
tube_prop_iface = dbus.Interface(old_tube_chan, PROPERTIES_IFACE)
- call_async(q, tube_prop_iface, 'Set', CHANNEL_IFACE_TUBE,
+ call_async(q, tube_prop_iface, 'Set', cs.CHANNEL_IFACE_TUBE,
'Parameters', dbus.Dictionary(
{dbus.String(u'foo2'): dbus.String(u'bar2')},
signature=dbus.Signature('sv')),
dbus_interface=PROPERTIES_IFACE)
set_error = q.expect('dbus-error')
# check it is *not* correctly changed
- new_tube_props = new_tube_chan.GetAll(CHANNEL_IFACE_TUBE,
+ new_tube_props = new_tube_chan.GetAll(cs.CHANNEL_IFACE_TUBE,
dbus_interface=PROPERTIES_IFACE, byte_arrays=True)
assert new_tube_props.get("Parameters") == new_sample_parameters, \
new_tube_props.get("Parameters")
@@ -359,7 +361,7 @@ def test(q, bus, conn, stream):
si_reply_event, _ = q.expect_many(
EventPattern('stream-iq', iq_type='result'),
EventPattern('dbus-signal', signal='TubeStateChanged',
- args=[stream_tube_id, 2])) # 2 == OPEN
+ args=[stream_tube_id, cs.TUBE_STATE_OPEN]))
iq = si_reply_event.stanza
si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % ns.SI,
iq)[0]
@@ -373,9 +375,10 @@ def test(q, bus, conn, stream):
q.expect('dbus-signal', signal='StreamTubeNewConnection',
args=[stream_tube_id, bob_handle])
- expected_tube = (stream_tube_id, self_handle, TUBE_TYPE_STREAM, 'echo',
- sample_parameters, TUBE_STATE_OPEN)
- t.check_tube_in_tubes(expected_tube, tubes_iface.ListTubes(byte_arrays=True))
+ expected_tube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM, 'echo',
+ sample_parameters, cs.TUBE_STATE_OPEN)
+ tubes = tubes_iface.ListTubes(byte_arrays=True)
+ t.check_tube_in_tubes(expected_tube, tubes)
# The CM is the server, so fake a client wanting to talk to it
# New API tube
@@ -402,7 +405,7 @@ def test(q, bus, conn, stream):
si_reply_event, _ = q.expect_many(
EventPattern('stream-iq', iq_type='result'),
EventPattern('dbus-signal', signal='TubeChannelStateChanged',
- args=[2])) # 2 == OPEN
+ args=[cs.TUBE_STATE_OPEN]))
iq = si_reply_event.stanza
si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % ns.SI,
iq)[0]
@@ -416,8 +419,8 @@ def test(q, bus, conn, stream):
q.expect('dbus-signal', signal='StreamTubeNewConnection',
args=[bob_handle])
- expected_tube = (new_stream_tube_id, self_handle, TUBE_TYPE_STREAM,
- 'newecho', new_sample_parameters, TUBE_STATE_OPEN)
+ expected_tube = (new_stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM,
+ 'newecho', new_sample_parameters, cs.TUBE_STATE_OPEN)
t.check_tube_in_tubes (expected_tube, tubes_iface.ListTubes(byte_arrays=True))
# have the fake client open the stream
@@ -487,7 +490,8 @@ def test(q, bus, conn, stream):
assert binary == 'hello, new world'
# OK, how about D-Bus?
- call_async(q, tubes_iface, 'OfferDBusTube', 'com.example.TestCase', sample_parameters)
+ call_async(q, tubes_iface, 'OfferDBusTube',
+ 'com.example.TestCase', sample_parameters)
event = q.expect('stream-iq', iq_type='set', to='bob@localhost/Bob')
iq = event.stanza
@@ -557,13 +561,13 @@ def test(q, bus, conn, stream):
stream.send(result)
q.expect('dbus-signal', signal='TubeStateChanged',
- args=[dbus_tube_id, 2]) # 2 == OPEN
+ args=[dbus_tube_id, cs.TUBE_STATE_OPEN])
tubes = tubes_iface.ListTubes(byte_arrays=True)
- expected_dtube = (dbus_tube_id, self_handle, TUBE_TYPE_DBUS,
- 'com.example.TestCase', sample_parameters, TUBE_STATE_OPEN)
- expected_stube = (stream_tube_id, self_handle, TUBE_TYPE_STREAM,
- 'echo', sample_parameters, TUBE_STATE_OPEN)
+ expected_dtube = (dbus_tube_id, self_handle, cs.TUBE_TYPE_DBUS,
+ 'com.example.TestCase', sample_parameters, cs.TUBE_STATE_OPEN)
+ expected_stube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM,
+ 'echo', sample_parameters, cs.TUBE_STATE_OPEN)
t.check_tube_in_tubes(expected_dtube, tubes)
t.check_tube_in_tubes(expected_stube, tubes)
@@ -659,10 +663,10 @@ def test(q, bus, conn, stream):
assert id == 69
initiator_jid = conn.InspectHandles(1, [initiator])[0]
assert initiator_jid == 'bob@localhost'
- assert type == 0 # D-Bus tube
+ assert type == cs.TUBE_TYPE_DBUS
assert service == 'com.example.TestCase2'
assert parameters == {'login': 'TEST'}
- assert state == 0 # local pending
+ assert state == cs.TUBE_STATE_LOCAL_PENDING
# accept the tube (old API)
call_async(q, tubes_iface, 'AcceptDBusTube', id)
@@ -704,23 +708,23 @@ def test(q, bus, conn, stream):
assert len(channels) == 1
path, props = channels[0]
- assert props[CHANNEL_TYPE] == CHANNEL_TYPE_DBUS_TUBE
- assert props[INITIATOR_HANDLE] == bob_handle
- assert props[INITIATOR_ID] == 'bob@localhost'
- assert props[INTERFACES] == [CHANNEL_IFACE_TUBE]
- assert props[REQUESTED] == False
- assert props[TARGET_HANDLE] == bob_handle
- assert props[TARGET_ID] == 'bob@localhost'
- assert props[DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase2'
- assert props[TUBE_PARAMETERS] == {'login': 'TEST'}
- assert TUBE_STATE not in props
+ assert props[cs.CHANNEL_TYPE] == cs.CHANNEL_TYPE_DBUS_TUBE
+ assert props[cs.INITIATOR_HANDLE] == bob_handle
+ assert props[cs.INITIATOR_ID] == 'bob@localhost'
+ assert props[cs.INTERFACES] == [cs.CHANNEL_IFACE_TUBE]
+ assert props[cs.REQUESTED] == False
+ assert props[cs.TARGET_HANDLE] == bob_handle
+ assert props[cs.TARGET_ID] == 'bob@localhost'
+ assert props[cs.DBUS_TUBE_SERVICE_NAME] == 'com.example.TestCase2'
+ assert props[cs.TUBE_PARAMETERS] == {'login': 'TEST'}
+ assert cs.TUBE_STATE not in props
tube_chan = bus.get_object(conn.bus_name, path)
- tube_chan_iface = dbus.Interface(tube_chan, CHANNEL)
- dbus_tube_iface = dbus.Interface(tube_chan, CHANNEL_TYPE_DBUS_TUBE)
+ tube_chan_iface = dbus.Interface(tube_chan, cs.CHANNEL)
+ dbus_tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_DBUS_TUBE)
- status = tube_chan.Get(CHANNEL_IFACE_TUBE, 'State', dbus_interface=PROPERTIES_IFACE)
- assert status == TUBE_STATE_LOCAL_PENDING
+ status = tube_chan.Get(cs.CHANNEL_IFACE_TUBE, 'State', dbus_interface=PROPERTIES_IFACE)
+ assert status == cs.TUBE_STATE_LOCAL_PENDING
# accept the tube (new API)
call_async(q, dbus_tube_iface, 'AcceptDBusTube')
@@ -753,7 +757,7 @@ def test(q, bus, conn, stream):
addr = return_event.value[0]
assert len(addr) > 0
- assert state_event.args[0] == TUBE_STATE_OPEN
+ assert state_event.args[0] == cs.TUBE_STATE_OPEN
# close the tube
tube_chan_iface.Close()
--
1.5.6.5
More information about the telepathy-commits
mailing list