1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix: send a single RDB_OPCODE_FULLSYNC_END from a snapshot (#920)

Fixes #917 by appending a blob of 8 bytes during serialization and consuming
it during the parsing phase.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-03-08 13:25:12 +02:00 committed by GitHub
parent 02770fdcd1
commit b7abe269f1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 4 deletions

View file

@ -1717,6 +1717,8 @@ error_code RdbLoader::Load(io::Source* src) {
/* Read type. */
SET_OR_RETURN(FetchType(), type);
DVLOG(2) << "Opcode type: " << type;
/* Handle special types. */
if (type == RDB_OPCODE_EXPIRETIME) {
LOG(ERROR) << "opcode RDB_OPCODE_EXPIRETIME not supported";
@ -1753,6 +1755,9 @@ error_code RdbLoader::Load(io::Source* src) {
}
if (type == RDB_OPCODE_FULLSYNC_END) {
VLOG(1) << "Read RDB_OPCODE_FULLSYNC_END";
RETURN_ON_ERR(EnsureRead(8));
mem_buf_->ConsumeInput(8); // ignore 8 bytes
if (full_sync_cut_cb)
full_sync_cut_cb();
continue;
@ -1833,6 +1838,8 @@ error_code RdbLoader::Load(io::Source* src) {
settings.Reset();
} // main load loop
DVLOG(1) << "RdbLoad loop finished";
if (stop_early_) {
return *ec_;
}

View file

@ -666,7 +666,14 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
}
error_code RdbSerializer::SendFullSyncCut() {
return WriteOpcode(RDB_OPCODE_FULLSYNC_END);
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));
// RDB_OPCODE_FULLSYNC_END followed by 8 bytes of 0.
// The reason for this is that some opcodes require to have at least 8 bytes of data
// in the read buffer when consuming the rdb data, and since RDB_OPCODE_FULLSYNC_END is one of
// the last opcodes sent to replica, we respect this requirement by sending a blob of 8 bytes.
uint8_t buf[8] = {0};
return WriteRaw(buf);
}
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {

View file

@ -157,9 +157,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
mu_.lock();
mu_.unlock();
// TODO: investigate why a single byte gets stuck and does not arrive to replica
for (unsigned i = 10; i > 1; i--)
CHECK(!serializer_->SendFullSyncCut());
CHECK(!serializer_->SendFullSyncCut());
PushSerializedToChannel(true);
// serialized + side_saved must be equal to the total saved.