Compare commits

..

10 commits

Author SHA1 Message Date
Anders Kaseorg 02586f1d34 zephyr_mirror: Port sharding wrapper to asyncio.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-01-25 16:53:09 -08:00
Anders Kaseorg 7831d979c9 zephyr: Free received notices with ZFreeNotice.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-01-24 12:23:44 -08:00
Anders Kaseorg c94da617ed Remove Python 3.6; add Python 3.10.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-01-24 12:04:59 -08:00
Tim Abbott 4a3d225a38 mailmap: Add a few entries. 2022-11-16 15:54:34 -08:00
rht 582e9733a9 Slack bridge: Inform user to subscribe their Slack bot. 2022-10-19 15:17:15 -07:00
rht eef02fbb76 Slack bridge: Implement multiple channels bridges. 2022-10-19 15:17:15 -07:00
Robert Imschweiler 41ec1a9a29 matrix bridge: Improve mime-type detection. 2022-09-22 09:13:19 -07:00
Anders Kaseorg 92120914f8 process_ccache: Adjust supervisor_path to avoid Puppet purging.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2022-09-13 16:43:40 -07:00
Anders Kaseorg a534446315 zephyr: Remove python-zephyr in favor of ctypes.
Our custom patched version of python-zephyr only worked on Python 2.
Now we don’t need python-zephyr at all.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2022-09-13 16:43:40 -07:00
Anders Kaseorg 56f805a5d7 zephyr_mirror_backend: Fix thread safety problems.
As of commit 5eaac7bfba (#18),
zulip.Client is not thread-safe and especially not fork-safe due to
connections held open by requests.Session.

Delay construction of the Client until after forking off
zulip_to_zephyr.  Replace the fork for each message sent by
zephyr_to_zulip with a threaded queue worker.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2022-09-13 16:43:40 -07:00
19 changed files with 557 additions and 211 deletions

View file

@ -17,33 +17,24 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
include: include:
# Base images are built using `tools/ci/Dockerfile.prod.template`. # Focal ships with Python 3.8.10.
# The comments at the top explain how to build and upload these images.
# Bionic ships with Python 3.6.
- docker_image: zulip/ci:bionic
name: Ubuntu 18.04 Bionic (Python 3.6, backend)
os: bionic
is_bionic: true
include_frontend_tests: false
# Configure this test to run with the Zulip 3.2 release.
legacy_client_interface: 3
server_version: refs/tags/3.2
# Focal ships with Python 3.8.2.
- docker_image: zulip/ci:focal - docker_image: zulip/ci:focal
name: Ubuntu 20.04 Focal (Python 3.8, backend) name: Ubuntu 20.04 (Python 3.8, backend)
os: focal os: focal
is_focal: true legacy_client_interface: "3"
include_frontend_tests: false server_version: refs/tags/3.2
legacy_client_interface: 4
server_version: refs/tags/4.0
# Bullseye ships with Python 3.9.2. # Bullseye ships with Python 3.9.2.
- docker_image: zulip/ci:bullseye - docker_image: zulip/ci:bullseye
name: Debian 11 Bullseye (Python 3.9, backend) name: Debian 11 (Python 3.9, backend)
os: bullseye os: bullseye
is_bullseye: true legacy_client_interface: "4"
include_frontend_tests: false
legacy_client_interface: 4
server_version: refs/tags/4.0 server_version: refs/tags/4.0
# Ubuntu 22.04 ships with Python 3.10.6.
- docker_image: zulip/ci:jammy
name: Ubuntu 22.04 (Python 3.10, backend)
os: jammy
legacy_client_interface: "6"
server_version: refs/tags/6.0
runs-on: ubuntu-latest runs-on: ubuntu-latest
name: ${{ matrix.name }} (Zulip ${{matrix.server_version}}) name: ${{ matrix.name }} (Zulip ${{matrix.server_version}})
@ -58,25 +49,18 @@ jobs:
HOME: /home/github/ HOME: /home/github/
steps: steps:
- name: 'Checkout python-zulip-api' - name: "Check out python-zulip-api"
uses: actions/checkout@v2 uses: actions/checkout@v2
with: with:
path: api path: api
- name: 'Checkout Zulip server ${{ matrix.server_version }}' - name: "Check out Zulip server ${{ matrix.server_version }}"
uses: actions/checkout@v2 uses: actions/checkout@v2
with: with:
repository: zulip/zulip repository: zulip/zulip
ref: ${{ matrix.server_version }} ref: ${{ matrix.server_version }}
path: server path: server
- name: Do Bionic hack
if: ${{ matrix.is_bionic }}
run: |
# Temporary hack till `sudo service redis-server start` gets fixes in Bionic. See
# https://chat.zulip.org/#narrow/stream/3-backend/topic/Ubuntu.20bionic.20CircleCI
sudo sed -i '/^bind/s/bind.*/bind 0.0.0.0/' /etc/redis/redis.conf
- name: Install dependencies - name: Install dependencies
run: | run: |
cd server cd server

View file

@ -13,10 +13,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Setup Python 3.6 - name: Set up Python 3.7
uses: actions/setup-python@v2 uses: actions/setup-python@v2
with: with:
python-version: 3.6 python-version: "3.7"
- name: Install dependencies - name: Install dependencies
run: tools/provision --force run: tools/provision --force
@ -32,15 +32,12 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest, windows-latest] os: [ubuntu-latest, windows-latest]
python-version: [3.6, 3.7, 3.8, 3.9] python-version: ["3.7", "3.8", "3.9", "3.10"]
exclude:
- os: windows-latest
python-version: 3.6 # cryptography install fails on Windows with python 3.6 since pip is quite old.
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Setup Python ${{ matrix.python-version }} - name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2 uses: actions/setup-python@v2
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}

View file

@ -1,3 +1,4 @@
Aman Agrawal <f2016561@pilani.bits-pilani.ac.in>
Anders Kaseorg <anders@zulip.com> <anders@zulipchat.com> Anders Kaseorg <anders@zulip.com> <anders@zulipchat.com>
Anders Kaseorg <anders@zulip.com> <andersk@mit.edu> Anders Kaseorg <anders@zulip.com> <andersk@mit.edu>
Rishi Gupta <rishig@zulip.com> <rishig@zulipchat.com> Rishi Gupta <rishig@zulip.com> <rishig@zulipchat.com>
@ -7,3 +8,4 @@ Steve Howell <showell@zulip.com> <steve@zulip.com>
Tim Abbott <tabbott@zulip.com> <tabbott@humbughq.com> Tim Abbott <tabbott@zulip.com> <tabbott@humbughq.com>
Tim Abbott <tabbott@zulip.com> <tabbott@mit.edu> Tim Abbott <tabbott@zulip.com> <tabbott@mit.edu>
Tim Abbott <tabbott@zulip.com> <tabbott@zulipchat.com> Tim Abbott <tabbott@zulip.com> <tabbott@zulipchat.com>
Zixuan James Li <p359101898@gmail.com> <359101898@qq.com>

View file

@ -1,8 +1,14 @@
[tool.black] [tool.black]
line-length = 100 line-length = 100
target-version = ["py36"] target-version = ["py37"]
[tool.isort] [tool.isort]
src_paths = ["tools", "zulip", "zulip_bots", "zulip_botserver"] src_paths = [
"tools",
"zulip",
"zulip/integrations/zephyr",
"zulip_bots",
"zulip_botserver",
]
profile = "black" profile = "black"
line_length = 100 line_length = 100

View file

@ -3,7 +3,7 @@
The [Zulip API](https://zulip.com/api) Python bindings require the The [Zulip API](https://zulip.com/api) Python bindings require the
following dependencies: following dependencies:
* **Python (version >= 3.6)** * **Python (version >= 3.7)**
* requests (version >= 0.12.1) * requests (version >= 0.12.1)
**Note**: If you'd like to use the Zulip bindings with Python 2, we **Note**: If you'd like to use the Zulip bindings with Python 2, we

View file

@ -14,9 +14,6 @@ from concurrent.futures import ThreadPoolExecutor
from io import BytesIO from io import BytesIO
from typing import Any, Dict, List, Match, Optional, Set, Tuple, Type, Union from typing import Any, Dict, List, Match, Optional, Set, Tuple, Type, Union
if os.name != "nt":
import magic
import magic.compat
import nio import nio
from nio.responses import ( from nio.responses import (
DownloadError, DownloadError,
@ -352,17 +349,15 @@ class ZulipToMatrix:
if result["result"] != "success": if result["result"] != "success":
success = False success = False
continue continue
try: try:
file_content: bytes = urllib.request.urlopen(self.server_url + result["url"]).read() with urllib.request.urlopen(self.server_url + result["url"]) as response:
file_content: bytes = response.read()
mimetype: str = response.headers.get_content_type()
except Exception: except Exception:
success = False success = False
continue continue
mimetype: str
if os.name == "nt":
mimetype = "m.file"
else:
mimetype = magic.from_buffer(file_content, mime=True)
filename: str = file.split("/")[-1] filename: str = file.split("/")[-1]
response, _ = await self.matrix_client.upload( response, _ = await self.matrix_client.upload(

View file

@ -1,3 +1 @@
matrix-nio matrix-nio
python-magic
python-magic-bin; platform_system == "Windows"

View file

@ -41,7 +41,7 @@ topic = matrix
ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}" ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}"
# For Python 3.6 compatibility. # For Python 3.7 compatibility.
# (Since 3.8, there is unittest.IsolatedAsyncioTestCase!) # (Since 3.8, there is unittest.IsolatedAsyncioTestCase!)
# source: https://stackoverflow.com/a/46324983 # source: https://stackoverflow.com/a/46324983
def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]: def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:

View file

@ -3,12 +3,20 @@ config = {
"email": "zulip-bot@email.com", "email": "zulip-bot@email.com",
"api_key": "put api key here", "api_key": "put api key here",
"site": "https://chat.zulip.org", "site": "https://chat.zulip.org",
"stream": "test here",
"topic": "<- slack-bridge",
}, },
"slack": { "slack": {
"username": "slack_username", "username": "slack_username",
"token": "xoxb-your-slack-token", "token": "xoxb-your-slack-token",
"channel": "C5Z5N7R8A -- must be channel id", },
# Mapping between Slack channels and Zulip stream-topic's.
# You can specify multiple pairs.
"channel_mapping": {
# Slack channel; must be channel ID
"C5Z5N7R8A": {
# Zulip stream
"stream": "test here",
# Zulip topic
"topic": "<- slack-bridge",
},
}, },
} }

View file

@ -5,7 +5,7 @@ import os
import sys import sys
import threading import threading
import traceback import traceback
from typing import Any, Callable, Dict from typing import Any, Callable, Dict, Optional, Tuple
import bridge_with_slack_config import bridge_with_slack_config
import slack_sdk import slack_sdk
@ -17,18 +17,28 @@ import zulip
ZULIP_MESSAGE_TEMPLATE = "**{username}**: {message}" ZULIP_MESSAGE_TEMPLATE = "**{username}**: {message}"
SLACK_MESSAGE_TEMPLATE = "<{username}> {message}" SLACK_MESSAGE_TEMPLATE = "<{username}> {message}"
StreamTopicT = Tuple[str, str]
def check_zulip_message_validity(msg: Dict[str, Any], config: Dict[str, Any]) -> bool:
def get_slack_channel_for_zulip_message(
msg: Dict[str, Any], zulip_to_slack_map: Dict[StreamTopicT, Any], bot_email: str
) -> Optional[str]:
is_a_stream = msg["type"] == "stream" is_a_stream = msg["type"] == "stream"
in_the_specified_stream = msg["display_recipient"] == config["stream"] if not is_a_stream:
at_the_specified_subject = msg["subject"] == config["topic"] return None
# We do this to identify the messages generated from Matrix -> Zulip stream_name = msg["display_recipient"]
# and we make sure we don't forward it again to the Matrix. topic_name = msg["subject"]
not_from_zulip_bot = msg["sender_email"] != config["email"] stream_topic: StreamTopicT = (stream_name, topic_name)
if is_a_stream and not_from_zulip_bot and in_the_specified_stream and at_the_specified_subject: if stream_topic not in zulip_to_slack_map:
return True return None
return False
# We do this to identify the messages generated from Slack -> Zulip
# and we make sure we don't forward it again to the Slack.
from_zulip_bot = msg["sender_email"] == bot_email
if from_zulip_bot:
return None
return zulip_to_slack_map[stream_topic]
class SlackBridge: class SlackBridge:
@ -37,14 +47,17 @@ class SlackBridge:
self.zulip_config = config["zulip"] self.zulip_config = config["zulip"]
self.slack_config = config["slack"] self.slack_config = config["slack"]
self.slack_to_zulip_map: Dict[str, Dict[str, str]] = config["channel_mapping"]
self.zulip_to_slack_map: Dict[StreamTopicT, str] = {
(z["stream"], z["topic"]): s for s, z in config["channel_mapping"].items()
}
# zulip-specific # zulip-specific
self.zulip_client = zulip.Client( self.zulip_client = zulip.Client(
email=self.zulip_config["email"], email=self.zulip_config["email"],
api_key=self.zulip_config["api_key"], api_key=self.zulip_config["api_key"],
site=self.zulip_config["site"], site=self.zulip_config["site"],
) )
self.zulip_stream = self.zulip_config["stream"]
self.zulip_subject = self.zulip_config["topic"]
# slack-specific # slack-specific
self.channel = self.slack_config["channel"] self.channel = self.slack_config["channel"]
@ -68,14 +81,16 @@ class SlackBridge:
def zulip_to_slack(self) -> Callable[[Dict[str, Any]], None]: def zulip_to_slack(self) -> Callable[[Dict[str, Any]], None]:
def _zulip_to_slack(msg: Dict[str, Any]) -> None: def _zulip_to_slack(msg: Dict[str, Any]) -> None:
message_valid = check_zulip_message_validity(msg, self.zulip_config) slack_channel = get_slack_channel_for_zulip_message(
if message_valid: msg, self.zulip_to_slack_map, self.zulip_config["email"]
)
if slack_channel is not None:
self.wrap_slack_mention_with_bracket(msg) self.wrap_slack_mention_with_bracket(msg)
slack_text = SLACK_MESSAGE_TEMPLATE.format( slack_text = SLACK_MESSAGE_TEMPLATE.format(
username=msg["sender_full_name"], message=msg["content"] username=msg["sender_full_name"], message=msg["content"]
) )
self.slack_webclient.chat_postMessage( self.slack_webclient.chat_postMessage(
channel=self.channel, channel=slack_channel,
text=slack_text, text=slack_text,
) )
@ -91,7 +106,7 @@ class SlackBridge:
@rtm.on("message") @rtm.on("message")
def slack_to_zulip(client: RTMClient, event: Dict[str, Any]) -> None: def slack_to_zulip(client: RTMClient, event: Dict[str, Any]) -> None:
if event["channel"] != self.channel: if event["channel"] not in self.slack_to_zulip_map:
return return
user_id = event["user"] user_id = event["user"]
user = self.slack_id_to_name[user_id] user = self.slack_id_to_name[user_id]
@ -100,8 +115,12 @@ class SlackBridge:
return return
self.replace_slack_id_with_name(event) self.replace_slack_id_with_name(event)
content = ZULIP_MESSAGE_TEMPLATE.format(username=user, message=event["text"]) content = ZULIP_MESSAGE_TEMPLATE.format(username=user, message=event["text"])
zulip_endpoint = self.slack_to_zulip_map[event["channel"]]
msg_data = dict( msg_data = dict(
type="stream", to=self.zulip_stream, subject=self.zulip_subject, content=content type="stream",
to=zulip_endpoint["stream"],
subject=zulip_endpoint["topic"],
content=content,
) )
self.zulip_client.send_message(msg_data) self.zulip_client.send_message(msg_data)
@ -118,10 +137,16 @@ if __name__ == "__main__":
sys.path.append(os.path.join(os.path.dirname(__file__), "..")) sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
parser = argparse.ArgumentParser(usage=usage) parser = argparse.ArgumentParser(usage=usage)
print("Starting slack mirroring bot") config: Dict[str, Any] = bridge_with_slack_config.config
print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM") if "channel_mapping" not in config:
print(
'The key "channel_mapping" is not found in bridge_with_slack_config.py.\n'
"Your config file may be outdated."
)
exit(1)
config = bridge_with_slack_config.config print("Starting slack mirroring bot")
print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM(S) & SLACK CHANNEL(S)!")
# We have to define rtm outside of SlackBridge because the rtm variable is used as a method decorator. # We have to define rtm outside of SlackBridge because the rtm variable is used as a method decorator.
rtm = RTMClient(token=config["slack"]["token"]) rtm = RTMClient(token=config["slack"]["token"])

View file

@ -6,10 +6,10 @@ import random
import subprocess import subprocess
import sys import sys
import time import time
from ctypes import byref, c_int, c_ushort
from typing import Dict, List, Set, Tuple from typing import Dict, List, Set, Tuple
import zephyr import zephyr_ctypes
import zulip import zulip
parser = optparse.OptionParser() parser = optparse.OptionParser()
@ -136,9 +136,43 @@ for (stream, test) in test_streams:
actually_subscribed = False actually_subscribed = False
for tries in range(10): for tries in range(10):
try: try:
zephyr.init() zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
zephyr._z.subAll(zephyr_subs_to_add) zephyr_port = c_ushort()
zephyr_subs = zephyr._z.getSubscriptions() zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
zephyr_ctypes.check(
zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * len(zephyr_subs_to_add))(
*(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
for cls, instance, recipient in zephyr_subs_to_add
)
),
len(zephyr_subs_to_add),
0,
)
)
try:
nsubs = c_int()
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
zephyr_subs = {
(
zsub.zsub_class.decode(),
zsub.zsub_classinst.decode(),
zsub.zsub_recipient.decode(),
)
for zsub in zsubs
}
finally:
zephyr_ctypes.ZFlushSubscriptions()
missing = 0 missing = 0
for elt in zephyr_subs_to_add: for elt in zephyr_subs_to_add:
@ -148,8 +182,8 @@ for tries in range(10):
if missing == 0: if missing == 0:
actually_subscribed = True actually_subscribed = True
break break
except OSError as e: except zephyr_ctypes.ZephyrError as e:
if "SERVNAK received" in e.args: if e.code == zephyr_ctypes.ZERR_SERVNAK:
logger.error("SERVNAK repeatedly received, punting rest of test") logger.error("SERVNAK repeatedly received, punting rest of test")
else: else:
logger.exception("Exception subscribing to zephyrs") logger.exception("Exception subscribing to zephyrs")
@ -185,15 +219,16 @@ notices = []
# receive queue with 30+ messages, which might result in messages # receive queue with 30+ messages, which might result in messages
# being dropped. # being dropped.
def receive_zephyrs() -> None: def receive_zephyrs() -> None:
while True: while zephyr_ctypes.ZPending() != 0:
notice = zephyr_ctypes.ZNotice_t()
sender = zephyr_ctypes.sockaddr_in()
try: try:
notice = zephyr.receive(block=False) zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
except Exception: except zephyr_ctypes.ZephyrError:
logging.exception("Exception receiving zephyrs:") logging.exception("Exception receiving zephyrs:")
notice = None
if notice is None:
break break
if notice.opcode != "": if notice.z_opcode != b"":
zephyr_ctypes.ZFreeNotice(byref(notice))
continue continue
notices.append(notice) notices.append(notice)
@ -294,10 +329,16 @@ def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set
# The h_foo variables are about the messages we _received_ in Zulip # The h_foo variables are about the messages we _received_ in Zulip
# The z_foo variables are about the messages we _received_ in Zephyr # The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages] h_contents = [message["content"] for message in messages]
z_contents = [notice.message.split("\0")[1] for notice in notices] z_contents = [
notice.z_message[: notice.z_message_len].split(b"\0")[1].decode(errors="replace")
for notice in notices
]
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents) (h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents) (z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
for notice in notices:
zephyr_ctypes.ZFreeNotice(byref(notice))
if z_success and h_success: if z_success and h_success:
logger.info("Success!") logger.info("Success!")
print_status_and_exit(0) print_status_and_exit(0)

View file

@ -18,7 +18,7 @@ api_key_path = f"/home/zulip/api-keys/{program_name}"
open(api_key_path, "w").write(api_key + "\n") open(api_key_path, "w").write(api_key + "\n")
# Setup supervisord configuration # Setup supervisord configuration
supervisor_path = f"/etc/supervisor/conf.d/zulip/{program_name}.conf" supervisor_path = f"/etc/supervisor/conf.d/zmirror/{program_name}.conf"
template = os.path.join(os.path.dirname(__file__), "zmirror_private.conf.template") template = os.path.join(os.path.dirname(__file__), "zmirror_private.conf.template")
template_data = open(template).read() template_data = open(template).read()
session_path = f"/home/zulip/zephyr_sessions/{program_name}" session_path = f"/home/zulip/zephyr_sessions/{program_name}"

View file

@ -0,0 +1,207 @@
from ctypes import (
CDLL,
CFUNCTYPE,
POINTER,
Structure,
Union,
c_char,
c_char_p,
c_int,
c_long,
c_uint,
c_uint8,
c_uint16,
c_uint32,
c_ushort,
c_void_p,
)
libc = CDLL("libc.so.6")
com_err = CDLL("libcom_err.so.2")
libzephyr = CDLL("libzephyr.so.4")
# --- glibc/bits/sockaddr.h ---
sa_family_t = c_ushort
# --- glibc/sysdeps/unix/sysv/linux/bits/socket.h ---
class sockaddr(Structure):
_fields_ = [
("sa_family", sa_family_t),
("sa_data", c_char * 14),
]
# --- glibc/inet/netinet/in.h ---
in_port_t = c_uint16
in_addr_t = c_uint32
class in_addr(Structure):
_fields_ = [
("s_addr", in_addr_t),
]
class sockaddr_in(Structure):
_fields_ = [
("sin_family", sa_family_t),
("sin_port", in_port_t),
("sin_addr", in_addr),
("sin_zero", c_uint8 * 8),
]
class in6_addr(Structure):
_fields_ = [
("s6_addr", c_uint8 * 16),
]
class sockaddr_in6(Structure):
_fields_ = [
("sin6_family", sa_family_t),
("sin6_port", in_port_t),
("sin6_flowinfo", c_uint32),
("sin6_addr", in6_addr),
("sin6_scope_id", c_uint32),
]
# --- glibc/stdlib/stdlib.h ---
free = CFUNCTYPE(None, c_void_p)(("free", libc))
# --- e2fsprogs/lib/et/com_err.h ---
error_message = CFUNCTYPE(c_char_p, c_long)(("error_message", com_err))
# --- zephyr/h/zephyr/zephyr.h ---
Z_MAXOTHERFIELDS = 10
ZNotice_Kind_t = c_int
class _ZTimeval(Structure):
_fields_ = [
("tv_sec", c_int),
("tv_usec", c_int),
]
class ZUnique_Id_t(Structure):
_fields_ = [
("zuid_addr", in_addr),
("tv", _ZTimeval),
]
ZChecksum_t = c_uint
class _ZSenderSockaddr(Union):
_fields_ = [
("sa", sockaddr),
("ip4", sockaddr_in),
("ip6", sockaddr_in6),
]
class ZNotice_t(Structure):
_fields_ = [
("z_packet", c_char_p),
("z_version", c_char_p),
("z_kind", ZNotice_Kind_t),
("z_uid", ZUnique_Id_t),
("z_sender_sockaddr", _ZSenderSockaddr),
("z_time", _ZTimeval),
("z_port", c_ushort),
("z_charset", c_ushort),
("z_auth", c_int),
("z_checked_auth", c_int),
("z_authent_len", c_int),
("z_ascii_authent", c_char_p),
("z_class", c_char_p),
("z_class_inst", c_char_p),
("z_opcode", c_char_p),
("z_sender", c_char_p),
("z_recipient", c_char_p),
("z_default_format", c_char_p),
("z_multinotice", c_char_p),
("z_multiuid", ZUnique_Id_t),
("z_checksum", ZChecksum_t),
("z_ascii_checksum", c_char_p),
("z_num_other_fields", c_int),
("z_other_fields", c_char_p * Z_MAXOTHERFIELDS),
("z_message", POINTER(c_char)),
("z_message_len", c_int),
("z_num_hdr_fields", c_uint),
("z_hdr_fields", POINTER(c_char_p)),
]
class ZSubscription_t(Structure):
_fields_ = [
("zsub_recipient", c_char_p),
("zsub_class", c_char_p),
("zsub_classinst", c_char_p),
]
Code_t = c_int
ZInitialize = CFUNCTYPE(Code_t)(("ZInitialize", libzephyr))
ZRetrieveSubscriptions = CFUNCTYPE(Code_t, c_ushort, POINTER(c_int))(
("ZRetrieveSubscriptions", libzephyr)
)
ZGetSubscriptions = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), POINTER(c_int))(
("ZGetSubscriptions", libzephyr)
)
ZOpenPort = CFUNCTYPE(Code_t, POINTER(c_ushort))(("ZOpenPort", libzephyr))
ZFlushSubscriptions = CFUNCTYPE(Code_t)(("ZFlushSubscriptions", libzephyr))
ZFreeNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t))(("ZFreeNotice", libzephyr))
ZSubscribeTo = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), c_int, c_uint)(
("ZSubscribeTo", libzephyr)
)
ZCancelSubscriptions = CFUNCTYPE(Code_t, c_uint)(("ZCancelSubscriptions", libzephyr))
ZPending = CFUNCTYPE(c_int)(("ZPending", libzephyr))
ZReceiveNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t), POINTER(sockaddr_in))(
("ZReceiveNotice", libzephyr)
)
ZDumpSession = CFUNCTYPE(Code_t, POINTER(POINTER(c_char)), POINTER(c_int))(
("ZDumpSession", libzephyr)
)
ZLoadSession = CFUNCTYPE(Code_t, POINTER(c_char), c_int)(("ZLoadSession", libzephyr))
ZGetFD = CFUNCTYPE(c_int)(("ZGetFD", libzephyr))
ZERR_NONE = 0
# --- zephyr/lib/zephyr_err.et ---
ERROR_TABLE_BASE_zeph = -772103680
ZERR_SERVNAK = ERROR_TABLE_BASE_zeph + 16
# --- convenience helpers ---
class ZephyrError(Exception):
def __init__(self, code: int) -> None:
self.code = code
def __str__(self) -> str:
return error_message(self.code).decode()
def check(code: int) -> None:
if code != ZERR_NONE:
raise ZephyrError(code)

View file

@ -1,5 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio
import os import os
import signal import signal
import subprocess import subprocess
@ -32,21 +33,22 @@ if options.sync_subscriptions:
sys.exit(0) sys.exit(0)
if options.forward_class_messages and not options.noshard: if options.forward_class_messages and not options.noshard:
# Needed to get access to zephyr.lib.parallel
sys.path.append("/home/zulip/zulip")
if options.on_startup_command is not None: if options.on_startup_command is not None:
subprocess.call([options.on_startup_command]) subprocess.call([options.on_startup_command])
from zerver.lib.parallel import run_parallel
print("Starting parallel zephyr class mirroring bot") print("Starting parallel zephyr class mirroring bot")
jobs = list("0123456789abcdef") shards = list("0123456789abcdef")
def run_job(shard: str) -> int: async def run_shard(shard: str) -> int:
subprocess.call(args + [f"--shard={shard}"]) process = await asyncio.create_subprocess_exec(*args, f"--shard={shard}")
return 0 return await process.wait()
for (status, job) in run_parallel(run_job, jobs, threads=16): async def run_shards():
for coro in asyncio.as_completed(map(run_shard, shards)):
await coro
print("A mirroring shard died!") print("A mirroring shard died!")
asyncio.run(run_shards())
sys.exit(0) sys.exit(0)
backoff = RandomExponentialBackoff(timeout_success_equivalent=300) backoff = RandomExponentialBackoff(timeout_success_equivalent=300)

View file

@ -13,18 +13,23 @@ import sys
import tempfile import tempfile
import textwrap import textwrap
import time import time
from ctypes import POINTER, byref, c_char, c_int, c_ushort
from queue import Queue
from threading import Thread
from types import FrameType from types import FrameType
from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from typing_extensions import Literal, TypedDict from typing_extensions import Literal, TypedDict
import zephyr_ctypes
import zulip
from zulip import RandomExponentialBackoff from zulip import RandomExponentialBackoff
DEFAULT_SITE = "https://api.zulip.com" DEFAULT_SITE = "https://api.zulip.com"
class States: class States:
Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4)) Startup, ZulipToZephyr, ZephyrToZulip = list(range(3))
CURRENT_STATE = States.Startup CURRENT_STATE = States.Startup
@ -32,6 +37,16 @@ CURRENT_STATE = States.Startup
logger: logging.Logger logger: logging.Logger
def make_zulip_client() -> zulip.Client:
return zulip.Client(
email=zulip_account_email,
api_key=api_key,
verbose=True,
client="zephyr_mirror",
site=options.site,
)
def to_zulip_username(zephyr_username: str) -> str: def to_zulip_username(zephyr_username: str) -> str:
if "@" in zephyr_username: if "@" in zephyr_username:
(user, realm) = zephyr_username.split("@") (user, realm) = zephyr_username.split("@")
@ -117,7 +132,7 @@ class ZephyrDict(TypedDict, total=False):
zsig: str zsig: str
def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]: def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]:
message: Dict[str, Any] message: Dict[str, Any]
message = {} message = {}
if options.forward_class_messages: if options.forward_class_messages:
@ -151,7 +166,7 @@ def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
return zulip_client.send_message(message) return zulip_client.send_message(message)
def send_error_zulip(error_msg: str) -> None: def send_error_zulip(zulip_client: zulip.Client, error_msg: str) -> None:
message = { message = {
"type": "private", "type": "private",
"sender": zulip_account_email, "sender": zulip_account_email,
@ -166,8 +181,23 @@ current_zephyr_subs = set()
def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None: def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
try: try:
zephyr._z.subAll(subs) zephyr_ctypes.check(
except OSError: zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * len(subs))(
*(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
for cls, instance, recipient in subs
)
),
len(subs),
0,
)
)
except zephyr_ctypes.ZephyrError:
# Since we haven't added the subscription to # Since we haven't added the subscription to
# current_zephyr_subs yet, we can just return (so that we'll # current_zephyr_subs yet, we can just return (so that we'll
# continue processing normal messages) and we'll end up # continue processing normal messages) and we'll end up
@ -176,26 +206,41 @@ def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
logger.exception("Error subscribing to streams (will retry automatically):") logger.exception("Error subscribing to streams (will retry automatically):")
logger.warning(f"Streams were: {[cls for cls, instance, recipient in subs]}") logger.warning(f"Streams were: {[cls for cls, instance, recipient in subs]}")
return return
try: try:
actual_zephyr_subs = [cls for (cls, _, _) in zephyr._z.getSubscriptions()] nsubs = c_int()
except OSError: zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
actual_zephyr_subs = {zsub.zsub_class.decode() for zsub in zsubs}
except zephyr_ctypes.ZephyrError:
logger.exception("Error getting current Zephyr subscriptions") logger.exception("Error getting current Zephyr subscriptions")
# Don't add anything to current_zephyr_subs so that we'll # Don't add anything to current_zephyr_subs so that we'll
# retry the next time we check for streams to subscribe to # retry the next time we check for streams to subscribe to
# (within 15 seconds). # (within 15 seconds).
return return
finally:
zephyr_ctypes.ZFlushSubscriptions()
for (cls, instance, recipient) in subs: for (cls, instance, recipient) in subs:
if cls not in actual_zephyr_subs: if cls not in actual_zephyr_subs:
logger.error(f"Zephyr failed to subscribe us to {cls}; will retry") logger.error(f"Zephyr failed to subscribe us to {cls}; will retry")
try:
# We'll retry automatically when we next check for # We'll retry automatically when we next check for
# streams to subscribe to (within 15 seconds), but # streams to subscribe to (within 15 seconds), but
# it's worth doing 1 retry immediately to avoid # it's worth doing 1 retry immediately to avoid
# missing 15 seconds of messages on the affected # missing 15 seconds of messages on the affected
# classes # classes
zephyr._z.sub(cls, instance, recipient) zephyr_ctypes.ZSubscribeTo(
except OSError: (zephyr_ctypes.ZSubscription_t * 1)(
pass zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
),
1,
0,
)
else: else:
current_zephyr_subs.add(cls) current_zephyr_subs.add(cls)
@ -246,8 +291,8 @@ def maybe_restart_mirroring_script() -> None:
logger.warning("zephyr mirroring script has been updated; restarting...") logger.warning("zephyr mirroring script has been updated; restarting...")
maybe_kill_child() maybe_kill_child()
try: try:
zephyr._z.cancelSubs() zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
except OSError: except zephyr_ctypes.ZephyrError:
# We don't care whether we failed to cancel subs properly, but we should log it # We don't care whether we failed to cancel subs properly, but we should log it
logger.exception("") logger.exception("")
backoff = RandomExponentialBackoff( backoff = RandomExponentialBackoff(
@ -263,27 +308,29 @@ def maybe_restart_mirroring_script() -> None:
raise Exception("Failed to reload too many times, aborting!") raise Exception("Failed to reload too many times, aborting!")
def process_loop(log: Optional[IO[str]]) -> NoReturn: def process_loop(zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]) -> NoReturn:
restart_check_count = 0 restart_check_count = 0
last_check_time = time.time() last_check_time = time.time()
recieve_backoff = RandomExponentialBackoff() recieve_backoff = RandomExponentialBackoff()
while True: while True:
select.select([zephyr._z.getFD()], [], [], 15) select.select([zephyr_ctypes.ZGetFD()], [], [], 15)
try: try:
process_backoff = RandomExponentialBackoff() process_backoff = RandomExponentialBackoff()
# Fetch notices from the queue until its empty # Fetch notices from the queue until its empty
while True: while zephyr_ctypes.ZPending() != 0:
notice = zephyr.receive(block=False) notice = zephyr_ctypes.ZNotice_t()
recieve_backoff.succeed() sender = zephyr_ctypes.sockaddr_in()
if notice is None: zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
break
try: try:
process_notice(notice, log) recieve_backoff.succeed()
process_notice(notice, zulip_queue, log)
process_backoff.succeed() process_backoff.succeed()
except Exception: except zephyr_ctypes.ZephyrError:
logger.exception("Error relaying zephyr:") logger.exception("Error relaying zephyr:")
process_backoff.fail() process_backoff.fail()
except Exception: finally:
zephyr_ctypes.ZFreeNotice(byref(notice))
except zephyr_ctypes.ZephyrError:
logger.exception("Error checking for new zephyrs:") logger.exception("Error checking for new zephyrs:")
recieve_backoff.fail() recieve_backoff.fail()
continue continue
@ -395,38 +442,47 @@ def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str:
return decrypted return decrypted
def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None: def process_notice(
assert notice.sender is not None notice: zephyr_ctypes.ZNotice_t, zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]
(zsig, body) = parse_zephyr_body(notice.message, notice.format) ) -> None:
assert notice.z_sender is not None
(zsig, body) = parse_zephyr_body(
notice.z_message[: notice.z_message_len].decode(errors="replace"),
notice.z_default_format.decode(errors="replace"),
)
is_personal = False is_personal = False
is_huddle = False is_huddle = False
if notice.opcode == "PING": if notice.z_opcode == b"PING":
# skip PING messages # skip PING messages
return return
zephyr_class = notice.cls.lower() zephyr_class = notice.z_class.decode()
zephyr_instance = notice.z_class_inst.decode()
zephyr_sender = notice.z_sender.decode()
if zephyr_class == options.nagios_class: if zephyr_class.lower() == options.nagios_class:
# Mark that we got the message and proceed # Mark that we got the message and proceed
with open(options.nagios_path, "w") as f: with open(options.nagios_path, "w") as f:
f.write("0\n") f.write("0\n")
return return
if notice.recipient != "": if notice.z_recipient != b"":
is_personal = True is_personal = True
# Drop messages not to the listed subscriptions # Drop messages not to the listed subscriptions
if is_personal and not options.forward_personals: if is_personal and not options.forward_personals:
return return
if (zephyr_class not in current_zephyr_subs) and not is_personal: if (zephyr_class.lower() not in current_zephyr_subs) and not is_personal:
logger.debug(f"Skipping ... {zephyr_class}/{notice.instance}/{is_personal}") logger.debug(f"Skipping ... {zephyr_class}/{zephyr_instance}/{is_personal}")
return return
if notice.format.startswith("Zephyr error: See") or notice.format.endswith("@(@color(blue))"): if notice.z_default_format.startswith(b"Zephyr error: See") or notice.z_default_format.endswith(
b"@(@color(blue))"
):
logger.debug("Skipping message we got from Zulip!") logger.debug("Skipping message we got from Zulip!")
return return
if ( if (
zephyr_class == "mail" zephyr_class.lower() == "mail"
and notice.instance.lower() == "inbox" and zephyr_instance.lower() == "inbox"
and is_personal and is_personal
and not options.forward_mail_zephyrs and not options.forward_mail_zephyrs
): ):
@ -440,21 +496,21 @@ def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
huddle_recipients = [ huddle_recipients = [
to_zulip_username(x.strip()) for x in body.split("\n")[0][4:].split() to_zulip_username(x.strip()) for x in body.split("\n")[0][4:].split()
] ]
if notice.sender not in huddle_recipients: if zephyr_sender not in huddle_recipients:
huddle_recipients.append(to_zulip_username(notice.sender)) huddle_recipients.append(to_zulip_username(zephyr_sender))
body = body.split("\n", 1)[1] body = body.split("\n", 1)[1]
if ( if (
options.forward_class_messages options.forward_class_messages
and notice.opcode is not None and notice.z_opcode is not None
and notice.opcode.lower() == "crypt" and notice.z_opcode.lower() == b"crypt"
): ):
body = decrypt_zephyr(zephyr_class, notice.instance.lower(), body) body = decrypt_zephyr(zephyr_class.lower(), zephyr_instance.lower(), body)
zeph: ZephyrDict zeph: ZephyrDict
zeph = { zeph = {
"time": str(notice.time), "time": str(notice.z_time.tv_sec + notice.z_time.tv_usec / 1e6),
"sender": notice.sender, "sender": zephyr_sender,
"zsig": zsig, # logged here but not used by app "zsig": zsig, # logged here but not used by app
"content": body, "content": body,
} }
@ -462,46 +518,47 @@ def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
zeph["type"] = "private" zeph["type"] = "private"
zeph["recipient"] = huddle_recipients zeph["recipient"] = huddle_recipients
elif is_personal: elif is_personal:
assert notice.recipient is not None assert notice.z_recipient is not None
zeph["type"] = "private" zeph["type"] = "private"
zeph["recipient"] = to_zulip_username(notice.recipient) zeph["recipient"] = to_zulip_username(notice.z_recipient.decode())
else: else:
zeph["type"] = "stream" zeph["type"] = "stream"
zeph["stream"] = zephyr_class zeph["stream"] = zephyr_class.lower()
if notice.instance.strip() != "": if zephyr_instance.strip() != "":
zeph["subject"] = notice.instance zeph["subject"] = zephyr_instance
else: else:
zeph["subject"] = f'(instance "{notice.instance}")' zeph["subject"] = f'(instance "{zephyr_instance}")'
# Add instances in for instanced personals # Add instances in for instanced personals
if is_personal: if is_personal:
if notice.cls.lower() != "message" and notice.instance.lower() != "personal": if zephyr_class.lower() != "message" and zephyr_instance.lower() != "personal":
heading = f"[-c {notice.cls} -i {notice.instance}]\n" heading = f"[-c {zephyr_class} -i {zephyr_instance}]\n"
elif notice.cls.lower() != "message": elif zephyr_class.lower() != "message":
heading = f"[-c {notice.cls}]\n" heading = f"[-c {zephyr_class}]\n"
elif notice.instance.lower() != "personal": elif zephyr_instance.lower() != "personal":
heading = f"[-i {notice.instance}]\n" heading = f"[-i {zephyr_instance}]\n"
else: else:
heading = "" heading = ""
zeph["content"] = heading + zeph["content"] zeph["content"] = heading + zeph["content"]
logger.info(f"Received a message on {zephyr_class}/{notice.instance} from {notice.sender}...") logger.info(f"Received a message on {zephyr_class}/{zephyr_instance} from {zephyr_sender}...")
if log is not None: if log is not None:
log.write(json.dumps(zeph) + "\n") log.write(json.dumps(zeph) + "\n")
log.flush() log.flush()
if os.fork() == 0: zulip_queue.put(zeph)
global CURRENT_STATE
CURRENT_STATE = States.ChildSending
# Actually send the message in a child process, to avoid blocking. def send_zulip_worker(zulip_queue: "Queue[ZephyrDict]", zulip_client: zulip.Client) -> None:
while True:
zeph = zulip_queue.get()
try: try:
res = send_zulip(zeph) res = send_zulip(zulip_client, zeph)
if res.get("result") != "success": if res.get("result") != "success":
logger.error(f"Error relaying zephyr:\n{zeph}\n{res}") logger.error(f"Error relaying zephyr:\n{zeph}\n{res}")
except Exception: except Exception:
logger.exception("Error relaying zephyr:") logger.exception("Error relaying zephyr:")
finally: zulip_queue.task_done()
os._exit(0)
def quit_failed_initialization(message: str) -> str: def quit_failed_initialization(message: str) -> str:
@ -514,12 +571,14 @@ def zephyr_init_autoretry() -> None:
backoff = zulip.RandomExponentialBackoff() backoff = zulip.RandomExponentialBackoff()
while backoff.keep_going(): while backoff.keep_going():
try: try:
# zephyr.init() tries to clear old subscriptions, and thus # ZCancelSubscriptions sometimes gets a SERVNAK from the server
# sometimes gets a SERVNAK from the server zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
zephyr.init() zephyr_port = c_ushort()
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
backoff.succeed() backoff.succeed()
return return
except OSError: except zephyr_ctypes.ZephyrError:
logger.exception("Error initializing Zephyr library (retrying). Traceback:") logger.exception("Error initializing Zephyr library (retrying). Traceback:")
backoff.fail() backoff.fail()
@ -532,11 +591,10 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
try: try:
with open(session_path, "rb") as f: with open(session_path, "rb") as f:
session = f.read() session = f.read()
zephyr._z.initialize() zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
zephyr._z.load_session(session) zephyr_ctypes.check(zephyr_ctypes.ZLoadSession(session, len(session)))
zephyr.__inited = True
return return
except OSError: except zephyr_ctypes.ZephyrError:
logger.exception("Error loading saved Zephyr session (retrying). Traceback:") logger.exception("Error loading saved Zephyr session (retrying). Traceback:")
backoff.fail() backoff.fail()
@ -544,13 +602,26 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None: def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
cls, instance, recipient = sub
backoff = zulip.RandomExponentialBackoff() backoff = zulip.RandomExponentialBackoff()
while backoff.keep_going(): while backoff.keep_going():
try: try:
zephyr.Subscriptions().add(sub) zephyr_ctypes.check(
zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * 1)(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
),
1,
0,
)
)
backoff.succeed() backoff.succeed()
return return
except OSError: except zephyr_ctypes.ZephyrError:
# Probably a SERVNAK from the zephyr server, but log the # Probably a SERVNAK from the zephyr server, but log the
# traceback just in case it's something else # traceback just in case it's something else
logger.exception("Error subscribing to personals (retrying). Traceback:") logger.exception("Error subscribing to personals (retrying). Traceback:")
@ -560,6 +631,8 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
def zephyr_to_zulip(options: optparse.Values) -> None: def zephyr_to_zulip(options: optparse.Values) -> None:
zulip_client = make_zulip_client()
if options.use_sessions and os.path.exists(options.session_path): if options.use_sessions and os.path.exists(options.session_path):
logger.info("Loading old session") logger.info("Loading old session")
zephyr_load_session_autoretry(options.session_path) zephyr_load_session_autoretry(options.session_path)
@ -575,8 +648,14 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
if options.nagios_class: if options.nagios_class:
zephyr_subscribe_autoretry((options.nagios_class, "*", "*")) zephyr_subscribe_autoretry((options.nagios_class, "*", "*"))
if options.use_sessions: if options.use_sessions:
buf = POINTER(c_char)()
buf_len = c_int()
zephyr_ctypes.check(zephyr_ctypes.ZDumpSession(byref(buf), byref(buf_len)))
try:
with open(options.session_path, "wb") as f: with open(options.session_path, "wb") as f:
f.write(zephyr._z.dump_session()) f.write(buf[: buf_len.value]) # type: ignore[arg-type] # bytes, but mypy infers List[c_char]
finally:
zephyr_ctypes.free(buf)
if options.logs_to_resend is not None: if options.logs_to_resend is not None:
with open(options.logs_to_resend) as log: with open(options.logs_to_resend) as log:
@ -593,18 +672,22 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
"sending saved message to %s from %s..." "sending saved message to %s from %s..."
% (zeph.get("stream", zeph.get("recipient")), zeph["sender"]) % (zeph.get("stream", zeph.get("recipient")), zeph["sender"])
) )
send_zulip(zeph) send_zulip(zulip_client, zeph)
except Exception: except Exception:
logger.exception("Could not send saved zephyr:") logger.exception("Could not send saved zephyr:")
time.sleep(2) time.sleep(2)
logger.info("Successfully initialized; Starting receive loop.") logger.info("Successfully initialized; Starting receive loop.")
# Actually send the messages in a thread, to avoid blocking.
zulip_queue: "Queue[ZephyrDict]" = Queue()
Thread(target=lambda: send_zulip_worker(zulip_queue, zulip_client)).start()
if options.resend_log_path is not None: if options.resend_log_path is not None:
with open(options.resend_log_path, "a") as log: with open(options.resend_log_path, "a") as log:
process_loop(log) process_loop(zulip_queue, log)
else: else:
process_loop(None) process_loop(zulip_queue, None)
def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]: def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
@ -675,7 +758,7 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op
return encrypted return encrypted
def forward_to_zephyr(message: Dict[str, Any]) -> None: def forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
# 'Any' can be of any type of text # 'Any' can be of any type of text
support_heading = "Hi there! This is an automated message from Zulip." support_heading = "Hi there! This is an automated message from Zulip."
support_closing = """If you have any questions, please be in touch through the \ support_closing = """If you have any questions, please be in touch through the \
@ -749,6 +832,7 @@ Feedback button or at support@zulip.com."""
result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content) result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content)
if result is None: if result is None:
send_error_zulip( send_error_zulip(
zulip_client,
"""%s """%s
Your Zulip-Zephyr mirror bot was unable to forward that last message \ Your Zulip-Zephyr mirror bot was unable to forward that last message \
@ -758,7 +842,7 @@ key (perhaps because your AFS tokens expired). That means that while \
Zulip users (like you) received it, Zephyr users did not. Zulip users (like you) received it, Zephyr users did not.
%s""" %s"""
% (support_heading, support_closing) % (support_heading, support_closing),
) )
return return
@ -775,6 +859,7 @@ Zulip users (like you) received it, Zephyr users did not.
return return
elif code == 0: elif code == 0:
send_error_zulip( send_error_zulip(
zulip_client,
"""%s """%s
Your last message was successfully mirrored to zephyr, but zwrite \ Your last message was successfully mirrored to zephyr, but zwrite \
@ -783,7 +868,7 @@ returned the following warning:
%s %s
%s""" %s"""
% (support_heading, stderr, support_closing) % (support_heading, stderr, support_closing),
) )
return return
elif code != 0 and ( elif code != 0 and (
@ -797,6 +882,7 @@ returned the following warning:
if options.ignore_expired_tickets: if options.ignore_expired_tickets:
return return
send_error_zulip( send_error_zulip(
zulip_client,
"""%s """%s
Your last message was forwarded from Zulip to Zephyr unauthenticated, \ Your last message was forwarded from Zulip to Zephyr unauthenticated, \
@ -806,7 +892,7 @@ are running the Zulip-Zephyr mirroring bot, so we can send \
authenticated Zephyr messages for you again. authenticated Zephyr messages for you again.
%s""" %s"""
% (support_heading, support_closing) % (support_heading, support_closing),
) )
return return
@ -814,6 +900,7 @@ authenticated Zephyr messages for you again.
# probably because the recipient isn't subscribed to personals, # probably because the recipient isn't subscribed to personals,
# but regardless, we should just notify the user. # but regardless, we should just notify the user.
send_error_zulip( send_error_zulip(
zulip_client,
"""%s """%s
Your Zulip-Zephyr mirror bot was unable to forward that last message \ Your Zulip-Zephyr mirror bot was unable to forward that last message \
@ -823,12 +910,12 @@ received it, Zephyr users did not. The error message from zwrite was:
%s %s
%s""" %s"""
% (support_heading, stderr, support_closing) % (support_heading, stderr, support_closing),
) )
return return
def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None: def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
# The key string can be used to direct any type of text. # The key string can be used to direct any type of text.
if message["sender_email"] == zulip_account_email: if message["sender_email"] == zulip_account_email:
if not ( if not (
@ -851,7 +938,7 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
) )
return return
try: try:
forward_to_zephyr(message) forward_to_zephyr(message, zulip_client)
except Exception: except Exception:
# Don't let an exception forwarding one message crash the # Don't let an exception forwarding one message crash the
# whole process # whole process
@ -859,12 +946,16 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
def zulip_to_zephyr(options: optparse.Values) -> NoReturn: def zulip_to_zephyr(options: optparse.Values) -> NoReturn:
zulip_client = make_zulip_client()
# Sync messages from zulip to zephyr # Sync messages from zulip to zephyr
logger.info("Starting syncing messages.") logger.info("Starting syncing messages.")
backoff = RandomExponentialBackoff(timeout_success_equivalent=120) backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
while True: while True:
try: try:
zulip_client.call_on_each_message(maybe_forward_to_zephyr) zulip_client.call_on_each_message(
lambda message: maybe_forward_to_zephyr(message, zulip_client)
)
except Exception: except Exception:
logger.exception("Error syncing messages:") logger.exception("Error syncing messages:")
backoff.fail() backoff.fail()
@ -886,6 +977,8 @@ def subscribed_to_mail_messages() -> bool:
def add_zulip_subscriptions(verbose: bool) -> None: def add_zulip_subscriptions(verbose: bool) -> None:
zulip_client = make_zulip_client()
zephyr_subscriptions = set() zephyr_subscriptions = set()
skipped = set() skipped = set()
for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose): for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose):
@ -1146,14 +1239,14 @@ def parse_args() -> Tuple[optparse.Values, List[str]]:
def die_gracefully(signal: int, frame: FrameType) -> None: def die_gracefully(signal: int, frame: FrameType) -> None:
if CURRENT_STATE == States.ZulipToZephyr or CURRENT_STATE == States.ChildSending: if CURRENT_STATE == States.ZulipToZephyr:
# this is a child process, so we want os._exit (no clean-up necessary) # this is a child process, so we want os._exit (no clean-up necessary)
os._exit(1) os._exit(1)
if CURRENT_STATE == States.ZephyrToZulip and not options.use_sessions: if CURRENT_STATE == States.ZephyrToZulip and not options.use_sessions:
try: try:
# zephyr=>zulip processes may have added subs, so run cancelSubs # zephyr=>zulip processes may have added subs, so run ZCancelSubscriptions
zephyr._z.cancelSubs() zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
except OSError: except OSError:
# We don't care whether we failed to cancel subs properly, but we should log it # We don't care whether we failed to cancel subs properly, but we should log it
logger.exception("") logger.exception("")
@ -1207,15 +1300,6 @@ or specify the --api-key-file option."""
sys.exit(1) sys.exit(1)
zulip_account_email = options.user + "@mit.edu" zulip_account_email = options.user + "@mit.edu"
import zulip
zulip_client = zulip.Client(
email=zulip_account_email,
api_key=api_key,
verbose=True,
client="zephyr_mirror",
site=options.site,
)
start_time = time.time() start_time = time.time()
@ -1274,8 +1358,6 @@ or specify the --api-key-file option."""
child_pid = None child_pid = None
CURRENT_STATE = States.ZephyrToZulip CURRENT_STATE = States.ZephyrToZulip
import zephyr
logger_name = "zephyr=>zulip" logger_name = "zephyr=>zulip"
if options.shard is not None: if options.shard is not None:
logger_name += f"({options.shard})" logger_name += f"({options.shard})"

View file

@ -42,12 +42,12 @@ setup(
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",
"Topic :: Communications :: Chat", "Topic :: Communications :: Chat",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
], ],
python_requires=">=3.6", python_requires=">=3.7",
url="https://www.zulip.org/", url="https://www.zulip.org/",
project_urls={ project_urls={
"Source": "https://github.com/zulip/python-zulip-api/", "Source": "https://github.com/zulip/python-zulip-api/",

View file

@ -34,12 +34,12 @@ setup(
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",
"Topic :: Communications :: Chat", "Topic :: Communications :: Chat",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
], ],
python_requires=">=3.6", python_requires=">=3.7",
url="https://www.zulip.org/", url="https://www.zulip.org/",
project_urls={ project_urls={
"Source": "https://github.com/zulip/python-zulip-api/", "Source": "https://github.com/zulip/python-zulip-api/",

View file

@ -1,2 +1 @@
python-chess==0.31.* ; python_version < '3.7' chess==1.*
chess==1.* ; python_version >= '3.7'

View file

@ -22,12 +22,12 @@ setup(
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",
"Topic :: Communications :: Chat", "Topic :: Communications :: Chat",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
], ],
python_requires=">=3.6", python_requires=">=3.7",
url="https://www.zulip.org/", url="https://www.zulip.org/",
project_urls={ project_urls={
"Source": "https://github.com/zulip/python-zulip-api/", "Source": "https://github.com/zulip/python-zulip-api/",