Implement a per-room send queue. (#118)

[ci skip]
This commit is contained in:
Jani Mustonen 2017-11-15 18:38:50 +02:00 committed by mujx
parent 4a912a2dff
commit 4e1c8dd663
7 changed files with 131 additions and 61 deletions

View file

@ -40,6 +40,7 @@ public:
void initialSync() noexcept; void initialSync() noexcept;
void sync() noexcept; void sync() noexcept;
void sendRoomMessage(matrix::events::MessageEventType ty, void sendRoomMessage(matrix::events::MessageEventType ty,
int txnId,
const QString &roomid, const QString &roomid,
const QString &msg, const QString &msg,
const QString &url = "") noexcept; const QString &url = "") noexcept;
@ -61,7 +62,7 @@ public:
QUrl getHomeServer() { return server_; }; QUrl getHomeServer() { return server_; };
int transactionId() { return txn_id_; }; int transactionId() { return txn_id_; };
void incrementTransactionId() { txn_id_ += 1; }; int incrementTransactionId() { return ++txn_id_; }; // It is incredibly important that it's incremented first.
void reset() noexcept; void reset() noexcept;
@ -103,6 +104,7 @@ signals:
void syncFailed(const QString &msg); void syncFailed(const QString &msg);
void joinFailed(const QString &msg); void joinFailed(const QString &msg);
void messageSent(const QString &event_id, const QString &roomid, const int txn_id); void messageSent(const QString &event_id, const QString &roomid, const int txn_id);
void messageSendFailed(const QString &roomid, const int txn_id);
void emoteSent(const QString &event_id, const QString &roomid, const int txn_id); void emoteSent(const QString &event_id, const QString &roomid, const int txn_id);
void messagesRetrieved(const QString &room_id, const RoomMessages &msgs); void messagesRetrieved(const QString &room_id, const RoomMessages &msgs);
void joinedRoom(const QString &room_id); void joinedRoom(const QString &room_id);

View file

@ -19,6 +19,7 @@
#include <QLayout> #include <QLayout>
#include <QList> #include <QList>
#include <QQueue>
#include <QScrollArea> #include <QScrollArea>
#include "Emote.h" #include "Emote.h"
@ -42,14 +43,18 @@ namespace events = matrix::events;
// but not yet confirmed by the homeserver through sync. // but not yet confirmed by the homeserver through sync.
struct PendingMessage struct PendingMessage
{ {
matrix::events::MessageEventType ty;
int txn_id; int txn_id;
QString body; QString body;
QString filename;
QString event_id; QString event_id;
TimelineItem *widget; TimelineItem *widget;
PendingMessage(int txn_id, QString body, QString event_id, TimelineItem *widget) PendingMessage(matrix::events::MessageEventType ty, int txn_id, QString body, QString filename, QString event_id, TimelineItem *widget)
: txn_id(txn_id) : ty(ty)
, txn_id(txn_id)
, body(body) , body(body)
, filename(filename)
, event_id(event_id) , event_id(event_id)
, widget(widget) , widget(widget)
{} {}
@ -86,8 +91,8 @@ public:
// Add new events at the end of the timeline. // Add new events at the end of the timeline.
int addEvents(const Timeline &timeline); int addEvents(const Timeline &timeline);
void addUserMessage(matrix::events::MessageEventType ty, const QString &msg, int txn_id); void addUserMessage(matrix::events::MessageEventType ty, const QString &msg);
void addUserMessage(const QString &url, const QString &filename, int txn_id); void addUserMessage(const QString &url, const QString &filename);
void updatePendingMessage(int txn_id, QString event_id); void updatePendingMessage(int txn_id, QString event_id);
void scrollDown(); void scrollDown();
@ -102,6 +107,11 @@ public slots:
// Whether or not the initial batch has been loaded. // Whether or not the initial batch has been loaded.
bool hasLoaded() { return scroll_layout_->count() > 1 || isTimelineFinished; }; bool hasLoaded() { return scroll_layout_->count() > 1 || isTimelineFinished; };
void handleFailedMessage(int txnid);
private slots:
void sendNextPendingMessage();
signals: signals:
void updateLastTimelineMessage(const QString &user, const DescInfo &info); void updateLastTimelineMessage(const QString &user, const DescInfo &info);
@ -115,14 +125,15 @@ private:
// sender's name. // sender's name.
bool isSenderRendered(const QString &user_id, TimelineDirection direction); bool isSenderRendered(const QString &user_id, TimelineDirection direction);
bool isPendingMessage(const QString &eventid, bool isPendingMessage(const QString &txnid,
const QString &body,
const QString &sender, const QString &sender,
const QString &userid); const QString &userid);
void removePendingMessage(const QString &eventid, const QString &body); void removePendingMessage(const QString &txnid);
bool isDuplicate(const QString &event_id) { return eventIds_.contains(event_id); }; bool isDuplicate(const QString &event_id) { return eventIds_.contains(event_id); };
void handleNewUserMessage(PendingMessage msg);
// Return nullptr if the event couldn't be parsed. // Return nullptr if the event couldn't be parsed.
TimelineItem *parseMessageEvent(const QJsonObject &event, TimelineDirection direction); TimelineItem *parseMessageEvent(const QJsonObject &event, TimelineDirection direction);
@ -162,6 +173,7 @@ private:
// The events currently rendered. Used for duplicate detection. // The events currently rendered. Used for duplicate detection.
QMap<QString, bool> eventIds_; QMap<QString, bool> eventIds_;
QList<PendingMessage> pending_msgs_; QQueue<PendingMessage> pending_msgs_;
QList<PendingMessage> pending_sent_msgs_;
QSharedPointer<MatrixClient> client_; QSharedPointer<MatrixClient> client_;
}; };

View file

@ -21,6 +21,8 @@
#include <QSharedPointer> #include <QSharedPointer>
#include <QStackedWidget> #include <QStackedWidget>
#include "MessageEvent.h"
class JoinedRoom; class JoinedRoom;
class MatrixClient; class MatrixClient;
class RoomInfoListItem; class RoomInfoListItem;
@ -61,12 +63,13 @@ signals:
public slots: public slots:
void setHistoryView(const QString &room_id); void setHistoryView(const QString &room_id);
void sendTextMessage(const QString &msg); void queueTextMessage(const QString &msg);
void sendEmoteMessage(const QString &msg); void queueEmoteMessage(const QString &msg);
void sendImageMessage(const QString &roomid, const QString &filename, const QString &url); void queueImageMessage(const QString &roomid, const QString &filename, const QString &url);
private slots: private slots:
void messageSent(const QString &eventid, const QString &roomid, int txnid); void messageSent(const QString &eventid, const QString &roomid, int txnid);
void messageSendFailed(const QString &roomid, int txnid);
private: private:
QString active_room_; QString active_room_;

View file

@ -165,12 +165,12 @@ ChatPage::ChatPage(QSharedPointer<MatrixClient> client, QWidget *parent)
connect(text_input_, connect(text_input_,
SIGNAL(sendTextMessage(const QString &)), SIGNAL(sendTextMessage(const QString &)),
view_manager_, view_manager_,
SLOT(sendTextMessage(const QString &))); SLOT(queueTextMessage(const QString &)));
connect(text_input_, connect(text_input_,
SIGNAL(sendEmoteMessage(const QString &)), SIGNAL(sendEmoteMessage(const QString &)),
view_manager_, view_manager_,
SLOT(sendEmoteMessage(const QString &))); SLOT(queueEmoteMessage(const QString &)));
connect(text_input_, connect(text_input_,
&TextInputWidget::sendJoinRoomRequest, &TextInputWidget::sendJoinRoomRequest,
@ -187,7 +187,7 @@ ChatPage::ChatPage(QSharedPointer<MatrixClient> client, QWidget *parent)
this, this,
[=](QString roomid, QString filename, QString url) { [=](QString roomid, QString filename, QString url) {
text_input_->hideUploadSpinner(); text_input_->hideUploadSpinner();
view_manager_->sendImageMessage(roomid, filename, url); view_manager_->queueImageMessage(roomid, filename, url);
}); });
connect(client_.data(), connect(client_.data(),

View file

@ -261,6 +261,7 @@ MatrixClient::sync() noexcept
void void
MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty, MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty,
int txnId,
const QString &roomid, const QString &roomid,
const QString &msg, const QString &msg,
const QString &url) noexcept const QString &url) noexcept
@ -270,7 +271,7 @@ MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty,
QUrl endpoint(server_); QUrl endpoint(server_);
endpoint.setPath(clientApiUrl_ + endpoint.setPath(clientApiUrl_ +
QString("/rooms/%1/send/m.room.message/%2").arg(roomid).arg(txn_id_)); QString("/rooms/%1/send/m.room.message/%2").arg(roomid).arg(txnId));
endpoint.setQuery(query); endpoint.setQuery(query);
QString msgType(""); QString msgType("");
@ -295,7 +296,6 @@ MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty,
request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json"); request.setHeader(QNetworkRequest::ContentTypeHeader, "application/json");
auto reply = put(request, QJsonDocument(body).toJson(QJsonDocument::Compact)); auto reply = put(request, QJsonDocument(body).toJson(QJsonDocument::Compact));
auto txnId = this->txn_id_;
connect(reply, &QNetworkReply::finished, this, [this, reply, roomid, txnId]() { connect(reply, &QNetworkReply::finished, this, [this, reply, roomid, txnId]() {
reply->deleteLater(); reply->deleteLater();
@ -304,18 +304,22 @@ MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty,
if (status == 0 || status >= 400) { if (status == 0 || status >= 400) {
qWarning() << reply->errorString(); qWarning() << reply->errorString();
emit messageSendFailed(roomid, txnId);
return; return;
} }
auto data = reply->readAll(); auto data = reply->readAll();
if (data.isEmpty()) if (data.isEmpty()) {
emit messageSendFailed(roomid, txnId);
return; return;
}
auto json = QJsonDocument::fromJson(data); auto json = QJsonDocument::fromJson(data);
if (!json.isObject()) { if (!json.isObject()) {
qDebug() << "Send message response is not a JSON object"; qDebug() << "Send message response is not a JSON object";
emit messageSendFailed(roomid, txnId);
return; return;
} }
@ -323,13 +327,12 @@ MatrixClient::sendRoomMessage(matrix::events::MessageEventType ty,
if (!object.contains("event_id")) { if (!object.contains("event_id")) {
qDebug() << "SendTextMessage: missing event_id from response"; qDebug() << "SendTextMessage: missing event_id from response";
emit messageSendFailed(roomid, txnId);
return; return;
} }
emit messageSent(object.value("event_id").toString(), roomid, txnId); emit messageSent(object.value("event_id").toString(), roomid, txnId);
}); });
incrementTransactionId();
} }
void void

View file

@ -17,6 +17,7 @@
#include <QApplication> #include <QApplication>
#include <QDebug> #include <QDebug>
#include <QFileInfo>
#include <QSettings> #include <QSettings>
#include <QTimer> #include <QTimer>
@ -245,9 +246,9 @@ TimelineView::parseMessageEvent(const QJsonObject &event, TimelineDirection dire
eventIds_[text.eventId()] = true; eventIds_[text.eventId()] = true;
if (isPendingMessage( QString txnid = text.unsignedData().transactionId();
text.eventId(), text.content().body(), text.sender(), local_user_)) { if (!txnid.isEmpty() && isPendingMessage(txnid, text.sender(), local_user_)) {
removePendingMessage(text.eventId(), text.content().body()); removePendingMessage(txnid);
return nullptr; return nullptr;
} }
@ -291,9 +292,9 @@ TimelineView::parseMessageEvent(const QJsonObject &event, TimelineDirection dire
eventIds_[img.eventId()] = true; eventIds_[img.eventId()] = true;
if (isPendingMessage( QString txnid = img.unsignedData().transactionId();
img.eventId(), img.msgContent().url(), img.sender(), local_user_)) { if (!txnid.isEmpty() && isPendingMessage(txnid, img.sender(), local_user_)) {
removePendingMessage(img.eventId(), img.msgContent().url()); removePendingMessage(txnid);
return nullptr; return nullptr;
} }
@ -317,11 +318,9 @@ TimelineView::parseMessageEvent(const QJsonObject &event, TimelineDirection dire
eventIds_[emote.eventId()] = true; eventIds_[emote.eventId()] = true;
if (isPendingMessage(emote.eventId(), QString txnid = emote.unsignedData().transactionId();
emote.content().body(), if (!txnid.isEmpty() && isPendingMessage(txnid, emote.sender(), local_user_)) {
emote.sender(), removePendingMessage(txnid);
local_user_)) {
removePendingMessage(emote.eventId(), emote.content().body());
return nullptr; return nullptr;
} }
@ -499,16 +498,16 @@ TimelineView::addTimelineItem(TimelineItem *item, TimelineDirection direction)
void void
TimelineView::updatePendingMessage(int txn_id, QString event_id) TimelineView::updatePendingMessage(int txn_id, QString event_id)
{ {
for (auto &msg : pending_msgs_) { if (pending_msgs_.head().txn_id == txn_id) { // We haven't received it yet
if (msg.txn_id == txn_id) { auto msg = pending_msgs_.dequeue();
msg.event_id = event_id; msg.event_id = event_id;
break; pending_sent_msgs_.append(msg);
}
} }
sendNextPendingMessage();
} }
void void
TimelineView::addUserMessage(matrix::events::MessageEventType ty, const QString &body, int txn_id) TimelineView::addUserMessage(matrix::events::MessageEventType ty, const QString &body)
{ {
QSettings settings; QSettings settings;
auto user_id = settings.value("auth/user_id").toString(); auto user_id = settings.value("auth/user_id").toString();
@ -523,12 +522,13 @@ TimelineView::addUserMessage(matrix::events::MessageEventType ty, const QString
lastSender_ = user_id; lastSender_ = user_id;
PendingMessage message(txn_id, body, "", view_item); int txn_id = client_->incrementTransactionId();
pending_msgs_.push_back(message); PendingMessage message(ty, txn_id, body, "", "", view_item);
handleNewUserMessage(message);
} }
void void
TimelineView::addUserMessage(const QString &url, const QString &filename, int txn_id) TimelineView::addUserMessage(const QString &url, const QString &filename)
{ {
QSettings settings; QSettings settings;
auto user_id = settings.value("auth/user_id").toString(); auto user_id = settings.value("auth/user_id").toString();
@ -545,8 +545,34 @@ TimelineView::addUserMessage(const QString &url, const QString &filename, int tx
lastSender_ = user_id; lastSender_ = user_id;
PendingMessage message(txn_id, url, "", view_item); int txn_id = client_->incrementTransactionId();
pending_msgs_.push_back(message); PendingMessage message(matrix::events::MessageEventType::Image, txn_id, url, filename, "", view_item);
handleNewUserMessage(message);
}
void
TimelineView::handleNewUserMessage(PendingMessage msg)
{
pending_msgs_.enqueue(msg);
if (pending_msgs_.size() == 1 && pending_sent_msgs_.size() == 0)
sendNextPendingMessage();
}
void
TimelineView::sendNextPendingMessage()
{
if (pending_msgs_.size() == 0)
return;
PendingMessage &m = pending_msgs_.head();
switch (m.ty) {
case matrix::events::MessageEventType::Image:
client_->sendRoomMessage(m.ty, m.txn_id, room_id_, QFileInfo(m.filename).fileName(), m.body);
break;
default:
client_->sendRoomMessage(m.ty, m.txn_id, room_id_, m.body);
break;
}
} }
void void
@ -562,8 +588,7 @@ TimelineView::notifyForLastEvent()
} }
bool bool
TimelineView::isPendingMessage(const QString &eventid, TimelineView::isPendingMessage(const QString &txnid,
const QString &body,
const QString &sender, const QString &sender,
const QString &local_userid) const QString &local_userid)
{ {
@ -571,7 +596,12 @@ TimelineView::isPendingMessage(const QString &eventid,
return false; return false;
for (const auto &msg : pending_msgs_) { for (const auto &msg : pending_msgs_) {
if (msg.event_id == eventid || msg.body == body) if (QString::number(msg.txn_id) == txnid)
return true;
}
for (const auto &msg : pending_sent_msgs_) {
if (QString::number(msg.txn_id) == txnid)
return true; return true;
} }
@ -579,14 +609,28 @@ TimelineView::isPendingMessage(const QString &eventid,
} }
void void
TimelineView::removePendingMessage(const QString &eventid, const QString &body) TimelineView::removePendingMessage(const QString &txnid)
{ {
for (auto it = pending_sent_msgs_.begin(); it != pending_sent_msgs_.end(); ++it) {
if (QString::number(it->txn_id) == txnid) {
int index = std::distance(pending_sent_msgs_.begin(), it);
pending_sent_msgs_.removeAt(index);
return;
}
}
for (auto it = pending_msgs_.begin(); it != pending_msgs_.end(); ++it) { for (auto it = pending_msgs_.begin(); it != pending_msgs_.end(); ++it) {
int index = std::distance(pending_msgs_.begin(), it); if (QString::number(it->txn_id) == txnid) {
int index = std::distance(pending_msgs_.begin(), it);
if (it->event_id == eventid || it->body == body) {
pending_msgs_.removeAt(index); pending_msgs_.removeAt(index);
break; return;
} }
} }
} }
void
TimelineView::handleFailedMessage(int txnid)
{
Q_UNUSED(txnid);
// Note: We do this even if the message has already been echoed.
QTimer::singleShot(500, this, SLOT(sendNextPendingMessage()));
}

View file

@ -35,6 +35,10 @@ TimelineViewManager::TimelineViewManager(QSharedPointer<MatrixClient> client, QW
connect( connect(
client_.data(), &MatrixClient::messageSent, this, &TimelineViewManager::messageSent); client_.data(), &MatrixClient::messageSent, this, &TimelineViewManager::messageSent);
connect(
client_.data(), &MatrixClient::messageSendFailed,
this, &TimelineViewManager::messageSendFailed);
} }
TimelineViewManager::~TimelineViewManager() {} TimelineViewManager::~TimelineViewManager() {}
@ -51,28 +55,32 @@ TimelineViewManager::messageSent(const QString &event_id, const QString &roomid,
} }
void void
TimelineViewManager::sendTextMessage(const QString &msg) TimelineViewManager::messageSendFailed(const QString &roomid, int txn_id)
{
auto view = views_[roomid];
view->handleFailedMessage(txn_id);
}
void
TimelineViewManager::queueTextMessage(const QString &msg)
{ {
auto room_id = active_room_; auto room_id = active_room_;
auto view = views_[room_id]; auto view = views_[room_id];
view->addUserMessage(matrix::events::MessageEventType::Text, msg, client_->transactionId()); view->addUserMessage(matrix::events::MessageEventType::Text, msg);
client_->sendRoomMessage(matrix::events::MessageEventType::Text, room_id, msg);
} }
void void
TimelineViewManager::sendEmoteMessage(const QString &msg) TimelineViewManager::queueEmoteMessage(const QString &msg)
{ {
auto room_id = active_room_; auto room_id = active_room_;
auto view = views_[room_id]; auto view = views_[room_id];
view->addUserMessage( view->addUserMessage(matrix::events::MessageEventType::Emote, msg);
matrix::events::MessageEventType::Emote, msg, client_->transactionId());
client_->sendRoomMessage(matrix::events::MessageEventType::Emote, room_id, msg);
} }
void void
TimelineViewManager::sendImageMessage(const QString &roomid, TimelineViewManager::queueImageMessage(const QString &roomid,
const QString &filename, const QString &filename,
const QString &url) const QString &url)
{ {
@ -83,9 +91,7 @@ TimelineViewManager::sendImageMessage(const QString &roomid,
auto view = views_[roomid]; auto view = views_[roomid];
view->addUserMessage(url, filename, client_->transactionId()); view->addUserMessage(url, filename);
client_->sendRoomMessage(
matrix::events::MessageEventType::Image, roomid, QFileInfo(filename).fileName(), url);
} }
void void