157fdb94d4
This is for consistency with where one goes for server logs on our normal frontends. (imported from commit 094f4cd1ca6c64497594827e12d8100bbb9bd6ab)
349 lines
12 KiB
Python
Executable file
349 lines
12 KiB
Python
Executable file
#!/usr/bin/env python
|
|
import sys
|
|
import time
|
|
import optparse
|
|
import os
|
|
import random
|
|
import logging
|
|
import subprocess
|
|
import hashlib
|
|
|
|
parser = optparse.OptionParser()
|
|
parser.add_option('--verbose',
|
|
dest='verbose',
|
|
default=False,
|
|
action='store_true')
|
|
parser.add_option('--site',
|
|
dest='site',
|
|
default="https://api.zulip.com",
|
|
action='store')
|
|
parser.add_option('--sharded',
|
|
default=False,
|
|
action='store_true')
|
|
parser.add_option('--root-path',
|
|
dest='root_path',
|
|
default="/home/humbug",
|
|
action='store')
|
|
(options, args) = parser.parse_args()
|
|
|
|
# The 'api' directory needs to go first, so that 'import zulip' won't pick up
|
|
# some other directory named 'humbug'.
|
|
pyzephyr_lib_path = "python-zephyr/build/lib.linux-%s-%s/" % (os.uname()[4], sys.version[0:3])
|
|
sys.path[:0] = [os.path.join(options.root_path, "api/"),
|
|
os.path.join(options.root_path, "python-zephyr"),
|
|
os.path.join(options.root_path, pyzephyr_lib_path),
|
|
options.root_path]
|
|
|
|
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'
|
|
zulip_user = 'tabbott/extra@mit.edu'
|
|
|
|
sys.path.append(".")
|
|
import zulip
|
|
zulip_client = zulip.Client(
|
|
email=zulip_user,
|
|
api_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
|
|
verbose=True,
|
|
client="test: Humbug API",
|
|
site=options.site)
|
|
|
|
# Configure logging
|
|
log_file = os.path.join(options.root_path, "logs", "check-mirroring-log")
|
|
log_format = "%(asctime)s: %(message)s"
|
|
logging.basicConfig(format=log_format)
|
|
|
|
formatter = logging.Formatter(log_format)
|
|
file_handler = logging.FileHandler(log_file)
|
|
file_handler.setFormatter(formatter)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.addHandler(file_handler)
|
|
|
|
# Initialize list of streams to test
|
|
if options.sharded:
|
|
# NOTE: Streams in this list must be in zulip_user's Zulip
|
|
# subscriptions, or we won't receive messages via Zulip.
|
|
|
|
# The sharded stream list has a bunch of pairs
|
|
# (stream, shard_name), where sha1sum(stream).startswith(shard_name)
|
|
test_streams = [
|
|
("message", "p"),
|
|
("tabbott-nagios-test-32", "0"),
|
|
("tabbott-nagios-test-33", "1"),
|
|
("tabbott-nagios-test-2", "2"),
|
|
("tabbott-nagios-test-5", "3"),
|
|
("tabbott-nagios-test-13", "4"),
|
|
("tabbott-nagios-test-7", "5"),
|
|
("tabbott-nagios-test-22", "6"),
|
|
("tabbott-nagios-test-35", "7"),
|
|
("tabbott-nagios-test-4", "8"),
|
|
("tabbott-nagios-test-3", "9"),
|
|
("tabbott-nagios-test-1", "a"),
|
|
("tabbott-nagios-test-49", "b"),
|
|
("tabbott-nagios-test-34", "c"),
|
|
("tabbott-nagios-test-12", "d"),
|
|
("tabbott-nagios-test-11", "e"),
|
|
("tabbott-nagios-test-9", "f"),
|
|
]
|
|
for (stream, test) in test_streams:
|
|
if stream == "message":
|
|
continue
|
|
assert(hashlib.sha1(stream).hexdigest().startswith(test))
|
|
else:
|
|
test_streams = [
|
|
("message", "p"),
|
|
("tabbott-nagios-test", "a"),
|
|
]
|
|
|
|
def print_status_and_exit(status):
|
|
# The output of this script is used by Nagios. Various outputs,
|
|
# e.g. true success and punting due to a SERVNAK, result in a
|
|
# non-alert case, so to give us something unambiguous to check in
|
|
# Nagios, print the exit status.
|
|
print status
|
|
sys.exit(status)
|
|
|
|
def send_zulip(message):
|
|
result = zulip_client.send_message(message)
|
|
if result["result"] != "success":
|
|
logger.error("Error sending zulip, args were:")
|
|
logger.error(message)
|
|
logger.error(result)
|
|
print_status_and_exit(1)
|
|
|
|
# Returns True if and only if we "Detected server failure" sending the zephyr.
|
|
def send_zephyr(zwrite_args, content):
|
|
p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
stdout, stderr = p.communicate(input=content.encode("utf-8"))
|
|
if p.returncode != 0:
|
|
if "Detected server failure while receiving acknowledgement for" in stdout:
|
|
logger.warning("Got server failure error sending zephyr; retrying")
|
|
logger.warning(stderr)
|
|
return True
|
|
logger.error("Error sending zephyr:")
|
|
logger.info(stdout)
|
|
logger.error(stderr)
|
|
print_status_and_exit(1)
|
|
return False
|
|
|
|
# Subscribe to Zulip
|
|
try:
|
|
res = zulip_client.get_profile()
|
|
max_message_id = res.get('max_message_id')
|
|
if max_message_id is None:
|
|
logging.error("Error subscribing to Zulips!")
|
|
logging.error(res)
|
|
print_status_and_exit(1)
|
|
except Exception:
|
|
logger.exception("Unexpected error subscribing to Zulips")
|
|
print_status_and_exit(1)
|
|
|
|
# Subscribe to Zephyrs
|
|
import zephyr
|
|
zephyr_subs_to_add = []
|
|
for (stream, test) in test_streams:
|
|
if stream == "message":
|
|
zephyr_subs_to_add.append((stream, 'personal', mit_user))
|
|
else:
|
|
zephyr_subs_to_add.append((stream, '*', '*'))
|
|
|
|
actually_subscribed = False
|
|
for tries in xrange(10):
|
|
try:
|
|
zephyr.init()
|
|
zephyr._z.subAll(zephyr_subs_to_add)
|
|
zephyr_subs = zephyr._z.getSubscriptions()
|
|
|
|
missing = 0
|
|
for elt in zephyr_subs_to_add:
|
|
if elt not in zephyr_subs:
|
|
logging.error("Failed to subscribe to %s" % (elt,))
|
|
missing += 1
|
|
if missing == 0:
|
|
actually_subscribed = True
|
|
break
|
|
except IOError, e:
|
|
if "SERVNAK received" in e:
|
|
logger.error("SERVNAK repeatedly received, punting rest of test")
|
|
else:
|
|
logger.exception("Exception subscribing to zephyrs")
|
|
|
|
if not actually_subscribed:
|
|
logger.error("Failed to subscribe to zephyrs")
|
|
print_status_and_exit(1)
|
|
|
|
# Prepare keys
|
|
zhkeys = {}
|
|
hzkeys = {}
|
|
def gen_key(key_dict):
|
|
bits = str(random.getrandbits(32))
|
|
while bits in key_dict:
|
|
# Avoid the unlikely event that we get the same bits twice
|
|
bits = str(random.getrandbits(32))
|
|
return bits
|
|
|
|
def gen_keys(key_dict):
|
|
for (stream, test) in test_streams:
|
|
key_dict[gen_key(key_dict)] = (stream, test)
|
|
|
|
gen_keys(zhkeys)
|
|
gen_keys(hzkeys)
|
|
|
|
notices = []
|
|
|
|
# We check for new zephyrs multiple times, to avoid filling the zephyr
|
|
# receive queue with 30+ messages, which might result in messages
|
|
# being dropped.
|
|
def receive_zephyrs():
|
|
while True:
|
|
try:
|
|
notice = zephyr.receive(block=False)
|
|
except Exception:
|
|
logging.exception("Exception receiving zephyrs:")
|
|
notice = None
|
|
if notice is None:
|
|
break
|
|
if notice.opcode != "":
|
|
continue
|
|
notices.append(notice)
|
|
|
|
logger.info("Starting sending messages!")
|
|
# Send zephyrs
|
|
zsig = "Timothy Good Abbott"
|
|
for key, (stream, test) in zhkeys.items():
|
|
if stream == "message":
|
|
zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
|
|
else:
|
|
zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
|
|
server_failure = send_zephyr(zwrite_args, str(key))
|
|
if server_failure:
|
|
# Replace the key we're not sure was delivered with a new key
|
|
value = zhkeys.pop(key)
|
|
new_key = gen_key(zhkeys)
|
|
zhkeys[new_key] = value
|
|
server_failure_again = send_zephyr(zwrite_args, str(new_key))
|
|
if server_failure_again:
|
|
logging.error("Zephyr server failure twice in a row on keys %s and %s! Aborting." %
|
|
(key, new_key))
|
|
print_status_and_exit(1)
|
|
else:
|
|
logging.warning("Replaced key %s with %s due to Zephyr server failure." %
|
|
(key, new_key))
|
|
receive_zephyrs()
|
|
|
|
receive_zephyrs()
|
|
logger.info("Sent Zephyr messages!")
|
|
|
|
# Send Zulips
|
|
for key, (stream, test) in hzkeys.items():
|
|
if stream == "message":
|
|
send_zulip({
|
|
"type": "private",
|
|
"content": str(key),
|
|
"to": zulip_user,
|
|
})
|
|
else:
|
|
send_zulip({
|
|
"type": "stream",
|
|
"subject": "test",
|
|
"content": str(key),
|
|
"to": stream,
|
|
})
|
|
receive_zephyrs()
|
|
|
|
logger.info("Sent Zulip messages!")
|
|
|
|
# Normally messages manage to forward through in under 3 seconds, but
|
|
# sleep 10 to give a safe margin since the messages do need to do 2
|
|
# round trips. This alert is for correctness, not performance, and so
|
|
# we want it to reliably alert only when messages aren't being
|
|
# delivered at all.
|
|
time.sleep(10)
|
|
receive_zephyrs()
|
|
|
|
logger.info("Starting receiving messages!")
|
|
|
|
# receive zulips
|
|
messages = zulip_client.get_messages({'last': str(max_message_id)})['messages']
|
|
logger.info("Finished receiving Zulip messages!")
|
|
|
|
receive_zephyrs()
|
|
logger.info("Finished receiving Zephyr messages!")
|
|
|
|
all_keys = set(zhkeys.keys() + hzkeys.keys())
|
|
def process_keys(content_list):
|
|
# Start by filtering out any keys that might have come from
|
|
# concurrent check-mirroring processes
|
|
content_keys = [key for key in content_list if key in all_keys]
|
|
key_counts = {}
|
|
for key in all_keys:
|
|
key_counts[key] = 0
|
|
for key in content_keys:
|
|
key_counts[key] += 1
|
|
z_missing = set(key for key in zhkeys.keys() if key_counts[key] == 0)
|
|
h_missing = set(key for key in hzkeys.keys() if key_counts[key] == 0)
|
|
duplicates = any(val > 1 for val in key_counts.values())
|
|
success = all(val == 1 for val in key_counts.values())
|
|
return key_counts, z_missing, h_missing, duplicates, success
|
|
|
|
# The h_foo variables are about the messages we _received_ in Zulip
|
|
# The z_foo variables are about the messages we _received_ in Zephyr
|
|
h_contents = [message["content"] for message in messages]
|
|
z_contents = [notice.message.split('\0')[1] for notice in notices]
|
|
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
|
|
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
|
|
|
|
if z_success and h_success:
|
|
logger.info("Success!")
|
|
print_status_and_exit(0)
|
|
elif z_success:
|
|
logger.info("Received everything correctly in Zephyr!")
|
|
elif h_success:
|
|
logger.info("Received everything correctly in Zulip!")
|
|
|
|
logger.error("Messages received the wrong number of times:")
|
|
for key in all_keys:
|
|
if z_key_counts[key] == 1 and h_key_counts[key] == 1:
|
|
continue
|
|
if key in zhkeys:
|
|
(stream, test) = zhkeys[key]
|
|
logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" % \
|
|
(key, z_key_counts[key], h_key_counts[key], test, stream))
|
|
if key in hzkeys:
|
|
(stream, test) = hzkeys[key]
|
|
logger.warning("%10s: z got %s. h got %s. Sent via Zulip(%s): class %s" % \
|
|
(key, z_key_counts[key], h_key_counts[key], test, stream))
|
|
logger.error("")
|
|
logger.error("Summary of specific problems:")
|
|
|
|
if h_duplicates:
|
|
logger.error("zulip: Received duplicate messages!")
|
|
logger.error("zulip: This is probably a bug in our message loop detection.")
|
|
logger.error("zulip: where Zulips go zulip=>zephyr=>zulip")
|
|
if z_duplicates:
|
|
logger.error("zephyr: Received duplicate messages!")
|
|
logger.error("zephyr: This is probably a bug in our message loop detection.")
|
|
logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr")
|
|
|
|
if z_missing_z:
|
|
logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
|
|
logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
|
|
if h_missing_h:
|
|
logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!")
|
|
logger.error("zulip: This is probably an issue with check-mirroring sending or receiving Zulips.")
|
|
if z_missing_h:
|
|
logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!")
|
|
if z_missing_h == h_missing_h:
|
|
logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.")
|
|
logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.")
|
|
logger.error("zephyr: aka the personals mirroring script has issues.")
|
|
if h_missing_z:
|
|
logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!")
|
|
if h_missing_z == z_missing_z:
|
|
logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.")
|
|
logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.")
|
|
logger.error("zulip: aka the global class mirroring script has issues.")
|
|
|
|
print_status_and_exit(1)
|