sync-public-streams: Rewrite using an event queue.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
parent
15c46dce46
commit
83d4a0c217
|
@ -3,32 +3,16 @@ import sys
|
||||||
import os
|
import os
|
||||||
import logging
|
import logging
|
||||||
import argparse
|
import argparse
|
||||||
import time
|
|
||||||
import json
|
import json
|
||||||
import subprocess
|
|
||||||
import unicodedata
|
import unicodedata
|
||||||
|
|
||||||
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'api'))
|
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'api'))
|
||||||
import zulip
|
import zulip
|
||||||
|
|
||||||
from typing import Set, Optional
|
def write_public_streams() -> None:
|
||||||
|
|
||||||
def fetch_public_streams() -> Optional[Set[str]]:
|
|
||||||
public_streams = set()
|
public_streams = set()
|
||||||
|
|
||||||
try:
|
for stream_name in stream_names:
|
||||||
res = zulip_client.get_streams(include_all_active=True)
|
|
||||||
if res.get("result") == "success":
|
|
||||||
streams = res["streams"]
|
|
||||||
else:
|
|
||||||
logging.error("Error getting public streams:\n%s" % (res,))
|
|
||||||
return None
|
|
||||||
except Exception:
|
|
||||||
logging.exception("Error getting public streams:")
|
|
||||||
return None
|
|
||||||
|
|
||||||
for stream in streams:
|
|
||||||
stream_name = stream["name"]
|
|
||||||
# Zephyr class names are canonicalized by first applying NFKC
|
# Zephyr class names are canonicalized by first applying NFKC
|
||||||
# normalization and then lower-casing server-side
|
# normalization and then lower-casing server-side
|
||||||
canonical_cls = unicodedata.normalize("NFKC", stream_name).lower()
|
canonical_cls = unicodedata.normalize("NFKC", stream_name).lower()
|
||||||
|
@ -41,7 +25,9 @@ def fetch_public_streams() -> Optional[Set[str]]:
|
||||||
|
|
||||||
public_streams.add(canonical_cls)
|
public_streams.add(canonical_cls)
|
||||||
|
|
||||||
return public_streams
|
with open("/home/zulip/public_streams.tmp", "w") as f:
|
||||||
|
f.write(json.dumps(list(public_streams)) + "\n")
|
||||||
|
os.rename("/home/zulip/public_streams.tmp", "/home/zulip/public_streams")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
log_file = "/home/zulip/sync_public_streams.log"
|
log_file = "/home/zulip/sync_public_streams.log"
|
||||||
|
@ -58,22 +44,50 @@ if __name__ == "__main__":
|
||||||
options = parser.parse_args()
|
options = parser.parse_args()
|
||||||
|
|
||||||
zulip_client = zulip.Client(client="ZulipSyncPublicStreamsBot/0.1")
|
zulip_client = zulip.Client(client="ZulipSyncPublicStreamsBot/0.1")
|
||||||
|
backoff = zulip.RandomExponentialBackoff()
|
||||||
|
|
||||||
while True:
|
while backoff.keep_going():
|
||||||
# Sync every 5 minutes because this makes a very large network
|
try:
|
||||||
# request, due to Zephyr users who subscribe to 10K+ class
|
res = zulip_client.register(event_types=["stream"])
|
||||||
# names generated by a script.
|
if res["result"] != "success":
|
||||||
#
|
backoff.fail()
|
||||||
# This delay means we won't subscribe to new Zephyr classes
|
logger.error("Error registering event queue:\n%r", res)
|
||||||
# until 5 minutes after they are created in Zulip; this isn't
|
continue
|
||||||
# great but an acceptable tradeoff.
|
except Exception:
|
||||||
time.sleep(300)
|
logger.exception("Error registering event queue:")
|
||||||
public_streams = fetch_public_streams()
|
|
||||||
if public_streams is None:
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
f = open("/home/zulip/public_streams.tmp", "w")
|
backoff.succeed()
|
||||||
f.write(json.dumps(list(public_streams)) + "\n")
|
queue_id = res["queue_id"]
|
||||||
f.close()
|
last_event_id = res["last_event_id"]
|
||||||
|
stream_names = {stream["name"] for stream in res["streams"]}
|
||||||
|
write_public_streams()
|
||||||
|
|
||||||
subprocess.call(["mv", "/home/zulip/public_streams.tmp", "/home/zulip/public_streams"])
|
while backoff.keep_going():
|
||||||
|
try:
|
||||||
|
res = zulip_client.get_events(queue_id=queue_id, last_event_id=last_event_id)
|
||||||
|
if res["result"] != "success":
|
||||||
|
backoff.fail()
|
||||||
|
logger.error("Error getting events:\n%r", res)
|
||||||
|
if res["result"] == "error":
|
||||||
|
# Break out to the outer loop to re-register the event queue.
|
||||||
|
break
|
||||||
|
continue
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error getting events:")
|
||||||
|
continue
|
||||||
|
|
||||||
|
backoff.succeed()
|
||||||
|
for event in res["events"]:
|
||||||
|
last_event_id = max(last_event_id, event["id"])
|
||||||
|
if event["type"] == "stream":
|
||||||
|
if event["op"] == "create":
|
||||||
|
stream_names.update(
|
||||||
|
stream["name"] for stream in event["streams"]
|
||||||
|
)
|
||||||
|
write_public_streams()
|
||||||
|
elif event["op"] == "delete":
|
||||||
|
stream_names.difference_update(
|
||||||
|
stream["name"] for stream in event["streams"]
|
||||||
|
)
|
||||||
|
write_public_streams()
|
||||||
|
|
Loading…
Reference in a new issue