zephyr_mirror_backend: Fix thread safety problems.

As of commit 5eaac7bfba (#18),
zulip.Client is not thread-safe and especially not fork-safe due to
connections held open by requests.Session.

Delay construction of the Client until after forking off
zulip_to_zephyr.  Replace the fork for each message sent by
zephyr_to_zulip with a threaded queue worker.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2022-08-22 19:19:30 -07:00 committed by Anders Kaseorg
parent 63c259b2bc
commit 56f805a5d7

View file

@ -13,18 +13,21 @@ import sys
import tempfile
import textwrap
import time
from queue import Queue
from threading import Thread
from types import FrameType
from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from typing_extensions import Literal, TypedDict
import zulip
from zulip import RandomExponentialBackoff
DEFAULT_SITE = "https://api.zulip.com"
class States:
Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4))
Startup, ZulipToZephyr, ZephyrToZulip = list(range(3))
CURRENT_STATE = States.Startup
@ -32,6 +35,16 @@ CURRENT_STATE = States.Startup
logger: logging.Logger
def make_zulip_client() -> zulip.Client:
return zulip.Client(
email=zulip_account_email,
api_key=api_key,
verbose=True,
client="zephyr_mirror",
site=options.site,
)
def to_zulip_username(zephyr_username: str) -> str:
if "@" in zephyr_username:
(user, realm) = zephyr_username.split("@")
@ -117,7 +130,7 @@ class ZephyrDict(TypedDict, total=False):
zsig: str
def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]:
message: Dict[str, Any]
message = {}
if options.forward_class_messages:
@ -151,7 +164,7 @@ def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
return zulip_client.send_message(message)
def send_error_zulip(error_msg: str) -> None:
def send_error_zulip(zulip_client: zulip.Client, error_msg: str) -> None:
message = {
"type": "private",
"sender": zulip_account_email,
@ -263,7 +276,7 @@ def maybe_restart_mirroring_script() -> None:
raise Exception("Failed to reload too many times, aborting!")
def process_loop(log: Optional[IO[str]]) -> NoReturn:
def process_loop(zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]) -> NoReturn:
restart_check_count = 0
last_check_time = time.time()
recieve_backoff = RandomExponentialBackoff()
@ -278,7 +291,7 @@ def process_loop(log: Optional[IO[str]]) -> NoReturn:
if notice is None:
break
try:
process_notice(notice, log)
process_notice(notice, zulip_queue, log)
process_backoff.succeed()
except Exception:
logger.exception("Error relaying zephyr:")
@ -395,7 +408,9 @@ def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str:
return decrypted
def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
def process_notice(
notice: "zephyr.ZNotice", zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]
) -> None:
assert notice.sender is not None
(zsig, body) = parse_zephyr_body(notice.message, notice.format)
is_personal = False
@ -490,18 +505,19 @@ def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
log.write(json.dumps(zeph) + "\n")
log.flush()
if os.fork() == 0:
global CURRENT_STATE
CURRENT_STATE = States.ChildSending
# Actually send the message in a child process, to avoid blocking.
zulip_queue.put(zeph)
def send_zulip_worker(zulip_queue: "Queue[ZephyrDict]", zulip_client: zulip.Client) -> None:
while True:
zeph = zulip_queue.get()
try:
res = send_zulip(zeph)
res = send_zulip(zulip_client, zeph)
if res.get("result") != "success":
logger.error(f"Error relaying zephyr:\n{zeph}\n{res}")
except Exception:
logger.exception("Error relaying zephyr:")
finally:
os._exit(0)
zulip_queue.task_done()
def quit_failed_initialization(message: str) -> str:
@ -560,6 +576,8 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
def zephyr_to_zulip(options: optparse.Values) -> None:
zulip_client = make_zulip_client()
if options.use_sessions and os.path.exists(options.session_path):
logger.info("Loading old session")
zephyr_load_session_autoretry(options.session_path)
@ -593,18 +611,22 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
"sending saved message to %s from %s..."
% (zeph.get("stream", zeph.get("recipient")), zeph["sender"])
)
send_zulip(zeph)
send_zulip(zulip_client, zeph)
except Exception:
logger.exception("Could not send saved zephyr:")
time.sleep(2)
logger.info("Successfully initialized; Starting receive loop.")
# Actually send the messages in a thread, to avoid blocking.
zulip_queue: "Queue[ZephyrDict]" = Queue()
Thread(target=lambda: send_zulip_worker(zulip_queue, zulip_client)).start()
if options.resend_log_path is not None:
with open(options.resend_log_path, "a") as log:
process_loop(log)
process_loop(zulip_queue, log)
else:
process_loop(None)
process_loop(zulip_queue, None)
def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
@ -675,7 +697,7 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op
return encrypted
def forward_to_zephyr(message: Dict[str, Any]) -> None:
def forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
# 'Any' can be of any type of text
support_heading = "Hi there! This is an automated message from Zulip."
support_closing = """If you have any questions, please be in touch through the \
@ -749,6 +771,7 @@ Feedback button or at support@zulip.com."""
result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content)
if result is None:
send_error_zulip(
zulip_client,
"""%s
Your Zulip-Zephyr mirror bot was unable to forward that last message \
@ -758,7 +781,7 @@ key (perhaps because your AFS tokens expired). That means that while \
Zulip users (like you) received it, Zephyr users did not.
%s"""
% (support_heading, support_closing)
% (support_heading, support_closing),
)
return
@ -775,6 +798,7 @@ Zulip users (like you) received it, Zephyr users did not.
return
elif code == 0:
send_error_zulip(
zulip_client,
"""%s
Your last message was successfully mirrored to zephyr, but zwrite \
@ -783,7 +807,7 @@ returned the following warning:
%s
%s"""
% (support_heading, stderr, support_closing)
% (support_heading, stderr, support_closing),
)
return
elif code != 0 and (
@ -797,6 +821,7 @@ returned the following warning:
if options.ignore_expired_tickets:
return
send_error_zulip(
zulip_client,
"""%s
Your last message was forwarded from Zulip to Zephyr unauthenticated, \
@ -806,7 +831,7 @@ are running the Zulip-Zephyr mirroring bot, so we can send \
authenticated Zephyr messages for you again.
%s"""
% (support_heading, support_closing)
% (support_heading, support_closing),
)
return
@ -814,6 +839,7 @@ authenticated Zephyr messages for you again.
# probably because the recipient isn't subscribed to personals,
# but regardless, we should just notify the user.
send_error_zulip(
zulip_client,
"""%s
Your Zulip-Zephyr mirror bot was unable to forward that last message \
@ -823,12 +849,12 @@ received it, Zephyr users did not. The error message from zwrite was:
%s
%s"""
% (support_heading, stderr, support_closing)
% (support_heading, stderr, support_closing),
)
return
def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
# The key string can be used to direct any type of text.
if message["sender_email"] == zulip_account_email:
if not (
@ -851,7 +877,7 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
)
return
try:
forward_to_zephyr(message)
forward_to_zephyr(message, zulip_client)
except Exception:
# Don't let an exception forwarding one message crash the
# whole process
@ -859,12 +885,16 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
def zulip_to_zephyr(options: optparse.Values) -> NoReturn:
zulip_client = make_zulip_client()
# Sync messages from zulip to zephyr
logger.info("Starting syncing messages.")
backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
while True:
try:
zulip_client.call_on_each_message(maybe_forward_to_zephyr)
zulip_client.call_on_each_message(
lambda message: maybe_forward_to_zephyr(message, zulip_client)
)
except Exception:
logger.exception("Error syncing messages:")
backoff.fail()
@ -886,6 +916,8 @@ def subscribed_to_mail_messages() -> bool:
def add_zulip_subscriptions(verbose: bool) -> None:
zulip_client = make_zulip_client()
zephyr_subscriptions = set()
skipped = set()
for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose):
@ -1146,7 +1178,7 @@ def parse_args() -> Tuple[optparse.Values, List[str]]:
def die_gracefully(signal: int, frame: FrameType) -> None:
if CURRENT_STATE == States.ZulipToZephyr or CURRENT_STATE == States.ChildSending:
if CURRENT_STATE == States.ZulipToZephyr:
# this is a child process, so we want os._exit (no clean-up necessary)
os._exit(1)
@ -1207,15 +1239,6 @@ or specify the --api-key-file option."""
sys.exit(1)
zulip_account_email = options.user + "@mit.edu"
import zulip
zulip_client = zulip.Client(
email=zulip_account_email,
api_key=api_key,
verbose=True,
client="zephyr_mirror",
site=options.site,
)
start_time = time.time()