From 4bfd7844238c29e787aeb360d21b7aa9247468be Mon Sep 17 00:00:00 2001 From: Tim Abbott Date: Wed, 28 Nov 2012 22:34:49 -0500 Subject: [PATCH] Rewrite check-mirroring to be more robust and flexible. Features include: * Not forking into two processes (shells out to zwrite to send instead). This makes life easier since we're not doing concurrent programming. * Eliminated a lot of hard-to-read or unnecessary debugging output. * Adding explanatory text suggesting the likely problem for some common sets of received messages. * Much less code duplication. * Support for testing a sharded zephyr_mirror script (--sharded). * Use of the logging module to print timestamps -- makes debugging some issues a lot easier. * Only one sleep, and for only 10 seconds, between sending the outgoing messages and checking that they were received. * Support for running two copies of this script at the same time, so that running it manually doesn't screw up Nagios. * Passed 100 test runs in a row. (imported from commit a3ec02ac1d1a04972e469ca30fec1790c4fb53bc) --- bots/check-mirroring | 369 +++++++++++++++++++++++++++++-------------- 1 file changed, 250 insertions(+), 119 deletions(-) diff --git a/bots/check-mirroring b/bots/check-mirroring index 0c80c70..39d4322 100755 --- a/bots/check-mirroring +++ b/bots/check-mirroring @@ -4,6 +4,8 @@ import time import optparse import os import random +import logging +import subprocess parser = optparse.OptionParser() parser.add_option('--verbose', @@ -14,6 +16,9 @@ parser.add_option('--site', dest='site', default="https://humbughq.com", action='store') +parser.add_option('--sharded', + default=False, + action='store_true') parser.add_option('--root-path', dest='root_path', default="/home/humbug", @@ -27,11 +32,6 @@ sys.path[:0] = [os.path.join(options.root_path, "python-zephyr"), mit_user = 'tabbott/extra@ATHENA.MIT.EDU' humbug_user = 'tabbott/extra@mit.edu' -hzkey1 = random.getrandbits(32) -hzkey2 = random.getrandbits(32) -zhkey1 = 
random.getrandbits(32) -zhkey2 = random.getrandbits(32) - sys.path.append(".") sys.path.append(os.path.dirname(os.path.dirname(__file__))) import api.common @@ -41,6 +41,51 @@ humbug_client = api.common.HumbugAPI(email=humbug_user, client="test: Humbug API", site=options.site) +# Configure logging +log_file = "/home/humbug/check-mirroring-log" +log_format = "%(asctime)s: %(message)s" +logging.basicConfig(format=log_format) + +formatter = logging.Formatter(log_format) +file_handler = logging.FileHandler(log_file) +file_handler.setFormatter(formatter) + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +logger.addHandler(file_handler) + +# Initialize list of streams to test +if options.sharded: + # NOTE: Streams in this list must be in humbug_user's Humbug + # subscriptions, or we won't receive messages via Humbug. + + # The sharded stream list has a bunch of pairs + # (stream, shard_name), where sha1sum(stream).startswith(shard_name) + test_streams = [ + ("message", "p"), + ("tabbott-nagios-test-15", "0"), + ("tabbott-nagios-test-1", "1"), + ("tabbott-nagios-test-4", "2"), + ("tabbott-nagios-test-32", "3"), + ("tabbott-nagios-test-6", "4"), + ("tabbott-nagios-test-93", "5"), + ("tabbott-nagios-test-23", "6"), + ("tabbott-nagios-test-16", "7"), + ("tabbott-nagios-test-22", "8"), + ("tabbott-nagios-test-3", "9"), + ("tabbott-nagios-test-2", "a"), + ("tabbott-nagios-test-10", "b"), + ("tabbott-nagios-test-14", "c"), + ("tabbott-nagios-test-8", "d"), + ("tabbott-nagios-test-13", "e"), + ("tabbott-nagios-test-45", "f"), + ] +else: + test_streams = [ + ("message", "p"), + ("tabbott-nagios-test", "a"), + ] + def print_status_and_exit(status): # The output of this script is used by Nagios. Various outputs, # e.g. 
true success and punting due to a SERVNAK, result in a @@ -49,121 +94,207 @@ def print_status_and_exit(status): print status sys.exit(status) -def print_zephyr(notice): - print notice.cls, notice.instance, notice.sender, notice.message.split('\0')[1] - -def print_humbug(message): - if message['type'] == "stream": - print message["type"], message['display_recipient'], message['subject'], \ - message['sender_email'], message['content'] - else: - print message["type"], message['sender_email'], \ - message['display_recipient'], message['content'] - -max_message_id = humbug_client.get_profile()['max_message_id'] - -child_pid = os.fork() -if child_pid == 0: - # Run the humbug => zephyr mirror in the child - time.sleep(5) - result = humbug_client.send_message({ - "type": "private", - "content": str(hzkey1), - "to": humbug_user, - }) - +def send_humbug(message): + result = humbug_client.send_message(message) if result["result"] != "success": - print "key1 send error:" - print result - - result = humbug_client.send_message({ - "type": "stream", - "subject": "test", - "content": str(hzkey2), - "to": "tabbott-nagios-test", - }) - - if result["result"] != "success": - print "key2 send error:" - print result - - if options.verbose: - print "Sent Humbug messages!" - - import zephyr - try: - zephyr.init() - except IOError, e: - if "SERVNAK received" in e: - print "SERVNAK received, punting rest of test" - print_status_and_exit(0) - - zsig = "Timothy Good Abbott" - - zeph = zephyr.ZNotice(sender=mit_user, auth=True, recipient=mit_user, - cls="message", instance="personal") - zeph.setmessage("%s\0%s" % (zsig, zhkey1)) - zeph.send() - - zeph = zephyr.ZNotice(sender=mit_user, auth=True, - cls="tabbott-nagios-test", instance="test") - zeph.setmessage("%s\0%s" % (zsig, zhkey2)) - zeph.send() - if options.verbose: - print "Sent Zephyr messages!" 
- -else: - failed = False - import zephyr - try: - zephyr.init() - zephyr._z.subAll([('message', 'personal', 'tabbott/extra@ATHENA.MIT.EDU'), - ('tabbott-nagios-test', '*', '*')]) - except IOError, e: - if "SERVNAK received" in e: - print "SERVNAK received, punting rest of test" - print_status_and_exit(0) - - time.sleep(20) - if options.verbose: - print "Receiving messages!" - notices = [] - while True: - notice = zephyr.receive(block=False) - if notice is None: - break - if notice.opcode != "": - continue - notices.append(notice) - if len(notices) != 4: - print "humbug=>zephyr: Got wrong number of messages back!" - failed = True - elif (set(notice.message.split('\0')[1] for notice in notices) != - set([str(hzkey1), str(hzkey2), str(zhkey1), str(zhkey2)])): - print "humbug=>zephyr: Didn't get back right values!" - failed = True - if failed: - for notice in notices: - print_zephyr(notice) - - messages = humbug_client.get_messages({'first': '0', - 'last': str(max_message_id), - 'server_generation': '0'})['messages'] - if len(messages) != 4: - print "zephyr=>humbug: Didn't get exactly 4 messages!" - for message in messages: - print_humbug(message) - failed = True - elif (set(message["content"] for message in messages) != - set([str(hzkey1), str(hzkey2), str(zhkey1), str(zhkey2)])): - print "zephyr=>humbug: Didn't get back right values!" - for message in messages: - print_humbug(message) - failed = True - - if failed: - print "original keys:", hzkey1, hzkey2, zhkey1, zhkey2 + logger.error("Error sending humbug, args were:") + logger.error(message) + logger.error(result) print_status_and_exit(1) - print "Success!" 
+def send_zephyr(zwrite_args, content):  # Pipe content to zwrite on stdin; log and exit(1) if zwrite fails
+    p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    stdout, stderr = p.communicate(input=content.encode("utf-8"))
+    if p.returncode != 0:
+        logger.error("Error sending zephyr:")
+        logger.info(stdout)
+        logger.error(stderr)
+        print_status_and_exit(1)
+
+# Subscribe to Humbugs (remember max_message_id; it is passed as 'last' to get_messages below)
+try:
+    res = humbug_client.get_profile()
+    max_message_id = res.get('max_message_id')
+    if max_message_id is None:
+        logger.error("Error subscribing to Humbugs!")
+        logger.error(res)
+        print_status_and_exit(1)
+except Exception:
+    logger.exception("Unexpected error subscribing to Humbugs")
+    print_status_and_exit(1)
+
+# Subscribe to Zephyrs; retry (up to 10 attempts) until every subscription is confirmed
+import zephyr
+zephyr_subs_to_add = []
+for (stream, test) in test_streams:
+    if stream == "message":
+        zephyr_subs_to_add.append((stream, 'personal', mit_user))
+    else:
+        zephyr_subs_to_add.append((stream, '*', '*'))
+
+for tries in xrange(10):
+    try:
+        zephyr.init()
+        zephyr._z.subAll(zephyr_subs_to_add)
+        zephyr_subs = zephyr._z.getSubscriptions()
+        missing = [elt for elt in zephyr_subs_to_add if elt not in zephyr_subs]
+        if not missing:
+            break
+        for elt in missing:  # log, then fall through to the next attempt
+            logger.error("Failed to subscribe to %s" % (elt,))
+    except IOError, e:
+        if tries > 5:
+            if "SERVNAK received" in e:
+                logger.error("SERVNAK repeatedly received, punting rest of test")
+            else:
+                logger.exception("Exception subscribing to zephyrs")
+            print_status_and_exit(1)
+
+# Prepare keys: zhkeys are sent via Zephyr, hzkeys via Humbug; each maps key -> (stream, test)
+zhkeys = {}
+hzkeys = {}
+def gen_keys(key_dict):  # Fill key_dict with one unique random key per test stream
+    for (stream, test) in test_streams:
+        bits = str(random.getrandbits(32))
+        while bits in key_dict or bits in zhkeys or bits in hzkeys:
+            # Avoid the unlikely event that we get the same bits twice (in either direction)
+            bits = str(random.getrandbits(32))
+        key_dict[bits] = (stream, test)
+
+gen_keys(zhkeys)
+gen_keys(hzkeys)
+
+logger.info("Starting sending messages!")
+# Send zephyrs
+zsig = "Timothy Good Abbott"
+for key, (stream, test) in zhkeys.items():
+    if stream == "message":
+        zwrite_args = ["zwrite", "-s", zsig, mit_user]
+    else:
+        zwrite_args = ["zwrite", "-s", zsig, "-c", stream, "-i", "test"]
+    send_zephyr(zwrite_args, str(key))
+logger.info("Sent Zephyr messages!")
+
+# Send Humbugs
+for key, (stream, test) in hzkeys.items():
+    if stream == "message":
+        send_humbug({
+            "type": "private",
+            "content": str(key),
+            "to": humbug_user,
+        })
+    else:
+        send_humbug({
+            "type": "stream",
+            "subject": "test",
+            "content": str(key),
+            "to": stream,
+        })
+
+logger.info("Sent Humbug messages!")
+
+failed = False
+
+# Normally messages manage to forward through in under 3 seconds, but
+# sleep 10 to give a safe margin since the messages do need to do 2
+# round trips. This alert is for correctness, not performance, and so
+# we want it to reliably alert only when messages aren't being
+# delivered at all.
+time.sleep(10)
+logger.info("Starting receiving messages!")
+
+# receive humbugs
+messages = humbug_client.get_messages({'last': str(max_message_id)})['messages']
+logger.info("Received Humbug messages!")
+
+# receive zephyrs (drain the queue without blocking; stop on the first error or empty read)
+notices = []
+while True:
+    try:
+        notice = zephyr.receive(block=False)
+    except Exception:
+        logger.exception("Exception receiving zephyrs:")
+        notice = None
+    if notice is None:
+        break
+    if notice.opcode != "":
+        continue
+    notices.append(notice)
+logger.info("Received Zephyr messages!")
+
+all_keys = set(zhkeys.keys() + hzkeys.keys())
+def process_keys(content_list):  # Tally how often each of our keys appears in content_list
+    # Start by filtering out any keys that might have come from
+    # concurrent check-mirroring processes
+    content_keys = [key for key in content_list if key in all_keys]
+    key_counts = {}
+    for key in all_keys:
+        key_counts[key] = 0
+    for key in content_keys:
+        key_counts[key] += 1
+    z_missing = set(key for key in zhkeys.keys() if key_counts[key] == 0)
+    h_missing = set(key for key in hzkeys.keys() if key_counts[key] == 0)
+    duplicates = any(val > 1 for val in key_counts.values())
+    success = all(val == 1 for val in key_counts.values())
+    return key_counts, z_missing, h_missing, duplicates, success
+
+# The h_foo variables are about the messages we _received_ in Humbug
+# The z_foo variables are about the messages we _received_ in Zephyr (notices without a NUL-separated body are skipped)
+h_contents = [message["content"] for message in messages]
+z_contents = [fields[1] for fields in (notice.message.split('\0') for notice in notices) if len(fields) > 1]
+(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
+(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
+
+if z_success and h_success:
+    logger.info("Success!")
     print_status_and_exit(0)
+elif z_success:
+    logger.info("Received everything correctly in Zephyr!")
+elif h_success:
+    logger.info("Received everything correctly in Humbug!")
+
+logger.error("Messages received the wrong number of times:")
+for key in all_keys:
+    if z_key_counts[key] == 1 and h_key_counts[key] == 1:
+        continue
+    if key in zhkeys:
+        (stream, test) = zhkeys[key]
+        logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" % \
+                           (key, z_key_counts[key], h_key_counts[key], test, stream))
+    if key in hzkeys:
+        (stream, test) = hzkeys[key]
+        logger.warning("%10s: z got %s. h got %s. Sent via Humbug(%s): class %s" % \
+                           (key, z_key_counts[key], h_key_counts[key], test, stream))
+logger.error("")
+logger.error("Summary of specific problems:")
+
+if h_duplicates:
+    logger.error("humbug: Received duplicate messages!")
+    logger.error("humbug: This is probably a bug in our message loop detection.")
+    logger.error("humbug: where Humbugs go humbug=>zephyr=>humbug")
+if z_duplicates:
+    logger.error("zephyr: Received duplicate messages!")
+    logger.error("zephyr: This is probably a bug in our message loop detection.")
+    logger.error("zephyr: where Zephyrs go zephyr=>humbug=>zephyr")
+
+if z_missing_z:
+    logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
+    logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
+if h_missing_h:
+    logger.error("humbug: Didn't receive all the Humbugs we sent on the Humbug end!")
+    logger.error("humbug: This is probably an issue with check-mirroring sending or receiving Humbugs.")
+if z_missing_h:
+    logger.error("zephyr: Didn't receive all the Humbugs we sent on the Zephyr end!")
+    if z_missing_h - h_missing_h:  # some keys arrived in Humbug but never reached Zephyr
+        logger.error("zephyr: Including some Humbugs that we did receive on the Humbug end.")
+        logger.error("zephyr: This suggests we have a humbug=>zephyr mirroring problem.")
+        logger.error("zephyr: aka the personals mirroring script has issues.")
+if h_missing_z:
+    logger.error("humbug: Didn't receive all the Zephyrs we sent on the Humbug end!")
+    if h_missing_z - z_missing_z:  # some keys arrived in Zephyr but never reached Humbug
+        logger.error("humbug: Including some Zephyrs that we did receive on the Zephyr end.")
+        logger.error("humbug: This suggests we have a zephyr=>humbug mirroring problem.")
+        logger.error("humbug: aka the global class mirroring script has issues.")
+
+print_status_and_exit(1)