mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: split RecordExpiry preemptive and non-preemptive flows (#4252)
* add FiberGuard to RecordExpiry for non-preemptive flows --------- Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
parent
892a415201
commit
d8fda40d4d
5 changed files with 20 additions and 12 deletions
|
@ -190,7 +190,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
|
|||
|
||||
// log the evicted keys to journal.
|
||||
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
|
||||
RecordExpiry(cntx_.db_index, key);
|
||||
RecordExpiry(cntx_.db_index, key, false);
|
||||
}
|
||||
// Safe we already acquired util::fb2::LockGuard lk(db_slice_->GetSerializationMutex());
|
||||
// on the flows that call this function
|
||||
|
@ -450,7 +450,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
|
|||
}
|
||||
|
||||
if (res.it->second.HasExpire()) { // check expiry state
|
||||
res = ExpireIfNeeded(cntx, res.it);
|
||||
res = ExpireIfNeeded(cntx, res.it, true);
|
||||
if (!IsValid(res.it)) {
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
|
@ -1088,11 +1088,12 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size
|
|||
}
|
||||
|
||||
DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, Iterator it) const {
|
||||
auto res = ExpireIfNeeded(cntx, it.GetInnerIt());
|
||||
auto res = ExpireIfNeeded(cntx, it.GetInnerIt(), false);
|
||||
return {.it = Iterator::FromPrime(res.it), .exp_it = ExpIterator::FromPrime(res.exp_it)};
|
||||
}
|
||||
|
||||
DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) const {
|
||||
DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it,
|
||||
bool preempts) const {
|
||||
if (!it->second.HasExpire()) {
|
||||
LOG(ERROR) << "Invalid call to ExpireIfNeeded";
|
||||
return {it, ExpireIterator{}};
|
||||
|
@ -1122,7 +1123,7 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
|
|||
|
||||
// Replicate expiry
|
||||
if (auto journal = owner_->journal(); journal) {
|
||||
RecordExpiry(cntx.db_index, key);
|
||||
RecordExpiry(cntx.db_index, key, preempts);
|
||||
}
|
||||
|
||||
if (expired_keys_events_recording_)
|
||||
|
@ -1153,7 +1154,7 @@ void DbSlice::ExpireAllIfNeeded() {
|
|||
LOG(ERROR) << "Expire entry " << exp_it->first.ToString() << " not found in prime table";
|
||||
return;
|
||||
}
|
||||
ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it);
|
||||
ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it, false);
|
||||
};
|
||||
|
||||
ExpireTable::Cursor cursor;
|
||||
|
@ -1215,7 +1216,7 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx
|
|||
if (ttl <= 0) {
|
||||
auto prime_it = db.prime.Find(it->first);
|
||||
CHECK(!prime_it.is_done());
|
||||
ExpireIfNeeded(cntx, prime_it);
|
||||
ExpireIfNeeded(cntx, prime_it, false);
|
||||
++result.deleted;
|
||||
} else {
|
||||
result.survivor_ttl_sum += ttl;
|
||||
|
@ -1283,7 +1284,7 @@ pair<uint64_t, size_t> DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s
|
|||
// fiber preemption could happen in this phase.
|
||||
for (string_view key : keys_to_journal) {
|
||||
if (auto journal = owner_->journal(); journal)
|
||||
RecordExpiry(db_ind, key);
|
||||
RecordExpiry(db_ind, key, false);
|
||||
|
||||
if (expired_keys_events_recording_)
|
||||
db_table->expired_keys_events_.emplace_back(key);
|
||||
|
|
|
@ -555,7 +555,7 @@ class DbSlice {
|
|||
ExpireIterator exp_it;
|
||||
};
|
||||
|
||||
PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const;
|
||||
PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it, bool preempts = false) const;
|
||||
|
||||
OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);
|
||||
|
||||
|
|
|
@ -597,6 +597,7 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
|
|||
auto& db_slice = op_args.GetDbSlice();
|
||||
DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index));
|
||||
|
||||
util::FiberAtomicGuard guard;
|
||||
unsigned cnt = 0;
|
||||
|
||||
VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_cntx.db_index << " has "
|
||||
|
|
|
@ -62,11 +62,17 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice
|
|||
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, true);
|
||||
}
|
||||
|
||||
void RecordExpiry(DbIndex dbid, string_view key) {
|
||||
void RecordExpiry(DbIndex dbid, string_view key, bool preempts) {
|
||||
auto journal = EngineShard::tlocal()->journal();
|
||||
CHECK(journal);
|
||||
if (!preempts) {
|
||||
util::FiberAtomicGuard guard;
|
||||
journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key),
|
||||
Payload("DEL", ArgSlice{key}), preempts);
|
||||
return;
|
||||
}
|
||||
journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key),
|
||||
Payload("DEL", ArgSlice{key}), false);
|
||||
Payload("DEL", ArgSlice{key}), preempts);
|
||||
}
|
||||
|
||||
void TriggerJournalWriteToSink() {
|
||||
|
|
|
@ -224,7 +224,7 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
|
|||
|
||||
// Record expiry in journal with independent transaction. Must be called from shard thread holding
|
||||
// key.
|
||||
void RecordExpiry(DbIndex dbid, std::string_view key);
|
||||
void RecordExpiry(DbIndex dbid, std::string_view key, bool preempts = false);
|
||||
|
||||
// Trigger journal write to sink, no journal record will be added to journal.
|
||||
// Must be called from shard thread of journal to sink.
|
||||
|
|
Loading…
Reference in a new issue