[Telepathy-commits] [telepathy-qt4/master] TextChannel: implement FeatureMessageQueue

Simon McVittie simon.mcvittie at collabora.co.uk
Fri Feb 20 06:09:10 PST 2009


---
 TelepathyQt4/Client/text-channel.cpp |  340 +++++++++++++++++++++++++++++++++-
 TelepathyQt4/Client/text-channel.h   |   23 +--
 2 files changed, 338 insertions(+), 25 deletions(-)

diff --git a/TelepathyQt4/Client/text-channel.cpp b/TelepathyQt4/Client/text-channel.cpp
index 0a028f8..3f11cb1 100644
--- a/TelepathyQt4/Client/text-channel.cpp
+++ b/TelepathyQt4/Client/text-channel.cpp
@@ -24,7 +24,11 @@
 
 #include "TelepathyQt4/Client/_gen/text-channel.moc.hpp"
 
+#include <TelepathyQt4/Client/Connection>
+#include <TelepathyQt4/Client/ContactManager>
+#include <TelepathyQt4/Client/PendingContacts>
 #include <TelepathyQt4/Client/PendingReadyChannel>
+#include <TelepathyQt4/Client/ReferencedHandles>
 
 #include "TelepathyQt4/debug-internal.h"
 
@@ -530,6 +534,25 @@ struct TextChannel::Private
     bool getAllMessagesInFlight;
     bool listPendingMessagesCalled;
     bool initialMessagesReceived;
+    struct QueuedEvent
+    {
+        inline QueuedEvent(const ReceivedMessage &message)
+            : isMessage(true), message(message),
+                removed(0)
+        { }
+        inline QueuedEvent(uint removed)
+            : isMessage(false), message(), removed(removed)
+        { }
+
+        bool isMessage;
+        ReceivedMessage message;
+        uint removed;
+    };
+    QList<ReceivedMessage> messages;
+    QList<QueuedEvent *> incompleteMessages;
+    QSet<uint> awaitingContacts;
+    void contactLost(uint handle);
+    void contactFound(QSharedPointer<Contact> contact);
 };
 
 TextChannel::Private::Private()
@@ -542,7 +565,10 @@ TextChannel::Private::Private()
       connectedMessageQueueSignals(false),
       getAllMessagesInFlight(false),
       listPendingMessagesCalled(false),
-      initialMessagesReceived(false)
+      initialMessagesReceived(false),
+      messages(),
+      incompleteMessages(),
+      awaitingContacts()
 {
 }
 
@@ -567,7 +593,9 @@ TextChannel::Private::~Private()
  *
  * Features that can be enabled on a TextChannel using becomeReady().
  *
- * \value FeatureMessageQueue Doesn't do anything yet
+ * \value FeatureMessageQueue The messageQueue method can be called, and the
+ *                            messageReceived and pendingMessageRemoved signals
+ *                            are emitted
  * \value FeatureMessageCapabilities The supportedContentTypes,
  *                                   messagePartSupport and
  *                                   deliveryReportingSupport methods can
@@ -599,6 +627,25 @@ TextChannel::Private::~Private()
  */
 
 /**
+ * \fn void messageReceived(const Telepathy::Client::ReceivedMessage &message)
+ *
+ * Emitted when a message is added to messageQueue(), if the
+ * FeatureMessageQueue Feature has been enabled.
+ *
+ * This occurs slightly later than the message being received over D-Bus;
+ * see messageQueue() for details.
+ */
+
+/**
+ * \fn void pendingMessageRemoved(
+ *      const Telepathy::Client::ReceivedMessage &message)
+ *
+ * Emitted when a message is removed from messageQueue(), if the
+ * FeatureMessageQueue Feature has been enabled. See messageQueue() for the
+ * circumstances in which this happens.
+ */
+
+/**
  * Creates a TextChannel associated with the given object on the same service
  * as the given connection.
  *
@@ -714,6 +761,31 @@ DeliveryReportingSupportFlags TextChannel::deliveryReportingSupport() const
 }
 
 /**
+ * Return a list of messages received in this channel. This list is empty
+ * unless the FeatureMessageQueue Feature has been enabled.
+ *
+ * Messages are added to this list when they are received from the instant
+ * messaging service, at which point the messageReceived signal is emitted.
+ *
+ * There is a small delay between the message being received over D-Bus and
+ * becoming available to users of this C++ API, since a small amount of
+ * additional information needs to be fetched. However, the relative ordering
+ * of all the messages in a channel is preserved.
+ *
+ * Messages are removed from this list when they are acknowledged with
+ * acknowledge() or forgotten with forget(). On channels where
+ * hasMessagesInterface() returns true, they are also removed when acknowledged
+ * by a different client. In either case, pendingMessageRemoved is emitted.
+ *
+ * \return The unacknowledged messages in this channel, excluding any that
+ *         have been forgotten with forget().
+ */
+QList<ReceivedMessage> TextChannel::messageQueue() const
+{
+    return mPriv->messages;
+}
+
+/**
  * Return whether the desired features are ready for use.
  *
  * \param channelFeatures Features of the Channel class
@@ -943,11 +1015,141 @@ void TextChannel::onMessageSent(const Telepathy::MessagePartList &parts,
             sentMessageToken);
 }
 
+void TextChannel::processQueue()
+{
+    // Proceed as far as we can with the processing of incoming messages
+    // and message-removal events; message IDs aren't necessarily globally
+    // unique, so we need to process them in the correct order relative
+    // to incoming messages
+    while (!mPriv->incompleteMessages.isEmpty()) {
+        const Private::QueuedEvent *e = mPriv->incompleteMessages.first();
+        debug() << "QueuedEvent:" << e;
+
+        if (e->isMessage) {
+            if (e->message.mPriv->senderHandle() != 0 &&
+                    !e->message.mPriv->sender) {
+                // the message doesn't have a sender Contact, but needs one.
+                // We'll have to stop processing here, and come back to it
+                // when we have more Contact objects
+                break;
+            }
+
+            // if we reach here, the message is ready
+            debug() << "Message is usable, copying to main queue";
+            mPriv->messages << e->message;
+            emit messageReceived(e->message);
+        } else {
+            // forget about the message(s) with ID e->removed (there should be
+            // at most one under normal circumstances)
+            int i = 0;
+            while (i < mPriv->messages.size()) {
+                if (mPriv->messages.at(i).mPriv->pendingId() == e->removed) {
+                    emit pendingMessageRemoved(mPriv->messages.at(i));
+                    mPriv->messages.removeAt(i);
+                } else {
+                    i++;
+                }
+            }
+        }
+
+        debug() << "Dropping first event";
+        delete mPriv->incompleteMessages.takeFirst();
+    }
+
+    if (mPriv->incompleteMessages.isEmpty()) {
+        if (mPriv->pendingFeatures & FeatureMessageQueue) {
+            debug() << "incompleteMessages empty for the first time: "
+                "FeatureMessageQueue is now ready";
+            mPriv->features |= FeatureMessageQueue;
+            mPriv->pendingFeatures &= ~FeatureMessageQueue;
+            mPriv->continueReadying(this);
+        }
+        return;
+    }
+
+    // What Contact objects do we need in order to proceed, ignoring those
+    // for which we've already sent a request?
+    QSet<uint> contactsRequired;
+    foreach (const Private::QueuedEvent *e, mPriv->incompleteMessages) {
+        if (e->isMessage) {
+            uint handle = e->message.mPriv->senderHandle();
+            if (handle != 0 && !e->message.mPriv->sender
+                    && !mPriv->awaitingContacts.contains(handle)) {
+                contactsRequired << handle;
+            }
+        }
+    }
+
+    connect(connection()->contactManager()->contactsForHandles(
+                contactsRequired.toList()),
+            SIGNAL(finished(Telepathy::Client::PendingOperation *)),
+            SLOT(onContactsFinished(Telepathy::Client::PendingOperation *)));
+
+    mPriv->awaitingContacts |= contactsRequired;
+}
+
+void TextChannel::Private::contactLost(uint handle)
+{
+    // we're not going to get a Contact object for this handle, so mark the
+    // messages from that handle as "unknown sender"
+    foreach (QueuedEvent *e, incompleteMessages) {
+        if (e->isMessage && e->message.mPriv->senderHandle() == handle
+                && !e->message.mPriv->sender) {
+            e->message.mPriv->clearSenderHandle();
+        }
+    }
+}
+
+void TextChannel::Private::contactFound(QSharedPointer<Contact> contact)
+{
+    uint handle = contact->handle().at(0);
+
+    foreach (QueuedEvent *e, incompleteMessages) {
+        if (e->isMessage && e->message.mPriv->senderHandle() == handle
+                && !e->message.mPriv->sender) {
+            e->message.mPriv->sender = contact;
+        }
+    }
+}
+
+void TextChannel::onContactsFinished(PendingOperation *op)
+{
+    PendingContacts *pc = qobject_cast<PendingContacts *>(op);
+    UIntList failed;
+
+    Q_ASSERT(pc->isForHandles());
+
+    foreach (uint handle, pc->handles()) {
+        mPriv->awaitingContacts -= handle;
+    }
+
+    if (pc->isError()) {
+        warning().nospace() << "Gathering contacts failed: "
+            << pc->errorName() << ": " << pc->errorMessage();
+        foreach (uint handle, pc->handles()) {
+            mPriv->contactLost(handle);
+        }
+    } else {
+        foreach (const QSharedPointer<Contact> &contact, pc->contacts()) {
+            mPriv->contactFound(contact);
+        }
+        foreach (uint handle, pc->invalidHandles()) {
+            mPriv->contactLost(handle);
+        }
+    }
+    // all the messages we were asking about should now be ready
+    processQueue();
+}
+
 void TextChannel::onMessageReceived(const Telepathy::MessagePartList &parts)
 {
     if (!mPriv->initialMessagesReceived) {
         return;
     }
+
+    mPriv->incompleteMessages << new Private::QueuedEvent(
+            ReceivedMessage(parts, this));
+    processQueue();
 }
 
 void TextChannel::onPendingMessagesRemoved(const Telepathy::UIntList &ids)
@@ -955,6 +1157,10 @@ void TextChannel::onPendingMessagesRemoved(const Telepathy::UIntList &ids)
     if (!mPriv->initialMessagesReceived) {
         return;
     }
+    foreach (uint id, ids) {
+        mPriv->incompleteMessages << new Private::QueuedEvent(id);
+    }
+    processQueue();
 }
 
 void TextChannel::onTextSent(uint timestamp, uint type, const QString &text)
@@ -969,6 +1175,49 @@ void TextChannel::onTextReceived(uint id, uint timestamp, uint sender,
     if (!mPriv->initialMessagesReceived) {
         return;
     }
+
+    MessagePart header;
+
+    if (timestamp == 0) {
+        timestamp = QDateTime::currentDateTime().toTime_t();
+    }
+    header.insert(QString::fromAscii("message-received"),
+            QDBusVariant(static_cast<qlonglong>(timestamp)));
+
+    header.insert(QString::fromAscii("pending-message-id"), QDBusVariant(id));
+    header.insert(QString::fromAscii("message-sender"), QDBusVariant(sender));
+    header.insert(QString::fromAscii("message-type"), QDBusVariant(type));
+
+    if (flags & ChannelTextMessageFlagScrollback) {
+        header.insert(QString::fromAscii("scrollback"), QDBusVariant(true));
+    }
+    if (flags & ChannelTextMessageFlagRescued) {
+        header.insert(QString::fromAscii("rescued"), QDBusVariant(true));
+    }
+
+    MessagePart body;
+
+    body.insert(QString::fromAscii("content-type"),
+            QDBusVariant(QString::fromAscii("text/plain")));
+    body.insert(QString::fromAscii("content"), QDBusVariant(text));
+
+    if (flags & ChannelTextMessageFlagTruncated) {
+        header.insert(QString::fromAscii("truncated"), QDBusVariant(true));
+    }
+
+    MessagePartList parts;
+    parts << header;
+    parts << body;
+
+    ReceivedMessage m(parts, this);
+
+    if (flags & ChannelTextMessageFlagNonTextContent) {
+        // set the "you are not expected to understand this" flag
+        m.mPriv->forceNonText = true;
+    }
+
+    mPriv->incompleteMessages << new Private::QueuedEvent(m);
+    processQueue();
 }
 
 void TextChannel::onTextSendError(uint error, uint timestamp, uint type,
@@ -977,6 +1226,60 @@ void TextChannel::onTextSendError(uint error, uint timestamp, uint type,
     if (!mPriv->initialMessagesReceived) {
         return;
     }
+
+    MessagePart header;
+
+    header.insert(QString::fromAscii("message-received"),
+            QDBusVariant(static_cast<qlonglong>(
+                    QDateTime::currentDateTime().toTime_t())));
+    header.insert(QString::fromAscii("message-type"),
+            QDBusVariant(static_cast<uint>(
+                    ChannelTextMessageTypeDeliveryReport)));
+
+    // we can't tell whether it's a temporary or permanent failure here,
+    // so guess based on the delivery-error
+    uint deliveryStatus;
+    switch (error) {
+        case ChannelTextSendErrorOffline:
+        case ChannelTextSendErrorPermissionDenied:
+            deliveryStatus = DeliveryStatusTemporarilyFailed;
+            break;
+
+        case ChannelTextSendErrorInvalidContact:
+        case ChannelTextSendErrorTooLong:
+        case ChannelTextSendErrorNotImplemented:
+            deliveryStatus = DeliveryStatusPermanentlyFailed;
+            break;
+
+        case ChannelTextSendErrorUnknown:
+        default:
+            deliveryStatus = DeliveryStatusTemporarilyFailed;
+            break;
+    }
+
+    header.insert(QString::fromAscii("delivery-status"),
+            QDBusVariant(deliveryStatus));
+    header.insert(QString::fromAscii("delivery-error"), QDBusVariant(error));
+
+    MessagePart echoHeader;
+    echoHeader.insert(QString::fromAscii("message-sent"),
+            QDBusVariant(timestamp));
+    echoHeader.insert(QString::fromAscii("message-type"),
+            QDBusVariant(type));
+
+    MessagePart echoBody;
+    echoBody.insert(QString::fromAscii("content-type"),
+            QDBusVariant(QString::fromAscii("text/plain")));
+    echoBody.insert(QString::fromAscii("content"), QDBusVariant(text));
+
+    MessagePartList echo;
+    echo << echoHeader;
+    echo << echoBody;
+    header.insert(QString::fromAscii("delivery-echo"),
+            QDBusVariant(QVariant::fromValue(echo)));
+
+    MessagePartList parts;
+    parts << header;
 }
 
 void TextChannel::onGetAllMessagesReply(QDBusPendingCallWatcher *watcher)
@@ -1001,12 +1304,16 @@ void TextChannel::onGetAllMessagesReply(QDBusPendingCallWatcher *watcher)
         (mPriv->pendingFeatures & FeatureMessageQueue)) {
         mPriv->initialMessagesReceived = true;
 
-        // FIXME: actually put the messages in the queue
-
-        mPriv->features |= FeatureMessageQueue;
-        mPriv->pendingFeatures &= ~FeatureMessageQueue;
+        MessagePartListList messages = qdbus_cast<MessagePartListList>(
+                props["PendingMessages"]);
+        foreach (const MessagePartList &message, messages) {
+            onMessageReceived(message);
+        }
     }
 
+    // Since we have the capabilities, we might as well set them - doing this
+    // multiple times is a no-op
+
     mPriv->supportedContentTypes = qdbus_cast<QStringList>(
             props["SupportedContentTypes"]);
     if (mPriv->supportedContentTypes.isEmpty()) {
@@ -1029,10 +1336,25 @@ void TextChannel::onListPendingMessagesReply(QDBusPendingCallWatcher *watcher)
 
     mPriv->initialMessagesReceived = true;
 
-    // FIXME: actually put the messages in the queue
+    QDBusPendingReply<PendingTextMessageList> reply = *watcher;
+    PendingTextMessageList list;
+
+    if (!reply.isError()) {
+        debug() << "Text::ListPendingMessages returned";
+        list = reply.value();
+    } else {
+        warning().nospace() << "Properties::GetAll(Channel.Interface.Messages)"
+            " failed with " << reply.error().name() << ": " <<
+            reply.error().message();
+        // ... and act as though list was empty
+    }
+
+    foreach (const PendingTextMessage &message, list) {
+        onTextReceived(message.identifier, message.unixTimestamp,
+                message.sender, message.messageType, message.flags,
+                message.text);
+    }
 
-    mPriv->features |= FeatureMessageQueue;
-    mPriv->pendingFeatures &= ~FeatureMessageQueue;
     mPriv->continueReadying(this);
 }
 
diff --git a/TelepathyQt4/Client/text-channel.h b/TelepathyQt4/Client/text-channel.h
index ddcc91f..f46136f 100644
--- a/TelepathyQt4/Client/text-channel.h
+++ b/TelepathyQt4/Client/text-channel.h
@@ -133,10 +133,8 @@ public:
     MessagePartSupportFlags messagePartSupport() const;
     DeliveryReportingSupportFlags deliveryReportingSupport() const;
 
-#if 0
     // requires FeatureMessageQueue
     QList<ReceivedMessage> messageQueue() const;
-#endif
 
 #if 0
 public Q_SLOTS:
@@ -188,21 +186,14 @@ Q_SIGNALS:
             Telepathy::MessageSendingFlags flags,
             const QString &sentMessageToken);
 
-#if 0
-    // Change notification for messageQueue()
-    //
-    // Only emitted when FeatureMessageQueue is enabled.
-    void messageReceived(ReceivedMessage message);
-
-    // Change notification for messageQueue()
-    //
-    // Only emitted when FeatureMessageQueue is enabled and
-    // hasMessagesInterface() returns true.
-    void pendingMessagesRemoved(QList<ReceivedMessage> messages);
-#endif
+    // FeatureMessageQueue
+    void messageReceived(const Telepathy::Client::ReceivedMessage &message);
+    void pendingMessageRemoved(
+            const Telepathy::Client::ReceivedMessage &message);
 
 private Q_SLOTS:
     void onChannelReady(Telepathy::Client::PendingOperation *);
+    void onContactsFinished(Telepathy::Client::PendingOperation *);
 
     void onMessageSent(const Telepathy::MessagePartList &, uint,
             const QString &);
@@ -216,11 +207,11 @@ private Q_SLOTS:
     void onListPendingMessagesReply(QDBusPendingCallWatcher *);
 
 private:
+    void processQueue();
+
     struct Private;
     friend struct Private;
     Private *mPriv;
-    // Implementation: messageQueue should probably be implemented as a
-    // QMap<uint,ReceivedMessage>
 };
 
 } // Telepathy::Client
-- 
1.5.6.5




More information about the telepathy-commits mailing list