python-zulip-api/zulip/integrations/zephyr/check-mirroring

344 lines
12 KiB
Plaintext
Raw Normal View History

2020-04-02 09:59:28 -04:00
#!/usr/bin/env python3
import sys
import time
import optparse
import os
import random
import logging
import subprocess
import hashlib
import zephyr
import zulip
from typing import Any, Dict, List, Set, Tuple
2016-09-10 13:56:23 -04:00
# Command-line interface.  optparse leaves the recognised flags in
# `options` and any remaining positional arguments in `args`.
parser = optparse.OptionParser()
# --verbose: boolean flag, False unless given.
parser.add_option('--verbose', dest='verbose', action='store_true', default=False)
# --site: stored verbatim and handed to the Zulip client; None when absent.
parser.add_option('--site', dest='site', action='store', default=None)
# --sharded: boolean flag choosing the sharded list of test streams.
parser.add_option('--sharded', action='store_true', default=False)
options, args = parser.parse_args()
# Zephyr principal that the personal ("message") test zephyr is sent to
# and whose personal subscription is checked.
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'

# Single API client shared by every Zulip request this script makes;
# the server it talks to comes from the --site option.
zulip_client = zulip.Client(verbose=True, client="ZulipMonitoring/0.1", site=options.site)
# Configure logging: the root logger gets the default basicConfig handler,
# and this module's logger additionally writes everything from DEBUG up to
# a log file, using the same line format for both.
log_format = "%(asctime)s: %(message)s"
log_file = "/var/log/zulip/check-mirroring-log"

logging.basicConfig(format=log_format)

formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)
# Initialize list of streams to test.  Every entry is a pair
# (stream, shard_name); the special stream "message" stands for the
# personal-message test rather than a real stream/class.
if options.sharded:
    # NOTE: Streams in this list must be in the zulip_user's Zulip
    # subscriptions, or we won't receive messages via Zulip.

    # The sharded stream list has a bunch of pairs
    # (stream, shard_name), where sha1sum(stream).startswith(shard_name)
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test-32", "0"),
        ("tabbott-nagios-test-33", "1"),
        ("tabbott-nagios-test-2", "2"),
        ("tabbott-nagios-test-5", "3"),
        ("tabbott-nagios-test-13", "4"),
        ("tabbott-nagios-test-7", "5"),
        ("tabbott-nagios-test-22", "6"),
        ("tabbott-nagios-test-35", "7"),
        ("tabbott-nagios-test-4", "8"),
        ("tabbott-nagios-test-3", "9"),
        ("tabbott-nagios-test-1", "a"),
        ("tabbott-nagios-test-49", "b"),
        ("tabbott-nagios-test-34", "c"),
        ("tabbott-nagios-test-12", "d"),
        ("tabbott-nagios-test-11", "e"),
        ("tabbott-nagios-test-9", "f"),
    ]
    # Sanity-check the table: each real stream's SHA-1 hex digest must
    # begin with the shard name it is listed under.  The "message"
    # pseudo-stream is exempt from the check.
    for (stream, test) in test_streams:
        if stream != "message":
            assert(hashlib.sha1(stream.encode("utf-8")).hexdigest().startswith(test))
else:
    # Unsharded setup: the personal test plus a single stream.
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test", "a"),
    ]
def print_status_and_exit(status: int) -> None:
    """Print ``status`` on stdout and terminate the process with it.

    The output of this script is used by Nagios.  Various outputs,
    e.g. true success and punting due to a SERVNAK, result in a
    non-alert case, so the exit status is printed to give Nagios
    something unambiguous to check.

    Raises:
        SystemExit: always, carrying ``status`` as the exit code.
    """
    print(status)
    raise SystemExit(status)
def send_zulip(message: Dict[str, str]) -> None:
    """Send ``message`` through the shared Zulip client.

    Returns normally only when the server reports "success".  Any other
    result is logged together with the request that caused it, and the
    script then exits with status 1.
    """
    result = zulip_client.send_message(message)
    if result["result"] == "success":
        return
    for line in ("Error sending zulip, args were:", str(message), str(result)):
        logger.error(line)
    print_status_and_exit(1)
# Returns True if and only if we "Detected server failure" sending the zephyr.
def send_zephyr(zwrite_args: List[str], content: str) -> bool:
    """Run the given zwrite command line, feeding it ``content`` on stdin.

    Args:
        zwrite_args: full argv of the command to execute.
        content: message body; encoded as UTF-8 before being written to
            the child's stdin.

    Returns:
        False when the command exits with status 0.
        True when it fails and its stdout contains the "Detected server
        failure" marker, i.e. the caller should retry.

    Any other failure is logged and the script exits with status 1.
    """
    p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = p.communicate(input=content.encode("utf-8"))
    if p.returncode == 0:
        return False

    # The pipes are opened in binary mode, so communicate() hands back
    # bytes.  The marker must therefore be a bytes literal: testing a
    # str against bytes raises TypeError on Python 3.
    if b"Detected server failure while receiving acknowledgement for" in stdout:
        logger.warning("Got server failure error sending zephyr; retrying")
        logger.warning(stderr.decode("utf-8", errors="replace"))
        return True

    # Decode before logging so the log shows the text, not a bytes repr.
    logger.error("Error sending zephyr:")
    logger.info(stdout.decode("utf-8", errors="replace"))
    logger.error(stderr.decode("utf-8", errors="replace"))
    print_status_and_exit(1)
    # Not reached at runtime (the call above exits); keeps the declared
    # bool return type satisfied on every path.
    return False
# Subscribe to Zulip: register an event queue restricted to message
# events.  queue_id and last_event_id are kept so the events can be
# fetched from that queue after the test messages have been sent.
try:
    res = zulip_client.register(event_types=["message"])
    if 'error' in res['result']:
        # Report through the module's configured logger (not the root
        # `logging` functions) so the failure also reaches the log file.
        logger.error("Error subscribing to Zulips!")
        logger.error(res['msg'])
        print_status_and_exit(1)
    queue_id, last_event_id = (res['queue_id'], res['last_event_id'])
except Exception:
    # SystemExit raised by print_status_and_exit is not an Exception
    # subclass, so the exit above is not intercepted here.
    logger.exception("Unexpected error subscribing to Zulips")
    print_status_and_exit(1)
# Subscribe to Zephyrs: build one subscription triple per test stream.
# The "message" pseudo-stream becomes a personal subscription for
# mit_user; every other stream is subscribed with wildcards.
zephyr_subs_to_add = []
for (stream, test) in test_streams:
    if stream == "message":
        zephyr_subs_to_add.append((stream, 'personal', mit_user))
    else:
        zephyr_subs_to_add.append((stream, '*', '*'))

# Try up to 10 times; an attempt only counts as successful once every
# requested subscription shows up in the reported subscription list.
actually_subscribed = False
for tries in range(10):
    try:
        zephyr.init()
        zephyr._z.subAll(zephyr_subs_to_add)
        zephyr_subs = zephyr._z.getSubscriptions()

        missing = 0
        for elt in zephyr_subs_to_add:
            if elt not in zephyr_subs:
                logger.error("Failed to subscribe to %s" % (elt,))
                missing += 1
        if missing == 0:
            actually_subscribed = True
            break
    except OSError as e:
        # An exception object is not a container: `"..." in e` raises
        # TypeError on Python 3, so inspect its string form instead.
        if "SERVNAK received" in str(e):
            logger.error("SERVNAK repeatedly received, punting rest of test")
        else:
            logger.exception("Exception subscribing to zephyrs")

if not actually_subscribed:
    logger.error("Failed to subscribe to zephyrs")
    print_status_and_exit(1)
# Prepare keys.  Both tables map a random key (sent as message content)
# to the (stream, shard_name) pair it was sent for:
#   hzkeys -- keys sent via Zulip
#   zhkeys -- keys sent via Zephyr
hzkeys = {}  # type: Dict[str, Tuple[str, str]]
zhkeys = {}  # type: Dict[str, Tuple[str, str]]
def gen_key(key_dict: Dict[str, Any]) -> str:
    """Return a random 32-bit number, as a decimal string, unused in ``key_dict``.

    ``key_dict`` is only consulted, never modified.
    """
    while True:
        candidate = str(random.getrandbits(32))
        # Draw again in the unlikely event that we get the same bits twice.
        if candidate not in key_dict:
            return candidate
def gen_keys(key_dict: Dict[str, Tuple[str, str]]) -> None:
    """Add one fresh random key to ``key_dict`` for every entry of test_streams.

    Each new key maps to that entry's (stream, shard_name) pair.
    """
    for name, shard in test_streams:
        fresh_key = gen_key(key_dict)
        key_dict[fresh_key] = (name, shard)
# Fill both key tables (Zephyr-sent first, then Zulip-sent) with one
# key per test stream.
for key_table in (zhkeys, hzkeys):
    gen_keys(key_table)
# Every zephyr notice collected so far by receive_zephyrs().
notices = []


# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs() -> None:
    """Drain the pending zephyr notices into the ``notices`` list.

    Polls without blocking until nothing is left.  Notices whose opcode
    is not the empty string are discarded.  If receiving raises, the
    exception is logged and draining stops for this call.
    """
    while True:
        try:
            incoming = zephyr.receive(block=False)
        except Exception:
            logging.exception("Exception receiving zephyrs:")
            return
        if incoming is None:
            return
        if incoming.opcode == "":
            notices.append(incoming)
logger.info("Starting sending messages!")

# Send zephyrs: one per key in zhkeys, with the key as the message body.
zsig = "Timothy Good Abbott"
# Iterate over a snapshot of the items.  The retry path below removes a
# key from zhkeys and inserts a replacement; doing that while iterating
# the live dict either raises RuntimeError or visits the replacement
# key again and sends it a second time.
for key, (stream, test) in list(zhkeys.items()):
    if stream == "message":
        # Personal zephyr addressed to mit_user.
        zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
    else:
        # Class zephyr on instance "test".
        zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
    server_failure = send_zephyr(zwrite_args, str(key))
    if server_failure:
        # Replace the key we're not sure was delivered with a new key
        value = zhkeys.pop(key)
        new_key = gen_key(zhkeys)
        zhkeys[new_key] = value
        server_failure_again = send_zephyr(zwrite_args, str(new_key))
        if server_failure_again:
            logger.error("Zephyr server failure twice in a row on keys %s and %s! Aborting." %
                         (key, new_key))
            print_status_and_exit(1)
        else:
            logger.warning("Replaced key %s with %s due to Zephyr server failure." %
                           (key, new_key))
    # Drain after every send so the receive queue never builds up.
    receive_zephyrs()

receive_zephyrs()
logger.info("Sent Zephyr messages!")
# Send Zulips: one per key in hzkeys, with the key as the message
# content.  The "message" pseudo-stream is sent as a private message to
# the client's own address; everything else goes to the named stream
# under the subject "test".
for key, (stream, test) in hzkeys.items():
    if stream == "message":
        outgoing = {
            "type": "private",
            "content": str(key),
            "to": zulip_client.email,
        }
    else:
        outgoing = {
            "type": "stream",
            "subject": "test",
            "content": str(key),
            "to": stream,
        }
    send_zulip(outgoing)
    # Keep draining zephyrs between sends.
    receive_zephyrs()

logger.info("Sent Zulip messages!")
# Normally messages manage to forward through in under 3 seconds, but
# sleep 10 to give a safe margin since the messages do need to do 2
# round trips.  This alert is for correctness, not performance, and so
# we want it to reliably alert only when messages aren't being
# delivered at all.
time.sleep(10)
receive_zephyrs()

logger.info("Starting receiving messages!")

# receive zulips
res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id)
if 'error' in res['result']:
    # Use the module's configured logger so the failure also reaches the
    # log file.
    logger.error("Error receiving Zulips!")
    logger.error(res['msg'])
    print_status_and_exit(1)
# Only events that actually carry a message are of interest.  Skipping
# the rest (e.g. a heartbeat) avoids a KeyError that would abort the
# check before it can report what is missing.
messages = [event['message'] for event in res['events'] if 'message' in event]
logger.info("Finished receiving Zulip messages!")

receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")

# Every key this run sent, by either route.
all_keys = set(zhkeys) | set(hzkeys)
def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set[str], bool, bool]:
    """Tally how many times each of this run's keys occurs in ``content_list``.

    Returns a 5-tuple:
        key_counts: occurrence count for every key in all_keys.
        z_missing:  keys of zhkeys that were never seen.
        h_missing:  keys of hzkeys that were never seen.
        duplicates: True if any key was seen more than once.
        success:    True if every key was seen exactly once.
    """
    key_counts = dict.fromkeys(all_keys, 0)  # type: Dict[str, int]
    for content in content_list:
        # Ignore anything that is not one of our keys; it might have
        # come from a concurrent check-mirroring process.
        if content in all_keys:
            key_counts[content] += 1

    z_missing = {key for key in zhkeys if key_counts[key] == 0}
    h_missing = {key for key in hzkeys if key_counts[key] == 0}

    counts = list(key_counts.values())
    duplicates = any(count > 1 for count in counts)
    success = all(count == 1 for count in counts)
    return key_counts, z_missing, h_missing, duplicates, success
# The h_foo variables are about the messages we _received_ in Zulip
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages]
# The key is taken from the second NUL-separated field of the notice.
z_contents = [notice.message.split('\0')[1] for notice in notices]
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)

if z_success and h_success:
    logger.info("Success!")
    print_status_and_exit(0)
elif z_success:
    logger.info("Received everything correctly in Zephyr!")
elif h_success:
    logger.info("Received everything correctly in Zulip!")

# At least one side did not see every key exactly once: list each
# offending key with how often each side received it.
logger.error("Messages received the wrong number of times:")
for key in all_keys:
    if z_key_counts[key] == 1 and h_key_counts[key] == 1:
        continue
    if key in zhkeys:
        (stream, test) = zhkeys[key]
        logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s",
                       key, z_key_counts[key], h_key_counts[key], test, stream)
    if key in hzkeys:
        (stream, test) = hzkeys[key]
        logger.warning("%10s: z got %s. h got %s. Sent via Zulip(%s): class %s",
                       key, z_key_counts[key], h_key_counts[key], test, stream)
logger.error("")
logger.error("Summary of specific problems:")

if h_duplicates:
    logger.error("zulip: Received duplicate messages!")
    logger.error("zulip: This is probably a bug in our message loop detection.")
    logger.error("zulip: where Zulips go zulip=>zephyr=>zulip")
if z_duplicates:
    logger.error("zephyr: Received duplicate messages!")
    logger.error("zephyr: This is probably a bug in our message loop detection.")
    logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr")

if z_missing_z:
    logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
    logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
if h_missing_h:
    logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!")
    logger.error("zulip: This is probably an issue with check-mirroring sending or receiving Zulips.")

if z_missing_h:
    logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!")
    # Only blame the mirror when some Zulip-sent key is missing on the
    # Zephyr end although the Zulip end did receive it.  Comparing the
    # two sets for equality tested the opposite of what the messages say.
    if z_missing_h - h_missing_h:
        logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.")
        logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.")
        logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
    logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!")
    # Same reasoning in the other direction: a Zephyr-sent key that the
    # Zephyr end received but the Zulip end did not.
    if h_missing_z - z_missing_z:
        logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.")
        logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.")
        logger.error("zulip: aka the global class mirroring script has issues.")

# Release the event queue registered at startup, then report failure.
zulip_client.deregister(queue_id)
print_status_and_exit(1)