Annotate api/zulip/__init__.py.

Note that we still can't run mypy against this file and other files,
because of how the interface is dynamically created via _register.  We
will need to change that or use a stub file to make it possible to
annotate this.

This was tweaked by tabbott to fix some bugs.
This commit is contained in:
Juan Verhook 2016-12-02 18:26:47 -08:00 committed by Tim Abbott
parent 2e2b8af9fd
commit 25a8315f71

View file

@ -38,8 +38,7 @@ from six.moves.configparser import SafeConfigParser
from six.moves import urllib
import logging
import six
from typing import Any, Dict
from typing import Any, Callable, Dict, Mapping, Optional, Tuple, Union
__version__ = "0.2.5"
@ -47,7 +46,7 @@ logger = logging.getLogger(__name__)
# Check that we have a recent enough version
# Older versions don't provide the 'json' attribute on responses.
assert(LooseVersion(requests.__version__) >= LooseVersion('0.12.1')) # type: ignore # https://github.com/python/mypy/issues/1165 and https://github.com/python/typeshed/pull/206
assert(LooseVersion(requests.__version__) >= LooseVersion('0.12.1'))
# In newer versions, the 'json' attribute is a function, not a property
requests_json_is_function = callable(requests.Response.json)
@ -55,26 +54,31 @@ API_VERSTRING = "v1/"
class CountingBackoff(object):
def __init__(self, maximum_retries=10, timeout_success_equivalent=None):
# type: (int, Optional[float]) -> None
self.number_of_retries = 0
self.maximum_retries = maximum_retries
self.timeout_success_equivalent = timeout_success_equivalent
self.last_attempt_time = 0
self.last_attempt_time = 0.0
def keep_going(self):
# type: () -> bool
self._check_success_timeout()
return self.number_of_retries < self.maximum_retries
def succeed(self):
# type: () -> None
self.number_of_retries = 0
self.last_attempt_time = time.time()
def fail(self):
# type: () -> None
self._check_success_timeout()
self.number_of_retries = min(self.number_of_retries + 1,
self.maximum_retries)
self.last_attempt_time = time.time()
def _check_success_timeout(self):
# type: () -> None
if (self.timeout_success_equivalent is not None
and self.last_attempt_time != 0
and time.time() - self.last_attempt_time > self.timeout_success_equivalent):
@ -82,6 +86,7 @@ class CountingBackoff(object):
class RandomExponentialBackoff(CountingBackoff):
def fail(self):
# type: () -> None
super(RandomExponentialBackoff, self).fail()
# Exponential growth with ratio sqrt(2); compute random delay
# between x and 2x where x is growing exponentially
@ -95,9 +100,11 @@ class RandomExponentialBackoff(CountingBackoff):
time.sleep(delay)
def _default_client():
# type: () -> str
return "ZulipPython/" + __version__
def generate_option_group(parser, prefix=''):
# type: (optparse.OptionParser, str) -> optparse.OptionGroup
group = optparse.OptionGroup(parser, 'Zulip API configuration')
group.add_option('--%ssite' % (prefix,),
dest="zulip_site",
@ -148,6 +155,7 @@ def generate_option_group(parser, prefix=''):
return group
def init_from_options(options, client=None):
# type: (Any, Optional[str]) -> Client
if options.zulip_client is not None:
client = options.zulip_client
elif client is None:
@ -160,6 +168,7 @@ def init_from_options(options, client=None):
client_cert_key=options.client_cert_key)
def get_default_config_filename():
# type: () -> str
config_file = os.path.join(os.environ["HOME"], ".zuliprc")
if (not os.path.exists(config_file) and
os.path.exists(os.path.join(os.environ["HOME"], ".humbugrc"))):
@ -173,6 +182,7 @@ class Client(object):
site=None, client=None,
cert_bundle=None, insecure=None,
client_cert=None, client_cert_key=None):
# type: (Optional[str], Optional[str], Optional[str], bool, bool, Optional[str], Optional[str], Optional[str], bool, Optional[str], Optional[str]) -> None
if client is None:
client = _default_client()
@ -230,7 +240,7 @@ class Client(object):
self.client_name = client
if insecure:
self.tls_verification = False
self.tls_verification = False # type: Union[bool, str]
elif cert_bundle is not None:
if not os.path.isfile(cert_bundle):
raise RuntimeError("tls bundle '%s' does not exist"
@ -256,6 +266,7 @@ class Client(object):
self.client_cert_key = client_cert_key
def get_user_agent(self):
# type: () -> str
vendor = ''
vendor_version = ''
try:
@ -280,6 +291,7 @@ class Client(object):
)
def do_api_query(self, orig_request, url, method="POST", longpolling = False):
# type: (Mapping[str, Any], str, str, bool) -> Dict[str, Any]
request = {}
for (key, val) in six.iteritems(orig_request):
@ -295,6 +307,7 @@ class Client(object):
} # type: Dict[str, Any]
def error_retry(error_string):
# type: (str) -> bool
if not self.retry_on_errors or query_state["failures"] >= 10:
return False
if self.verbose:
@ -311,6 +324,7 @@ class Client(object):
return True
def end_error_retry(succeeded):
# type: (bool) -> None
if query_state["had_error_retry"] and self.verbose:
if succeeded:
print("Success!")
@ -327,7 +341,7 @@ class Client(object):
# Build a client cert object for requests
if self.client_cert_key is not None:
client_cert = (self.client_cert, self.client_cert_key)
client_cert = (self.client_cert, self.client_cert_key) # type: Union[str, Tuple[str, str]]
else:
client_cert = self.client_cert
@ -391,15 +405,15 @@ class Client(object):
@classmethod
def _register(cls, name, url=None, make_request=None,
method="POST", computed_url=None, **query_kwargs):
# type: (Any, str, Optional[Callable], str, Optional[Callable], **Any) -> Any
if url is None:
url = name
if make_request is None:
def make_request(request=None):
if request is None:
request = {}
return request
def make_request(**kwargs):
# type: (**Any) -> Any
return kwargs.get("request", {})
def call(self, *args, **kwargs):
# type: (*Any, **Any) -> str
request = make_request(*args, **kwargs)
if computed_url is not None:
req_url = computed_url(request)
@ -410,15 +424,16 @@ class Client(object):
setattr(cls, name, call)
def call_on_each_event(self, callback, event_types=None, narrow=None):
# type: (Callable, Optional[List[str]], Any) -> None
if narrow is None:
narrow = []
def do_register():
# type: () -> Tuple[str, int]
while True:
if event_types is None:
res = self.register()
res = self.register() # type: ignore
else:
res = self.register(event_types=event_types, narrow=narrow)
res = self.register(event_types=event_types, narrow=narrow) # type: ignore
if 'error' in res.get('result'):
if self.verbose:
@ -432,7 +447,7 @@ class Client(object):
if queue_id is None:
(queue_id, last_event_id) = do_register()
res = self.get_events(queue_id=queue_id, last_event_id=last_event_id)
res = self.get_events(queue_id=queue_id, last_event_id=last_event_id) # type: ignore
if 'error' in res.get('result'):
if res["result"] == "http-error":
if self.verbose:
@ -463,24 +478,29 @@ class Client(object):
callback(event)
def call_on_each_message(self, callback):
# type: (Callable) -> None
def event_callback(event):
# type: (Dict[str, str]) -> None
if event['type'] == 'message':
callback(event['message'])
self.call_on_each_event(event_callback, ['message'])
def _mk_subs(streams, **kwargs):
# type: (str, **Any ) -> Dict[str, str]
result = kwargs
result['subscriptions'] = streams
return result
def _mk_rm_subs(streams):
# type: (str) -> Dict[str, str]
return {'delete': streams}
def _mk_deregister(queue_id):
# type: (str) -> Dict[str, str]
return {'queue_id': queue_id}
def _mk_events(event_types=None, narrow=None):
# type: (Any, List[None]) -> Dict[Any, List[None]]
if event_types is None:
return dict()
if narrow is None:
@ -488,6 +508,7 @@ def _mk_events(event_types=None, narrow=None):
return dict(event_types=event_types, narrow=narrow)
def _kwargs_to_dict(**kwargs):
# type: (**Any) -> Any
return kwargs
class ZulipStream(object):
@ -496,19 +517,22 @@ class ZulipStream(object):
"""
def __init__(self, type, to, subject, **kwargs):
# type: (str, str, str, **Any) -> None
self.client = Client(**kwargs)
self.type = type
self.to = to
self.subject = subject
def write(self, content):
# type: (str) -> None
message = {"type": self.type,
"to": self.to,
"subject": self.subject,
"content": content}
self.client.send_message(message)
self.client.send_message(message) # type: ignore
def flush(self):
# type: () -> None
pass
Client._register('send_message', url='messages', make_request=(lambda request: request))