From dcb6c007087e4689b202e4d2fc051a9a9a31c4b0 Mon Sep 17 00:00:00 2001 From: Nicolas Werner Date: Tue, 4 Jul 2023 18:01:44 +0200 Subject: [PATCH] Add some event expiration function --- src/Utils.cpp | 199 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/Utils.h | 3 + 2 files changed, 202 insertions(+) diff --git a/src/Utils.cpp b/src/Utils.cpp index 8ff8cec6..7a412db0 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -1604,3 +1605,201 @@ utils::updateSpaceVias() ApplySpaceUpdatesState::next(std::move(asus)); } + +std::atomic event_expiration_running = false; +void +utils::removeExpiredEvents() +{ + // TODO(Nico): Add its own toggle... + if (!UserSettings::instance()->updateSpaceVias()) + return; + + if (event_expiration_running.exchange(true)) { + nhlog::net()->info("Event expiration still running, not starting second job."); + return; + } + + nhlog::net()->info("Remove expired events starting."); + + auto rooms = cache::roomInfo(false); + + auto us = http::client()->user_id().to_string(); + + using ExpType = + mtx::events::AccountDataEvent; + static auto getExpEv = [](const std::string &room = "") -> std::optional { + if (auto accountEvent = + cache::client()->getAccountData(mtx::events::EventType::NhekoEventExpiry, room)) + if (auto ev = std::get_if(&*accountEvent); + ev && (ev->content.expire_after_ms || ev->content.keep_only_latest)) + return std::optional{*ev}; + return std::nullopt; + }; + + struct ApplyEventExpiration + { + std::optional globalExpiry; + std::vector roomsToUpdate; + std::string filter; + + std::string currentRoom; + std::uint64_t currentRoomCount = 0; + std::string currentRoomPrevToken; + std::vector currentRoomRedactionQueue; + mtx::events::account_data::nheko_extensions::EventExpiry currentExpiry; + + static void next(std::shared_ptr state) + { + if (!state->currentRoomRedactionQueue.empty()) { + http::client()->redact_event( + state->currentRoom, + state->currentRoomRedactionQueue.back(), + [state = std::move(state)](const mtx::responses::EventId &, + mtx::http::RequestErr e) mutable { + const auto &event_id = state->currentRoomRedactionQueue.back(); + if (e) { + if (e->status_code == 429 && e->matrix_error.retry_after.count() != 0) { + ChatPage::instance()->callFunctionOnGuiThread( + [state = std::move(state), + interval = e->matrix_error.retry_after]() { + QTimer::singleShot(interval, + ChatPage::instance(), + [self = std::move(state)]() mutable { + next(std::move(self)); + }); + }); + return; + } + + nhlog::net()->error("Failed to redact event {} in {}: {}", + event_id, + state->currentRoom, + *e); + } + nhlog::net()->info( + "Redacted event {} in {}: {}", event_id, state->currentRoom, *e); + state->currentRoomRedactionQueue.pop_back(); + next(std::move(state)); + }); + } else if (!state->currentRoom.empty()) { + mtx::http::MessagesOpts opts{}; + opts.dir = mtx::http::PaginationDirection::Backwards; + opts.from = state->currentRoomPrevToken; + opts.limit = 1000; + opts.filter = state->filter; + + http::client()->messages( + opts, + [state = std::move(state)](const mtx::responses::Messages &msgs, + mtx::http::RequestErr e) mutable { + if (e || msgs.chunk.empty()) { + state->currentRoom.clear(); + state->currentRoomCount = 0; + state->currentRoomPrevToken.clear(); + } else { + if (!msgs.end.empty()) + state->currentRoomPrevToken = msgs.end; + + auto now = (uint64_t)QDateTime::currentMSecsSinceEpoch(); + auto us = http::client()->user_id().to_string(); + + for (const auto &e : msgs.chunk) { + if (std::holds_alternative< + mtx::events::RedactionEvent>(e)) + continue; + + if (mtx::accessors::sender(e) != us) + continue; + + state->currentRoomCount++; + if (state->currentRoomCount <= state->currentExpiry.protect_latest) { + continue; + } + + if (state->currentExpiry.exclude_state_events && + mtx::accessors::is_state_event(e)) + continue; + + if (state->currentExpiry.keep_only_latest && + state->currentRoomCount > state->currentExpiry.keep_only_latest) { + state->currentRoomRedactionQueue.push_back( + mtx::accessors::event_id(e)); + } else if (state->currentExpiry.expire_after_ms && + (state->currentExpiry.expire_after_ms + + mtx::accessors::origin_server_ts(e).toMSecsSinceEpoch()) < + now) { + state->currentRoomRedactionQueue.push_back( + mtx::accessors::event_id(e)); + } + } + } + + if (msgs.end.empty() && state->currentRoomRedactionQueue.empty()) { + state->currentRoom.clear(); + state->currentRoomCount = 0; + state->currentRoomPrevToken.clear(); + } + + next(std::move(state)); + }); + } else if (!state->roomsToUpdate.empty()) { + const auto &room = state->roomsToUpdate.back(); + + auto localExp = getExpEv(room); + if (localExp) { + state->currentRoom = room; + state->currentExpiry = localExp->content; + } else if (state->globalExpiry) { + state->currentRoom = room; + state->currentExpiry = state->globalExpiry->content; + } + state->roomsToUpdate.pop_back(); + next(std::move(state)); + } else { + nhlog::net()->info("Finished event expiry"); + event_expiration_running = false; + } + } + }; + + auto asus = std::make_shared(); + + asus->filter = + nlohmann::json{ + "room", + nlohmann::json::object({ + { + "timeline", + nlohmann::json::object({ + {"senders", nlohmann::json::array({us})}, + {"not_types", nlohmann::json::array({"m.room.redaction"})}, + }), + }, + }), + } + .dump(); + + asus->globalExpiry = getExpEv(); + + for (const auto &[roomid_, info] : rooms.toStdMap()) { + auto roomid = roomid_.toStdString(); + + if (!asus->globalExpiry && !getExpEv(roomid)) + continue; + + if (auto pl = cache::client() + ->getStateEvent(roomid) + .value_or(mtx::events::StateEvent{}) + .content; + pl.user_level(us) < pl.event_level(to_string(mtx::events::EventType::RoomRedaction))) { + nhlog::net()->warn("Can't react events in {}, not running expiration.", roomid); + continue; + } + + asus->roomsToUpdate.push_back(roomid); + } + + nhlog::db()->info("Running expiration in {} rooms", asus->roomsToUpdate.size()); + + ApplyEventExpiration::next(std::move(asus)); +} diff --git a/src/Utils.h b/src/Utils.h index af5ea340..83f2cad1 100644 --- a/src/Utils.h +++ b/src/Utils.h @@ -339,4 +339,7 @@ roomVias(const std::string &roomid); void updateSpaceVias(); + +void +removeExpiredEvents(); }