Add call_on_each_event() to our Python bindings
call_on_each_message() is now implemented in terms of call_on_each_event(). (imported from commit b4f74ccf46e9cafd9a6ca28dce975492c2d0b29f)
This commit is contained in:
parent
13014fed0a
commit
c2fa85ef71
|
@ -203,9 +203,19 @@ class Client(object):
|
||||||
call.func_name = name
|
call.func_name = name
|
||||||
setattr(cls, name, call)
|
setattr(cls, name, call)
|
||||||
|
|
||||||
def call_on_each_message(self, callback, options = {}):
|
def call_on_each_event(self, callback, event_types=None):
|
||||||
def do_register():
|
def do_register():
|
||||||
res = self.register(event_types=['message'])
|
while True:
|
||||||
|
if event_types is None:
|
||||||
|
res = self.register()
|
||||||
|
else:
|
||||||
|
res = self.register(event_types=event_types)
|
||||||
|
|
||||||
|
if 'error' in res.get('result'):
|
||||||
|
if self.verbose:
|
||||||
|
print "Server returned error:\n%s" % res['msg']
|
||||||
|
time.sleep(1)
|
||||||
|
else:
|
||||||
return (res['queue_id'], res['last_event_id'])
|
return (res['queue_id'], res['last_event_id'])
|
||||||
|
|
||||||
queue_id = None
|
queue_id = None
|
||||||
|
@ -217,10 +227,10 @@ class Client(object):
|
||||||
if 'error' in res.get('result'):
|
if 'error' in res.get('result'):
|
||||||
if res["result"] == "http-error":
|
if res["result"] == "http-error":
|
||||||
if self.verbose:
|
if self.verbose:
|
||||||
print "HTTP error fetching messages -- probably a server restart"
|
print "HTTP error fetching events -- probably a server restart"
|
||||||
elif res["result"] == "connection-error":
|
elif res["result"] == "connection-error":
|
||||||
if self.verbose:
|
if self.verbose:
|
||||||
print "Connection error fetching messages -- probably server is temporarily down?"
|
print "Connection error fetching events -- probably server is temporarily down?"
|
||||||
else:
|
else:
|
||||||
if self.verbose:
|
if self.verbose:
|
||||||
print "Server returned error:\n%s" % res["msg"]
|
print "Server returned error:\n%s" % res["msg"]
|
||||||
|
@ -228,7 +238,7 @@ class Client(object):
|
||||||
# Our event queue went away, probably because
|
# Our event queue went away, probably because
|
||||||
# we were asleep or the server restarted
|
# we were asleep or the server restarted
|
||||||
# abnormally. We may have missed some
|
# abnormally. We may have missed some
|
||||||
# messages while the network was down or
|
# events while the network was down or
|
||||||
# something, but there's not really anything
|
# something, but there's not really anything
|
||||||
# we can do about it other than resuming
|
# we can do about it other than resuming
|
||||||
# getting new ones.
|
# getting new ones.
|
||||||
|
@ -239,15 +249,16 @@ class Client(object):
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
messages = []
|
|
||||||
for event in res['events']:
|
for event in res['events']:
|
||||||
last_event_id = max(last_event_id, int(event['id']))
|
last_event_id = max(last_event_id, int(event['id']))
|
||||||
if event['type'] == 'message':
|
callback(event)
|
||||||
messages.append(event['message'])
|
|
||||||
|
|
||||||
# The messages should already be sorted, but we sort just in case
|
def call_on_each_message(self, callback):
|
||||||
for message in sorted(messages, key=lambda x: int(x["id"])):
|
def event_callback(event):
|
||||||
callback(message)
|
if event['type'] == 'message':
|
||||||
|
callback(event['message'])
|
||||||
|
|
||||||
|
self.call_on_each_event(event_callback, ['message'])
|
||||||
|
|
||||||
def _mk_subs(streams):
|
def _mk_subs(streams):
|
||||||
return {'subscriptions': streams}
|
return {'subscriptions': streams}
|
||||||
|
@ -255,10 +266,15 @@ def _mk_subs(streams):
|
||||||
def _mk_del_subs(streams):
|
def _mk_del_subs(streams):
|
||||||
return {'delete': streams}
|
return {'delete': streams}
|
||||||
|
|
||||||
|
def _mk_events(event_types=None):
|
||||||
|
if event_types is None:
|
||||||
|
return dict()
|
||||||
|
return dict(event_types=event_types)
|
||||||
|
|
||||||
Client._register('send_message', url='messages', make_request=(lambda request: request))
|
Client._register('send_message', url='messages', make_request=(lambda request: request))
|
||||||
Client._register('get_messages', method='GET', url='messages/latest', longpolling=True)
|
Client._register('get_messages', method='GET', url='messages/latest', longpolling=True)
|
||||||
Client._register('get_events', url='events', method='GET', longpolling=True, make_request=(lambda **kwargs: kwargs))
|
Client._register('get_events', url='events', method='GET', longpolling=True, make_request=(lambda **kwargs: kwargs))
|
||||||
Client._register('register', make_request=(lambda event_types=[]: dict(event_types=event_types)))
|
Client._register('register', make_request=_mk_events)
|
||||||
Client._register('get_profile', method='GET', url='users/me')
|
Client._register('get_profile', method='GET', url='users/me')
|
||||||
Client._register('get_public_streams', method='GET', url='streams')
|
Client._register('get_public_streams', method='GET', url='streams')
|
||||||
Client._register('get_members', method='GET', url='users')
|
Client._register('get_members', method='GET', url='users')
|
||||||
|
|
Loading…
Reference in a new issue