From 02586f1d348dbcab3bc8d30b805fffd48a524e18 Mon Sep 17 00:00:00 2001 From: Anders Kaseorg Date: Wed, 25 Jan 2023 16:53:09 -0800 Subject: [PATCH] zephyr_mirror: Port sharding wrapper to asyncio. Signed-off-by: Anders Kaseorg --- zulip/integrations/zephyr/zephyr_mirror.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/zulip/integrations/zephyr/zephyr_mirror.py b/zulip/integrations/zephyr/zephyr_mirror.py index d4450b4..2bb0b99 100755 --- a/zulip/integrations/zephyr/zephyr_mirror.py +++ b/zulip/integrations/zephyr/zephyr_mirror.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +import asyncio import os import signal import subprocess @@ -32,21 +33,22 @@ if options.sync_subscriptions: sys.exit(0) if options.forward_class_messages and not options.noshard: - # Needed to get access to zephyr.lib.parallel - sys.path.append("/home/zulip/zulip") if options.on_startup_command is not None: subprocess.call([options.on_startup_command]) - from zerver.lib.parallel import run_parallel print("Starting parallel zephyr class mirroring bot") - jobs = list("0123456789abcdef") + shards = list("0123456789abcdef") - def run_job(shard: str) -> int: - subprocess.call(args + [f"--shard={shard}"]) - return 0 + async def run_shard(shard: str) -> int: + process = await asyncio.create_subprocess_exec(*args, f"--shard={shard}") + return await process.wait() - for (status, job) in run_parallel(run_job, jobs, threads=16): - print("A mirroring shard died!") + async def run_shards(): + for coro in asyncio.as_completed(map(run_shard, shards)): + await coro + print("A mirroring shard died!") + + asyncio.run(run_shards()) sys.exit(0) backoff = RandomExponentialBackoff(timeout_success_equivalent=300)