 3d0f7955b6
			
		
	
	
		3d0f7955b6
		
	
	
	
	
		
			
			In order to keep all three packages (zulip, zulip_bots, zulip_botserver) in the same repo, all package files must now be nested one level deeper. For instance, python-zulip-api/zulip_bots/zulip_bots/bots/, instead of python-zulip-api/zulip_bots/bots/.
		
			
				
	
	
		
			366 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			366 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| #!/usr/bin/env python
 | |
| from __future__ import print_function
 | |
| from __future__ import absolute_import
 | |
| import sys
 | |
| import time
 | |
| import optparse
 | |
| import os
 | |
| import random
 | |
| import logging
 | |
| import subprocess
 | |
| import hashlib
 | |
| from six.moves import range
 | |
| 
 | |
| if False:
 | |
|     from typing import Any, Dict, List, Set, Tuple
 | |
| 
 | |
# Command-line interface of the check.
parser = optparse.OptionParser()
# --verbose: boolean flag, off by default.
parser.add_option('--verbose', dest='verbose', default=False, action='store_true')
# --site: value later handed to the Zulip client; None unless given.
parser.add_option('--site', dest='site', default=None, action='store')
# --sharded: boolean flag selecting the sharded stream list below.
parser.add_option('--sharded', default=False, action='store_true')
# --root-path: base directory used to build the sys.path entries below.
parser.add_option('--root-path', dest='root_path', default="/home/zulip", action='store')
(options, args) = parser.parse_args()
 | |
| 
 | |
# The 'api' directory needs to go first, so that 'import zulip' won't pick up
# some other directory named 'zulip'.
#
# The python-zephyr build directory name embeds the machine type and the
# "major.minor" interpreter version.  Build the version from
# sys.version_info: slicing sys.version ("3.10.4"[0:3] == "3.1") truncates
# two-digit minor versions and would point sys.path at the wrong directory.
pyzephyr_lib_path = "python-zephyr/build/lib.linux-%s-%s/" % (
    os.uname()[4], "%d.%d" % sys.version_info[:2])
sys.path[:0] = [os.path.join(options.root_path, "api/"),
                os.path.join(options.root_path, "python-zephyr"),
                os.path.join(options.root_path, pyzephyr_lib_path),
                options.root_path]

# Zephyr principal that personal test messages are sent to and subscribed for.
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'

# The current directory is searched last, after the paths prepended above.
sys.path.append(".")
import zulip
zulip_client = zulip.Client(
    verbose=True,
    client="ZulipMonitoring/0.1",
    site=options.site)
 | |
| 
 | |
# Configure logging: the root logger gets the timestamped format, and the
# module logger additionally writes every record (DEBUG and up) to log_file.
log_file = "/var/log/zulip/check-mirroring-log"
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format)

# Handler that appends records to the log file in the same format.
formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
 | |
| 
 | |
# Initialize list of streams to test.  Each entry is a (stream, shard_name)
# pair; the special stream "message" stands for personal messages.
if not options.sharded:
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test", "a"),
    ]
else:
    # NOTE: Streams in this list must be in the zulip_user's Zulip
    # subscriptions, or we won't receive messages via Zulip.

    # The sharded stream list has a bunch of pairs
    # (stream, shard_name), where sha1sum(stream).startswith(shard_name)
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test-32", "0"),
        ("tabbott-nagios-test-33", "1"),
        ("tabbott-nagios-test-2", "2"),
        ("tabbott-nagios-test-5", "3"),
        ("tabbott-nagios-test-13", "4"),
        ("tabbott-nagios-test-7", "5"),
        ("tabbott-nagios-test-22", "6"),
        ("tabbott-nagios-test-35", "7"),
        ("tabbott-nagios-test-4", "8"),
        ("tabbott-nagios-test-3", "9"),
        ("tabbott-nagios-test-1", "a"),
        ("tabbott-nagios-test-49", "b"),
        ("tabbott-nagios-test-34", "c"),
        ("tabbott-nagios-test-12", "d"),
        ("tabbott-nagios-test-11", "e"),
        ("tabbott-nagios-test-9", "f"),
    ]
    # Sanity-check the table: every class stream must hash into the shard it
    # is listed under.  The personal-message entry has no shard to verify.
    for (stream, test) in test_streams:
        if stream != "message":
            assert(hashlib.sha1(stream.encode("utf-8")).hexdigest().startswith(test))
 | |
| 
 | |
def print_status_and_exit(status):
    # type: (int) -> None
    """Print `status` on stdout and terminate the process with that exit code.

    The output of this script is used by Nagios. Various outputs,
    e.g. true success and punting due to a SERVNAK, result in a
    non-alert case, so to give us something unambiguous to check in
    Nagios, the exit status is printed before exiting.
    """
    print(status)
    raise SystemExit(status)
 | |
| 
 | |
def send_zulip(message):
    # type: (Dict[str, str]) -> None
    """Send `message` with the module-level Zulip client.

    Returns normally when the reply's "result" field is "success".  On any
    other result the request and the reply are logged as errors and the
    script exits with status 1.
    """
    response = zulip_client.send_message(message)
    if response["result"] == "success":
        return
    logger.error("Error sending zulip, args were:")
    logger.error(str(message))
    logger.error(str(response))
    print_status_and_exit(1)
 | |
| 
 | |
# Returns True if and only if we "Detected server failure" sending the zephyr.
def send_zephyr(zwrite_args, content):
    # type: (List[str], str) -> bool
    """Run the `zwrite_args` command with `content` on its stdin.

    Returns False when the command exits with status 0.  Returns True when
    it fails and its stdout contains the "Detected server failure" marker,
    so the caller can retry.  Any other failure is logged and the script
    exits with status 1.
    """
    p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    raw_stdout, raw_stderr = p.communicate(input=content.encode("utf-8"))
    if p.returncode == 0:
        return False

    # communicate() hands back bytes on Python 3; decode before searching and
    # logging, since `"text" in b"bytes"` raises TypeError there.  Undecodable
    # bytes are replaced rather than allowed to abort the check.
    stdout = raw_stdout.decode("utf-8", "replace")
    stderr = raw_stderr.decode("utf-8", "replace")
    if "Detected server failure while receiving acknowledgement for" in stdout:
        logger.warning("Got server failure error sending zephyr; retrying")
        logger.warning(stderr)
        return True
    logger.error("Error sending zephyr:")
    logger.info(stdout)
    logger.error(stderr)
    print_status_and_exit(1)
    return False
 | |
| 
 | |
# Subscribe to Zulip: register an event queue for message events and keep
# its id and last event id for the later get_events() call.
try:
    res = zulip_client.register(event_types=["message"])
    # Substring test, so any result value containing "error" counts as a
    # failure (presumably covering more than the literal "error" -- TODO
    # confirm against the client library).
    if 'error' in res.get('result'):
        # Log through `logger` (not the root `logging` module) so these
        # failures also reach the log file, like every other error here.
        logger.error("Error subscribing to Zulips!")
        logger.error(res['msg'])
        print_status_and_exit(1)
    queue_id, last_event_id = (res['queue_id'], res['last_event_id'])
except Exception:
    # SystemExit raised by print_status_and_exit above is not an Exception
    # subclass, so it passes through this handler untouched.
    logger.exception("Unexpected error subscribing to Zulips")
    print_status_and_exit(1)
 | |
| 
 | |
# Subscribe to Zephyrs
import zephyr
# One subscription triple per test stream: personals are subscribed for
# mit_user, class streams with wildcards in the other two fields.
zephyr_subs_to_add = []
for (stream, test) in test_streams:
    if stream == "message":
        zephyr_subs_to_add.append((stream, 'personal', mit_user))
    else:
        zephyr_subs_to_add.append((stream, '*', '*'))

# Try up to 10 times; an attempt succeeds only if every requested
# subscription shows up in the list reported back by zephyr.
actually_subscribed = False
for tries in range(10):
    try:
        zephyr.init()
        zephyr._z.subAll(zephyr_subs_to_add)
        zephyr_subs = zephyr._z.getSubscriptions()

        missing = 0
        for elt in zephyr_subs_to_add:
            if elt not in zephyr_subs:
                logger.error("Failed to subscribe to %s" % (elt,))
                missing += 1
        if missing == 0:
            actually_subscribed = True
            break
    except IOError as e:
        # Search the exception's text: exception objects are not iterable on
        # Python 3, so `"..." in e` itself raised TypeError there.
        # NOTE(review): the message says "punting", but the loop simply moves
        # on to the next attempt -- confirm whether an early exit was meant.
        if "SERVNAK received" in str(e):
            logger.error("SERVNAK repeatedly received, punting rest of test")
        else:
            logger.exception("Exception subscribing to zephyrs")

if not actually_subscribed:
    logger.error("Failed to subscribe to zephyrs")
    print_status_and_exit(1)
 | |
| 
 | |
# Prepare keys.  Each dict maps a random key (sent as the message content)
# to the (stream, shard_name) pair that the key is sent to.
zhkeys = {}  # type: Dict[str, Tuple[str, str]]
hzkeys = {}  # type: Dict[str, Tuple[str, str]]
def gen_key(key_dict):
    # type: (Dict[str, Any]) -> str
    """Return a random 32-bit number, as a decimal string, not in `key_dict`.

    `key_dict` itself is not modified.
    """
    while True:
        candidate = str(random.getrandbits(32))
        # Avoid the unlikely event that we get the same bits twice
        if candidate not in key_dict:
            return candidate
 | |
| 
 | |
def gen_keys(key_dict):
    # type: (Dict[str, Tuple[str, str]]) -> None
    """Add one fresh random key to `key_dict` for every entry of test_streams.

    Each new key maps to that entry's (stream, shard_name) pair.
    """
    for stream_name, shard_name in test_streams:
        destination = (stream_name, shard_name)
        key_dict[gen_key(key_dict)] = destination

# Independent key sets for the two directions being tested.
gen_keys(zhkeys)
gen_keys(hzkeys)
 | |
| 
 | |
# Zephyr notices collected so far by receive_zephyrs().
notices = []

# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs():
    # type: () -> None
    """Append every currently pending zephyr notice to `notices`.

    Polls without blocking until no notice is returned.  Notices with a
    non-empty opcode are skipped.  An exception while receiving is logged
    and ends this polling round; it is not propagated.
    """
    while True:
        try:
            notice = zephyr.receive(block=False)
        except Exception:
            # Deliberately best-effort: record the failure via `logger` (so
            # it also reaches the log file) and stop polling for now.
            logger.exception("Exception receiving zephyrs:")
            break
        if notice is None:
            break
        if notice.opcode != "":
            continue
        notices.append(notice)
 | |
| 
 | |
logger.info("Starting sending messages!")
# Send zephyrs
zsig = "Timothy Good Abbott"
# Iterate over a snapshot of the items: the retry path below removes and adds
# entries of zhkeys, and mutating a dict while iterating its live view can
# revisit the replacement key (sending it twice) or abort the iteration.
for key, (stream, test) in list(zhkeys.items()):
    if stream == "message":
        zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
    else:
        zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
    server_failure = send_zephyr(zwrite_args, str(key))
    if server_failure:
        # Replace the key we're not sure was delivered with a new key
        value = zhkeys.pop(key)
        new_key = gen_key(zhkeys)
        zhkeys[new_key] = value
        server_failure_again = send_zephyr(zwrite_args, str(new_key))
        # Report via `logger` so the outcome also lands in the log file.
        if server_failure_again:
            logger.error("Zephyr server failure twice in a row on keys %s and %s!  Aborting." %
                         (key, new_key))
            print_status_and_exit(1)
        else:
            logger.warning("Replaced key %s with %s due to Zephyr server failure." %
                           (key, new_key))
    # Drain incoming notices after every send to keep the queue short.
    receive_zephyrs()

receive_zephyrs()
logger.info("Sent Zephyr messages!")
 | |
| 
 | |
# Send Zulips: one message per key, with the key as the content.
for key, (stream, test) in hzkeys.items():
    if stream != "message":
        # Class streams get a stream message on the fixed "test" subject.
        send_zulip({
            "type": "stream",
            "subject": "test",
            "content": str(key),
            "to": stream,
        })
    else:
        # The personal-message entry is sent to the client's own address.
        send_zulip({
            "type": "private",
            "content": str(key),
            "to": zulip_client.email,
        })
    # Drain incoming zephyr notices after every send.
    receive_zephyrs()

logger.info("Sent Zulip messages!")
 | |
| 
 | |
# Normally messages manage to forward through in under 3 seconds, but
# sleep 10 to give a safe margin since the messages do need to do 2
# round trips.  This alert is for correctness, not performance, and so
# we want it to reliably alert only when messages aren't being
# delivered at all.
time.sleep(10)
receive_zephyrs()

logger.info("Starting receiving messages!")

# receive zulips
res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id)
if 'error' in res.get('result'):
    # This is the receive step, so say so (the old text was copied from the
    # subscribe step), and log via `logger` so it reaches the log file.
    logger.error("Error receiving Zulips!")
    logger.error(res['msg'])
    print_status_and_exit(1)
# Keep only events that carry a message payload, so an event without one
# cannot abort the check with a KeyError before the report is produced.
messages = [event['message'] for event in res['events'] if 'message' in event]
logger.info("Finished receiving Zulip messages!")

receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")

# Every key sent in either direction.
all_keys = set(zhkeys) | set(hzkeys)
 | |
def process_keys(content_list):
    # type: (List[str]) -> Tuple[Dict[str, int], Set[str], Set[str], bool, bool]
    """Tally how often each of our keys occurs in `content_list`.

    Returns a 5-tuple:
      * dict mapping every key in all_keys to its number of occurrences,
      * set of zhkeys keys that never occurred,
      * set of hzkeys keys that never occurred,
      * True if any key occurred more than once,
      * True if every key occurred exactly once.
    """
    key_counts = dict.fromkeys(all_keys, 0)  # type: Dict[str, int]
    for content in content_list:
        # Skip anything that is not one of our keys, e.g. keys that might
        # have come from concurrent check-mirroring processes.
        if content in all_keys:
            key_counts[content] += 1

    z_missing = {key for key in zhkeys if key_counts[key] == 0}
    h_missing = {key for key in hzkeys if key_counts[key] == 0}
    tallies = list(key_counts.values())
    duplicates = any(tally > 1 for tally in tallies)
    success = all(tally == 1 for tally in tallies)
    return key_counts, z_missing, h_missing, duplicates, success
 | |
| 
 | |
# The h_foo variables are about the messages we _received_ in Zulip
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages]
# The key is taken from the second NUL-separated field of each notice
# (presumably the body following the zsig -- TODO confirm).  Notices without
# that field cannot carry one of our keys, so skip them instead of letting
# an IndexError abort the check before any report is produced.
z_contents = []
for notice in notices:
    fields = notice.message.split('\0')
    if len(fields) > 1:
        z_contents.append(fields[1])
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
 | |
| 
 | |
# Full success exits here with status 0; everything below is failure
# reporting (a one-sided success is only noted in the log).
if z_success and h_success:
    logger.info("Success!")
    print_status_and_exit(0)
elif z_success:
    logger.info("Received everything correctly in Zephyr!")
elif h_success:
    logger.info("Received everything correctly in Zulip!")

# List every key that was not received exactly once on both ends, together
# with the direction and destination it was sent to.
logger.error("Messages received the wrong number of times:")
for key in all_keys:
    if z_key_counts[key] != 1 or h_key_counts[key] != 1:
        if key in zhkeys:
            (stream, test) = zhkeys[key]
            logger.warning(
                "%10s: z got %s, h got %s.  Sent via Zephyr(%s): class %s" %
                (key, z_key_counts[key], h_key_counts[key], test, stream))
        if key in hzkeys:
            (stream, test) = hzkeys[key]
            logger.warning(
                "%10s: z got %s. h got %s.  Sent via Zulip(%s): class %s" %
                (key, z_key_counts[key], h_key_counts[key], test, stream))
logger.error("")
logger.error("Summary of specific problems:")
 | |
| 
 | |
# Duplicates: some key was received more than once on that end.
if h_duplicates:
    logger.error("zulip: Received duplicate messages!")
    logger.error("zulip: This is probably a bug in our message loop detection.")
    logger.error("zulip: where Zulips go zulip=>zephyr=>zulip")
if z_duplicates:
    logger.error("zephyr: Received duplicate messages!")
    logger.error("zephyr: This is probably a bug in our message loop detection.")
    logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr")

# Messages that never came back on the side they were sent from.
if z_missing_z:
    logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
    logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
if h_missing_h:
    logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!")
    logger.error("zulip: This is probably an issue with check-mirroring sending or receiving Zulips.")

# Messages that did not make it across to the other side.  The mirroring
# diagnosis only applies when at least one of them WAS received on its
# sending side, i.e. the set difference is non-empty.  (Comparing the two
# sets with == selected exactly the case where no such message exists.)
if z_missing_h:
    logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!")
    if z_missing_h - h_missing_h:
        logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.")
        logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.")
        logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
    logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!")
    if h_missing_z - z_missing_z:
        logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.")
        logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.")
        logger.error("zulip: aka the global class mirroring script has issues.")

# Release the event queue registered earlier, then report failure.
zulip_client.deregister(queue_id)
print_status_and_exit(1)
 |