zmirror: Add backoff-retry code for the Zephyr=>Humbug initialization.
(imported from commit bde30d87d6dae75bbfdcd49b30a1f08b3c585ec7)
This commit is contained in:
parent
805e508d02
commit
67ee231ff9
|
@ -34,6 +34,37 @@ import signal
|
|||
import logging
|
||||
import hashlib
|
||||
import tempfile
|
||||
import random
|
||||
|
||||
class CountingBackoff(object):
|
||||
def __init__(self, maximum_retries=10):
|
||||
self.number_of_retries = 0
|
||||
self.maximum_retries = maximum_retries
|
||||
|
||||
def keep_going(self):
|
||||
return self.number_of_retries < self.maximum_retries
|
||||
|
||||
def succeed(self):
|
||||
self.number_of_retries = 0
|
||||
|
||||
def fail(self):
|
||||
self.number_of_retries = min(self.number_of_retries + 1,
|
||||
self.maximum_retries)
|
||||
|
||||
class RandomExponentialBackoff(CountingBackoff):
|
||||
def fail(self):
|
||||
self.number_of_retries = min(self.number_of_retries + 1,
|
||||
self.maximum_retries)
|
||||
# Exponential growth with ratio sqrt(2); compute random delay
|
||||
# between x and 2x where x is growing exponentially
|
||||
delay_scale = int(2 ** (self.number_of_retries / 2.0 - 1)) + 1
|
||||
delay = delay_scale + random.randint(1, delay_scale)
|
||||
message = "Sleeping for %ss [max %s] before retrying." % (delay, delay_scale * 2)
|
||||
try:
|
||||
logger.warning(message)
|
||||
except NameError:
|
||||
print message
|
||||
time.sleep(delay)
|
||||
|
||||
DEFAULT_SITE = "https://humbughq.com"
|
||||
|
||||
|
@ -199,18 +230,21 @@ def update_subscriptions():
|
|||
if len(classes_to_subscribe) > 0:
|
||||
zephyr_bulk_subscribe(list(classes_to_subscribe))
|
||||
|
||||
def maybe_kill_child():
|
||||
try:
|
||||
if child_pid is not None:
|
||||
os.kill(child_pid, signal.SIGTERM)
|
||||
except OSError:
|
||||
# We don't care if the child process no longer exists, so just log the error
|
||||
logger.exception("")
|
||||
|
||||
def maybe_restart_mirroring_script():
|
||||
if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \
|
||||
((options.user == "tabbott" or options.user == "tabbott/extra") and
|
||||
os.stat(os.path.join(options.root_path, "stamps", "tabbott_stamp")).st_mtime > start_time):
|
||||
logger.warning("")
|
||||
logger.warning("zephyr mirroring script has been updated; restarting...")
|
||||
try:
|
||||
if child_pid is not None:
|
||||
os.kill(child_pid, signal.SIGTERM)
|
||||
except OSError:
|
||||
# We don't care if the child process no longer exists, so just log the error
|
||||
logger.exception("")
|
||||
maybe_kill_child()
|
||||
try:
|
||||
zephyr._z.cancelSubs()
|
||||
except IOError:
|
||||
|
@ -358,18 +392,43 @@ def decode_unicode_byte_strings(zeph):
|
|||
zeph[field] = decoded
|
||||
return zeph
|
||||
|
||||
def quit_failed_initialization(message):
|
||||
logger.error(message)
|
||||
maybe_kill_child()
|
||||
sys.exit(1)
|
||||
|
||||
def zephyr_init_autoretry():
|
||||
backoff = RandomExponentialBackoff()
|
||||
while backoff.keep_going():
|
||||
try:
|
||||
# zephyr.init() tries to clear old subscriptions, and thus
|
||||
# sometimes gets a SERVNAK from the server
|
||||
zephyr.init()
|
||||
backoff.succeed()
|
||||
return
|
||||
except IOError:
|
||||
logger.exception("Error initializing Zephyr library (retrying). Traceback:")
|
||||
backoff.fail()
|
||||
|
||||
quit_failed_initialization("Could not initialize Zephyr library, quitting!")
|
||||
|
||||
def zephyr_subscribe_autoretry(sub):
|
||||
while True:
|
||||
backoff = RandomExponentialBackoff()
|
||||
while backoff.keep_going():
|
||||
try:
|
||||
zephyr.Subscriptions().add(sub)
|
||||
backoff.succeed()
|
||||
return
|
||||
except IOError:
|
||||
# Probably a SERVNAK from the zephyr server, but log the
|
||||
# traceback just in case it's something else
|
||||
logger.exception("Error subscribing to personals (retrying). Traceback:")
|
||||
time.sleep(1)
|
||||
backoff.fail()
|
||||
|
||||
quit_failed_initialization("Could not subscribe to personals, quitting!")
|
||||
|
||||
def zephyr_to_humbug(options):
|
||||
zephyr_init_autoretry()
|
||||
if options.forward_class_messages:
|
||||
update_subscriptions()
|
||||
if options.forward_personals:
|
||||
|
@ -404,7 +463,7 @@ def zephyr_to_humbug(options):
|
|||
logger.exception("Could not send saved zephyr:")
|
||||
time.sleep(2)
|
||||
|
||||
logger.info("Starting receive loop.")
|
||||
logger.info("Successfully initialized; Starting receive loop.")
|
||||
|
||||
if options.log_path is not None:
|
||||
with open(options.log_path, 'a') as log:
|
||||
|
@ -898,15 +957,6 @@ or specify the --api-key-file option.""" % (options.api_key_file,))))
|
|||
CURRENT_STATE = States.ZephyrToHumbug
|
||||
|
||||
import zephyr
|
||||
while True:
|
||||
try:
|
||||
# zephyr.init() tries to clear old subscriptions, and thus
|
||||
# sometimes gets a SERVNAK from the server
|
||||
zephyr.init()
|
||||
break
|
||||
except IOError:
|
||||
logger.exception("")
|
||||
time.sleep(1)
|
||||
logger_name = "zephyr=>humbug"
|
||||
if options.shard is not None:
|
||||
logger_name += "(%s)" % (options.shard,)
|
||||
|
|
Loading…
Reference in a new issue