354 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			354 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
#!/usr/bin/env python
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import optparse
 | 
						|
import os
 | 
						|
import random
 | 
						|
import logging
 | 
						|
import subprocess
 | 
						|
import hashlib
 | 
						|
 | 
						|
# Command-line options.  Note: --verbose is parsed but, as far as this
# script shows, never consulted (the Zulip client is created with
# verbose=True unconditionally).
parser = optparse.OptionParser()
parser.add_option('--verbose', action='store_true', dest='verbose',
                  default=False)
parser.add_option('--site', action='store', dest='site',
                  default="https://api.zulip.com")
# optparse derives the dest ('sharded') from the long option name.
parser.add_option('--sharded', action='store_true', default=False)
parser.add_option('--root-path', action='store', dest='root_path',
                  default="/home/zulip")
options, args = parser.parse_args()

# The 'api' directory needs to go first, so that 'import zulip' won't pick up
# some other directory named 'zulip'.
#
# python-zephyr's build output lives in a directory named after the machine
# type and the "major.minor" interpreter version.  Build the version string
# from sys.version_info: slicing sys.version[0:3] truncates two-digit minor
# versions (e.g. "3.10" would become "3.1").
pyzephyr_lib_path = "python-zephyr/build/lib.linux-%s-%d.%d/" % (
    os.uname()[4], sys.version_info[0], sys.version_info[1])
sys.path[:0] = [os.path.join(options.root_path, "api/"),
                os.path.join(options.root_path, "python-zephyr"),
                os.path.join(options.root_path, pyzephyr_lib_path),
                options.root_path]

# Identities used on each side of the mirror.
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'
zulip_user = 'tabbott/extra@mit.edu'

sys.path.append(".")
import zulip

# NOTE(review): the API key below is a redacted placeholder; presumably the
# deployed copy substitutes the real key.  Consider loading it from a
# config file rather than keeping it in source.
zulip_client = zulip.Client(
    email=zulip_user,
    api_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    verbose=True,
    client="ZulipMonitoring/0.1",
    site=options.site)

# Configure logging.  basicConfig installs a root handler (stderr by
# default); this module's logger additionally writes every record at
# DEBUG and above to log_file, and its records also propagate to the root
# handler.
log_file = "/var/log/zulip/check-mirroring-log"
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# Initialize list of streams to test.  Each entry is (stream, test); the
# special "message" entry stands for a personal message to ourselves
# rather than a stream.
if options.sharded:
    # NOTE: Streams in this list must be in zulip_user's Zulip
    # subscriptions, or we won't receive messages via Zulip.

    # The sharded stream list has a bunch of pairs
    # (stream, shard_name), where sha1sum(stream).startswith(shard_name)
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test-32", "0"),
        ("tabbott-nagios-test-33", "1"),
        ("tabbott-nagios-test-2",  "2"),
        ("tabbott-nagios-test-5",  "3"),
        ("tabbott-nagios-test-13", "4"),
        ("tabbott-nagios-test-7",  "5"),
        ("tabbott-nagios-test-22", "6"),
        ("tabbott-nagios-test-35", "7"),
        ("tabbott-nagios-test-4",  "8"),
        ("tabbott-nagios-test-3",  "9"),
        ("tabbott-nagios-test-1",  "a"),
        ("tabbott-nagios-test-49", "b"),
        ("tabbott-nagios-test-34", "c"),
        ("tabbott-nagios-test-12", "d"),
        ("tabbott-nagios-test-11", "e"),
        ("tabbott-nagios-test-9",  "f"),
        ]
    # Sanity-check the table above.  hashlib requires bytes on Python 3;
    # encoding these ASCII names yields the same digest on Python 2.
    for (stream, test) in test_streams:
        if stream == "message":
            continue
        assert hashlib.sha1(stream.encode("utf-8")).hexdigest().startswith(test), \
            "stream %s does not hash into shard %s" % (stream, test)
else:
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test", "a"),
        ]

def print_status_and_exit(status):
    """Print *status* on stdout, then exit the process with it.

    The output of this script is used by Nagios. Various outputs,
    e.g. true success and punting due to a SERVNAK, result in a
    non-alert case, so to give us something unambiguous to check in
    Nagios, print the exit status.

    Raises SystemExit (never returns).
    """
    # print(x) with a single argument behaves identically on Python 2 and 3.
    print(status)
    sys.exit(status)


def send_zulip(message):
    """Send *message* (a Zulip API message dict) through zulip_client.

    If the API answers with anything other than a "success" result, the
    request and the response are logged and the script exits with status 1.
    """
    result = zulip_client.send_message(message)
    if result["result"] == "success":
        return
    logger.error("Error sending zulip, args were:")
    logger.error(message)
    logger.error(result)
    print_status_and_exit(1)


# Returns True if and only if we "Detected server failure" sending the zephyr.
def send_zephyr(zwrite_args, content):
    """Run the zwrite command *zwrite_args*, feeding *content* on stdin.

    Returns True iff zwrite exited non-zero and reported "Detected server
    failure while receiving acknowledgement for" on stdout, so the caller
    can retry.  Returns False when zwrite succeeds.  Any other zwrite
    failure is logged and the script exits with status 1.
    """
    p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate(input=content.encode("utf-8"))
    # communicate() returns bytes on Python 3; decode so the substring test
    # below works and the logged output is readable text on both versions.
    stdout = stdout.decode("utf-8", "replace")
    stderr = stderr.decode("utf-8", "replace")
    if p.returncode != 0:
        if "Detected server failure while receiving acknowledgement for" in stdout:
            logger.warning("Got server failure error sending zephyr; retrying")
            logger.warning(stderr)
            return True
        logger.error("Error sending zephyr:")
        logger.info(stdout)
        logger.error(stderr)
        print_status_and_exit(1)
    return False


# Subscribe to Zulip: register an event queue for message events, so the
# test messages can be fetched later with get_events().
# Errors go through `logger` (not the root `logging` module) so they also
# land in the check-mirroring log file.  print_status_and_exit raises
# SystemExit, which `except Exception` deliberately does not catch.
try:
    res = zulip_client.register(event_types=["message"])
    if 'error' in res.get('result'):
        logger.error("Error subscribing to Zulips!")
        logger.error(res['msg'])
        print_status_and_exit(1)
    queue_id, last_event_id = (res['queue_id'], res['last_event_id'])
except Exception:
    logger.exception("Unexpected error subscribing to Zulips")
    print_status_and_exit(1)

# Subscribe to Zephyrs.  The zephyr module is imported here, after the
# sys.path setup above (presumably so the python-zephyr build is found).
import zephyr

# One (class, instance, recipient) triple per test stream: the "message"
# entry becomes personals addressed to mit_user, every other stream a
# wildcard class subscription.
zephyr_subs_to_add = [
    (stream, 'personal', mit_user) if stream == "message" else (stream, '*', '*')
    for (stream, test) in test_streams
]

# Try up to 10 times to get every triple in zephyr_subs_to_add subscribed,
# verifying against the server's subscription list rather than trusting
# subAll() alone.
actually_subscribed = False
for tries in range(10):
    try:
        zephyr.init()
        zephyr._z.subAll(zephyr_subs_to_add)
        zephyr_subs = zephyr._z.getSubscriptions()

        missing = 0
        for elt in zephyr_subs_to_add:
            if elt not in zephyr_subs:
                logger.error("Failed to subscribe to %s" % (elt,))
                missing += 1
        if missing == 0:
            actually_subscribed = True
            break
    except IOError as e:
        # An exception is not a string container: test its message text.
        # (A bare `in e` compares whole args for equality on Python 2 and
        # raises TypeError on Python 3.)
        if "SERVNAK received" in str(e):
            # NOTE(review): despite the wording, the loop keeps retrying
            # after this; confirm whether a SERVNAK should end the test.
            logger.error("SERVNAK repeatedly received, punting rest of test")
        else:
            logger.exception("Exception subscribing to zephyrs")

if not actually_subscribed:
    logger.error("Failed to subscribe to zephyrs")
    print_status_and_exit(1)

# Prepare keys.  Every test message carries a random numeric key as its
# content.  zhkeys maps the keys we send via Zephyr, and hzkeys the keys
# we send via Zulip, to the (stream, test) pair they were sent on.
zhkeys = {}
hzkeys = {}


def gen_key(key_dict):
    """Return a fresh random 32-bit key, as a decimal string.

    The key is unused in *key_dict* AND in both zhkeys and hzkeys: the two
    key sets are later merged and counted together, so a key shared
    between them would be counted twice and misreported as a duplicate.
    """
    bits = str(random.getrandbits(32))
    while bits in key_dict or bits in zhkeys or bits in hzkeys:
        # Avoid the unlikely event that we get the same bits twice
        bits = str(random.getrandbits(32))
    return bits


def gen_keys(key_dict):
    """Add one new key to *key_dict* for every entry of test_streams."""
    for (stream, test) in test_streams:
        key_dict[gen_key(key_dict)] = (stream, test)


gen_keys(zhkeys)
gen_keys(hzkeys)

# Zephyr notices collected by receive_zephyrs().
notices = []


# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs():
    """Drain every currently queued zephyr notice into ``notices``.

    Polls with block=False until nothing is returned.  Notices with a
    non-empty opcode are skipped.  A receive error is logged (to the
    module logger, so it reaches the log file) and treated as "queue
    empty".
    """
    while True:
        try:
            notice = zephyr.receive(block=False)
        except Exception:
            logger.exception("Exception receiving zephyrs:")
            notice = None
        if notice is None:
            break
        if notice.opcode != "":
            continue
        notices.append(notice)


logger.info("Starting sending messages!")
# Send zephyrs
zsig = "Timothy Good Abbott"
# Iterate over a snapshot: the loop body may replace a key in zhkeys, and
# mutating a dict while iterating its live items() view is unsafe on
# Python 3 (RuntimeError, or revisiting and resending the new key).  On
# Python 2 items() already returned a list, so behavior there is unchanged.
for key, (stream, test) in list(zhkeys.items()):
    if stream == "message":
        zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
    else:
        zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
    server_failure = send_zephyr(zwrite_args, str(key))
    if server_failure:
        # Replace the key we're not sure was delivered with a new key
        value = zhkeys.pop(key)
        new_key = gen_key(zhkeys)
        zhkeys[new_key] = value
        server_failure_again = send_zephyr(zwrite_args, str(new_key))
        if server_failure_again:
            logger.error("Zephyr server failure twice in a row on keys %s and %s!  Aborting." %
                         (key, new_key))
            print_status_and_exit(1)
        else:
            logger.warning("Replaced key %s with %s due to Zephyr server failure." %
                           (key, new_key))
    receive_zephyrs()

receive_zephyrs()
logger.info("Sent Zephyr messages!")

# Send Zulips: a private message to ourselves for the "message" entry and a
# stream message (topic "test") for every other test stream, draining the
# zephyr queue after each send.
for key, (stream, test) in hzkeys.items():
    if stream != "message":
        outgoing = {
            "type": "stream",
            "subject": "test",
            "content": str(key),
            "to": stream,
        }
    else:
        outgoing = {
            "type": "private",
            "content": str(key),
            "to": zulip_user,
        }
    send_zulip(outgoing)
    receive_zephyrs()

logger.info("Sent Zulip messages!")

# Normally messages manage to forward through in under 3 seconds, but
# sleep 10 to give a safe margin since the messages do need to do 2
# round trips.  This alert is for correctness, not performance, and so
# we want it to reliably alert only when messages aren't being
# delivered at all.
time.sleep(10)
receive_zephyrs()

logger.info("Starting receiving messages!")

# receive zulips
# Mirrors the registration block: failures go to `logger` (and thus the
# log file), and any unexpected exception still yields a clean "1" for
# Nagios instead of a bare traceback.
try:
    res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id)
    if 'error' in res.get('result'):
        logger.error("Error receiving Zulips!")
        logger.error(res['msg'])
        print_status_and_exit(1)
    # Keep only events that carry a message payload; anything else
    # (presumably e.g. heartbeat events -- confirm) is skipped rather than
    # raising KeyError.
    messages = [event['message'] for event in res['events'] if 'message' in event]
except Exception:
    logger.exception("Unexpected error receiving Zulips")
    print_status_and_exit(1)
logger.info("Finished receiving Zulip messages!")

receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")

# Every key we sent, via either protocol.  Build it from the dicts
# directly: keys() + keys() only works where keys() returns a list
# (Python 2).
all_keys = set(zhkeys) | set(hzkeys)

def process_keys(content_list):
    """Tally which of our keys appear in *content_list*.

    Contents that are not one of our keys (e.g. from concurrent
    check-mirroring processes) are ignored.

    Returns a 5-tuple:
      key_counts -- dict mapping each key in all_keys to its occurrence count
      z_missing  -- set of zhkeys keys that never appeared
      h_missing  -- set of hzkeys keys that never appeared
      duplicates -- True if any key appeared more than once
      success    -- True if every key appeared exactly once
    """
    key_counts = dict.fromkeys(all_keys, 0)
    for content in content_list:
        if content in all_keys:
            key_counts[content] += 1

    z_missing = {k for k in zhkeys if key_counts[k] == 0}
    h_missing = {k for k in hzkeys if key_counts[k] == 0}
    counts = key_counts.values()
    duplicates = any(count > 1 for count in counts)
    success = all(count == 1 for count in counts)
    return key_counts, z_missing, h_missing, duplicates, success


# The h_foo variables are about the messages we _received_ in Zulip
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages]

# A zephyr notice's message is NUL-separated, with the body as its second
# field (the first is presumably the zsig).  Other senders, such as
# concurrent check-mirroring runs, can post to the test classes, so skip a
# notice without a body instead of dying with IndexError.
z_contents = []
for notice in notices:
    fields = notice.message.split('\0')
    if len(fields) < 2:
        logger.warning("Ignoring zephyr with no message body: %r" % (notice.message,))
        continue
    z_contents.append(fields[1])

(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)

if z_success and h_success:
    logger.info("Success!")
    # Release the Zulip event queue on the success path too (the failure
    # path deregisters it at the end of the script).  A deregistration
    # error must not turn a passing check into a crash.
    try:
        zulip_client.deregister(queue_id)
    except Exception:
        logger.exception("Error deregistering Zulip event queue")
    print_status_and_exit(0)
elif z_success:
    logger.info("Received everything correctly in Zephyr!")
elif h_success:
    logger.info("Received everything correctly in Zulip!")

# Report every key that was not received exactly once on both ends.
logger.error("Messages received the wrong number of times:")
for key in all_keys:
    if z_key_counts[key] == 1 and h_key_counts[key] == 1:
        continue
    if key in zhkeys:
        (stream, test) = zhkeys[key]
        logger.warning("%10s: z got %s, h got %s.  Sent via Zephyr(%s): class %s" %
                       (key, z_key_counts[key], h_key_counts[key], test, stream))
    if key in hzkeys:
        (stream, test) = hzkeys[key]
        logger.warning("%10s: z got %s. h got %s.  Sent via Zulip(%s): class %s" %
                       (key, z_key_counts[key], h_key_counts[key], test, stream))

logger.error("")
logger.error("Summary of specific problems:")

if h_duplicates:
    logger.error("zulip: Received duplicate messages!")
    logger.error("zulip: This is probably a bug in our message loop detection.")
    logger.error("zulip: where Zulips go zulip=>zephyr=>zulip")
if z_duplicates:
    logger.error("zephyr: Received duplicate messages!")
    logger.error("zephyr: This is probably a bug in our message loop detection.")
    logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr")

if z_missing_z:
    logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
    logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
if h_missing_h:
    logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!")
    logger.error("zulip: This is probably an issue with check-mirroring sending or receiving Zulips.")

# The "Including some ... we did receive on the other end" diagnosis holds
# exactly when some key missing on one end is NOT missing on its origin
# end, i.e. when the set difference is non-empty.  (Testing the sets for
# equality meant the opposite: everything missing was also missing at
# the origin, which points at sending, not mirroring.)
if z_missing_h:
    logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!")
    if z_missing_h - h_missing_h:
        logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.")
        logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.")
        logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
    logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!")
    if h_missing_z - z_missing_z:
        logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.")
        logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.")
        logger.error("zulip: aka the global class mirroring script has issues.")

# Release the Zulip event queue; a failure here must not prevent the
# status line Nagios checks for from being printed.
try:
    zulip_client.deregister(queue_id)
except Exception:
    logger.exception("Error deregistering Zulip event queue")
print_status_and_exit(1)