[Telepathy-commits] [telepathy-qt4/master] StreamedMediaChannel: Only signal new streams/FeatureStreams ready when the stream contacts are retrieved.
Andre Moreira Magalhaes (andrunko)
andre.magalhaes at collabora.co.uk
Wed Mar 11 08:47:13 PDT 2009
---
TelepathyQt4/Client/streamed-media-channel.cpp | 164 +++++++++++++++++++-----
TelepathyQt4/Client/streamed-media-channel.h | 10 +-
2 files changed, 141 insertions(+), 33 deletions(-)
diff --git a/TelepathyQt4/Client/streamed-media-channel.cpp b/TelepathyQt4/Client/streamed-media-channel.cpp
index b1a0896..e552633 100644
--- a/TelepathyQt4/Client/streamed-media-channel.cpp
+++ b/TelepathyQt4/Client/streamed-media-channel.cpp
@@ -26,6 +26,7 @@
#include <TelepathyQt4/Client/Connection>
#include <TelepathyQt4/Client/ContactManager>
+#include <TelepathyQt4/Client/PendingContacts>
#include <TelepathyQt4/Client/PendingVoidMethodCall>
#include <QHash>
@@ -37,21 +38,44 @@ namespace Client
struct PendingMediaStreams::Private
{
- Private(StreamedMediaChannel *channel)
- : channel(channel)
+ Private(PendingMediaStreams *parent, StreamedMediaChannel *channel)
+ : parent(parent), channel(channel)
{
}
+ Private(PendingMediaStreams *parent, StreamedMediaChannel *channel,
+ const MediaStreams &streams)
+ : parent(parent), channel(channel), streams(streams)
+ {
+ getContacts();
+ }
+
+ void getContacts();
+
+ PendingMediaStreams *parent;
StreamedMediaChannel *channel;
MediaStreams streams;
};
+void PendingMediaStreams::Private::getContacts()
+{
+ QSet<uint> contactsRequired;
+ foreach (const QSharedPointer<MediaStream> &stream, streams) {
+ contactsRequired |= stream->contactHandle();
+ }
+
+ ContactManager *contactManager = channel->connection()->contactManager();
+ parent->connect(contactManager->contactsForHandles(contactsRequired.toList()),
+ SIGNAL(finished(Telepathy::Client::PendingOperation *)),
+ SLOT(gotContacts(Telepathy::Client::PendingOperation *)));
+}
+
PendingMediaStreams::PendingMediaStreams(StreamedMediaChannel *channel,
QSharedPointer<Telepathy::Client::Contact> contact,
QList<Telepathy::MediaStreamType> types,
QObject *parent)
: PendingOperation(parent),
- mPriv(new Private(channel))
+ mPriv(new Private(this, channel))
{
Telepathy::UIntList l;
foreach (Telepathy::MediaStreamType type, types) {
@@ -66,6 +90,14 @@ PendingMediaStreams::PendingMediaStreams(StreamedMediaChannel *channel,
SLOT(gotStreams(QDBusPendingCallWatcher*)));
}
+PendingMediaStreams::PendingMediaStreams(StreamedMediaChannel *channel,
+ const MediaStreams &streams,
+ QObject *parent)
+ : PendingOperation(parent),
+ mPriv(new Private(this, channel, streams))
+{
+}
+
PendingMediaStreams::~PendingMediaStreams()
{
delete mPriv;
@@ -74,9 +106,13 @@ PendingMediaStreams::~PendingMediaStreams()
MediaStreams PendingMediaStreams::streams() const
{
if (!isFinished()) {
- warning() << "PendingMediaStreams::streams called before finished";
+ warning() << "PendingMediaStreams::streams called before finished, "
+ "returning empty list";
+ return MediaStreams();
} else if (!isValid()) {
- warning() << "PendingMediaStreams::streams called when not valid";
+ warning() << "PendingMediaStreams::streams called when not valid, "
+ "returning empty list";
+ return MediaStreams();
}
return mPriv->streams;
@@ -98,8 +134,7 @@ void PendingMediaStreams::gotStreams(QDBusPendingCallWatcher *watcher)
Telepathy::MediaStreamInfoList list =
qdbus_cast<Telepathy::MediaStreamInfoList>(reply.value());
QSharedPointer<MediaStream> stream;
- foreach (const MediaStreamInfo &streamInfo, list) {
- // TODO retrieve the contact object
+ foreach (const Telepathy::MediaStreamInfo &streamInfo, list) {
stream = QSharedPointer<MediaStream>(
new MediaStream(mPriv->channel,
streamInfo.identifier,
@@ -109,29 +144,54 @@ void PendingMediaStreams::gotStreams(QDBusPendingCallWatcher *watcher)
(Telepathy::MediaStreamDirection) streamInfo.direction,
(Telepathy::MediaStreamPendingSend) streamInfo.pendingSendFlags));
mPriv->streams.append(stream);
- mPriv->channel->addStream(stream);
}
- setFinished();
+ mPriv->getContacts();
watcher->deleteLater();
}
+void PendingMediaStreams::gotContacts(PendingOperation *op)
+{
+ PendingContacts *pc = qobject_cast<PendingContacts *>(op);
+ Q_ASSERT(pc->isForHandles());
+
+ if (pc->isError()) {
+ warning().nospace() << "Gathering contacts failed: "
+ << pc->errorName() << ": " << pc->errorMessage();
+ // TODO should we setFinishedWithError here?
+ }
+
+ QHash<uint, QSharedPointer<Contact> > contactsForHandles;
+ foreach (const QSharedPointer<Contact> &contact, pc->contacts()) {
+ contactsForHandles.insert(contact->handle()[0], contact);
+ }
+
+ foreach (uint handle, pc->invalidHandles()) {
+ contactsForHandles.insert(handle, QSharedPointer<Contact>());
+ }
+
+ foreach (const QSharedPointer<MediaStream> &stream, mPriv->streams) {
+ stream->setContact(contactsForHandles[stream->contactHandle()]);
+ }
+
+ setFinished();
+}
+
struct MediaStream::Private
{
Private(StreamedMediaChannel *channel, uint id,
uint contactHandle, MediaStreamType type,
MediaStreamState state, MediaStreamDirection direction,
MediaStreamPendingSend pendingSend)
- : id(id), type(type), state(state),
+ : id(id), contactHandle(contactHandle), type(type), state(state),
direction(direction), pendingSend(pendingSend)
{
- ContactManager *contactManager = channel->connection()->contactManager();
- contact = contactManager->lookupContactByHandle(contactHandle);
}
StreamedMediaChannel *channel;
uint id;
+ uint contactHandle;
QSharedPointer<Contact> contact;
MediaStreamType type;
MediaStreamState state;
@@ -289,6 +349,17 @@ PendingOperation *MediaStream::requestStreamDirection(
mPriv->channel->streamedMediaInterface()->RequestStreamDirection(mPriv->id, direction));
}
+uint MediaStream::contactHandle() const
+{
+ return mPriv->contactHandle;
+}
+
+void MediaStream::setContact(const QSharedPointer<Contact> &contact)
+{
+ Q_ASSERT(!mPriv->contact || mPriv->contact == contact);
+ mPriv->contact = contact;
+}
+
void MediaStream::setDirection(Telepathy::MediaStreamDirection direction,
Telepathy::MediaStreamPendingSend pendingSend)
{
@@ -427,6 +498,13 @@ StreamedMediaChannel::~StreamedMediaChannel()
*/
MediaStreams StreamedMediaChannel::streams() const
{
+ if (!isReady(FeatureStreams)) {
+ warning() << "Trying to retrieve streams from streamed media channel, but "
+ "streams was not requested or the request did not finish yet. "
+ "Use becomeReady(FeatureStreams)";
+ return MediaStreams();
+ }
+
return mPriv->streams.values();
}
@@ -495,12 +573,10 @@ void StreamedMediaChannel::gotStreams(QDBusPendingCallWatcher *watcher)
}
debug() << "Got reply to StreamedMedia::ListStreams()";
- mPriv->initialStreamsReceived = true;
Telepathy::MediaStreamInfoList list =
qdbus_cast<Telepathy::MediaStreamInfoList>(reply.value());
- foreach (const MediaStreamInfo &streamInfo, list) {
- // TODO retrieve the contact object
+ foreach (const Telepathy::MediaStreamInfo &streamInfo, list) {
mPriv->streams.insert(streamInfo.identifier,
QSharedPointer<MediaStream>(
new MediaStream(this,
@@ -511,17 +587,51 @@ void StreamedMediaChannel::gotStreams(QDBusPendingCallWatcher *watcher)
(Telepathy::MediaStreamDirection) streamInfo.direction,
(Telepathy::MediaStreamPendingSend) streamInfo.pendingSendFlags)));
}
+ PendingMediaStreams *pms = new PendingMediaStreams(this,
+ mPriv->streams.values(), this);
+ connect(pms,
+ SIGNAL(finished(Telepathy::Client::PendingOperation *)),
+ SLOT(onStreamsReady(Telepathy::Client::PendingOperation *)));
+
+ watcher->deleteLater();
+}
+
+void StreamedMediaChannel::onStreamsReady(PendingOperation *op)
+{
+ if (op->isError()) {
+ mPriv->streams.clear();
+ mPriv->readinessHelper->setIntrospectCompleted(FeatureStreams, false,
+ op->errorName(), op->errorMessage());
+ return;
+ }
+ mPriv->initialStreamsReceived = true;
mPriv->readinessHelper->setIntrospectCompleted(FeatureStreams, true);
+}
- watcher->deleteLater();
+void StreamedMediaChannel::onNewStreamReady(PendingOperation *op)
+{
+ PendingMediaStreams *pms = qobject_cast<PendingMediaStreams *>(op);
+
+ if (op->isError()) {
+ return;
+ }
+
+ Q_ASSERT(pms->streams().size() == 1);
+
+ if (mPriv->initialStreamsReceived) {
+ QSharedPointer<MediaStream> stream = pms->streams().first();
+ emit streamAdded(stream);
+ }
}
void StreamedMediaChannel::onStreamAdded(uint streamId,
uint contactHandle, uint streamType)
{
- if (mPriv->initialStreamsReceived) {
- Q_ASSERT(!mPriv->streams.contains(streamId));
+ if (mPriv->streams.contains(streamId)) {
+ debug() << "Received StreamedMediaChannel.StreamAdded for an existing "
+ "stream, ignoring";
+ return;
}
QSharedPointer<MediaStream> stream = QSharedPointer<MediaStream>(
@@ -533,10 +643,11 @@ void StreamedMediaChannel::onStreamAdded(uint streamId,
Telepathy::MediaStreamDirectionNone,
(Telepathy::MediaStreamPendingSend) 0));
mPriv->streams.insert(streamId, stream);
-
- if (mPriv->initialStreamsReceived) {
- emit streamAdded(stream);
- }
+ PendingMediaStreams *pms = new PendingMediaStreams(this,
+ MediaStreams() << stream, this);
+ connect(pms,
+ SIGNAL(finished(Telepathy::Client::PendingOperation *)),
+ SLOT(onNewStreamReady(Telepathy::Client::PendingOperation *)));
}
void StreamedMediaChannel::onStreamRemoved(uint streamId)
@@ -595,14 +706,5 @@ void StreamedMediaChannel::onStreamError(uint streamId,
}
}
-void StreamedMediaChannel::addStream(const QSharedPointer<MediaStream> &stream)
-{
- mPriv->streams.insert(stream->id(), stream);
-
- if (mPriv->initialStreamsReceived) {
- emit streamAdded(stream);
- }
-}
-
} // Telepathy::Client
} // Telepathy
diff --git a/TelepathyQt4/Client/streamed-media-channel.h b/TelepathyQt4/Client/streamed-media-channel.h
index 8f6077b..55dfc84 100644
--- a/TelepathyQt4/Client/streamed-media-channel.h
+++ b/TelepathyQt4/Client/streamed-media-channel.h
@@ -50,6 +50,7 @@ public:
private Q_SLOTS:
void gotStreams(QDBusPendingCallWatcher *);
+ void gotContacts(Telepathy::Client::PendingOperation *);
private:
friend class StreamedMediaChannel;
@@ -58,6 +59,9 @@ private:
QSharedPointer<Telepathy::Client::Contact> contact,
QList<Telepathy::MediaStreamType> types,
QObject *parent = 0);
+ PendingMediaStreams(StreamedMediaChannel *channel,
+ const MediaStreams &streams,
+ QObject *parent = 0);
struct Private;
friend struct Private;
@@ -111,6 +115,8 @@ private:
MediaStreamState state, MediaStreamDirection direction,
MediaStreamPendingSend pendingSend);
+ uint contactHandle() const;
+ void setContact(const QSharedPointer<Contact> &contact);
void setDirection(Telepathy::MediaStreamDirection direction,
Telepathy::MediaStreamPendingSend pendingSend);
void setState(Telepathy::MediaStreamState state);
@@ -151,6 +157,8 @@ Q_SIGNALS:
private Q_SLOTS:
void gotStreams(QDBusPendingCallWatcher *);
+ void onStreamsReady(Telepathy::Client::PendingOperation *);
+ void onNewStreamReady(Telepathy::Client::PendingOperation *);
void onStreamAdded(uint, uint, uint);
void onStreamRemoved(uint);
void onStreamDirectionChanged(uint, uint, uint);
@@ -160,8 +168,6 @@ private Q_SLOTS:
private:
friend class PendingMediaStreams;
- void addStream(const QSharedPointer<MediaStream> &stream);
-
struct Private;
friend struct Private;
Private *mPriv;
--
1.5.6.5
More information about the telepathy-commits mailing list