Add an early out cache for event expiration

This commit is contained in:
Nicolas Werner 2023-07-06 20:49:40 +02:00
parent 5f8e80cd84
commit 1abb52700a
No known key found for this signature in database
GPG key ID: C8D75E610773F2D9
4 changed files with 93 additions and 8 deletions

View file

@ -104,7 +104,7 @@ ApplicationWindow {
SpinBox {
Layout.alignment: Qt.AlignRight | Qt.AlignVCenter
from: 0
to: 1000
to: 1000000
stepSize: 1
value: eventExpiry.expireEventsAfterCount
onValueChanged: eventExpiry.expireEventsAfterCount = value
@ -126,7 +126,7 @@ ApplicationWindow {
SpinBox {
Layout.alignment: Qt.AlignRight | Qt.AlignVCenter
from: 0
to: 1000
to: 1000000
stepSize: 1
value: eventExpiry.protectLatestEvents
onValueChanged: eventExpiry.protectLatestEvents = value

View file

@ -82,6 +82,8 @@ static constexpr auto DEVICES_DB("devices");
static constexpr auto DEVICE_KEYS_DB("device_keys");
//! room_ids that have encryption enabled.
static constexpr auto ENCRYPTED_ROOMS_DB("encrypted_rooms");
//! Expiration progress for each room
static constexpr auto EVENT_EXPIRATION_BG_JOB_DB("event_expiration_bg_job");
//! room_id -> pickled OlmInboundGroupSession
static constexpr auto INBOUND_MEGOLM_SESSIONS_DB("inbound_megolm_sessions");
@ -327,7 +329,9 @@ Cache::setup()
megolmSessionDataDb_ = lmdb::dbi::open(txn, MEGOLM_SESSIONS_DATA_DB, MDB_CREATE);
// What rooms are encrypted
encryptedRooms_ = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE);
encryptedRooms_ = lmdb::dbi::open(txn, ENCRYPTED_ROOMS_DB, MDB_CREATE);
eventExpiryBgJob_ = lmdb::dbi::open(txn, EVENT_EXPIRATION_BG_JOB_DB, MDB_CREATE);
[[maybe_unused]] auto verificationDb = getVerificationDb(txn);
[[maybe_unused]] auto userKeysDb = getUserKeysDb(txn);
@ -584,6 +588,39 @@ Cache::pickleSecret()
return pickle_secret_;
}
void
Cache::storeEventExpirationProgress(const std::string &room,
const std::string &expirationSettings,
const std::string &stopMarker)
{
nlohmann::json j;
j["s"] = expirationSettings;
j["m"] = stopMarker;
auto txn = lmdb::txn::begin(env_);
eventExpiryBgJob_.put(txn, room, j.dump());
txn.commit();
}
std::string
Cache::loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings)
{
try {
auto txn = ro_txn(env_);
std::string_view data;
if (!eventExpiryBgJob_.get(txn, room, data))
return "";
auto j = nlohmann::json::parse(data);
if (j.value("s", "") == expirationSettings)
return j.value("m", "");
} catch (...) {
return "";
}
return "";
}
void
Cache::setEncryptedRoom(lmdb::txn &txn, const std::string &room_id)
{

View file

@ -87,6 +87,13 @@ public:
//! Retrieve if the room is tombstoned (closed or replaced by a different room)
bool getRoomIsTombstoned(lmdb::txn &txn, lmdb::dbi &statesdb);
// for the event expiry background job
void storeEventExpirationProgress(const std::string &room,
const std::string &expirationSettings,
const std::string &stopMarker);
std::string
loadEventExpirationProgress(const std::string &room, const std::string &expirationSettings);
//! Get a specific state event
template<typename T>
std::optional<mtx::events::StateEvent<T>>
@ -714,6 +721,8 @@ private:
lmdb::dbi encryptedRooms_;
lmdb::dbi eventExpiryBgJob_;
QString localUserId_;
QString cacheDirectory_;

View file

@ -1467,7 +1467,7 @@ utils::updateSpaceVias()
ChatPage::instance()->callFunctionOnGuiThread(
[state = std::move(state),
interval = e->matrix_error.retry_after]() {
QTimer::singleShot(interval,
QTimer::singleShot(interval * 3,
ChatPage::instance(),
[self = std::move(state)]() mutable {
next(std::move(self));
@ -1502,7 +1502,7 @@ utils::updateSpaceVias()
ChatPage::instance()->callFunctionOnGuiThread(
[state = std::move(state),
interval = e->matrix_error.retry_after]() {
QTimer::singleShot(interval,
QTimer::singleShot(interval * 3,
ChatPage::instance(),
[self = std::move(state)]() mutable {
next(std::move(self));
@ -1644,9 +1644,19 @@ utils::removeExpiredEvents()
std::string currentRoom;
bool firstMessagesCall = true;
std::uint64_t currentRoomCount = 0;
// batch token for pagination
std::string currentRoomPrevToken;
// event id of an event redacted in a previous run
std::string currentRoomStopAt;
// event id of first event redacted in the current run, hoping that the order stays the
// same.
std::string currentRoomFirstRedactedEvent;
// (evtype,state_key) tuple to keep the latest state event of each.
std::set<std::pair<std::string, std::string>> currentRoomStateEvents;
// event ids pending redaction
std::vector<std::string> currentRoomRedactionQueue;
mtx::events::account_data::nheko_extensions::EventExpiry currentExpiry;
static void next(std::shared_ptr<ApplyEventExpiration> state)
@ -1664,7 +1674,8 @@ utils::removeExpiredEvents()
ChatPage::instance()->callFunctionOnGuiThread(
[state = std::move(state),
interval = e->matrix_error.retry_after]() {
QTimer::singleShot(interval,
// triple interval to allow other traffic as well
QTimer::singleShot(interval * 3,
ChatPage::instance(),
[self = std::move(state)]() mutable {
next(std::move(self));
@ -1681,6 +1692,10 @@ utils::removeExpiredEvents()
}
} else {
nhlog::net()->info("Redacted event {} in {}", evid, state->currentRoom);
if (state->currentRoomFirstRedactedEvent.empty())
state->currentRoomFirstRedactedEvent = evid;
state->currentRoomRedactionQueue.pop_back();
next(std::move(state));
}
@ -1688,6 +1703,13 @@ utils::removeExpiredEvents()
} else if (!state->currentRoom.empty()) {
if (state->currentRoomPrevToken.empty() && !state->firstMessagesCall) {
nhlog::net()->info("Finished room {}", state->currentRoom);
if (!state->currentRoomFirstRedactedEvent.empty())
cache::client()->storeEventExpirationProgress(
state->currentRoom,
nlohmann::json(state->currentExpiry).dump(),
state->currentRoomFirstRedactedEvent);
state->currentRoom.clear();
next(std::move(state));
return;
@ -1708,7 +1730,7 @@ utils::removeExpiredEvents()
mtx::http::RequestErr error) mutable {
if (error) {
// skip success handler
nhlog::net()->info(
nhlog::net()->warn(
"Finished room {} with error {}", state->currentRoom, *error);
state->currentRoom.clear();
} else if (msgs.chunk.empty()) {
@ -1725,8 +1747,22 @@ utils::removeExpiredEvents()
continue;
if (std::holds_alternative<
mtx::events::RoomEvent<mtx::events::msg::Redacted>>(e))
mtx::events::RoomEvent<mtx::events::msg::Redacted>>(e) ||
std::holds_alternative<
mtx::events::StateEvent<mtx::events::msg::Redacted>>(e)) {
if (!state->currentRoomStopAt.empty() &&
mtx::accessors::event_id(e) == state->currentRoomStopAt) {
// There is no filter to remove redacted events from
// pagination, so we try to stop early by caching what event
// we redacted last if we reached the end of a room.
nhlog::net()->info(
"Found previous redaction marker, stopping early: {}",
state->currentRoom);
state->currentRoomPrevToken.clear();
break;
}
continue;
}
if (std::holds_alternative<
mtx::events::StateEvent<mtx::events::msg::Redacted>>(e))
@ -1806,6 +1842,9 @@ utils::removeExpiredEvents()
state->currentRoomRedactionQueue.clear();
state->currentRoomStateEvents.clear();
state->currentRoomStopAt = cache::client()->loadEventExpirationProgress(
state->currentRoom, nlohmann::json(state->currentExpiry).dump());
state->roomsToUpdate.pop_back();
next(std::move(state));
} else {