1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat(cluster_mgr): Add support for migrate action (#2626)

Example usage:

```bash
# Create a 2-node cluster
./cluster_mgr.py --action=create --replicas_per_master=1 --num_master=2

# Move (no migration) all slots to first node
./cluster_mgr.py --action=move --target_port=7001 --slot_start=8192 --slot_end=16383

# Fill data - like run memtier

# Migrate all slots to 2nd node. One could measure how long this step takes.
./cluster_mgr.py --action=migrate --target_port=7002 --slot_start=0 --slot_end=16383
```
This commit is contained in:
Shahar Mike 2024-02-20 13:58:13 +02:00 committed by GitHub
parent ebe83c820c
commit c7750b9d58
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -6,14 +6,14 @@ import redis
import subprocess import subprocess
import time import time
''' """
To install: pip install -r requirements.txt To install: pip install -r requirements.txt
''' """
class Node: class Node:
def __init__(self, port): def __init__(self, port):
self.id = '' self.id = ""
self.port = port self.port = port
self.admin_port = port + 10_000 self.admin_port = port + 10_000
@ -25,34 +25,41 @@ class Master:
def start_node(node, threads): def start_node(node, threads):
f = open(f'/tmp/dfly.cluster.node.{node.port}.log', 'w') f = open(f"/tmp/dfly.cluster.node.{node.port}.log", "w")
print( print(f"- Log file for node {node.port}: {f.name}")
f'- Log file for node {node.port}: {f.name}') subprocess.Popen(
subprocess.Popen(['../build-dbg/dragonfly', f'--port={node.port}', [
f'--admin_port={node.admin_port}', '--cluster_mode=yes', f'--proactor_threads={threads}', "../build-opt/dragonfly",
'--dbfilename=', f'--logtostderr'], stderr=f) f"--port={node.port}",
f"--admin_port={node.admin_port}",
"--cluster_mode=yes",
f"--proactor_threads={threads}",
"--dbfilename=",
f"--logtostderr",
],
stderr=f,
)
def send_command(node, command): def send_command(node, command):
client = redis.Redis(decode_responses=True, client = redis.Redis(decode_responses=True, host="localhost", port=node.admin_port)
host="localhost", port=node.admin_port)
for i in range(0, 5): for i in range(0, 5):
try: try:
result = client.execute_command(*command) result = client.execute_command(*command)
client.close() client.close()
return result return result
except: except Exception as e:
print(e)
time.sleep(0.1 * i) time.sleep(0.1 * i)
print( print(f"Unable to run command {command} against localhost:{node.admin_port} after 5 attempts!")
f'Unable to connect to localhost:{node.admin_port} after 5 attempts!')
def update_id(node): def update_id(node):
id = send_command(node, ['dflycluster', 'myid']) id = send_command(node, ["dflycluster", "myid"])
node.id = id node.id = id
print(f'- ID for {node.port}: {id}') print(f"- ID for {node.port}: {id}")
def build_config_from_list(masters): def build_config_from_list(masters):
@ -60,23 +67,14 @@ def build_config_from_list(masters):
slots_per_node = math.floor(total_slots / len(masters)) slots_per_node = math.floor(total_slots / len(masters))
def build_node(node): def build_node(node):
return { return {"id": node.id, "ip": "localhost", "port": node.port}
"id": node.id,
"ip": "localhost",
"port": node.port
}
config = [] config = []
for i, master in enumerate(masters): for i, master in enumerate(masters):
c = { c = {
"slot_ranges": [ "slot_ranges": [{"start": i * slots_per_node, "end": (i + 1) * slots_per_node - 1}],
{
"start": i * slots_per_node,
"end": (i+1) * slots_per_node - 1
}
],
"master": build_node(master.node), "master": build_node(master.node),
"replicas": [build_node(replica) for replica in master.replicas] "replicas": [build_node(replica) for replica in master.replicas],
} }
config.append(c) config.append(c)
@ -97,19 +95,18 @@ def get_nodes_from_config(config):
def push_config(config): def push_config(config):
def push_to_node(node, config): def push_to_node(node, config):
config_str = json.dumps(config, indent=2) config_str = json.dumps(config, indent=2)
response = send_command(node, ['dflycluster', 'config', config_str]) response = send_command(node, ["dflycluster", "config", config_str])
print(f'- Push to {node.port}: {response}') print(f"- Push to {node.port}: {response}")
for node in get_nodes_from_config(config): for node in get_nodes_from_config(config):
push_to_node(node, config) push_to_node(node, config)
def create(args): def create(args):
print(f'Setting up a Dragonfly cluster:') print(f"Setting up a Dragonfly cluster:")
print(f'- Master nodes: {args.num_masters}') print(f"- Master nodes: {args.num_masters}")
print( print(f"- Ports: {args.first_port}...{args.first_port + args.num_masters - 1}")
f'- Ports: {args.first_port}...{args.first_port + args.num_masters - 1}') print(f"- Replicas for each master: {args.replicas_per_master}")
print(f'- Replicas for each master: {args.replicas_per_master}')
print() print()
next_port = args.first_port next_port = args.first_port
@ -129,28 +126,26 @@ def create(args):
for replica in master.replicas: for replica in master.replicas:
nodes.append(replica) nodes.append(replica)
print('Starting nodes...') print("Starting nodes...")
for node in nodes: for node in nodes:
start_node(node, args.threads) start_node(node, args.threads)
print() print()
if args.replicas_per_master > 0: if args.replicas_per_master > 0:
print('Configuring replication...') print("Configuring replication...")
for master in masters: for master in masters:
for replica in master.replicas: for replica in master.replicas:
response = send_command( response = send_command(replica, ["replicaof", "localhost", master.node.port])
replica, ['replicaof', 'localhost', master.node.port]) print(f"- {replica.port} replicating {master.node.port}: {response}")
print(
f'- {replica.port} replicating {master.node.port}: {response}')
print() print()
print(f'Getting IDs...') print(f"Getting IDs...")
for n in nodes: for n in nodes:
update_id(n) update_id(n)
print() print()
config = build_config_from_list(masters) config = build_config_from_list(masters)
print(f'Pushing config:\n{config}\n') print(f"Pushing config:\n{config}\n")
push_config(config) push_config(config)
print() print()
@ -161,49 +156,44 @@ def build_config_from_existing(args):
def build_node(node_list): def build_node(node_list):
d = list_to_dict(node_list) d = list_to_dict(node_list)
return { return {"id": d["id"], "ip": d["endpoint"], "port": d["port"]}
"id": d["id"],
"ip": d["endpoint"],
"port": d["port"]
}
def build_slots(slot_list): def build_slots(slot_list):
slots = [] slots = []
for i in range(0, len(slot_list), 2): for i in range(0, len(slot_list), 2):
slots.append( slots.append({"start": slot_list[i], "end": slot_list[i + 1]})
{
"start": slot_list[i],
"end": slot_list[i+1]
}
)
return slots return slots
client = redis.Redis(decode_responses=True, client = redis.Redis(decode_responses=True, host="localhost", port=args.first_port)
host="localhost", port=args.first_port)
existing = client.execute_command("cluster", "shards") existing = client.execute_command("cluster", "shards")
config = [] config = []
for shard_list in existing: for shard_list in existing:
shard = list_to_dict(shard_list) shard = list_to_dict(shard_list)
config.append({ config.append(
"slot_ranges": build_slots(shard["slots"]), {
"master": build_node(shard["nodes"][0]), "slot_ranges": build_slots(shard["slots"]),
"replicas": [build_node(replica) for replica in shard["nodes"][1::]] "master": build_node(shard["nodes"][0]),
}) "replicas": [build_node(replica) for replica in shard["nodes"][1::]],
}
)
return config return config
def find_node(config, port):
new_owner = None
for shard in config:
if shard["master"]["port"] == port:
new_owner = shard
break
else:
print(f"Can't find master with port {port} (hint: use flag --target_port).")
exit(-1)
return new_owner
def move(args): def move(args):
config = build_config_from_existing(args) config = build_config_from_existing(args)
new_owner = find_node(config, args.target_port)
new_owner = None
for shard in config:
if shard["master"]["port"] == args.target_port:
new_owner = shard
break
else:
print(
f"Can't find master with port {args.target_port} (hint: use flag --target_port).")
exit(-1)
def remove_slot(slot, from_range, from_shard): def remove_slot(slot, from_range, from_shard):
if from_range["start"] == slot: if from_range["start"] == slot:
@ -215,10 +205,10 @@ def move(args):
if from_range["start"] > from_range["end"]: if from_range["start"] > from_range["end"]:
from_shard["slot_ranges"].remove(from_range) from_shard["slot_ranges"].remove(from_range)
else: else:
assert slot > from_range["start"] and slot < from_range[ assert (
"end"], f'{slot} {from_range["start"]} {from_range["end"]}' slot > from_range["start"] and slot < from_range["end"]
from_shard["slot_ranges"].append( ), f'{slot} {from_range["start"]} {from_range["end"]}'
{"start": slot + 1, "end": from_range["end"]}) from_shard["slot_ranges"].append({"start": slot + 1, "end": from_range["end"]})
from_range["end"] = slot - 1 from_range["end"] = slot - 1
def add_slot(slot, to_shard): def add_slot(slot, to_shard):
@ -262,7 +252,7 @@ def move(args):
break break
return new_range return new_range
for slot in range(args.slot_start, args.slot_end+1): for slot in range(args.slot_start, args.slot_end + 1):
shard, slot_range = find_slot(slot, config) shard, slot_range = find_slot(slot, config)
if shard == None: if shard == None:
continue continue
@ -274,10 +264,57 @@ def move(args):
for shard in config: for shard in config:
shard["slot_ranges"] = pack(shard["slot_ranges"]) shard["slot_ranges"] = pack(shard["slot_ranges"])
print(f'Pushing new config:\n{json.dumps(config, indent=2)}\n') print(f"Pushing new config:\n{json.dumps(config, indent=2)}\n")
push_config(config) push_config(config)
def migrate(args):
config = build_config_from_existing(args)
target = find_node(config, args.target_port)
target_node = Node(target["master"]["port"])
# Find source node
source = None
for node in config:
slots = node["slot_ranges"]
for slot in slots:
if slot["start"] >= args.slot_start and slot["end"] <= args.slot_end:
source = node
break
if source == None:
print("Unsupported slot range migration (currently only 1-node migration supported)")
exit(-1)
source_node = Node(source["master"]["port"])
# do migration
sync_id = send_command(
target_node,
[
"DFLYCLUSTER",
"START-SLOT-MIGRATION",
"127.0.0.1",
source["master"]["port"] + 10_000,
args.slot_start,
args.slot_end,
],
)
# wait for migration finish
sync_status = []
while True:
sync_status = send_command(target_node, ["DFLYCLUSTER", "SLOT-MIGRATION-STATUS"])
assert len(sync_status) == 1
if sync_status[0].endswith("STABLE_SYNC"):
break
print("Reached stable sync: ", sync_status)
res = send_command(source_node, ["DFLYCLUSTER", "SLOT-MIGRATION-FINALIZE", sync_id])
assert res == "OK"
# Push new config to all nodes
move(args)
def print_config(args): def print_config(args):
config = build_config_from_existing(args) config = build_config_from_existing(args)
print(json.dumps(config, indent=2)) print(json.dumps(config, indent=2))
@ -290,7 +327,8 @@ def shutdown(args):
def main(): def main():
parser = argparse.ArgumentParser(description=''' parser = argparse.ArgumentParser(
description="""
Local Cluster Manager Local Cluster Manager
Example usage: Example usage:
@ -306,30 +344,43 @@ Connect to cluster and shutdown all nodes:
Connect to cluster and move slots 10-20 to master with port 7002: Connect to cluster and move slots 10-20 to master with port 7002:
./cluster_mgr.py --action=move --slot_start=10 --slot_end=20 --new_owner=7002 ./cluster_mgr.py --action=move --slot_start=10 --slot_end=20 --new_owner=7002
''')
parser.add_argument('--action', default='', Migrate slots 10-20 to master with port 7002
help='Which action to take? create: start a new instance, move=move slots') ./cluster_mgr.py --action=migrate --slot_start=10 --slot_end=20 --new_owner=7002
parser.add_argument('--num_masters', type=int, default=3, """
help='Number of master nodes in cluster') )
parser.add_argument('--replicas_per_master', type=int, default=0, parser.add_argument(
help='How many replicas for each master') "--action",
parser.add_argument('--first_port', type=int, default="",
default=7001, help="First master's port") help="Which action to take? create: start a new instance, move=move slots",
parser.add_argument('--threads', type=int, default=2, )
help="Threads per node") parser.add_argument(
parser.add_argument('--slot_start', type=int, "--num_masters", type=int, default=3, help="Number of master nodes in cluster"
default=0, help="First slot to move (inclusive)") )
parser.add_argument('--slot_end', type=int, parser.add_argument(
default=100, help="Last slot to move (inclusive)") "--replicas_per_master", type=int, default=0, help="How many replicas for each master"
parser.add_argument('--target_port', type=int, default=0, )
help="Master port to take ownership over slots in range " parser.add_argument("--first_port", type=int, default=7001, help="First master's port")
"[--slot_start, --slot_end]") parser.add_argument("--threads", type=int, default=2, help="Threads per node")
parser.add_argument("--slot_start", type=int, default=0, help="First slot to move (inclusive)")
parser.add_argument("--slot_end", type=int, default=100, help="Last slot to move (inclusive)")
parser.add_argument(
"--target_port",
type=int,
default=0,
help="Master port to take ownership over slots in range " "[--slot_start, --slot_end]",
)
args = parser.parse_args() args = parser.parse_args()
actions = {'create': create, 'shutdown': shutdown, actions = {
'move': move, 'print': print_config} "create": create,
"shutdown": shutdown,
"move": move,
"print": print_config,
"migrate": migrate,
}
action = actions.get(args.action.lower()) action = actions.get(args.action.lower())
if (action): if action:
action(args) action(args)
else: else:
print(f'Error - unknown action "{args.action}"') print(f'Error - unknown action "{args.action}"')