#include "EventStore.h"

// NOTE(review): the two angle-bracket includes were lost when this file was extracted.
// QThread and QTimer are the Qt classes used directly below — confirm against upstream.
#include <QThread>
#include <QTimer>

#include "Cache.h"
#include "Cache_p.h"
#include "EventAccessors.h"
#include "Logging.h"
#include "MatrixClient.h"
#include "Olm.h"

Q_DECLARE_METATYPE(Reaction)

// Process-wide caches shared by every EventStore instance; each is constructed with a
// maximum cost of 1000.
// NOTE(review): the template arguments were stripped from the source. They are
// reconstructed from usage (decryptedEvents_/events_by_id_ are looked up with an IdIndex,
// events_ with an Index, and all three store TimelineEvents) — confirm against EventStore.h.
QCache<EventStore::IdIndex, mtx::events::collections::TimelineEvents>
  EventStore::decryptedEvents_{1000};
QCache<EventStore::IdIndex, mtx::events::collections::TimelineEvents>
  EventStore::events_by_id_{1000};
QCache<EventStore::Index, mtx::events::collections::TimelineEvents> EventStore::events_{1000};

/// Creates the store for @p room_id, loads the cached timeline range and wires up the
/// internal signal handlers (event fetching, pagination and the pending-message queue).
EventStore::EventStore(std::string room_id, QObject *)
  : room_id_(std::move(room_id))
{
    // Register the Reaction metatype exactly once; the variable only exists to force that.
    static auto reactionType = qRegisterMetaType<Reaction>();
    (void)reactionType;

    auto range = cache::client()->getTimelineRange(room_id_);
    if (range) {
        this->first = range->first;
        this->last  = range->last;
    }

    // A remotely fetched event arrived: persist it and refresh the row that refers to it.
    connect(
      this,
      &EventStore::eventFetched,
      this,
      [this](std::string id,
             std::string relatedTo,
             mtx::events::collections::TimelineEvents timeline) {
          cache::client()->storeEvent(room_id_, id, {timeline});

          if (!relatedTo.empty()) {
              auto idx = idToIndex(relatedTo);
              if (idx)
                  emit dataChanged(*idx, *idx);
          }
      },
      Qt::QueuedConnection);

    // A /messages response arrived: store it and grow the model towards older events.
    connect(
      this,
      &EventStore::oldMessagesRetrieved,
      this,
      [this](const mtx::responses::Messages &res) {
          uint64_t newFirst = cache::client()->saveOldMessages(room_id_, res);
          if (newFirst == first)
              // Nothing new became visible; request the next batch.
              fetchMore();
          else {
              emit beginInsertRows(toExternalIdx(newFirst), toExternalIdx(this->first - 1));
              this->first = newFirst;
              emit endInsertRows();
              emit fetchedMore();
          }
      },
      Qt::QueuedConnection);

    // Sends the first pending message of the room, one transaction at a time.
    connect(this, &EventStore::processPending, this, [this]() {
        if (!current_txn.empty()) {
            nhlog::ui()->debug("Already processing {}", current_txn);
            return;
        }

        auto event = cache::client()->firstPendingMessage(room_id_);

        if (!event) {
            nhlog::ui()->debug("No event to send");
            return;
        }

        std::visit(
          [this](const auto &e) {
              auto txn_id = e.event_id;

              // Validate before claiming the queue: current_txn is only cleared by the
              // messageSent/messageFailed handlers, so marking an invalid id as "in
              // flight" would block every later processPending.
              if (txn_id.empty() || txn_id[0] != 'm') {
                  nhlog::ui()->debug("Invalid txn id '{}'", txn_id);
                  cache::client()->removePendingStatus(room_id_, txn_id);
                  return;
              }

              this->current_txn = txn_id;

              // NOTE(review): template argument reconstructed as the content type of the
              // visited event — confirm against upstream.
              if constexpr (mtx::events::message_content_to_type<decltype(e.content)> !=
                            mtx::events::EventType::Unsupported)
                  http::client()->send_room_message(
                    room_id_,
                    txn_id,
                    e.content,
                    [this, txn_id](const mtx::responses::EventId &event_id,
                                   mtx::http::RequestErr err) {
                        if (err) {
                            const int status_code = static_cast<int>(err->status_code);
                            nhlog::net()->warn("[{}] failed to send message: {} {}",
                                               txn_id,
                                               err->matrix_error.error,
                                               status_code);
                            emit messageFailed(txn_id);
                            return;
                        }
                        emit messageSent(txn_id, event_id.event_id.to_string());
                    });
          },
          event->data);
    });

    // Sending failed: count the failure, give up after more than 10 of them, and retry
    // the queue after one second.
    connect(
      this,
      &EventStore::messageFailed,
      this,
      [this](std::string txn_id) {
          if (current_txn == txn_id) {
              current_txn_error_count++;
              if (current_txn_error_count > 10) {
                  nhlog::ui()->debug("failing txn id '{}'", txn_id);
                  cache::client()->removePendingStatus(room_id_, txn_id);
                  current_txn_error_count = 0;
              }
          }
          QTimer::singleShot(1000, this, [this]() {
              nhlog::ui()->debug("timeout");
              this->current_txn = "";
              emit processPending();
          });
      },
      Qt::QueuedConnection);

    // Sending succeeded: issue read_event for the new event, drop the pending status and
    // continue with the next queued message.
    connect(
      this,
      &EventStore::messageSent,
      this,
      [this](std::string txn_id, std::string event_id) {
          nhlog::ui()->debug("sent {}", txn_id);

          http::client()->read_event(
            room_id_, event_id, [this, event_id](mtx::http::RequestErr err) {
                if (err) {
                    nhlog::net()->warn("failed to read_event ({}, {})", room_id_, event_id);
                }
            });

          cache::client()->removePendingStatus(room_id_, txn_id);
          this->current_txn             = "";
          this->current_txn_error_count = 0;
          emit processPending();
      },
      Qt::QueuedConnection);
}

/// Stores @p event as a pending message, feeds it through handleSync() as a one-element
/// non-limited timeline and triggers the send queue.
void
EventStore::addPending(mtx::events::collections::TimelineEvents event)
{
    if (this->thread() != QThread::currentThread())
        nhlog::db()->warn("{} called from a different thread!", __func__);

    cache::client()->savePendingMessage(this->room_id_, {event});
    mtx::responses::Timeline events;
    events.limited = false;
    events.events.emplace_back(event);
    handleSync(events);

    emit processPending();
}

/// Updates the model range from the cached timeline range and invalidates cached copies
/// of every event that one of the synced @p events refers to.
void
EventStore::handleSync(const mtx::responses::Timeline &events)
{
    if (this->thread() != QThread::currentThread())
        nhlog::db()->warn("{} called from a different thread!", __func__);

    auto range = cache::client()->getTimelineRange(room_id_);
    if (!range)
        return;

    if (events.limited) {
        // Limited timeline: reset the whole model to the cached range.
        emit beginResetModel();
        this->last  = range->last;
        this->first = range->first;
        emit endResetModel();
    } else if (range->last > this->last) {
        emit beginInsertRows(toExternalIdx(this->last + 1), toExternalIdx(range->last));
        this->last = range->last;
        emit endInsertRows();
    }

    for (const auto &event : events.events) {
        std::string relates_to;
        // NOTE(review): get_if template arguments reconstructed from the members accessed
        // below (`redacts`, `content.relates_to`) — confirm against upstream.
        if (auto redaction =
              std::get_if<mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(&event)) {
            // fixup reactions
            auto redacted = events_by_id_.object({room_id_, redaction->redacts});
            if (redacted) {
                auto id = mtx::accessors::relates_to_event_id(*redacted);
                if (!id.empty()) {
                    auto idx = idToIndex(id);
                    if (idx) {
                        events_by_id_.remove({room_id_, redaction->redacts});
                        events_.remove({room_id_, toInternalIdx(*idx)});
                        emit dataChanged(*idx, *idx);
                    }
                }
            }

            relates_to = redaction->redacts;
        } else if (auto reaction =
                     std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>(&event)) {
            relates_to = reaction->content.relates_to.event_id;
        } else {
            relates_to = mtx::accessors::in_reply_to_event(event);
        }

        // Drop every cached copy of the related event so it is re-read on next access.
        if (!relates_to.empty()) {
            auto idx = cache::client()->getTimelineIndex(room_id_, relates_to);
            if (idx) {
                events_by_id_.remove({room_id_, relates_to});
                decryptedEvents_.remove({room_id_, relates_to});
                events_.remove({room_id_, *idx});
                emit dataChanged(toExternalIdx(*idx), toExternalIdx(*idx));
            }
        }

        // Events carrying a transaction id: evict the cached entry at their index.
        if (auto txn_id = mtx::accessors::transaction_id(event); !txn_id.empty()) {
            auto idx = cache::client()->getTimelineIndex(room_id_, mtx::accessors::event_id(event));
            if (idx) {
                Index index{room_id_, *idx};
                events_.remove(index);
                emit dataChanged(toExternalIdx(*idx), toExternalIdx(*idx));
            }
        }
    }
}

/// Aggregates the reactions related to @p event_id by key.
/// @return One Reaction per distinct key (in first-seen order) wrapped in QVariants, with
///         the count, a comma separated list of display names and the id of the reaction
///         sent by the logged-in user, if any.
QVariantList
EventStore::reactions(const std::string &event_id)
{
    auto event_ids = cache::client()->relatedEvents(room_id_, event_id);

    struct TempReaction
    {
        int count = 0;
        std::vector<std::string> users;
        std::string reactedBySelf;
    };
    std::map<std::string, TempReaction> aggregation;
    std::vector<Reaction> reactions;

    auto self = http::client()->user_id().to_string();
    for (const auto &id : event_ids) {
        auto related_event = event(id, event_id);
        if (!related_event)
            continue;

        if (auto reaction =
              std::get_if<mtx::events::RoomEvent<mtx::events::msg::Reaction>>(related_event)) {
            auto &agg = aggregation[reaction->content.relates_to.key];

            // First occurrence of this key: remember it to preserve first-seen order.
            if (agg.count == 0) {
                Reaction temp{};
                temp.key_ = QString::fromStdString(reaction->content.relates_to.key);
                reactions.push_back(temp);
            }

            agg.count++;
            agg.users.push_back(cache::displayName(room_id_, reaction->sender));
            if (reaction->sender == self)
                agg.reactedBySelf = reaction->event_id;
        }
    }

    QVariantList temp;
    for (auto &reaction : reactions) {
        const auto &agg            = aggregation[reaction.key_.toStdString()];
        reaction.count_            = agg.count;
        reaction.selfReactedEvent_ = QString::fromStdString(agg.reactedBySelf);

        bool first = true;
        for (const auto &user : agg.users) {
            if (first)
                first = false;
            else
                reaction.users_ += ", ";

            reaction.users_ += QString::fromStdString(user);
        }

        nhlog::db()->debug("key: {}, count: {}, users: {}",
                           reaction.key_.toStdString(),
                           reaction.count_,
                           reaction.users_.toStdString());
        temp.append(QVariant::fromValue(reaction));
    }

    return temp;
}

/// Returns the event at external index @p idx, loading it from the cache database on a
/// miss. With @p decrypt set, encrypted events are passed through decryptEvent().
/// @return Pointer owned by one of the static caches, or nullptr if the index is outside
///         [first, last] or the event cannot be loaded.
mtx::events::collections::TimelineEvents *
EventStore::event(int idx, bool decrypt)
{
    if (this->thread() != QThread::currentThread())
        nhlog::db()->warn("{} called from a different thread!", __func__);

    Index index{room_id_, toInternalIdx(idx)};
    if (index.idx > last || index.idx < first)
        return nullptr;

    auto event_ptr = events_.object(index);
    if (!event_ptr) {
        auto event_id = cache::client()->getTimelineEventId(room_id_, index.idx);
        if (!event_id)
            return nullptr;

        auto event = cache::client()->getEvent(room_id_, *event_id);
        if (!event)
            return nullptr;

        // QCache takes ownership of the inserted object.
        event_ptr = new mtx::events::collections::TimelineEvents(std::move(event->data));
        events_.insert(index, event_ptr);
    }

    if (decrypt)
        if (auto encrypted =
              std::get_if<mtx::events::EncryptedEvent<mtx::events::msg::Encrypted>>(event_ptr))
            return decryptEvent({room_id_, encrypted->event_id}, *encrypted);

    return event_ptr;
}

/// Maps an event id to its external index, or std::nullopt if the cache has no timeline
/// index for it.
std::optional<int>
EventStore::idToIndex(std::string_view id) const
{
    if (this->thread() != QThread::currentThread())
        nhlog::db()->warn("{} called from a different thread!", __func__);

    auto idx = cache::client()->getTimelineIndex(room_id_, id);
    if (idx)
        return toExternalIdx(*idx);
    else
        return std::nullopt;
}

/// Maps an external index to the event id stored at that timeline position.
std::optional<std::string>
EventStore::indexToId(int idx) const
{
    if (this->thread() != QThread::currentThread())
        nhlog::db()->warn("{} called from a different thread!", __func__);

    return cache::client()->getTimelineEventId(room_id_, toInternalIdx(idx));
}

/// Decrypts the megolm-encrypted event @p e and caches the result under @p idx.
/// Every failure path caches and returns a placeholder event whose body describes the
/// problem, so the returned pointer is never null.
mtx::events::collections::TimelineEvents *
EventStore::decryptEvent(const IdIndex &idx,
                         const mtx::events::EncryptedEvent<mtx::events::msg::Encrypted> &e)
{
    if (auto cachedEvent = decryptedEvents_.object(idx))
        return cachedEvent;

    MegolmSessionIndex index;
    index.room_id    = room_id_;
    index.session_id = e.content.session_id;
    index.sender_key = e.content.sender_key;

    // Placeholder that is returned whenever decryption is not possible.
    // NOTE(review): the content type was stripped from the source; msg::Notice is a
    // reconstruction (any content with a `body` member fits) — confirm against upstream.
    mtx::events::RoomEvent<mtx::events::msg::Notice> dummy;
    dummy.origin_server_ts = e.origin_server_ts;
    dummy.event_id         = e.event_id;
    dummy.sender           = e.sender;
    dummy.content.body =
      tr("-- Encrypted Event (No keys found for decryption) --",
         "Placeholder, when the message was not decrypted yet or can't be decrypted.")
        .toStdString();

    // Moves an event into the decrypted-event cache (which takes ownership) and returns it.
    auto asCacheEntry = [&idx](mtx::events::collections::TimelineEvents &&event) {
        auto event_ptr = new mtx::events::collections::TimelineEvents(std::move(event));
        decryptedEvents_.insert(idx, event_ptr);
        return event_ptr;
    };

    try {
        if (!cache::client()->inboundMegolmSessionExists(index)) {
            nhlog::crypto()->info("Could not find inbound megolm session ({}, {}, {})",
                                  index.room_id,
                                  index.session_id,
                                  e.sender);
            // TODO: request megolm session_id & session_key from the sender.
            return asCacheEntry(std::move(dummy));
        }
    } catch (const lmdb::error &err) {
        nhlog::db()->critical("failed to check megolm session's existence: {}", err.what());
        dummy.content.body = tr("-- Decryption Error (failed to communicate with DB) --",
                                "Placeholder, when the message can't be decrypted, because "
                                "the DB access failed when trying to lookup the session.")
                               .toStdString();
        return asCacheEntry(std::move(dummy));
    }

    std::string msg_str;
    try {
        auto session = cache::client()->getInboundMegolmSession(index);
        auto res     = olm::client()->decrypt_group_message(session, e.content.ciphertext);
        msg_str = std::string(reinterpret_cast<const char *>(res.data.data()), res.data.size());
    } catch (const lmdb::error &err) {
        // The format string needs a fourth placeholder, otherwise err.what() is dropped.
        nhlog::db()->critical("failed to retrieve megolm session with index ({}, {}, {}): {}",
                              index.room_id,
                              index.session_id,
                              index.sender_key,
                              err.what());
        dummy.content.body =
          tr("-- Decryption Error (failed to retrieve megolm keys from db) --",
             "Placeholder, when the message can't be decrypted, because the DB access "
             "failed.")
            .toStdString();
        return asCacheEntry(std::move(dummy));
    } catch (const mtx::crypto::olm_exception &err) {
        nhlog::crypto()->critical("failed to decrypt message with index ({}, {}, {}): {}",
                                  index.room_id,
                                  index.session_id,
                                  index.sender_key,
                                  err.what());
        dummy.content.body =
          tr("-- Decryption Error (%1) --",
             "Placeholder, when the message can't be decrypted. In this case, the Olm "
             "decrytion returned an error, which is passed as %1.")
            .arg(err.what())
            .toStdString();
        return asCacheEntry(std::move(dummy));
    }

    std::vector<mtx::events::collections::TimelineEvents> temp_events;
    try {
        // Add missing fields for the event.
        json body                = json::parse(msg_str);
        body["event_id"]         = e.event_id;
        body["sender"]           = e.sender;
        body["origin_server_ts"] = e.origin_server_ts;
        body["unsigned"]         = e.unsigned_data;

        // relations are unencrypted in content...
        if (json old_ev = e; old_ev["content"].count("m.relates_to") != 0)
            body["content"]["m.relates_to"] = old_ev["content"]["m.relates_to"];

        json event_array = json::array();
        event_array.push_back(body);

        mtx::responses::utils::parse_timeline_events(event_array, temp_events);
    } catch (const json::exception &err) {
        // The plaintext is controlled by the sender; a malformed payload must not
        // propagate an exception out of this function.
        nhlog::crypto()->warn("failed to parse decrypted event {}: {}", e.event_id, err.what());
        dummy.content.body =
          tr("-- Decryption Error (failed to parse decrypted event) --",
             "Placeholder, when the message was decrypted, but its content is not valid.")
            .toStdString();
        return asCacheEntry(std::move(dummy));
    }

    if (temp_events.size() == 1) {
        auto encInfo = mtx::accessors::file(temp_events[0]);

        if (encInfo)
            emit newEncryptedImage(encInfo.value());

        return asCacheEntry(std::move(temp_events[0]));
    }

    dummy.content.body =
      tr("-- Encrypted Event (Unknown event type) --",
         "Placeholder, when the message was decrypted, but we couldn't parse it, because "
         "Nheko/mtxclient don't support that event type yet.")
        .toStdString();
    return asCacheEntry(std::move(dummy));
}

/// Returns the event with the given @p id, loading it from the cache database on a miss.
/// If it is not stored locally a get_event request is started and nullptr is returned;
/// eventFetched is emitted (with @p related_to) once the request succeeds.
/// With @p decrypt set, encrypted events are passed through decryptEvent().
mtx::events::collections::TimelineEvents *
EventStore::event(std::string_view id, std::string_view related_to, bool decrypt)
{
    if (this->thread() != QThread::currentThread())
        nhlog::db()->warn("{} called from a different thread!", __func__);

    if (id.empty())
        return nullptr;

    IdIndex index{room_id_, std::string(id)};

    auto event_ptr = events_by_id_.object(index);
    if (!event_ptr) {
        auto event = cache::client()->getEvent(room_id_, index.id);
        if (!event) {
            http::client()->get_event(
              room_id_,
              index.id,
              [this, relatedTo = std::string(related_to), id = index.id](
                const mtx::events::collections::TimelineEvents &timeline,
                mtx::http::RequestErr err) {
                  if (err) {
                      // Argument order matches the message: fetched id first, then the
                      // event that requested it.
                      nhlog::net()->error("Failed to retrieve event with id {}, which was "
                                          "requested to show the replyTo for event {}",
                                          id,
                                          relatedTo);
                      return;
                  }
                  emit eventFetched(id, relatedTo, timeline);
              });
            return nullptr;
        }

        // QCache takes ownership of the inserted object.
        event_ptr = new mtx::events::collections::TimelineEvents(std::move(event->data));
        events_by_id_.insert(index, event_ptr);
    }

    if (decrypt)
        if (auto encrypted =
              std::get_if<mtx::events::EncryptedEvent<mtx::events::msg::Encrypted>>(event_ptr))
            return decryptEvent(index, *encrypted);

    return event_ptr;
}

/// Requests the next batch of older messages, starting at the cached previous-batch
/// token. Emits oldMessagesRetrieved on success and fetchedMore on failure.
void
EventStore::fetchMore()
{
    mtx::http::MessagesOpts opts;
    opts.room_id = room_id_;
    opts.from    = cache::client()->previousBatchToken(room_id_);

    nhlog::ui()->debug("Paginating room {}, token {}", opts.room_id, opts.from);

    http::client()->messages(
      opts, [this, opts](const mtx::responses::Messages &res, mtx::http::RequestErr err) {
          if (err) {
              nhlog::net()->error("failed to call /messages ({}): {} - {} - {}",
                                  opts.room_id,
                                  mtx::errors::to_string(err->matrix_error.errcode),
                                  err->matrix_error.error,
                                  err->parse_error);
              emit fetchedMore();
              return;
          }

          // res is a const reference, so it is passed as-is (a std::move would be a no-op).
          emit oldMessagesRetrieved(res);
      });
}