Compare commits

..

No commits in common. "02586f1d348dbcab3bc8d30b805fffd48a524e18" and "63c259b2bc3592e49459df0f6f795bb25a65f030" have entirely different histories.

19 changed files with 212 additions and 558 deletions

View file

@ -17,24 +17,33 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
include: include:
# Focal ships with Python 3.8.10. # Base images are built using `tools/ci/Dockerfile.prod.template`.
- docker_image: zulip/ci:focal # The comments at the top explain how to build and upload these images.
name: Ubuntu 20.04 (Python 3.8, backend) # Bionic ships with Python 3.6.
os: focal - docker_image: zulip/ci:bionic
legacy_client_interface: "3" name: Ubuntu 18.04 Bionic (Python 3.6, backend)
os: bionic
is_bionic: true
include_frontend_tests: false
# Configure this test to run with the Zulip 3.2 release.
legacy_client_interface: 3
server_version: refs/tags/3.2 server_version: refs/tags/3.2
# Focal ships with Python 3.8.2.
- docker_image: zulip/ci:focal
name: Ubuntu 20.04 Focal (Python 3.8, backend)
os: focal
is_focal: true
include_frontend_tests: false
legacy_client_interface: 4
server_version: refs/tags/4.0
# Bullseye ships with Python 3.9.2. # Bullseye ships with Python 3.9.2.
- docker_image: zulip/ci:bullseye - docker_image: zulip/ci:bullseye
name: Debian 11 (Python 3.9, backend) name: Debian 11 Bullseye (Python 3.9, backend)
os: bullseye os: bullseye
legacy_client_interface: "4" is_bullseye: true
include_frontend_tests: false
legacy_client_interface: 4
server_version: refs/tags/4.0 server_version: refs/tags/4.0
# Ubuntu 22.04 ships with Python 3.10.6.
- docker_image: zulip/ci:jammy
name: Ubuntu 22.04 (Python 3.10, backend)
os: jammy
legacy_client_interface: "6"
server_version: refs/tags/6.0
runs-on: ubuntu-latest runs-on: ubuntu-latest
name: ${{ matrix.name }} (Zulip ${{matrix.server_version}}) name: ${{ matrix.name }} (Zulip ${{matrix.server_version}})
@ -49,18 +58,25 @@ jobs:
HOME: /home/github/ HOME: /home/github/
steps: steps:
- name: "Check out python-zulip-api" - name: 'Checkout python-zulip-api'
uses: actions/checkout@v2 uses: actions/checkout@v2
with: with:
path: api path: api
- name: "Check out Zulip server ${{ matrix.server_version }}" - name: 'Checkout Zulip server ${{ matrix.server_version }}'
uses: actions/checkout@v2 uses: actions/checkout@v2
with: with:
repository: zulip/zulip repository: zulip/zulip
ref: ${{ matrix.server_version }} ref: ${{ matrix.server_version }}
path: server path: server
- name: Do Bionic hack
if: ${{ matrix.is_bionic }}
run: |
# Temporary hack till `sudo service redis-server start` gets fixes in Bionic. See
# https://chat.zulip.org/#narrow/stream/3-backend/topic/Ubuntu.20bionic.20CircleCI
sudo sed -i '/^bind/s/bind.*/bind 0.0.0.0/' /etc/redis/redis.conf
- name: Install dependencies - name: Install dependencies
run: | run: |
cd server cd server

View file

@ -13,10 +13,10 @@ jobs:
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Set up Python 3.7 - name: Setup Python 3.6
uses: actions/setup-python@v2 uses: actions/setup-python@v2
with: with:
python-version: "3.7" python-version: 3.6
- name: Install dependencies - name: Install dependencies
run: tools/provision --force run: tools/provision --force
@ -32,12 +32,15 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest, windows-latest] os: [ubuntu-latest, windows-latest]
python-version: ["3.7", "3.8", "3.9", "3.10"] python-version: [3.6, 3.7, 3.8, 3.9]
exclude:
- os: windows-latest
python-version: 3.6 # cryptography install fails on Windows with python 3.6 since pip is quite old.
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }} - name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@v2 uses: actions/setup-python@v2
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}

View file

@ -1,4 +1,3 @@
Aman Agrawal <f2016561@pilani.bits-pilani.ac.in>
Anders Kaseorg <anders@zulip.com> <anders@zulipchat.com> Anders Kaseorg <anders@zulip.com> <anders@zulipchat.com>
Anders Kaseorg <anders@zulip.com> <andersk@mit.edu> Anders Kaseorg <anders@zulip.com> <andersk@mit.edu>
Rishi Gupta <rishig@zulip.com> <rishig@zulipchat.com> Rishi Gupta <rishig@zulip.com> <rishig@zulipchat.com>
@ -8,4 +7,3 @@ Steve Howell <showell@zulip.com> <steve@zulip.com>
Tim Abbott <tabbott@zulip.com> <tabbott@humbughq.com> Tim Abbott <tabbott@zulip.com> <tabbott@humbughq.com>
Tim Abbott <tabbott@zulip.com> <tabbott@mit.edu> Tim Abbott <tabbott@zulip.com> <tabbott@mit.edu>
Tim Abbott <tabbott@zulip.com> <tabbott@zulipchat.com> Tim Abbott <tabbott@zulip.com> <tabbott@zulipchat.com>
Zixuan James Li <p359101898@gmail.com> <359101898@qq.com>

View file

@ -1,14 +1,8 @@
[tool.black] [tool.black]
line-length = 100 line-length = 100
target-version = ["py37"] target-version = ["py36"]
[tool.isort] [tool.isort]
src_paths = [ src_paths = ["tools", "zulip", "zulip_bots", "zulip_botserver"]
"tools",
"zulip",
"zulip/integrations/zephyr",
"zulip_bots",
"zulip_botserver",
]
profile = "black" profile = "black"
line_length = 100 line_length = 100

View file

@ -3,7 +3,7 @@
The [Zulip API](https://zulip.com/api) Python bindings require the The [Zulip API](https://zulip.com/api) Python bindings require the
following dependencies: following dependencies:
* **Python (version >= 3.7)** * **Python (version >= 3.6)**
* requests (version >= 0.12.1) * requests (version >= 0.12.1)
**Note**: If you'd like to use the Zulip bindings with Python 2, we **Note**: If you'd like to use the Zulip bindings with Python 2, we

View file

@ -14,6 +14,9 @@ from concurrent.futures import ThreadPoolExecutor
from io import BytesIO from io import BytesIO
from typing import Any, Dict, List, Match, Optional, Set, Tuple, Type, Union from typing import Any, Dict, List, Match, Optional, Set, Tuple, Type, Union
if os.name != "nt":
import magic
import magic.compat
import nio import nio
from nio.responses import ( from nio.responses import (
DownloadError, DownloadError,
@ -349,15 +352,17 @@ class ZulipToMatrix:
if result["result"] != "success": if result["result"] != "success":
success = False success = False
continue continue
try: try:
with urllib.request.urlopen(self.server_url + result["url"]) as response: file_content: bytes = urllib.request.urlopen(self.server_url + result["url"]).read()
file_content: bytes = response.read()
mimetype: str = response.headers.get_content_type()
except Exception: except Exception:
success = False success = False
continue continue
mimetype: str
if os.name == "nt":
mimetype = "m.file"
else:
mimetype = magic.from_buffer(file_content, mime=True)
filename: str = file.split("/")[-1] filename: str = file.split("/")[-1]
response, _ = await self.matrix_client.upload( response, _ = await self.matrix_client.upload(

View file

@ -1 +1,3 @@
matrix-nio matrix-nio
python-magic
python-magic-bin; platform_system == "Windows"

View file

@ -41,7 +41,7 @@ topic = matrix
ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}" ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}"
# For Python 3.7 compatibility. # For Python 3.6 compatibility.
# (Since 3.8, there is unittest.IsolatedAsyncioTestCase!) # (Since 3.8, there is unittest.IsolatedAsyncioTestCase!)
# source: https://stackoverflow.com/a/46324983 # source: https://stackoverflow.com/a/46324983
def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]: def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:

View file

@ -3,20 +3,12 @@ config = {
"email": "zulip-bot@email.com", "email": "zulip-bot@email.com",
"api_key": "put api key here", "api_key": "put api key here",
"site": "https://chat.zulip.org", "site": "https://chat.zulip.org",
"stream": "test here",
"topic": "<- slack-bridge",
}, },
"slack": { "slack": {
"username": "slack_username", "username": "slack_username",
"token": "xoxb-your-slack-token", "token": "xoxb-your-slack-token",
}, "channel": "C5Z5N7R8A -- must be channel id",
# Mapping between Slack channels and Zulip stream-topic's.
# You can specify multiple pairs.
"channel_mapping": {
# Slack channel; must be channel ID
"C5Z5N7R8A": {
# Zulip stream
"stream": "test here",
# Zulip topic
"topic": "<- slack-bridge",
},
}, },
} }

View file

@ -5,7 +5,7 @@ import os
import sys import sys
import threading import threading
import traceback import traceback
from typing import Any, Callable, Dict, Optional, Tuple from typing import Any, Callable, Dict
import bridge_with_slack_config import bridge_with_slack_config
import slack_sdk import slack_sdk
@ -17,28 +17,18 @@ import zulip
ZULIP_MESSAGE_TEMPLATE = "**{username}**: {message}" ZULIP_MESSAGE_TEMPLATE = "**{username}**: {message}"
SLACK_MESSAGE_TEMPLATE = "<{username}> {message}" SLACK_MESSAGE_TEMPLATE = "<{username}> {message}"
StreamTopicT = Tuple[str, str]
def check_zulip_message_validity(msg: Dict[str, Any], config: Dict[str, Any]) -> bool:
def get_slack_channel_for_zulip_message(
msg: Dict[str, Any], zulip_to_slack_map: Dict[StreamTopicT, Any], bot_email: str
) -> Optional[str]:
is_a_stream = msg["type"] == "stream" is_a_stream = msg["type"] == "stream"
if not is_a_stream: in_the_specified_stream = msg["display_recipient"] == config["stream"]
return None at_the_specified_subject = msg["subject"] == config["topic"]
stream_name = msg["display_recipient"] # We do this to identify the messages generated from Matrix -> Zulip
topic_name = msg["subject"] # and we make sure we don't forward it again to the Matrix.
stream_topic: StreamTopicT = (stream_name, topic_name) not_from_zulip_bot = msg["sender_email"] != config["email"]
if stream_topic not in zulip_to_slack_map: if is_a_stream and not_from_zulip_bot and in_the_specified_stream and at_the_specified_subject:
return None return True
return False
# We do this to identify the messages generated from Slack -> Zulip
# and we make sure we don't forward it again to the Slack.
from_zulip_bot = msg["sender_email"] == bot_email
if from_zulip_bot:
return None
return zulip_to_slack_map[stream_topic]
class SlackBridge: class SlackBridge:
@ -47,17 +37,14 @@ class SlackBridge:
self.zulip_config = config["zulip"] self.zulip_config = config["zulip"]
self.slack_config = config["slack"] self.slack_config = config["slack"]
self.slack_to_zulip_map: Dict[str, Dict[str, str]] = config["channel_mapping"]
self.zulip_to_slack_map: Dict[StreamTopicT, str] = {
(z["stream"], z["topic"]): s for s, z in config["channel_mapping"].items()
}
# zulip-specific # zulip-specific
self.zulip_client = zulip.Client( self.zulip_client = zulip.Client(
email=self.zulip_config["email"], email=self.zulip_config["email"],
api_key=self.zulip_config["api_key"], api_key=self.zulip_config["api_key"],
site=self.zulip_config["site"], site=self.zulip_config["site"],
) )
self.zulip_stream = self.zulip_config["stream"]
self.zulip_subject = self.zulip_config["topic"]
# slack-specific # slack-specific
self.channel = self.slack_config["channel"] self.channel = self.slack_config["channel"]
@ -81,16 +68,14 @@ class SlackBridge:
def zulip_to_slack(self) -> Callable[[Dict[str, Any]], None]: def zulip_to_slack(self) -> Callable[[Dict[str, Any]], None]:
def _zulip_to_slack(msg: Dict[str, Any]) -> None: def _zulip_to_slack(msg: Dict[str, Any]) -> None:
slack_channel = get_slack_channel_for_zulip_message( message_valid = check_zulip_message_validity(msg, self.zulip_config)
msg, self.zulip_to_slack_map, self.zulip_config["email"] if message_valid:
)
if slack_channel is not None:
self.wrap_slack_mention_with_bracket(msg) self.wrap_slack_mention_with_bracket(msg)
slack_text = SLACK_MESSAGE_TEMPLATE.format( slack_text = SLACK_MESSAGE_TEMPLATE.format(
username=msg["sender_full_name"], message=msg["content"] username=msg["sender_full_name"], message=msg["content"]
) )
self.slack_webclient.chat_postMessage( self.slack_webclient.chat_postMessage(
channel=slack_channel, channel=self.channel,
text=slack_text, text=slack_text,
) )
@ -106,7 +91,7 @@ class SlackBridge:
@rtm.on("message") @rtm.on("message")
def slack_to_zulip(client: RTMClient, event: Dict[str, Any]) -> None: def slack_to_zulip(client: RTMClient, event: Dict[str, Any]) -> None:
if event["channel"] not in self.slack_to_zulip_map: if event["channel"] != self.channel:
return return
user_id = event["user"] user_id = event["user"]
user = self.slack_id_to_name[user_id] user = self.slack_id_to_name[user_id]
@ -115,12 +100,8 @@ class SlackBridge:
return return
self.replace_slack_id_with_name(event) self.replace_slack_id_with_name(event)
content = ZULIP_MESSAGE_TEMPLATE.format(username=user, message=event["text"]) content = ZULIP_MESSAGE_TEMPLATE.format(username=user, message=event["text"])
zulip_endpoint = self.slack_to_zulip_map[event["channel"]]
msg_data = dict( msg_data = dict(
type="stream", type="stream", to=self.zulip_stream, subject=self.zulip_subject, content=content
to=zulip_endpoint["stream"],
subject=zulip_endpoint["topic"],
content=content,
) )
self.zulip_client.send_message(msg_data) self.zulip_client.send_message(msg_data)
@ -137,16 +118,10 @@ if __name__ == "__main__":
sys.path.append(os.path.join(os.path.dirname(__file__), "..")) sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
parser = argparse.ArgumentParser(usage=usage) parser = argparse.ArgumentParser(usage=usage)
config: Dict[str, Any] = bridge_with_slack_config.config
if "channel_mapping" not in config:
print(
'The key "channel_mapping" is not found in bridge_with_slack_config.py.\n'
"Your config file may be outdated."
)
exit(1)
print("Starting slack mirroring bot") print("Starting slack mirroring bot")
print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM(S) & SLACK CHANNEL(S)!") print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM")
config = bridge_with_slack_config.config
# We have to define rtm outside of SlackBridge because the rtm variable is used as a method decorator. # We have to define rtm outside of SlackBridge because the rtm variable is used as a method decorator.
rtm = RTMClient(token=config["slack"]["token"]) rtm = RTMClient(token=config["slack"]["token"])

View file

@ -6,10 +6,10 @@ import random
import subprocess import subprocess
import sys import sys
import time import time
from ctypes import byref, c_int, c_ushort
from typing import Dict, List, Set, Tuple from typing import Dict, List, Set, Tuple
import zephyr_ctypes import zephyr
import zulip import zulip
parser = optparse.OptionParser() parser = optparse.OptionParser()
@ -136,43 +136,9 @@ for (stream, test) in test_streams:
actually_subscribed = False actually_subscribed = False
for tries in range(10): for tries in range(10):
try: try:
zephyr_ctypes.check(zephyr_ctypes.ZInitialize()) zephyr.init()
zephyr_port = c_ushort() zephyr._z.subAll(zephyr_subs_to_add)
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port))) zephyr_subs = zephyr._z.getSubscriptions()
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
zephyr_ctypes.check(
zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * len(zephyr_subs_to_add))(
*(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
for cls, instance, recipient in zephyr_subs_to_add
)
),
len(zephyr_subs_to_add),
0,
)
)
try:
nsubs = c_int()
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
zephyr_subs = {
(
zsub.zsub_class.decode(),
zsub.zsub_classinst.decode(),
zsub.zsub_recipient.decode(),
)
for zsub in zsubs
}
finally:
zephyr_ctypes.ZFlushSubscriptions()
missing = 0 missing = 0
for elt in zephyr_subs_to_add: for elt in zephyr_subs_to_add:
@ -182,8 +148,8 @@ for tries in range(10):
if missing == 0: if missing == 0:
actually_subscribed = True actually_subscribed = True
break break
except zephyr_ctypes.ZephyrError as e: except OSError as e:
if e.code == zephyr_ctypes.ZERR_SERVNAK: if "SERVNAK received" in e.args:
logger.error("SERVNAK repeatedly received, punting rest of test") logger.error("SERVNAK repeatedly received, punting rest of test")
else: else:
logger.exception("Exception subscribing to zephyrs") logger.exception("Exception subscribing to zephyrs")
@ -219,16 +185,15 @@ notices = []
# receive queue with 30+ messages, which might result in messages # receive queue with 30+ messages, which might result in messages
# being dropped. # being dropped.
def receive_zephyrs() -> None: def receive_zephyrs() -> None:
while zephyr_ctypes.ZPending() != 0: while True:
notice = zephyr_ctypes.ZNotice_t()
sender = zephyr_ctypes.sockaddr_in()
try: try:
zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender))) notice = zephyr.receive(block=False)
except zephyr_ctypes.ZephyrError: except Exception:
logging.exception("Exception receiving zephyrs:") logging.exception("Exception receiving zephyrs:")
notice = None
if notice is None:
break break
if notice.z_opcode != b"": if notice.opcode != "":
zephyr_ctypes.ZFreeNotice(byref(notice))
continue continue
notices.append(notice) notices.append(notice)
@ -329,16 +294,10 @@ def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set
# The h_foo variables are about the messages we _received_ in Zulip # The h_foo variables are about the messages we _received_ in Zulip
# The z_foo variables are about the messages we _received_ in Zephyr # The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages] h_contents = [message["content"] for message in messages]
z_contents = [ z_contents = [notice.message.split("\0")[1] for notice in notices]
notice.z_message[: notice.z_message_len].split(b"\0")[1].decode(errors="replace")
for notice in notices
]
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents) (h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents) (z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
for notice in notices:
zephyr_ctypes.ZFreeNotice(byref(notice))
if z_success and h_success: if z_success and h_success:
logger.info("Success!") logger.info("Success!")
print_status_and_exit(0) print_status_and_exit(0)

View file

@ -18,7 +18,7 @@ api_key_path = f"/home/zulip/api-keys/{program_name}"
open(api_key_path, "w").write(api_key + "\n") open(api_key_path, "w").write(api_key + "\n")
# Setup supervisord configuration # Setup supervisord configuration
supervisor_path = f"/etc/supervisor/conf.d/zmirror/{program_name}.conf" supervisor_path = f"/etc/supervisor/conf.d/zulip/{program_name}.conf"
template = os.path.join(os.path.dirname(__file__), "zmirror_private.conf.template") template = os.path.join(os.path.dirname(__file__), "zmirror_private.conf.template")
template_data = open(template).read() template_data = open(template).read()
session_path = f"/home/zulip/zephyr_sessions/{program_name}" session_path = f"/home/zulip/zephyr_sessions/{program_name}"

View file

@ -1,207 +0,0 @@
from ctypes import (
CDLL,
CFUNCTYPE,
POINTER,
Structure,
Union,
c_char,
c_char_p,
c_int,
c_long,
c_uint,
c_uint8,
c_uint16,
c_uint32,
c_ushort,
c_void_p,
)
libc = CDLL("libc.so.6")
com_err = CDLL("libcom_err.so.2")
libzephyr = CDLL("libzephyr.so.4")
# --- glibc/bits/sockaddr.h ---
sa_family_t = c_ushort
# --- glibc/sysdeps/unix/sysv/linux/bits/socket.h ---
class sockaddr(Structure):
_fields_ = [
("sa_family", sa_family_t),
("sa_data", c_char * 14),
]
# --- glibc/inet/netinet/in.h ---
in_port_t = c_uint16
in_addr_t = c_uint32
class in_addr(Structure):
_fields_ = [
("s_addr", in_addr_t),
]
class sockaddr_in(Structure):
_fields_ = [
("sin_family", sa_family_t),
("sin_port", in_port_t),
("sin_addr", in_addr),
("sin_zero", c_uint8 * 8),
]
class in6_addr(Structure):
_fields_ = [
("s6_addr", c_uint8 * 16),
]
class sockaddr_in6(Structure):
_fields_ = [
("sin6_family", sa_family_t),
("sin6_port", in_port_t),
("sin6_flowinfo", c_uint32),
("sin6_addr", in6_addr),
("sin6_scope_id", c_uint32),
]
# --- glibc/stdlib/stdlib.h ---
free = CFUNCTYPE(None, c_void_p)(("free", libc))
# --- e2fsprogs/lib/et/com_err.h ---
error_message = CFUNCTYPE(c_char_p, c_long)(("error_message", com_err))
# --- zephyr/h/zephyr/zephyr.h ---
Z_MAXOTHERFIELDS = 10
ZNotice_Kind_t = c_int
class _ZTimeval(Structure):
_fields_ = [
("tv_sec", c_int),
("tv_usec", c_int),
]
class ZUnique_Id_t(Structure):
_fields_ = [
("zuid_addr", in_addr),
("tv", _ZTimeval),
]
ZChecksum_t = c_uint
class _ZSenderSockaddr(Union):
_fields_ = [
("sa", sockaddr),
("ip4", sockaddr_in),
("ip6", sockaddr_in6),
]
class ZNotice_t(Structure):
_fields_ = [
("z_packet", c_char_p),
("z_version", c_char_p),
("z_kind", ZNotice_Kind_t),
("z_uid", ZUnique_Id_t),
("z_sender_sockaddr", _ZSenderSockaddr),
("z_time", _ZTimeval),
("z_port", c_ushort),
("z_charset", c_ushort),
("z_auth", c_int),
("z_checked_auth", c_int),
("z_authent_len", c_int),
("z_ascii_authent", c_char_p),
("z_class", c_char_p),
("z_class_inst", c_char_p),
("z_opcode", c_char_p),
("z_sender", c_char_p),
("z_recipient", c_char_p),
("z_default_format", c_char_p),
("z_multinotice", c_char_p),
("z_multiuid", ZUnique_Id_t),
("z_checksum", ZChecksum_t),
("z_ascii_checksum", c_char_p),
("z_num_other_fields", c_int),
("z_other_fields", c_char_p * Z_MAXOTHERFIELDS),
("z_message", POINTER(c_char)),
("z_message_len", c_int),
("z_num_hdr_fields", c_uint),
("z_hdr_fields", POINTER(c_char_p)),
]
class ZSubscription_t(Structure):
_fields_ = [
("zsub_recipient", c_char_p),
("zsub_class", c_char_p),
("zsub_classinst", c_char_p),
]
Code_t = c_int
ZInitialize = CFUNCTYPE(Code_t)(("ZInitialize", libzephyr))
ZRetrieveSubscriptions = CFUNCTYPE(Code_t, c_ushort, POINTER(c_int))(
("ZRetrieveSubscriptions", libzephyr)
)
ZGetSubscriptions = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), POINTER(c_int))(
("ZGetSubscriptions", libzephyr)
)
ZOpenPort = CFUNCTYPE(Code_t, POINTER(c_ushort))(("ZOpenPort", libzephyr))
ZFlushSubscriptions = CFUNCTYPE(Code_t)(("ZFlushSubscriptions", libzephyr))
ZFreeNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t))(("ZFreeNotice", libzephyr))
ZSubscribeTo = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), c_int, c_uint)(
("ZSubscribeTo", libzephyr)
)
ZCancelSubscriptions = CFUNCTYPE(Code_t, c_uint)(("ZCancelSubscriptions", libzephyr))
ZPending = CFUNCTYPE(c_int)(("ZPending", libzephyr))
ZReceiveNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t), POINTER(sockaddr_in))(
("ZReceiveNotice", libzephyr)
)
ZDumpSession = CFUNCTYPE(Code_t, POINTER(POINTER(c_char)), POINTER(c_int))(
("ZDumpSession", libzephyr)
)
ZLoadSession = CFUNCTYPE(Code_t, POINTER(c_char), c_int)(("ZLoadSession", libzephyr))
ZGetFD = CFUNCTYPE(c_int)(("ZGetFD", libzephyr))
ZERR_NONE = 0
# --- zephyr/lib/zephyr_err.et ---
ERROR_TABLE_BASE_zeph = -772103680
ZERR_SERVNAK = ERROR_TABLE_BASE_zeph + 16
# --- convenience helpers ---
class ZephyrError(Exception):
def __init__(self, code: int) -> None:
self.code = code
def __str__(self) -> str:
return error_message(self.code).decode()
def check(code: int) -> None:
if code != ZERR_NONE:
raise ZephyrError(code)

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio
import os import os
import signal import signal
import subprocess import subprocess
@ -33,22 +32,21 @@ if options.sync_subscriptions:
sys.exit(0) sys.exit(0)
if options.forward_class_messages and not options.noshard: if options.forward_class_messages and not options.noshard:
# Needed to get access to zephyr.lib.parallel
sys.path.append("/home/zulip/zulip")
if options.on_startup_command is not None: if options.on_startup_command is not None:
subprocess.call([options.on_startup_command]) subprocess.call([options.on_startup_command])
from zerver.lib.parallel import run_parallel
print("Starting parallel zephyr class mirroring bot") print("Starting parallel zephyr class mirroring bot")
shards = list("0123456789abcdef") jobs = list("0123456789abcdef")
async def run_shard(shard: str) -> int: def run_job(shard: str) -> int:
process = await asyncio.create_subprocess_exec(*args, f"--shard={shard}") subprocess.call(args + [f"--shard={shard}"])
return await process.wait() return 0
async def run_shards(): for (status, job) in run_parallel(run_job, jobs, threads=16):
for coro in asyncio.as_completed(map(run_shard, shards)):
await coro
print("A mirroring shard died!") print("A mirroring shard died!")
asyncio.run(run_shards())
sys.exit(0) sys.exit(0)
backoff = RandomExponentialBackoff(timeout_success_equivalent=300) backoff = RandomExponentialBackoff(timeout_success_equivalent=300)

View file

@ -13,23 +13,18 @@ import sys
import tempfile import tempfile
import textwrap import textwrap
import time import time
from ctypes import POINTER, byref, c_char, c_int, c_ushort
from queue import Queue
from threading import Thread
from types import FrameType from types import FrameType
from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from typing_extensions import Literal, TypedDict from typing_extensions import Literal, TypedDict
import zephyr_ctypes
import zulip
from zulip import RandomExponentialBackoff from zulip import RandomExponentialBackoff
DEFAULT_SITE = "https://api.zulip.com" DEFAULT_SITE = "https://api.zulip.com"
class States: class States:
Startup, ZulipToZephyr, ZephyrToZulip = list(range(3)) Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4))
CURRENT_STATE = States.Startup CURRENT_STATE = States.Startup
@ -37,16 +32,6 @@ CURRENT_STATE = States.Startup
logger: logging.Logger logger: logging.Logger
def make_zulip_client() -> zulip.Client:
return zulip.Client(
email=zulip_account_email,
api_key=api_key,
verbose=True,
client="zephyr_mirror",
site=options.site,
)
def to_zulip_username(zephyr_username: str) -> str: def to_zulip_username(zephyr_username: str) -> str:
if "@" in zephyr_username: if "@" in zephyr_username:
(user, realm) = zephyr_username.split("@") (user, realm) = zephyr_username.split("@")
@ -132,7 +117,7 @@ class ZephyrDict(TypedDict, total=False):
zsig: str zsig: str
def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]: def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
message: Dict[str, Any] message: Dict[str, Any]
message = {} message = {}
if options.forward_class_messages: if options.forward_class_messages:
@ -166,7 +151,7 @@ def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]:
return zulip_client.send_message(message) return zulip_client.send_message(message)
def send_error_zulip(zulip_client: zulip.Client, error_msg: str) -> None: def send_error_zulip(error_msg: str) -> None:
message = { message = {
"type": "private", "type": "private",
"sender": zulip_account_email, "sender": zulip_account_email,
@ -181,23 +166,8 @@ current_zephyr_subs = set()
def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None: def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
try: try:
zephyr_ctypes.check( zephyr._z.subAll(subs)
zephyr_ctypes.ZSubscribeTo( except OSError:
(zephyr_ctypes.ZSubscription_t * len(subs))(
*(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
for cls, instance, recipient in subs
)
),
len(subs),
0,
)
)
except zephyr_ctypes.ZephyrError:
# Since we haven't added the subscription to # Since we haven't added the subscription to
# current_zephyr_subs yet, we can just return (so that we'll # current_zephyr_subs yet, we can just return (so that we'll
# continue processing normal messages) and we'll end up # continue processing normal messages) and we'll end up
@ -206,41 +176,26 @@ def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
logger.exception("Error subscribing to streams (will retry automatically):") logger.exception("Error subscribing to streams (will retry automatically):")
logger.warning(f"Streams were: {[cls for cls, instance, recipient in subs]}") logger.warning(f"Streams were: {[cls for cls, instance, recipient in subs]}")
return return
try: try:
nsubs = c_int() actual_zephyr_subs = [cls for (cls, _, _) in zephyr._z.getSubscriptions()]
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs))) except OSError:
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
actual_zephyr_subs = {zsub.zsub_class.decode() for zsub in zsubs}
except zephyr_ctypes.ZephyrError:
logger.exception("Error getting current Zephyr subscriptions") logger.exception("Error getting current Zephyr subscriptions")
# Don't add anything to current_zephyr_subs so that we'll # Don't add anything to current_zephyr_subs so that we'll
# retry the next time we check for streams to subscribe to # retry the next time we check for streams to subscribe to
# (within 15 seconds). # (within 15 seconds).
return return
finally:
zephyr_ctypes.ZFlushSubscriptions()
for (cls, instance, recipient) in subs: for (cls, instance, recipient) in subs:
if cls not in actual_zephyr_subs: if cls not in actual_zephyr_subs:
logger.error(f"Zephyr failed to subscribe us to {cls}; will retry") logger.error(f"Zephyr failed to subscribe us to {cls}; will retry")
try:
# We'll retry automatically when we next check for # We'll retry automatically when we next check for
# streams to subscribe to (within 15 seconds), but # streams to subscribe to (within 15 seconds), but
# it's worth doing 1 retry immediately to avoid # it's worth doing 1 retry immediately to avoid
# missing 15 seconds of messages on the affected # missing 15 seconds of messages on the affected
# classes # classes
zephyr_ctypes.ZSubscribeTo( zephyr._z.sub(cls, instance, recipient)
(zephyr_ctypes.ZSubscription_t * 1)( except OSError:
zephyr_ctypes.ZSubscription_t( pass
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
),
1,
0,
)
else: else:
current_zephyr_subs.add(cls) current_zephyr_subs.add(cls)
@ -291,8 +246,8 @@ def maybe_restart_mirroring_script() -> None:
logger.warning("zephyr mirroring script has been updated; restarting...") logger.warning("zephyr mirroring script has been updated; restarting...")
maybe_kill_child() maybe_kill_child()
try: try:
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0)) zephyr._z.cancelSubs()
except zephyr_ctypes.ZephyrError: except OSError:
# We don't care whether we failed to cancel subs properly, but we should log it # We don't care whether we failed to cancel subs properly, but we should log it
logger.exception("") logger.exception("")
backoff = RandomExponentialBackoff( backoff = RandomExponentialBackoff(
@ -308,29 +263,27 @@ def maybe_restart_mirroring_script() -> None:
raise Exception("Failed to reload too many times, aborting!") raise Exception("Failed to reload too many times, aborting!")
def process_loop(zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]) -> NoReturn: def process_loop(log: Optional[IO[str]]) -> NoReturn:
restart_check_count = 0 restart_check_count = 0
last_check_time = time.time() last_check_time = time.time()
recieve_backoff = RandomExponentialBackoff() recieve_backoff = RandomExponentialBackoff()
while True: while True:
select.select([zephyr_ctypes.ZGetFD()], [], [], 15) select.select([zephyr._z.getFD()], [], [], 15)
try: try:
process_backoff = RandomExponentialBackoff() process_backoff = RandomExponentialBackoff()
# Fetch notices from the queue until its empty # Fetch notices from the queue until its empty
while zephyr_ctypes.ZPending() != 0: while True:
notice = zephyr_ctypes.ZNotice_t() notice = zephyr.receive(block=False)
sender = zephyr_ctypes.sockaddr_in()
zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
try:
recieve_backoff.succeed() recieve_backoff.succeed()
process_notice(notice, zulip_queue, log) if notice is None:
break
try:
process_notice(notice, log)
process_backoff.succeed() process_backoff.succeed()
except zephyr_ctypes.ZephyrError: except Exception:
logger.exception("Error relaying zephyr:") logger.exception("Error relaying zephyr:")
process_backoff.fail() process_backoff.fail()
finally: except Exception:
zephyr_ctypes.ZFreeNotice(byref(notice))
except zephyr_ctypes.ZephyrError:
logger.exception("Error checking for new zephyrs:") logger.exception("Error checking for new zephyrs:")
recieve_backoff.fail() recieve_backoff.fail()
continue continue
@ -442,47 +395,38 @@ def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str:
return decrypted return decrypted
def process_notice( def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
notice: zephyr_ctypes.ZNotice_t, zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]] assert notice.sender is not None
) -> None: (zsig, body) = parse_zephyr_body(notice.message, notice.format)
assert notice.z_sender is not None
(zsig, body) = parse_zephyr_body(
notice.z_message[: notice.z_message_len].decode(errors="replace"),
notice.z_default_format.decode(errors="replace"),
)
is_personal = False is_personal = False
is_huddle = False is_huddle = False
if notice.z_opcode == b"PING": if notice.opcode == "PING":
# skip PING messages # skip PING messages
return return
zephyr_class = notice.z_class.decode() zephyr_class = notice.cls.lower()
zephyr_instance = notice.z_class_inst.decode()
zephyr_sender = notice.z_sender.decode()
if zephyr_class.lower() == options.nagios_class: if zephyr_class == options.nagios_class:
# Mark that we got the message and proceed # Mark that we got the message and proceed
with open(options.nagios_path, "w") as f: with open(options.nagios_path, "w") as f:
f.write("0\n") f.write("0\n")
return return
if notice.z_recipient != b"": if notice.recipient != "":
is_personal = True is_personal = True
# Drop messages not to the listed subscriptions # Drop messages not to the listed subscriptions
if is_personal and not options.forward_personals: if is_personal and not options.forward_personals:
return return
if (zephyr_class.lower() not in current_zephyr_subs) and not is_personal: if (zephyr_class not in current_zephyr_subs) and not is_personal:
logger.debug(f"Skipping ... {zephyr_class}/{zephyr_instance}/{is_personal}") logger.debug(f"Skipping ... {zephyr_class}/{notice.instance}/{is_personal}")
return return
if notice.z_default_format.startswith(b"Zephyr error: See") or notice.z_default_format.endswith( if notice.format.startswith("Zephyr error: See") or notice.format.endswith("@(@color(blue))"):
b"@(@color(blue))"
):
logger.debug("Skipping message we got from Zulip!") logger.debug("Skipping message we got from Zulip!")
return return
if ( if (
zephyr_class.lower() == "mail" zephyr_class == "mail"
and zephyr_instance.lower() == "inbox" and notice.instance.lower() == "inbox"
and is_personal and is_personal
and not options.forward_mail_zephyrs and not options.forward_mail_zephyrs
): ):
@ -496,21 +440,21 @@ def process_notice(
huddle_recipients = [ huddle_recipients = [
to_zulip_username(x.strip()) for x in body.split("\n")[0][4:].split() to_zulip_username(x.strip()) for x in body.split("\n")[0][4:].split()
] ]
if zephyr_sender not in huddle_recipients: if notice.sender not in huddle_recipients:
huddle_recipients.append(to_zulip_username(zephyr_sender)) huddle_recipients.append(to_zulip_username(notice.sender))
body = body.split("\n", 1)[1] body = body.split("\n", 1)[1]
if ( if (
options.forward_class_messages options.forward_class_messages
and notice.z_opcode is not None and notice.opcode is not None
and notice.z_opcode.lower() == b"crypt" and notice.opcode.lower() == "crypt"
): ):
body = decrypt_zephyr(zephyr_class.lower(), zephyr_instance.lower(), body) body = decrypt_zephyr(zephyr_class, notice.instance.lower(), body)
zeph: ZephyrDict zeph: ZephyrDict
zeph = { zeph = {
"time": str(notice.z_time.tv_sec + notice.z_time.tv_usec / 1e6), "time": str(notice.time),
"sender": zephyr_sender, "sender": notice.sender,
"zsig": zsig, # logged here but not used by app "zsig": zsig, # logged here but not used by app
"content": body, "content": body,
} }
@ -518,47 +462,46 @@ def process_notice(
zeph["type"] = "private" zeph["type"] = "private"
zeph["recipient"] = huddle_recipients zeph["recipient"] = huddle_recipients
elif is_personal: elif is_personal:
assert notice.z_recipient is not None assert notice.recipient is not None
zeph["type"] = "private" zeph["type"] = "private"
zeph["recipient"] = to_zulip_username(notice.z_recipient.decode()) zeph["recipient"] = to_zulip_username(notice.recipient)
else: else:
zeph["type"] = "stream" zeph["type"] = "stream"
zeph["stream"] = zephyr_class.lower() zeph["stream"] = zephyr_class
if zephyr_instance.strip() != "": if notice.instance.strip() != "":
zeph["subject"] = zephyr_instance zeph["subject"] = notice.instance
else: else:
zeph["subject"] = f'(instance "{zephyr_instance}")' zeph["subject"] = f'(instance "{notice.instance}")'
# Add instances in for instanced personals # Add instances in for instanced personals
if is_personal: if is_personal:
if zephyr_class.lower() != "message" and zephyr_instance.lower() != "personal": if notice.cls.lower() != "message" and notice.instance.lower() != "personal":
heading = f"[-c {zephyr_class} -i {zephyr_instance}]\n" heading = f"[-c {notice.cls} -i {notice.instance}]\n"
elif zephyr_class.lower() != "message": elif notice.cls.lower() != "message":
heading = f"[-c {zephyr_class}]\n" heading = f"[-c {notice.cls}]\n"
elif zephyr_instance.lower() != "personal": elif notice.instance.lower() != "personal":
heading = f"[-i {zephyr_instance}]\n" heading = f"[-i {notice.instance}]\n"
else: else:
heading = "" heading = ""
zeph["content"] = heading + zeph["content"] zeph["content"] = heading + zeph["content"]
logger.info(f"Received a message on {zephyr_class}/{zephyr_instance} from {zephyr_sender}...") logger.info(f"Received a message on {zephyr_class}/{notice.instance} from {notice.sender}...")
if log is not None: if log is not None:
log.write(json.dumps(zeph) + "\n") log.write(json.dumps(zeph) + "\n")
log.flush() log.flush()
zulip_queue.put(zeph) if os.fork() == 0:
global CURRENT_STATE
CURRENT_STATE = States.ChildSending
def send_zulip_worker(zulip_queue: "Queue[ZephyrDict]", zulip_client: zulip.Client) -> None: # Actually send the message in a child process, to avoid blocking.
while True:
zeph = zulip_queue.get()
try: try:
res = send_zulip(zulip_client, zeph) res = send_zulip(zeph)
if res.get("result") != "success": if res.get("result") != "success":
logger.error(f"Error relaying zephyr:\n{zeph}\n{res}") logger.error(f"Error relaying zephyr:\n{zeph}\n{res}")
except Exception: except Exception:
logger.exception("Error relaying zephyr:") logger.exception("Error relaying zephyr:")
zulip_queue.task_done() finally:
os._exit(0)
def quit_failed_initialization(message: str) -> str: def quit_failed_initialization(message: str) -> str:
@ -571,14 +514,12 @@ def zephyr_init_autoretry() -> None:
backoff = zulip.RandomExponentialBackoff() backoff = zulip.RandomExponentialBackoff()
while backoff.keep_going(): while backoff.keep_going():
try: try:
# ZCancelSubscriptions sometimes gets a SERVNAK from the server # zephyr.init() tries to clear old subscriptions, and thus
zephyr_ctypes.check(zephyr_ctypes.ZInitialize()) # sometimes gets a SERVNAK from the server
zephyr_port = c_ushort() zephyr.init()
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
backoff.succeed() backoff.succeed()
return return
except zephyr_ctypes.ZephyrError: except OSError:
logger.exception("Error initializing Zephyr library (retrying). Traceback:") logger.exception("Error initializing Zephyr library (retrying). Traceback:")
backoff.fail() backoff.fail()
@ -591,10 +532,11 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
try: try:
with open(session_path, "rb") as f: with open(session_path, "rb") as f:
session = f.read() session = f.read()
zephyr_ctypes.check(zephyr_ctypes.ZInitialize()) zephyr._z.initialize()
zephyr_ctypes.check(zephyr_ctypes.ZLoadSession(session, len(session))) zephyr._z.load_session(session)
zephyr.__inited = True
return return
except zephyr_ctypes.ZephyrError: except OSError:
logger.exception("Error loading saved Zephyr session (retrying). Traceback:") logger.exception("Error loading saved Zephyr session (retrying). Traceback:")
backoff.fail() backoff.fail()
@ -602,26 +544,13 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None: def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
cls, instance, recipient = sub
backoff = zulip.RandomExponentialBackoff() backoff = zulip.RandomExponentialBackoff()
while backoff.keep_going(): while backoff.keep_going():
try: try:
zephyr_ctypes.check( zephyr.Subscriptions().add(sub)
zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * 1)(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
),
1,
0,
)
)
backoff.succeed() backoff.succeed()
return return
except zephyr_ctypes.ZephyrError: except OSError:
# Probably a SERVNAK from the zephyr server, but log the # Probably a SERVNAK from the zephyr server, but log the
# traceback just in case it's something else # traceback just in case it's something else
logger.exception("Error subscribing to personals (retrying). Traceback:") logger.exception("Error subscribing to personals (retrying). Traceback:")
@ -631,8 +560,6 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
def zephyr_to_zulip(options: optparse.Values) -> None: def zephyr_to_zulip(options: optparse.Values) -> None:
zulip_client = make_zulip_client()
if options.use_sessions and os.path.exists(options.session_path): if options.use_sessions and os.path.exists(options.session_path):
logger.info("Loading old session") logger.info("Loading old session")
zephyr_load_session_autoretry(options.session_path) zephyr_load_session_autoretry(options.session_path)
@ -648,14 +575,8 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
if options.nagios_class: if options.nagios_class:
zephyr_subscribe_autoretry((options.nagios_class, "*", "*")) zephyr_subscribe_autoretry((options.nagios_class, "*", "*"))
if options.use_sessions: if options.use_sessions:
buf = POINTER(c_char)()
buf_len = c_int()
zephyr_ctypes.check(zephyr_ctypes.ZDumpSession(byref(buf), byref(buf_len)))
try:
with open(options.session_path, "wb") as f: with open(options.session_path, "wb") as f:
f.write(buf[: buf_len.value]) # type: ignore[arg-type] # bytes, but mypy infers List[c_char] f.write(zephyr._z.dump_session())
finally:
zephyr_ctypes.free(buf)
if options.logs_to_resend is not None: if options.logs_to_resend is not None:
with open(options.logs_to_resend) as log: with open(options.logs_to_resend) as log:
@ -672,22 +593,18 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
"sending saved message to %s from %s..." "sending saved message to %s from %s..."
% (zeph.get("stream", zeph.get("recipient")), zeph["sender"]) % (zeph.get("stream", zeph.get("recipient")), zeph["sender"])
) )
send_zulip(zulip_client, zeph) send_zulip(zeph)
except Exception: except Exception:
logger.exception("Could not send saved zephyr:") logger.exception("Could not send saved zephyr:")
time.sleep(2) time.sleep(2)
logger.info("Successfully initialized; Starting receive loop.") logger.info("Successfully initialized; Starting receive loop.")
# Actually send the messages in a thread, to avoid blocking.
zulip_queue: "Queue[ZephyrDict]" = Queue()
Thread(target=lambda: send_zulip_worker(zulip_queue, zulip_client)).start()
if options.resend_log_path is not None: if options.resend_log_path is not None:
with open(options.resend_log_path, "a") as log: with open(options.resend_log_path, "a") as log:
process_loop(zulip_queue, log) process_loop(log)
else: else:
process_loop(zulip_queue, None) process_loop(None)
def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]: def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
@ -758,7 +675,7 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op
return encrypted return encrypted
def forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None: def forward_to_zephyr(message: Dict[str, Any]) -> None:
# 'Any' can be of any type of text # 'Any' can be of any type of text
support_heading = "Hi there! This is an automated message from Zulip." support_heading = "Hi there! This is an automated message from Zulip."
support_closing = """If you have any questions, please be in touch through the \ support_closing = """If you have any questions, please be in touch through the \
@ -832,7 +749,6 @@ Feedback button or at support@zulip.com."""
result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content) result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content)
if result is None: if result is None:
send_error_zulip( send_error_zulip(
zulip_client,
"""%s """%s
Your Zulip-Zephyr mirror bot was unable to forward that last message \ Your Zulip-Zephyr mirror bot was unable to forward that last message \
@ -842,7 +758,7 @@ key (perhaps because your AFS tokens expired). That means that while \
Zulip users (like you) received it, Zephyr users did not. Zulip users (like you) received it, Zephyr users did not.
%s""" %s"""
% (support_heading, support_closing), % (support_heading, support_closing)
) )
return return
@ -859,7 +775,6 @@ Zulip users (like you) received it, Zephyr users did not.
return return
elif code == 0: elif code == 0:
send_error_zulip( send_error_zulip(
zulip_client,
"""%s """%s
Your last message was successfully mirrored to zephyr, but zwrite \ Your last message was successfully mirrored to zephyr, but zwrite \
@ -868,7 +783,7 @@ returned the following warning:
%s %s
%s""" %s"""
% (support_heading, stderr, support_closing), % (support_heading, stderr, support_closing)
) )
return return
elif code != 0 and ( elif code != 0 and (
@ -882,7 +797,6 @@ returned the following warning:
if options.ignore_expired_tickets: if options.ignore_expired_tickets:
return return
send_error_zulip( send_error_zulip(
zulip_client,
"""%s """%s
Your last message was forwarded from Zulip to Zephyr unauthenticated, \ Your last message was forwarded from Zulip to Zephyr unauthenticated, \
@ -892,7 +806,7 @@ are running the Zulip-Zephyr mirroring bot, so we can send \
authenticated Zephyr messages for you again. authenticated Zephyr messages for you again.
%s""" %s"""
% (support_heading, support_closing), % (support_heading, support_closing)
) )
return return
@ -900,7 +814,6 @@ authenticated Zephyr messages for you again.
# probably because the recipient isn't subscribed to personals, # probably because the recipient isn't subscribed to personals,
# but regardless, we should just notify the user. # but regardless, we should just notify the user.
send_error_zulip( send_error_zulip(
zulip_client,
"""%s """%s
Your Zulip-Zephyr mirror bot was unable to forward that last message \ Your Zulip-Zephyr mirror bot was unable to forward that last message \
@ -910,12 +823,12 @@ received it, Zephyr users did not. The error message from zwrite was:
%s %s
%s""" %s"""
% (support_heading, stderr, support_closing), % (support_heading, stderr, support_closing)
) )
return return
def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None: def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
# The key string can be used to direct any type of text. # The key string can be used to direct any type of text.
if message["sender_email"] == zulip_account_email: if message["sender_email"] == zulip_account_email:
if not ( if not (
@ -938,7 +851,7 @@ def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client)
) )
return return
try: try:
forward_to_zephyr(message, zulip_client) forward_to_zephyr(message)
except Exception: except Exception:
# Don't let an exception forwarding one message crash the # Don't let an exception forwarding one message crash the
# whole process # whole process
@ -946,16 +859,12 @@ def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client)
def zulip_to_zephyr(options: optparse.Values) -> NoReturn: def zulip_to_zephyr(options: optparse.Values) -> NoReturn:
zulip_client = make_zulip_client()
# Sync messages from zulip to zephyr # Sync messages from zulip to zephyr
logger.info("Starting syncing messages.") logger.info("Starting syncing messages.")
backoff = RandomExponentialBackoff(timeout_success_equivalent=120) backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
while True: while True:
try: try:
zulip_client.call_on_each_message( zulip_client.call_on_each_message(maybe_forward_to_zephyr)
lambda message: maybe_forward_to_zephyr(message, zulip_client)
)
except Exception: except Exception:
logger.exception("Error syncing messages:") logger.exception("Error syncing messages:")
backoff.fail() backoff.fail()
@ -977,8 +886,6 @@ def subscribed_to_mail_messages() -> bool:
def add_zulip_subscriptions(verbose: bool) -> None: def add_zulip_subscriptions(verbose: bool) -> None:
zulip_client = make_zulip_client()
zephyr_subscriptions = set() zephyr_subscriptions = set()
skipped = set() skipped = set()
for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose): for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose):
@ -1239,14 +1146,14 @@ def parse_args() -> Tuple[optparse.Values, List[str]]:
def die_gracefully(signal: int, frame: FrameType) -> None: def die_gracefully(signal: int, frame: FrameType) -> None:
if CURRENT_STATE == States.ZulipToZephyr: if CURRENT_STATE == States.ZulipToZephyr or CURRENT_STATE == States.ChildSending:
# this is a child process, so we want os._exit (no clean-up necessary) # this is a child process, so we want os._exit (no clean-up necessary)
os._exit(1) os._exit(1)
if CURRENT_STATE == States.ZephyrToZulip and not options.use_sessions: if CURRENT_STATE == States.ZephyrToZulip and not options.use_sessions:
try: try:
# zephyr=>zulip processes may have added subs, so run ZCancelSubscriptions # zephyr=>zulip processes may have added subs, so run cancelSubs
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0)) zephyr._z.cancelSubs()
except OSError: except OSError:
# We don't care whether we failed to cancel subs properly, but we should log it # We don't care whether we failed to cancel subs properly, but we should log it
logger.exception("") logger.exception("")
@ -1300,6 +1207,15 @@ or specify the --api-key-file option."""
sys.exit(1) sys.exit(1)
zulip_account_email = options.user + "@mit.edu" zulip_account_email = options.user + "@mit.edu"
import zulip
zulip_client = zulip.Client(
email=zulip_account_email,
api_key=api_key,
verbose=True,
client="zephyr_mirror",
site=options.site,
)
start_time = time.time() start_time = time.time()
@ -1358,6 +1274,8 @@ or specify the --api-key-file option."""
child_pid = None child_pid = None
CURRENT_STATE = States.ZephyrToZulip CURRENT_STATE = States.ZephyrToZulip
import zephyr
logger_name = "zephyr=>zulip" logger_name = "zephyr=>zulip"
if options.shard is not None: if options.shard is not None:
logger_name += f"({options.shard})" logger_name += f"({options.shard})"

View file

@ -42,12 +42,12 @@ setup(
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",
"Topic :: Communications :: Chat", "Topic :: Communications :: Chat",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
], ],
python_requires=">=3.7", python_requires=">=3.6",
url="https://www.zulip.org/", url="https://www.zulip.org/",
project_urls={ project_urls={
"Source": "https://github.com/zulip/python-zulip-api/", "Source": "https://github.com/zulip/python-zulip-api/",

View file

@ -34,12 +34,12 @@ setup(
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",
"Topic :: Communications :: Chat", "Topic :: Communications :: Chat",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
], ],
python_requires=">=3.7", python_requires=">=3.6",
url="https://www.zulip.org/", url="https://www.zulip.org/",
project_urls={ project_urls={
"Source": "https://github.com/zulip/python-zulip-api/", "Source": "https://github.com/zulip/python-zulip-api/",

View file

@ -1 +1,2 @@
chess==1.* python-chess==0.31.* ; python_version < '3.7'
chess==1.* ; python_version >= '3.7'

View file

@ -22,12 +22,12 @@ setup(
"License :: OSI Approved :: Apache Software License", "License :: OSI Approved :: Apache Software License",
"Topic :: Communications :: Chat", "Topic :: Communications :: Chat",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
], ],
python_requires=">=3.7", python_requires=">=3.6",
url="https://www.zulip.org/", url="https://www.zulip.org/",
project_urls={ project_urls={
"Source": "https://github.com/zulip/python-zulip-api/", "Source": "https://github.com/zulip/python-zulip-api/",