zephyr_mirror: Port sharding wrapper to asyncio.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2023-01-25 16:53:09 -08:00
parent 7831d979c9
commit 02586f1d34

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio
import os import os
import signal import signal
import subprocess import subprocess
@ -32,21 +33,22 @@ if options.sync_subscriptions:
sys.exit(0) sys.exit(0)
if options.forward_class_messages and not options.noshard: if options.forward_class_messages and not options.noshard:
# Needed to get access to zephyr.lib.parallel
sys.path.append("/home/zulip/zulip")
if options.on_startup_command is not None: if options.on_startup_command is not None:
subprocess.call([options.on_startup_command]) subprocess.call([options.on_startup_command])
from zerver.lib.parallel import run_parallel
print("Starting parallel zephyr class mirroring bot") print("Starting parallel zephyr class mirroring bot")
jobs = list("0123456789abcdef") shards = list("0123456789abcdef")
def run_job(shard: str) -> int: async def run_shard(shard: str) -> int:
subprocess.call(args + [f"--shard={shard}"]) process = await asyncio.create_subprocess_exec(*args, f"--shard={shard}")
return 0 return await process.wait()
for (status, job) in run_parallel(run_job, jobs, threads=16): async def run_shards():
print("A mirroring shard died!") for coro in asyncio.as_completed(map(run_shard, shards)):
await coro
print("A mirroring shard died!")
asyncio.run(run_shards())
sys.exit(0) sys.exit(0)
backoff = RandomExponentialBackoff(timeout_success_equivalent=300) backoff = RandomExponentialBackoff(timeout_success_equivalent=300)