zephyr_mirror: Validate zephyr subscriptions happened successfully.

(imported from commit 494895b87eb9b555175c9890c5c2046cf4ce40ac)
This commit is contained in:
Tim Abbott 2012-11-28 15:09:15 -05:00
parent 121c3cb9c8
commit de14210a4d

View file

@ -147,10 +147,30 @@ def zephyr_bulk_subscribe(subs):
# retrying the next time the bot checks its subscriptions are
# up to date.
logger.exception("Error subscribing to streams (will retry automatically):")
logging.debug("Streams were: %s" % ((cls for cls, instance, recipient in subs),))
logging.warning("Streams were: %s" % ((cls for cls, instance, recipient in subs),))
return
try:
actual_zephyr_subs = [cls for (cls, _, _) in zephyr._z.getSubscriptions()]
except IOError:
logging.exception("Error getting current Zephyr subscriptions")
# Don't add anything to current_zephyr_subs so that we'll
# retry the next time we check for streams to subscribe to
# (within 15 seconds).
return
for (cls, instance, recipient) in subs:
current_zephyr_subs.add(cls)
if cls not in actual_zephyr_subs:
logging.error("Zephyr failed to subscribe us to %s; will retry" % (cls,))
try:
# We'll retry automatically when we next check for
# streams to subscribe to (within 15 seconds), but
# it's worth doing 1 retry immediately to avoid
# missing 15 seconds of messages on the affected
# classes
zephyr._z.sub(cls, instance, recipient)
except IOError:
pass
else:
current_zephyr_subs.add(cls)
def update_subscriptions_from_humbug():
try:
@ -167,6 +187,10 @@ def update_subscriptions_from_humbug():
for stream in streams:
if stream in current_zephyr_subs:
continue
if stream in ['security', 'login', 'network']:
# These zephyr classes cannot be subscribed to by us, due
# to MIT's Zephyr access control settings
continue
if (options.shard is not None and
not hashlib.sha1(stream).hexdigest().startswith(options.shard)):
# This stream is being handled by a different zephyr_mirror job.