python-zulip-api/zulip/integrations/zephyr/check-mirroring

403 lines
14 KiB
Plaintext
Raw Permalink Normal View History

2020-04-02 09:59:28 -04:00
#!/usr/bin/env python3
2021-05-28 05:00:04 -04:00
import hashlib
import logging
import optparse
import random
import subprocess
2021-05-28 05:00:04 -04:00
import sys
import time
from ctypes import byref, c_int, c_ushort
2021-05-28 05:00:04 -04:00
from typing import Dict, List, Set, Tuple
import zephyr_ctypes
2021-05-28 05:00:04 -04:00
import zulip
2016-09-10 13:56:23 -04:00
parser = optparse.OptionParser()
parser.add_option("--verbose", dest="verbose", default=False, action="store_true")
parser.add_option("--site", dest="site", default=None, action="store")
parser.add_option("--sharded", default=False, action="store_true")
(options, args) = parser.parse_args()
mit_user = "tabbott/extra@ATHENA.MIT.EDU"
zulip_client = zulip.Client(verbose=True, client="ZulipMonitoring/0.1", site=options.site)
# Configure logging
log_file = "/var/log/zulip/check-mirroring-log"
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format)
formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
# Initialize list of streams to test
if options.sharded:
# NOTE: Streams in this list must be in the zulip_user's Zulip
# subscriptions, or we won't receive messages via Zulip.
# The sharded stream list has a bunch of pairs
# (stream, shard_name), where sha1sum(stream).startswith(shard_name)
test_streams = [
("message", "p"),
("tabbott-nagios-test-32", "0"),
("tabbott-nagios-test-33", "1"),
2017-01-24 00:21:14 -05:00
("tabbott-nagios-test-2", "2"),
("tabbott-nagios-test-5", "3"),
("tabbott-nagios-test-13", "4"),
2017-01-24 00:21:14 -05:00
("tabbott-nagios-test-7", "5"),
("tabbott-nagios-test-22", "6"),
("tabbott-nagios-test-35", "7"),
2017-01-24 00:21:14 -05:00
("tabbott-nagios-test-4", "8"),
("tabbott-nagios-test-3", "9"),
("tabbott-nagios-test-1", "a"),
("tabbott-nagios-test-49", "b"),
("tabbott-nagios-test-34", "c"),
("tabbott-nagios-test-12", "d"),
("tabbott-nagios-test-11", "e"),
2017-01-24 00:21:14 -05:00
("tabbott-nagios-test-9", "f"),
2017-01-24 00:34:26 -05:00
]
for (stream, test) in test_streams:
if stream == "message":
continue
assert hashlib.sha1(stream.encode("utf-8")).hexdigest().startswith(test)
else:
test_streams = [
("message", "p"),
("tabbott-nagios-test", "a"),
2017-01-24 00:34:26 -05:00
]
def print_status_and_exit(status: int) -> None:
2016-09-10 13:56:23 -04:00
# The output of this script is used by Nagios. Various outputs,
# e.g. true success and punting due to a SERVNAK, result in a
# non-alert case, so to give us something unambiguous to check in
# Nagios, print the exit status.
print(status)
sys.exit(status)
def send_zulip(message: Dict[str, str]) -> None:
result = zulip_client.send_message(message)
if result["result"] != "success":
logger.error("Error sending zulip, args were:")
logger.error(str(message))
logger.error(str(result))
print_status_and_exit(1)
# Returns True if and only if we "Detected server failure" sending the zephyr.
def send_zephyr(zwrite_args: List[str], content: str) -> bool:
p = subprocess.Popen(
zwrite_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
stdout, stderr = p.communicate(input=content)
if p.returncode != 0:
if "Detected server failure while receiving acknowledgement for" in stdout:
logger.warning("Got server failure error sending zephyr; retrying")
logger.warning(stderr)
return True
logger.error("Error sending zephyr:")
logger.info(stdout)
logger.error(stderr)
print_status_and_exit(1)
return False
# Subscribe to Zulip
try:
res = zulip_client.register(event_types=["message"])
if "error" in res["result"]:
logging.error("Error subscribing to Zulips!")
logging.error(res["msg"])
print_status_and_exit(1)
queue_id, last_event_id = (res["queue_id"], res["last_event_id"])
except Exception:
logger.exception("Unexpected error subscribing to Zulips")
print_status_and_exit(1)
# Subscribe to Zephyrs
zephyr_subs_to_add = []
for (stream, test) in test_streams:
if stream == "message":
zephyr_subs_to_add.append((stream, "personal", mit_user))
else:
zephyr_subs_to_add.append((stream, "*", "*"))
actually_subscribed = False
for tries in range(10):
try:
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
zephyr_port = c_ushort()
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
zephyr_ctypes.check(
zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * len(zephyr_subs_to_add))(
*(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
for cls, instance, recipient in zephyr_subs_to_add
)
),
len(zephyr_subs_to_add),
0,
)
)
try:
nsubs = c_int()
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
zephyr_subs = {
(
zsub.zsub_class.decode(),
zsub.zsub_classinst.decode(),
zsub.zsub_recipient.decode(),
)
for zsub in zsubs
}
finally:
zephyr_ctypes.ZFlushSubscriptions()
missing = 0
for elt in zephyr_subs_to_add:
if elt not in zephyr_subs:
logging.error(f"Failed to subscribe to {elt}")
missing += 1
if missing == 0:
actually_subscribed = True
break
except zephyr_ctypes.ZephyrError as e:
if e.code == zephyr_ctypes.ZERR_SERVNAK:
logger.error("SERVNAK repeatedly received, punting rest of test")
else:
logger.exception("Exception subscribing to zephyrs")
if not actually_subscribed:
logger.error("Failed to subscribe to zephyrs")
print_status_and_exit(1)
# Prepare keys
zhkeys = {} # type: Dict[str, Tuple[str, str]]
hzkeys = {} # type: Dict[str, Tuple[str, str]]
def gen_key(key_dict: Dict[str, Tuple[str, str]]) -> str:
bits = str(random.getrandbits(32))
while bits in key_dict:
# Avoid the unlikely event that we get the same bits twice
bits = str(random.getrandbits(32))
return bits
def gen_keys(key_dict: Dict[str, Tuple[str, str]]) -> None:
for (stream, test) in test_streams:
key_dict[gen_key(key_dict)] = (stream, test)
gen_keys(zhkeys)
gen_keys(hzkeys)
notices = []
# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs() -> None:
while zephyr_ctypes.ZPending() != 0:
notice = zephyr_ctypes.ZNotice_t()
sender = zephyr_ctypes.sockaddr_in()
try:
zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
except zephyr_ctypes.ZephyrError:
logging.exception("Exception receiving zephyrs:")
break
if notice.z_opcode != b"":
zephyr_ctypes.ZFreeNotice(byref(notice))
continue
notices.append(notice)
logger.info("Starting sending messages!")
# Send zephyrs
zsig = "Timothy Good Abbott"
for key, (stream, test) in zhkeys.items():
if stream == "message":
zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
else:
zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
server_failure = send_zephyr(zwrite_args, str(key))
if server_failure:
# Replace the key we're not sure was delivered with a new key
value = zhkeys.pop(key)
new_key = gen_key(zhkeys)
zhkeys[new_key] = value
server_failure_again = send_zephyr(zwrite_args, str(new_key))
if server_failure_again:
logging.error(
"Zephyr server failure twice in a row on keys %s and %s! Aborting."
% (key, new_key)
)
print_status_and_exit(1)
else:
logging.warning(f"Replaced key {key} with {new_key} due to Zephyr server failure.")
receive_zephyrs()
receive_zephyrs()
logger.info("Sent Zephyr messages!")
# Send Zulips
for key, (stream, test) in hzkeys.items():
if stream == "message":
send_zulip(
{
"type": "private",
"content": str(key),
"to": zulip_client.email,
}
)
else:
send_zulip(
{
"type": "stream",
"subject": "test",
"content": str(key),
"to": stream,
}
)
receive_zephyrs()
logger.info("Sent Zulip messages!")
# Normally messages manage to forward through in under 3 seconds, but
# sleep 10 to give a safe margin since the messages do need to do 2
# round trips. This alert is for correctness, not performance, and so
# we want it to reliably alert only when messages aren't being
# delivered at all.
time.sleep(10)
receive_zephyrs()
logger.info("Starting receiving messages!")
# receive zulips
res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id)
if "error" in res["result"]:
logging.error("Error receiving Zulips!")
logging.error(res["msg"])
print_status_and_exit(1)
messages = [event["message"] for event in res["events"]]
logger.info("Finished receiving Zulip messages!")
receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")
all_keys = set(list(zhkeys.keys()) + list(hzkeys.keys()))
def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set[str], bool, bool]:
2016-09-10 13:56:23 -04:00
# Start by filtering out any keys that might have come from
# concurrent check-mirroring processes
content_keys = [key for key in content_list if key in all_keys]
key_counts = {} # type: Dict[str, int]
for key in all_keys:
key_counts[key] = 0
for key in content_keys:
key_counts[key] += 1
z_missing = {key for key in zhkeys.keys() if key_counts[key] == 0}
h_missing = {key for key in hzkeys.keys() if key_counts[key] == 0}
duplicates = any(val > 1 for val in key_counts.values())
success = all(val == 1 for val in key_counts.values())
return key_counts, z_missing, h_missing, duplicates, success
# The h_foo variables are about the messages we _received_ in Zulip
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages]
z_contents = [
notice.z_message[: notice.z_message_len].split(b"\0")[1].decode(errors="replace")
for notice in notices
]
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
for notice in notices:
zephyr_ctypes.ZFreeNotice(byref(notice))
if z_success and h_success:
logger.info("Success!")
print_status_and_exit(0)
elif z_success:
logger.info("Received everything correctly in Zephyr!")
elif h_success:
logger.info("Received everything correctly in Zulip!")
logger.error("Messages received the wrong number of times:")
for key in all_keys:
if z_key_counts[key] == 1 and h_key_counts[key] == 1:
continue
if key in zhkeys:
(stream, test) = zhkeys[key]
logger.warning(
"%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s"
% (key, z_key_counts[key], h_key_counts[key], test, stream)
)
if key in hzkeys:
(stream, test) = hzkeys[key]
logger.warning(
"%10s: z got %s. h got %s. Sent via Zulip(%s): class %s"
% (key, z_key_counts[key], h_key_counts[key], test, stream)
)
logger.error("")
logger.error("Summary of specific problems:")
if h_duplicates:
logger.error("zulip: Received duplicate messages!")
logger.error("zulip: This is probably a bug in our message loop detection.")
logger.error("zulip: where Zulips go zulip=>zephyr=>zulip")
if z_duplicates:
logger.error("zephyr: Received duplicate messages!")
logger.error("zephyr: This is probably a bug in our message loop detection.")
logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr")
if z_missing_z:
logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
logger.error(
"zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs."
)
if h_missing_h:
logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!")
logger.error(
"zulip: This is probably an issue with check-mirroring sending or receiving Zulips."
)
if z_missing_h:
logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!")
if z_missing_h == h_missing_h:
logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.")
logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.")
logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!")
if h_missing_z == z_missing_z:
logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.")
logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.")
logger.error("zulip: aka the global class mirroring script has issues.")
zulip_client.deregister(queue_id)
print_status_and_exit(1)