Add some event expiration function

This commit is contained in:
Nicolas Werner 2023-07-04 18:01:44 +02:00
parent bab5dd9fdb
commit dcb6c00708
No known key found for this signature in database
GPG key ID: C8D75E610773F2D9
2 changed files with 202 additions and 0 deletions

View file

@ -22,6 +22,7 @@
#include <array>
#include <cmath>
#include <mtx/responses/messages.hpp>
#include <unordered_set>
#include <variant>
@ -1604,3 +1605,201 @@ utils::updateSpaceVias()
ApplySpaceUpdatesState::next(std::move(asus));
}
std::atomic<bool> event_expiration_running = false;
void
utils::removeExpiredEvents()
{
// TODO(Nico): Add its own toggle...
if (!UserSettings::instance()->updateSpaceVias())
return;
if (event_expiration_running.exchange(true)) {
nhlog::net()->info("Event expiration still running, not starting second job.");
return;
}
nhlog::net()->info("Remove expired events starting.");
auto rooms = cache::roomInfo(false);
auto us = http::client()->user_id().to_string();
using ExpType =
mtx::events::AccountDataEvent<mtx::events::account_data::nheko_extensions::EventExpiry>;
static auto getExpEv = [](const std::string &room = "") -> std::optional<ExpType> {
if (auto accountEvent =
cache::client()->getAccountData(mtx::events::EventType::NhekoEventExpiry, room))
if (auto ev = std::get_if<ExpType>(&*accountEvent);
ev && (ev->content.expire_after_ms || ev->content.keep_only_latest))
return std::optional{*ev};
return std::nullopt;
};
struct ApplyEventExpiration
{
std::optional<ExpType> globalExpiry;
std::vector<std::string> roomsToUpdate;
std::string filter;
std::string currentRoom;
std::uint64_t currentRoomCount = 0;
std::string currentRoomPrevToken;
std::vector<std::string> currentRoomRedactionQueue;
mtx::events::account_data::nheko_extensions::EventExpiry currentExpiry;
static void next(std::shared_ptr<ApplyEventExpiration> state)
{
if (!state->currentRoomRedactionQueue.empty()) {
http::client()->redact_event(
state->currentRoom,
state->currentRoomRedactionQueue.back(),
[state = std::move(state)](const mtx::responses::EventId &,
mtx::http::RequestErr e) mutable {
const auto &event_id = state->currentRoomRedactionQueue.back();
if (e) {
if (e->status_code == 429 && e->matrix_error.retry_after.count() != 0) {
ChatPage::instance()->callFunctionOnGuiThread(
[state = std::move(state),
interval = e->matrix_error.retry_after]() {
QTimer::singleShot(interval,
ChatPage::instance(),
[self = std::move(state)]() mutable {
next(std::move(self));
});
});
return;
}
nhlog::net()->error("Failed to redact event {} in {}: {}",
event_id,
state->currentRoom,
*e);
}
nhlog::net()->info(
"Redacted event {} in {}: {}", event_id, state->currentRoom, *e);
state->currentRoomRedactionQueue.pop_back();
next(std::move(state));
});
} else if (!state->currentRoom.empty()) {
mtx::http::MessagesOpts opts{};
opts.dir = mtx::http::PaginationDirection::Backwards;
opts.from = state->currentRoomPrevToken;
opts.limit = 1000;
opts.filter = state->filter;
http::client()->messages(
opts,
[state = std::move(state)](const mtx::responses::Messages &msgs,
mtx::http::RequestErr e) mutable {
if (e || msgs.chunk.empty()) {
state->currentRoom.clear();
state->currentRoomCount = 0;
state->currentRoomPrevToken.clear();
} else {
if (!msgs.end.empty())
state->currentRoomPrevToken = msgs.end;
auto now = (uint64_t)QDateTime::currentMSecsSinceEpoch();
auto us = http::client()->user_id().to_string();
for (const auto &e : msgs.chunk) {
if (std::holds_alternative<
mtx::events::RedactionEvent<mtx::events::msg::Redaction>>(e))
continue;
if (mtx::accessors::sender(e) != us)
continue;
state->currentRoomCount++;
if (state->currentRoomCount <= state->currentExpiry.protect_latest) {
continue;
}
if (state->currentExpiry.exclude_state_events &&
mtx::accessors::is_state_event(e))
continue;
if (state->currentExpiry.keep_only_latest &&
state->currentRoomCount > state->currentExpiry.keep_only_latest) {
state->currentRoomRedactionQueue.push_back(
mtx::accessors::event_id(e));
} else if (state->currentExpiry.expire_after_ms &&
(state->currentExpiry.expire_after_ms +
mtx::accessors::origin_server_ts(e).toMSecsSinceEpoch()) <
now) {
state->currentRoomRedactionQueue.push_back(
mtx::accessors::event_id(e));
}
}
}
if (msgs.end.empty() && state->currentRoomRedactionQueue.empty()) {
state->currentRoom.clear();
state->currentRoomCount = 0;
state->currentRoomPrevToken.clear();
}
next(std::move(state));
});
} else if (!state->roomsToUpdate.empty()) {
const auto &room = state->roomsToUpdate.back();
auto localExp = getExpEv(room);
if (localExp) {
state->currentRoom = room;
state->currentExpiry = localExp->content;
} else if (state->globalExpiry) {
state->currentRoom = room;
state->currentExpiry = state->globalExpiry->content;
}
state->roomsToUpdate.pop_back();
next(std::move(state));
} else {
nhlog::net()->info("Finished event expiry");
event_expiration_running = false;
}
}
};
auto asus = std::make_shared<ApplyEventExpiration>();
asus->filter =
nlohmann::json{
"room",
nlohmann::json::object({
{
"timeline",
nlohmann::json::object({
{"senders", nlohmann::json::array({us})},
{"not_types", nlohmann::json::array({"m.room.redaction"})},
}),
},
}),
}
.dump();
asus->globalExpiry = getExpEv();
for (const auto &[roomid_, info] : rooms.toStdMap()) {
auto roomid = roomid_.toStdString();
if (!asus->globalExpiry && !getExpEv(roomid))
continue;
if (auto pl = cache::client()
->getStateEvent<mtx::events::state::PowerLevels>(roomid)
.value_or(mtx::events::StateEvent<mtx::events::state::PowerLevels>{})
.content;
pl.user_level(us) < pl.event_level(to_string(mtx::events::EventType::RoomRedaction))) {
nhlog::net()->warn("Can't react events in {}, not running expiration.", roomid);
continue;
}
asus->roomsToUpdate.push_back(roomid);
}
nhlog::db()->info("Running expiration in {} rooms", asus->roomsToUpdate.size());
ApplyEventExpiration::next(std::move(asus));
}

View file

@ -339,4 +339,7 @@ roomVias(const std::string &roomid);
void
updateSpaceVias();
void
removeExpiredEvents();
}