[telepathy-python/master] New tube API is now stable

Guillaume Desmottes guillaume.desmottes at collabora.co.uk
Fri Jun 12 03:26:10 PDT 2009


---
 examples/stream_tube_client.py  |  168 +++++++++++++++++++++------------------
 examples/tube-stream-muc.py     |   14 ++--
 examples/tube-stream-private.py |   20 +----
 3 files changed, 102 insertions(+), 100 deletions(-)

diff --git a/examples/stream_tube_client.py b/examples/stream_tube_client.py
index 3cb1dca..718fff4 100644
--- a/examples/stream_tube_client.py
+++ b/examples/stream_tube_client.py
@@ -8,11 +8,13 @@ import tempfile
 import random
 import string
 
+from dbus import PROPERTIES_IFACE
+
 from telepathy.client import (
         Connection, Channel)
 from telepathy.interfaces import (
         CONN_INTERFACE, CHANNEL_INTERFACE_GROUP, CHANNEL_TYPE_TUBES,
-        CHANNEL_TYPE_TEXT)
+        CHANNEL_TYPE_TEXT, CONNECTION_INTERFACE_REQUESTS, CHANNEL_INTERFACE)
 from telepathy.constants import (
         CONNECTION_HANDLE_TYPE_CONTACT,
         CONNECTION_HANDLE_TYPE_ROOM, CONNECTION_STATUS_CONNECTED,
@@ -26,12 +28,19 @@ from telepathy.constants import (
 
 from account import connection_from_file
 
-tube_type = {TUBE_TYPE_DBUS: "D-Bus",\
-             TUBE_TYPE_STREAM: "Stream"}
+# TODO: import when tube API is stable
+CHANNEL_INTERFACE_TUBE = CHANNEL_INTERFACE + ".Interface.Tube"
+CHANNEL_TYPE_STREAM_TUBE = CHANNEL_INTERFACE + ".Type.StreamTube"
+
+TUBE_CHANNEL_STATE_LOCAL_PENDING = 0
+TUBE_CHANNEL_STATE_REMOTE_PENDING = 1
+TUBE_CHANNEL_STATE_OPEN = 2
+TUBE_CHANNEL_STATE_NOT_OFFERED = 3
 
-tube_state = {TUBE_STATE_LOCAL_PENDING : 'local pending',\
-              TUBE_STATE_REMOTE_PENDING : 'remote pending',\
-              TUBE_STATE_OPEN : 'open'}
+tube_state = {TUBE_CHANNEL_STATE_LOCAL_PENDING : 'local pending',\
+              TUBE_CHANNEL_STATE_REMOTE_PENDING : 'remote pending',\
+              TUBE_CHANNEL_STATE_OPEN : 'open',
+              TUBE_CHANNEL_STATE_NOT_OFFERED: 'not offered'}
 
 SERVICE = "x-example"
 
@@ -39,18 +48,18 @@ loop = None
 
 class StreamTubeClient:
     def __init__(self, account_file, muc_id, contact_id):
-        self.conn = connection_from_file(account_file)
+        self.conn = connection_from_file(account_file,
+            ready_handler=self.ready_cb)
         self.muc_id = muc_id
         self.contact_id = contact_id
 
         self.joined = False
+        self.tube = None
 
         assert self.muc_id is None or self.contact_id is None
 
         self.conn[CONN_INTERFACE].connect_to_signal('StatusChanged',
             self.status_changed_cb)
-        self.conn[CONN_INTERFACE].connect_to_signal("NewChannel",
-                self.new_channel_cb)
 
     def run(self):
         self.conn[CONN_INTERFACE].Connect()
@@ -69,12 +78,14 @@ class StreamTubeClient:
             print 'connecting'
         elif state == CONNECTION_STATUS_CONNECTED:
             print 'connected'
-            self.connected_cb()
         elif state == CONNECTION_STATUS_DISCONNECTED:
             print 'disconnected'
             loop.quit()
 
-    def connected_cb(self):
+    def ready_cb(self, conn):
+        self.conn[CONNECTION_INTERFACE_REQUESTS].connect_to_signal("NewChannels",
+                self.new_channels_cb)
+
         self.self_handle = self.conn[CONN_INTERFACE].GetSelfHandle()
 
     def join_muc(self):
@@ -83,48 +94,39 @@ class StreamTubeClient:
         time.sleep(2)
 
         print "join muc", self.muc_id
-        handle = self.conn[CONN_INTERFACE].RequestHandles(
-            CONNECTION_HANDLE_TYPE_ROOM, [self.muc_id])[0]
 
-        chan_path = self.conn[CONN_INTERFACE].RequestChannel(
-            CHANNEL_TYPE_TEXT, CONNECTION_HANDLE_TYPE_ROOM,
-            handle, True)
+        path, props = self.conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({
+            CHANNEL_INTERFACE + ".ChannelType": CHANNEL_TYPE_TEXT,
+            CHANNEL_INTERFACE + ".TargetHandleType": CONNECTION_HANDLE_TYPE_ROOM,
+            CHANNEL_INTERFACE + ".TargetID": self.muc_id})
 
-        self.channel_text = Channel(self.conn.dbus_proxy.bus_name, chan_path)
+        self.channel_text = Channel(self.conn.dbus_proxy.bus_name, path)
 
         self.self_handle = self.channel_text[CHANNEL_INTERFACE_GROUP].GetSelfHandle()
         self.channel_text[CHANNEL_INTERFACE_GROUP].connect_to_signal(
                 "MembersChanged", self.text_channel_members_changed_cb)
 
-        chan_path = self.conn[CONN_INTERFACE].RequestChannel(
-            CHANNEL_TYPE_TUBES, CONNECTION_HANDLE_TYPE_ROOM,
-            handle, True)
-        self.channel_tubes = Channel(self.conn.dbus_proxy.bus_name, chan_path)
-
         if self.self_handle in self.channel_text[CHANNEL_INTERFACE_GROUP].GetMembers():
             self.joined = True
             self.muc_joined()
 
-    def new_channel_cb(self, object_path, channel_type, handle_type, handle,
-        suppress_handler):
-      if channel_type == CHANNEL_TYPE_TUBES:
-            self.channel_tubes = Channel(self.conn.dbus_proxy.bus_name,
-                    object_path)
-
-            self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
-                    "TubeStateChanged", self.tube_state_changed_cb)
-            self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
-                    "NewTube", self.new_tube_cb)
-            self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
-                    "TubeClosed", self.tube_closed_cb)
-            self.channel_tubes[CHANNEL_TYPE_TUBES].connect_to_signal(
-                   "StreamTubeNewConnection",
-                   self.stream_tube_new_connection_cb)
-
-            for tube in self.channel_tubes[CHANNEL_TYPE_TUBES].ListTubes():
-                id, initiator, type, service, params, state = (tube[0],
-                        tube[1], tube[2], tube[3], tube[4], tube[5])
-                self.new_tube_cb(id, initiator, type, service, params, state)
+    def new_channels_cb(self, channels):
+        if self.tube is not None:
+            return
+
+        for path, props in channels:
+            if props[CHANNEL_INTERFACE + ".ChannelType"] == CHANNEL_TYPE_STREAM_TUBE:
+                self.tube = Channel(self.conn.dbus_proxy.bus_name, path)
+
+                self.tube[CHANNEL_INTERFACE_TUBE].connect_to_signal(
+                        "TubeChannelStateChanged", self.tube_channel_state_changed_cb)
+                self.tube[CHANNEL_INTERFACE].connect_to_signal(
+                        "Closed", self.tube_closed_cb)
+                self.tube[CHANNEL_TYPE_STREAM_TUBE].connect_to_signal(
+                       "StreamTubeNewConnection",
+                       self.stream_tube_new_connection_cb)
+
+                self.got_tube(props)
 
     def text_channel_members_changed_cb(self, message, added, removed,
             local_pending, remote_pending, actor, reason):
@@ -135,30 +137,30 @@ class StreamTubeClient:
     def muc_joined(self):
         pass
 
-    def new_tube_cb(self, id, initiator, type, service, params, state):
-        initiator_id = self.conn[CONN_INTERFACE].InspectHandles(
-                CONNECTION_HANDLE_TYPE_CONTACT, [initiator])[0]
+    def got_tube(self, props):
+        initiator_id = props[CHANNEL_INTERFACE + ".InitiatorID"]
+        service = props[CHANNEL_TYPE_STREAM_TUBE + ".Service"]
 
-        print "new %s tube (%d) offered by %s. Service: %s. State: %s" % (
-                tube_type[type], id, initiator_id, service, tube_state[state])
+        state = self.tube[PROPERTIES_IFACE].Get(CHANNEL_INTERFACE_TUBE, 'State')
 
-        if state == TUBE_STATE_OPEN:
-            self.tube_opened(id)
+        print "new stream tube offered by %s. Service: %s. State: %s" % (
+                initiator_id, service, tube_state[state])
 
-    def tube_opened(self, id):
+    def tube_opened(self):
         pass
 
-    def tube_state_changed_cb(self, id, state):
-        if state == TUBE_STATE_OPEN:
-            self.tube_opened(id)
+    def tube_channel_state_changed_cb(self, state):
+        print "tubes state changed:", tube_state[state]
+        if state == TUBE_CHANNEL_STATE_OPEN:
+            self.tube_opened()
 
-    def tube_closed_cb(self, id):
-        print "tube closed", id
+    def tube_closed_cb(self):
+        print "tube closed"
 
-    def stream_tube_new_connection_cb(self, id, handle):
-       print "new socket connection on tube %u from %s" % (id,
+    def stream_tube_new_connection_cb(self, handle):
+       print "new socket connection on tube from %s" % \
                self.conn[CONN_INTERFACE].InspectHandles(
-                   CONNECTION_HANDLE_TYPE_CONTACT, [handle])[0])
+                   CONNECTION_HANDLE_TYPE_CONTACT, [handle])[0]
 
 class StreamTubeInitiatorClient(StreamTubeClient):
     def __init__(self, account_file, muc_id, contact_id, socket_address=None):
@@ -174,39 +176,47 @@ class StreamTubeInitiatorClient(StreamTubeClient):
             print "Will export socket", socket_address
             self.socket_address = socket_address
 
-    def offer_tube(self):
-        params = {"login": "badger", "a_int" : 69}
-        print "offer tube"
-        id = self.channel_tubes[CHANNEL_TYPE_TUBES].OfferStreamTube(SERVICE,
-                params, SOCKET_ADDRESS_TYPE_IPV4, self.socket_address,
-                SOCKET_ACCESS_CONTROL_LOCALHOST, "")
+    def create_tube(self, handle_type, id):
+        print "Create tube"
+
+        path, props = self.conn[CONNECTION_INTERFACE_REQUESTS].CreateChannel({
+            CHANNEL_INTERFACE + ".ChannelType": CHANNEL_TYPE_STREAM_TUBE,
+            CHANNEL_INTERFACE + ".TargetHandleType": handle_type,
+            CHANNEL_INTERFACE + ".TargetID": id,
+            CHANNEL_TYPE_STREAM_TUBE + ".Service": SERVICE})
+
+    def got_tube(self, props):
+        StreamTubeClient.got_tube(self, props)
+
+        params = dbus.Dictionary({"login": "badger",
+            "a_int" : dbus.Int32(69)}, signature='sv')
+
+        print "Offer tube"
+        self.tube[CHANNEL_TYPE_STREAM_TUBE].Offer(
+            SOCKET_ADDRESS_TYPE_IPV4, self.socket_address, SOCKET_ACCESS_CONTROL_LOCALHOST,
+            params)
+
 
 class StreamTubeJoinerClient(StreamTubeClient):
     def __init__(self, account_file, muc_id, contact_id, connect_trivial_client):
         StreamTubeClient.__init__(self, account_file, muc_id, contact_id)
 
-        self.tube_accepted = False
         self.connect_trivial_client = connect_trivial_client
 
-    def new_tube_cb(self, id, initiator, type, service, params, state):
-        StreamTubeClient.new_tube_cb(self, id, initiator, type, service, params, state)
+    def got_tube(self, props):
+        StreamTubeClient.got_tube(self, props)
 
-        if state == TUBE_STATE_LOCAL_PENDING and service == SERVICE and\
-                not self.tube_accepted:
-            print "accept tube", id
-            self.tube_accepted = True
-            self.channel_tubes[CHANNEL_TYPE_TUBES].AcceptStreamTube(id,
-                    SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ACCESS_CONTROL_LOCALHOST, "")
+        print "accept tube"
 
-    def tube_opened(self, id):
-        StreamTubeClient.tube_opened(self, id)
+        self.address = self.tube[CHANNEL_TYPE_STREAM_TUBE].Accept(
+                SOCKET_ADDRESS_TYPE_IPV4, SOCKET_ACCESS_CONTROL_LOCALHOST, "",
+                byte_arrays=True)
 
-        address_type, address = self.channel_tubes[CHANNEL_TYPE_TUBES].GetStreamTubeSocketAddress(
-                id, byte_arrays=True)
-        print "tube opened. Clients can connect to", address
+    def tube_opened(self):
+        print "tube opened. Clients can connect to", self.address
 
         if self.connect_trivial_client:
-            self.client = TrivialStreamClient(address)
+            self.client = TrivialStreamClient(self.address)
             self.client.connect()
 
 class TrivialStream:
diff --git a/examples/tube-stream-muc.py b/examples/tube-stream-muc.py
index 039fb18..bb49a12 100644
--- a/examples/tube-stream-muc.py
+++ b/examples/tube-stream-muc.py
@@ -1,6 +1,8 @@
 import sys
 import dbus
 
+from telepathy.constants import CONNECTION_HANDLE_TYPE_ROOM
+
 from stream_tube_client import StreamTubeJoinerClient, \
         StreamTubeInitiatorClient
 
@@ -8,24 +10,24 @@ class StreamTubeInitiatorMucClient(StreamTubeInitiatorClient):
     def __init__(self, account_file, muc_id, socket_path=None):
         StreamTubeInitiatorClient.__init__(self, account_file, muc_id, None, socket_path)
 
-    def connected_cb(self):
-        StreamTubeInitiatorClient.connected_cb(self)
+    def ready_cb(self, conn):
+        StreamTubeInitiatorClient.ready_cb(self, conn)
 
         self.join_muc()
 
     def muc_joined(self):
         StreamTubeInitiatorClient.muc_joined(self)
 
-        print "muc joined. Offer the tube"
-        self.offer_tube()
+        print "muc joined. Create the tube"
+        self.create_tube(CONNECTION_HANDLE_TYPE_ROOM, self.muc_id)
 
 class StreamTubeJoinerMucClient(StreamTubeJoinerClient):
     def __init__(self, account_file, muc_id, connect_trivial_client):
         StreamTubeJoinerClient.__init__(self, account_file, muc_id, None,
                 connect_trivial_client)
 
-    def connected_cb(self):
-        StreamTubeJoinerClient.connected_cb(self)
+    def ready_cb(self, conn):
+        StreamTubeJoinerClient.ready_cb(self, conn)
 
         self.join_muc()
 
diff --git a/examples/tube-stream-private.py b/examples/tube-stream-private.py
index 6fddbee..83459ee 100644
--- a/examples/tube-stream-private.py
+++ b/examples/tube-stream-private.py
@@ -11,28 +11,18 @@ class StreamTubeInitiatorPrivateClient(StreamTubeInitiatorClient):
     def __init__(self, account_file, contact_id, socket_address=None):
         StreamTubeInitiatorClient.__init__(self, account_file, None, contact_id, socket_address)
 
-    def connected_cb(self):
-        StreamTubeInitiatorClient.connected_cb(self)
+    def ready_cb(self, conn):
+        StreamTubeInitiatorClient.ready_cb(self, conn)
 
-        self.tubes_with_contact()
-        self.offer_tube()
-
-    def tubes_with_contact(self):
-        handle = self.conn[CONN_INTERFACE].RequestHandles(
-                CONNECTION_HANDLE_TYPE_CONTACT, [self.contact_id])[0]
-
-        chan_path = self.conn[CONN_INTERFACE].RequestChannel(
-            CHANNEL_TYPE_TUBES, CONNECTION_HANDLE_TYPE_CONTACT,
-            handle, True)
-        self.channel_tubes = Channel(self.conn.dbus_proxy.bus_name, chan_path)
+        self.create_tube(CONNECTION_HANDLE_TYPE_CONTACT, self.contact_id)
 
 class StreamTubeJoinerPrivateClient(StreamTubeJoinerClient):
     def __init__(self, account_file, connect_trivial_client):
         StreamTubeJoinerClient.__init__(self, account_file, None, None,
                 connect_trivial_client)
 
-    def connected_cb(self):
-        StreamTubeJoinerClient.connected_cb(self)
+    def ready_cb(self, conn):
+        StreamTubeJoinerClient.ready_cb(self, conn)
 
         print "waiting for a tube offer from contacts"
 
-- 
1.5.6.5




More information about the telepathy-commits mailing list