python-zulip-api/zulip/integrations/rss/rss-bot

264 lines
7.7 KiB
Plaintext
Raw Permalink Normal View History

2020-04-02 09:59:28 -04:00
#!/usr/bin/env python3
#
# RSS integration for Zulip
#
2021-05-28 05:00:04 -04:00
import argparse
import calendar
import errno
import hashlib
import logging
import os
import re
import sys
import time
2020-04-03 05:23:36 -04:00
import urllib.parse
2021-05-28 05:00:04 -04:00
from html.parser import HTMLParser
from typing import Any, Dict, List, Tuple
import feedparser
2021-05-28 05:00:04 -04:00
import zulip
2021-05-28 05:00:04 -04:00
VERSION = "0.9" # type: str
RSS_DATA_DIR = os.path.expanduser(os.path.join("~", ".cache", "zulip-rss")) # type: str
OLDNESS_THRESHOLD = 30 # type: int
usage = """Usage: Send summaries of RSS entries for your favorite feeds to Zulip.
This bot requires the feedparser module.
To use this script:
1. Create an RSS feed file containing 1 feed URL per line (default feed
file location: ~/.cache/zulip-rss/rss-feeds)
2. Subscribe to the stream that will receive RSS updates (default stream: rss)
3. create a ~/.zuliprc as described on https://zulip.com/api/configuring-python-bindings
4. Test the script by running it manually, like this:
/usr/local/share/zulip/integrations/rss/rss-bot
You can customize the location on the feed file and recipient stream, e.g.:
/usr/local/share/zulip/integrations/rss/rss-bot --feed-file=/path/to/my-feeds --stream=my-rss-stream
4. Configure a crontab entry for this script. A sample crontab entry for
processing feeds stored in the default location and sending to the default
stream every 5 minutes is:
*/5 * * * * /usr/local/share/zulip/integrations/rss/rss-bot"""
parser = zulip.add_default_arguments(
argparse.ArgumentParser(usage)
) # type: argparse.ArgumentParser
parser.add_argument(
"--stream",
dest="stream",
help="The stream to which to send RSS messages.",
default="rss",
action="store",
)
parser.add_argument(
"--data-dir",
dest="data_dir",
help="The directory where feed metadata is stored",
default=os.path.join(RSS_DATA_DIR),
action="store",
)
parser.add_argument(
"--feed-file",
dest="feed_file",
help="The file containing a list of RSS feed URLs to follow, one URL per line",
default=os.path.join(RSS_DATA_DIR, "rss-feeds"),
action="store",
)
parser.add_argument(
"--unwrap",
dest="unwrap",
action="store_true",
help="Convert word-wrapped paragraphs into single lines",
default=False,
)
parser.add_argument(
"--math",
dest="math",
action="store_true",
help="Convert $ to $$ (for KaTeX processing)",
default=False,
)
2017-08-02 16:21:48 -04:00
opts = parser.parse_args() # type: Any
def mkdir_p(path: str) -> None:
# Python doesn't have an analog to `mkdir -p` < Python 3.2.
try:
os.makedirs(path)
except OSError as e:
if e.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
try:
mkdir_p(opts.data_dir)
except OSError:
# We can't write to the logfile, so just print and give up.
print(f"Unable to store RSS data at {opts.data_dir}.", file=sys.stderr)
exit(1)
log_file = os.path.join(opts.data_dir, "rss-bot.log") # type: str
log_format = "%(asctime)s: %(message)s" # type: str
logging.basicConfig(format=log_format)
formatter = logging.Formatter(log_format) # type: logging.Formatter
file_handler = logging.FileHandler(log_file) # type: logging.FileHandler
file_handler.setFormatter(formatter)
logger = logging.getLogger(__name__) # type: logging.Logger
logger.setLevel(logging.DEBUG)
logger.addHandler(file_handler)
def log_error_and_exit(error: str) -> None:
logger.error(error)
logger.error(usage)
exit(1)
class MLStripper(HTMLParser):
def __init__(self) -> None:
super().__init__()
self.reset()
self.fed = [] # type: List[str]
def handle_data(self, data: str) -> None:
self.fed.append(data)
def get_data(self) -> str:
return "".join(self.fed)
def strip_tags(html: str) -> str:
stripper = MLStripper()
stripper.feed(html)
return stripper.get_data()
def compute_entry_hash(entry: Dict[str, Any]) -> str:
entry_time = entry.get("published", entry.get("updated"))
entry_id = entry.get("id", entry.get("link"))
return hashlib.md5((entry_id + str(entry_time)).encode()).hexdigest()
def unwrap_text(body: str) -> str:
# Replace \n by space if it is preceded and followed by a non-\n.
# Example: '\na\nb\nc\n\nd\n' -> '\na b c\n\nd\n'
return re.sub("(?<=[^\n])\n(?=[^\n])", " ", body)
def elide_subject(subject: str) -> str:
MAX_TOPIC_LENGTH = 60
if len(subject) > MAX_TOPIC_LENGTH:
subject = subject[: MAX_TOPIC_LENGTH - 3].rstrip() + "..."
return subject
def send_zulip(entry: Any, feed_name: str) -> Dict[str, Any]:
body = entry.summary # type: str
if opts.unwrap:
body = unwrap_text(body)
content = "**[{}]({})**\n{}\n{}".format(
entry.title,
entry.link,
strip_tags(body),
entry.link,
) # type: str
if opts.math:
content = content.replace("$", "$$")
message = {
"type": "stream",
"sender": opts.zulip_email,
"to": opts.stream,
"subject": elide_subject(feed_name),
"content": content,
} # type: Dict[str, str]
return client.send_message(message)
try:
with open(opts.feed_file) as f:
feed_urls = [feed.strip() for feed in f.readlines()] # type: List[str]
except OSError:
log_error_and_exit(f"Unable to read feed file at {opts.feed_file}.")
client = zulip.Client(
email=opts.zulip_email,
api_key=opts.zulip_api_key,
config_file=opts.zulip_config_file,
site=opts.zulip_site,
client="ZulipRSS/" + VERSION,
) # type: zulip.Client
first_message = True # type: bool
for feed_url in feed_urls:
feed_file = os.path.join(opts.data_dir, urllib.parse.urlparse(feed_url).netloc) # Type: str
try:
with open(feed_file) as f:
old_feed_hashes = {
line.strip(): True for line in f.readlines()
} # type: Dict[str, bool]
except OSError:
old_feed_hashes = {}
new_hashes = [] # type: List[str]
data = feedparser.parse(feed_url) # type: feedparser.parse
for entry in data.entries:
entry_hash = compute_entry_hash(entry) # type: str
# An entry has either been published or updated.
entry_time = entry.get(
"published_parsed", entry.get("updated_parsed")
) # type: Tuple[int, int]
if (
entry_time is not None
and (time.time() - calendar.timegm(entry_time)) > OLDNESS_THRESHOLD * 60 * 60 * 24
):
# As a safeguard against misbehaving feeds, don't try to process
# entries older than some threshold.
continue
if entry_hash in old_feed_hashes:
# We've already seen this. No need to process any older entries.
break
if (not old_feed_hashes) and (len(new_hashes) >= 3):
# On a first run, pick up the 3 most recent entries. An RSS feed has
# entries in reverse chronological order.
break
feed_name = data.feed.title or feed_url # type: str
response = send_zulip(entry, feed_name) # type: Dict[str, Any]
if response["result"] != "success":
logger.error(f"Error processing {feed_url}")
logger.error(str(response))
if first_message:
# This is probably some fundamental problem like the stream not
# existing or something being misconfigured, so bail instead of
# getting the same error for every RSS entry.
log_error_and_exit("Failed to process first message")
# Go ahead and move on -- perhaps this entry is corrupt.
new_hashes.append(entry_hash)
first_message = False
with open(feed_file, "a") as f:
for hash in new_hashes:
f.write(hash + "\n")
logger.info("Sent zulips for %d %s entries" % (len(new_hashes), feed_url))