diff --git a/humbug/__init__.py b/humbug/__init__.py index 10ffaf9..e2a6a8a 100644 --- a/humbug/__init__.py +++ b/humbug/__init__.py @@ -204,13 +204,16 @@ class Client(object): setattr(cls, name, call) def call_on_each_message(self, callback, options = {}): - max_message_id = None + def do_register(): + res = self.register(event_types=['message']) + return (res['queue_id'], res['last_event_id']) + + queue_id = None while True: - if max_message_id is not None: - options["last"] = str(max_message_id) - elif options.get('last') is not None: - options.pop('last') - res = self.get_messages(options) + if queue_id is None: + (queue_id, last_event_id) = do_register() + + res = self.get_events(queue_id=queue_id, last_event_id=last_event_id) if 'error' in res.get('result'): if res["result"] == "http-error": if self.verbose: @@ -221,20 +224,29 @@ class Client(object): else: if self.verbose: print "Server returned error:\n%s" % res["msg"] - if res["msg"].startswith("last value of") and \ - "too old! Minimum valid is" in res["msg"]: - # We may have missed some messages while the - # network was down or something, but there's - # not really anything we can do about it other - # than resuming getting new ones. + if res["msg"].startswith("Bad event queue id:"): + # Our event queue went away, probably because + # we were asleep or the server restarted + # abnormally. We may have missed some + # messages while the network was down or + # something, but there's not really anything + # we can do about it other than resuming + # getting new ones. # - # Reset max_message_id to just subscribe to new messages - max_message_id = None + # Reset queue_id to register a new event queue. + queue_id = None # TODO: Make this back off once it's more reliable time.sleep(1) continue - for message in sorted(res['messages'], key=lambda x: int(x["id"])): - max_message_id = max(max_message_id, int(message["id"])) + + messages = [] + for event in res['events']: + last_event_id = max(last_event_id, int(event['id'])) + if event['type'] == 'message': + messages.append(event['message']) + + # The messages should already be sorted, but we sort just in case + for message in sorted(messages, key=lambda x: int(x["id"])): callback(message) def _mk_subs(streams): @@ -245,6 +257,8 @@ def _mk_del_subs(streams): Client._register('send_message', url='messages', make_request=(lambda request: request)) Client._register('get_messages', method='GET', url='messages/latest', longpolling=True) +Client._register('get_events', url='events', method='GET', longpolling=True, make_request=(lambda **kwargs: kwargs)) +Client._register('register', make_request=(lambda event_types=[]: dict(event_types=event_types))) Client._register('get_profile', method='GET', url='users/me') Client._register('get_public_streams', method='GET', url='streams') Client._register('get_members', method='GET', url='users')