This includes mainly fixes of string literals using f-strings or .format(...), as well as unpacking of list comprehensions.
		
			
				
	
	
		
			263 lines
		
	
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			263 lines
		
	
	
	
		
			7.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
#!/usr/bin/env python3
 | 
						|
#
 | 
						|
# RSS integration for Zulip
 | 
						|
#
 | 
						|
 | 
						|
import argparse
 | 
						|
import calendar
 | 
						|
import errno
 | 
						|
import hashlib
 | 
						|
import logging
 | 
						|
import os
 | 
						|
import re
 | 
						|
import sys
 | 
						|
import time
 | 
						|
import urllib.parse
 | 
						|
from html.parser import HTMLParser
 | 
						|
from typing import Any, Dict, List, Tuple
 | 
						|
 | 
						|
import feedparser
 | 
						|
 | 
						|
import zulip
 | 
						|
 | 
						|
VERSION = "0.9"  # type: str
 | 
						|
RSS_DATA_DIR = os.path.expanduser(os.path.join("~", ".cache", "zulip-rss"))  # type: str
 | 
						|
OLDNESS_THRESHOLD = 30  # type: int
 | 
						|
 | 
						|
usage = """Usage: Send summaries of RSS entries for your favorite feeds to Zulip.
 | 
						|
 | 
						|
This bot requires the feedparser module.
 | 
						|
 | 
						|
To use this script:
 | 
						|
 | 
						|
1. Create an RSS feed file containing 1 feed URL per line (default feed
 | 
						|
   file location: ~/.cache/zulip-rss/rss-feeds)
 | 
						|
2. Subscribe to the stream that will receive RSS updates (default stream: rss)
 | 
						|
3. create a ~/.zuliprc as described on https://zulip.com/api/configuring-python-bindings
 | 
						|
4. Test the script by running it manually, like this:
 | 
						|
 | 
						|
/usr/local/share/zulip/integrations/rss/rss-bot
 | 
						|
 | 
						|
You can customize the location on the feed file and recipient stream, e.g.:
 | 
						|
 | 
						|
/usr/local/share/zulip/integrations/rss/rss-bot --feed-file=/path/to/my-feeds --stream=my-rss-stream
 | 
						|
 | 
						|
4. Configure a crontab entry for this script. A sample crontab entry for
 | 
						|
processing feeds stored in the default location and sending to the default
 | 
						|
stream every 5 minutes is:
 | 
						|
 | 
						|
*/5 * * * * /usr/local/share/zulip/integrations/rss/rss-bot"""
 | 
						|
 | 
						|
parser = zulip.add_default_arguments(
 | 
						|
    argparse.ArgumentParser(usage)
 | 
						|
)  # type: argparse.ArgumentParser
 | 
						|
parser.add_argument(
 | 
						|
    "--stream",
 | 
						|
    dest="stream",
 | 
						|
    help="The stream to which to send RSS messages.",
 | 
						|
    default="rss",
 | 
						|
    action="store",
 | 
						|
)
 | 
						|
parser.add_argument(
 | 
						|
    "--data-dir",
 | 
						|
    dest="data_dir",
 | 
						|
    help="The directory where feed metadata is stored",
 | 
						|
    default=os.path.join(RSS_DATA_DIR),
 | 
						|
    action="store",
 | 
						|
)
 | 
						|
parser.add_argument(
 | 
						|
    "--feed-file",
 | 
						|
    dest="feed_file",
 | 
						|
    help="The file containing a list of RSS feed URLs to follow, one URL per line",
 | 
						|
    default=os.path.join(RSS_DATA_DIR, "rss-feeds"),
 | 
						|
    action="store",
 | 
						|
)
 | 
						|
parser.add_argument(
 | 
						|
    "--unwrap",
 | 
						|
    dest="unwrap",
 | 
						|
    action="store_true",
 | 
						|
    help="Convert word-wrapped paragraphs into single lines",
 | 
						|
    default=False,
 | 
						|
)
 | 
						|
parser.add_argument(
 | 
						|
    "--math",
 | 
						|
    dest="math",
 | 
						|
    action="store_true",
 | 
						|
    help="Convert $ to $$ (for KaTeX processing)",
 | 
						|
    default=False,
 | 
						|
)
 | 
						|
 | 
						|
opts = parser.parse_args()  # type: Any
 | 
						|
 | 
						|
 | 
						|
def mkdir_p(path: str) -> None:
 | 
						|
    # Python doesn't have an analog to `mkdir -p` < Python 3.2.
 | 
						|
    try:
 | 
						|
        os.makedirs(path)
 | 
						|
    except OSError as e:
 | 
						|
        if e.errno == errno.EEXIST and os.path.isdir(path):
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            raise
 | 
						|
 | 
						|
 | 
						|
try:
 | 
						|
    mkdir_p(opts.data_dir)
 | 
						|
except OSError:
 | 
						|
    # We can't write to the logfile, so just print and give up.
 | 
						|
    print(f"Unable to store RSS data at {opts.data_dir}.", file=sys.stderr)
 | 
						|
    exit(1)
 | 
						|
 | 
						|
log_file = os.path.join(opts.data_dir, "rss-bot.log")  # type: str
 | 
						|
log_format = "%(asctime)s: %(message)s"  # type: str
 | 
						|
logging.basicConfig(format=log_format)
 | 
						|
 | 
						|
formatter = logging.Formatter(log_format)  # type: logging.Formatter
 | 
						|
file_handler = logging.FileHandler(log_file)  # type: logging.FileHandler
 | 
						|
file_handler.setFormatter(formatter)
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)  # type: logging.Logger
 | 
						|
logger.setLevel(logging.DEBUG)
 | 
						|
logger.addHandler(file_handler)
 | 
						|
 | 
						|
 | 
						|
def log_error_and_exit(error: str) -> None:
 | 
						|
    logger.error(error)
 | 
						|
    logger.error(usage)
 | 
						|
    exit(1)
 | 
						|
 | 
						|
 | 
						|
class MLStripper(HTMLParser):
 | 
						|
    def __init__(self) -> None:
 | 
						|
        super().__init__()
 | 
						|
        self.reset()
 | 
						|
        self.fed = []  # type: List[str]
 | 
						|
 | 
						|
    def handle_data(self, data: str) -> None:
 | 
						|
        self.fed.append(data)
 | 
						|
 | 
						|
    def get_data(self) -> str:
 | 
						|
        return "".join(self.fed)
 | 
						|
 | 
						|
 | 
						|
def strip_tags(html: str) -> str:
 | 
						|
    stripper = MLStripper()
 | 
						|
    stripper.feed(html)
 | 
						|
    return stripper.get_data()
 | 
						|
 | 
						|
 | 
						|
def compute_entry_hash(entry: Dict[str, Any]) -> str:
 | 
						|
    entry_time = entry.get("published", entry.get("updated"))
 | 
						|
    entry_id = entry.get("id", entry.get("link"))
 | 
						|
    return hashlib.md5((entry_id + str(entry_time)).encode()).hexdigest()
 | 
						|
 | 
						|
 | 
						|
def unwrap_text(body: str) -> str:
 | 
						|
    # Replace \n by space if it is preceded and followed by a non-\n.
 | 
						|
    # Example: '\na\nb\nc\n\nd\n' -> '\na b c\n\nd\n'
 | 
						|
    return re.sub("(?<=[^\n])\n(?=[^\n])", " ", body)
 | 
						|
 | 
						|
 | 
						|
def elide_subject(subject: str) -> str:
 | 
						|
    MAX_TOPIC_LENGTH = 60
 | 
						|
    if len(subject) > MAX_TOPIC_LENGTH:
 | 
						|
        subject = subject[: MAX_TOPIC_LENGTH - 3].rstrip() + "..."
 | 
						|
    return subject
 | 
						|
 | 
						|
 | 
						|
def send_zulip(entry: Any, feed_name: str) -> Dict[str, Any]:
 | 
						|
    body = entry.summary  # type: str
 | 
						|
    if opts.unwrap:
 | 
						|
        body = unwrap_text(body)
 | 
						|
 | 
						|
    content = "**[{}]({})**\n{}\n{}".format(
 | 
						|
        entry.title,
 | 
						|
        entry.link,
 | 
						|
        strip_tags(body),
 | 
						|
        entry.link,
 | 
						|
    )  # type: str
 | 
						|
 | 
						|
    if opts.math:
 | 
						|
        content = content.replace("$", "$$")
 | 
						|
 | 
						|
    message = {
 | 
						|
        "type": "stream",
 | 
						|
        "sender": opts.zulip_email,
 | 
						|
        "to": opts.stream,
 | 
						|
        "subject": elide_subject(feed_name),
 | 
						|
        "content": content,
 | 
						|
    }  # type: Dict[str, str]
 | 
						|
    return client.send_message(message)
 | 
						|
 | 
						|
 | 
						|
try:
 | 
						|
    with open(opts.feed_file) as f:
 | 
						|
        feed_urls = [feed.strip() for feed in f.readlines()]  # type: List[str]
 | 
						|
except OSError:
 | 
						|
    log_error_and_exit(f"Unable to read feed file at {opts.feed_file}.")
 | 
						|
 | 
						|
client = zulip.Client(
 | 
						|
    email=opts.zulip_email,
 | 
						|
    api_key=opts.zulip_api_key,
 | 
						|
    config_file=opts.zulip_config_file,
 | 
						|
    site=opts.zulip_site,
 | 
						|
    client="ZulipRSS/" + VERSION,
 | 
						|
)  # type: zulip.Client
 | 
						|
 | 
						|
first_message = True  # type: bool
 | 
						|
 | 
						|
for feed_url in feed_urls:
 | 
						|
    feed_file = os.path.join(opts.data_dir, urllib.parse.urlparse(feed_url).netloc)  # Type: str
 | 
						|
 | 
						|
    try:
 | 
						|
        with open(feed_file) as f:
 | 
						|
            old_feed_hashes = {
 | 
						|
                line.strip(): True for line in f.readlines()
 | 
						|
            }  # type: Dict[str, bool]
 | 
						|
    except OSError:
 | 
						|
        old_feed_hashes = {}
 | 
						|
 | 
						|
    new_hashes = []  # type: List[str]
 | 
						|
    data = feedparser.parse(feed_url)  # type: feedparser.parse
 | 
						|
 | 
						|
    for entry in data.entries:
 | 
						|
        entry_hash = compute_entry_hash(entry)  # type: str
 | 
						|
        # An entry has either been published or updated.
 | 
						|
        entry_time = entry.get(
 | 
						|
            "published_parsed", entry.get("updated_parsed")
 | 
						|
        )  # type: Tuple[int, int]
 | 
						|
        if (
 | 
						|
            entry_time is not None
 | 
						|
            and (time.time() - calendar.timegm(entry_time)) > OLDNESS_THRESHOLD * 60 * 60 * 24
 | 
						|
        ):
 | 
						|
            # As a safeguard against misbehaving feeds, don't try to process
 | 
						|
            # entries older than some threshold.
 | 
						|
            continue
 | 
						|
        if entry_hash in old_feed_hashes:
 | 
						|
            # We've already seen this. No need to process any older entries.
 | 
						|
            break
 | 
						|
        if (not old_feed_hashes) and (len(new_hashes) >= 3):
 | 
						|
            # On a first run, pick up the 3 most recent entries. An RSS feed has
 | 
						|
            # entries in reverse chronological order.
 | 
						|
            break
 | 
						|
 | 
						|
        feed_name = data.feed.title or feed_url  # type: str
 | 
						|
 | 
						|
        response = send_zulip(entry, feed_name)  # type: Dict[str, Any]
 | 
						|
        if response["result"] != "success":
 | 
						|
            logger.error(f"Error processing {feed_url}")
 | 
						|
            logger.error(str(response))
 | 
						|
            if first_message:
 | 
						|
                # This is probably some fundamental problem like the stream not
 | 
						|
                # existing or something being misconfigured, so bail instead of
 | 
						|
                # getting the same error for every RSS entry.
 | 
						|
                log_error_and_exit("Failed to process first message")
 | 
						|
        # Go ahead and move on -- perhaps this entry is corrupt.
 | 
						|
        new_hashes.append(entry_hash)
 | 
						|
        first_message = False
 | 
						|
 | 
						|
    with open(feed_file, "a") as f:
 | 
						|
        for hash in new_hashes:
 | 
						|
            f.write(hash + "\n")
 | 
						|
 | 
						|
    logger.info("Sent zulips for %d %s entries" % (len(new_hashes), feed_url))
 |