From 83d4a0c217683810162df8a2f438e27d48674edc Mon Sep 17 00:00:00 2001 From: Anders Kaseorg Date: Tue, 6 Apr 2021 15:48:55 -0700 Subject: [PATCH] sync-public-streams: Rewrite using an event queue. Signed-off-by: Anders Kaseorg --- zulip/integrations/zephyr/sync-public-streams | 82 +++++++++++-------- 1 file changed, 48 insertions(+), 34 deletions(-) diff --git a/zulip/integrations/zephyr/sync-public-streams b/zulip/integrations/zephyr/sync-public-streams index 87f77a7..053c655 100755 --- a/zulip/integrations/zephyr/sync-public-streams +++ b/zulip/integrations/zephyr/sync-public-streams @@ -3,32 +3,16 @@ import sys import os import logging import argparse -import time import json -import subprocess import unicodedata sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'api')) import zulip -from typing import Set, Optional - -def fetch_public_streams() -> Optional[Set[str]]: +def write_public_streams() -> None: public_streams = set() - try: - res = zulip_client.get_streams(include_all_active=True) - if res.get("result") == "success": - streams = res["streams"] - else: - logging.error("Error getting public streams:\n%s" % (res,)) - return None - except Exception: - logging.exception("Error getting public streams:") - return None - - for stream in streams: - stream_name = stream["name"] + for stream_name in stream_names: # Zephyr class names are canonicalized by first applying NFKC # normalization and then lower-casing server-side canonical_cls = unicodedata.normalize("NFKC", stream_name).lower() @@ -41,7 +25,9 @@ def fetch_public_streams() -> Optional[Set[str]]: public_streams.add(canonical_cls) - return public_streams + with open("/home/zulip/public_streams.tmp", "w") as f: + f.write(json.dumps(list(public_streams)) + "\n") + os.rename("/home/zulip/public_streams.tmp", "/home/zulip/public_streams") if __name__ == "__main__": log_file = "/home/zulip/sync_public_streams.log" @@ -58,22 +44,50 @@ if __name__ == "__main__": options = parser.parse_args() zulip_client = zulip.Client(client="ZulipSyncPublicStreamsBot/0.1") + backoff = zulip.RandomExponentialBackoff() - while True: - # Sync every 5 minutes because this makes a very large network - # request, due to Zephyr users who subscribe to 10K+ class - # names generated by a script. - # - # This delay means we won't subscribe to new Zephyr classes - # until 5 minutes after they are created in Zulip; this isn't - # great but an acceptable tradeoff. - time.sleep(300) - public_streams = fetch_public_streams() - if public_streams is None: + while backoff.keep_going(): + try: + res = zulip_client.register(event_types=["stream"]) + if res["result"] != "success": + backoff.fail() + logger.error("Error registering event queue:\n%r", res) + continue + except Exception: + logger.exception("Error registering event queue:") continue - f = open("/home/zulip/public_streams.tmp", "w") - f.write(json.dumps(list(public_streams)) + "\n") - f.close() + backoff.succeed() + queue_id = res["queue_id"] + last_event_id = res["last_event_id"] + stream_names = {stream["name"] for stream in res["streams"]} + write_public_streams() - subprocess.call(["mv", "/home/zulip/public_streams.tmp", "/home/zulip/public_streams"]) + while backoff.keep_going(): + try: + res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id) + if res["result"] != "success": + backoff.fail() + logger.error("Error getting events:\n%r", res) + if res["result"] == "error": + # Break out to the outer loop to re-register the event queue. + break + continue + except Exception: + logger.exception("Error getting events:") + continue + + backoff.succeed() + for event in res["events"]: + last_event_id = max(last_event_id, event["id"]) + if event["type"] == "stream": + if event["op"] == "create": + stream_names.update( + stream["name"] for stream in event["streams"] + ) + write_public_streams() + elif event["op"] == "delete": + stream_names.difference_update( + stream["name"] for stream in event["streams"] + ) + write_public_streams()