zephyr_mirror: Use zephyr's bulk subscription functionality.
Previously we were spending 15 seconds on linerva (and more like 2.5 minutes on the not-yet-operational zmirror.humbughq.com) to subscribe to all of our streams. (imported from commit c36cb1c26868f142683d9c92d4875fcd4931886e)
This commit is contained in:
parent
43b9510abd
commit
1c1519109a
|
@ -130,11 +130,9 @@ def username_to_fullname(username):
|
||||||
return fullnames[username]
|
return fullnames[username]
|
||||||
|
|
||||||
current_zephyr_subs = set()
|
current_zephyr_subs = set()
|
||||||
def ensure_subscribed(sub):
|
def zephyr_bulk_subscribe(subs):
|
||||||
if sub in current_zephyr_subs:
|
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
zephyr.Subscriptions().add((sub, '*', '*'))
|
zephyr._z.subAll(subs)
|
||||||
except IOError:
|
except IOError:
|
||||||
# Since we haven't added the subscription to
|
# Since we haven't added the subscription to
|
||||||
# current_zephyr_subs yet, we can just return (so that we'll
|
# current_zephyr_subs yet, we can just return (so that we'll
|
||||||
|
@ -142,9 +140,10 @@ def ensure_subscribed(sub):
|
||||||
# retrying the next time the bot checks its subscriptions are
|
# retrying the next time the bot checks its subscriptions are
|
||||||
# up to date.
|
# up to date.
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
print "Error subscribing to stream %s; will retry later." % (sub,)
|
print "Error subscribing to streams; will retry later."
|
||||||
return
|
return
|
||||||
current_zephyr_subs.add(sub)
|
for (cls, instance, recipient) in subs:
|
||||||
|
current_zephyr_subs.add(cls)
|
||||||
|
|
||||||
def update_subscriptions_from_humbug():
|
def update_subscriptions_from_humbug():
|
||||||
try:
|
try:
|
||||||
|
@ -154,8 +153,12 @@ def update_subscriptions_from_humbug():
|
||||||
print "%s: Error getting public streams:" % (datetime.datetime.now())
|
print "%s: Error getting public streams:" % (datetime.datetime.now())
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
return
|
return
|
||||||
|
streams_to_subscribe = []
|
||||||
for stream in streams:
|
for stream in streams:
|
||||||
ensure_subscribed(stream)
|
if stream in current_zephyr_subs:
|
||||||
|
continue
|
||||||
|
streams_to_subscribe.append((stream, "*", "*"))
|
||||||
|
zephyr_bulk_subscribe(streams_to_subscribe)
|
||||||
|
|
||||||
def maybe_restart_mirroring_script():
|
def maybe_restart_mirroring_script():
|
||||||
if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \
|
if os.stat(os.path.join(options.root_path, "stamps", "restart_stamp")).st_mtime > start_time or \
|
||||||
|
|
Loading…
Reference in a new issue