zephyr_mirror: Run the class mirror in 16 parallel shards.

(imported from commit cb17ef999e94cc57d5b01114479728b04475c9c4)
This commit is contained in:
Tim Abbott 2012-11-27 10:27:02 -05:00
parent bfec56661e
commit 305a8ce569
2 changed files with 35 additions and 7 deletions

View file

@ -39,6 +39,19 @@ if options.sync_subscriptions:
subprocess.call(args) subprocess.call(args)
sys.exit(0) sys.exit(0)
if options.forward_class_messages:
sys.path.append("/home/humbug/humbug")
from zephyr.lib.parallel import run_parallel
print "Starting parallel zephyr class mirroring bot"
jobs = list("0123456789abcdef")
def run_job(shard):
subprocess.call(args + ["--shard=%s" % (shard,)])
return 0
for (status, job) in run_parallel(run_job, jobs, threads=16):
print "A mirroring shard died!"
pass
sys.exit(0)
while True: while True:
print "Starting zephyr mirroring bot" print "Starting zephyr mirroring bot"
try: try:

View file

@ -33,6 +33,7 @@ import datetime
import textwrap import textwrap
import signal import signal
import logging import logging
import hashlib
DEFAULT_SITE = "https://humbughq.com" DEFAULT_SITE = "https://humbughq.com"
@ -166,6 +167,11 @@ def update_subscriptions_from_humbug():
for stream in streams: for stream in streams:
if stream in current_zephyr_subs: if stream in current_zephyr_subs:
continue continue
if (options.shard is not None and
not hashlib.sha1(stream).hexdigest().startswith(options.shard)):
# This stream is being handled by a different zephyr_mirror job.
continue
streams_to_subscribe.append((stream.encode("utf-8"), "*", "*")) streams_to_subscribe.append((stream.encode("utf-8"), "*", "*"))
if len(streams_to_subscribe) > 0: if len(streams_to_subscribe) > 0:
zephyr_bulk_subscribe(streams_to_subscribe) zephyr_bulk_subscribe(streams_to_subscribe)
@ -672,6 +678,8 @@ def parse_args():
default=False, default=False,
help=optparse.SUPPRESS_HELP, help=optparse.SUPPRESS_HELP,
action='store_true') action='store_true')
parser.add_option('--shard',
help=optparse.SUPPRESS_HELP)
parser.add_option('--resend-log', parser.add_option('--resend-log',
dest='resend_log_path', dest='resend_log_path',
help=optparse.SUPPRESS_HELP) help=optparse.SUPPRESS_HELP)
@ -757,15 +765,13 @@ or specify the --api-key-file option.""" % (options.api_key_file,)))
add_humbug_subscriptions(True) add_humbug_subscriptions(True)
sys.exit(0) sys.exit(0)
if options.forward_from_humbug:
print "This option is obsolete."
sys.exit(0)
# First check that there are no other bots running # First check that there are no other bots running
cmdline = " ".join(sys.argv)
bot_name = "zephyr_mirror_backend.py" bot_name = "zephyr_mirror_backend.py"
if not options.test_mode: if not options.test_mode:
proc = subprocess.Popen(['pgrep', '-U', os.environ["USER"], "-f", bot_name], pgrep_query = bot_name
if options.shard is not None:
pgrep_query = "%s.*--shard=%s" % (bot_name, options.shard)
proc = subprocess.Popen(['pgrep', '-U', os.environ["USER"], "-f", pgrep_query],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
out, _err_unused = proc.communicate() out, _err_unused = proc.communicate()
@ -779,6 +785,12 @@ or specify the --api-key-file option.""" % (options.api_key_file,)))
# We don't care if the child process no longer exists, so just print the error # We don't care if the child process no longer exists, so just print the error
traceback.print_exc() traceback.print_exc()
if options.shard is not None and set(options.shard) != set("a"):
# The shard that is all "a"s is the one that handles personals
# forwarding and humbug => zephyr forwarding
options.forward_personals = False
options.forward_from_humbug = False
if options.forward_from_humbug: if options.forward_from_humbug:
child_pid = os.fork() child_pid = os.fork()
if child_pid == 0: if child_pid == 0:
@ -800,7 +812,10 @@ or specify the --api-key-file option.""" % (options.api_key_file,)))
except IOError: except IOError:
traceback.print_exc() traceback.print_exc()
time.sleep(1) time.sleep(1)
logger = configure_logger("zephyr=>humbug") logger_name = "zephyr=>humbug"
if options.shard is not None:
logger_name += "(%s)" % (options.shard,)
logger = configure_logger(logger_name)
# Have the kernel reap children for when we fork off processes to send Humbugs # Have the kernel reap children for when we fork off processes to send Humbugs
signal.signal(signal.SIGCHLD, signal.SIG_IGN) signal.signal(signal.SIGCHLD, signal.SIG_IGN)
zephyr_to_humbug(options) zephyr_to_humbug(options)