From 4e1c8dd6639c2debe1e14c94e564237fb76ef48a Mon Sep 17 00:00:00 2001 From: Jani Mustonen Date: Wed, 15 Nov 2017 18:38:50 +0200 Subject: [PATCH] Implement a per-room send queue. (#118) [ci skip] --- include/MatrixClient.h | 4 +- include/TimelineView.h | 28 ++++++--- include/TimelineViewManager.h | 9 ++- src/ChatPage.cc | 6 +- src/MatrixClient.cc | 13 +++-- src/TimelineView.cc | 104 ++++++++++++++++++++++++---------- src/TimelineViewManager.cc | 28 +++++---- 7 files changed, 131 insertions(+), 61 deletions(-) diff --git a/include/MatrixClient.h b/include/MatrixClient.h index d6dd7162..b2765c01 100644 --- a/include/MatrixClient.h +++ b/include/MatrixClient.h @@ -40,6 +40,7 @@ public: void initialSync() noexcept; void sync() noexcept; void sendRoomMessage(matrix::events::MessageEventType ty, + int txnId, const QString &roomid, const QString &msg, const QString &url = "") noexcept; @@ -61,7 +62,7 @@ public: QUrl getHomeServer() { return server_; }; int transactionId() { return txn_id_; }; - void incrementTransactionId() { txn_id_ += 1; }; + int incrementTransactionId() { return ++txn_id_; }; // It is incredibly important that it's incremented first. void reset() noexcept; @@ -103,6 +104,7 @@ signals: void syncFailed(const QString &msg); void joinFailed(const QString &msg); void messageSent(const QString &event_id, const QString &roomid, const int txn_id); + void messageSendFailed(const QString &roomid, const int txn_id); void emoteSent(const QString &event_id, const QString &roomid, const int txn_id); void messagesRetrieved(const QString &room_id, const RoomMessages &msgs); void joinedRoom(const QString &room_id); diff --git a/include/TimelineView.h b/include/TimelineView.h index 7e44db46..af0e9386 100644 --- a/include/TimelineView.h +++ b/include/TimelineView.h @@ -19,6 +19,7 @@ #include #include +#include #include #include "Emote.h" @@ -42,14 +43,18 @@ namespace events = matrix::events; // but not yet confirmed by the homeserver through sync. struct PendingMessage { + matrix::events::MessageEventType ty; int txn_id; QString body; + QString filename; QString event_id; TimelineItem *widget; - PendingMessage(int txn_id, QString body, QString event_id, TimelineItem *widget) - : txn_id(txn_id) + PendingMessage(matrix::events::MessageEventType ty, int txn_id, QString body, QString filename, QString event_id, TimelineItem *widget) + : ty(ty) + , txn_id(txn_id) , body(body) + , filename(filename) , event_id(event_id) , widget(widget) {} @@ -86,8 +91,8 @@ public: // Add new events at the end of the timeline. int addEvents(const Timeline &timeline); - void addUserMessage(matrix::events::MessageEventType ty, const QString &msg, int txn_id); - void addUserMessage(const QString &url, const QString &filename, int txn_id); + void addUserMessage(matrix::events::MessageEventType ty, const QString &msg); + void addUserMessage(const QString &url, const QString &filename); void updatePendingMessage(int txn_id, QString event_id); void scrollDown(); @@ -102,6 +107,11 @@ public slots: // Whether or not the initial batch has been loaded. bool hasLoaded() { return scroll_layout_->count() > 1 || isTimelineFinished; }; + void handleFailedMessage(int txnid); + +private slots: + void sendNextPendingMessage(); + signals: void updateLastTimelineMessage(const QString &user, const DescInfo &info); @@ -115,14 +125,15 @@ private: // sender's name. bool isSenderRendered(const QString &user_id, TimelineDirection direction); - bool isPendingMessage(const QString &eventid, - const QString &body, + bool isPendingMessage(const QString &txnid, const QString &sender, const QString &userid); - void removePendingMessage(const QString &eventid, const QString &body); + void removePendingMessage(const QString &txnid); bool isDuplicate(const QString &event_id) { return eventIds_.contains(event_id); }; + void handleNewUserMessage(PendingMessage msg); + // Return nullptr if the event couldn't be parsed. TimelineItem *parseMessageEvent(const QJsonObject &event, TimelineDirection direction); @@ -162,6 +173,7 @@ private: // The events currently rendered. Used for duplicate detection. QMap eventIds_; - QList pending_msgs_; + QQueue pending_msgs_; + QList pending_sent_msgs_; QSharedPointer client_; }; diff --git a/include/TimelineViewManager.h b/include/TimelineViewManager.h index 8ff49f20..5bd3054f 100644 --- a/include/TimelineViewManager.h +++ b/include/TimelineViewManager.h @@ -21,6 +21,8 @@ #include #include +#include "MessageEvent.h" + class JoinedRoom; class MatrixClient; class RoomInfoListItem; @@ -61,12 +63,13 @@ signals: public slots: void setHistoryView(const QString &room_id); - void sendTextMessage(const QString &msg); - void sendEmoteMessage(const QString &msg); - void sendImageMessage(const QString &roomid, const QString &filename, const QString &url); + void queueTextMessage(const QString &msg); + void queueEmoteMessage(const QString &msg); + void queueImageMessage(const QString &roomid, const QString &filename, const QString &url); private slots: void messageSent(const QString &eventid, const QString &roomid, int txnid); + void messageSendFailed(const QString &roomid, int txnid); private: QString active_room_; diff --git a/src/ChatPage.cc b/src/ChatPage.cc index b07729cf..4091086b 100644 --- a/src/ChatPage.cc +++ b/src/ChatPage.cc @@ -165,12 +165,12 @@ ChatPage::ChatPage(QSharedPointer client, QWidget *parent) connect(text_input_, SIGNAL(sendTextMessage(const QString &)), view_manager_, - SLOT(sendTextMessage(const QString &))); + SLOT(queueTextMessage(const QString &))); connect(text_input_, SIGNAL(sendEmoteMessage(const QString &)), view_manager_, - SLOT(sendEmoteMessage(const QString &))); + SLOT(queueEmoteMessage(const QString &))); connect(text_input_, &TextInputWidget::sendJoinRoomRequest, @@ -187,7 +187,7 @@ ChatPage::ChatPage(QSharedPointer client, QWidget *parent) this, [=](QString roomid, QString filename, QString url) { text_input_->hideUploadSpinner(); - view_manager_->sendImageMessage(roomid, filename, url); + view_manager_->queueImageMessage(roomid, filename, url); }); connect(client_.data(), diff --git a/src/MatrixClient.cc b/src/MatrixClient.cc index 3876d044..5589bdc7 100644 --- a/src/MatrixClient.cc +++ b/src/MatrixClient.cc @@ -261,6 +261,7 @@ MatrixClient::sync() noexcept void MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty, + int txnId, const QString &roomid, const QString &msg, const QString &url) noexcept @@ -270,7 +271,7 @@ MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty, QUrl endpoint(server_); endpoint.setPath(clientApiUrl_ + - QString("/rooms/%1/send/m.room.message/%2").arg(roomid).arg(txn_id_)); + QString("/rooms/%1/send/m.room.message/%2").arg(roomid).arg(txnId)); endpoint.setQuery(query); QString msgType(""); @@ -295,7 +296,6 @@ MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty, request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json"); auto reply = put(request, QJsonDocument(body).toJson(QJsonDocument::Compact)); - auto txnId = this->txn_id_; connect(reply, &QNetworkReply::finished, this, [this, reply, roomid, txnId]() { reply->deleteLater(); @@ -304,18 +304,22 @@ MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty, if (status == 0 || status >= 400) { qWarning() << reply->errorString(); + emit messageSendFailed(roomid, txnId); return; } auto data = reply->readAll(); - if (data.isEmpty()) + if (data.isEmpty()) { + emit messageSendFailed(roomid, txnId); return; + } auto json = QJsonDocument::fromJson(data); if (!json.isObject()) { qDebug() << "Send message response is not a JSON object"; + emit messageSendFailed(roomid, txnId); return; } @@ -323,13 +327,12 @@ MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty, if (!object.contains("event_id")) { qDebug() << "SendTextMessage: missing event_id from response"; + emit messageSendFailed(roomid, txnId); return; } emit messageSent(object.value("event_id").toString(), roomid, txnId); }); - - incrementTransactionId(); } void diff --git a/src/TimelineView.cc b/src/TimelineView.cc index 1ffa731d..0e45bf63 100644 --- a/src/TimelineView.cc +++ b/src/TimelineView.cc @@ -17,6 +17,7 @@ #include #include +#include #include #include @@ -245,9 +246,9 @@ TimelineView::parseMessageEvent(const QJsonObject &event, TimelineDirection dire eventIds_[text.eventId()] = true; - if (isPendingMessage( - text.eventId(), text.content().body(), text.sender(), local_user_)) { - removePendingMessage(text.eventId(), text.content().body()); + QString txnid = text.unsignedData().transactionId(); + if (!txnid.isEmpty() && isPendingMessage(txnid, text.sender(), local_user_)) { + removePendingMessage(txnid); return nullptr; } @@ -291,9 +292,9 @@ TimelineView::parseMessageEvent(const QJsonObject &event, TimelineDirection dire eventIds_[img.eventId()] = true; - if (isPendingMessage( - img.eventId(), img.msgContent().url(), img.sender(), local_user_)) { - removePendingMessage(img.eventId(), img.msgContent().url()); + QString txnid = img.unsignedData().transactionId(); + if (!txnid.isEmpty() && isPendingMessage(txnid, img.sender(), local_user_)) { + removePendingMessage(txnid); return nullptr; } @@ -317,11 +318,9 @@ TimelineView::parseMessageEvent(const QJsonObject &event, TimelineDirection dire eventIds_[emote.eventId()] = true; - if (isPendingMessage(emote.eventId(), - emote.content().body(), - emote.sender(), - local_user_)) { - removePendingMessage(emote.eventId(), emote.content().body()); + QString txnid = emote.unsignedData().transactionId(); + if (!txnid.isEmpty() && isPendingMessage(txnid, emote.sender(), local_user_)) { + removePendingMessage(txnid); return nullptr; } @@ -499,16 +498,16 @@ TimelineView::addTimelineItem(TimelineItem *item, TimelineDirection direction) void TimelineView::updatePendingMessage(int txn_id, QString event_id) { - for (auto &msg : pending_msgs_) { - if (msg.txn_id == txn_id) { - msg.event_id = event_id; - break; - } + if (pending_msgs_.head().txn_id == txn_id) { // We haven't received it yet + auto msg = pending_msgs_.dequeue(); + msg.event_id = event_id; + pending_sent_msgs_.append(msg); } + sendNextPendingMessage(); } void -TimelineView::addUserMessage(matrix::events::MessageEventType ty, const QString &body, int txn_id) +TimelineView::addUserMessage(matrix::events::MessageEventType ty, const QString &body) { QSettings settings; auto user_id = settings.value("auth/user_id").toString(); @@ -523,12 +522,13 @@ TimelineView::addUserMessage(matrix::events::MessageEventType ty, const QString lastSender_ = user_id; - PendingMessage message(txn_id, body, "", view_item); - pending_msgs_.push_back(message); + int txn_id = client_->incrementTransactionId(); + PendingMessage message(ty, txn_id, body, "", "", view_item); + handleNewUserMessage(message); } void -TimelineView::addUserMessage(const QString &url, const QString &filename, int txn_id) +TimelineView::addUserMessage(const QString &url, const QString &filename) { QSettings settings; auto user_id = settings.value("auth/user_id").toString(); @@ -545,8 +545,34 @@ TimelineView::addUserMessage(const QString &url, const QString &filename, int tx lastSender_ = user_id; - PendingMessage message(txn_id, url, "", view_item); - pending_msgs_.push_back(message); + int txn_id = client_->incrementTransactionId(); + PendingMessage message(matrix::events::MessageEventType::Image, txn_id, url, filename, "", view_item); + handleNewUserMessage(message); +} + +void +TimelineView::handleNewUserMessage(PendingMessage msg) +{ + pending_msgs_.enqueue(msg); + if (pending_msgs_.size() == 1 && pending_sent_msgs_.size() == 0) + sendNextPendingMessage(); +} + +void +TimelineView::sendNextPendingMessage() +{ + if (pending_msgs_.size() == 0) + return; + + PendingMessage &m = pending_msgs_.head(); + switch (m.ty) { + case matrix::events::MessageEventType::Image: + client_->sendRoomMessage(m.ty, m.txn_id, room_id_, QFileInfo(m.filename).fileName(), m.body); + break; + default: + client_->sendRoomMessage(m.ty, m.txn_id, room_id_, m.body); + break; + } } void @@ -562,8 +588,7 @@ TimelineView::notifyForLastEvent() } bool -TimelineView::isPendingMessage(const QString &eventid, - const QString &body, +TimelineView::isPendingMessage(const QString &txnid, const QString &sender, const QString &local_userid) { @@ -571,7 +596,12 @@ TimelineView::isPendingMessage(const QString &eventid, return false; for (const auto &msg : pending_msgs_) { - if (msg.event_id == eventid || msg.body == body) + if (QString::number(msg.txn_id) == txnid) + return true; + } + + for (const auto &msg : pending_sent_msgs_) { + if (QString::number(msg.txn_id) == txnid) return true; } @@ -579,14 +609,28 @@ TimelineView::isPendingMessage(const QString &eventid, } void -TimelineView::removePendingMessage(const QString &eventid, const QString &body) +TimelineView::removePendingMessage(const QString &txnid) { + for (auto it = pending_sent_msgs_.begin(); it != pending_sent_msgs_.end(); ++it) { + if (QString::number(it->txn_id) == txnid) { + int index = std::distance(pending_sent_msgs_.begin(), it); + pending_sent_msgs_.removeAt(index); + return; + } + } for (auto it = pending_msgs_.begin(); it != pending_msgs_.end(); ++it) { - int index = std::distance(pending_msgs_.begin(), it); - - if (it->event_id == eventid || it->body == body) { + if (QString::number(it->txn_id) == txnid) { + int index = std::distance(pending_msgs_.begin(), it); pending_msgs_.removeAt(index); - break; + return; } } } + +void +TimelineView::handleFailedMessage(int txnid) +{ + Q_UNUSED(txnid); + // Note: We do this even if the message has already been echoed. + QTimer::singleShot(500, this, SLOT(sendNextPendingMessage())); +} diff --git a/src/TimelineViewManager.cc b/src/TimelineViewManager.cc index 37feabbe..35c2a560 100644 --- a/src/TimelineViewManager.cc +++ b/src/TimelineViewManager.cc @@ -35,6 +35,10 @@ TimelineViewManager::TimelineViewManager(QSharedPointer client, QW connect( client_.data(), &MatrixClient::messageSent, this, &TimelineViewManager::messageSent); + + connect( + client_.data(), &MatrixClient::messageSendFailed, + this, &TimelineViewManager::messageSendFailed); } TimelineViewManager::~TimelineViewManager() {} @@ -51,28 +55,32 @@ TimelineViewManager::messageSent(const QString &event_id, const QString &roomid, } void -TimelineViewManager::sendTextMessage(const QString &msg) +TimelineViewManager::messageSendFailed(const QString &roomid, int txn_id) +{ + auto view = views_[roomid]; + view->handleFailedMessage(txn_id); +} + +void +TimelineViewManager::queueTextMessage(const QString &msg) { auto room_id = active_room_; auto view = views_[room_id]; - view->addUserMessage(matrix::events::MessageEventType::Text, msg, client_->transactionId()); - client_->sendRoomMessage(matrix::events::MessageEventType::Text, room_id, msg); + view->addUserMessage(matrix::events::MessageEventType::Text, msg); } void -TimelineViewManager::sendEmoteMessage(const QString &msg) +TimelineViewManager::queueEmoteMessage(const QString &msg) { auto room_id = active_room_; auto view = views_[room_id]; - view->addUserMessage( - matrix::events::MessageEventType::Emote, msg, client_->transactionId()); - client_->sendRoomMessage(matrix::events::MessageEventType::Emote, room_id, msg); + view->addUserMessage(matrix::events::MessageEventType::Emote, msg); } void -TimelineViewManager::sendImageMessage(const QString &roomid, +TimelineViewManager::queueImageMessage(const QString &roomid, const QString &filename, const QString &url) { @@ -83,9 +91,7 @@ TimelineViewManager::sendImageMessage(const QString &roomid, auto view = views_[roomid]; - view->addUserMessage(url, filename, client_->transactionId()); - client_->sendRoomMessage( - matrix::events::MessageEventType::Image, roomid, QFileInfo(filename).fileName(), url); + view->addUserMessage(url, filename); } void