#!/usr/bin/env python
from __future__ import print_function
from __future__ import absolute_import
import sys
import time
import optparse
import os
import random
import logging
import subprocess
import hashlib
from six.moves import range

if False:
    from typing import Any, Dict, List, Set, Tuple

# Command-line interface.  --verbose and --sharded are boolean flags
# (off by default); --site and --root-path each take a value.
parser = optparse.OptionParser()
parser.add_option('--verbose', action='store_true', dest='verbose', default=False)
parser.add_option('--site', action='store', dest='site', default=None)
parser.add_option('--sharded', action='store_true', default=False)
parser.add_option('--root-path', action='store', dest='root_path', default="/home/zulip")
options, args = parser.parse_args()

# The 'api' directory needs to go first, so that 'import zulip' won't pick up
# some other directory named 'zulip'.
#
# The "<major>.<minor>" part of the build directory is taken from
# sys.version_info; slicing sys.version ("3.10.4"[0:3] == "3.1") produces
# the wrong directory name on Python 3.10 and later.
pyzephyr_lib_path = "python-zephyr/build/lib.linux-%s-%d.%d/" % (
    os.uname()[4], sys.version_info[0], sys.version_info[1])
sys.path[:0] = [os.path.join(options.root_path, "api/"),
                os.path.join(options.root_path, "python-zephyr"),
                os.path.join(options.root_path, pyzephyr_lib_path),
                options.root_path]

# Zephyr principal that the personal ("message") test zephyrs are sent to
# and subscribed for.
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'

# Must come after the sys.path edits above so the intended 'zulip' package
# is the one that gets imported.
sys.path.append(".")
import zulip
zulip_client = zulip.Client(
    verbose=True,
    client="ZulipMonitoring/0.1",
    site=options.site)

# Configure logging
log_file = "/var/log/zulip/check-mirroring-log"
log_format = "%(asctime)s: %(message)s"

# Root logger: default handler, using the shared format.
logging.basicConfig(format=log_format)

# Script logger: everything from DEBUG up, additionally written to log_file.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

# Initialize list of streams to test
if not options.sharded:
    # Unsharded mode: the personal-message case plus a single class.
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test", "a"),
    ]
else:
    # NOTE: Streams in this list must be in the zulip_user's Zulip
    # subscriptions, or we won't receive messages via Zulip.

    # The sharded stream list has a bunch of pairs
    # (stream, shard_name), where sha1sum(stream).startswith(shard_name)
    test_streams = [
        ("message", "p"),
        ("tabbott-nagios-test-32", "0"),
        ("tabbott-nagios-test-33", "1"),
        ("tabbott-nagios-test-2", "2"),
        ("tabbott-nagios-test-5", "3"),
        ("tabbott-nagios-test-13", "4"),
        ("tabbott-nagios-test-7", "5"),
        ("tabbott-nagios-test-22", "6"),
        ("tabbott-nagios-test-35", "7"),
        ("tabbott-nagios-test-4", "8"),
        ("tabbott-nagios-test-3", "9"),
        ("tabbott-nagios-test-1", "a"),
        ("tabbott-nagios-test-49", "b"),
        ("tabbott-nagios-test-34", "c"),
        ("tabbott-nagios-test-12", "d"),
        ("tabbott-nagios-test-11", "e"),
        ("tabbott-nagios-test-9", "f"),
    ]
    # Sanity-check the table: every class name (the "message" entry is
    # exempt) must hash into the shard it is listed under.
    for (stream, test) in test_streams:
        if stream != "message":
            digest = hashlib.sha1(stream.encode("utf-8")).hexdigest()
            assert digest.startswith(test)

def print_status_and_exit(status):
    # type: (int) -> None
    """Print `status` on stdout, then terminate the process with it.

    Nagios consumes this script's output.  Several different outcomes
    (e.g. true success and punting due to a SERVNAK) end up as a
    non-alert case, so the exit status is printed to give Nagios
    something unambiguous to check.
    """
    print(str(status))
    raise SystemExit(status)

def send_zulip(message):
    # type: (Dict[str, str]) -> None
    """Send `message` through zulip_client.

    If the response's "result" is anything other than "success", the
    message and the response are logged as errors and the script exits
    with status 1.
    """
    response = zulip_client.send_message(message)
    if response["result"] == "success":
        return
    logger.error("Error sending zulip, args were:")
    for detail in (message, response):
        logger.error(str(detail))
    print_status_and_exit(1)

# Returns True if and only if we "Detected server failure" sending the zephyr.
def send_zephyr(zwrite_args, content):
    # type: (List[str], str) -> bool
    """Run the `zwrite_args` command, feeding it `content` on stdin.

    Returns False when the command exits with status 0.  Returns True when
    it exits non-zero and its stdout contains the "Detected server failure
    while receiving acknowledgement for" message, so the caller can retry.
    Any other non-zero exit is logged and ends the script with status 1.
    """
    p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    raw_stdout, raw_stderr = p.communicate(input=content.encode("utf-8"))
    # The pipes are binary, so communicate() hands back bytes.  Decode before
    # the substring test below: on Python 3, `"text" in b"bytes"` raises
    # TypeError instead of matching.  Undecodable bytes are replaced rather
    # than allowed to abort the check.
    stdout = raw_stdout.decode("utf-8", "replace")
    stderr = raw_stderr.decode("utf-8", "replace")
    if p.returncode != 0:
        if "Detected server failure while receiving acknowledgement for" in stdout:
            logger.warning("Got server failure error sending zephyr; retrying")
            logger.warning(stderr)
            return True
        logger.error("Error sending zephyr:")
        logger.info(stdout)
        logger.error(stderr)
        print_status_and_exit(1)
    return False

# Subscribe to Zulip
#
# Register an event queue for "message" events and remember its id and
# last event id; both are passed to get_events() later to fetch what
# arrived.  Errors are reported through `logger` (not the root `logging`
# module) so that they also reach the log file handler attached to it.
try:
    res = zulip_client.register(event_types=["message"])
    if 'error' in res.get('result'):
        logger.error("Error subscribing to Zulips!")
        logger.error(res['msg'])
        print_status_and_exit(1)
    queue_id, last_event_id = (res['queue_id'], res['last_event_id'])
except Exception:
    # print_status_and_exit() leaves via SystemExit, which is not an
    # Exception subclass, so the exit above is not swallowed here.
    logger.exception("Unexpected error subscribing to Zulips")
    print_status_and_exit(1)

# Subscribe to Zephyrs
import zephyr

# One subscription triple per test stream: the personal ("message") case is
# subscribed for mit_user, every other class with wildcards.
zephyr_subs_to_add = []
for (stream, test) in test_streams:
    if stream == "message":
        zephyr_subs_to_add.append((stream, 'personal', mit_user))
    else:
        zephyr_subs_to_add.append((stream, '*', '*'))

# Try up to 10 times; an attempt only counts as successful when every
# requested triple shows up in the subscription list read back afterwards.
actually_subscribed = False
for tries in range(10):
    try:
        zephyr.init()
        zephyr._z.subAll(zephyr_subs_to_add)
        zephyr_subs = zephyr._z.getSubscriptions()

        missing = 0
        for elt in zephyr_subs_to_add:
            if elt not in zephyr_subs:
                # Use `logger` so the failure also lands in the log file.
                logger.error("Failed to subscribe to %s" % (elt,))
                missing += 1
        if missing == 0:
            actually_subscribed = True
            break
    except IOError as e:
        # Match against the exception's text: `"..." in e` is a membership
        # test on the exception object itself, which raises TypeError on
        # Python 3 and is not a substring match on Python 2.
        if "SERVNAK received" in str(e):
            logger.error("SERVNAK repeatedly received, punting rest of test")
        else:
            logger.exception("Exception subscribing to zephyrs")

if not actually_subscribed:
    logger.error("Failed to subscribe to zephyrs")
    print_status_and_exit(1)

# Prepare keys
zhkeys = {} # type: Dict[str, Tuple[str, str]]
hzkeys = {} # type: Dict[str, Tuple[str, str]]
def gen_key(key_dict):
    # type: (Dict[str, Any]) -> str
    """Return a random 32-bit number, as a decimal string, that is not
    already a key of `key_dict`.  `key_dict` itself is not modified."""
    while True:
        candidate = str(random.getrandbits(32))
        # Redraw in the unlikely event that we get the same bits twice
        if candidate not in key_dict:
            return candidate

def gen_keys(key_dict):
    # type: (Dict[str, Tuple[str, str]]) -> None
    """Add one fresh key to `key_dict` for every entry of test_streams,
    mapping it to that entry's (stream, test) pair."""
    for stream_name, shard in test_streams:
        fresh_key = gen_key(key_dict)
        key_dict[fresh_key] = (stream_name, shard)

# zhkeys holds the keys sent out via Zephyr, hzkeys those sent via Zulip.
for key_table in (zhkeys, hzkeys):
    gen_keys(key_table)

# Every zephyr notice collected so far by receive_zephyrs().
notices = []

# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs():
    # type: () -> None
    """Drain the pending zephyr notices into the module-level `notices` list.

    Polls zephyr.receive(block=False) until it returns None.  Notices whose
    opcode is not the empty string are skipped.  An exception raised while
    receiving is logged and treated like "nothing more to read", so a
    receive problem ends this drain instead of crashing the check.
    """
    while True:
        try:
            notice = zephyr.receive(block=False)
        except Exception:
            # Log through `logger` so the traceback also reaches the log
            # file, matching the rest of the script.
            logger.exception("Exception receiving zephyrs:")
            notice = None
        if notice is None:
            break
        if notice.opcode != "":
            continue
        notices.append(notice)

logger.info("Starting sending messages!")
# Send zephyrs
zsig = "Timothy Good Abbott"
# Iterate over a snapshot of the items: the retry path below pops a key
# from zhkeys and inserts a replacement, and changing a dict while
# iterating its live items() view may raise RuntimeError or skip/revisit
# entries (the replacement key could otherwise be sent a second time).
for key, (stream, test) in list(zhkeys.items()):
    if stream == "message":
        zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
    else:
        zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
    server_failure = send_zephyr(zwrite_args, str(key))
    if server_failure:
        # Replace the key we're not sure was delivered with a new key
        value = zhkeys.pop(key)
        new_key = gen_key(zhkeys)
        zhkeys[new_key] = value
        server_failure_again = send_zephyr(zwrite_args, str(new_key))
        # These go through `logger` so they also reach the log file.
        if server_failure_again:
            logger.error("Zephyr server failure twice in a row on keys %s and %s!  Aborting." %
                         (key, new_key))
            print_status_and_exit(1)
        else:
            logger.warning("Replaced key %s with %s due to Zephyr server failure." %
                           (key, new_key))
    # Drain after every send so the receive queue never builds up.
    receive_zephyrs()

receive_zephyrs()
logger.info("Sent Zephyr messages!")

# Send Zulips
#
# One Zulip per hzkeys entry, with the key as its content: a private
# message to the client's own address for the "message" entry, otherwise
# a stream message with subject "test".
for key, (stream, test) in hzkeys.items():
    if stream == "message":
        outgoing = {
            "type": "private",
            "content": str(key),
            "to": zulip_client.email,
        }
    else:
        outgoing = {
            "type": "stream",
            "subject": "test",
            "content": str(key),
            "to": stream,
        }
    send_zulip(outgoing)
    # Drain pending zephyrs after each send.
    receive_zephyrs()

logger.info("Sent Zulip messages!")

# Normally messages manage to forward through in under 3 seconds, but
# sleep 10 to give a safe margin since the messages do need to do 2
# round trips.  This alert is for correctness, not performance, and so
# we want it to reliably alert only when messages aren't being
# delivered at all.
time.sleep(10)
receive_zephyrs()

logger.info("Starting receiving messages!")

# receive zulips
#
# Fetch the events queued since registration.  A failure here is reported
# through `logger` (so it reaches the log file) and names the receive
# step, not the earlier subscription step.
res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id)
if 'error' in res.get('result'):
    logger.error("Error receiving Zulips!")
    logger.error(res['msg'])
    print_status_and_exit(1)
messages = [event['message'] for event in res['events']]
logger.info("Finished receiving Zulip messages!")

receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")

# Every key this run sent, whether via Zephyr (zhkeys) or Zulip (hzkeys).
all_keys = set(list(zhkeys) + list(hzkeys))
def process_keys(content_list):
    # type: (List[str]) -> Tuple[Dict[str, int], Set[str], Set[str], bool, bool]
    """Tally how often each of this run's keys appears in `content_list`.

    Returns a 5-tuple:
      * per-key receive counts (every key in all_keys is present),
      * the zhkeys keys that were never seen,
      * the hzkeys keys that were never seen,
      * whether any key was seen more than once,
      * whether every key was seen exactly once.
    """
    key_counts = dict.fromkeys(all_keys, 0) # type: Dict[str, int]
    for content in content_list:
        # Ignore anything that is not one of our keys; it might have come
        # from a concurrent check-mirroring process.
        if content in all_keys:
            key_counts[content] += 1

    z_missing = set(k for k in zhkeys if key_counts[k] == 0)
    h_missing = set(k for k in hzkeys if key_counts[k] == 0)
    counts = list(key_counts.values())
    duplicates = any(count > 1 for count in counts)
    success = all(count == 1 for count in counts)
    return key_counts, z_missing, h_missing, duplicates, success

# Naming: h_* describes what was _received_ in Zulip, z_* describes what
# was _received_ in Zephyr.
h_contents = [received["content"] for received in messages]
# Keep the second NUL-separated field of each notice's message
# (presumably the body that follows the signature -- confirm).
z_contents = [received.message.split('\0')[1] for received in notices]
h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success = process_keys(h_contents)
z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success = process_keys(z_contents)

# Both directions clean: report success and stop here.  Otherwise note
# which side (if either) was fine, then list every problematic key.
if z_success:
    if h_success:
        logger.info("Success!")
        print_status_and_exit(0)
    else:
        logger.info("Received everything correctly in Zephyr!")
elif h_success:
    logger.info("Received everything correctly in Zulip!")

logger.error("Messages received the wrong number of times:")
for key in all_keys:
    z_count = z_key_counts[key]
    h_count = h_key_counts[key]
    if z_count == 1 and h_count == 1:
        # Seen exactly once on both ends: nothing to report.
        continue
    if key in zhkeys:
        (stream, test) = zhkeys[key]
        logger.warning("%10s: z got %s, h got %s.  Sent via Zephyr(%s): class %s" %
                       (key, z_count, h_count, test, stream))
    if key in hzkeys:
        (stream, test) = hzkeys[key]
        logger.warning("%10s: z got %s. h got %s.  Sent via Zulip(%s): class %s" %
                       (key, z_count, h_count, test, stream))
logger.error("")
logger.error("Summary of specific problems:")

# Duplicates: some key was received more than once on that end.
if h_duplicates:
    logger.error("zulip: Received duplicate messages!")
    logger.error("zulip: This is probably a bug in our message loop detection.")
    logger.error("zulip: where Zulips go zulip=>zephyr=>zulip")
if z_duplicates:
    logger.error("zephyr: Received duplicate messages!")
    logger.error("zephyr: This is probably a bug in our message loop detection.")
    logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr")

# Missing on the same system that sent them: points at this script's own
# sending/receiving rather than at the mirroring.
if z_missing_z:
    logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
    logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
if h_missing_h:
    logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!")
    logger.error("zulip: This is probably an issue with check-mirroring sending or receiving Zulips.")

# Missing on the other system.  Mirroring is only implicated when at least
# one key that is missing over there WAS received on the sending side,
# i.e. the set difference is non-empty.  (Testing the two sets for
# equality is the opposite condition: equal sets mean nothing missing
# remotely was received locally.)
if z_missing_h:
    logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!")
    if z_missing_h - h_missing_h:
        logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.")
        logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.")
        logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
    logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!")
    if h_missing_z - z_missing_z:
        logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.")
        logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.")
        logger.error("zulip: aka the global class mirroring script has issues.")

# Release the event queue registered earlier, then report failure.
zulip_client.deregister(queue_id)
print_status_and_exit(1)