5e2979b870
This must be deployed after we update our running nginx configuration to serve api.humbughq.com. (imported from commit b5c34ebdd595f55eecd6dca6a18a37f105107bd5)
348 lines
12 KiB
Python
Executable file
348 lines
12 KiB
Python
Executable file
#!/usr/bin/env python
|
|
import sys
|
|
import time
|
|
import optparse
|
|
import os
|
|
import random
|
|
import logging
|
|
import subprocess
|
|
import hashlib
|
|
|
|
# Command-line options.
#   --verbose    boolean flag.  NOTE(review): options.verbose does not appear
#                to be read by the code shown here (the Humbug client is
#                created with verbose=True) -- confirm before relying on it.
#   --site       base URL passed to humbug.Client as `site`.
#   --sharded    test the per-shard stream list instead of the short one.
#   --root-path  directory used for the sys.path entries and the log file.
parser = optparse.OptionParser()
parser.add_option('--verbose', action='store_true', default=False,
                  dest='verbose')
parser.add_option('--site', action='store',
                  default="https://api.humbughq.com", dest='site')
parser.add_option('--sharded', action='store_true', default=False,
                  dest='sharded')
parser.add_option('--root-path', action='store', default="/home/humbug",
                  dest='root_path')
(options, args) = parser.parse_args()
# Prepend directories under the root path to sys.path.  The 'api' directory
# needs to go first, so that 'import humbug' won't pick up some other
# directory named 'humbug'.  The python-zephyr entries are presumably there so
# that 'import zephyr' finds the in-tree python-zephyr build.
sys.path[:0] = [os.path.join(options.root_path, subdir)
                for subdir in ("api/",
                               "python-zephyr",
                               "python-zephyr/build/lib.linux-x86_64-2.6/")
                ] + [options.root_path]

# Identities on each side of the mirror: mit_user is the Zephyr recipient
# that test personals are zwritten to; humbug_user is the email handed to
# humbug.Client and the recipient of the Humbug test personals.
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'
humbug_user = 'tabbott/extra@mit.edu'
# Also allow imports from the current directory (appended, so it is searched
# last), then load the Humbug API bindings.
sys.path.append(".")
import humbug

# API client for the Humbug side of the test, pointed at --site.
humbug_client = humbug.Client(
    site=options.site,
    email=humbug_user,
    api_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    client="test: Humbug API",
    verbose=True,
)
# Configure logging.  basicConfig gives the root logger a stderr handler;
# `logger` additionally writes to check-mirroring-log under the root path
# and, through propagation, still reaches the root handler as well.
log_format = "%(asctime)s: %(message)s"
log_file = os.path.join(options.root_path, "check-mirroring-log")
logging.basicConfig(format=log_format)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Initialize list of streams to test.  Each entry is (stream, test): the
# special "message" entry stands for personals to mit_user, every other
# entry names a class/stream.
if not options.sharded:
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test", "a"),
    ]
else:
    # NOTE: Streams in this list must be in humbug_user's Humbug
    # subscriptions, or we won't receive messages via Humbug.
    #
    # The sharded stream list has one (stream, shard_name) pair per hex
    # shard, where sha1sum(stream).startswith(shard_name).
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test-32", "0"),
        ("tabbott-nagios-test-33", "1"),
        ("tabbott-nagios-test-2", "2"),
        ("tabbott-nagios-test-5", "3"),
        ("tabbott-nagios-test-13", "4"),
        ("tabbott-nagios-test-7", "5"),
        ("tabbott-nagios-test-22", "6"),
        ("tabbott-nagios-test-35", "7"),
        ("tabbott-nagios-test-4", "8"),
        ("tabbott-nagios-test-3", "9"),
        ("tabbott-nagios-test-1", "a"),
        ("tabbott-nagios-test-49", "b"),
        ("tabbott-nagios-test-34", "c"),
        ("tabbott-nagios-test-12", "d"),
        ("tabbott-nagios-test-11", "e"),
        ("tabbott-nagios-test-9", "f"),
    ]
    # Sanity-check the table: every real stream must hash into its shard.
    for (stream, test) in test_streams:
        if stream != "message":
            assert hashlib.sha1(stream).hexdigest().startswith(test)
def print_status_and_exit(status):
    """Print ``status`` on its own line to stdout, then exit with it.

    The output of this script is used by Nagios.  Various outputs,
    e.g. true success and punting due to a SERVNAK, result in a
    non-alert case, so to give us something unambiguous to check in
    Nagios, print the exit status.

    Raises SystemExit(status); never returns normally.
    """
    # With a single argument, print(x) is the Python 2 print statement applied
    # to a parenthesized expression, so it emits the same "<status>\n" as
    # before while also being valid Python 3 syntax.
    print(status)
    sys.exit(status)
def send_humbug(message):
    """Send ``message`` (a dict) through humbug_client.send_message.

    When the reply's "result" field is anything other than "success", log
    the message and the reply, then exit with status 1.
    """
    result = humbug_client.send_message(message)
    if result["result"] == "success":
        return
    logger.error("Error sending humbug, args were:")
    logger.error(message)
    logger.error(result)
    print_status_and_exit(1)
def send_zephyr(zwrite_args, content):
    """Run the zwrite command ``zwrite_args`` with ``content`` on stdin.

    ``content`` is UTF-8 encoded before being written.  Returns True if and
    only if we "Detected server failure" sending the zephyr (so the caller
    can retry), and False when zwrite exits successfully.  Any other
    non-zero exit is logged and ends the script with status 1.
    """
    proc = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate(input=content.encode("utf-8"))
    if proc.returncode == 0:
        return False
    if "Detected server failure while receiving acknowledgement for" in out:
        logger.warning("Got server failure error sending zephyr; retrying")
        logger.warning(err)
        return True
    logger.error("Error sending zephyr:")
    logger.info(out)
    logger.error(err)
    print_status_and_exit(1)
    return False
# Subscribe to Humbugs
# Record max_message_id from the profile; it is later passed as 'last' to
# get_messages.  Any failure here ends the check with status 1.
try:
    res = humbug_client.get_profile()
    max_message_id = res.get('max_message_id')
    if max_message_id is None:
        # Log through the module logger (not the root logger) so these
        # errors also reach its file handler, like the script's other errors.
        logger.error("Error subscribing to Humbugs!")
        logger.error(res)
        print_status_and_exit(1)
except Exception:
    # SystemExit from print_status_and_exit is not an Exception subclass,
    # so it passes straight through this handler.
    logger.exception("Unexpected error subscribing to Humbugs")
    print_status_and_exit(1)
# Subscribe to Zephyrs
import zephyr

# Build the subscription triples (presumably class, instance, recipient):
# the "message" entry subscribes to personals for mit_user, every other
# stream to all instances and recipients via '*'.
zephyr_subs_to_add = []
for (stream, test) in test_streams:
    if stream == "message":
        zephyr_subs_to_add.append((stream, 'personal', mit_user))
    else:
        zephyr_subs_to_add.append((stream, '*', '*'))

# Try up to 10 times to get every subscription registered.
actually_subscribed = False
for tries in range(10):
    try:
        zephyr.init()
        zephyr._z.subAll(zephyr_subs_to_add)
        zephyr_subs = zephyr._z.getSubscriptions()

        missing = 0
        for elt in zephyr_subs_to_add:
            if elt not in zephyr_subs:
                # Module logger, so the failure also reaches the log file.
                logger.error("Failed to subscribe to %s" % (elt,))
                missing += 1
        if missing == 0:
            actually_subscribed = True
            break
    except IOError as e:
        # Match on the rendered message: `"..." in e` only worked through the
        # Python 2 exception __getitem__ fallback and required an argument to
        # equal the text exactly.
        if "SERVNAK received" in str(e):
            # NOTE(review): despite the wording, this does not stop the retry
            # loop -- the next attempt still runs.  Confirm the intent.
            logger.error("SERVNAK repeatedly received, punting rest of test")
        else:
            logger.exception("Exception subscribing to zephyrs")

if not actually_subscribed:
    logger.error("Failed to subscribe to zephyrs")
    print_status_and_exit(1)
# Prepare keys.  zhkeys maps each key sent via Zephyr to its (stream, test)
# entry; hzkeys does the same for keys sent via Humbug.
zhkeys = {}
hzkeys = {}


def gen_key(key_dict):
    """Return a random 32-bit key, as a decimal string, that is not in use.

    The key is checked against ``key_dict`` and also against both zhkeys and
    hzkeys: the results are tallied over the union of the two maps, so a key
    shared between them would be counted for both messages and reported as a
    (false) duplicate.
    """
    while True:
        bits = str(random.getrandbits(32))
        # Avoid the unlikely event that we get the same bits twice
        if bits not in key_dict and bits not in zhkeys and bits not in hzkeys:
            return bits


def gen_keys(key_dict):
    """Add one fresh key to ``key_dict`` for every (stream, test) entry."""
    for (stream, test) in test_streams:
        key_dict[gen_key(key_dict)] = (stream, test)


gen_keys(zhkeys)
gen_keys(hzkeys)
# Zephyr notices received so far; appended to by receive_zephyrs().
notices = []


# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs():
    """Pull notices with non-blocking zephyr.receive until it returns None.

    Notices with a non-empty opcode are skipped; the rest are appended to
    the module-level ``notices`` list.  An exception from zephyr.receive is
    logged and treated like an empty queue.
    """
    while True:
        try:
            notice = zephyr.receive(block=False)
        except Exception:
            # Module logger (not root), so the traceback also reaches the
            # log file like the script's other errors.
            logger.exception("Exception receiving zephyrs:")
            notice = None
        if notice is None:
            break
        if notice.opcode != "":
            continue
        notices.append(notice)
logger.info("Starting sending messages!")
# Send zephyrs
zsig = "Timothy Good Abbott"
# Iterate over a snapshot: the body may replace a key in zhkeys.
for key, (stream, test) in list(zhkeys.items()):
    if stream == "message":
        zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
    else:
        zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
    server_failure = send_zephyr(zwrite_args, str(key))
    if server_failure:
        # Replace the key we're not sure was delivered with a new key
        value = zhkeys.pop(key)
        new_key = gen_key(zhkeys)
        zhkeys[new_key] = value
        server_failure_again = send_zephyr(zwrite_args, str(new_key))
        if server_failure_again:
            # Module logger, so these also reach the log file.
            logger.error("Zephyr server failure twice in a row on keys %s and %s! Aborting." %
                         (key, new_key))
            print_status_and_exit(1)
        else:
            logger.warning("Replaced key %s with %s due to Zephyr server failure." %
                           (key, new_key))
    # Drain incoming notices after every send.
    receive_zephyrs()

receive_zephyrs()
logger.info("Sent Zephyr messages!")
# Send Humbugs: the "message" entry becomes a private message to humbug_user,
# every other entry a stream message with subject "test".  The content is the
# key itself; pending zephyrs are drained after each send.
for key, (stream, test) in hzkeys.items():
    if stream != "message":
        outgoing = {
            "type": "stream",
            "subject": "test",
            "content": str(key),
            "to": stream,
        }
    else:
        outgoing = {
            "type": "private",
            "content": str(key),
            "to": humbug_user,
        }
    send_humbug(outgoing)
    receive_zephyrs()

logger.info("Sent Humbug messages!")
# Normally messages manage to forward through in under 3 seconds, but
# sleep 10 to give a safe margin since the messages do need to do 2
# round trips. This alert is for correctness, not performance, and so
# we want it to reliably alert only when messages aren't being
# delivered at all.
time.sleep(10)
receive_zephyrs()

logger.info("Starting receiving messages!")

# receive humbugs
# Failures are handled like the get_profile() call: log and exit 1, so Nagios
# gets a status line instead of a bare traceback (e.g. a KeyError when the
# reply has no 'messages').
try:
    messages_response = humbug_client.get_messages({'last': str(max_message_id)})
except Exception:
    logger.exception("Unexpected error receiving Humbugs")
    print_status_and_exit(1)
messages = messages_response.get('messages')
if messages is None:
    logger.error("Error receiving Humbugs!")
    logger.error(messages_response)
    print_status_and_exit(1)
logger.info("Finished receiving Humbug messages!")

receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")
# Every key we sent, in either direction.
all_keys = set(zhkeys) | set(hzkeys)


def process_keys(content_list):
    """Count how often each of our keys appears in ``content_list``.

    Contents that are not one of our keys (e.g. keys that might have come
    from concurrent check-mirroring processes) are ignored.

    Returns (key_counts, z_missing, h_missing, duplicates, success):
    key_counts maps every key in all_keys to its count; z_missing and
    h_missing are the zhkeys / hzkeys keys that never appeared; duplicates
    is True if any key appeared more than once; success is True if every
    key appeared exactly once.
    """
    key_counts = dict.fromkeys(all_keys, 0)
    for content in content_list:
        if content in all_keys:
            key_counts[content] += 1
    z_missing = set(k for k in zhkeys if key_counts[k] == 0)
    h_missing = set(k for k in hzkeys if key_counts[k] == 0)
    counts = key_counts.values()
    duplicates = any(c > 1 for c in counts)
    success = all(c == 1 for c in counts)
    return key_counts, z_missing, h_missing, duplicates, success
def _zephyr_body(notice):
    """Return the second NUL-separated field of ``notice.message``.

    The first field is presumably the zsig.  A notice with no such second
    field used to crash the check with IndexError; it is now logged and
    yields "", which matches none of our keys.
    """
    fields = notice.message.split('\0')
    if len(fields) < 2:
        logger.warning("Ignoring zephyr notice without a body field: %r" %
                       (notice.message,))
        return ""
    return fields[1]


# The h_foo variables are about the messages we _received_ in Humbug
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages]
z_contents = [_zephyr_body(notice) for notice in notices]
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
# Report results and exit: status 0 only when every key arrived exactly once
# on both ends, otherwise itemize the problems and exit 1.
if z_success and h_success:
    logger.info("Success!")
    print_status_and_exit(0)
elif z_success:
    logger.info("Received everything correctly in Zephyr!")
elif h_success:
    logger.info("Received everything correctly in Humbug!")

logger.error("Messages received the wrong number of times:")
for key in all_keys:
    if z_key_counts[key] == 1 and h_key_counts[key] == 1:
        continue
    if key in zhkeys:
        (stream, test) = zhkeys[key]
        logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" %
                       (key, z_key_counts[key], h_key_counts[key], test, stream))
    if key in hzkeys:
        (stream, test) = hzkeys[key]
        logger.warning("%10s: z got %s. h got %s. Sent via Humbug(%s): class %s" %
                       (key, z_key_counts[key], h_key_counts[key], test, stream))
logger.error("")
logger.error("Summary of specific problems:")

if h_duplicates:
    logger.error("humbug: Received duplicate messages!")
    logger.error("humbug: This is probably a bug in our message loop detection.")
    logger.error("humbug: where Humbugs go humbug=>zephyr=>humbug")
if z_duplicates:
    logger.error("zephyr: Received duplicate messages!")
    logger.error("zephyr: This is probably a bug in our message loop detection.")
    logger.error("zephyr: where Zephyrs go zephyr=>humbug=>zephyr")

if z_missing_z:
    logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
    logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
if h_missing_h:
    logger.error("humbug: Didn't receive all the Humbugs we sent on the Humbug end!")
    logger.error("humbug: This is probably an issue with check-mirroring sending or receiving Humbugs.")
if z_missing_h:
    logger.error("zephyr: Didn't receive all the Humbugs we sent on the Zephyr end!")
    # Blame the humbug=>zephyr mirror only if some Humbug missing on the
    # Zephyr end WAS received on the Humbug end.  An equality test would fire
    # in the opposite case, when the same Humbugs are missing on both ends
    # (a sending problem, already reported above).
    if z_missing_h - h_missing_h:
        logger.error("zephyr: Including some Humbugs that we did receive on the Humbug end.")
        logger.error("zephyr: This suggests we have a humbug=>zephyr mirroring problem.")
        logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
    logger.error("humbug: Didn't receive all the Zephyrs we sent on the Humbug end!")
    # Likewise: only when some Zephyr missing on the Humbug end was received
    # on the Zephyr end does this point at zephyr=>humbug mirroring.
    if h_missing_z - z_missing_z:
        logger.error("humbug: Including some Zephyrs that we did receive on the Zephyr end.")
        logger.error("humbug: This suggests we have a zephyr=>humbug mirroring problem.")
        logger.error("humbug: aka the global class mirroring script has issues.")

print_status_and_exit(1)