Rewrite check-mirroring to be more robust and flexible.
Features include: * Not forking into two processes (shells out to zwrite to send instead). This makes life easier since we're not doing concurrent programming. * Eliminated a lot of hard-to-read or unnecessary debugging output. * Added explanatory text suggesting the likely problem for some common sets of received messages. * Much less code duplication. * Support for testing a sharded zephyr_mirror script (--sharded). * Use of the logging module to print timestamps -- makes debugging some issues a lot easier. * Only one sleep, and for only 10 seconds, between sending the outgoing messages and checking that they were received. * Support for running two copies of this script at the same time, so that running it manually doesn't screw up Nagios. * Passed 100 test runs in a row. (imported from commit a3ec02ac1d1a04972e469ca30fec1790c4fb53bc)
This commit is contained in:
parent
abb95fca04
commit
4bfd784423
|
@ -4,6 +4,8 @@ import time
|
||||||
import optparse
|
import optparse
|
||||||
import os
|
import os
|
||||||
import random
|
import random
|
||||||
|
import logging
|
||||||
|
import subprocess
|
||||||
|
|
||||||
parser = optparse.OptionParser()
|
parser = optparse.OptionParser()
|
||||||
parser.add_option('--verbose',
|
parser.add_option('--verbose',
|
||||||
|
@ -14,6 +16,9 @@ parser.add_option('--site',
|
||||||
dest='site',
|
dest='site',
|
||||||
default="https://humbughq.com",
|
default="https://humbughq.com",
|
||||||
action='store')
|
action='store')
|
||||||
|
parser.add_option('--sharded',
|
||||||
|
default=False,
|
||||||
|
action='store_true')
|
||||||
parser.add_option('--root-path',
|
parser.add_option('--root-path',
|
||||||
dest='root_path',
|
dest='root_path',
|
||||||
default="/home/humbug",
|
default="/home/humbug",
|
||||||
|
@ -27,11 +32,6 @@ sys.path[:0] = [os.path.join(options.root_path, "python-zephyr"),
|
||||||
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'
|
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'
|
||||||
humbug_user = 'tabbott/extra@mit.edu'
|
humbug_user = 'tabbott/extra@mit.edu'
|
||||||
|
|
||||||
hzkey1 = random.getrandbits(32)
|
|
||||||
hzkey2 = random.getrandbits(32)
|
|
||||||
zhkey1 = random.getrandbits(32)
|
|
||||||
zhkey2 = random.getrandbits(32)
|
|
||||||
|
|
||||||
sys.path.append(".")
|
sys.path.append(".")
|
||||||
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
|
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
|
||||||
import api.common
|
import api.common
|
||||||
|
@ -41,6 +41,51 @@ humbug_client = api.common.HumbugAPI(email=humbug_user,
|
||||||
client="test: Humbug API",
|
client="test: Humbug API",
|
||||||
site=options.site)
|
site=options.site)
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
log_file = "/home/humbug/check-mirroring-log"
|
||||||
|
log_format = "%(asctime)s: %(message)s"
|
||||||
|
logging.basicConfig(format=log_format)
|
||||||
|
|
||||||
|
formatter = logging.Formatter(log_format)
|
||||||
|
file_handler = logging.FileHandler(log_file)
|
||||||
|
file_handler.setFormatter(formatter)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
logger.addHandler(file_handler)
|
||||||
|
|
||||||
|
# Initialize list of streams to test
|
||||||
|
if options.sharded:
|
||||||
|
# NOTE: Streams in this list must be in humbug_user's Humbug
|
||||||
|
# subscriptions, or we won't receive messages via Humbug.
|
||||||
|
|
||||||
|
# The sharded stream list has a bunch of pairs
|
||||||
|
# (stream, shard_name), where sha1sum(stream).startswith(shard_name)
|
||||||
|
test_streams = [
|
||||||
|
("message", "p"),
|
||||||
|
("tabbott-nagios-test-15", "0"),
|
||||||
|
("tabbott-nagios-test-1", "1"),
|
||||||
|
("tabbott-nagios-test-4", "2"),
|
||||||
|
("tabbott-nagios-test-32", "3"),
|
||||||
|
("tabbott-nagios-test-6", "4"),
|
||||||
|
("tabbott-nagios-test-93", "5"),
|
||||||
|
("tabbott-nagios-test-23", "6"),
|
||||||
|
("tabbott-nagios-test-16", "7"),
|
||||||
|
("tabbott-nagios-test-22", "8"),
|
||||||
|
("tabbott-nagios-test-3", "9"),
|
||||||
|
("tabbott-nagios-test-2", "a"),
|
||||||
|
("tabbott-nagios-test-10", "b"),
|
||||||
|
("tabbott-nagios-test-14", "c"),
|
||||||
|
("tabbott-nagios-test-8", "d"),
|
||||||
|
("tabbott-nagios-test-13", "e"),
|
||||||
|
("tabbott-nagios-test-45", "f"),
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
test_streams = [
|
||||||
|
("message", "p"),
|
||||||
|
("tabbott-nagios-test", "a"),
|
||||||
|
]
|
||||||
|
|
||||||
def print_status_and_exit(status):
|
def print_status_and_exit(status):
|
||||||
# The output of this script is used by Nagios. Various outputs,
|
# The output of this script is used by Nagios. Various outputs,
|
||||||
# e.g. true success and punting due to a SERVNAK, result in a
|
# e.g. true success and punting due to a SERVNAK, result in a
|
||||||
|
@ -49,121 +94,207 @@ def print_status_and_exit(status):
|
||||||
print status
|
print status
|
||||||
sys.exit(status)
|
sys.exit(status)
|
||||||
|
|
||||||
def print_zephyr(notice):
|
def send_humbug(message):
|
||||||
print notice.cls, notice.instance, notice.sender, notice.message.split('\0')[1]
|
result = humbug_client.send_message(message)
|
||||||
|
|
||||||
def print_humbug(message):
|
|
||||||
if message['type'] == "stream":
|
|
||||||
print message["type"], message['display_recipient'], message['subject'], \
|
|
||||||
message['sender_email'], message['content']
|
|
||||||
else:
|
|
||||||
print message["type"], message['sender_email'], \
|
|
||||||
message['display_recipient'], message['content']
|
|
||||||
|
|
||||||
max_message_id = humbug_client.get_profile()['max_message_id']
|
|
||||||
|
|
||||||
child_pid = os.fork()
|
|
||||||
if child_pid == 0:
|
|
||||||
# Run the humbug => zephyr mirror in the child
|
|
||||||
time.sleep(5)
|
|
||||||
result = humbug_client.send_message({
|
|
||||||
"type": "private",
|
|
||||||
"content": str(hzkey1),
|
|
||||||
"to": humbug_user,
|
|
||||||
})
|
|
||||||
|
|
||||||
if result["result"] != "success":
|
if result["result"] != "success":
|
||||||
print "key1 send error:"
|
logger.error("Error sending humbug, args were:")
|
||||||
print result
|
logger.error(message)
|
||||||
|
logger.error(result)
|
||||||
result = humbug_client.send_message({
|
|
||||||
"type": "stream",
|
|
||||||
"subject": "test",
|
|
||||||
"content": str(hzkey2),
|
|
||||||
"to": "tabbott-nagios-test",
|
|
||||||
})
|
|
||||||
|
|
||||||
if result["result"] != "success":
|
|
||||||
print "key2 send error:"
|
|
||||||
print result
|
|
||||||
|
|
||||||
if options.verbose:
|
|
||||||
print "Sent Humbug messages!"
|
|
||||||
|
|
||||||
import zephyr
|
|
||||||
try:
|
|
||||||
zephyr.init()
|
|
||||||
except IOError, e:
|
|
||||||
if "SERVNAK received" in e:
|
|
||||||
print "SERVNAK received, punting rest of test"
|
|
||||||
print_status_and_exit(0)
|
|
||||||
|
|
||||||
zsig = "Timothy Good Abbott"
|
|
||||||
|
|
||||||
zeph = zephyr.ZNotice(sender=mit_user, auth=True, recipient=mit_user,
|
|
||||||
cls="message", instance="personal")
|
|
||||||
zeph.setmessage("%s\0%s" % (zsig, zhkey1))
|
|
||||||
zeph.send()
|
|
||||||
|
|
||||||
zeph = zephyr.ZNotice(sender=mit_user, auth=True,
|
|
||||||
cls="tabbott-nagios-test", instance="test")
|
|
||||||
zeph.setmessage("%s\0%s" % (zsig, zhkey2))
|
|
||||||
zeph.send()
|
|
||||||
if options.verbose:
|
|
||||||
print "Sent Zephyr messages!"
|
|
||||||
|
|
||||||
else:
|
|
||||||
failed = False
|
|
||||||
import zephyr
|
|
||||||
try:
|
|
||||||
zephyr.init()
|
|
||||||
zephyr._z.subAll([('message', 'personal', 'tabbott/extra@ATHENA.MIT.EDU'),
|
|
||||||
('tabbott-nagios-test', '*', '*')])
|
|
||||||
except IOError, e:
|
|
||||||
if "SERVNAK received" in e:
|
|
||||||
print "SERVNAK received, punting rest of test"
|
|
||||||
print_status_and_exit(0)
|
|
||||||
|
|
||||||
time.sleep(20)
|
|
||||||
if options.verbose:
|
|
||||||
print "Receiving messages!"
|
|
||||||
notices = []
|
|
||||||
while True:
|
|
||||||
notice = zephyr.receive(block=False)
|
|
||||||
if notice is None:
|
|
||||||
break
|
|
||||||
if notice.opcode != "":
|
|
||||||
continue
|
|
||||||
notices.append(notice)
|
|
||||||
if len(notices) != 4:
|
|
||||||
print "humbug=>zephyr: Got wrong number of messages back!"
|
|
||||||
failed = True
|
|
||||||
elif (set(notice.message.split('\0')[1] for notice in notices) !=
|
|
||||||
set([str(hzkey1), str(hzkey2), str(zhkey1), str(zhkey2)])):
|
|
||||||
print "humbug=>zephyr: Didn't get back right values!"
|
|
||||||
failed = True
|
|
||||||
if failed:
|
|
||||||
for notice in notices:
|
|
||||||
print_zephyr(notice)
|
|
||||||
|
|
||||||
messages = humbug_client.get_messages({'first': '0',
|
|
||||||
'last': str(max_message_id),
|
|
||||||
'server_generation': '0'})['messages']
|
|
||||||
if len(messages) != 4:
|
|
||||||
print "zephyr=>humbug: Didn't get exactly 4 messages!"
|
|
||||||
for message in messages:
|
|
||||||
print_humbug(message)
|
|
||||||
failed = True
|
|
||||||
elif (set(message["content"] for message in messages) !=
|
|
||||||
set([str(hzkey1), str(hzkey2), str(zhkey1), str(zhkey2)])):
|
|
||||||
print "zephyr=>humbug: Didn't get back right values!"
|
|
||||||
for message in messages:
|
|
||||||
print_humbug(message)
|
|
||||||
failed = True
|
|
||||||
|
|
||||||
if failed:
|
|
||||||
print "original keys:", hzkey1, hzkey2, zhkey1, zhkey2
|
|
||||||
print_status_and_exit(1)
|
print_status_and_exit(1)
|
||||||
|
|
||||||
print "Success!"
|
def send_zephyr(zwrite_args, content):
    """Send one zephyr by piping ``content`` into a zwrite subprocess.

    zwrite_args -- full argv list for the command to run (starts with
                   "zwrite" at the call sites in this script).
    content     -- message body; it is UTF-8 encoded and written to the
                   subprocess's stdin.

    Returns None when the subprocess exits with status 0.  On any other
    exit status the captured stdout/stderr are logged and the script
    terminates through print_status_and_exit(1).
    """
    zwrite = subprocess.Popen(
        zwrite_args,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    payload = content.encode("utf-8")
    out, err = zwrite.communicate(input=payload)
    if zwrite.returncode == 0:
        return
    # Non-zero exit: record whatever the subprocess printed, then bail out.
    logger.error("Error sending zephyr:")
    logger.info(out)
    logger.error(err)
    print_status_and_exit(1)
|
||||||
|
|
||||||
|
# Subscribe to Humbugs
|
||||||
|
# Fetch the profile and remember its 'max_message_id'; it is passed as
# 'last' to get_messages() later in the script when collecting the
# Humbugs received during this run.
try:
    res = humbug_client.get_profile()
    max_message_id = res.get('max_message_id')
    if max_message_id is None:
        # Use the script's own logger (not the root `logging` functions)
        # so these errors also reach the file handler attached to it.
        logger.error("Error subscribing to Humbugs!")
        logger.error(res)
        print_status_and_exit(1)
except Exception:
    # print_status_and_exit() exits via sys.exit(); SystemExit is not an
    # Exception subclass, so the exit above is not swallowed here.
    logger.exception("Unexpected error subscribing to Humbugs")
    print_status_and_exit(1)
|
||||||
|
|
||||||
|
# Subscribe to Zephyrs
|
||||||
|
import zephyr
|
||||||
|
zephyr_subs_to_add = []
|
||||||
|
for (stream, test) in test_streams:
|
||||||
|
if stream == "message":
|
||||||
|
zephyr_subs_to_add.append((stream, 'personal', mit_user))
|
||||||
|
else:
|
||||||
|
zephyr_subs_to_add.append((stream, '*', '*'))
|
||||||
|
|
||||||
|
# Make up to 10 attempts to initialize zephyr and register every entry of
# zephyr_subs_to_add.  An attempt only counts as successful when all the
# requested subscriptions show up in getSubscriptions(); otherwise we go
# around again (previously the `continue` only affected the inner loop,
# so the first attempt was always accepted).
for tries in xrange(10):
    try:
        zephyr.init()
        zephyr._z.subAll(zephyr_subs_to_add)
        zephyr_subs = zephyr._z.getSubscriptions()
        missing_subs = [elt for elt in zephyr_subs_to_add
                        if elt not in zephyr_subs]
        for elt in missing_subs:
            # Log through `logger` so the message also lands in the log file.
            logger.error("Failed to subscribe to %s" % (elt,))
        if not missing_subs:
            break
    except IOError as e:
        # Early IOErrors are retried silently; after the sixth failure
        # (tries > 5) we give up and report status 1.
        if tries > 5:
            if "SERVNAK received" in e:
                logger.error("SERVNAK repeatedly received, punting rest of test")
            else:
                logger.exception("Exception subscribing to zephyrs")
            print_status_and_exit(1)
else:
    # Every attempt finished without a complete subscription set; the
    # receive checks below could not succeed, so stop here.
    logger.error("Failed to subscribe to all zephyr classes after repeated attempts")
    print_status_and_exit(1)
|
||||||
|
|
||||||
|
# Prepare keys
|
||||||
|
zhkeys = {}
|
||||||
|
hzkeys = {}
|
||||||
|
def gen_keys(key_dict):
    """Add one random key to ``key_dict`` for every entry in test_streams.

    Each key is the decimal string of a 32-bit random number and maps to
    the (stream, test) pair it was generated for.  ``key_dict`` is
    modified in place; nothing is returned.
    """
    for stream_info in test_streams:
        (stream, test) = stream_info
        # Keep drawing until we get a value not already used in this dict
        # (guards against the unlikely event of the same bits twice).
        while True:
            candidate = str(random.getrandbits(32))
            if candidate not in key_dict:
                break
        key_dict[candidate] = (stream, test)
|
||||||
|
|
||||||
|
gen_keys(zhkeys)
|
||||||
|
gen_keys(hzkeys)
|
||||||
|
|
||||||
|
logger.info("Starting sending messages!")
|
||||||
|
# Send zephyrs
|
||||||
|
zsig = "Timothy Good Abbott"
|
||||||
|
for key, (stream, test) in zhkeys.items():
|
||||||
|
if stream == "message":
|
||||||
|
zwrite_args = ["zwrite", "-s", zsig, mit_user]
|
||||||
|
else:
|
||||||
|
zwrite_args = ["zwrite", "-s", zsig, "-c", stream, "-i", "test"]
|
||||||
|
send_zephyr(zwrite_args, str(key))
|
||||||
|
logger.info("Sent Zephyr messages!")
|
||||||
|
|
||||||
|
# Send Humbugs
|
||||||
|
for key, (stream, test) in hzkeys.items():
|
||||||
|
if stream == "message":
|
||||||
|
send_humbug({
|
||||||
|
"type": "private",
|
||||||
|
"content": str(key),
|
||||||
|
"to": humbug_user,
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
send_humbug({
|
||||||
|
"type": "stream",
|
||||||
|
"subject": "test",
|
||||||
|
"content": str(key),
|
||||||
|
"to": stream,
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.info("Sent Humbug messages!")
|
||||||
|
|
||||||
|
failed = False
|
||||||
|
|
||||||
|
# Normally messages manage to forward through in under 3 seconds, but
|
||||||
|
# sleep 10 to give a safe margin since the messages do need to do 2
|
||||||
|
# round trips. This alert is for correctness, not performance, and so
|
||||||
|
# we want it to reliably alert only when messages aren't being
|
||||||
|
# delivered at all.
|
||||||
|
time.sleep(10)
|
||||||
|
logger.info("Starting receiving messages!")
|
||||||
|
|
||||||
|
# receive humbugs
|
||||||
|
messages = humbug_client.get_messages({'last': str(max_message_id)})['messages']
|
||||||
|
logger.info("Received Humbug messages!")
|
||||||
|
|
||||||
|
# receive zephyrs
|
||||||
|
# Drain pending zephyr notices with non-blocking receives.  The loop ends
# when receive() returns None or raises; a raised exception is logged and
# treated like "nothing more to read".
notices = []
while True:
    try:
        notice = zephyr.receive(block=False)
    except Exception:
        # Use the script's logger (not the root `logging` functions) so
        # the traceback is also written to the check-mirroring log file.
        logger.exception("Exception receiving zephyrs:")
        notice = None
    if notice is None:
        break
    if notice.opcode != "":
        # Only notices with an empty opcode are kept for comparison.
        continue
    notices.append(notice)
|
||||||
|
logger.info("Received Zephyr messages!")
|
||||||
|
|
||||||
|
all_keys = set(zhkeys.keys() + hzkeys.keys())
|
||||||
|
def process_keys(content_list):
    """Tally how often each of this run's keys occurs in ``content_list``.

    content_list -- message bodies received on one side (Humbug or Zephyr).

    Returns a 5-tuple (key_counts, z_missing, h_missing, duplicates, success):
      key_counts -- dict mapping every key in all_keys to its occurrence count
      z_missing  -- set of zhkeys keys that were never seen
      h_missing  -- set of hzkeys keys that were never seen
      duplicates -- True if any key was seen more than once
      success    -- True if every key was seen exactly once
    """
    key_counts = dict((key, 0) for key in all_keys)
    for content in content_list:
        # Anything that is not one of our keys might have come from a
        # concurrent check-mirroring process, so it is ignored.
        if content in all_keys:
            key_counts[content] += 1

    z_missing = set(k for k in zhkeys if key_counts[k] == 0)
    h_missing = set(k for k in hzkeys if key_counts[k] == 0)
    counts = key_counts.values()
    duplicates = any(count > 1 for count in counts)
    success = all(count == 1 for count in counts)
    return key_counts, z_missing, h_missing, duplicates, success
|
||||||
|
|
||||||
|
# The h_foo variables are about the messages we _received_ in Humbug
|
||||||
|
# The z_foo variables are about the messages we _received_ in Zephyr
|
||||||
|
h_contents = [message["content"] for message in messages]
|
||||||
|
z_contents = [notice.message.split('\0')[1] for notice in notices]
|
||||||
|
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
|
||||||
|
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
|
||||||
|
|
||||||
|
if z_success and h_success:
|
||||||
|
logger.info("Success!")
|
||||||
print_status_and_exit(0)
|
print_status_and_exit(0)
|
||||||
|
elif z_success:
|
||||||
|
logger.info("Received everything correctly in Zephyr!")
|
||||||
|
elif h_success:
|
||||||
|
logger.info("Received everything correctly in Humbug!")
|
||||||
|
|
||||||
|
logger.error("Messages received the wrong number of times:")
|
||||||
|
for key in all_keys:
|
||||||
|
if z_key_counts[key] == 1 and h_key_counts[key] == 1:
|
||||||
|
continue
|
||||||
|
if key in zhkeys:
|
||||||
|
(stream, test) = zhkeys[key]
|
||||||
|
logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" % \
|
||||||
|
(key, z_key_counts[key], h_key_counts[key], test, stream))
|
||||||
|
if key in hzkeys:
|
||||||
|
(stream, test) = hzkeys[key]
|
||||||
|
logger.warning("%10s: z got %s. h got %s. Sent via Humbug(%s): class %s" % \
|
||||||
|
(key, z_key_counts[key], h_key_counts[key], test, stream))
|
||||||
|
logger.error("")
|
||||||
|
logger.error("Summary of specific problems:")
|
||||||
|
|
||||||
|
if h_duplicates:
|
||||||
|
logger.error("humbug: Received duplicate messages!")
|
||||||
|
logger.error("humbug: This is probably a bug in our message loop detection.")
|
||||||
|
logger.error("humbug: where Humbugs go humbug=>zephyr=>humbug")
|
||||||
|
if z_duplicates:
|
||||||
|
logger.error("zephyr: Received duplicate messages!")
|
||||||
|
logger.error("zephyr: This is probably a bug in our message loop detection.")
|
||||||
|
logger.error("zephyr: where Zephyrs go zephyr=>humbug=>zephyr")
|
||||||
|
|
||||||
|
if z_missing_z:
|
||||||
|
logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
|
||||||
|
logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
|
||||||
|
if h_missing_h:
|
||||||
|
logger.error("humbug: Didn't receive all the Humbugs we sent on the Humbug end!")
|
||||||
|
logger.error("humbug: This is probably an issue with check-mirroring sending or receiving Humbugs.")
|
||||||
|
# Cross-direction diagnostics.  z_missing_h / h_missing_z are the keys
# that were sent on one system and never seen on the other.  The extra
# "mirroring problem" hint is only true when at least one of those keys
# DID arrive on the system it was sent from, i.e. when the set difference
# is non-empty (the previous `==` test fired in exactly the opposite case).
if z_missing_h:
    logger.error("zephyr: Didn't receive all the Humbugs we sent on the Zephyr end!")
    if z_missing_h - h_missing_h:
        logger.error("zephyr: Including some Humbugs that we did receive on the Humbug end.")
        logger.error("zephyr: This suggests we have a humbug=>zephyr mirroring problem.")
        logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
    logger.error("humbug: Didn't receive all the Zephyrs we sent on the Humbug end!")
    if h_missing_z - z_missing_z:
        logger.error("humbug: Including some Zephyrs that we did receive on the Zephyr end.")
        logger.error("humbug: This suggests we have a zephyr=>humbug mirroring problem.")
        logger.error("humbug: aka the global class mirroring script has issues.")
|
||||||
|
|
||||||
|
print_status_and_exit(1)
|
||||||
|
|
Loading…
Reference in a new issue