#!/usr/bin/env python3
import argparse
import json
import logging
import os
import sys
import unicodedata

sys.path.append(os.path.join(os.path.dirname(__file__), "..", "api"))
import zulip


def write_public_streams() -> None:
    public_streams = set()
    for stream_name in stream_names:
        # Zephyr class names are canonicalized by first applying NFKC
        # normalization and then lower-casing server-side
        canonical_cls = unicodedata.normalize("NFKC", stream_name).lower()
        if canonical_cls in [
            "security",
            "login",
            "network",
            "ops",
            "user_locate",
            "mit",
            "moof",
            "wsmonitor",
            "wg_ctl",
            "winlogger",
            "hm_ctl",
            "hm_stat",
            "zephyr_admin",
            "zephyr_ctl",
        ]:
            # These zephyr classes cannot be subscribed to by us, due
            # to MIT's Zephyr access control settings
            continue
        public_streams.add(canonical_cls)

    with open("/home/zulip/public_streams.tmp", "w") as f:
        f.write(json.dumps(list(public_streams)) + "\n")

    os.rename("/home/zulip/public_streams.tmp", "/home/zulip/public_streams")


if __name__ == "__main__":
    log_file = "/home/zulip/sync_public_streams.log"
    logger = logging.getLogger(__name__)
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format)

    formatter = logging.Formatter(log_format)
    logger.setLevel(logging.DEBUG)
    file_handler = logging.FileHandler(log_file)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    parser = zulip.add_default_arguments(argparse.ArgumentParser())
    options = parser.parse_args()

    zulip_client = zulip.Client(client="ZulipSyncPublicStreamsBot/0.1")
    backoff = zulip.RandomExponentialBackoff()
    while backoff.keep_going():
        try:
            res = zulip_client.register(event_types=["stream"])
            if res["result"] != "success":
                backoff.fail()
                logger.error("Error registering event queue:\n%r", res)
                continue
        except Exception:
            logger.exception("Error registering event queue:")
            continue
        backoff.succeed()
        queue_id = res["queue_id"]
        last_event_id = res["last_event_id"]
        stream_names = {stream["name"] for stream in res["streams"]}
        write_public_streams()

        while backoff.keep_going():
            try:
                res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id)
                if res["result"] != "success":
                    backoff.fail()
                    logger.error("Error getting events:\n%r", res)
                    if res["result"] == "error":
                        # Break out to the outer loop to re-register the event queue.
                        break
                    continue
            except Exception:
                logger.exception("Error getting events:")
                continue
            backoff.succeed()
            for event in res["events"]:
                last_event_id = max(last_event_id, event["id"])
                if event["type"] == "stream":
                    if event["op"] == "create":
                        stream_names.update(
                            stream["name"] for stream in event["streams"]
                        )
                        write_public_streams()
                    elif event["op"] == "delete":
                        stream_names.difference_update(
                            stream["name"] for stream in event["streams"]
                        )
                        write_public_streams()