[Telepathy-commits] [telepathy-qt4/master] TextChannel: implement FeatureMessageQueue
Simon McVittie
simon.mcvittie at collabora.co.uk
Fri Feb 20 06:09:10 PST 2009
---
TelepathyQt4/Client/text-channel.cpp | 340 +++++++++++++++++++++++++++++++++-
TelepathyQt4/Client/text-channel.h | 23 +--
2 files changed, 338 insertions(+), 25 deletions(-)
diff --git a/TelepathyQt4/Client/text-channel.cpp b/TelepathyQt4/Client/text-channel.cpp
index 0a028f8..3f11cb1 100644
--- a/TelepathyQt4/Client/text-channel.cpp
+++ b/TelepathyQt4/Client/text-channel.cpp
@@ -24,7 +24,11 @@
#include "TelepathyQt4/Client/_gen/text-channel.moc.hpp"
+#include <TelepathyQt4/Client/Connection>
+#include <TelepathyQt4/Client/ContactManager>
+#include <TelepathyQt4/Client/PendingContacts>
#include <TelepathyQt4/Client/PendingReadyChannel>
+#include <TelepathyQt4/Client/ReferencedHandles>
#include "TelepathyQt4/debug-internal.h"
@@ -530,6 +534,25 @@ struct TextChannel::Private
bool getAllMessagesInFlight;
bool listPendingMessagesCalled;
bool initialMessagesReceived;
+    struct QueuedEvent // a received message, or the removal of a pending ID
+    {
+        // Event: a message has arrived. `removed` is unused and set to 0.
+        explicit QueuedEvent(const ReceivedMessage &message)
+            : isMessage(true), message(message), removed(0)
+        { }
+        // Event: the pending message with ID `removed` has been removed.
+        explicit QueuedEvent(uint removed)
+            : isMessage(false), message(), removed(removed)
+        { }
+        bool isMessage;          // selects which of the two fields is meaningful
+        ReceivedMessage message; // valid only when isMessage is true
+        uint removed;            // pending message ID; valid when !isMessage
+    };
+ QList<ReceivedMessage> messages;
+ QList<QueuedEvent *> incompleteMessages;
+ QSet<uint> awaitingContacts;
+ void contactLost(uint handle);
+ void contactFound(QSharedPointer<Contact> contact);
};
TextChannel::Private::Private()
@@ -542,7 +565,10 @@ TextChannel::Private::Private()
connectedMessageQueueSignals(false),
getAllMessagesInFlight(false),
listPendingMessagesCalled(false),
- initialMessagesReceived(false)
+ initialMessagesReceived(false),
+ messages(),
+ incompleteMessages(),
+ awaitingContacts()
{
}
@@ -567,7 +593,9 @@ TextChannel::Private::~Private()
*
* Features that can be enabled on a TextChannel using becomeReady().
*
- * \value FeatureMessageQueue Doesn't do anything yet
+ * \value FeatureMessageQueue The messageQueue method can be called, and the
+ * messageReceived and pendingMessageRemoved signals
+ * are emitted
* \value FeatureMessageCapabilities The supportedContentTypes,
* messagePartSupport and
* deliveryReportingSupport methods can
@@ -599,6 +627,25 @@ TextChannel::Private::~Private()
*/
/**
+ * \fn void messageReceived(const Telepathy::Client::ReceivedMessage &message)
+ *
+ * Emitted when a message is added to messageQueue(), if the
+ * FeatureMessageQueue Feature has been enabled.
+ *
+ * This occurs slightly later than the message being received over D-Bus;
+ * see messageQueue() for details.
+ */
+
+/**
+ * \fn void pendingMessageRemoved(
+ * const Telepathy::Client::ReceivedMessage &message)
+ *
+ * Emitted when a message is removed from messageQueue(), if the
+ * FeatureMessageQueue Feature has been enabled. See messageQueue() for the
+ * circumstances in which this happens.
+ */
+
+/**
* Creates a TextChannel associated with the given object on the same service
* as the given connection.
*
@@ -714,6 +761,31 @@ DeliveryReportingSupportFlags TextChannel::deliveryReportingSupport() const
}
/**
+ * Return a list of messages received in this channel. This list is empty
+ * unless the FeatureMessageQueue Feature has been enabled.
+ *
+ * Messages are added to this list when they are received from the instant
+ * messaging service; the messageReceived signal is emitted.
+ *
+ * There is a small delay between the message being received over D-Bus and
+ * becoming available to users of this C++ API, since a small amount of
+ * additional information needs to be fetched. However, the relative ordering
+ * of all the messages in a channel is preserved.
+ *
+ * Messages are removed from this list when they are acknowledged with the
+ * acknowledge() or forget() methods. On channels where hasMessagesInterface()
+ * returns true, they will also be removed when acknowledged by a different
+ * client. In either case, the pendingMessageRemoved signal is emitted.
+ *
+ * \return The unacknowledged messages in this channel, excluding any that
+ * have been forgotten with forget().
+ */
+QList<ReceivedMessage> TextChannel::messageQueue() const
+{
+ return mPriv->messages; // by value: callers get their own QList
+}
+
+/**
* Return whether the desired features are ready for use.
*
* \param channelFeatures Features of the Channel class
@@ -943,11 +1015,141 @@ void TextChannel::onMessageSent(const Telepathy::MessagePartList &parts,
sentMessageToken);
}
+void TextChannel::processQueue()
+{
+    // Publish queued events strictly in arrival order. Message IDs aren't
+    // necessarily globally unique, so a removal must only be applied after
+    // every message queued before it. Stops at the first message whose
+    // sender Contact is still missing, then requests the missing Contacts.
+    while (!mPriv->incompleteMessages.isEmpty()) {
+        const Private::QueuedEvent *e = mPriv->incompleteMessages.first();
+        debug() << "QueuedEvent:" << e;
+
+        if (e->isMessage) {
+            if (e->message.mPriv->senderHandle() != 0 &&
+                    !e->message.mPriv->sender) {
+                // the message needs a sender Contact but doesn't have one
+                // yet: resume from onContactsFinished()
+                break;
+            }
+
+            debug() << "Message is usable, copying to main queue";
+            mPriv->messages << e->message;
+            emit messageReceived(e->message);
+        } else {
+            // forget about the message(s) with ID e->removed (there should be
+            // at most one under normal circumstances)
+            int i = 0;
+            while (i < mPriv->messages.size()) {
+                if (mPriv->messages.at(i).mPriv->pendingId() == e->removed) {
+                    emit pendingMessageRemoved(mPriv->messages.at(i));
+                    mPriv->messages.removeAt(i);
+                } else {
+                    i++;
+                }
+            }
+        }
+
+        debug() << "Dropping first event";
+        delete mPriv->incompleteMessages.takeFirst();
+    }
+
+    if (mPriv->incompleteMessages.isEmpty()) {
+        if (mPriv->pendingFeatures & FeatureMessageQueue) {
+            debug() << "incompleteMessages empty for the first time: "
+                "FeatureMessageQueue is now ready";
+            mPriv->features |= FeatureMessageQueue;
+            mPriv->pendingFeatures &= ~FeatureMessageQueue;
+            mPriv->continueReadying(this);
+        }
+        return;
+    }
+
+    // What Contact objects do we need in order to proceed, ignoring those
+    // for which we've already sent a request?
+    QSet<uint> contactsRequired;
+    foreach (const Private::QueuedEvent *e, mPriv->incompleteMessages) {
+        if (e->isMessage) {
+            uint handle = e->message.mPriv->senderHandle();
+            if (handle != 0 && !e->message.mPriv->sender
+                    && !mPriv->awaitingContacts.contains(handle)) {
+                contactsRequired << handle;
+            }
+        }
+    }
+    if (contactsRequired.isEmpty()) {
+        // everything we still need has already been requested
+        return;
+    }
+    connect(connection()->contactManager()->contactsForHandles(
+                contactsRequired.toList()),
+            SIGNAL(finished(Telepathy::Client::PendingOperation *)),
+            SLOT(onContactsFinished(Telepathy::Client::PendingOperation *)));
+    mPriv->awaitingContacts |= contactsRequired;
+}
+
+void TextChannel::Private::contactLost(uint handle)
+{
+    // no Contact will come for this handle: mark its messages "unknown sender"
+    foreach (QueuedEvent *event, incompleteMessages) {
+        ReceivedMessage &msg = event->message;
+        if (event->isMessage && msg.mPriv->senderHandle() == handle
+                && !msg.mPriv->sender) {
+            msg.mPriv->clearSenderHandle();
+        }
+    }
+}
+
+void TextChannel::Private::contactFound(QSharedPointer<Contact> contact)
+{
+    // attach the new Contact to every queued message still waiting for it
+    const uint wanted = contact->handle().at(0);
+    foreach (QueuedEvent *event, incompleteMessages) {
+        if (event->isMessage && event->message.mPriv->senderHandle() == wanted
+                && !event->message.mPriv->sender) {
+            event->message.mPriv->sender = contact;
+        }
+    }
+}
+
+void TextChannel::onContactsFinished(PendingOperation *op)
+{
+    // Slot: a contactsForHandles() request made by processQueue() finished.
+    PendingContacts *pc = qobject_cast<PendingContacts *>(op);
+    if (!pc) {
+        warning() << "onContactsFinished: not a PendingContacts, ignoring";
+        return;
+    }
+    Q_ASSERT(pc->isForHandles());
+    foreach (uint handle, pc->handles()) {
+        mPriv->awaitingContacts -= handle;
+    }
+    if (pc->isError()) {
+        warning().nospace() << "Gathering contacts failed: "
+            << pc->errorName() << ": " << pc->errorMessage();
+        foreach (uint handle, pc->handles()) {
+            mPriv->contactLost(handle);
+        }
+    } else {
+        foreach (const QSharedPointer<Contact> &contact, pc->contacts()) {
+            mPriv->contactFound(contact);
+        }
+        foreach (uint handle, pc->invalidHandles()) {
+            mPriv->contactLost(handle);
+        }
+    }
+    processQueue(); // the messages we were asking about should now be ready
+}
+
void TextChannel::onMessageReceived(const Telepathy::MessagePartList &parts)
{
if (!mPriv->initialMessagesReceived) {
return;
}
+ // queue the message; processQueue() publishes it once its sender is resolved
+ mPriv->incompleteMessages << new Private::QueuedEvent(
+ ReceivedMessage(parts, this));
+ processQueue();
}
void TextChannel::onPendingMessagesRemoved(const Telepathy::UIntList &ids)
@@ -955,6 +1157,10 @@ void TextChannel::onPendingMessagesRemoved(const Telepathy::UIntList &ids)
if (!mPriv->initialMessagesReceived) {
return;
}
+ foreach (uint id, ids) {
+ mPriv->incompleteMessages << new Private::QueuedEvent(id);
+ }
+ processQueue();
}
void TextChannel::onTextSent(uint timestamp, uint type, const QString &text)
@@ -969,6 +1175,49 @@ void TextChannel::onTextReceived(uint id, uint timestamp, uint sender,
if (!mPriv->initialMessagesReceived) {
return;
}
+
+ MessagePart header;
+
+ if (timestamp == 0) {
+ timestamp = QDateTime::currentDateTime().toTime_t();
+ }
+ header.insert(QString::fromAscii("message-received"),
+ QDBusVariant(static_cast<qlonglong>(timestamp)));
+
+ header.insert(QString::fromAscii("pending-message-id"), QDBusVariant(id));
+ header.insert(QString::fromAscii("message-sender"), QDBusVariant(sender));
+ header.insert(QString::fromAscii("message-type"), QDBusVariant(type));
+
+ if (flags & ChannelTextMessageFlagScrollback) {
+ header.insert(QString::fromAscii("scrollback"), QDBusVariant(true));
+ }
+ if (flags & ChannelTextMessageFlagRescued) {
+ header.insert(QString::fromAscii("rescued"), QDBusVariant(true));
+ }
+
+ MessagePart body;
+
+ body.insert(QString::fromAscii("content-type"),
+ QDBusVariant(QString::fromAscii("text/plain")));
+ body.insert(QString::fromAscii("content"), QDBusVariant(text));
+
+ if (flags & ChannelTextMessageFlagTruncated) {
+ header.insert(QString::fromAscii("truncated"), QDBusVariant(true));
+ }
+
+ MessagePartList parts;
+ parts << header;
+ parts << body;
+
+ ReceivedMessage m(parts, this);
+
+ if (flags & ChannelTextMessageFlagNonTextContent) {
+ // set the "you are not expected to understand this" flag
+ m.mPriv->forceNonText = true;
+ }
+
+ mPriv->incompleteMessages << new Private::QueuedEvent(m);
+ processQueue();
}
void TextChannel::onTextSendError(uint error, uint timestamp, uint type,
@@ -977,6 +1226,60 @@ void TextChannel::onTextSendError(uint error, uint timestamp, uint type,
if (!mPriv->initialMessagesReceived) {
return;
}
+
+ MessagePart header;
+
+ header.insert(QString::fromAscii("message-received"),
+ QDBusVariant(static_cast<qlonglong>(
+ QDateTime::currentDateTime().toTime_t())));
+ header.insert(QString::fromAscii("message-type"),
+ QDBusVariant(static_cast<uint>(
+ ChannelTextMessageTypeDeliveryReport)));
+
+ // we can't tell whether it's a temporary or permanent failure here,
+ // so guess based on the delivery-error
+ uint deliveryStatus;
+ switch (error) {
+ case ChannelTextSendErrorOffline:
+ case ChannelTextSendErrorPermissionDenied:
+ deliveryStatus = DeliveryStatusTemporarilyFailed;
+ break;
+
+ case ChannelTextSendErrorInvalidContact:
+ case ChannelTextSendErrorTooLong:
+ case ChannelTextSendErrorNotImplemented:
+ deliveryStatus = DeliveryStatusPermanentlyFailed;
+ break;
+
+ case ChannelTextSendErrorUnknown:
+ default:
+ deliveryStatus = DeliveryStatusTemporarilyFailed;
+ break;
+ }
+
+ header.insert(QString::fromAscii("delivery-status"),
+ QDBusVariant(deliveryStatus));
+ header.insert(QString::fromAscii("delivery-error"), QDBusVariant(error));
+
+ MessagePart echoHeader;
+ echoHeader.insert(QString::fromAscii("message-sent"),
+ QDBusVariant(timestamp));
+ echoHeader.insert(QString::fromAscii("message-type"),
+ QDBusVariant(type));
+
+ MessagePart echoBody;
+ echoBody.insert(QString::fromAscii("content-type"),
+ QDBusVariant(QString::fromAscii("text/plain")));
+ echoBody.insert(QString::fromAscii("content"), QDBusVariant(text));
+
+ MessagePartList echo;
+ echo << echoHeader;
+ echo << echoBody;
+ header.insert(QString::fromAscii("delivery-echo"),
+ QDBusVariant(QVariant::fromValue(echo)));
+
+ MessagePartList parts;
+ parts << header;
}
void TextChannel::onGetAllMessagesReply(QDBusPendingCallWatcher *watcher)
@@ -1001,12 +1304,16 @@ void TextChannel::onGetAllMessagesReply(QDBusPendingCallWatcher *watcher)
(mPriv->pendingFeatures & FeatureMessageQueue)) {
mPriv->initialMessagesReceived = true;
- // FIXME: actually put the messages in the queue
-
- mPriv->features |= FeatureMessageQueue;
- mPriv->pendingFeatures &= ~FeatureMessageQueue;
+ MessagePartListList messages = qdbus_cast<MessagePartListList>(
+ props["PendingMessages"]);
+ foreach (const MessagePartList &message, messages) {
+ onMessageReceived(message);
+ }
}
+ // Since we have the capabilities, we might as well set them - doing this
+ // multiple times is a no-op
+
mPriv->supportedContentTypes = qdbus_cast<QStringList>(
props["SupportedContentTypes"]);
if (mPriv->supportedContentTypes.isEmpty()) {
@@ -1029,10 +1336,25 @@ void TextChannel::onListPendingMessagesReply(QDBusPendingCallWatcher *watcher)
mPriv->initialMessagesReceived = true;
- // FIXME: actually put the messages in the queue
+ QDBusPendingReply<PendingTextMessageList> reply = *watcher;
+ PendingTextMessageList list;
+
+ if (!reply.isError()) {
+ debug() << "Text::ListPendingMessages returned";
+ list = reply.value();
+ } else {
+ warning().nospace() << "Properties::GetAll(Channel.Interface.Messages)"
+ " failed with " << reply.error().name() << ": " <<
+ reply.error().message();
+ // ... and act as though list was empty
+ }
+
+ foreach (const PendingTextMessage &message, list) {
+ onTextReceived(message.identifier, message.unixTimestamp,
+ message.sender, message.messageType, message.flags,
+ message.text);
+ }
- mPriv->features |= FeatureMessageQueue;
- mPriv->pendingFeatures &= ~FeatureMessageQueue;
mPriv->continueReadying(this);
}
diff --git a/TelepathyQt4/Client/text-channel.h b/TelepathyQt4/Client/text-channel.h
index ddcc91f..f46136f 100644
--- a/TelepathyQt4/Client/text-channel.h
+++ b/TelepathyQt4/Client/text-channel.h
@@ -133,10 +133,8 @@ public:
MessagePartSupportFlags messagePartSupport() const;
DeliveryReportingSupportFlags deliveryReportingSupport() const;
-#if 0
// requires FeatureMessageQueue
QList<ReceivedMessage> messageQueue() const;
-#endif
#if 0
public Q_SLOTS:
@@ -188,21 +186,14 @@ Q_SIGNALS:
Telepathy::MessageSendingFlags flags,
const QString &sentMessageToken);
-#if 0
- // Change notification for messageQueue()
- //
- // Only emitted when FeatureMessageQueue is enabled.
- void messageReceived(ReceivedMessage message);
-
- // Change notification for messageQueue()
- //
- // Only emitted when FeatureMessageQueue is enabled and
- // hasMessagesInterface() returns true.
- void pendingMessagesRemoved(QList<ReceivedMessage> messages);
-#endif
+ // FeatureMessageQueue
+ void messageReceived(const Telepathy::Client::ReceivedMessage &message);
+ void pendingMessageRemoved(
+ const Telepathy::Client::ReceivedMessage &message);
private Q_SLOTS:
void onChannelReady(Telepathy::Client::PendingOperation *);
+ void onContactsFinished(Telepathy::Client::PendingOperation *);
void onMessageSent(const Telepathy::MessagePartList &, uint,
const QString &);
@@ -216,11 +207,11 @@ private Q_SLOTS:
void onListPendingMessagesReply(QDBusPendingCallWatcher *);
private:
+ void processQueue();
+
struct Private;
friend struct Private;
Private *mPriv;
- // Implementation: messageQueue should probably be implemented as a
- // QMap<uint,ReceivedMessage>
};
} // Telepathy::Client
--
1.5.6.5
More information about the telepathy-commits
mailing list