From 67ee231ff934f1f98e18b44f1651885cb33d3da8 Mon Sep 17 00:00:00 2001 From: Tim Abbott Date: Fri, 21 Dec 2012 11:59:41 -0500 Subject: [PATCH] zmirror: Add backoff-retry code for the Zephyr=>Humbug initialization. (imported from commit bde30d87d6dae75bbfdcd49b30a1f08b3c585ec7) --- bots/zephyr_mirror_backend.py | 86 +++++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 18 deletions(-) diff --git a/bots/zephyr_mirror_backend.py b/bots/zephyr_mirror_backend.py index e98194c..76efc0e 100755 --- a/bots/zephyr_mirror_backend.py +++ b/bots/zephyr_mirror_backend.py @@ -34,6 +34,37 @@ import signal import logging import hashlib import tempfile +import random + +class CountingBackoff(object): + def __init__(self, maximum_retries=10): + self.number_of_retries = 0 + self.maximum_retries = maximum_retries + + def keep_going(self): + return self.number_of_retries < self.maximum_retries + + def succeed(self): + self.number_of_retries = 0 + + def fail(self): + self.number_of_retries = min(self.number_of_retries + 1, + self.maximum_retries) + +class RandomExponentialBackoff(CountingBackoff): + def fail(self): + self.number_of_retries = min(self.number_of_retries + 1, + self.maximum_retries) + # Exponential growth with ratio sqrt(2); compute random delay + # between x and 2x where x is growing exponentially + delay_scale = int(2 ** (self.number_of_retries / 2.0 - 1)) + 1 + delay = delay_scale + random.randint(1, delay_scale) + message = "Sleeping for %ss [max %s] before retrying." % (delay, delay_scale * 2) + try: + logger.warning(message) + except NameError: + print message + time.sleep(delay) DEFAULT_SITE = "https://humbughq.com" @@ -199,18 +230,21 @@ def update_subscriptions(): if len(classes_to_subscribe) > 0: zephyr_bulk_subscribe(list(classes_to_subscribe)) +def maybe_kill_child(): + try: + if child_pid is not None: + os.kill(child_pid, signal.SIGTERM) + except OSError: + # We don't care if the child process no longer exists, so just log the error + logger.exception("") + def maybe_restart_mirroring_script(): if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \ ((options.user == "tabbott" or options.user == "tabbott/extra") and os.stat(os.path.join(options.root_path, "stamps", "tabbott_stamp")).st_mtime > start_time): logger.warning("") logger.warning("zephyr mirroring script has been updated; restarting...") - try: - if child_pid is not None: - os.kill(child_pid, signal.SIGTERM) - except OSError: - # We don't care if the child process no longer exists, so just log the error - logger.exception("") + maybe_kill_child() try: zephyr._z.cancelSubs() except IOError: @@ -358,18 +392,43 @@ def decode_unicode_byte_strings(zeph): zeph[field] = decoded return zeph +def quit_failed_initialization(message): + logger.error(message) + maybe_kill_child() + sys.exit(1) + +def zephyr_init_autoretry(): + backoff = RandomExponentialBackoff() + while backoff.keep_going(): + try: + # zephyr.init() tries to clear old subscriptions, and thus + # sometimes gets a SERVNAK from the server + zephyr.init() + backoff.succeed() + return + except IOError: + logger.exception("Error initializing Zephyr library (retrying). Traceback:") + backoff.fail() + + quit_failed_initialization("Could not initialize Zephyr library, quitting!") + def zephyr_subscribe_autoretry(sub): - while True: + backoff = RandomExponentialBackoff() + while backoff.keep_going(): try: zephyr.Subscriptions().add(sub) + backoff.succeed() return except IOError: # Probably a SERVNAK from the zephyr server, but log the # traceback just in case it's something else logger.exception("Error subscribing to personals (retrying). Traceback:") - time.sleep(1) + backoff.fail() + + quit_failed_initialization("Could not subscribe to personals, quitting!") def zephyr_to_humbug(options): + zephyr_init_autoretry() if options.forward_class_messages: update_subscriptions() if options.forward_personals: @@ -404,7 +463,7 @@ def zephyr_to_humbug(options): logger.exception("Could not send saved zephyr:") time.sleep(2) - logger.info("Starting receive loop.") + logger.info("Successfully initialized; Starting receive loop.") if options.log_path is not None: with open(options.log_path, 'a') as log: @@ -898,15 +957,6 @@ or specify the --api-key-file option.""" % (options.api_key_file,)))) CURRENT_STATE = States.ZephyrToHumbug import zephyr - while True: - try: - # zephyr.init() tries to clear old subscriptions, and thus - # sometimes gets a SERVNAK from the server - zephyr.init() - break - except IOError: - logger.exception("") - time.sleep(1) logger_name = "zephyr=>humbug" if options.shard is not None: logger_name += "(%s)" % (options.shard,)