#!/usr/bin/env python3 import hashlib import logging import optparse import random import subprocess import sys import time from ctypes import byref, c_int, c_ushort from typing import Dict, List, Set, Tuple import zephyr_ctypes import zulip parser = optparse.OptionParser() parser.add_option("--verbose", dest="verbose", default=False, action="store_true") parser.add_option("--site", dest="site", default=None, action="store") parser.add_option("--sharded", default=False, action="store_true") (options, args) = parser.parse_args() mit_user = "tabbott/extra@ATHENA.MIT.EDU" zulip_client = zulip.Client(verbose=True, client="ZulipMonitoring/0.1", site=options.site) # Configure logging log_file = "/var/log/zulip/check-mirroring-log" log_format = "%(asctime)s: %(message)s" logging.basicConfig(format=log_format) formatter = logging.Formatter(log_format) file_handler = logging.FileHandler(log_file) file_handler.setFormatter(formatter) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) logger.addHandler(file_handler) # Initialize list of streams to test if options.sharded: # NOTE: Streams in this list must be in the zulip_user's Zulip # subscriptions, or we won't receive messages via Zulip. # The sharded stream list has a bunch of pairs # (stream, shard_name), where sha1sum(stream).startswith(shard_name) test_streams = [ ("message", "p"), ("tabbott-nagios-test-32", "0"), ("tabbott-nagios-test-33", "1"), ("tabbott-nagios-test-2", "2"), ("tabbott-nagios-test-5", "3"), ("tabbott-nagios-test-13", "4"), ("tabbott-nagios-test-7", "5"), ("tabbott-nagios-test-22", "6"), ("tabbott-nagios-test-35", "7"), ("tabbott-nagios-test-4", "8"), ("tabbott-nagios-test-3", "9"), ("tabbott-nagios-test-1", "a"), ("tabbott-nagios-test-49", "b"), ("tabbott-nagios-test-34", "c"), ("tabbott-nagios-test-12", "d"), ("tabbott-nagios-test-11", "e"), ("tabbott-nagios-test-9", "f"), ] for (stream, test) in test_streams: if stream == "message": continue assert hashlib.sha1(stream.encode("utf-8")).hexdigest().startswith(test) else: test_streams = [ ("message", "p"), ("tabbott-nagios-test", "a"), ] def print_status_and_exit(status: int) -> None: # The output of this script is used by Nagios. Various outputs, # e.g. true success and punting due to a SERVNAK, result in a # non-alert case, so to give us something unambiguous to check in # Nagios, print the exit status. print(status) sys.exit(status) def send_zulip(message: Dict[str, str]) -> None: result = zulip_client.send_message(message) if result["result"] != "success": logger.error("Error sending zulip, args were:") logger.error(str(message)) logger.error(str(result)) print_status_and_exit(1) # Returns True if and only if we "Detected server failure" sending the zephyr. def send_zephyr(zwrite_args: List[str], content: str) -> bool: p = subprocess.Popen( zwrite_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ) stdout, stderr = p.communicate(input=content) if p.returncode != 0: if "Detected server failure while receiving acknowledgement for" in stdout: logger.warning("Got server failure error sending zephyr; retrying") logger.warning(stderr) return True logger.error("Error sending zephyr:") logger.info(stdout) logger.error(stderr) print_status_and_exit(1) return False # Subscribe to Zulip try: res = zulip_client.register(event_types=["message"]) if "error" in res["result"]: logging.error("Error subscribing to Zulips!") logging.error(res["msg"]) print_status_and_exit(1) queue_id, last_event_id = (res["queue_id"], res["last_event_id"]) except Exception: logger.exception("Unexpected error subscribing to Zulips") print_status_and_exit(1) # Subscribe to Zephyrs zephyr_subs_to_add = [] for (stream, test) in test_streams: if stream == "message": zephyr_subs_to_add.append((stream, "personal", mit_user)) else: zephyr_subs_to_add.append((stream, "*", "*")) actually_subscribed = False for tries in range(10): try: zephyr_ctypes.check(zephyr_ctypes.ZInitialize()) zephyr_port = c_ushort() zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port))) zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0)) zephyr_ctypes.check( zephyr_ctypes.ZSubscribeTo( (zephyr_ctypes.ZSubscription_t * len(zephyr_subs_to_add))( *( zephyr_ctypes.ZSubscription_t( zsub_class=cls.encode(), zsub_classinst=instance.encode(), zsub_recipient=recipient.encode(), ) for cls, instance, recipient in zephyr_subs_to_add ) ), len(zephyr_subs_to_add), 0, ) ) try: nsubs = c_int() zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs))) zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)() zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs))) zephyr_subs = { ( zsub.zsub_class.decode(), zsub.zsub_classinst.decode(), zsub.zsub_recipient.decode(), ) for zsub in zsubs } finally: zephyr_ctypes.ZFlushSubscriptions() missing = 0 for elt in zephyr_subs_to_add: if elt not in zephyr_subs: logging.error(f"Failed to subscribe to {elt}") missing += 1 if missing == 0: actually_subscribed = True break except zephyr_ctypes.ZephyrError as e: if e.code == zephyr_ctypes.ZERR_SERVNAK: logger.error("SERVNAK repeatedly received, punting rest of test") else: logger.exception("Exception subscribing to zephyrs") if not actually_subscribed: logger.error("Failed to subscribe to zephyrs") print_status_and_exit(1) # Prepare keys zhkeys = {} # type: Dict[str, Tuple[str, str]] hzkeys = {} # type: Dict[str, Tuple[str, str]] def gen_key(key_dict: Dict[str, Tuple[str, str]]) -> str: bits = str(random.getrandbits(32)) while bits in key_dict: # Avoid the unlikely event that we get the same bits twice bits = str(random.getrandbits(32)) return bits def gen_keys(key_dict: Dict[str, Tuple[str, str]]) -> None: for (stream, test) in test_streams: key_dict[gen_key(key_dict)] = (stream, test) gen_keys(zhkeys) gen_keys(hzkeys) notices = [] # We check for new zephyrs multiple times, to avoid filling the zephyr # receive queue with 30+ messages, which might result in messages # being dropped. def receive_zephyrs() -> None: while zephyr_ctypes.ZPending() != 0: notice = zephyr_ctypes.ZNotice_t() sender = zephyr_ctypes.sockaddr_in() try: zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender))) except zephyr_ctypes.ZephyrError: logging.exception("Exception receiving zephyrs:") break if notice.z_opcode != b"": continue notices.append(notice) logger.info("Starting sending messages!") # Send zephyrs zsig = "Timothy Good Abbott" for key, (stream, test) in zhkeys.items(): if stream == "message": zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user] else: zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"] server_failure = send_zephyr(zwrite_args, str(key)) if server_failure: # Replace the key we're not sure was delivered with a new key value = zhkeys.pop(key) new_key = gen_key(zhkeys) zhkeys[new_key] = value server_failure_again = send_zephyr(zwrite_args, str(new_key)) if server_failure_again: logging.error( "Zephyr server failure twice in a row on keys %s and %s! Aborting." % (key, new_key) ) print_status_and_exit(1) else: logging.warning(f"Replaced key {key} with {new_key} due to Zephyr server failure.") receive_zephyrs() receive_zephyrs() logger.info("Sent Zephyr messages!") # Send Zulips for key, (stream, test) in hzkeys.items(): if stream == "message": send_zulip( { "type": "private", "content": str(key), "to": zulip_client.email, } ) else: send_zulip( { "type": "stream", "subject": "test", "content": str(key), "to": stream, } ) receive_zephyrs() logger.info("Sent Zulip messages!") # Normally messages manage to forward through in under 3 seconds, but # sleep 10 to give a safe margin since the messages do need to do 2 # round trips. This alert is for correctness, not performance, and so # we want it to reliably alert only when messages aren't being # delivered at all. time.sleep(10) receive_zephyrs() logger.info("Starting receiving messages!") # receive zulips res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id) if "error" in res["result"]: logging.error("Error receiving Zulips!") logging.error(res["msg"]) print_status_and_exit(1) messages = [event["message"] for event in res["events"]] logger.info("Finished receiving Zulip messages!") receive_zephyrs() logger.info("Finished receiving Zephyr messages!") all_keys = set(list(zhkeys.keys()) + list(hzkeys.keys())) def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set[str], bool, bool]: # Start by filtering out any keys that might have come from # concurrent check-mirroring processes content_keys = [key for key in content_list if key in all_keys] key_counts = {} # type: Dict[str, int] for key in all_keys: key_counts[key] = 0 for key in content_keys: key_counts[key] += 1 z_missing = {key for key in zhkeys.keys() if key_counts[key] == 0} h_missing = {key for key in hzkeys.keys() if key_counts[key] == 0} duplicates = any(val > 1 for val in key_counts.values()) success = all(val == 1 for val in key_counts.values()) return key_counts, z_missing, h_missing, duplicates, success # The h_foo variables are about the messages we _received_ in Zulip # The z_foo variables are about the messages we _received_ in Zephyr h_contents = [message["content"] for message in messages] z_contents = [ notice.z_message[: notice.z_message_len].split(b"\0")[1].decode(errors="replace") for notice in notices ] (h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents) (z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents) if z_success and h_success: logger.info("Success!") print_status_and_exit(0) elif z_success: logger.info("Received everything correctly in Zephyr!") elif h_success: logger.info("Received everything correctly in Zulip!") logger.error("Messages received the wrong number of times:") for key in all_keys: if z_key_counts[key] == 1 and h_key_counts[key] == 1: continue if key in zhkeys: (stream, test) = zhkeys[key] logger.warning( "%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" % (key, z_key_counts[key], h_key_counts[key], test, stream) ) if key in hzkeys: (stream, test) = hzkeys[key] logger.warning( "%10s: z got %s. h got %s. Sent via Zulip(%s): class %s" % (key, z_key_counts[key], h_key_counts[key], test, stream) ) logger.error("") logger.error("Summary of specific problems:") if h_duplicates: logger.error("zulip: Received duplicate messages!") logger.error("zulip: This is probably a bug in our message loop detection.") logger.error("zulip: where Zulips go zulip=>zephyr=>zulip") if z_duplicates: logger.error("zephyr: Received duplicate messages!") logger.error("zephyr: This is probably a bug in our message loop detection.") logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr") if z_missing_z: logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!") logger.error( "zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs." ) if h_missing_h: logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!") logger.error( "zulip: This is probably an issue with check-mirroring sending or receiving Zulips." ) if z_missing_h: logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!") if z_missing_h == h_missing_h: logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.") logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.") logger.error("zephyr: aka the personals mirroring script has issues.") if h_missing_z: logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!") if h_missing_z == z_missing_z: logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.") logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.") logger.error("zulip: aka the global class mirroring script has issues.") zulip_client.deregister(queue_id) print_status_and_exit(1)