Make python bindings use the event system

(imported from commit 5f47054bff4846018f8d606331de104a4ed2de0d)
This commit is contained in:
Zev Benjamin 2013-03-21 18:14:13 -04:00
parent 1b9a98e559
commit 13014fed0a

View file

@ -204,13 +204,16 @@ class Client(object):
setattr(cls, name, call) setattr(cls, name, call)
def call_on_each_message(self, callback, options = {}): def call_on_each_message(self, callback, options = {}):
max_message_id = None def do_register():
res = self.register(event_types=['message'])
return (res['queue_id'], res['last_event_id'])
queue_id = None
while True: while True:
if max_message_id is not None: if queue_id is None:
options["last"] = str(max_message_id) (queue_id, last_event_id) = do_register()
elif options.get('last') is not None:
options.pop('last') res = self.get_events(queue_id=queue_id, last_event_id=last_event_id)
res = self.get_messages(options)
if 'error' in res.get('result'): if 'error' in res.get('result'):
if res["result"] == "http-error": if res["result"] == "http-error":
if self.verbose: if self.verbose:
@ -221,20 +224,29 @@ class Client(object):
else: else:
if self.verbose: if self.verbose:
print "Server returned error:\n%s" % res["msg"] print "Server returned error:\n%s" % res["msg"]
if res["msg"].startswith("last value of") and \ if res["msg"].startswith("Bad event queue id:"):
"too old! Minimum valid is" in res["msg"]: # Our event queue went away, probably because
# We may have missed some messages while the # we were asleep or the server restarted
# network was down or something, but there's # abnormally. We may have missed some
# not really anything we can do about it other # messages while the network was down or
# than resuming getting new ones. # something, but there's not really anything
# we can do about it other than resuming
# getting new ones.
# #
# Reset max_message_id to just subscribe to new messages # Reset queue_id to register a new event queue.
max_message_id = None queue_id = None
# TODO: Make this back off once it's more reliable # TODO: Make this back off once it's more reliable
time.sleep(1) time.sleep(1)
continue continue
for message in sorted(res['messages'], key=lambda x: int(x["id"])):
max_message_id = max(max_message_id, int(message["id"])) messages = []
for event in res['events']:
last_event_id = max(last_event_id, int(event['id']))
if event['type'] == 'message':
messages.append(event['message'])
# The messages should already be sorted, but we sort just in case
for message in sorted(messages, key=lambda x: int(x["id"])):
callback(message) callback(message)
def _mk_subs(streams): def _mk_subs(streams):
@ -245,6 +257,8 @@ def _mk_del_subs(streams):
Client._register('send_message', url='messages', make_request=(lambda request: request)) Client._register('send_message', url='messages', make_request=(lambda request: request))
Client._register('get_messages', method='GET', url='messages/latest', longpolling=True) Client._register('get_messages', method='GET', url='messages/latest', longpolling=True)
Client._register('get_events', url='events', method='GET', longpolling=True, make_request=(lambda **kwargs: kwargs))
Client._register('register', make_request=(lambda event_types=[]: dict(event_types=event_types)))
Client._register('get_profile', method='GET', url='users/me') Client._register('get_profile', method='GET', url='users/me')
Client._register('get_public_streams', method='GET', url='streams') Client._register('get_public_streams', method='GET', url='streams')
Client._register('get_members', method='GET', url='users') Client._register('get_members', method='GET', url='users')