check-mirroring: Fix spurious double-sent messages due to ACK failures.
(imported from commit 9010bd08779ce3bf9ce1dca761910be504b6463a)
This commit is contained in:
parent
a3fba04fdd
commit
6aa8257102
|
@ -111,20 +111,21 @@ def send_humbug(message):
|
||||||
logger.error(result)
|
logger.error(result)
|
||||||
print_status_and_exit(1)
|
print_status_and_exit(1)
|
||||||
|
|
||||||
def send_zephyr(zwrite_args, content, retry=False):
|
# Returns True if and only if we "Detected server failure" sending the zephyr.
|
||||||
|
def send_zephyr(zwrite_args, content):
|
||||||
p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
|
p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
|
||||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
stdout, stderr = p.communicate(input=content.encode("utf-8"))
|
stdout, stderr = p.communicate(input=content.encode("utf-8"))
|
||||||
if p.returncode != 0:
|
if p.returncode != 0:
|
||||||
if not retry and "Detected server failure while receiving acknowledgement for" in stdout:
|
if "Detected server failure while receiving acknowledgement for" in stdout:
|
||||||
logger.warning("Got server failure error sending zephyr; retrying")
|
logger.warning("Got server failure error sending zephyr; retrying")
|
||||||
logger.warning(stderr)
|
logger.warning(stderr)
|
||||||
# Retry sending the message rather than bailing.
|
return True
|
||||||
return send_zephyr(zwrite_args, content, True)
|
|
||||||
logger.error("Error sending zephyr:")
|
logger.error("Error sending zephyr:")
|
||||||
logger.info(stdout)
|
logger.info(stdout)
|
||||||
logger.error(stderr)
|
logger.error(stderr)
|
||||||
print_status_and_exit(1)
|
print_status_and_exit(1)
|
||||||
|
return False
|
||||||
|
|
||||||
# Subscribe to Humbugs
|
# Subscribe to Humbugs
|
||||||
try:
|
try:
|
||||||
|
@ -168,13 +169,16 @@ for tries in xrange(10):
|
||||||
# Prepare keys
|
# Prepare keys
|
||||||
zhkeys = {}
|
zhkeys = {}
|
||||||
hzkeys = {}
|
hzkeys = {}
|
||||||
def gen_keys(key_dict):
|
def gen_key(key_dict):
|
||||||
for (stream, test) in test_streams:
|
|
||||||
bits = str(random.getrandbits(32))
|
bits = str(random.getrandbits(32))
|
||||||
while bits in key_dict:
|
while bits in key_dict:
|
||||||
# Avoid the unlikely event that we get the same bits twice
|
# Avoid the unlikely event that we get the same bits twice
|
||||||
bits = str(random.getrandbits(32))
|
bits = str(random.getrandbits(32))
|
||||||
key_dict[bits] = (stream, test)
|
return bits
|
||||||
|
|
||||||
|
def gen_keys(key_dict):
|
||||||
|
for (stream, test) in test_streams:
|
||||||
|
key_dict[gen_key(key_dict)] = (stream, test)
|
||||||
|
|
||||||
gen_keys(zhkeys)
|
gen_keys(zhkeys)
|
||||||
gen_keys(hzkeys)
|
gen_keys(hzkeys)
|
||||||
|
@ -205,7 +209,20 @@ for key, (stream, test) in zhkeys.items():
|
||||||
zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
|
zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
|
||||||
else:
|
else:
|
||||||
zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
|
zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
|
||||||
send_zephyr(zwrite_args, str(key))
|
server_failure = send_zephyr(zwrite_args, str(key))
|
||||||
|
if server_failure:
|
||||||
|
# Replace the key we're not sure was delivered with a new key
|
||||||
|
value = zhkeys.pop(key)
|
||||||
|
new_key = gen_key(zhkeys)
|
||||||
|
zhkeys[new_key] = value
|
||||||
|
server_failure_again = send_zephyr(zwrite_args, str(new_key))
|
||||||
|
if server_failure_again:
|
||||||
|
logging.error("Zephyr server failure twice in a row on keys %s and %s! Aborting." %
|
||||||
|
(key, new_key))
|
||||||
|
print_status_and_exit(1)
|
||||||
|
else:
|
||||||
|
logging.warning("Replaced key %s with %s due to Zephyr server failure." %
|
||||||
|
(key, new_key))
|
||||||
|
|
||||||
receive_zephyrs()
|
receive_zephyrs()
|
||||||
logger.info("Sent Zephyr messages!")
|
logger.info("Sent Zephyr messages!")
|
||||||
|
|
Loading…
Reference in a new issue