From 6aa82571020d5e7f6097ac6421d4fd70abcfa861 Mon Sep 17 00:00:00 2001 From: Tim Abbott Date: Tue, 18 Dec 2012 10:54:39 -0500 Subject: [PATCH] check-mirroring: Fix spurious double-sent messages due to ACK failures. (imported from commit 9010bd08779ce3bf9ce1dca761910be504b6463a) --- bots/check-mirroring | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/bots/check-mirroring b/bots/check-mirroring index 77e799a..c821c90 100755 --- a/bots/check-mirroring +++ b/bots/check-mirroring @@ -111,20 +111,21 @@ def send_humbug(message): logger.error(result) print_status_and_exit(1) -def send_zephyr(zwrite_args, content, retry=False): +# Returns True if and only if we "Detected server failure" sending the zephyr. +def send_zephyr(zwrite_args, content): p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate(input=content.encode("utf-8")) if p.returncode != 0: - if not retry and "Detected server failure while receiving acknowledgement for" in stdout: + if "Detected server failure while receiving acknowledgement for" in stdout: logger.warning("Got server failure error sending zephyr; retrying") logger.warning(stderr) - # Retry sending the message rather than bailing. - return send_zephyr(zwrite_args, content, True) + return True logger.error("Error sending zephyr:") logger.info(stdout) logger.error(stderr) print_status_and_exit(1) + return False # Subscribe to Humbugs try: @@ -168,13 +169,16 @@ for tries in xrange(10): # Prepare keys zhkeys = {} hzkeys = {} +def gen_key(key_dict): + bits = str(random.getrandbits(32)) + while bits in key_dict: + # Avoid the unlikely event that we get the same bits twice + bits = str(random.getrandbits(32)) + return bits + def gen_keys(key_dict): for (stream, test) in test_streams: - bits = str(random.getrandbits(32)) - while bits in key_dict: - # Avoid the unlikely event that we get the same bits twice - bits = str(random.getrandbits(32)) - key_dict[bits] = (stream, test) + key_dict[gen_key(key_dict)] = (stream, test) gen_keys(zhkeys) gen_keys(hzkeys) @@ -205,7 +209,20 @@ for key, (stream, test) in zhkeys.items(): zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user] else: zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"] - send_zephyr(zwrite_args, str(key)) + server_failure = send_zephyr(zwrite_args, str(key)) + if server_failure: + # Replace the key we're not sure was delivered with a new key + value = zhkeys.pop(key) + new_key = gen_key(zhkeys) + zhkeys[new_key] = value + server_failure_again = send_zephyr(zwrite_args, str(new_key)) + if server_failure_again: + logging.error("Zephyr server failure twice in a row on keys %s and %s! Aborting." % + (key, new_key)) + print_status_and_exit(1) + else: + logging.warning("Replaced key %s with %s due to Zephyr server failure." % + (key, new_key)) receive_zephyrs() logger.info("Sent Zephyr messages!")