diff --git a/resources/qml/dialogs/EventExpirationDialog.qml b/resources/qml/dialogs/EventExpirationDialog.qml index 5d12bda8..37268ca7 100644 --- a/resources/qml/dialogs/EventExpirationDialog.qml +++ b/resources/qml/dialogs/EventExpirationDialog.qml @@ -104,7 +104,7 @@ ApplicationWindow { SpinBox { Layout.alignment: Qt.AlignRight | Qt.AlignVCenter from: 0 - to: 1000 + to: 1000000 stepSize: 1 value: eventExpiry.expireEventsAfterCount onValueChanged: eventExpiry.expireEventsAfterCount = value @@ -126,7 +126,7 @@ ApplicationWindow { SpinBox { Layout.alignment: Qt.AlignRight | Qt.AlignVCenter from: 0 - to: 1000 + to: 1000000 stepSize: 1 value: eventExpiry.protectLatestEvents onValueChanged: eventExpiry.protectLatestEvents = value diff --git a/src/Cache.cpp b/src/Cache.cpp index 7a19cba4..3fe2892b 100644 --- a/src/Cache.cpp +++ b/src/Cache.cpp @@ -82,6 +82,8 @@ static constexpr auto DEVICES_DB("devices"); static constexpr auto DEVICE_KEYS_DB("device_keys"); //! room_ids that have encryption enabled. static constexpr auto ENCRYPTED_ROOMS_DB("encrypted_rooms"); +//! Expiration progress for each room +static constexpr auto EVENT_EXPIRATION_BG_JOB_DB("event_expiration_bg_job"); //! room_id -> pickled OlmInboundGroupSession static constexpr auto INBOUND_MEGOLM_SESSIONS_DB("inbound_megolm_sessions"); @@ -327,7 +329,9 @@ Cache::setup() megolmSessionDataDb_ = lmdb::dbi::open(txn, MEGOLM_SESSIONS_DATA_DB, MDB_CREATE); // What rooms are encrypted - encryptedRooms_ = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE); + encryptedRooms_ = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE); + eventExpiryBgJob_ = lmdb::dbi::open(txn, EVENT_EXPIRATION_BG_JOB_DB, MDB_CREATE); + [[maybe_unused]] auto verificationDb = getVerificationDb(txn); [[maybe_unused]] auto userKeysDb = getUserKeysDb(txn); @@ -584,6 +588,39 @@ Cache::pickleSecret() return pickle_secret_; } +void +Cache::storeEventExpirationProgress(const std::string &room, + const std::string &expirationSettings, + const std::string &stopMarker) +{ + nlohmann::json j; + j["s"] = expirationSettings; + j["m"] = stopMarker; + + auto txn = lmdb::txn::begin(env_); + eventExpiryBgJob_.put(txn, room, j.dump()); + txn.commit(); +} + +std::string +Cache::loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings) + +{ + try { + auto txn = ro_txn(env_); + std::string_view data; + if (!eventExpiryBgJob_.get(txn, room, data)) + return ""; + + auto j = nlohmann::json::parse(data); + if (j.value("s", "") == expirationSettings) + return j.value("m", ""); + } catch (...) { + return ""; + } + return ""; +} + void Cache::setEncryptedRoom(lmdb::txn &txn, const std::string &room_id) { diff --git a/src/Cache_p.h b/src/Cache_p.h index 121e7e66..8d51c7c4 100644 --- a/src/Cache_p.h +++ b/src/Cache_p.h @@ -87,6 +87,13 @@ public: //! Retrieve if the room is tombstoned (closed or replaced by a different room) bool getRoomIsTombstoned(lmdb::txn &txn, lmdb::dbi &statesdb); + // for the event expiry background job + void storeEventExpirationProgress(const std::string &room, + const std::string &expirationSettings, + const std::string &stopMarker); + std::string + loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings); + //! Get a specific state event template std::optional> @@ -714,6 +721,8 @@ private: lmdb::dbi encryptedRooms_; + lmdb::dbi eventExpiryBgJob_; + QString localUserId_; QString cacheDirectory_; diff --git a/src/Utils.cpp b/src/Utils.cpp index 20c6f7d1..0ea42a27 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -1467,7 +1467,7 @@ utils::updateSpaceVias() ChatPage::instance()->callFunctionOnGuiThread( [state = std::move(state), interval = e->matrix_error.retry_after]() { - QTimer::singleShot(interval, + QTimer::singleShot(interval * 3, ChatPage::instance(), [self = std::move(state)]() mutable { next(std::move(self)); @@ -1502,7 +1502,7 @@ utils::updateSpaceVias() ChatPage::instance()->callFunctionOnGuiThread( [state = std::move(state), interval = e->matrix_error.retry_after]() { - QTimer::singleShot(interval, + QTimer::singleShot(interval * 3, ChatPage::instance(), [self = std::move(state)]() mutable { next(std::move(self)); @@ -1644,9 +1644,19 @@ utils::removeExpiredEvents() std::string currentRoom; bool firstMessagesCall = true; std::uint64_t currentRoomCount = 0; + + // batch token for pagination std::string currentRoomPrevToken; + // event id of an event redacted in a previous run + std::string currentRoomStopAt; + // event id of first event redacted in the current run, hoping that the order stays the + // same. + std::string currentRoomFirstRedactedEvent; + // (evtype,state_key) tuple to keep the latest state event of each. std::set> currentRoomStateEvents; + // event ids pending redaction std::vector currentRoomRedactionQueue; + mtx::events::account_data::nheko_extensions::EventExpiry currentExpiry; static void next(std::shared_ptr state) @@ -1664,7 +1674,8 @@ utils::removeExpiredEvents() ChatPage::instance()->callFunctionOnGuiThread( [state = std::move(state), interval = e->matrix_error.retry_after]() { - QTimer::singleShot(interval, + // triple interval to allow other traffic as well + QTimer::singleShot(interval * 3, ChatPage::instance(), [self = std::move(state)]() mutable { next(std::move(self)); @@ -1681,6 +1692,10 @@ utils::removeExpiredEvents() } } else { nhlog::net()->info("Redacted event {} in {}", evid, state->currentRoom); + + if (state->currentRoomFirstRedactedEvent.empty()) + state->currentRoomFirstRedactedEvent = evid; + state->currentRoomRedactionQueue.pop_back(); next(std::move(state)); } @@ -1688,6 +1703,13 @@ utils::removeExpiredEvents() } else if (!state->currentRoom.empty()) { if (state->currentRoomPrevToken.empty() && !state->firstMessagesCall) { nhlog::net()->info("Finished room {}", state->currentRoom); + + if (!state->currentRoomFirstRedactedEvent.empty()) + cache::client()->storeEventExpirationProgress( + state->currentRoom, + nlohmann::json(state->currentExpiry).dump(), + state->currentRoomFirstRedactedEvent); + state->currentRoom.clear(); next(std::move(state)); return; @@ -1708,7 +1730,7 @@ utils::removeExpiredEvents() mtx::http::RequestErr error) mutable { if (error) { // skip success handler - nhlog::net()->info( + nhlog::net()->warn( "Finished room {} with error {}", state->currentRoom, *error); state->currentRoom.clear(); } else if (msgs.chunk.empty()) { @@ -1725,8 +1747,22 @@ utils::removeExpiredEvents() continue; if (std::holds_alternative< - mtx::events::RoomEvent>(e)) + mtx::events::RoomEvent>(e) || + std::holds_alternative< + mtx::events::StateEvent>(e)) { + if (!state->currentRoomStopAt.empty() && + mtx::accessors::event_id(e) == state->currentRoomStopAt) { + // There is no filter to remove redacted events from + // pagination, so we try to stop early by caching what event + // we redacted last if we reached the end of a room. + nhlog::net()->info( + "Found previous redaction marker, stopping early: {}", + state->currentRoom); + state->currentRoomPrevToken.clear(); + break; + } continue; + } if (std::holds_alternative< mtx::events::StateEvent>(e)) @@ -1806,6 +1842,9 @@ utils::removeExpiredEvents() state->currentRoomRedactionQueue.clear(); state->currentRoomStateEvents.clear(); + state->currentRoomStopAt = cache::client()->loadEventExpirationProgress( + state->currentRoom, nlohmann::json(state->currentExpiry).dump()); + state->roomsToUpdate.pop_back(); next(std::move(state)); } else {