From 56f805a5d773157e76eef63d33bc9ded75b6584f Mon Sep 17 00:00:00 2001 From: Anders Kaseorg Date: Mon, 22 Aug 2022 19:19:30 -0700 Subject: [PATCH] zephyr_mirror_backend: Fix thread safety problems. As of commit 5eaac7bfba4f959ba4a44f25f1a760e28a1a5d8d (#18), zulip.Client is not thread-safe and especially not fork-safe due to connections held open by requests.Session. Delay construction of the Client until after forking off zulip_to_zephyr. Replace the fork for each message sent by zephyr_to_zulip with a threaded queue worker. Signed-off-by: Anders Kaseorg --- .../zephyr/zephyr_mirror_backend.py | 91 ++++++++++++------- 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/zulip/integrations/zephyr/zephyr_mirror_backend.py b/zulip/integrations/zephyr/zephyr_mirror_backend.py index 6ab1537..6111248 100755 --- a/zulip/integrations/zephyr/zephyr_mirror_backend.py +++ b/zulip/integrations/zephyr/zephyr_mirror_backend.py @@ -13,18 +13,21 @@ import sys import tempfile import textwrap import time +from queue import Queue +from threading import Thread from types import FrameType from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union from typing_extensions import Literal, TypedDict +import zulip from zulip import RandomExponentialBackoff DEFAULT_SITE = "https://api.zulip.com" class States: - Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4)) + Startup, ZulipToZephyr, ZephyrToZulip = list(range(3)) CURRENT_STATE = States.Startup @@ -32,6 +35,16 @@ CURRENT_STATE = States.Startup logger: logging.Logger +def make_zulip_client() -> zulip.Client: + return zulip.Client( + email=zulip_account_email, + api_key=api_key, + verbose=True, + client="zephyr_mirror", + site=options.site, + ) + + def to_zulip_username(zephyr_username: str) -> str: if "@" in zephyr_username: (user, realm) = zephyr_username.split("@") @@ -117,7 +130,7 @@ class ZephyrDict(TypedDict, total=False): zsig: str -def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]: +def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]: message: Dict[str, Any] message = {} if options.forward_class_messages: @@ -151,7 +164,7 @@ def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]: return zulip_client.send_message(message) -def send_error_zulip(error_msg: str) -> None: +def send_error_zulip(zulip_client: zulip.Client, error_msg: str) -> None: message = { "type": "private", "sender": zulip_account_email, @@ -263,7 +276,7 @@ def maybe_restart_mirroring_script() -> None: raise Exception("Failed to reload too many times, aborting!") -def process_loop(log: Optional[IO[str]]) -> NoReturn: +def process_loop(zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]) -> NoReturn: restart_check_count = 0 last_check_time = time.time() recieve_backoff = RandomExponentialBackoff() @@ -278,7 +291,7 @@ def process_loop(log: Optional[IO[str]]) -> NoReturn: if notice is None: break try: - process_notice(notice, log) + process_notice(notice, zulip_queue, log) process_backoff.succeed() except Exception: logger.exception("Error relaying zephyr:") @@ -395,7 +408,9 @@ def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str: return decrypted -def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None: +def process_notice( + notice: "zephyr.ZNotice", zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]] +) -> None: assert notice.sender is not None (zsig, body) = parse_zephyr_body(notice.message, notice.format) is_personal = False @@ -490,18 +505,19 @@ def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None: log.write(json.dumps(zeph) + "\n") log.flush() - if os.fork() == 0: - global CURRENT_STATE - CURRENT_STATE = States.ChildSending - # Actually send the message in a child process, to avoid blocking. + zulip_queue.put(zeph) + + +def send_zulip_worker(zulip_queue: "Queue[ZephyrDict]", zulip_client: zulip.Client) -> None: + while True: + zeph = zulip_queue.get() try: - res = send_zulip(zeph) + res = send_zulip(zulip_client, zeph) if res.get("result") != "success": logger.error(f"Error relaying zephyr:\n{zeph}\n{res}") except Exception: logger.exception("Error relaying zephyr:") - finally: - os._exit(0) + zulip_queue.task_done() def quit_failed_initialization(message: str) -> str: @@ -560,6 +576,8 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None: def zephyr_to_zulip(options: optparse.Values) -> None: + zulip_client = make_zulip_client() + if options.use_sessions and os.path.exists(options.session_path): logger.info("Loading old session") zephyr_load_session_autoretry(options.session_path) @@ -593,18 +611,22 @@ def zephyr_to_zulip(options: optparse.Values) -> None: "sending saved message to %s from %s..." % (zeph.get("stream", zeph.get("recipient")), zeph["sender"]) ) - send_zulip(zeph) + send_zulip(zulip_client, zeph) except Exception: logger.exception("Could not send saved zephyr:") time.sleep(2) logger.info("Successfully initialized; Starting receive loop.") + # Actually send the messages in a thread, to avoid blocking. + zulip_queue: "Queue[ZephyrDict]" = Queue() + Thread(target=lambda: send_zulip_worker(zulip_queue, zulip_client)).start() + if options.resend_log_path is not None: with open(options.resend_log_path, "a") as log: - process_loop(log) + process_loop(zulip_queue, log) else: - process_loop(None) + process_loop(zulip_queue, None) def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]: @@ -675,7 +697,7 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op return encrypted -def forward_to_zephyr(message: Dict[str, Any]) -> None: +def forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None: # 'Any' can be of any type of text support_heading = "Hi there! This is an automated message from Zulip." support_closing = """If you have any questions, please be in touch through the \ @@ -749,6 +771,7 @@ Feedback button or at support@zulip.com.""" result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content) if result is None: send_error_zulip( + zulip_client, """%s Your Zulip-Zephyr mirror bot was unable to forward that last message \ @@ -758,7 +781,7 @@ key (perhaps because your AFS tokens expired). That means that while \ Zulip users (like you) received it, Zephyr users did not. %s""" - % (support_heading, support_closing) + % (support_heading, support_closing), ) return @@ -775,6 +798,7 @@ Zulip users (like you) received it, Zephyr users did not. return elif code == 0: send_error_zulip( + zulip_client, """%s Your last message was successfully mirrored to zephyr, but zwrite \ @@ -783,7 +807,7 @@ returned the following warning: %s %s""" - % (support_heading, stderr, support_closing) + % (support_heading, stderr, support_closing), ) return elif code != 0 and ( @@ -797,6 +821,7 @@ returned the following warning: if options.ignore_expired_tickets: return send_error_zulip( + zulip_client, """%s Your last message was forwarded from Zulip to Zephyr unauthenticated, \ @@ -806,7 +831,7 @@ are running the Zulip-Zephyr mirroring bot, so we can send \ authenticated Zephyr messages for you again. %s""" - % (support_heading, support_closing) + % (support_heading, support_closing), ) return @@ -814,6 +839,7 @@ authenticated Zephyr messages for you again. # probably because the recipient isn't subscribed to personals, # but regardless, we should just notify the user. send_error_zulip( + zulip_client, """%s Your Zulip-Zephyr mirror bot was unable to forward that last message \ @@ -823,12 +849,12 @@ received it, Zephyr users did not. The error message from zwrite was: %s %s""" - % (support_heading, stderr, support_closing) + % (support_heading, stderr, support_closing), ) return -def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None: +def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None: # The key string can be used to direct any type of text. if message["sender_email"] == zulip_account_email: if not ( @@ -851,7 +877,7 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None: ) return try: - forward_to_zephyr(message) + forward_to_zephyr(message, zulip_client) except Exception: # Don't let an exception forwarding one message crash the # whole process @@ -859,12 +885,16 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None: def zulip_to_zephyr(options: optparse.Values) -> NoReturn: + zulip_client = make_zulip_client() + # Sync messages from zulip to zephyr logger.info("Starting syncing messages.") backoff = RandomExponentialBackoff(timeout_success_equivalent=120) while True: try: - zulip_client.call_on_each_message(maybe_forward_to_zephyr) + zulip_client.call_on_each_message( + lambda message: maybe_forward_to_zephyr(message, zulip_client) + ) except Exception: logger.exception("Error syncing messages:") backoff.fail() @@ -886,6 +916,8 @@ def subscribed_to_mail_messages() -> bool: def add_zulip_subscriptions(verbose: bool) -> None: + zulip_client = make_zulip_client() + zephyr_subscriptions = set() skipped = set() for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose): @@ -1146,7 +1178,7 @@ def parse_args() -> Tuple[optparse.Values, List[str]]: def die_gracefully(signal: int, frame: FrameType) -> None: - if CURRENT_STATE == States.ZulipToZephyr or CURRENT_STATE == States.ChildSending: + if CURRENT_STATE == States.ZulipToZephyr: # this is a child process, so we want os._exit (no clean-up necessary) os._exit(1) @@ -1207,15 +1239,6 @@ or specify the --api-key-file option.""" sys.exit(1) zulip_account_email = options.user + "@mit.edu" - import zulip - - zulip_client = zulip.Client( - email=zulip_account_email, - api_key=api_key, - verbose=True, - client="zephyr_mirror", - site=options.site, - ) start_time = time.time()