api: Move the API package to a dedicated subdirectory.

In order to keep all three packages (zulip, zulip_bots,
zulip_botserver) in the same repo, all package files must now
be nested one level deeper.

For instance, python-zulip-api/zulip_bots/zulip_bots/bots/, instead
of python-zulip-api/zulip_bots/bots/.
This commit is contained in:
Eeshan Garg 2017-07-18 01:49:51 -02:30
parent 879f44ab3a
commit 3d0f7955b6
59 changed files with 186 additions and 192 deletions

View file

@ -0,0 +1,366 @@
#!/usr/bin/env python
from __future__ import print_function
from __future__ import absolute_import
import sys
import time
import optparse
import os
import random
import logging
import subprocess
import hashlib
from six.moves import range
if False:
from typing import Any, Dict, List, Set, Tuple
parser = optparse.OptionParser()
parser.add_option('--verbose',
dest='verbose',
default=False,
action='store_true')
parser.add_option('--site',
dest='site',
default=None,
action='store')
parser.add_option('--sharded',
default=False,
action='store_true')
parser.add_option('--root-path',
dest='root_path',
default="/home/zulip",
action='store')
(options, args) = parser.parse_args()
# The 'api' directory needs to go first, so that 'import zulip' won't pick up
# some other directory named 'zulip'.
pyzephyr_lib_path = "python-zephyr/build/lib.linux-%s-%s/" % (os.uname()[4], sys.version[0:3])
sys.path[:0] = [os.path.join(options.root_path, "api/"),
os.path.join(options.root_path, "python-zephyr"),
os.path.join(options.root_path, pyzephyr_lib_path),
options.root_path]
mit_user = 'tabbott/extra@ATHENA.MIT.EDU'
sys.path.append(".")
import zulip
zulip_client = zulip.Client(
verbose=True,
client="ZulipMonitoring/0.1",
site=options.site)
# Configure logging
log_file = "/var/log/zulip/check-mirroring-log"
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format)
formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
# Initialize list of streams to test
if options.sharded:
# NOTE: Streams in this list must be in the zulip_user's Zulip
# subscriptions, or we won't receive messages via Zulip.
# The sharded stream list has a bunch of pairs
# (stream, shard_name), where sha1sum(stream).startswith(shard_name)
test_streams = [
("message", "p"),
("tabbott-nagios-test-32", "0"),
("tabbott-nagios-test-33", "1"),
("tabbott-nagios-test-2", "2"),
("tabbott-nagios-test-5", "3"),
("tabbott-nagios-test-13", "4"),
("tabbott-nagios-test-7", "5"),
("tabbott-nagios-test-22", "6"),
("tabbott-nagios-test-35", "7"),
("tabbott-nagios-test-4", "8"),
("tabbott-nagios-test-3", "9"),
("tabbott-nagios-test-1", "a"),
("tabbott-nagios-test-49", "b"),
("tabbott-nagios-test-34", "c"),
("tabbott-nagios-test-12", "d"),
("tabbott-nagios-test-11", "e"),
("tabbott-nagios-test-9", "f"),
]
for (stream, test) in test_streams:
if stream == "message":
continue
assert(hashlib.sha1(stream.encode("utf-8")).hexdigest().startswith(test))
else:
test_streams = [
("message", "p"),
("tabbott-nagios-test", "a"),
]
def print_status_and_exit(status):
# type: (int) -> None
# The output of this script is used by Nagios. Various outputs,
# e.g. true success and punting due to a SERVNAK, result in a
# non-alert case, so to give us something unambiguous to check in
# Nagios, print the exit status.
print(status)
sys.exit(status)
def send_zulip(message):
# type: (Dict[str, str]) -> None
result = zulip_client.send_message(message)
if result["result"] != "success":
logger.error("Error sending zulip, args were:")
logger.error(str(message))
logger.error(str(result))
print_status_and_exit(1)
# Returns True if and only if we "Detected server failure" sending the zephyr.
def send_zephyr(zwrite_args, content):
# type: (List[str], str) -> bool
p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate(input=content.encode("utf-8"))
if p.returncode != 0:
if "Detected server failure while receiving acknowledgement for" in stdout:
logger.warning("Got server failure error sending zephyr; retrying")
logger.warning(stderr)
return True
logger.error("Error sending zephyr:")
logger.info(stdout)
logger.error(stderr)
print_status_and_exit(1)
return False
# Subscribe to Zulip
try:
res = zulip_client.register(event_types=["message"])
if 'error' in res.get('result'):
logging.error("Error subscribing to Zulips!")
logging.error(res['msg'])
print_status_and_exit(1)
queue_id, last_event_id = (res['queue_id'], res['last_event_id'])
except Exception:
logger.exception("Unexpected error subscribing to Zulips")
print_status_and_exit(1)
# Subscribe to Zephyrs
import zephyr
zephyr_subs_to_add = []
for (stream, test) in test_streams:
if stream == "message":
zephyr_subs_to_add.append((stream, 'personal', mit_user))
else:
zephyr_subs_to_add.append((stream, '*', '*'))
actually_subscribed = False
for tries in range(10):
try:
zephyr.init()
zephyr._z.subAll(zephyr_subs_to_add)
zephyr_subs = zephyr._z.getSubscriptions()
missing = 0
for elt in zephyr_subs_to_add:
if elt not in zephyr_subs:
logging.error("Failed to subscribe to %s" % (elt,))
missing += 1
if missing == 0:
actually_subscribed = True
break
except IOError as e:
if "SERVNAK received" in e: # type: ignore # https://github.com/python/mypy/issues/2118
logger.error("SERVNAK repeatedly received, punting rest of test")
else:
logger.exception("Exception subscribing to zephyrs")
if not actually_subscribed:
logger.error("Failed to subscribe to zephyrs")
print_status_and_exit(1)
# Prepare keys
zhkeys = {} # type: Dict[str, Tuple[str, str]]
hzkeys = {} # type: Dict[str, Tuple[str, str]]
def gen_key(key_dict):
# type: (Dict[str, Any]) -> str
bits = str(random.getrandbits(32))
while bits in key_dict:
# Avoid the unlikely event that we get the same bits twice
bits = str(random.getrandbits(32))
return bits
def gen_keys(key_dict):
# type: (Dict[str, Tuple[str, str]]) -> None
for (stream, test) in test_streams:
key_dict[gen_key(key_dict)] = (stream, test)
gen_keys(zhkeys)
gen_keys(hzkeys)
notices = []
# We check for new zephyrs multiple times, to avoid filling the zephyr
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs():
# type: () -> None
while True:
try:
notice = zephyr.receive(block=False)
except Exception:
logging.exception("Exception receiving zephyrs:")
notice = None
if notice is None:
break
if notice.opcode != "":
continue
notices.append(notice)
logger.info("Starting sending messages!")
# Send zephyrs
zsig = "Timothy Good Abbott"
for key, (stream, test) in zhkeys.items():
if stream == "message":
zwrite_args = ["zwrite", "-n", "-s", zsig, mit_user]
else:
zwrite_args = ["zwrite", "-n", "-s", zsig, "-c", stream, "-i", "test"]
server_failure = send_zephyr(zwrite_args, str(key))
if server_failure:
# Replace the key we're not sure was delivered with a new key
value = zhkeys.pop(key)
new_key = gen_key(zhkeys)
zhkeys[new_key] = value
server_failure_again = send_zephyr(zwrite_args, str(new_key))
if server_failure_again:
logging.error("Zephyr server failure twice in a row on keys %s and %s! Aborting." %
(key, new_key))
print_status_and_exit(1)
else:
logging.warning("Replaced key %s with %s due to Zephyr server failure." %
(key, new_key))
receive_zephyrs()
receive_zephyrs()
logger.info("Sent Zephyr messages!")
# Send Zulips
for key, (stream, test) in hzkeys.items():
if stream == "message":
send_zulip({
"type": "private",
"content": str(key),
"to": zulip_client.email,
})
else:
send_zulip({
"type": "stream",
"subject": "test",
"content": str(key),
"to": stream,
})
receive_zephyrs()
logger.info("Sent Zulip messages!")
# Normally messages manage to forward through in under 3 seconds, but
# sleep 10 to give a safe margin since the messages do need to do 2
# round trips. This alert is for correctness, not performance, and so
# we want it to reliably alert only when messages aren't being
# delivered at all.
time.sleep(10)
receive_zephyrs()
logger.info("Starting receiving messages!")
# receive zulips
res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id)
if 'error' in res.get('result'):
logging.error("Error subscribing to Zulips!")
logging.error(res['msg'])
print_status_and_exit(1)
messages = [event['message'] for event in res['events']]
logger.info("Finished receiving Zulip messages!")
receive_zephyrs()
logger.info("Finished receiving Zephyr messages!")
all_keys = set(list(zhkeys.keys()) + list(hzkeys.keys()))
def process_keys(content_list):
# type: (List[str]) -> Tuple[Dict[str, int], Set[str], Set[str], bool, bool]
# Start by filtering out any keys that might have come from
# concurrent check-mirroring processes
content_keys = [key for key in content_list if key in all_keys]
key_counts = {} # type: Dict[str, int]
for key in all_keys:
key_counts[key] = 0
for key in content_keys:
key_counts[key] += 1
z_missing = set(key for key in zhkeys.keys() if key_counts[key] == 0)
h_missing = set(key for key in hzkeys.keys() if key_counts[key] == 0)
duplicates = any(val > 1 for val in key_counts.values())
success = all(val == 1 for val in key_counts.values())
return key_counts, z_missing, h_missing, duplicates, success
# The h_foo variables are about the messages we _received_ in Zulip
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages]
z_contents = [notice.message.split('\0')[1] for notice in notices]
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
if z_success and h_success:
logger.info("Success!")
print_status_and_exit(0)
elif z_success:
logger.info("Received everything correctly in Zephyr!")
elif h_success:
logger.info("Received everything correctly in Zulip!")
logger.error("Messages received the wrong number of times:")
for key in all_keys:
if z_key_counts[key] == 1 and h_key_counts[key] == 1:
continue
if key in zhkeys:
(stream, test) = zhkeys[key]
logger.warning("%10s: z got %s, h got %s. Sent via Zephyr(%s): class %s" %
(key, z_key_counts[key], h_key_counts[key], test, stream))
if key in hzkeys:
(stream, test) = hzkeys[key]
logger.warning("%10s: z got %s. h got %s. Sent via Zulip(%s): class %s" %
(key, z_key_counts[key], h_key_counts[key], test, stream))
logger.error("")
logger.error("Summary of specific problems:")
if h_duplicates:
logger.error("zulip: Received duplicate messages!")
logger.error("zulip: This is probably a bug in our message loop detection.")
logger.error("zulip: where Zulips go zulip=>zephyr=>zulip")
if z_duplicates:
logger.error("zephyr: Received duplicate messages!")
logger.error("zephyr: This is probably a bug in our message loop detection.")
logger.error("zephyr: where Zephyrs go zephyr=>zulip=>zephyr")
if z_missing_z:
logger.error("zephyr: Didn't receive all the Zephyrs we sent on the Zephyr end!")
logger.error("zephyr: This is probably an issue with check-mirroring sending or receiving Zephyrs.")
if h_missing_h:
logger.error("zulip: Didn't receive all the Zulips we sent on the Zulip end!")
logger.error("zulip: This is probably an issue with check-mirroring sending or receiving Zulips.")
if z_missing_h:
logger.error("zephyr: Didn't receive all the Zulips we sent on the Zephyr end!")
if z_missing_h == h_missing_h:
logger.error("zephyr: Including some Zulips that we did receive on the Zulip end.")
logger.error("zephyr: This suggests we have a zulip=>zephyr mirroring problem.")
logger.error("zephyr: aka the personals mirroring script has issues.")
if h_missing_z:
logger.error("zulip: Didn't receive all the Zephyrs we sent on the Zulip end!")
if h_missing_z == z_missing_z:
logger.error("zulip: Including some Zephyrs that we did receive on the Zephyr end.")
logger.error("zulip: This suggests we have a zephyr=>zulip mirroring problem.")
logger.error("zulip: aka the global class mirroring script has issues.")
zulip_client.deregister(queue_id)
print_status_and_exit(1)

View file

@ -0,0 +1,42 @@
#!/usr/bin/env python
import sys
import subprocess
import base64
short_user = sys.argv[1]
api_key = sys.argv[2]
ccache_data_encoded = sys.argv[3]
# Update the Kerberos ticket cache file
program_name = "zmirror-%s" % (short_user,)
with open("/home/zulip/ccache/%s" % (program_name,), "w") as f:
f.write(base64.b64decode(ccache_data_encoded))
# Setup API key
api_key_path = "/home/zulip/api-keys/%s" % (program_name,)
open(api_key_path, "w").write(api_key + "\n")
# Setup supervisord configuration
supervisor_path = "/etc/supervisor/conf.d/%s.conf" % (program_name,)
template = "/home/zulip/zulip/api/integrations/zephyr/zmirror_private.conf.template"
template_data = open(template).read()
session_path = "/home/zulip/zephyr_sessions/%s" % (program_name,)
# Preserve mail zephyrs forwarding setting across rewriting the config file
try:
if "--forward-mail-zephyrs" in open(supervisor_path, "r").read():
template_data = template_data.replace("--use-sessions", "--use-sessions --forward-mail-zephyrs")
except Exception:
pass
open(supervisor_path, "w").write(template_data.replace("USERNAME", short_user))
# Delete your session
subprocess.check_call(["rm", "-f", session_path])
# Update your supervisor config, which may restart your mirror
subprocess.check_call(["supervisorctl", "reread"])
subprocess.check_call(["supervisorctl", "update"])
# Restart your mirror, in case it wasn't restarted by the previous
# (Otherwise if the mirror lost subs, this would do nothing)
# TODO: check whether we JUST restarted it first
subprocess.check_call(["supervisorctl", "restart", program_name])

View file

@ -0,0 +1,74 @@
#!/usr/bin/env python
import sys
import os
import logging
import optparse
import time
import simplejson
import subprocess
import unicodedata
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'api'))
import zulip
from typing import Set
def fetch_public_streams():
# type: () -> Set[bytes]
public_streams = set()
try:
res = zulip_client.get_streams(include_all_active=True)
if res.get("result") == "success":
streams = res["streams"]
else:
logging.error("Error getting public streams:\n%s" % (res,))
return None
except Exception:
logging.exception("Error getting public streams:")
return None
for stream in streams:
stream_name = stream["name"]
# Zephyr class names are canonicalized by first applying NFKC
# normalization and then lower-casing server-side
canonical_cls = unicodedata.normalize("NFKC", stream_name).lower().encode("utf-8")
if canonical_cls in [b'security', b'login', b'network', b'ops', b'user_locate',
b'mit', b'moof', b'wsmonitor', b'wg_ctl', b'winlogger',
b'hm_ctl', b'hm_stat', b'zephyr_admin', b'zephyr_ctl']:
# These zephyr classes cannot be subscribed to by us, due
# to MIT's Zephyr access control settings
continue
public_streams.add(canonical_cls)
return public_streams
if __name__ == "__main__":
log_file = "/home/zulip/sync_public_streams.log"
logger = logging.getLogger(__name__)
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format)
formatter = logging.Formatter(log_format)
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
parser = optparse.OptionParser()
parser.add_option_group(zulip.generate_option_group(parser))
(options, args) = parser.parse_args()
zulip_client = zulip.Client(client="ZulipSyncPublicStreamsBot/0.1")
while True:
time.sleep(15)
public_streams = fetch_public_streams()
if public_streams is None:
continue
f = open("/home/zulip/public_streams.tmp", "w")
f.write(simplejson.dumps(list(public_streams)) + "\n")
f.close()
subprocess.call(["mv", "/home/zulip/public_streams.tmp", "/home/zulip/public_streams"])

View file

@ -0,0 +1,93 @@
#!/usr/bin/env python
# Copyright (C) 2012 Zulip, Inc.
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation files
# (the "Software"), to deal in the Software without restriction,
# including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from __future__ import absolute_import
from __future__ import print_function
import sys
import subprocess
import os
import traceback
import signal
sys.path[:0] = [os.path.dirname(__file__)]
from zephyr_mirror_backend import parse_args
(options, args) = parse_args()
sys.path[:0] = [os.path.join(options.root_path, 'api')]
from types import FrameType
from typing import Any
def die(signal, frame):
# type: (int, FrameType) -> None
# We actually want to exit, so run os._exit (so as not to be caught and restarted)
os._exit(1)
signal.signal(signal.SIGINT, die)
from zulip import RandomExponentialBackoff
args = [os.path.join(options.root_path, "user_root", "zephyr_mirror_backend.py")]
args.extend(sys.argv[1:])
if options.sync_subscriptions:
subprocess.call(args)
sys.exit(0)
if options.forward_class_messages and not options.noshard:
sys.path.append("/home/zulip/zulip")
if options.on_startup_command is not None:
subprocess.call([options.on_startup_command])
from zerver.lib.parallel import run_parallel
print("Starting parallel zephyr class mirroring bot")
jobs = list("0123456789abcdef")
def run_job(shard):
# type: (str) -> int
subprocess.call(args + ["--shard=%s" % (shard,)])
return 0
for (status, job) in run_parallel(run_job, jobs, threads=16):
print("A mirroring shard died!")
pass
sys.exit(0)
backoff = RandomExponentialBackoff(timeout_success_equivalent=300)
while backoff.keep_going():
print("Starting zephyr mirroring bot")
try:
subprocess.call(args)
except Exception:
traceback.print_exc()
backoff.fail()
error_message = """
ERROR: The Zephyr mirroring bot is unable to continue mirroring Zephyrs.
This is often caused by failing to maintain unexpired Kerberos tickets
or AFS tokens. See https://zulipchat.com/zephyr for documentation on how to
maintain unexpired Kerberos tickets and AFS tokens.
"""
print(error_message)
sys.exit(1)

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,3 @@
#!/bin/sh
krb_user_id=1051
env KRB5CCNAME=/tmp/krb5cc_"$krb_user_id".tmp kinit -k -t /home/zulip/tabbott.extra.keytab tabbott/extra@ATHENA.MIT.EDU; mv /tmp/krb5cc_"$krb_user_id".tmp /tmp/krb5cc_"$krb_user_id"

View file

@ -0,0 +1,11 @@
[program:zmirror-USERNAME]
command=/home/zulip/zulip/api/integrations/zephyr/zephyr_mirror_backend.py --root-path=/home/zulip/zulip --user=USERNAME --log-path=/home/zulip/logs/mirror-log-%(program_name)s --use-sessions --session-path=/home/zulip/zephyr_sessions/%(program_name)s --api-key-file=/home/zulip/api-keys/%(program_name)s --ignore-expired-tickets --nagios-path=/home/zulip/mirror_status/%(program_name)s --nagios-class=zulip-mirror-nagios --site=https://zephyr.zulipchat.com
priority=200 ; the relative start priority (default 999)
autostart=true ; start at supervisord start (default: true)
autorestart=true ; whether/when to restart (default: unexpected)
stopsignal=TERM ; signal used to kill process (default TERM)
stopwaitsecs=30 ; max num secs to wait b4 SIGKILL (default 10)
user=zulip ; setuid to this UNIX account to run the program
redirect_stderr=true ; redirect proc stderr to stdout (default false)
stdout_logfile=/var/log/zulip/%(program_name)s.log ; stdout log path, NONE for none; default AUTO
environment=HOME="/home/zulip",USER="zulip",KRB5CCNAME="/home/zulip/ccache/%(program_name)s"