mypy: Annotate /api/integrations/rss/rss-bot.

With a few tweaks by tabbott.
This commit is contained in:
ausDensk 2016-12-29 20:29:49 +01:00 committed by Tim Abbott
parent bb5abd971a
commit 8d603a4489

60
integrations/rss/rss-bot Executable file → Normal file
View file

@ -34,12 +34,13 @@ import os
import sys
import time
from six.moves import urllib
from typing import Dict, List, Tuple, Any
import feedparser
import zulip
VERSION = "0.9"
RSS_DATA_DIR = os.path.expanduser(os.path.join('~', '.cache', 'zulip-rss'))
OLDNESS_THRESHOLD = 30 # days
VERSION = "0.9" # type: str
RSS_DATA_DIR = os.path.expanduser(os.path.join('~', '.cache', 'zulip-rss')) # type: str
OLDNESS_THRESHOLD = 30 # type: int
usage = """Usage: Send summaries of RSS entries for your favorite feeds to Zulip.
@ -65,7 +66,7 @@ stream every 5 minutes is:
*/5 * * * * /usr/local/share/zulip/integrations/rss/rss-bot"""
parser = optparse.OptionParser(usage)
parser = optparse.OptionParser(usage) # type: optparse.OptionParser
parser.add_option('--stream',
dest='stream',
help='The stream to which to send RSS messages.',
@ -82,9 +83,10 @@ parser.add_option('--feed-file',
default=os.path.join(RSS_DATA_DIR, "rss-feeds"),
action='store')
parser.add_option_group(zulip.generate_option_group(parser))
(opts, args) = parser.parse_args()
(opts, args) = parser.parse_args() # type: Tuple[Any, List[str]]
def mkdir_p(path):
# type: (str) -> None
# Python doesn't have an analog to `mkdir -p` < Python 3.2.
try:
os.makedirs(path)
@ -101,90 +103,98 @@ except OSError:
print("Unable to store RSS data at %s." % (opts.data_dir,), file=sys.stderr)
exit(1)
log_file = os.path.join(opts.data_dir, "rss-bot.log")
log_format = "%(asctime)s: %(message)s"
log_file = os.path.join(opts.data_dir, "rss-bot.log") # type: str
log_format = "%(asctime)s: %(message)s" # type: str
logging.basicConfig(format=log_format)
formatter = logging.Formatter(log_format)
file_handler = logging.FileHandler(log_file)
formatter = logging.Formatter(log_format) # type: logging.Formatter
file_handler = logging.FileHandler(log_file) # type: logging.FileHandler
file_handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # type: logging.Logger
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
def log_error_and_exit(error):
# type: (str) -> None
logger.error(error)
logger.error(usage)
exit(1)
class MLStripper(HTMLParser):
def __init__(self):
# type: () -> None
self.reset()
self.fed = []
self.fed = [] # type: List[str]
def handle_data(self, data):
# type: (str) -> None
self.fed.append(data)
def get_data(self):
# type: () -> str
return ''.join(self.fed)
def strip_tags(html):
# type: (str) -> str
stripper = MLStripper()
stripper.feed(html)
return stripper.get_data()
def compute_entry_hash(entry):
# type: (Dict[str, Any]) -> str
entry_time = entry.get("published", entry.get("updated"))
entry_id = entry.get("id", entry.get("link"))
return hashlib.md5(entry_id + str(entry_time)).hexdigest()
def elide_subject(subject):
# type: (str) -> str
MAX_TOPIC_LENGTH = 60
if len(subject) > MAX_TOPIC_LENGTH:
subject = subject[:MAX_TOPIC_LENGTH - 3].rstrip() + '...'
return subject
def send_zulip(entry, feed_name):
# type: (Any, str) -> Dict[str, Any]
content = "**[%s](%s)**\n%s\n%s" % (entry.title,
entry.link,
strip_tags(entry.summary),
entry.link)
entry.link) # type: str
message = {"type": "stream",
"sender": opts.zulip_email,
"to": opts.stream,
"subject": elide_subject(feed_name),
"content": content,
}
} # type: Dict[str, str]
return client.send_message(message)
try:
with open(opts.feed_file, "r") as f:
feed_urls = [feed.strip() for feed in f.readlines()]
feed_urls = [feed.strip() for feed in f.readlines()] # type: List[str]
except IOError:
log_error_and_exit("Unable to read feed file at %s." % (opts.feed_file,))
client = zulip.Client(email=opts.zulip_email, api_key=opts.zulip_api_key,
site=opts.zulip_site, client="ZulipRSS/" + VERSION)
site=opts.zulip_site, client="ZulipRSS/" + VERSION) # type: zulip.Client
first_message = True
first_message = True # type: bool
for feed_url in feed_urls:
feed_file = os.path.join(opts.data_dir, urllib.parse.urlparse(feed_url).netloc)
feed_file = os.path.join(opts.data_dir, urllib.parse.urlparse(feed_url).netloc) # Type: str
try:
with open(feed_file, "r") as f:
old_feed_hashes = dict((line.strip(), True) for line in f.readlines())
old_feed_hashes = dict((line.strip(), True) for line in f.readlines()) # type: Dict[str, bool]
except IOError:
old_feed_hashes = {}
new_hashes = []
data = feedparser.parse(feed_url)
new_hashes = [] # type: List[str]
data = feedparser.parse(feed_url) # type: feedparser.parse
for entry in data.entries:
entry_hash = compute_entry_hash(entry)
entry_hash = compute_entry_hash(entry) # type: str
# An entry has either been published or updated.
entry_time = entry.get("published_parsed", entry.get("updated_parsed"))
entry_time = entry.get("published_parsed", entry.get("updated_parsed")) # type: Tuple[int, int]
if entry_time is not None and (time.time() - calendar.timegm(entry_time)) > OLDNESS_THRESHOLD * 60 * 60 * 24:
# As a safeguard against misbehaving feeds, don't try to process
# entries older than some threshold.
@ -197,12 +207,12 @@ for feed_url in feed_urls:
# entries in reverse chronological order.
break
feed_name = data.feed.title or feed_url
feed_name = data.feed.title or feed_url # type: str
response = send_zulip(entry, feed_name)
response = send_zulip(entry, feed_name) # type: Dict[str, Any]
if response["result"] != "success":
logger.error("Error processing %s" % (feed_url,))
logger.error(response)
logger.error(str(response))
if first_message:
# This is probably some fundamental problem like the stream not
# existing or something being misconfigured, so bail instead of