[Telepathy-commits] [telepathy-qt4/master] StreamedMediaChannel: Only signal new streams/FeatureStreams ready when the stream contacts are retrieved.

Andre Moreira Magalhaes (andrunko) andre.magalhaes at collabora.co.uk
Wed Mar 11 08:47:13 PDT 2009


---
 TelepathyQt4/Client/streamed-media-channel.cpp |  164 +++++++++++++++++++-----
 TelepathyQt4/Client/streamed-media-channel.h   |   10 +-
 2 files changed, 141 insertions(+), 33 deletions(-)

diff --git a/TelepathyQt4/Client/streamed-media-channel.cpp b/TelepathyQt4/Client/streamed-media-channel.cpp
index b1a0896..e552633 100644
--- a/TelepathyQt4/Client/streamed-media-channel.cpp
+++ b/TelepathyQt4/Client/streamed-media-channel.cpp
@@ -26,6 +26,7 @@
 
 #include <TelepathyQt4/Client/Connection>
 #include <TelepathyQt4/Client/ContactManager>
+#include <TelepathyQt4/Client/PendingContacts>
 #include <TelepathyQt4/Client/PendingVoidMethodCall>
 
 #include <QHash>
@@ -37,21 +38,44 @@ namespace Client
 
 struct PendingMediaStreams::Private
 {
-    Private(StreamedMediaChannel *channel)
-        : channel(channel)
+    Private(PendingMediaStreams *parent, StreamedMediaChannel *channel)
+        : parent(parent), channel(channel)
     {
     }
 
+    Private(PendingMediaStreams *parent, StreamedMediaChannel *channel,
+            const MediaStreams &streams)
+        : parent(parent), channel(channel), streams(streams)
+    {
+        getContacts();
+    }
+
+    void getContacts();
+
+    PendingMediaStreams *parent;
     StreamedMediaChannel *channel;
     MediaStreams streams;
 };
 
+void PendingMediaStreams::Private::getContacts()
+{
+    QSet<uint> contactsRequired;
+    foreach (const QSharedPointer<MediaStream> &stream, streams) {
+        contactsRequired |= stream->contactHandle();
+    }
+
+    ContactManager *contactManager = channel->connection()->contactManager();
+    parent->connect(contactManager->contactsForHandles(contactsRequired.toList()),
+            SIGNAL(finished(Telepathy::Client::PendingOperation *)),
+            SLOT(gotContacts(Telepathy::Client::PendingOperation *)));
+}
+
 PendingMediaStreams::PendingMediaStreams(StreamedMediaChannel *channel,
         QSharedPointer<Telepathy::Client::Contact> contact,
         QList<Telepathy::MediaStreamType> types,
         QObject *parent)
     : PendingOperation(parent),
-      mPriv(new Private(channel))
+      mPriv(new Private(this, channel))
 {
     Telepathy::UIntList l;
     foreach (Telepathy::MediaStreamType type, types) {
@@ -66,6 +90,14 @@ PendingMediaStreams::PendingMediaStreams(StreamedMediaChannel *channel,
             SLOT(gotStreams(QDBusPendingCallWatcher*)));
 }
 
+PendingMediaStreams::PendingMediaStreams(StreamedMediaChannel *channel,
+        const MediaStreams &streams,
+        QObject *parent)
+    : PendingOperation(parent),
+      mPriv(new Private(this, channel, streams))
+{
+}
+
 PendingMediaStreams::~PendingMediaStreams()
 {
     delete mPriv;
@@ -74,9 +106,13 @@ PendingMediaStreams::~PendingMediaStreams()
 MediaStreams PendingMediaStreams::streams() const
 {
     if (!isFinished()) {
-        warning() << "PendingMediaStreams::streams called before finished";
+        warning() << "PendingMediaStreams::streams called before finished, "
+            "returning empty list";
+        return MediaStreams();
     } else if (!isValid()) {
-        warning() << "PendingMediaStreams::streams called when not valid";
+        warning() << "PendingMediaStreams::streams called when not valid, "
+            "returning empty list";
+        return MediaStreams();
     }
 
     return mPriv->streams;
@@ -98,8 +134,7 @@ void PendingMediaStreams::gotStreams(QDBusPendingCallWatcher *watcher)
     Telepathy::MediaStreamInfoList list =
         qdbus_cast<Telepathy::MediaStreamInfoList>(reply.value());
     QSharedPointer<MediaStream> stream;
-    foreach (const MediaStreamInfo &streamInfo, list) {
-        // TODO retrieve the contact object
+    foreach (const Telepathy::MediaStreamInfo &streamInfo, list) {
         stream = QSharedPointer<MediaStream>(
                 new MediaStream(mPriv->channel,
                     streamInfo.identifier,
@@ -109,29 +144,54 @@ void PendingMediaStreams::gotStreams(QDBusPendingCallWatcher *watcher)
                     (Telepathy::MediaStreamDirection) streamInfo.direction,
                     (Telepathy::MediaStreamPendingSend) streamInfo.pendingSendFlags));
         mPriv->streams.append(stream);
-        mPriv->channel->addStream(stream);
     }
 
-    setFinished();
+    mPriv->getContacts();
 
     watcher->deleteLater();
 }
 
+void PendingMediaStreams::gotContacts(PendingOperation *op)
+{
+    PendingContacts *pc = qobject_cast<PendingContacts *>(op);
+    Q_ASSERT(pc->isForHandles());
+
+    if (pc->isError()) {
+        warning().nospace() << "Gathering contacts failed: "
+            << pc->errorName() << ": " << pc->errorMessage();
+        // TODO should we setFinishedWithError here?
+    }
+
+    QHash<uint, QSharedPointer<Contact> > contactsForHandles;
+    foreach (const QSharedPointer<Contact> &contact, pc->contacts()) {
+        contactsForHandles.insert(contact->handle()[0], contact);
+    }
+
+    foreach (uint handle, pc->invalidHandles()) {
+        contactsForHandles.insert(handle, QSharedPointer<Contact>());
+    }
+
+    foreach (const QSharedPointer<MediaStream> &stream, mPriv->streams) {
+        stream->setContact(contactsForHandles[stream->contactHandle()]);
+    }
+
+    setFinished();
+}
+
 struct MediaStream::Private
 {
     Private(StreamedMediaChannel *channel, uint id,
             uint contactHandle, MediaStreamType type,
             MediaStreamState state, MediaStreamDirection direction,
             MediaStreamPendingSend pendingSend)
-        : id(id), type(type), state(state),
+        : id(id), contactHandle(contactHandle), type(type), state(state),
           direction(direction), pendingSend(pendingSend)
     {
-        ContactManager *contactManager = channel->connection()->contactManager();
-        contact = contactManager->lookupContactByHandle(contactHandle);
     }
 
     StreamedMediaChannel *channel;
     uint id;
+    uint contactHandle;
     QSharedPointer<Contact> contact;
     MediaStreamType type;
     MediaStreamState state;
@@ -289,6 +349,17 @@ PendingOperation *MediaStream::requestStreamDirection(
             mPriv->channel->streamedMediaInterface()->RequestStreamDirection(mPriv->id, direction));
 }
 
+uint MediaStream::contactHandle() const
+{
+    return mPriv->contactHandle;
+}
+
+void MediaStream::setContact(const QSharedPointer<Contact> &contact)
+{
+    Q_ASSERT(!mPriv->contact || mPriv->contact == contact);
+    mPriv->contact = contact;
+}
+
 void MediaStream::setDirection(Telepathy::MediaStreamDirection direction,
         Telepathy::MediaStreamPendingSend pendingSend)
 {
@@ -427,6 +498,13 @@ StreamedMediaChannel::~StreamedMediaChannel()
  */
 MediaStreams StreamedMediaChannel::streams() const
 {
+    if (!isReady(FeatureStreams)) {
+        warning() << "Trying to retrieve streams from streamed media channel, but "
+                     "streams was not requested or the request did not finish yet. "
+                     "Use becomeReady(FeatureStreams)";
+        return MediaStreams();
+    }
+
     return mPriv->streams.values();
 }
 
@@ -495,12 +573,10 @@ void StreamedMediaChannel::gotStreams(QDBusPendingCallWatcher *watcher)
     }
 
     debug() << "Got reply to StreamedMedia::ListStreams()";
-    mPriv->initialStreamsReceived = true;
 
     Telepathy::MediaStreamInfoList list =
         qdbus_cast<Telepathy::MediaStreamInfoList>(reply.value());
-    foreach (const MediaStreamInfo &streamInfo, list) {
-        // TODO retrieve the contact object
+    foreach (const Telepathy::MediaStreamInfo &streamInfo, list) {
         mPriv->streams.insert(streamInfo.identifier,
                 QSharedPointer<MediaStream>(
                     new MediaStream(this,
@@ -511,17 +587,51 @@ void StreamedMediaChannel::gotStreams(QDBusPendingCallWatcher *watcher)
                         (Telepathy::MediaStreamDirection) streamInfo.direction,
                         (Telepathy::MediaStreamPendingSend) streamInfo.pendingSendFlags)));
     }
+    PendingMediaStreams *pms = new PendingMediaStreams(this,
+            mPriv->streams.values(), this);
+    connect(pms,
+            SIGNAL(finished(Telepathy::Client::PendingOperation *)),
+            SLOT(onStreamsReady(Telepathy::Client::PendingOperation *)));
+
+    watcher->deleteLater();
+}
+
+void StreamedMediaChannel::onStreamsReady(PendingOperation *op)
+{
+    if (op->isError()) {
+        mPriv->streams.clear();
+        mPriv->readinessHelper->setIntrospectCompleted(FeatureStreams, false,
+                op->errorName(), op->errorMessage());
+        return;
+    }
 
+    mPriv->initialStreamsReceived = true;
     mPriv->readinessHelper->setIntrospectCompleted(FeatureStreams, true);
+}
 
-    watcher->deleteLater();
+void StreamedMediaChannel::onNewStreamReady(PendingOperation *op)
+{
+    PendingMediaStreams *pms = qobject_cast<PendingMediaStreams *>(op);
+
+    if (op->isError()) {
+        return;
+    }
+
+    Q_ASSERT(pms->streams().size() == 1);
+
+    if (mPriv->initialStreamsReceived) {
+        QSharedPointer<MediaStream> stream = pms->streams().first();
+        emit streamAdded(stream);
+    }
 }
 
 void StreamedMediaChannel::onStreamAdded(uint streamId,
         uint contactHandle, uint streamType)
 {
-    if (mPriv->initialStreamsReceived) {
-        Q_ASSERT(!mPriv->streams.contains(streamId));
+    if (mPriv->streams.contains(streamId)) {
+        debug() << "Received StreamedMediaChannel.StreamAdded for an existing "
+            "stream, ignoring";
+        return;
     }
 
     QSharedPointer<MediaStream> stream = QSharedPointer<MediaStream>(
@@ -533,10 +643,11 @@ void StreamedMediaChannel::onStreamAdded(uint streamId,
                 Telepathy::MediaStreamDirectionNone,
                 (Telepathy::MediaStreamPendingSend) 0));
     mPriv->streams.insert(streamId, stream);
-
-    if (mPriv->initialStreamsReceived) {
-        emit streamAdded(stream);
-    }
+    PendingMediaStreams *pms = new PendingMediaStreams(this,
+            MediaStreams() << stream, this);
+    connect(pms,
+            SIGNAL(finished(Telepathy::Client::PendingOperation *)),
+            SLOT(onNewStreamReady(Telepathy::Client::PendingOperation *)));
 }
 
 void StreamedMediaChannel::onStreamRemoved(uint streamId)
@@ -595,14 +706,5 @@ void StreamedMediaChannel::onStreamError(uint streamId,
     }
 }
 
-void StreamedMediaChannel::addStream(const QSharedPointer<MediaStream> &stream)
-{
-    mPriv->streams.insert(stream->id(), stream);
-
-    if (mPriv->initialStreamsReceived) {
-        emit streamAdded(stream);
-    }
-}
-
 } // Telepathy::Client
 } // Telepathy
diff --git a/TelepathyQt4/Client/streamed-media-channel.h b/TelepathyQt4/Client/streamed-media-channel.h
index 8f6077b..55dfc84 100644
--- a/TelepathyQt4/Client/streamed-media-channel.h
+++ b/TelepathyQt4/Client/streamed-media-channel.h
@@ -50,6 +50,7 @@ public:
 
 private Q_SLOTS:
     void gotStreams(QDBusPendingCallWatcher *);
+    void gotContacts(Telepathy::Client::PendingOperation *);
 
 private:
     friend class StreamedMediaChannel;
@@ -58,6 +59,9 @@ private:
             QSharedPointer<Telepathy::Client::Contact> contact,
             QList<Telepathy::MediaStreamType> types,
             QObject *parent = 0);
+    PendingMediaStreams(StreamedMediaChannel *channel,
+            const MediaStreams &streams,
+            QObject *parent = 0);
 
     struct Private;
     friend struct Private;
@@ -111,6 +115,8 @@ private:
             MediaStreamState state, MediaStreamDirection direction,
             MediaStreamPendingSend pendingSend);
 
+    uint contactHandle() const;
+    void setContact(const QSharedPointer<Contact> &contact);
     void setDirection(Telepathy::MediaStreamDirection direction,
             Telepathy::MediaStreamPendingSend pendingSend);
     void setState(Telepathy::MediaStreamState state);
@@ -151,6 +157,8 @@ Q_SIGNALS:
 
 private Q_SLOTS:
     void gotStreams(QDBusPendingCallWatcher *);
+    void onStreamsReady(Telepathy::Client::PendingOperation *);
+    void onNewStreamReady(Telepathy::Client::PendingOperation *);
     void onStreamAdded(uint, uint, uint);
     void onStreamRemoved(uint);
     void onStreamDirectionChanged(uint, uint, uint);
@@ -160,8 +168,6 @@ private Q_SLOTS:
 private:
     friend class PendingMediaStreams;
 
-    void addStream(const QSharedPointer<MediaStream> &stream);
-
     struct Private;
     friend struct Private;
     Private *mPriv;
-- 
1.5.6.5




More information about the telepathy-commits mailing list