zephyr_mirror: Move class name canonicalization earlier.
We should canonicalize stream names into Zephyr class names in update_subscriptions_from_humbug, before we decide which classes to subscribe to; otherwise, deduplication and the tracking of which classes we are already subscribed to won't work. (imported from commit a751b6fca1022390a087516a0730ff77f13d7edf)
This commit is contained in:
parent
df5161bcbd
commit
abb95fca04
|
@ -159,12 +159,7 @@ def zephyr_bulk_subscribe(subs):
|
||||||
# (within 15 seconds).
|
# (within 15 seconds).
|
||||||
return
|
return
|
||||||
for (cls, instance, recipient) in subs:
|
for (cls, instance, recipient) in subs:
|
||||||
# Zephyr class names are canonicalized by first applying NFKC
|
if cls not in actual_zephyr_subs:
|
||||||
# normalization and then lower-casing server-side -- so we
|
|
||||||
# need to compare against those to see if we've successfully
|
|
||||||
# subscribed.
|
|
||||||
canonical_cls = unicodedata.normalize("NFKC", cls.decode("utf-8").lower()).encode("utf-8")
|
|
||||||
if canonical_cls not in actual_zephyr_subs:
|
|
||||||
logging.error("Zephyr failed to subscribe us to %s; will retry" % (cls,))
|
logging.error("Zephyr failed to subscribe us to %s; will retry" % (cls,))
|
||||||
try:
|
try:
|
||||||
# We'll retry automatically when we next check for
|
# We'll retry automatically when we next check for
|
||||||
|
@ -189,23 +184,25 @@ def update_subscriptions_from_humbug():
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error getting public streams:")
|
logger.exception("Error getting public streams:")
|
||||||
return
|
return
|
||||||
streams_to_subscribe = []
|
classes_to_subscribe = set()
|
||||||
for stream in streams:
|
for stream in streams:
|
||||||
encoded_stream = stream.encode("utf-8")
|
# Zephyr class names are canonicalized by first applying NFKC
|
||||||
if stream in current_zephyr_subs:
|
# normalization and then lower-casing server-side
|
||||||
|
canonical_cls = unicodedata.normalize("NFKC", stream).lower().encode("utf-8")
|
||||||
|
if canonical_cls in current_zephyr_subs:
|
||||||
continue
|
continue
|
||||||
if stream.lower() in ['security', 'login', 'network', 'ops', 'user_locate']:
|
if canonical_cls in ['security', 'login', 'network', 'ops', 'user_locate']:
|
||||||
# These zephyr classes cannot be subscribed to by us, due
|
# These zephyr classes cannot be subscribed to by us, due
|
||||||
# to MIT's Zephyr access control settings
|
# to MIT's Zephyr access control settings
|
||||||
continue
|
continue
|
||||||
if (options.shard is not None and
|
if (options.shard is not None and
|
||||||
not hashlib.sha1(encoded_stream).hexdigest().startswith(options.shard)):
|
not hashlib.sha1(canonical_cls).hexdigest().startswith(options.shard)):
|
||||||
# This stream is being handled by a different zephyr_mirror job.
|
# This stream is being handled by a different zephyr_mirror job.
|
||||||
continue
|
continue
|
||||||
|
|
||||||
streams_to_subscribe.append((encoded_stream, "*", "*"))
|
classes_to_subscribe.add((canonical_cls, "*", "*"))
|
||||||
if len(streams_to_subscribe) > 0:
|
if len(classes_to_subscribe) > 0:
|
||||||
zephyr_bulk_subscribe(streams_to_subscribe)
|
zephyr_bulk_subscribe(list(classes_to_subscribe))
|
||||||
|
|
||||||
def maybe_restart_mirroring_script():
|
def maybe_restart_mirroring_script():
|
||||||
if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \
|
if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \
|
||||||
|
|
Loading…
Reference in a new issue