zephyr: Use exponential backoffs in retry loops.
This reduces the number of retries that might spam APIs. There is some complexity here which is left un-managed -- for instance, maybe_restart_mirroring_script does a number of restart attempts, and then fails, but will be retried every 15s by the surrounding `process_loop`. Previously, it would merely have looped forever inside maybe_restart_mirroring_script. Three loops are intentionally left as infinite `while True` loops, that merely cap their backoff at the default 90s. Their callers do not expect, or have any way to handle more gracefully, a failure of the expected-infinite-loop in `process_loop` or `zulip_to_zephyr`. They maintain their previous behavior of retrying forever, albeit more slowly.
This commit is contained in:
parent
8670cce8e9
commit
a20c9cc6d7
|
@ -17,6 +17,8 @@ import hashlib
|
||||||
import tempfile
|
import tempfile
|
||||||
import select
|
import select
|
||||||
|
|
||||||
|
from zulip import RandomExponentialBackoff
|
||||||
|
|
||||||
DEFAULT_SITE = "https://api.zulip.com"
|
DEFAULT_SITE = "https://api.zulip.com"
|
||||||
|
|
||||||
class States:
|
class States:
|
||||||
|
@ -218,32 +220,41 @@ def maybe_restart_mirroring_script() -> None:
|
||||||
except OSError:
|
except OSError:
|
||||||
# We don't care whether we failed to cancel subs properly, but we should log it
|
# We don't care whether we failed to cancel subs properly, but we should log it
|
||||||
logger.exception("")
|
logger.exception("")
|
||||||
while True:
|
backoff = RandomExponentialBackoff(
|
||||||
|
maximum_retries=3,
|
||||||
|
)
|
||||||
|
while backoff.keep_going():
|
||||||
try:
|
try:
|
||||||
os.execvp(os.path.abspath(__file__), sys.argv)
|
os.execvp(os.path.abspath(__file__), sys.argv)
|
||||||
|
# No need for backoff.succeed, since this can't be reached
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error restarting mirroring script; trying again... Traceback:")
|
logger.exception("Error restarting mirroring script; trying again... Traceback:")
|
||||||
time.sleep(1)
|
backoff.fail()
|
||||||
|
raise Exception("Failed to reload too many times, aborting!")
|
||||||
|
|
||||||
def process_loop(log: Optional[IO[Any]]) -> None:
|
def process_loop(log: Optional[IO[Any]]) -> None:
|
||||||
restart_check_count = 0
|
restart_check_count = 0
|
||||||
last_check_time = time.time()
|
last_check_time = time.time()
|
||||||
|
recieve_backoff = RandomExponentialBackoff()
|
||||||
while True:
|
while True:
|
||||||
select.select([zephyr._z.getFD()], [], [], 15)
|
select.select([zephyr._z.getFD()], [], [], 15)
|
||||||
try:
|
try:
|
||||||
|
process_backoff = RandomExponentialBackoff()
|
||||||
# Fetch notices from the queue until its empty
|
# Fetch notices from the queue until its empty
|
||||||
while True:
|
while True:
|
||||||
notice = zephyr.receive(block=False)
|
notice = zephyr.receive(block=False)
|
||||||
|
recieve_backoff.succeed()
|
||||||
if notice is None:
|
if notice is None:
|
||||||
break
|
break
|
||||||
try:
|
try:
|
||||||
process_notice(notice, log)
|
process_notice(notice, log)
|
||||||
|
process_backoff.succeed()
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error relaying zephyr:")
|
logger.exception("Error relaying zephyr:")
|
||||||
time.sleep(2)
|
process_backoff.fail()
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error checking for new zephyrs:")
|
logger.exception("Error checking for new zephyrs:")
|
||||||
time.sleep(1)
|
recieve_backoff.fail()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if time.time() - last_check_time > 15:
|
if time.time() - last_check_time > 15:
|
||||||
|
@ -759,12 +770,13 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
|
||||||
def zulip_to_zephyr(options: int) -> None:
|
def zulip_to_zephyr(options: int) -> None:
|
||||||
# Sync messages from zulip to zephyr
|
# Sync messages from zulip to zephyr
|
||||||
logger.info("Starting syncing messages.")
|
logger.info("Starting syncing messages.")
|
||||||
|
backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
zulip_client.call_on_each_message(maybe_forward_to_zephyr)
|
zulip_client.call_on_each_message(maybe_forward_to_zephyr)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error syncing messages:")
|
logger.exception("Error syncing messages:")
|
||||||
time.sleep(1)
|
backoff.fail()
|
||||||
|
|
||||||
def subscribed_to_mail_messages() -> bool:
|
def subscribed_to_mail_messages() -> bool:
|
||||||
# In case we have lost our AFS tokens and those won't be able to
|
# In case we have lost our AFS tokens and those won't be able to
|
||||||
|
|
Loading…
Reference in a new issue