python-zulip-api/bots/check-mirroring
Tim Abbott aba1ca871c check-mirroring: Try receiving zephyrs multiple times to avoid large queues.
(imported from commit 08c08df46794cd89db19748c3d5adfa4097de4cb)
2012-12-14 17:34:37 -05:00

319 lines
11 KiB
Python
Executable file

#!/usr/bin/python
import sys
import time
import optparse
import os
import random
import logging
import subprocess
from os import path
# Command-line options. Every option has a default, so the script can be run
# with no arguments at all.
_OPTION_SPECS = [
    ('--verbose', dict(dest='verbose', default=False, action='store_true')),
    ('--site', dict(dest='site', default="https://humbughq.com", action='store')),
    # No explicit dest: optparse derives "sharded" from the long option name.
    ('--sharded', dict(default=False, action='store_true')),
    ('--root-path', dict(dest='root_path', default="/home/humbug", action='store')),
]

parser = optparse.OptionParser()
for _flag, _spec in _OPTION_SPECS:
    parser.add_option(_flag, **_spec)
(options, args) = parser.parse_args()
# Prepend the checkout directories under --root-path to sys.path. The 'api'
# directory needs to go first, so that 'import humbug' won't pick up some
# other directory named 'humbug'.
_root = options.root_path
sys.path[:0] = [os.path.join(_root, _subdir) for _subdir in (
    "api/",
    "python-zephyr",
    "python-zephyr/build/lib.linux-x86_64-2.6/",
)] + [_root]

# Identities used by the test: mit_user is the Zephyr recipient for personal
# messages, humbug_user is the Humbug account the API client logs in as.
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'
humbug_user = 'tabbott/extra@mit.edu'

sys.path.append(".")
import humbug

# NOTE(review): verbose is hard-coded to True here; options.verbose is parsed
# above but not consulted -- confirm whether that is intentional.
humbug_client = humbug.Client(
    site=options.site,
    email=humbug_user,
    api_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    client="test: Humbug API",
    verbose=True)
# Configure logging
# Two destinations share one format: the root logger's default stream handler
# (installed by basicConfig) and a file handler attached to this module's
# logger only.
log_format = "%(asctime)s: %(message)s"
log_file = "/home/humbug/check-mirroring-log"

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

logging.basicConfig(format=log_format)
# Initialize list of streams to test
# Each entry is a (stream, shard_name) pair; the "message" entry (personal
# messages) is present in both modes.
test_streams = [("message", "p")]
if not options.sharded:
    test_streams.append(("tabbott-nagios-test", "a"))
else:
    # NOTE: Streams in this list must be in humbug_user's Humbug
    # subscriptions, or we won't receive messages via Humbug.
    # The sharded stream list has a bunch of pairs
    # (stream, shard_name), where sha1sum(stream).startswith(shard_name)
    test_streams.extend([
        ("tabbott-nagios-test-15", "0"),
        ("tabbott-nagios-test-1", "1"),
        ("tabbott-nagios-test-4", "2"),
        ("tabbott-nagios-test-32", "3"),
        ("tabbott-nagios-test-6", "4"),
        ("tabbott-nagios-test-93", "5"),
        ("tabbott-nagios-test-23", "6"),
        ("tabbott-nagios-test-16", "7"),
        ("tabbott-nagios-test-22", "8"),
        ("tabbott-nagios-test-3", "9"),
        ("tabbott-nagios-test-2", "a"),
        ("tabbott-nagios-test-10", "b"),
        ("tabbott-nagios-test-14", "c"),
        ("tabbott-nagios-test-8", "d"),
        ("tabbott-nagios-test-13", "e"),
        ("tabbott-nagios-test-45", "f"),
    ])
def print_status_and_exit(status):
    """Print the exit status on stdout, then exit the process with it.

    The output of this script is used by Nagios. Various outputs, e.g. true
    success and punting due to a SERVNAK, result in a non-alert case, so to
    give us something unambiguous to check in Nagios, print the exit status.

    status: the exit code; it is printed and then passed to sys.exit().
    Raises SystemExit (via sys.exit), so this function never returns.
    """
    # The parenthesised form prints exactly what the Python 2 print statement
    # did for a single value, and is also valid syntax on Python 3.
    print(status)
    sys.exit(status)
def send_humbug(message):
    """Send one message through the Humbug API client; abort the check on failure.

    message: the request dict handed unchanged to humbug_client.send_message().
    If the reply's "result" field is not "success", the request and the reply
    are logged and the script exits with status 1.
    """
    response = humbug_client.send_message(message)
    if response["result"] == "success":
        return
    for entry in ("Error sending humbug, args were:", message, response):
        logger.error(entry)
    print_status_and_exit(1)
def send_zephyr(zwrite_args, content, retry=False):
    """Run the given zwrite command line, feeding it `content` on stdin.

    zwrite_args: argv list for the subprocess.
    content: message body; it is UTF-8 encoded before being written to stdin.
    retry: True when this call is already the second attempt.

    A non-zero exit whose stdout reports a server failure while waiting for
    the acknowledgement is retried exactly once. Any other failure (or a
    failed retry) is logged and the script exits with status 1.
    """
    zwrite = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = zwrite.communicate(input=content.encode("utf-8"))
    if zwrite.returncode == 0:
        return

    if not retry and "Detected server failure while receiving acknowledgement for" in out:
        logger.warning("Got server failure error sending zephyr; retrying")
        logger.warning(err)
        # Retry sending the message rather than bailing.
        return send_zephyr(zwrite_args, content, True)

    logger.error("Error sending zephyr:")
    logger.info(out)
    logger.error(err)
    print_status_and_exit(1)
# Subscribe to Humbugs
# Fetch our profile to learn max_message_id; it is passed later as the 'last'
# argument of get_messages() when collecting the messages sent by this run.
try:
    res = humbug_client.get_profile()
    max_message_id = res.get('max_message_id')
    if max_message_id is None:
        # Log through `logger` (not the root `logging` module) so the error
        # also reaches the log file handler attached to `logger`.
        logger.error("Error subscribing to Humbugs!")
        logger.error(res)
        print_status_and_exit(1)
except Exception:
    # print_status_and_exit raises SystemExit, which is not an Exception
    # subclass, so the exit above is not swallowed by this handler.
    logger.exception("Unexpected error subscribing to Humbugs")
    print_status_and_exit(1)
# Subscribe to Zephyrs
import zephyr

# One subscription triple per test stream: the personal-message entry is
# subscribed for mit_user on instance 'personal'; every other stream is
# subscribed with wildcard instance and recipient.
zephyr_subs_to_add = []
for (stream, test) in test_streams:
    if stream == "message":
        zephyr_subs_to_add.append((stream, 'personal', mit_user))
    else:
        zephyr_subs_to_add.append((stream, '*', '*'))

# Try up to 10 times. An attempt only counts as successful when every
# requested triple shows up in the subscription list reported back.
for tries in range(10):
    try:
        zephyr.init()
        zephyr._z.subAll(zephyr_subs_to_add)
        zephyr_subs = zephyr._z.getSubscriptions()
        missing_subs = [elt for elt in zephyr_subs_to_add if elt not in zephyr_subs]
        for elt in missing_subs:
            logger.error("Failed to subscribe to %s" % (elt,))
        if not missing_subs:
            break
        # Otherwise fall through to the next attempt. (Previously a
        # `continue` here only affected the inner loop, so the outer loop
        # always stopped after the first attempt that did not raise.)
    except IOError as e:
        # The first few IOErrors are retried silently; after that, give up.
        if tries > 5:
            # Match against the exception text rather than the exception
            # object itself.
            if "SERVNAK received" in str(e):
                logger.error("SERVNAK repeatedly received, punting rest of test")
            else:
                logger.exception("Exception subscribing to zephyrs")
            print_status_and_exit(1)
else:
    # Every attempt left at least one subscription missing; the rest of the
    # test cannot succeed without them.
    logger.error("Failed to subscribe to all zephyr test classes; giving up")
    print_status_and_exit(1)
# Prepare keys
# zhkeys holds the keys sent via Zephyr, hzkeys the keys sent via Humbug.
# Both map a random key string to its (stream, shard_name) pair.
zhkeys = {}
hzkeys = {}


def gen_keys(key_dict):
    """Add one fresh random key to key_dict for every entry of test_streams.

    key_dict: dict filled in place, mapping the decimal string of a random
    32-bit number to the (stream, shard_name) pair it will be sent to.
    """
    for (stream, test) in test_streams:
        while True:
            candidate = str(random.getrandbits(32))
            # Avoid the unlikely event that we get the same bits twice
            if candidate not in key_dict:
                break
        key_dict[candidate] = (stream, test)


gen_keys(zhkeys)
gen_keys(hzkeys)
# Every zephyr notice received so far, accumulated across all calls to
# receive_zephyrs().
notices = []


# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs():
    """Drain the currently pending zephyr notices into the global `notices`.

    Polls zephyr.receive(block=False) until it returns None. Notices with a
    non-empty opcode are skipped. An exception while receiving is logged and
    ends this drain; it does not abort the script.
    """
    while True:
        try:
            notice = zephyr.receive(block=False)
        except Exception:
            # Use `logger` rather than the root `logging` module so the
            # traceback also reaches the log file handler.
            logger.exception("Exception receiving zephyrs:")
            notice = None
        if notice is None:
            break
        if notice.opcode != "":
            continue
        notices.append(notice)
logger.info("Starting sending messages!")

# Send zephyrs
# Each key is sent as the body of one zephyr: a personal to mit_user for the
# "message" entry, otherwise to the stream's class on instance "test".
zsig = "Timothy Good Abbott"
for key, (stream, test) in zhkeys.items():
    zwrite_args = ["zwrite", "-n", "-s", zsig]
    if stream == "message":
        zwrite_args.append(mit_user)
    else:
        zwrite_args.extend(["-c", stream, "-i", "test"])
    send_zephyr(zwrite_args, str(key))
receive_zephyrs()
logger.info("Sent Zephyr messages!")

# Send Humbugs
# Same idea in the other direction: a private message to humbug_user for the
# "message" entry, otherwise a stream message with subject "test".
for key, (stream, test) in hzkeys.items():
    if stream == "message":
        humbug_message = {
            "type": "private",
            "content": str(key),
            "to": humbug_user,
        }
    else:
        humbug_message = {
            "type": "stream",
            "subject": "test",
            "content": str(key),
            "to": stream,
        }
    send_humbug(humbug_message)
receive_zephyrs()
logger.info("Sent Humbug messages!")

# Normally messages manage to forward through in under 3 seconds, but
# sleep 10 to give a safe margin since the messages do need to do 2
# round trips.  This alert is for correctness, not performance, and so
# we want it to reliably alert only when messages aren't being
# delivered at all.
time.sleep(10)
receive_zephyrs()

logger.info("Starting receiving messages!")

# receive humbugs
# max_message_id was recorded before anything was sent and is passed as 'last'.
messages = humbug_client.get_messages({'last': str(max_message_id)})['messages']
logger.info("Finished receiving Humbug messages!")

receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")
# Every key this run sent, regardless of which side it was sent from.
all_keys = set(zhkeys) | set(hzkeys)


def process_keys(content_list):
    """Count how often each of our keys appears in content_list.

    content_list: message bodies received on one side (Humbug or Zephyr).

    Returns a 5-tuple (key_counts, z_missing, h_missing, duplicates, success):
      key_counts: dict mapping every key in all_keys to its number of occurrences
      z_missing:  set of Zephyr-sent keys (zhkeys) that never appeared
      h_missing:  set of Humbug-sent keys (hzkeys) that never appeared
      duplicates: True if any key appeared more than once
      success:    True if every key appeared exactly once
    """
    key_counts = dict((key, 0) for key in all_keys)
    for content in content_list:
        # Skip any content that isn't one of our keys; it might have come
        # from concurrent check-mirroring processes.
        if content in all_keys:
            key_counts[content] += 1

    z_missing = set(key for key in zhkeys if key_counts[key] == 0)
    h_missing = set(key for key in hzkeys if key_counts[key] == 0)

    counts = list(key_counts.values())
    duplicates = any(count > 1 for count in counts)
    success = all(count == 1 for count in counts)
    return key_counts, z_missing, h_missing, duplicates, success
# Compare what was received on each side against what was sent, then report.
# Exits 0 only when every key was received exactly once on both sides;
# otherwise logs a per-key breakdown and a diagnosis, and exits 1.
#
# The h_foo variables are about the messages we _received_ in Humbug
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages]

# The key is taken from the second NUL-separated field of the zephyr message.
# A notice without a second field cannot carry one of our keys, so skip it
# instead of dying with an IndexError.
z_contents = []
for notice in notices:
    fields = notice.message.split('\0')
    if len(fields) > 1:
        z_contents.append(fields[1])

(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)

if z_success and h_success:
    logger.info("Success!")
    print_status_and_exit(0)
elif z_success:
    logger.info("Received everything correctly in Zephyr!")
elif h_success:
    logger.info("Received everything correctly in Humbug!")

# Per-key breakdown of everything that was not received exactly once on
# both sides.
logger.error("Messages received the wrong number of times:")
for key in all_keys:
    if z_key_counts[key] == 1 and h_key_counts[key] == 1:
        continue
    if key in zhkeys:
        (stream, test) = zhkeys[key]
        logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" % \
                       (key, z_key_counts[key], h_key_counts[key], test, stream))
    if key in hzkeys:
        (stream, test) = hzkeys[key]
        logger.warning("%10s: z got %s. h got %s. Sent via Humbug(%s): class %s" % \
                       (key, z_key_counts[key], h_key_counts[key], test, stream))

logger.error("")
logger.error("Summary of specific problems:")

if h_duplicates:
    logger.error("humbug: Received duplicate messages!")
    logger.error("humbug: This is probably a bug in our message loop detection.")
    logger.error("humbug: where Humbugs go humbug=>zephyr=>humbug")
if z_duplicates:
    logger.error("zephyr: Received duplicate messages!")
    logger.error("zephyr: This is probably a bug in our message loop detection.")
    logger.error("zephyr: where Zephyrs go zephyr=>humbug=>zephyr")

# Messages that did not even show up on the side they were sent from.
if z_missing_z:
    logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
    logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
if h_missing_h:
    logger.error("humbug: Didn't receive all the Humbugs we sent on the Humbug end!")
    logger.error("humbug: This is probably an issue with check-mirroring sending or receiving Humbugs.")

# Messages that did not make it across to the other side. Mirroring is only
# implicated when some message arrived on the sending side but not on the
# far side, i.e. the set difference is non-empty. (The previous equality
# test fired in exactly the opposite situation.)
if z_missing_h:
    logger.error("zephyr: Didn't receive all the Humbugs we sent on the Zephyr end!")
    if z_missing_h - h_missing_h:
        logger.error("zephyr: Including some Humbugs that we did receive on the Humbug end.")
        logger.error("zephyr: This suggests we have a humbug=>zephyr mirroring problem.")
        logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
    logger.error("humbug: Didn't receive all the Zephyrs we sent on the Humbug end!")
    if h_missing_z - z_missing_z:
        logger.error("humbug: Including some Zephyrs that we did receive on the Zephyr end.")
        logger.error("humbug: This suggests we have a zephyr=>humbug mirroring problem.")
        logger.error("humbug: aka the global class mirroring script has issues.")

print_status_and_exit(1)