[Telepathy-commits] [telepathy-gabble/master] Remove a 'from constants import *'
Will Thompson
will.thompson at collabora.co.uk
Wed Feb 18 09:01:58 PST 2009
Also remove a bunch of redundant checking, and clear up other test code.
---
...offer-accept-private-dbus-stream-tube-socks5.py | 126 ++++++++------------
1 files changed, 50 insertions(+), 76 deletions(-)
diff --git a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
index a71078b..cd7c5f3 100644
--- a/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
+++ b/tests/twisted/tubes/offer-accept-private-dbus-stream-tube-socks5.py
@@ -12,7 +12,7 @@ from dbus.lowlevel import SignalMessage
from servicetest import call_async, EventPattern, tp_name_prefix, \
watch_tube_signals, sync_dbus
from gabbletest import exec_test, acknowledge_iq, sync_stream
-from constants import *
+import constants as cs
import ns
import tubetestutil as t
@@ -74,6 +74,8 @@ def test(q, bus, conn, stream):
query_name='vCard'),
EventPattern('stream-iq', query_ns='jabber:iq:roster'))
+ self_handle = conn.GetSelfHandle()
+
acknowledge_iq(stream, vcard_event.stanza)
roster = roster_event.stanza
@@ -130,15 +132,13 @@ def test(q, bus, conn, stream):
# Ensure that Joe and Bob's caps have been received
sync_stream(q, stream)
- # new requestotron
- requestotron = dbus.Interface(conn,
- 'org.freedesktop.Telepathy.Connection.Interface.Requests')
+ requestotron = dbus.Interface(conn, cs.CONN_IFACE_REQUESTS)
# Test tubes with Joe. Joe does not have tube capabilities.
# Gabble does not allow to offer a tube to him.
joe_handle = conn.RequestHandles(1, ['joe at localhost'])[0]
- call_async(q, conn, 'RequestChannel',
- tp_name_prefix + '.Channel.Type.Tubes', 1, joe_handle, True);
+ call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_TUBES,
+ cs.HT_CONTACT, joe_handle, True);
ret, old_sig, new_sig = q.expect_many(
EventPattern('dbus-return', method='RequestChannel'),
@@ -148,8 +148,7 @@ def test(q, bus, conn, stream):
joe_chan_path = ret.value[0]
joe_tubes_chan = bus.get_object(conn.bus_name, joe_chan_path)
- joe_tubes_iface = dbus.Interface(joe_tubes_chan,
- tp_name_prefix + '.Channel.Type.Tubes')
+ joe_tubes_iface = dbus.Interface(joe_tubes_chan, cs.CHANNEL_TYPE_TUBES)
path = os.getcwd() + '/stream'
call_async(q, joe_tubes_iface, 'OfferStreamTube',
'echo', sample_parameters, 0, dbus.ByteArray(path), 0, "")
@@ -161,8 +160,8 @@ def test(q, bus, conn, stream):
bob_handle = conn.RequestHandles(1, ['bob at localhost'])[0]
# old requestotron
- call_async(q, conn, 'RequestChannel',
- tp_name_prefix + '.Channel.Type.Tubes', 1, bob_handle, True);
+ call_async(q, conn, 'RequestChannel', cs.CHANNEL_TYPE_TUBES, cs.HT_CONTACT,
+ bob_handle, True);
ret, old_sig, new_sig = q.expect_many(
EventPattern('dbus-return', method='RequestChannel'),
@@ -173,54 +172,48 @@ def test(q, bus, conn, stream):
assert len(ret.value) == 1
chan_path = ret.value[0]
- t.check_NewChannel_signal(old_sig.args, CHANNEL_TYPE_TUBES, chan_path,
+ t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
bob_handle, True)
- t.check_NewChannels_signal(new_sig.args, CHANNEL_TYPE_TUBES, chan_path,
- bob_handle, 'bob at localhost', conn.GetSelfHandle())
+ t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_TUBES, chan_path,
+ bob_handle, 'bob at localhost', self_handle)
old_tubes_channel_properties = new_sig.args[0][0]
t.check_conn_properties(q, conn, [old_tubes_channel_properties])
# Try to CreateChannel with correct properties
# Gabble must succeed
call_async(q, requestotron, 'CreateChannel',
- {'org.freedesktop.Telepathy.Channel.ChannelType':
- 'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT',
- 'org.freedesktop.Telepathy.Channel.TargetHandleType':
- 1,
- 'org.freedesktop.Telepathy.Channel.TargetHandle':
- bob_handle,
- 'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT.Service':
- "newecho",
- });
+ {cs.CHANNEL_TYPE: cs.CHANNEL_TYPE_STREAM_TUBE,
+ cs.TARGET_HANDLE_TYPE: cs.HT_CONTACT,
+ cs.TARGET_HANDLE: bob_handle,
+ cs.STREAM_TUBE_SERVICE: "newecho",
+ })
ret, old_sig, new_sig = q.expect_many(
EventPattern('dbus-return', method='CreateChannel'),
EventPattern('dbus-signal', signal='NewChannel'),
EventPattern('dbus-signal', signal='NewChannels'),
)
- assert len(ret.value) == 2 # CreateChannel returns 2 values: o, a{sv}
- new_chan_path = ret.value[0]
- new_chan_prop_asv = ret.value[1]
+ new_chan_path, new_chan_prop_asv = ret.value
+
assert new_chan_path.find("StreamTube") != -1, new_chan_path
assert new_chan_path.find("SITubesChannel") == -1, new_chan_path
# The path of the Channel.Type.Tubes object MUST be different to the path
# of the Channel.Type.StreamTube object !
assert chan_path != new_chan_path
- t.check_NewChannel_signal(old_sig.args, CHANNEL_TYPE_STREAM_TUBE,
+ t.check_NewChannel_signal(old_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
new_chan_path, bob_handle, True)
- t.check_NewChannels_signal(new_sig.args, CHANNEL_TYPE_STREAM_TUBE,
- new_chan_path, bob_handle, 'bob at localhost', conn.GetSelfHandle())
+ t.check_NewChannels_signal(new_sig.args, cs.CHANNEL_TYPE_STREAM_TUBE,
+ new_chan_path, bob_handle, 'bob at localhost', self_handle)
stream_tube_channel_properties = new_sig.args[0][0]
t.check_conn_properties(q, conn,
[old_tubes_channel_properties, stream_tube_channel_properties])
tubes_chan = bus.get_object(conn.bus_name, chan_path)
- tubes_iface = dbus.Interface(tubes_chan,
- tp_name_prefix + '.Channel.Type.Tubes')
+ tubes_iface = dbus.Interface(tubes_chan, cs.CHANNEL_TYPE_TUBES)
- t.check_channel_properties(q, bus, conn, tubes_chan, CHANNEL_TYPE_TUBES,
+ t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
bob_handle, "bob at localhost")
# Offer the tube, old API
@@ -253,43 +246,29 @@ def test(q, bus, conn, stream):
}
# We offered a tube using the old tube API and created one with the new
- # API, so there is 2 tubes. Check the new tube API works
+ # API, so there are 2 tubes. Check the new tube API works
assert len(filter(lambda x:
- x[1] == "org.freedesktop.Telepathy.Channel.Type.Tubes",
+ x[1] == cs.CHANNEL_TYPE_TUBES,
conn.ListChannels())) == 1
channels = filter(lambda x:
- x[1] == "org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT" and
+ x[1] == cs.CHANNEL_TYPE_STREAM_TUBE and
x[0] == new_chan_path,
conn.ListChannels())
assert len(channels) == 1
assert new_chan_path == channels[0][0]
tube_chan = bus.get_object(conn.bus_name, channels[0][0])
- tube_iface = dbus.Interface(tube_chan,
- tp_name_prefix + '.Channel.Type.StreamTube.DRAFT')
+ tube_iface = dbus.Interface(tube_chan, cs.CHANNEL_TYPE_STREAM_TUBE)
+ tube_prop_iface = dbus.Interface(tube_chan, dbus.PROPERTIES_IFACE)
- self_handle = conn.GetSelfHandle()
- tube_basic_props = tube_chan.GetAll(
- 'org.freedesktop.Telepathy.Channel',
- dbus_interface=dbus.PROPERTIES_IFACE)
- assert tube_basic_props.get("InitiatorHandle") == self_handle
-
- stream_tube_props = tube_chan.GetAll(
- 'org.freedesktop.Telepathy.Channel.Type.StreamTube.DRAFT',
- dbus_interface=dbus.PROPERTIES_IFACE)
- assert stream_tube_props.get("Service") == "newecho", stream_tube_props
-
- tube_props = tube_chan.GetAll(
- 'org.freedesktop.Telepathy.Channel.Interface.Tube.DRAFT',
- dbus_interface=dbus.PROPERTIES_IFACE)
- # 3 == Tube_Channel_State_Not_Offered
- assert tube_props.get("State") == 3, tube_props
+ service = tube_prop_iface.Get(cs.CHANNEL_TYPE_STREAM_TUBE, 'Service')
+ assert service == "newecho", service
- t.check_channel_properties(q, bus, conn, tubes_chan, CHANNEL_TYPE_TUBES,
+ t.check_channel_properties(q, bus, conn, tubes_chan, cs.CHANNEL_TYPE_TUBES,
bob_handle, "bob at localhost")
t.check_channel_properties(q, bus, conn, tube_chan,
- CHANNEL_TYPE_STREAM_TUBE, bob_handle, "bob at localhost",
- TUBE_STATE_NOT_OFFERED)
+ cs.CHANNEL_TYPE_STREAM_TUBE, bob_handle, "bob at localhost",
+ cs.TUBE_STATE_NOT_OFFERED)
# Offer the tube, new API
path2 = os.getcwd() + '/stream2'
@@ -322,21 +301,16 @@ def test(q, bus, conn, stream):
}
# The new tube has been offered, the parameters cannot be changed anymore
# We need to use call_async to check the error
- tube_prop_iface = dbus.Interface(tube_chan,
- dbus.PROPERTIES_IFACE)
- call_async(q, tube_prop_iface, 'Set',
- 'org.freedesktop.Telepathy.Channel.Interface.Tube.DRAFT',
+ call_async(q, tube_prop_iface, 'Set', cs.CHANNEL_IFACE_TUBE,
'Parameters', dbus.Dictionary(
{dbus.String(u'foo2'): dbus.String(u'bar2')},
signature=dbus.Signature('sv')),
dbus_interface=dbus.PROPERTIES_IFACE)
set_error = q.expect('dbus-error')
# check it is *not* correctly changed
- tube_props = tube_chan.GetAll(
- 'org.freedesktop.Telepathy.Channel.Interface.Tube.DRAFT',
- dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
- assert tube_props.get("Parameters") == new_sample_parameters, \
- tube_props.get("Parameters")
+ params = tube_prop_iface.Get(cs.CHANNEL_IFACE_TUBE, 'Parameters',
+ byte_arrays=True)
+ assert params == new_sample_parameters, (params, new_sample_parameters)
# The CM is the server, so fake a client wanting to talk to it
# Old API tube
@@ -363,7 +337,7 @@ def test(q, bus, conn, stream):
si_reply_event, _ = q.expect_many(
EventPattern('stream-iq', iq_type='result'),
EventPattern('dbus-signal', signal='TubeStateChanged',
- args=[stream_tube_id, 2])) # 2 == OPEN
+ args=[stream_tube_id, cs.TUBE_STATE_OPEN]))
iq = si_reply_event.stanza
si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % ns.SI,
iq)[0]
@@ -377,8 +351,8 @@ def test(q, bus, conn, stream):
q.expect('dbus-signal', signal='StreamTubeNewConnection',
args=[stream_tube_id, bob_handle])
- expected_tube = (stream_tube_id, self_handle, TUBE_TYPE_STREAM, 'echo',
- sample_parameters, TUBE_STATE_OPEN)
+ expected_tube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM, 'echo',
+ sample_parameters, cs.TUBE_STATE_OPEN)
tubes = tubes_iface.ListTubes(byte_arrays=True)
t.check_tube_in_tubes(expected_tube, tubes)
@@ -407,7 +381,7 @@ def test(q, bus, conn, stream):
si_reply_event, _ = q.expect_many(
EventPattern('stream-iq', iq_type='result'),
EventPattern('dbus-signal', signal='TubeChannelStateChanged',
- args=[2])) # 2 == OPEN
+ args=[cs.TUBE_STATE_OPEN]))
iq = si_reply_event.stanza
si = xpath.queryForNodes('/iq/si[@xmlns="%s"]' % ns.SI,
iq)[0]
@@ -428,7 +402,7 @@ def test(q, bus, conn, stream):
1, # Unix stream
'newecho',
new_sample_parameters,
- 2, # OPEN
+ cs.TUBE_STATE_OPEN,
) in tubes, tubes
reactor.listenTCP(5086, S5BFactory(q.append))
@@ -639,13 +613,13 @@ def test(q, bus, conn, stream):
stream.send(result)
q.expect('dbus-signal', signal='TubeStateChanged',
- args=[dbus_tube_id, 2]) # 2 == OPEN
+ args=[dbus_tube_id, cs.TUBE_STATE_OPEN])
tubes = tubes_iface.ListTubes(byte_arrays=True)
- expected_dtube = (dbus_tube_id, self_handle, TUBE_TYPE_DBUS,
- 'com.example.TestCase', sample_parameters, TUBE_STATE_OPEN)
- expected_stube = (stream_tube_id, self_handle, TUBE_TYPE_STREAM,
- 'echo', sample_parameters, TUBE_STATE_OPEN)
+ expected_dtube = (dbus_tube_id, self_handle, cs.TUBE_TYPE_DBUS,
+ 'com.example.TestCase', sample_parameters, cs.TUBE_STATE_OPEN)
+ expected_stube = (stream_tube_id, self_handle, cs.TUBE_TYPE_STREAM,
+ 'echo', sample_parameters, cs.TUBE_STATE_OPEN)
t.check_tube_in_tubes(expected_dtube, tubes)
t.check_tube_in_tubes(expected_stube, tubes)
@@ -729,10 +703,10 @@ def test(q, bus, conn, stream):
assert id == 69
initiator_jid = conn.InspectHandles(1, [initiator])[0]
assert initiator_jid == 'bob at localhost'
- assert type == 0 # D-Bus tube
+ assert type == cs.TUBE_TYPE_DBUS
assert service == 'com.example.TestCase2'
assert parameters == {'login': 'TEST'}
- assert state == 0 # local pending
+ assert state == cs.TUBE_STATE_LOCAL_PENDING
# accept the tube
call_async(q, tubes_iface, 'AcceptDBusTube', id)
--
1.5.6.5
More information about the telepathy-commits
mailing list