From cee5af1baf4dec051e2889cdb19514a00ccfed65 Mon Sep 17 00:00:00 2001
From: ashotland
Date: Thu, 29 Dec 2022 16:01:01 +0200
Subject: [PATCH] feat(tools): cache log player batching all the way
 optimization (#617)

Signed-off-by: ashotland
---
 tools/cache_logs_player.py | 70 ++++++++++++++++++++++----------------
 1 file changed, 41 insertions(+), 29 deletions(-)
 mode change 100644 => 100755 tools/cache_logs_player.py

diff --git a/tools/cache_logs_player.py b/tools/cache_logs_player.py
old mode 100644
new mode 100755
index a70dde51c..36b511897
--- a/tools/cache_logs_player.py
+++ b/tools/cache_logs_player.py
@@ -13,7 +13,7 @@ To install: pip install -r requirements.txt
 class Command:
     args = None
-    client_id = 0
+    sync_id = 0  # Commands with the same sync_id will be executed synchronously
 
 
 class TwitterCacheTraceParser:
     """
@@ -29,7 +29,7 @@ class TwitterCacheTraceParser:
         ttl = csv[6]
 
         cmd = Command()
-        cmd.client_id = client_id
+        cmd.sync_id = client_id
 
         if operation == "get":
             cmd.args = ["GET", key]
@@ -58,29 +58,20 @@ class TwitterCacheTraceParser:
 
 class AsyncWorker:
     QUEUE_SIZE = 100000
-    BATCH_SIZE = 20
 
     def __init__(self, redis_client) -> None:
         self.queue = asyncio.Queue(self.QUEUE_SIZE)
         self.redis_client = redis_client
         self.working = False
 
-    async def put(self, cmd: Command) -> None:
-        await self.queue.put(cmd)
+    async def put(self, batch: list) -> None:
+        await self.queue.put(batch)
 
     async def work(self) -> None:
         self.working = True
-        batch = []
-        while self.working or len(batch) > 0 or not self.queue.empty():
-            try:
-                cmd = await asyncio.wait_for(self.queue.get(), timeout=1.0)
-                batch.append(cmd)
-                if len(batch) >= self.BATCH_SIZE:
-                    await self.execute(batch)
-                    batch.clear()
-            except asyncio.exceptions.TimeoutError:
-                await self.execute(batch)
-                batch.clear()
+        while self.working or not self.queue.empty():
+            batch = await self.queue.get()
+            await self.execute(batch)
 
     async def execute(self, batch) -> None:
         async with self.redis_client.pipeline(transaction=False) as pipe:
@@ -97,18 +88,18 @@ class AsyncWorker:
 class AsyncWorkerPool:
     """
     Manages worker pool to send commands in parallel
-    Maintains synchronous order for commands with the same client id
+    Maintains synchronous order for commands with the same sync_id
     """
     def __init__(self, redis_client, num_workers) -> None:
         self.redis_client = redis_client
         self.num_workers = num_workers
         self.workers = []
         self.tasks = []
-        self.client_id_to_worker = {}
+        self.sync_id_to_worker = {}
         self.next_worker_index = -1
 
-    def allocate(self, client_id) -> AsyncWorker:
-        if not client_id in self.client_id_to_worker:
+    def allocate(self, sync_id) -> AsyncWorker:
+        if not sync_id in self.sync_id_to_worker:
             self.next_worker_index = (self.next_worker_index + 1) % self.num_workers
 
             if len(self.workers) <= self.next_worker_index:
@@ -116,13 +107,13 @@
                 self.workers.append(AsyncWorker(self.redis_client))
                 self.tasks.append(self.workers[self.next_worker_index].start())
 
-            self.client_id_to_worker[client_id] = self.workers[self.next_worker_index]
+            self.sync_id_to_worker[sync_id] = self.workers[self.next_worker_index]
 
-        return self.client_id_to_worker[client_id]
+        return self.sync_id_to_worker[sync_id]
 
-    async def put(self, cmd: Command) -> None:
-        worker = self.allocate(cmd.client_id)
-        await worker.put(cmd)
+    async def put(self, batch: list, sync_id: int) -> None:
+        worker = self.allocate(sync_id)
+        await worker.put(batch)
 
     async def stop(self):
         for worker in self.workers:
@@ -131,18 +122,40 @@ class AsyncWorkerPool:
 class AsyncPlayer:
+    READ_BATCH_SIZE = 10 * 1000 * 1000
+
     def __init__(self, redis_uri, num_workers) -> None:
         self.redis_uri = redis_uri
         self.redis_client = aioredis.from_url(
             f"redis://{self.redis_uri}", encoding="utf-8", decode_responses=True
         )
         self.worker_pool = AsyncWorkerPool(self.redis_client, 100)
+        self.batch_by_sync_id = {}
+
+    async def dispatch_batches(self):
+        for sync_id in self.batch_by_sync_id:
+            await self.worker_pool.put(self.batch_by_sync_id[sync_id], sync_id)
+        self.batch_by_sync_id.clear()
+
     async def read_and_dispatch(self, csv_file, parser):
         print(f"dispatching from {csv_file}")
+
+        line_count = 0
+
         async with aiofiles.open(csv_file, mode="r", encoding="utf-8", newline="") as afp:
             async for row in AsyncReader(afp):
-                await self.worker_pool.put(parser.parse(row))
+                cmd = parser.parse(row)
+                if not self.batch_by_sync_id.get(cmd.sync_id):
+                    self.batch_by_sync_id[cmd.sync_id] = []
+                batch = self.batch_by_sync_id[cmd.sync_id]
+                batch.append(cmd)
+                line_count = line_count + 1
+                if (line_count >= self.READ_BATCH_SIZE):
+                    await self.dispatch_batches()
+                    line_count = 0
+        # handle the remaining lines
+        await self.dispatch_batches()
+
 
-    async def reportStats(self):
+    async def report_stats(self):
         while True:
             info = await self.redis_client.execute_command("info", "stats")
             print(f"{datetime.now()}: {info}")
@@ -154,13 +167,12 @@ class AsyncPlayer:
 
         print(await self.redis_client.ping())
 
         read_dispatch_task = asyncio.create_task(self.read_and_dispatch(csv_file, parser))
-        stats_task = asyncio.create_task(self.reportStats())
+        stats_task = asyncio.create_task(self.report_stats())
 
         await read_dispatch_task
         print(f"finished reading {csv_file}")
 
         await self.worker_pool.stop()
-        stats_task.cancel()
 
         print("all done")
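
Note: the pattern this patch lands on is batch-per-sync_id with worker affinity. The reader
accumulates commands in a dict keyed by sync_id, periodically flushes every batch to a worker
chosen once per sync_id, and each worker drains whole batches from its queue. The sketch below
illustrates that scheme in plain asyncio with no Redis involved; NUM_WORKERS, READ_BATCH_SIZE,
and the fake trace are hypothetical stand-ins, not values from the patch:

    import asyncio
    from collections import defaultdict

    NUM_WORKERS = 4      # hypothetical pool size for the sketch
    READ_BATCH_SIZE = 8  # tiny flush threshold so the demo flushes early

    async def worker(queue: asyncio.Queue) -> None:
        # Drain whole batches from the queue; None is a stop sentinel.
        while True:
            batch = await queue.get()
            if batch is None:
                break
            print("executing batch:", batch)  # stand-in for the Redis pipeline

    async def main() -> None:
        queues = [asyncio.Queue() for _ in range(NUM_WORKERS)]
        tasks = [asyncio.create_task(worker(q)) for q in queues]

        # The first time a sync_id is seen it is assigned a worker; every later
        # batch for that sync_id goes to the same queue, preserving order.
        assignment = {}
        batches = defaultdict(list)

        def allocate(sync_id):
            if sync_id not in assignment:
                assignment[sync_id] = len(assignment) % NUM_WORKERS
            return queues[assignment[sync_id]]

        async def dispatch_batches():
            for sync_id, batch in batches.items():
                await allocate(sync_id).put(batch)
            batches.clear()

        # Fake trace: (sync_id, command) pairs standing in for parsed CSV rows.
        trace = [(i % 3, f"cmd-{i}") for i in range(20)]
        line_count = 0
        for sync_id, cmd in trace:
            batches[sync_id].append(cmd)
            line_count += 1
            if line_count >= READ_BATCH_SIZE:
                await dispatch_batches()
                line_count = 0
        await dispatch_batches()  # flush the remainder

        for q in queues:
            await q.put(None)  # stop every worker
        await asyncio.gather(*tasks)

    asyncio.run(main())

Because a sync_id is pinned to exactly one worker and each worker consumes its queue
sequentially, commands sharing a sync_id always run in trace order, while distinct sync_ids
fan out across workers in parallel.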
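
For reference, AsyncWorker.execute appears only as a context line in the diff. A minimal
sketch of what flushing one batch could look like with aioredis 2.x (the API this tool
already imports, now merged into redis-py as redis.asyncio): the function body here is an
assumption, only the pipeline(transaction=False) call is visible in the patch:

    import aioredis  # aioredis 2.x, as imported by cache_logs_player.py

    async def execute(redis_client, batch: list) -> None:
        # Queue every command client-side, then flush the whole batch in a
        # single round trip. transaction=False matches the patch: commands
        # are pipelined without being wrapped in MULTI/EXEC.
        async with redis_client.pipeline(transaction=False) as pipe:
            for cmd in batch:
                pipe.execute_command(*cmd.args)  # e.g. ["GET", key]
            await pipe.execute()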