mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
feat(pytest): More types for seeder (#2577)
* feat(pytest): More types for seeder Add more types to the seeder and refactor replication test --------- Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
c7750b9d58
commit
75eaeb32db
12 changed files with 416 additions and 184 deletions
|
@ -70,6 +70,39 @@ void PushError(lua_State* lua, string_view error, bool trace = true) {
|
|||
lua_settable(lua, -3);
|
||||
}
|
||||
|
||||
// Custom object explorer that collects all values into string array
|
||||
struct StringCollectorTranslator : public ObjectExplorer {
|
||||
void OnString(std::string_view str) final {
|
||||
values.emplace_back(str);
|
||||
}
|
||||
void OnArrayStart(unsigned len) final {
|
||||
CHECK(values.empty());
|
||||
values.reserve(len);
|
||||
}
|
||||
void OnArrayEnd() final {
|
||||
}
|
||||
void OnBool(bool b) final {
|
||||
OnString(absl::AlphaNum(b).Piece());
|
||||
}
|
||||
void OnDouble(double d) final {
|
||||
OnString(absl::AlphaNum(d).Piece());
|
||||
}
|
||||
void OnInt(int64_t val) final {
|
||||
OnString(absl::AlphaNum(val).Piece());
|
||||
}
|
||||
void OnNil() final {
|
||||
OnString("");
|
||||
}
|
||||
void OnStatus(std::string_view str) final {
|
||||
OnString(str);
|
||||
}
|
||||
void OnError(std::string_view str) final {
|
||||
LOG(ERROR) << str;
|
||||
}
|
||||
|
||||
vector<string> values;
|
||||
};
|
||||
|
||||
class RedisTranslator : public ObjectExplorer {
|
||||
public:
|
||||
RedisTranslator(lua_State* lua) : lua_(lua) {
|
||||
|
@ -306,40 +339,66 @@ void ToHex(const uint8_t* src, char* dest) {
|
|||
}
|
||||
|
||||
int DragonflyHashCommand(lua_State* lua) {
|
||||
int argc = lua_gettop(lua);
|
||||
if (argc != 2) {
|
||||
lua_pushstring(lua, "wrong number of arguments");
|
||||
return lua_error(lua);
|
||||
}
|
||||
|
||||
XXH64_hash_t hash = absl::bit_cast<XXH64_hash_t>(lua_tointeger(lua, 1));
|
||||
auto update_hash = [&hash](string_view sv) { hash = XXH64(sv.data(), sv.length(), hash); };
|
||||
bool requires_sort = lua_toboolean(lua, 2);
|
||||
|
||||
auto digest_value = [&hash, &update_hash, lua](int pos) {
|
||||
if (int type = lua_type(lua, pos); type == LUA_TSTRING) {
|
||||
const char* str = lua_tostring(lua, pos);
|
||||
update_hash(string_view{str, strlen(str)});
|
||||
} else {
|
||||
CHECK_EQ(type, LUA_TNUMBER) << "Only strings and integers can be hashed";
|
||||
update_hash(to_string(lua_tointeger(lua, pos)));
|
||||
}
|
||||
};
|
||||
// Pop first two arguments to call RedisGenericCommand from this function with tail
|
||||
lua_remove(lua, 1);
|
||||
lua_remove(lua, 1);
|
||||
|
||||
if (lua_type(lua, 2) == LUA_TTABLE) {
|
||||
lua_pushnil(lua);
|
||||
while (lua_next(lua, 2) != 0) {
|
||||
digest_value(-2); // key, included for correct hashing
|
||||
digest_value(-1); // value
|
||||
lua_pop(lua, 1);
|
||||
}
|
||||
} else {
|
||||
digest_value(2);
|
||||
// Compute key hash, we assume it's always second after the command
|
||||
{
|
||||
size_t len;
|
||||
const char* key = lua_tolstring(lua, 2, &len);
|
||||
hash = XXH64(key, len, hash);
|
||||
}
|
||||
|
||||
// Collect output into custom string collector
|
||||
StringCollectorTranslator translator;
|
||||
void** ptr = static_cast<void**>(lua_getextraspace(lua));
|
||||
reinterpret_cast<Interpreter*>(*ptr)->RedisGenericCommand(false, false, &translator);
|
||||
|
||||
if (requires_sort)
|
||||
sort(translator.values.begin(), translator.values.end());
|
||||
|
||||
// Compute new hash and return it
|
||||
for (string_view str : translator.values)
|
||||
hash = XXH64(str.data(), str.size(), hash);
|
||||
|
||||
lua_pushinteger(lua, absl::bit_cast<lua_Integer>(hash));
|
||||
return 1;
|
||||
}
|
||||
|
||||
// dragonfly.randstr(size [, count]) - generate random alphanumeric data.
//
// Lua arguments:
//   1: size  - length of every generated string.
//   2: count - optional; when present a table with `count` strings is returned,
//              otherwise a single string is returned.
// Both arguments are read with lua_tonumber and truncated towards zero, so
// fractional values (e.g. the result of a float division in a script) are accepted.
//
// Raises a Lua error if no arguments are passed or if size/count are negative.
// Returns 1 (a single value, string or table, is left on top of the stack).
int DragonflyRandstrCommand(lua_State* state) {
  const int argc = lua_gettop(state);
  if (argc < 1) {
    lua_pushstring(state, "wrong number of arguments");
    return lua_error(state);
  }

  const lua_Integer dsize = static_cast<lua_Integer>(lua_tonumber(state, 1));
  const bool as_table = argc > 1;
  const lua_Integer num = as_table ? static_cast<lua_Integer>(lua_tonumber(state, 2)) : 0;

  // Validate before any C++ object with a destructor is alive: lua_error does not
  // return, so nothing that needs cleanup may exist at this point.
  if (dsize < 0 || num < 0) {
    lua_pushstring(state, "size and count must be non-negative");
    return lua_error(state);
  }

  static const char alphanum[] =
      "0123456789"
      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
      "abcdefghijklmnopqrstuvwxyz";
  constexpr size_t kAlphabetSize = sizeof(alphanum) - 1;  // exclude trailing '\0'

  // Single scratch buffer, refilled for every generated string.
  std::string buf(static_cast<size_t>(dsize), ' ');
  auto push_str = [state, &buf]() {
    for (char& c : buf)
      c = alphanum[rand() % kAlphabetSize];
    lua_pushlstring(state, buf.data(), buf.size());
  };

  if (!as_table) {
    push_str();
  } else {
    lua_createtable(state, num, 0);
    for (lua_Integer i = 1; i <= num; ++i) {
      push_str();
      lua_rawseti(state, -2, i);  // pops the string, table stays on top
    }
  }

  return 1;
}
|
||||
|
||||
int RedisSha1Command(lua_State* lua) {
|
||||
int argc = lua_gettop(lua);
|
||||
if (argc != 1) {
|
||||
|
@ -427,10 +486,16 @@ Interpreter::Interpreter() {
|
|||
/* Register the dragonfly commands table and fields */
|
||||
lua_newtable(lua_);
|
||||
|
||||
/* dragonfly.ihash - compute quick integer hash of command result */
|
||||
lua_pushstring(lua_, "ihash");
|
||||
lua_pushcfunction(lua_, DragonflyHashCommand);
|
||||
lua_settable(lua_, -3);
|
||||
|
||||
/* dragonfly.randstr - generate random string or table of random strings */
|
||||
lua_pushstring(lua_, "randstr");
|
||||
lua_pushcfunction(lua_, DragonflyRandstrCommand);
|
||||
lua_settable(lua_, -3);
|
||||
|
||||
/* Finally set the table as 'dragonfly' global var. */
|
||||
lua_setglobal(lua_, "dragonfly");
|
||||
CHECK(lua_checkstack(lua_, 64));
|
||||
|
@ -785,7 +850,7 @@ void Interpreter::ResetStack() {
|
|||
// Returns number of results, which is always 1 in this case.
|
||||
// Please note that lua resets the stack once the function returns so no need
|
||||
// to unwind the stack manually in the function (though lua allows doing this).
|
||||
int Interpreter::RedisGenericCommand(bool raise_error, bool async) {
|
||||
int Interpreter::RedisGenericCommand(bool raise_error, bool async, ObjectExplorer* explorer) {
|
||||
/* By using Lua debug hooks it is possible to trigger a recursive call
|
||||
* to luaRedisGenericCommand(), which normally should never happen.
|
||||
* To make this function reentrant is futile and makes it slower, but
|
||||
|
@ -884,9 +949,18 @@ int Interpreter::RedisGenericCommand(bool raise_error, bool async) {
|
|||
/* Pop all arguments from the stack, we do not need them anymore
|
||||
* and this way we guarantee we will have room on the stack for the result. */
|
||||
lua_pop(lua_, argc);
|
||||
RedisTranslator translator(lua_);
|
||||
redis_func_(
|
||||
CallArgs{MutSliceSpan{args}, &buffer_, &translator, async, raise_error, &raise_error});
|
||||
|
||||
// Calling with custom explorer is not supported with errors or async
|
||||
DCHECK(explorer == nullptr || (!raise_error && !async));
|
||||
|
||||
// If no custom explorer is set, use default translator
|
||||
optional<RedisTranslator> translator;
|
||||
if (explorer == nullptr) {
|
||||
translator.emplace(lua_);
|
||||
explorer = &*translator;
|
||||
}
|
||||
|
||||
redis_func_(CallArgs{MutSliceSpan{args}, &buffer_, explorer, async, raise_error, &raise_error});
|
||||
cmd_depth_--;
|
||||
|
||||
// Shrink reusable buffer if it's too big.
|
||||
|
@ -895,8 +969,11 @@ int Interpreter::RedisGenericCommand(bool raise_error, bool async) {
|
|||
buffer_.shrink_to_fit();
|
||||
}
|
||||
|
||||
if (!translator)
|
||||
return 0;
|
||||
|
||||
// Raise error for regular 'call' command if needed.
|
||||
if (raise_error && translator.HasError()) {
|
||||
if (raise_error && translator->HasError()) {
|
||||
// error is already on top of stack
|
||||
return RaiseError(lua_);
|
||||
}
|
||||
|
|
|
@ -17,8 +17,7 @@ namespace dfly {
|
|||
|
||||
class ObjectExplorer {
|
||||
public:
|
||||
virtual ~ObjectExplorer() {
|
||||
}
|
||||
virtual ~ObjectExplorer() = default;
|
||||
|
||||
virtual void OnBool(bool b) = 0;
|
||||
virtual void OnString(std::string_view str) = 0;
|
||||
|
@ -113,15 +112,15 @@ class Interpreter {
|
|||
redis_func_ = std::forward<U>(u);
|
||||
}
|
||||
|
||||
// Invoke command with arguments from lua stack, given options and possibly custom explorer
|
||||
int RedisGenericCommand(bool raise_error, bool async, ObjectExplorer* explorer = nullptr);
|
||||
|
||||
private:
|
||||
// Returns true if function was successfully added,
|
||||
// otherwise returns false and sets the error.
|
||||
bool AddInternal(const char* f_id, std::string_view body, std::string* error);
|
||||
bool IsTableSafe() const;
|
||||
|
||||
int RedisGenericCommand(bool raise_error, bool async);
|
||||
int RedisACallErrorsCommand();
|
||||
|
||||
static int RedisCallCommand(lua_State* lua);
|
||||
static int RedisPCallCommand(lua_State* lua);
|
||||
static int RedisACallCommand(lua_State* lua);
|
||||
|
|
|
@ -98,7 +98,7 @@ const char* ObjTypeName(int type) {
|
|||
case OBJ_STREAM:
|
||||
return "stream";
|
||||
case OBJ_JSON:
|
||||
return "ReJSON-RL";
|
||||
return "rejson-rl";
|
||||
default:
|
||||
LOG(ERROR) << "Unsupported type " << type;
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ async def test_basic_json_get_set(async_client: aioredis.Redis):
|
|||
result = await get_set_json(connection=async_client, key=key_name, value=jane)
|
||||
assert result, "failed to set JSON value"
|
||||
the_type = await async_client.type(key_name)
|
||||
assert the_type == "ReJSON-RL"
|
||||
assert the_type == "rejson-rl"
|
||||
assert len(result) == 1
|
||||
assert result[0]["name"] == "Jane"
|
||||
assert result[0]["Age"] == 33
|
||||
|
@ -34,7 +34,7 @@ async def test_access_json_value_as_string(async_client: aioredis.Redis):
|
|||
assert result is not None, "failed to set JSON value"
|
||||
# make sure that we have valid JSON here
|
||||
the_type = await async_client.type(key_name)
|
||||
assert the_type == "ReJSON-RL"
|
||||
assert the_type == "rejson-rl"
|
||||
# you cannot access this key as string
|
||||
try:
|
||||
result = await async_client.get(key_name)
|
||||
|
@ -49,7 +49,7 @@ async def test_reset_key_to_string(async_client: aioredis.Redis):
|
|||
assert result is not None, "failed to set JSON value"
|
||||
# make sure that we have valid JSON here
|
||||
the_type = await async_client.type(key_name)
|
||||
assert the_type == "ReJSON-RL"
|
||||
assert the_type == "rejson-rl"
|
||||
|
||||
# set the key to be string - this is legal
|
||||
await async_client.set(key_name, "some random value")
|
||||
|
@ -60,7 +60,7 @@ async def test_reset_key_to_string(async_client: aioredis.Redis):
|
|||
# to change the type to JSON and override it
|
||||
result = await get_set_json(async_client, key=key_name, value=jane)
|
||||
the_type = await async_client.type(key_name)
|
||||
assert the_type == "ReJSON-RL"
|
||||
assert the_type == "rejson-rl"
|
||||
|
||||
|
||||
async def test_update_value(async_client: aioredis.Redis):
|
||||
|
@ -69,7 +69,7 @@ async def test_update_value(async_client: aioredis.Redis):
|
|||
assert result is not None, "failed to set JSON value"
|
||||
# make sure that we have valid JSON here
|
||||
the_type = await async_client.type(key_name)
|
||||
assert the_type == "ReJSON-RL"
|
||||
assert the_type == "rejson-rl"
|
||||
result = await get_set_json(async_client, value="0", key=key_name, path="$.a.*")
|
||||
assert len(result) == 3
|
||||
# make sure that all the values under 'a' were set to 0
|
||||
|
|
|
@ -3,12 +3,14 @@ from itertools import chain, repeat
|
|||
import re
|
||||
import pytest
|
||||
import asyncio
|
||||
import async_timeout
|
||||
import pymemcache
|
||||
import logging
|
||||
from redis import asyncio as aioredis
|
||||
from .utility import *
|
||||
from .instance import DflyInstanceFactory, DflyInstance
|
||||
from .seeder import Seeder as SeederV2
|
||||
from . import dfly_args
|
||||
import pymemcache
|
||||
import logging
|
||||
from .proxy import Proxy
|
||||
|
||||
ADMIN_PORT = 1211
|
||||
|
@ -17,36 +19,46 @@ DISCONNECT_CRASH_FULL_SYNC = 0
|
|||
DISCONNECT_CRASH_STABLE_SYNC = 1
|
||||
DISCONNECT_NORMAL_STABLE_SYNC = 2
|
||||
|
||||
M_OPT = [pytest.mark.opt_only]
|
||||
M_SLOW = [pytest.mark.slow]
|
||||
M_STRESS = [pytest.mark.slow, pytest.mark.opt_only]
|
||||
|
||||
|
||||
async def wait_for_replicas_state(*clients, state="stable_sync", timeout=0.05):
    """Wait until all clients (replicas) reach passed state.

    Each client's ROLE reply is polled until it reports role "replica" with the
    requested replication `state` (fourth element of the reply).

    Args:
        *clients: clients exposing an awaitable ``role()`` method.
        state: replication state to wait for.
        timeout: pause in seconds between polling rounds. Despite the name this
            is NOT an overall deadline; wrap the call in a timeout context if
            one is needed.

    The first check happens immediately, so replicas that are already in the
    requested state (or pass through it only briefly) are not delayed by a sleep.
    """
    pending = list(clients)
    while pending:
        roles = await asyncio.gather(*(c.role() for c in pending))
        # Keep only the clients that are not yet a replica in the requested state
        pending = [c for c, role in zip(pending, roles) if role[0] != "replica" or role[3] != state]
        if pending:
            await asyncio.sleep(timeout)
|
||||
|
||||
|
||||
"""
|
||||
Test full replication pipeline. Test full sync with streaming changes and stable state streaming.
|
||||
"""
|
||||
|
||||
# 1. Number of master threads
|
||||
# 2. Number of threads for each replica
|
||||
# 3. Seeder config
|
||||
# 4. Admin port
|
||||
replication_cases = [
|
||||
(8, [8], dict(keys=10_000, dbcount=4), False),
|
||||
(6, [6, 6, 6], dict(keys=4_000, dbcount=4), False),
|
||||
(8, [2, 2, 2, 2], dict(keys=4_000, dbcount=4), False),
|
||||
(4, [8, 8], dict(keys=4_000, dbcount=4), False),
|
||||
(4, [1] * 8, dict(keys=500, dbcount=2), False),
|
||||
(1, [1], dict(keys=100, dbcount=2), False),
|
||||
(6, [6, 6, 6], dict(keys=500, dbcount=4), True),
|
||||
(1, [1], dict(keys=100, dbcount=2), True),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.slow
|
||||
@pytest.mark.parametrize("t_master, t_replicas, seeder_config, from_admin_port", replication_cases)
|
||||
@pytest.mark.parametrize(
|
||||
"t_master, t_replicas, seeder_config, stream_target",
|
||||
[
|
||||
# Quick general test that replication is working
|
||||
(1, 3 * [1], dict(key_target=1_000), 500),
|
||||
(4, [4, 4], dict(key_target=10_000), 1_000),
|
||||
pytest.param(6, [6, 6, 6], dict(key_target=100_000), 20_000, marks=M_OPT),
|
||||
# Skewed tests with different thread ratio
|
||||
pytest.param(8, 6 * [1], dict(key_target=5_000), 2_000, marks=M_SLOW),
|
||||
pytest.param(2, [8, 8], dict(key_target=10_000), 2_000, marks=M_SLOW),
|
||||
# Test with big value size
|
||||
pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, marks=M_SLOW),
|
||||
# Stress test
|
||||
pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, marks=M_STRESS),
|
||||
],
|
||||
)
|
||||
async def test_replication_all(
|
||||
df_local_factory: DflyInstanceFactory,
|
||||
df_seeder_factory,
|
||||
t_master,
|
||||
t_replicas,
|
||||
seeder_config,
|
||||
from_admin_port,
|
||||
stream_target,
|
||||
):
|
||||
master = df_local_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master)
|
||||
replicas = [
|
||||
|
@ -54,56 +66,59 @@ async def test_replication_all(
|
|||
for i, t in enumerate(t_replicas)
|
||||
]
|
||||
|
||||
# Start master
|
||||
master.start()
|
||||
from_admin_port = random.choice([True, False])
|
||||
|
||||
# Start instances and connect clients
|
||||
df_local_factory.start_all([master] + replicas)
|
||||
c_master = master.client()
|
||||
|
||||
# Fill master with test data
|
||||
seeder = df_seeder_factory.create(port=master.port, **seeder_config)
|
||||
await seeder.run(target_deviation=0.1)
|
||||
|
||||
# Start replicas
|
||||
df_local_factory.start_all(replicas)
|
||||
|
||||
c_replicas = [replica.client() for replica in replicas]
|
||||
|
||||
# Fill master with test data
|
||||
seeder = SeederV2(**seeder_config)
|
||||
await seeder.run(c_master, target_deviation=0.01)
|
||||
|
||||
# Start data stream
|
||||
stream_task = asyncio.create_task(seeder.run(target_ops=3000))
|
||||
stream_task = asyncio.create_task(seeder.run(c_master))
|
||||
await asyncio.sleep(0.0)
|
||||
|
||||
# Start replication
|
||||
master_port = master.port if not from_admin_port else master.admin_port
|
||||
await asyncio.gather(
|
||||
*(
|
||||
asyncio.create_task(c.execute_command("REPLICAOF localhost " + str(master_port)))
|
||||
for c in c_replicas
|
||||
)
|
||||
)
|
||||
|
||||
async def run_replication(c_replica):
|
||||
await c_replica.execute_command("REPLICAOF localhost " + str(master_port))
|
||||
# Wait for all replicas to transition into stable sync
|
||||
async with async_timeout.timeout(20):
|
||||
await wait_for_replicas_state(*c_replicas)
|
||||
|
||||
await asyncio.gather(*(asyncio.create_task(run_replication(c)) for c in c_replicas))
|
||||
|
||||
# Wait for streaming to finish
|
||||
assert (
|
||||
not stream_task.done()
|
||||
), "Weak testcase. Increase number of streamed iterations to surpass full sync"
|
||||
# Stop streaming data once every replica is in stable sync
|
||||
await seeder.stop(c_master)
|
||||
await stream_task
|
||||
|
||||
# Check data after full sync
|
||||
await check_all_replicas_finished(c_replicas, c_master)
|
||||
await check_data(seeder, replicas, c_replicas)
|
||||
hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master] + c_replicas))
|
||||
assert len(set(hashes)) == 1
|
||||
|
||||
# Stream more data in stable state
|
||||
await seeder.run(target_ops=2000)
|
||||
await seeder.run(c_master, target_ops=stream_target)
|
||||
|
||||
# Check data after stable state stream
|
||||
await check_all_replicas_finished(c_replicas, c_master)
|
||||
await check_data(seeder, replicas, c_replicas)
|
||||
hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master] + c_replicas))
|
||||
assert len(set(hashes)) == 1
|
||||
|
||||
await disconnect_clients(c_master, *c_replicas)
|
||||
|
||||
|
||||
async def check_replica_finished_exec(c_replica: aioredis.Redis, c_master: aioredis.Redis):
|
||||
async def check_replica_finished_exec(c_replica: aioredis.Redis, m_offset):
|
||||
role = await c_replica.role()
|
||||
if role[0] != "replica" or role[3] != "stable_sync":
|
||||
return False
|
||||
syncid, r_offset = await c_replica.execute_command("DEBUG REPLICA OFFSET")
|
||||
m_offset = await c_master.execute_command("DFLY REPLICAOFFSET")
|
||||
|
||||
logging.debug(f" offset {syncid} {r_offset} {m_offset}")
|
||||
return r_offset == m_offset
|
||||
|
@ -117,25 +132,21 @@ async def check_all_replicas_finished(c_replicas, c_master, timeout=20):
|
|||
while (time.time() - start) < timeout:
|
||||
if not waiting_for:
|
||||
return
|
||||
await asyncio.sleep(1.0)
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
tasks = (asyncio.create_task(check_replica_finished_exec(c, c_master)) for c in waiting_for)
|
||||
finished_list = await asyncio.gather(*tasks)
|
||||
m_offset = await c_master.execute_command("DFLY REPLICAOFFSET")
|
||||
finished_list = await asyncio.gather(
|
||||
*(check_replica_finished_exec(c, m_offset) for c in waiting_for)
|
||||
)
|
||||
|
||||
# Remove clients that finished from waiting list
|
||||
waiting_for = [c for (c, finished) in zip(waiting_for, finished_list) if not finished]
|
||||
|
||||
first_r: aioredis.Redis = waiting_for[0]
|
||||
logging.error("Replica not finished, role %s", await first_r.role())
|
||||
raise RuntimeError("Not all replicas finished in time!")
|
||||
|
||||
|
||||
async def check_data(seeder, replicas, c_replicas):
|
||||
capture = await seeder.capture()
|
||||
for replica, c_replica in zip(replicas, c_replicas):
|
||||
await wait_available_async(c_replica)
|
||||
assert await seeder.compare(capture, port=replica.port)
|
||||
|
||||
|
||||
"""
|
||||
Test disconnecting replicas during different phases while constantly streaming changes to master.
|
||||
|
||||
|
@ -1052,50 +1063,42 @@ More details in https://github.com/dragonflydb/dragonfly/issues/1231
|
|||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.slow
|
||||
async def test_flushall_in_full_sync(df_local_factory, df_seeder_factory):
|
||||
master = df_local_factory.create(proactor_threads=4, logtostdout=True)
|
||||
replica = df_local_factory.create(proactor_threads=2, logtostdout=True)
|
||||
async def test_flushall_in_full_sync(df_local_factory):
|
||||
master = df_local_factory.create(proactor_threads=4)
|
||||
replica = df_local_factory.create(proactor_threads=2)
|
||||
|
||||
# Start master
|
||||
master.start()
|
||||
df_local_factory.start_all([master, replica])
|
||||
c_master = master.client()
|
||||
c_replica = replica.client()
|
||||
|
||||
# Fill master with test data
|
||||
seeder = df_seeder_factory.create(port=master.port, keys=100_000, dbcount=1)
|
||||
await seeder.run(target_deviation=0.1)
|
||||
seeder = SeederV2(key_target=30_000)
|
||||
await seeder.run(c_master, target_deviation=0.1)
|
||||
|
||||
# Start replica
|
||||
replica.start()
|
||||
c_replica = replica.client()
|
||||
# Start replication and wait for full sync
|
||||
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||
|
||||
async def get_sync_mode(c_master):
|
||||
result = await c_master.execute_command("role")
|
||||
# result[1]->replicas info [0]->first replica info [2]->replication state
|
||||
return result[1][0][2]
|
||||
|
||||
async def is_full_sync_mode(c_master):
|
||||
return await get_sync_mode(c_master) == "full_sync"
|
||||
|
||||
# Wait for full sync to start
|
||||
while not await is_full_sync_mode(c_master):
|
||||
await asyncio.sleep(0.0)
|
||||
async with async_timeout.timeout(3):
|
||||
await wait_for_replicas_state(c_replica, state="full_sync", timeout=0.05)
|
||||
|
||||
syncid, _ = await c_replica.execute_command("DEBUG REPLICA OFFSET")
|
||||
|
||||
# Issue FLUSHALL and push some more entries
|
||||
await c_master.execute_command("FLUSHALL")
|
||||
# Issue FLUSHALL and record replica role at the same instant
|
||||
_, role = await asyncio.gather(c_master.execute_command("FLUSHALL"), c_replica.role())
|
||||
|
||||
if not await is_full_sync_mode(c_master):
|
||||
# Print warning if replication was too quick
|
||||
if role[3] != "full_sync":
|
||||
logging.error("!!! Full sync finished too fast. Adjust test parameters !!!")
|
||||
return
|
||||
|
||||
post_seeder = df_seeder_factory.create(port=master.port, keys=10, dbcount=1)
|
||||
await post_seeder.run(target_deviation=0.1)
|
||||
# Run a few more commands on top
|
||||
post_seeder = SeederV2(key_target=100)
|
||||
await post_seeder.run(c_master, target_deviation=0.1)
|
||||
|
||||
await check_all_replicas_finished([c_replica], c_master)
|
||||
|
||||
await check_data(post_seeder, [replica], [c_replica])
|
||||
# Check replica data consistency
|
||||
hash1, hash2 = await asyncio.gather(*(SeederV2.capture(c) for c in (c_master, c_replica)))
|
||||
assert hash1 == hash2
|
||||
|
||||
# Make sure that a new sync ID is present, meaning replication restarted following FLUSHALL.
|
||||
new_syncid, _ = await c_replica.execute_command("DEBUG REPLICA OFFSET")
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
import asyncio
|
||||
import random
|
||||
import logging
|
||||
import re
|
||||
import typing
|
||||
import math
|
||||
import redis.asyncio as aioredis
|
||||
from dataclasses import dataclass
|
||||
import time
|
||||
|
||||
try:
|
||||
from importlib import resources as impresources
|
||||
|
@ -23,11 +25,22 @@ class SeederBase:
|
|||
SeederBase.UID_COUNTER += 1
|
||||
|
||||
@classmethod
|
||||
async def capture(clz, client: aioredis.Redis) -> typing.List[int]:
|
||||
async def capture(clz, client: aioredis.Redis) -> typing.Tuple[int]:
|
||||
"""Generate hash capture for all data stored in instance pointed by client"""
|
||||
|
||||
sha = await client.script_load(clz._load_script("hash"))
|
||||
return await asyncio.gather(*(client.evalsha(sha, 0, data_type) for data_type in clz.TYPES))
|
||||
return tuple(
|
||||
await asyncio.gather(
|
||||
*(clz._run_capture(client, sha, data_type) for data_type in clz.TYPES)
|
||||
)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
async def _run_capture(client, sha, data_type):
|
||||
s = time.time()
|
||||
res = await client.evalsha(sha, 0, data_type)
|
||||
logging.debug(f"hash capture of {data_type} took {time.time() - s}")
|
||||
return res
|
||||
|
||||
@staticmethod
|
||||
def _read_file(fname):
|
||||
|
@ -100,15 +113,14 @@ class Seeder(SeederBase):
|
|||
|
||||
units: typing.List[Unit]
|
||||
|
||||
def __init__(self, units=10, key_target=10_000, data_size=10):
|
||||
def __init__(self, units=10, key_target=10_000, data_size=100):
|
||||
SeederBase.__init__(self)
|
||||
self.key_target = key_target
|
||||
self.data_size = data_size
|
||||
self.units = [
|
||||
Seeder.Unit(
|
||||
prefix=f"k-s{self.uid}u{i}-",
|
||||
type="STRING",
|
||||
# type=random.choice(Seeder.TYPES),
|
||||
type=Seeder.TYPES[i % len(Seeder.TYPES)],
|
||||
counter=0,
|
||||
stop_key=f"_s{self.uid}u{i}-stop",
|
||||
)
|
||||
|
@ -145,6 +157,8 @@ class Seeder(SeederBase):
|
|||
async def _run_unit(client: aioredis.Redis, sha: str, unit: Unit, using_stopkey, args):
|
||||
await client.delete(unit.stop_key)
|
||||
|
||||
s = time.time()
|
||||
|
||||
args = [
|
||||
unit.prefix,
|
||||
unit.type,
|
||||
|
@ -153,3 +167,7 @@ class Seeder(SeederBase):
|
|||
] + args
|
||||
|
||||
unit.counter = await client.evalsha(sha, 0, *args)
|
||||
|
||||
logging.debug(
|
||||
f"running unit {unit.prefix}/{unit.type} took {time.time() - s}, target {args[4+0]}"
|
||||
)
|
||||
|
|
|
@ -22,19 +22,21 @@ local data_size = tonumber(ARGV[8])
|
|||
-- assumes exclusive ownership
|
||||
local keys = LU_collect_keys(prefix, type)
|
||||
|
||||
LG_funcs.init(data_size)
|
||||
local addfunc = LG_funcs['add_' .. string.lower(type)]
|
||||
local modfunc = LG_funcs['mod_' .. string.lower(type)]
|
||||
|
||||
local function action_add()
|
||||
local key = prefix .. tostring(key_counter)
|
||||
key_counter = key_counter + 1
|
||||
|
||||
addfunc(key, keys)
|
||||
table.insert(keys, key)
|
||||
addfunc(key, data_size)
|
||||
end
|
||||
|
||||
local function action_mod()
|
||||
local key = keys[math.random(#keys)]
|
||||
modfunc(key, data_size)
|
||||
modfunc(key, keys)
|
||||
end
|
||||
|
||||
local function action_del()
|
||||
|
@ -62,11 +64,6 @@ while true do
|
|||
break
|
||||
end
|
||||
|
||||
if key_target < 100 and min_dev > 0 then
|
||||
print(real_target, key_target, math.abs(#keys - real_target) / real_target)
|
||||
print()
|
||||
end
|
||||
|
||||
-- break if we reached our target deviation
|
||||
if min_dev > 0 and math.abs(#keys - real_target) / real_target < min_dev then
|
||||
break
|
||||
|
@ -91,7 +88,7 @@ while true do
|
|||
-- the add intensity is monotonically decreasing with keycount growing,
|
||||
-- the delete intensity is monotonically increasing with keycount growing,
|
||||
-- the point where the intensities are equal is the equilibrium point,
|
||||
-- based on the formulas it's ~0.82 * key_target
|
||||
-- based on the formulas it's ~0.956 * key_target
|
||||
local i_add = math.max(0, 1 - (#keys / key_target) ^ 16)
|
||||
local i_del = (#keys / key_target) ^ 16
|
||||
|
||||
|
|
|
@ -1,20 +1,155 @@
|
|||
local LG_funcs = {}
|
||||
|
||||
-- strings
|
||||
|
||||
function LG_funcs.add_string(key, dsize)
|
||||
local char = string.char(math.random(65, 90))
|
||||
redis.apcall('SET', key, string.rep(char, dsize))
|
||||
function LG_funcs.init(dsize)
|
||||
LG_funcs.dsize = dsize
|
||||
LG_funcs.csize = math.floor(dsize ^ (2/3))
|
||||
LG_funcs.esize = math.ceil(dsize ^ (1/3))
|
||||
end
|
||||
|
||||
function LG_funcs.mod_string(key, dsize)
|
||||
-- strings
|
||||
-- store blobs of random chars
|
||||
|
||||
function LG_funcs.add_string(key)
|
||||
redis.apcall('SET', key, dragonfly.randstr(LG_funcs.dsize))
|
||||
end
|
||||
|
||||
function LG_funcs.mod_string(key)
|
||||
-- APPEND and SETRANGE are the only modifying operations for strings,
|
||||
-- issue APPEND rarely to not grow data too much
|
||||
if math.random() < 0.05 then
|
||||
redis.apcall('APPEND', key, '+')
|
||||
else
|
||||
local char = string.char(math.random(65, 90))
|
||||
local replacement = string.rep(char, math.random(0, dsize / 2))
|
||||
redis.apcall('SETRANGE', key, math.random(0, dsize / 2), replacement)
|
||||
local replacement = dragonfly.randstr(LG_funcs.dsize // 2)
|
||||
redis.apcall('SETRANGE', key, math.random(0, LG_funcs.dsize // 2), replacement)
|
||||
end
|
||||
end
|
||||
|
||||
-- lists
|
||||
-- store list of random blobs of default container/element sizes
|
||||
|
||||
function LG_funcs.add_list(key)
|
||||
local elements = dragonfly.randstr(LG_funcs.esize, LG_funcs.csize)
|
||||
redis.apcall('LPUSH', key, unpack(elements))
|
||||
end
|
||||
|
||||
function LG_funcs.mod_list(key)
|
||||
-- equally likely pops and pushes, we rely on the list size being large enough
|
||||
-- to "highly likely" not get emptied out by consecutive pops
|
||||
local action = math.random(1, 4)
|
||||
if action == 1 then
|
||||
redis.apcall('RPOP', key)
|
||||
elseif action == 2 then
|
||||
redis.apcall('LPOP', key)
|
||||
elseif action == 3 then
|
||||
redis.apcall('LPUSH', key, dragonfly.randstr(LG_funcs.esize))
|
||||
else
|
||||
redis.apcall('RPUSH', key, dragonfly.randstr(LG_funcs.esize))
|
||||
end
|
||||
end
|
||||
|
||||
-- sets
|
||||
-- store sets of blobs of default container/element sizes
|
||||
|
||||
function LG_funcs.add_set(key, keys)
|
||||
if #keys > 100 and math.random() < 0.05 then
|
||||
-- we assume that elements overlap with a very low probability, so
|
||||
-- SDIFF is expected to be equal to the origin set.
|
||||
-- Repeating this operation too often can lead to two equal sets being chosen
|
||||
local i1 = math.random(#keys)
|
||||
local i2 = math.random(#keys)
|
||||
while i1 == i2 do
|
||||
i2 = math.random(#keys)
|
||||
end
|
||||
redis.apcall('SDIFFSTORE', key, keys[i1], keys[i2])
|
||||
else
|
||||
local elements = dragonfly.randstr(LG_funcs.esize, LG_funcs.csize)
|
||||
redis.apcall('SADD', key, unpack(elements))
|
||||
end
|
||||
end
|
||||
|
||||
function LG_funcs.mod_set(key)
|
||||
-- equally likely pops and additions
|
||||
if math.random() < 0.5 then
|
||||
redis.apcall('SPOP', key)
|
||||
else
|
||||
redis.apcall('SADD', key, dragonfly.randstr(LG_funcs.esize))
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
-- hashes
|
||||
-- store {to_string(i): value for i in [1, csize]},
|
||||
-- where `value` is a random string for even indices and a number for odd indices
|
||||
|
||||
-- Create a hash at `key` with fields "1".."csize":
-- odd fields hold a random integer in [0, 1000], even fields hold a random blob
-- of esize characters.
function LG_funcs.add_hash(key)
    -- Only even fields need a blob. Use integer division so that an integer
    -- count is passed instead of relying on truncation of a float.
    local blobs = dragonfly.randstr(LG_funcs.esize, LG_funcs.csize // 2)

    -- Flat {field, value, field, value, ...} argument list for HSET
    local htable = {}
    for i = 1, LG_funcs.csize do
        htable[i * 2 - 1] = tostring(i)
        if i % 2 == 1 then
            htable[i * 2] = math.random(0, 1000)
        else
            htable[i * 2] = blobs[i // 2]
        end
    end
    redis.apcall('HSET', key, unpack(htable))
end
|
||||
|
||||
function LG_funcs.mod_hash(key)
|
||||
local idx = math.random(LG_funcs.csize)
|
||||
if idx % 2 == 1 then
|
||||
redis.apcall('HINCRBY', key, tostring(idx), 1)
|
||||
else
|
||||
redis.apcall('HSET', key, tostring(idx), dragonfly.randstr(LG_funcs.esize))
|
||||
end
|
||||
end
|
||||
|
||||
-- sorted sets
|
||||
|
||||
function LG_funcs.add_zset(key, keys)
|
||||
-- TODO: We don't support ZDIFFSTORE
|
||||
local blobs = dragonfly.randstr(LG_funcs.esize, LG_funcs.csize)
|
||||
local ztable = {}
|
||||
for i = 1, LG_funcs.csize do
|
||||
ztable[i * 2 - 1] = tostring(i)
|
||||
ztable[i * 2] = blobs[i]
|
||||
end
|
||||
redis.apcall('ZADD', key, unpack(ztable))
|
||||
end
|
||||
|
||||
function LG_funcs.mod_zset(key, dbsize)
|
||||
local action = math.random(1, 4)
|
||||
if action <= 2 then
|
||||
redis.apcall('ZADD', key, math.random(0, LG_funcs.csize * 2), dragonfly.randstr(LG_funcs.esize))
|
||||
elseif action == 3 then
|
||||
redis.apcall('ZPOPMAX', key)
|
||||
else
|
||||
redis.apcall('ZPOPMIN', key)
|
||||
end
|
||||
end
|
||||
|
||||
-- json
|
||||
-- store single list of integers inside object
|
||||
|
||||
function LG_funcs.add_json(key)
|
||||
-- generate single list of counters
|
||||
local seed = math.random(100)
|
||||
local counters = {}
|
||||
for i = 1, LG_funcs.csize do
|
||||
counters[i] = ((i + seed) * 123) % 701
|
||||
end
|
||||
redis.apcall('JSON.SET', key, '$', cjson.encode({counters = counters}))
|
||||
end
|
||||
|
||||
function LG_funcs.mod_json(key, dbsize)
|
||||
local action = math.random(1, 4)
|
||||
if action == 1 then
|
||||
redis.apcall('JSON.ARRAPPEND', key, '$.counters', math.random(701))
|
||||
elseif action == 2 then
|
||||
redis.apcall('JSON.ARRPOP', key, '$.counters')
|
||||
elseif action == 3 then
|
||||
redis.apcall('JSON.NUMMULTBY', key, '$.counters[' .. math.random(LG_funcs.csize ) .. ']', 2)
|
||||
else
|
||||
redis.apcall('JSON.NUMINCRBY', key, '$.counters[' .. math.random(LG_funcs.csize ) .. ']', 1)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -8,7 +8,7 @@ Keys of every type are sorted lexicographically to ensure consistent order.
|
|||
-- import:utillib --
|
||||
|
||||
-- inputs
|
||||
local requested_types = ARGV
|
||||
local type = ARGV[1]
|
||||
|
||||
local OUT_HASH = 0
|
||||
|
||||
|
@ -19,16 +19,11 @@ local function process(type)
|
|||
-- sort to provide consistent order
|
||||
table.sort(keys)
|
||||
for _, key in ipairs(keys) do
|
||||
-- add key to hash
|
||||
OUT_HASH = dragonfly.ihash(OUT_HASH, key)
|
||||
-- hand hash over to callback
|
||||
OUT_HASH = hfunc(key, OUT_HASH)
|
||||
end
|
||||
end
|
||||
|
||||
for _, type in ipairs(requested_types) do
|
||||
process(string.lower(type))
|
||||
end
|
||||
|
||||
process(string.lower(type))
|
||||
|
||||
return OUT_HASH
|
||||
|
|
|
@ -2,38 +2,30 @@ local LH_funcs = {}
|
|||
|
||||
function LH_funcs.string(key, hash)
|
||||
-- add value to hash
|
||||
return dragonfly.ihash(hash, redis.call('GET', key))
|
||||
return dragonfly.ihash(hash, false, 'GET', key)
|
||||
end
|
||||
|
||||
function LH_funcs.list(key, hash)
|
||||
-- add values to hash
|
||||
return dragonfly.ihash(hash, redis.call('LRANGE', key, 0, -1))
|
||||
return dragonfly.ihash(hash, false, 'LRANGE', key, 0, -1)
|
||||
end
|
||||
|
||||
function LH_funcs.set(key, hash)
|
||||
-- add values to hash, sort before to avoid ambiguity
|
||||
local items = redis.call('SMEMBERS', key)
|
||||
table.sort(items)
|
||||
return dragonfly.ihash(hash, items)
|
||||
return dragonfly.ihash(hash, true, 'SMEMBERS', key)
|
||||
end
|
||||
|
||||
function LH_funcs.zset(key, hash)
|
||||
-- add values to hash, ZRANGE always returns sorted values
|
||||
return dragonfly.ihash(hash, redis.call('ZRANGE', key, 0, -1, 'WITHSCORES'))
|
||||
return dragonfly.ihash(hash, false, 'ZRANGE', key, 0, -1, 'WITHSCORES')
|
||||
end
|
||||
|
||||
function LH_funcs.hash(key, hash)
|
||||
-- add values to hash, first convert to key-value pairs and sort
|
||||
local items = redis.call('HGETALL', key)
|
||||
local paired_items = {}
|
||||
for i = 1, #items, 2 do
|
||||
table.insert(paired_items, items[i] .. '->' .. items[i+1])
|
||||
end
|
||||
table.sort(paired_items)
|
||||
return dragonfly.ihash(hash, paired_items)
|
||||
return dragonfly.ihash(hash, true, 'HGETALL', key)
|
||||
end
|
||||
|
||||
function LH_funcs.json(key, hash)
|
||||
-- add values to hash, note JSON.GET returns just a string
|
||||
return dragonfly.ihash(hash, redis.call('JSON.GET', key))
|
||||
return dragonfly.ihash(hash, false, 'JSON.GET', key)
|
||||
end
|
||||
|
|
|
@ -1,5 +1,10 @@
|
|||
-- collect all keys of a specific type with a specific prefix into a table. Uses SCAN--
|
||||
local function LU_collect_keys(prefix, type)
|
||||
-- SCAN wants this weird type name for json
|
||||
if string.lower(type) == 'json' then
|
||||
type = 'ReJSON-RL'
|
||||
end
|
||||
|
||||
local pattern = prefix .. "*"
|
||||
local cursor = "0"
|
||||
local keys = {}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
import asyncio
|
||||
import async_timeout
|
||||
import string
|
||||
import random
|
||||
from redis import asyncio as aioredis
|
||||
from . import dfly_args
|
||||
from .seeder import Seeder, StaticSeeder
|
||||
|
@ -18,7 +17,7 @@ async def test_static_seeder(async_client: aioredis.Redis):
|
|||
@dfly_args({"proactor_threads": 4})
|
||||
async def test_seeder_key_target(async_client: aioredis.Redis):
|
||||
"""Ensure seeder reaches its key targets"""
|
||||
s = Seeder(units=random.randint(4, 12), key_target=5000)
|
||||
s = Seeder(units=len(Seeder.TYPES) * 2, key_target=5000)
|
||||
|
||||
# Ensure tests are not unreasonably slow
|
||||
async with async_timeout.timeout(1 + 4):
|
||||
|
@ -41,6 +40,15 @@ async def test_seeder_key_target(async_client: aioredis.Redis):
|
|||
await s.run(async_client, target_deviation=0.5) # don't set low precision with low values
|
||||
assert await async_client.dbsize() < 200
|
||||
|
||||
# Get cmdstat calls
|
||||
info = await async_client.info("ALL")
|
||||
calls = {
|
||||
k.split("_")[1]: v["calls"]
|
||||
for k, v in info.items()
|
||||
if k.startswith("cmdstat_") and v["calls"] > 50
|
||||
}
|
||||
assert len(calls) > 15 # we use at least 15 different commands
|
||||
|
||||
|
||||
@dfly_args({"proactor_threads": 4})
|
||||
async def test_seeder_capture(async_client: aioredis.Redis):
|
||||
|
@ -49,17 +57,15 @@ async def test_seeder_capture(async_client: aioredis.Redis):
|
|||
async def set_data():
|
||||
p = async_client.pipeline()
|
||||
p.mset(mapping={f"string{i}": f"{i}" for i in range(100)})
|
||||
# uncomment when seeder supports more than strings
|
||||
# p.lpush("list1", *list(string.ascii_letters))
|
||||
# p.sadd("set1", *list(string.ascii_letters))
|
||||
# p.hset("hash1", mapping={f"{i}": l for i, l in enumerate(string.ascii_letters)})
|
||||
# p.zadd("zset1", mapping={l: i for i, l in enumerate(string.ascii_letters)})
|
||||
# p.json().set("json1", ".", {"a": [1, 2, 3], "b": {"c": 1, "d": 2, "e": [5, 6]}})
|
||||
p.lpush("list1", *list(string.ascii_letters))
|
||||
p.sadd("set1", *list(string.ascii_letters))
|
||||
p.hset("hash1", mapping={f"{i}": l for i, l in enumerate(string.ascii_letters)})
|
||||
p.zadd("zset1", mapping={l: i for i, l in enumerate(string.ascii_letters)})
|
||||
await p.execute()
|
||||
|
||||
# Capture with filled data
|
||||
await set_data()
|
||||
c1 = await Seeder.capture(async_client)
|
||||
capture = await Seeder.capture(async_client)
|
||||
|
||||
# Check hashes are 0 without data
|
||||
await async_client.flushall()
|
||||
|
@ -67,11 +73,16 @@ async def test_seeder_capture(async_client: aioredis.Redis):
|
|||
|
||||
# Check setting the same data results in same hashes
|
||||
await set_data()
|
||||
c2 = await Seeder.capture(async_client)
|
||||
assert c1 == c2
|
||||
assert capture == await Seeder.capture(async_client)
|
||||
|
||||
# Check chaning the data gives different hahses
|
||||
# await async_client.lpush("list1", "NEW")
|
||||
await async_client.append("string1", "MORE-DATA")
|
||||
c3 = await Seeder.capture(async_client)
|
||||
assert c1 != c3
|
||||
# Check changing the data gives different hashes
|
||||
await async_client.lpush("list1", "NEW")
|
||||
assert capture != await Seeder.capture(async_client)
|
||||
|
||||
# Undo our change
|
||||
await async_client.lpop("list1")
|
||||
assert capture == await Seeder.capture(async_client)
|
||||
|
||||
# Do another change
|
||||
await async_client.spop("set1")
|
||||
assert capture != await Seeder.capture(async_client)
|
||||
|
|
Loading…
Reference in a new issue