From 305a8ce569bfc00a4f5fd9a333851ca854a91e84 Mon Sep 17 00:00:00 2001 From: Tim Abbott Date: Tue, 27 Nov 2012 10:27:02 -0500 Subject: [PATCH] zephyr_mirror: Run the class mirror in 16 parallel shards. (imported from commit cb17ef999e94cc57d5b01114479728b04475c9c4) --- bots/zephyr_mirror.py | 13 +++++++++++++ bots/zephyr_mirror_backend.py | 29 ++++++++++++++++++++++------- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/bots/zephyr_mirror.py b/bots/zephyr_mirror.py index a1f78d3..33b1830 100755 --- a/bots/zephyr_mirror.py +++ b/bots/zephyr_mirror.py @@ -39,6 +39,19 @@ if options.sync_subscriptions: subprocess.call(args) sys.exit(0) +if options.forward_class_messages: + sys.path.append("/home/humbug/humbug") + from zephyr.lib.parallel import run_parallel + print "Starting parallel zephyr class mirroring bot" + jobs = list("0123456789abcdef") + def run_job(shard): + subprocess.call(args + ["--shard=%s" % (shard,)]) + return 0 + for (status, job) in run_parallel(run_job, jobs, threads=16): + print "A mirroring shard died!" + pass + sys.exit(0) + while True: print "Starting zephyr mirroring bot" try: diff --git a/bots/zephyr_mirror_backend.py b/bots/zephyr_mirror_backend.py index a9aa208..4e5b02f 100755 --- a/bots/zephyr_mirror_backend.py +++ b/bots/zephyr_mirror_backend.py @@ -33,6 +33,7 @@ import datetime import textwrap import signal import logging +import hashlib DEFAULT_SITE = "https://humbughq.com" @@ -166,6 +167,11 @@ def update_subscriptions_from_humbug(): for stream in streams: if stream in current_zephyr_subs: continue + if (options.shard is not None and + not hashlib.sha1(stream).hexdigest().startswith(options.shard)): + # This stream is being handled by a different zephyr_mirror job. + continue + streams_to_subscribe.append((stream.encode("utf-8"), "*", "*")) if len(streams_to_subscribe) > 0: zephyr_bulk_subscribe(streams_to_subscribe) @@ -672,6 +678,8 @@ def parse_args(): default=False, help=optparse.SUPPRESS_HELP, action='store_true') + parser.add_option('--shard', + help=optparse.SUPPRESS_HELP) parser.add_option('--resend-log', dest='resend_log_path', help=optparse.SUPPRESS_HELP) @@ -757,15 +765,13 @@ or specify the --api-key-file option.""" % (options.api_key_file,))) add_humbug_subscriptions(True) sys.exit(0) - if options.forward_from_humbug: - print "This option is obsolete." - sys.exit(0) - # First check that there are no other bots running - cmdline = " ".join(sys.argv) bot_name = "zephyr_mirror_backend.py" if not options.test_mode: - proc = subprocess.Popen(['pgrep', '-U', os.environ["USER"], "-f", bot_name], + pgrep_query = bot_name + if options.shard is not None: + pgrep_query = "%s.*--shard=%s" % (bot_name, options.shard) + proc = subprocess.Popen(['pgrep', '-U', os.environ["USER"], "-f", pgrep_query], stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, _err_unused = proc.communicate() @@ -779,6 +785,12 @@ or specify the --api-key-file option.""" % (options.api_key_file,))) # We don't care if the child process no longer exists, so just print the error traceback.print_exc() + if options.shard is not None and set(options.shard) != set("a"): + # The shard that is all "a"s is the one that handles personals + # forwarding and humbug => zephyr forwarding + options.forward_personals = False + options.forward_from_humbug = False + if options.forward_from_humbug: child_pid = os.fork() if child_pid == 0: @@ -800,7 +812,10 @@ or specify the --api-key-file option.""" % (options.api_key_file,))) except IOError: traceback.print_exc() time.sleep(1) - logger = configure_logger("zephyr=>humbug") + logger_name = "zephyr=>humbug" + if options.shard is not None: + logger_name += "(%s)" % (options.shard,) + logger = configure_logger(logger_name) # Have the kernel reap children for when we fork off processes to send Humbugs signal.signal(signal.SIGCHLD, signal.SIG_IGN) zephyr_to_humbug(options)