zephyr_mirror: Read list of public streams from sync-public-streams ouput.
(imported from commit c91828534bfed4047c16a98b926335f4beded005)
This commit is contained in:
parent
de713f552c
commit
bd3bd8ca44
|
@ -174,36 +174,26 @@ def zephyr_bulk_subscribe(subs):
|
||||||
else:
|
else:
|
||||||
current_zephyr_subs.add(cls)
|
current_zephyr_subs.add(cls)
|
||||||
|
|
||||||
def update_subscriptions_from_humbug():
|
def update_subscriptions():
|
||||||
try:
|
try:
|
||||||
res = humbug_client.get_public_streams()
|
f = file("/home/humbug/public_streams", "r")
|
||||||
if res.get("result") == "success":
|
public_streams = simplejson.loads(f.read())
|
||||||
streams = res["streams"]
|
f.close()
|
||||||
else:
|
except:
|
||||||
logger.error("Error getting public streams:\n%s" % res)
|
logger.exception("Error reading public streams:")
|
||||||
return
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Error getting public streams:")
|
|
||||||
return
|
return
|
||||||
|
|
||||||
classes_to_subscribe = set()
|
classes_to_subscribe = set()
|
||||||
for stream in streams:
|
for stream in public_streams:
|
||||||
# Zephyr class names are canonicalized by first applying NFKC
|
zephyr_class = stream.encode("utf-8")
|
||||||
# normalization and then lower-casing server-side
|
|
||||||
canonical_cls = unicodedata.normalize("NFKC", stream).lower().encode("utf-8")
|
|
||||||
if canonical_cls in current_zephyr_subs:
|
|
||||||
continue
|
|
||||||
if canonical_cls in ['security', 'login', 'network', 'ops', 'user_locate',
|
|
||||||
'mit',
|
|
||||||
'hm_ctl', 'hm_stat', 'zephyr_admin', 'zephyr_ctl']:
|
|
||||||
# These zephyr classes cannot be subscribed to by us, due
|
|
||||||
# to MIT's Zephyr access control settings
|
|
||||||
continue
|
|
||||||
if (options.shard is not None and
|
if (options.shard is not None and
|
||||||
not hashlib.sha1(canonical_cls).hexdigest().startswith(options.shard)):
|
not hashlib.sha1(zephyr_class).hexdigest().startswith(options.shard)):
|
||||||
# This stream is being handled by a different zephyr_mirror job.
|
# This stream is being handled by a different zephyr_mirror job.
|
||||||
continue
|
continue
|
||||||
|
if zephyr_class in current_zephyr_subs:
|
||||||
|
continue
|
||||||
|
classes_to_subscribe.add((zephyr_class, "*", "*"))
|
||||||
|
|
||||||
classes_to_subscribe.add((canonical_cls, "*", "*"))
|
|
||||||
if len(classes_to_subscribe) > 0:
|
if len(classes_to_subscribe) > 0:
|
||||||
zephyr_bulk_subscribe(list(classes_to_subscribe))
|
zephyr_bulk_subscribe(list(classes_to_subscribe))
|
||||||
|
|
||||||
|
@ -260,7 +250,7 @@ def process_loop(log):
|
||||||
if options.forward_class_messages:
|
if options.forward_class_messages:
|
||||||
# Ask the Humbug server about any new classes to subscribe to
|
# Ask the Humbug server about any new classes to subscribe to
|
||||||
try:
|
try:
|
||||||
update_subscriptions_from_humbug()
|
update_subscriptions()
|
||||||
except Exception:
|
except Exception:
|
||||||
logging.exception("Error updating subscriptions from Humbug:")
|
logging.exception("Error updating subscriptions from Humbug:")
|
||||||
|
|
||||||
|
@ -377,7 +367,7 @@ def zephyr_subscribe_autoretry(sub):
|
||||||
|
|
||||||
def zephyr_to_humbug(options):
|
def zephyr_to_humbug(options):
|
||||||
if options.forward_class_messages:
|
if options.forward_class_messages:
|
||||||
update_subscriptions_from_humbug()
|
update_subscriptions()
|
||||||
if options.forward_personals:
|
if options.forward_personals:
|
||||||
# Subscribe to personals; we really can't operate without
|
# Subscribe to personals; we really can't operate without
|
||||||
# those subscriptions, so just retry until it works.
|
# those subscriptions, so just retry until it works.
|
||||||
|
|
Loading…
Reference in a new issue