Annotate check-mirroring.

This commit is contained in:
Tim Abbott 2016-09-10 10:56:23 -07:00
parent 05191181d9
commit 534774bd7e

View file

@ -11,6 +11,8 @@ import subprocess
import hashlib import hashlib
from six.moves import range from six.moves import range
if False: from typing import Any, Dict, List, Set, Tuple
parser = optparse.OptionParser() parser = optparse.OptionParser()
parser.add_option('--verbose', parser.add_option('--verbose',
dest='verbose', dest='verbose',
@ -91,7 +93,7 @@ if options.sharded:
for (stream, test) in test_streams: for (stream, test) in test_streams:
if stream == "message": if stream == "message":
continue continue
assert(hashlib.sha1(stream).hexdigest().startswith(test)) assert(hashlib.sha1(stream.encode("utf-8")).hexdigest().startswith(test))
else: else:
test_streams = [ test_streams = [
("message", "p"), ("message", "p"),
@ -99,6 +101,8 @@ else:
] ]
def print_status_and_exit(status): def print_status_and_exit(status):
# type: (int) -> None
# The output of this script is used by Nagios. Various outputs, # The output of this script is used by Nagios. Various outputs,
# e.g. true success and punting due to a SERVNAK, result in a # e.g. true success and punting due to a SERVNAK, result in a
# non-alert case, so to give us something unambiguous to check in # non-alert case, so to give us something unambiguous to check in
@ -107,15 +111,17 @@ def print_status_and_exit(status):
sys.exit(status) sys.exit(status)
def send_zulip(message): def send_zulip(message):
# type: (Dict[str, str]) -> None
result = zulip_client.send_message(message) result = zulip_client.send_message(message)
if result["result"] != "success": if result["result"] != "success":
logger.error("Error sending zulip, args were:") logger.error("Error sending zulip, args were:")
logger.error(message) logger.error(message) # type: ignore # https://github.com/python/typeshed/issues/532
logger.error(result) logger.error(result)
print_status_and_exit(1) print_status_and_exit(1)
# Returns True if and only if we "Detected server failure" sending the zephyr. # Returns True if and only if we "Detected server failure" sending the zephyr.
def send_zephyr(zwrite_args, content): def send_zephyr(zwrite_args, content):
# type: (List[str], str) -> bool
p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE, p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate(input=content.encode("utf-8")) stdout, stderr = p.communicate(input=content.encode("utf-8"))
@ -167,7 +173,7 @@ for tries in range(10):
actually_subscribed = True actually_subscribed = True
break break
except IOError as e: except IOError as e:
if "SERVNAK received" in e: if "SERVNAK received" in e: # type: ignore # https://github.com/python/mypy/issues/2118
logger.error("SERVNAK repeatedly received, punting rest of test") logger.error("SERVNAK repeatedly received, punting rest of test")
else: else:
logger.exception("Exception subscribing to zephyrs") logger.exception("Exception subscribing to zephyrs")
@ -177,9 +183,10 @@ if not actually_subscribed:
print_status_and_exit(1) print_status_and_exit(1)
# Prepare keys # Prepare keys
zhkeys = {} zhkeys = {} # type: Dict[str, Tuple[str, str]]
hzkeys = {} hzkeys = {} # type: Dict[str, Tuple[str, str]]
def gen_key(key_dict): def gen_key(key_dict):
# type: (Dict[str, Any]) -> str
bits = str(random.getrandbits(32)) bits = str(random.getrandbits(32))
while bits in key_dict: while bits in key_dict:
# Avoid the unlikely event that we get the same bits twice # Avoid the unlikely event that we get the same bits twice
@ -187,6 +194,7 @@ def gen_key(key_dict):
return bits return bits
def gen_keys(key_dict): def gen_keys(key_dict):
# type: (Dict[str, Tuple[str, str]]) -> None
for (stream, test) in test_streams: for (stream, test) in test_streams:
key_dict[gen_key(key_dict)] = (stream, test) key_dict[gen_key(key_dict)] = (stream, test)
@ -199,6 +207,7 @@ notices = []
# receive queue with 30+ messages, which might result in messages # receive queue with 30+ messages, which might result in messages
# being dropped. # being dropped.
def receive_zephyrs(): def receive_zephyrs():
# type: () -> None
while True: while True:
try: try:
notice = zephyr.receive(block=False) notice = zephyr.receive(block=False)
@ -281,10 +290,12 @@ logger.info("Finished receiving Zephyr messages!")
all_keys = set(list(zhkeys.keys()) + list(hzkeys.keys())) all_keys = set(list(zhkeys.keys()) + list(hzkeys.keys()))
def process_keys(content_list): def process_keys(content_list):
# type: (List[str]) -> Tuple[Dict[str, int], Set[str], Set[str], bool, bool]
# Start by filtering out any keys that might have come from # Start by filtering out any keys that might have come from
# concurrent check-mirroring processes # concurrent check-mirroring processes
content_keys = [key for key in content_list if key in all_keys] content_keys = [key for key in content_list if key in all_keys]
key_counts = {} key_counts = {} # type: Dict[str, int]
for key in all_keys: for key in all_keys:
key_counts[key] = 0 key_counts[key] = 0
for key in content_keys: for key in content_keys: