python-zulip-api/humbug/bots/check-mirroring
Luke Faraone f2f4a2f8bd Move the API into a subdirectory for ease of imports.
Previously, if users of our code put the API folder in their pyshared
they would have to import it as "humbug.humbug". By moving Humbug's API
into a directory named "humbug" and moving the API into __init__, you
can just "import humbug".

(imported from commit 1d2654ae57f8ecbbfe76559de267ec4889708ee8)
2013-01-16 16:55:22 -05:00

349 lines
12 KiB
Python
Executable file

#!/usr/bin/python
import sys
import time
import optparse
import os
import random
import logging
import subprocess
import hashlib
from os import path
parser = optparse.OptionParser()
parser.add_option('--verbose',
dest='verbose',
default=False,
action='store_true')
parser.add_option('--site',
dest='site',
default="https://humbughq.com",
action='store')
parser.add_option('--sharded',
default=False,
action='store_true')
parser.add_option('--root-path',
dest='root_path',
default="/home/humbug",
action='store')
(options, args) = parser.parse_args()
# Accounts used by the check: mit_user is the Zephyr-side recipient of the
# personal test message, humbug_user is the Humbug account the client logs
# in as.
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'
humbug_user = 'tabbott/extra@mit.edu'

# Splice the deployment's directories in at the very front of the import
# path.  The 'api' directory has to come first so that 'import humbug'
# resolves to it and not to some other directory named 'humbug'.
sys.path[0:0] = [
    os.path.join(options.root_path, "api/"),
    os.path.join(options.root_path, "python-zephyr"),
    os.path.join(options.root_path, "python-zephyr/build/lib.linux-x86_64-2.6/"),
    options.root_path,
]
# The current directory is searched last, as a fallback.
sys.path.append(".")

import humbug

# NOTE(review): verbose is hard-coded to True here; the --verbose flag is
# not consulted -- confirm whether that is intended.
humbug_client = humbug.Client(
    site=options.site,
    client="test: Humbug API",
    email=humbug_user,
    api_key="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
    verbose=True)
# Configure logging.  basicConfig sets the line format for the root logger;
# the module logger additionally writes every record (DEBUG and up) to a
# log file under the root path, using the same format.
log_format = "%(asctime)s: %(message)s"
log_file = os.path.join(options.root_path, "check-mirroring-log")
logging.basicConfig(format=log_format)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Initialize list of streams to test.  Every entry is a pair
# (stream, shard_name); the special "message" entry is handled separately
# from the named streams throughout this script.
if not options.sharded:
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test", "a"),
    ]
else:
    # NOTE: Streams in this list must be in humbug_user's Humbug
    # subscriptions, or we won't receive messages via Humbug.

    # In the sharded list, each pair (stream, shard_name) satisfies
    # sha1sum(stream).startswith(shard_name).
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test-32", "0"),
        ("tabbott-nagios-test-33", "1"),
        ("tabbott-nagios-test-2", "2"),
        ("tabbott-nagios-test-5", "3"),
        ("tabbott-nagios-test-13", "4"),
        ("tabbott-nagios-test-7", "5"),
        ("tabbott-nagios-test-22", "6"),
        ("tabbott-nagios-test-35", "7"),
        ("tabbott-nagios-test-4", "8"),
        ("tabbott-nagios-test-3", "9"),
        ("tabbott-nagios-test-1", "a"),
        ("tabbott-nagios-test-49", "b"),
        ("tabbott-nagios-test-34", "c"),
        ("tabbott-nagios-test-12", "d"),
        ("tabbott-nagios-test-11", "e"),
        ("tabbott-nagios-test-9", "f"),
    ]
    # Verify the table against itself: every real stream name must hash
    # into the shard it is listed under.  "message" is exempt.
    for (stream, test) in test_streams:
        if stream != "message":
            assert(hashlib.sha1(stream).hexdigest().startswith(test))
def print_status_and_exit(status):
    """Print the exit status on stdout, then exit the process with it.

    The output of this script is used by Nagios.  Various outcomes, e.g.
    true success and punting due to a SERVNAK, result in a non-alert
    case, so the exit status is printed to give Nagios something
    unambiguous to check.

    Args:
        status: the process exit status; it is also what gets printed.

    Raises:
        SystemExit: always, via sys.exit(status).
    """
    # The parenthesised form prints exactly the same text under Python 2
    # and, unlike the bare print statement, is also valid Python 3.
    print(status)
    sys.exit(status)
def send_humbug(message):
    """Send one message through the Humbug client; abort the check on failure.

    Args:
        message: the request dict passed unchanged to
            humbug_client.send_message.

    If the reply's "result" field is anything other than "success", the
    request and the reply are logged and the script exits with status 1.
    """
    result = humbug_client.send_message(message)
    if result["result"] == "success":
        return
    for detail in ("Error sending humbug, args were:", message, result):
        logger.error(detail)
    print_status_and_exit(1)
def send_zephyr(zwrite_args, content):
    """Run the given zwrite command line, feeding it `content` on stdin.

    Args:
        zwrite_args: argv list for the subprocess.
        content: message body; encoded as UTF-8 before being written.

    Returns:
        True if and only if the command failed and its stdout shows that
        we "Detected server failure" sending the zephyr; False when the
        command exited successfully.  Any other failure is logged and
        ends the script with status 1.
    """
    zwrite = subprocess.Popen(zwrite_args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    out, err = zwrite.communicate(input=content.encode("utf-8"))

    if zwrite.returncode == 0:
        return False

    if "Detected server failure while receiving acknowledgement for" not in out:
        # Unrecognised failure: record everything we have and give up.
        logger.error("Error sending zephyr:")
        logger.info(out)
        logger.error(err)
        print_status_and_exit(1)
        return False

    # Delivery is uncertain; tell the caller so it can retry.
    logger.warning("Got server failure error sending zephyr; retrying")
    logger.warning(err)
    return True
# Subscribe to Humbugs: fetch the profile and remember the current
# max_message_id, which is later passed to get_messages as 'last'.
try:
    res = humbug_client.get_profile()
    max_message_id = res.get('max_message_id')
    if max_message_id is None:
        # Report through the module logger (not the root `logging`
        # functions) so that the failure also lands in the log file.
        logger.error("Error subscribing to Humbugs!")
        logger.error(res)
        # SystemExit is not an Exception subclass, so this exit is not
        # swallowed by the handler below.
        print_status_and_exit(1)
except Exception:
    logger.exception("Unexpected error subscribing to Humbugs")
    print_status_and_exit(1)
# Subscribe to Zephyrs
import zephyr

# Build the subscription triples: the "message" entry becomes a personal
# subscription for mit_user, every other stream a wildcard subscription.
zephyr_subs_to_add = []
for (stream, test) in test_streams:
    if stream == "message":
        zephyr_subs_to_add.append((stream, 'personal', mit_user))
    else:
        zephyr_subs_to_add.append((stream, '*', '*'))

# Try up to 10 times; an attempt only counts as successful when every
# requested triple shows up in the subscription list read back afterwards.
actually_subscribed = False
for tries in range(10):
    try:
        zephyr.init()
        zephyr._z.subAll(zephyr_subs_to_add)
        zephyr_subs = zephyr._z.getSubscriptions()
        missing = 0
        for elt in zephyr_subs_to_add:
            if elt not in zephyr_subs:
                # Use the module logger so the failure reaches the log file.
                logger.error("Failed to subscribe to %s" % (elt,))
                missing += 1
        if missing == 0:
            actually_subscribed = True
            break
    except IOError as e:
        # Match against the error text; testing membership on the
        # exception object itself does not search its message.
        if "SERVNAK received" in str(e):
            logger.error("SERVNAK repeatedly received, punting rest of test")
        else:
            logger.exception("Exception subscribing to zephyrs")

if not actually_subscribed:
    logger.error("Failed to subscribe to zephyrs")
    print_status_and_exit(1)
# Prepare keys.  Both dicts map a random key string to the
# (stream, shard_name) pair the key is sent on: zhkeys for keys sent via
# Zephyr, hzkeys for keys sent via Humbug.
zhkeys = {}
hzkeys = {}


def gen_key(key_dict):
    """Return a random 32-bit number, as a string, that is not in key_dict.

    Args:
        key_dict: mapping whose existing keys must be avoided.  It is only
            read, never modified.
    """
    while True:
        candidate = str(random.getrandbits(32))
        # Redraw in the unlikely event that we get the same bits twice.
        if candidate not in key_dict:
            return candidate
def gen_keys(key_dict):
    """Add one fresh random key to key_dict for every entry of test_streams.

    Each new key maps to that entry's (stream, shard_name) pair.
    """
    for name, shard in test_streams:
        fresh = gen_key(key_dict)
        key_dict[fresh] = (name, shard)


# One independent set of keys per sending direction.
gen_keys(zhkeys)
gen_keys(hzkeys)
# Every zephyr notice accepted so far, in arrival order.
notices = []


# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs():
    """Drain the pending zephyr notices into the module-level `notices` list.

    Receives without blocking until nothing is left.  Notices with a
    non-empty opcode are skipped.  An exception while receiving is logged
    and treated as "nothing more to read".
    """
    while True:
        try:
            notice = zephyr.receive(block=False)
        except Exception:
            # Log through the module logger so the traceback also reaches
            # the log file, then stop draining for this round.
            logger.exception("Exception receiving zephyrs:")
            notice = None
        if notice is None:
            break
        if notice.opcode != "":
            continue
        notices.append(notice)
logger.info("Starting sending messages!")

# Send zephyrs: one per key in zhkeys.  The "message" entry is sent to
# mit_user directly; every other entry goes to its class with instance
# "test".
zsig = "Timothy Good Abbott"
# Iterate over a snapshot, because a key may be swapped out of zhkeys
# inside the loop.
for key, (stream, test) in list(zhkeys.items()):
    if stream == "message":
        zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
    else:
        zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
    server_failure = send_zephyr(zwrite_args, str(key))
    if server_failure:
        # Replace the key we're not sure was delivered with a new key
        value = zhkeys.pop(key)
        new_key = gen_key(zhkeys)
        zhkeys[new_key] = value
        server_failure_again = send_zephyr(zwrite_args, str(new_key))
        # Both outcomes are reported through the module logger so that
        # they are written to the log file as well.
        if server_failure_again:
            logger.error("Zephyr server failure twice in a row on keys %s and %s! Aborting.",
                         key, new_key)
            print_status_and_exit(1)
        else:
            logger.warning("Replaced key %s with %s due to Zephyr server failure.",
                           key, new_key)
    # Drain after every send to keep the receive queue short.
    receive_zephyrs()

receive_zephyrs()
logger.info("Sent Zephyr messages!")
# Send Humbugs: one per key in hzkeys.  The "message" entry becomes a
# private message to humbug_user; every other entry is posted to its
# stream under the subject "test".
for key, (stream, test) in hzkeys.items():
    if stream != "message":
        payload = {
            "type": "stream",
            "subject": "test",
            "content": str(key),
            "to": stream,
        }
    else:
        payload = {
            "type": "private",
            "content": str(key),
            "to": humbug_user,
        }
    send_humbug(payload)
    # Drain pending zephyrs after every send.
    receive_zephyrs()

logger.info("Sent Humbug messages!")
# Messages normally make it through in under 3 seconds, but they need
# 2 round trips, so sleep 10 for a safe margin.  This alert is about
# correctness, not performance: it should fire reliably only when
# messages are not being delivered at all.
time.sleep(10)
receive_zephyrs()

logger.info("Starting receiving messages!")

# receive humbugs, passing the max_message_id recorded at startup as 'last'
response = humbug_client.get_messages({'last': str(max_message_id)})
messages = response['messages']
logger.info("Finished receiving Humbug messages!")

receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")
# Every key this run sent, in either direction.
all_keys = set(zhkeys) | set(hzkeys)


def process_keys(content_list):
    """Tally how often each of our keys occurs in content_list.

    Args:
        content_list: message bodies received on one end.

    Returns:
        A 5-tuple (key_counts, z_missing, h_missing, duplicates, success):
        key_counts maps every key in all_keys to its number of occurrences;
        z_missing / h_missing are the sets of zhkeys / hzkeys keys seen
        zero times; duplicates is True if any key was seen more than once;
        success is True if every key was seen exactly once.
    """
    key_counts = dict((key, 0) for key in all_keys)
    for content in content_list:
        # Anything that is not one of our keys might have come from a
        # concurrent check-mirroring process; ignore it.
        if content in all_keys:
            key_counts[content] += 1

    z_missing = set(key for key in zhkeys if key_counts[key] == 0)
    h_missing = set(key for key in hzkeys if key_counts[key] == 0)
    tallies = key_counts.values()
    duplicates = any(seen > 1 for seen in tallies)
    success = all(seen == 1 for seen in tallies)
    return key_counts, z_missing, h_missing, duplicates, success
# The h_foo variables are about the messages we _received_ in Humbug
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages]

# The key is taken from the second NUL-separated field of each notice.
# A notice without that field cannot carry one of our keys, so skip it
# instead of letting an IndexError abort the whole check.
z_contents = []
for notice in notices:
    fields = notice.message.split('\0')
    if len(fields) > 1:
        z_contents.append(fields[1])
    else:
        logger.warning("Ignoring zephyr notice without a message body field")

(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)

# Full success ends the script here with status 0; a partial success is
# only noted before the detailed report.
if z_success and h_success:
    logger.info("Success!")
    print_status_and_exit(0)
elif z_success:
    logger.info("Received everything correctly in Zephyr!")
elif h_success:
    logger.info("Received everything correctly in Humbug!")
# Per-key report: list every key that was not seen exactly once on both
# ends, together with where it was originally sent.
logger.error("Messages received the wrong number of times:")
for key in all_keys:
    z_seen = z_key_counts[key]
    h_seen = h_key_counts[key]
    if z_seen != 1 or h_seen != 1:
        if key in zhkeys:
            (stream, test) = zhkeys[key]
            logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" %
                           (key, z_seen, h_seen, test, stream))
        if key in hzkeys:
            (stream, test) = hzkeys[key]
            logger.warning("%10s: z got %s. h got %s. Sent via Humbug(%s): class %s" %
                           (key, z_seen, h_seen, test, stream))
logger.error("")
# Summary: translate the missing/duplicate sets into likely causes, then
# exit with status 1 (this point is only reached when something was wrong).
logger.error("Summary of specific problems:")

if h_duplicates:
    logger.error("humbug: Received duplicate messages!")
    logger.error("humbug: This is probably a bug in our message loop detection.")
    logger.error("humbug: where Humbugs go humbug=>zephyr=>humbug")
if z_duplicates:
    logger.error("zephyr: Received duplicate messages!")
    logger.error("zephyr: This is probably a bug in our message loop detection.")
    logger.error("zephyr: where Zephyrs go zephyr=>humbug=>zephyr")

# Messages that never showed up even on the end they were sent from.
if z_missing_z:
    logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
    logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
if h_missing_h:
    logger.error("humbug: Didn't receive all the Humbugs we sent on the Humbug end!")
    logger.error("humbug: This is probably an issue with check-mirroring sending or receiving Humbugs.")

# Messages that did not make it across to the other end.
if z_missing_h:
    logger.error("zephyr: Didn't receive all the Humbugs we sent on the Zephyr end!")
    # Only blame mirroring when at least one of those Humbugs did arrive
    # on the Humbug end, i.e. it is missing from Zephyr alone.
    if z_missing_h - h_missing_h:
        logger.error("zephyr: Including some Humbugs that we did receive on the Humbug end.")
        logger.error("zephyr: This suggests we have a humbug=>zephyr mirroring problem.")
        logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
    logger.error("humbug: Didn't receive all the Zephyrs we sent on the Humbug end!")
    # Likewise: at least one Zephyr arrived on the Zephyr end but is
    # missing from Humbug.
    if h_missing_z - z_missing_z:
        logger.error("humbug: Including some Zephyrs that we did receive on the Zephyr end.")
        logger.error("humbug: This suggests we have a zephyr=>humbug mirroring problem.")
        logger.error("humbug: aka the global class mirroring script has issues.")

print_status_and_exit(1)