Compare commits

..

No commits in common. "02586f1d348dbcab3bc8d30b805fffd48a524e18" and "63c259b2bc3592e49459df0f6f795bb25a65f030" have entirely different histories.

19 changed files with 212 additions and 558 deletions

View file

@ -17,24 +17,33 @@ jobs:
fail-fast: false
matrix:
include:
# Focal ships with Python 3.8.10.
- docker_image: zulip/ci:focal
name: Ubuntu 20.04 (Python 3.8, backend)
os: focal
legacy_client_interface: "3"
# Base images are built using `tools/ci/Dockerfile.prod.template`.
# The comments at the top explain how to build and upload these images.
# Bionic ships with Python 3.6.
- docker_image: zulip/ci:bionic
name: Ubuntu 18.04 Bionic (Python 3.6, backend)
os: bionic
is_bionic: true
include_frontend_tests: false
# Configure this test to run with the Zulip 3.2 release.
legacy_client_interface: 3
server_version: refs/tags/3.2
# Focal ships with Python 3.8.2.
- docker_image: zulip/ci:focal
name: Ubuntu 20.04 Focal (Python 3.8, backend)
os: focal
is_focal: true
include_frontend_tests: false
legacy_client_interface: 4
server_version: refs/tags/4.0
# Bullseye ships with Python 3.9.2.
- docker_image: zulip/ci:bullseye
name: Debian 11 (Python 3.9, backend)
name: Debian 11 Bullseye (Python 3.9, backend)
os: bullseye
legacy_client_interface: "4"
is_bullseye: true
include_frontend_tests: false
legacy_client_interface: 4
server_version: refs/tags/4.0
# Ubuntu 22.04 ships with Python 3.10.6.
- docker_image: zulip/ci:jammy
name: Ubuntu 22.04 (Python 3.10, backend)
os: jammy
legacy_client_interface: "6"
server_version: refs/tags/6.0
runs-on: ubuntu-latest
name: ${{ matrix.name }} (Zulip ${{matrix.server_version}})
@ -49,18 +58,25 @@ jobs:
HOME: /home/github/
steps:
- name: "Check out python-zulip-api"
- name: 'Checkout python-zulip-api'
uses: actions/checkout@v2
with:
path: api
- name: "Check out Zulip server ${{ matrix.server_version }}"
- name: 'Checkout Zulip server ${{ matrix.server_version }}'
uses: actions/checkout@v2
with:
repository: zulip/zulip
ref: ${{ matrix.server_version }}
path: server
- name: Do Bionic hack
if: ${{ matrix.is_bionic }}
run: |
# Temporary hack till `sudo service redis-server start` gets fixes in Bionic. See
# https://chat.zulip.org/#narrow/stream/3-backend/topic/Ubuntu.20bionic.20CircleCI
sudo sed -i '/^bind/s/bind.*/bind 0.0.0.0/' /etc/redis/redis.conf
- name: Install dependencies
run: |
cd server

View file

@ -13,10 +13,10 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.7
- name: Setup Python 3.6
uses: actions/setup-python@v2
with:
python-version: "3.7"
python-version: 3.6
- name: Install dependencies
run: tools/provision --force
@ -32,12 +32,15 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
python-version: ["3.7", "3.8", "3.9", "3.10"]
python-version: [3.6, 3.7, 3.8, 3.9]
exclude:
- os: windows-latest
python-version: 3.6 # cryptography install fails on Windows with python 3.6 since pip is quite old.
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
- name: Setup Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

View file

@ -1,4 +1,3 @@
Aman Agrawal <f2016561@pilani.bits-pilani.ac.in>
Anders Kaseorg <anders@zulip.com> <anders@zulipchat.com>
Anders Kaseorg <anders@zulip.com> <andersk@mit.edu>
Rishi Gupta <rishig@zulip.com> <rishig@zulipchat.com>
@ -8,4 +7,3 @@ Steve Howell <showell@zulip.com> <steve@zulip.com>
Tim Abbott <tabbott@zulip.com> <tabbott@humbughq.com>
Tim Abbott <tabbott@zulip.com> <tabbott@mit.edu>
Tim Abbott <tabbott@zulip.com> <tabbott@zulipchat.com>
Zixuan James Li <p359101898@gmail.com> <359101898@qq.com>

View file

@ -1,14 +1,8 @@
[tool.black]
line-length = 100
target-version = ["py37"]
target-version = ["py36"]
[tool.isort]
src_paths = [
"tools",
"zulip",
"zulip/integrations/zephyr",
"zulip_bots",
"zulip_botserver",
]
src_paths = ["tools", "zulip", "zulip_bots", "zulip_botserver"]
profile = "black"
line_length = 100

View file

@ -3,7 +3,7 @@
The [Zulip API](https://zulip.com/api) Python bindings require the
following dependencies:
* **Python (version >= 3.7)**
* **Python (version >= 3.6)**
* requests (version >= 0.12.1)
**Note**: If you'd like to use the Zulip bindings with Python 2, we

View file

@ -14,6 +14,9 @@ from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from typing import Any, Dict, List, Match, Optional, Set, Tuple, Type, Union
if os.name != "nt":
import magic
import magic.compat
import nio
from nio.responses import (
DownloadError,
@ -349,15 +352,17 @@ class ZulipToMatrix:
if result["result"] != "success":
success = False
continue
try:
with urllib.request.urlopen(self.server_url + result["url"]) as response:
file_content: bytes = response.read()
mimetype: str = response.headers.get_content_type()
file_content: bytes = urllib.request.urlopen(self.server_url + result["url"]).read()
except Exception:
success = False
continue
mimetype: str
if os.name == "nt":
mimetype = "m.file"
else:
mimetype = magic.from_buffer(file_content, mime=True)
filename: str = file.split("/")[-1]
response, _ = await self.matrix_client.upload(

View file

@ -1 +1,3 @@
matrix-nio
python-magic
python-magic-bin; platform_system == "Windows"

View file

@ -41,7 +41,7 @@ topic = matrix
ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}"
# For Python 3.7 compatibility.
# For Python 3.6 compatibility.
# (Since 3.8, there is unittest.IsolatedAsyncioTestCase!)
# source: https://stackoverflow.com/a/46324983
def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:

View file

@ -3,20 +3,12 @@ config = {
"email": "zulip-bot@email.com",
"api_key": "put api key here",
"site": "https://chat.zulip.org",
"stream": "test here",
"topic": "<- slack-bridge",
},
"slack": {
"username": "slack_username",
"token": "xoxb-your-slack-token",
},
# Mapping between Slack channels and Zulip stream-topic's.
# You can specify multiple pairs.
"channel_mapping": {
# Slack channel; must be channel ID
"C5Z5N7R8A": {
# Zulip stream
"stream": "test here",
# Zulip topic
"topic": "<- slack-bridge",
},
"channel": "C5Z5N7R8A -- must be channel id",
},
}

View file

@ -5,7 +5,7 @@ import os
import sys
import threading
import traceback
from typing import Any, Callable, Dict, Optional, Tuple
from typing import Any, Callable, Dict
import bridge_with_slack_config
import slack_sdk
@ -17,28 +17,18 @@ import zulip
ZULIP_MESSAGE_TEMPLATE = "**{username}**: {message}"
SLACK_MESSAGE_TEMPLATE = "<{username}> {message}"
StreamTopicT = Tuple[str, str]
def get_slack_channel_for_zulip_message(
msg: Dict[str, Any], zulip_to_slack_map: Dict[StreamTopicT, Any], bot_email: str
) -> Optional[str]:
def check_zulip_message_validity(msg: Dict[str, Any], config: Dict[str, Any]) -> bool:
is_a_stream = msg["type"] == "stream"
if not is_a_stream:
return None
in_the_specified_stream = msg["display_recipient"] == config["stream"]
at_the_specified_subject = msg["subject"] == config["topic"]
stream_name = msg["display_recipient"]
topic_name = msg["subject"]
stream_topic: StreamTopicT = (stream_name, topic_name)
if stream_topic not in zulip_to_slack_map:
return None
# We do this to identify the messages generated from Slack -> Zulip
# and we make sure we don't forward it again to the Slack.
from_zulip_bot = msg["sender_email"] == bot_email
if from_zulip_bot:
return None
return zulip_to_slack_map[stream_topic]
# We do this to identify the messages generated from Matrix -> Zulip
# and we make sure we don't forward it again to the Matrix.
not_from_zulip_bot = msg["sender_email"] != config["email"]
if is_a_stream and not_from_zulip_bot and in_the_specified_stream and at_the_specified_subject:
return True
return False
class SlackBridge:
@ -47,17 +37,14 @@ class SlackBridge:
self.zulip_config = config["zulip"]
self.slack_config = config["slack"]
self.slack_to_zulip_map: Dict[str, Dict[str, str]] = config["channel_mapping"]
self.zulip_to_slack_map: Dict[StreamTopicT, str] = {
(z["stream"], z["topic"]): s for s, z in config["channel_mapping"].items()
}
# zulip-specific
self.zulip_client = zulip.Client(
email=self.zulip_config["email"],
api_key=self.zulip_config["api_key"],
site=self.zulip_config["site"],
)
self.zulip_stream = self.zulip_config["stream"]
self.zulip_subject = self.zulip_config["topic"]
# slack-specific
self.channel = self.slack_config["channel"]
@ -81,16 +68,14 @@ class SlackBridge:
def zulip_to_slack(self) -> Callable[[Dict[str, Any]], None]:
def _zulip_to_slack(msg: Dict[str, Any]) -> None:
slack_channel = get_slack_channel_for_zulip_message(
msg, self.zulip_to_slack_map, self.zulip_config["email"]
)
if slack_channel is not None:
message_valid = check_zulip_message_validity(msg, self.zulip_config)
if message_valid:
self.wrap_slack_mention_with_bracket(msg)
slack_text = SLACK_MESSAGE_TEMPLATE.format(
username=msg["sender_full_name"], message=msg["content"]
)
self.slack_webclient.chat_postMessage(
channel=slack_channel,
channel=self.channel,
text=slack_text,
)
@ -106,7 +91,7 @@ class SlackBridge:
@rtm.on("message")
def slack_to_zulip(client: RTMClient, event: Dict[str, Any]) -> None:
if event["channel"] not in self.slack_to_zulip_map:
if event["channel"] != self.channel:
return
user_id = event["user"]
user = self.slack_id_to_name[user_id]
@ -115,12 +100,8 @@ class SlackBridge:
return
self.replace_slack_id_with_name(event)
content = ZULIP_MESSAGE_TEMPLATE.format(username=user, message=event["text"])
zulip_endpoint = self.slack_to_zulip_map[event["channel"]]
msg_data = dict(
type="stream",
to=zulip_endpoint["stream"],
subject=zulip_endpoint["topic"],
content=content,
type="stream", to=self.zulip_stream, subject=self.zulip_subject, content=content
)
self.zulip_client.send_message(msg_data)
@ -137,16 +118,10 @@ if __name__ == "__main__":
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
parser = argparse.ArgumentParser(usage=usage)
config: Dict[str, Any] = bridge_with_slack_config.config
if "channel_mapping" not in config:
print(
'The key "channel_mapping" is not found in bridge_with_slack_config.py.\n'
"Your config file may be outdated."
)
exit(1)
print("Starting slack mirroring bot")
print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM(S) & SLACK CHANNEL(S)!")
print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM")
config = bridge_with_slack_config.config
# We have to define rtm outside of SlackBridge because the rtm variable is used as a method decorator.
rtm = RTMClient(token=config["slack"]["token"])

View file

@ -6,10 +6,10 @@ import random
import subprocess
import sys
import time
from ctypes import byref, c_int, c_ushort
from typing import Dict, List, Set, Tuple
import zephyr_ctypes
import zephyr
import zulip
parser = optparse.OptionParser()
@ -136,43 +136,9 @@ for (stream, test) in test_streams:
actually_subscribed = False
for tries in range(10):
try:
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
zephyr_port = c_ushort()
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
zephyr_ctypes.check(
zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * len(zephyr_subs_to_add))(
*(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
for cls, instance, recipient in zephyr_subs_to_add
)
),
len(zephyr_subs_to_add),
0,
)
)
try:
nsubs = c_int()
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
zephyr_subs = {
(
zsub.zsub_class.decode(),
zsub.zsub_classinst.decode(),
zsub.zsub_recipient.decode(),
)
for zsub in zsubs
}
finally:
zephyr_ctypes.ZFlushSubscriptions()
zephyr.init()
zephyr._z.subAll(zephyr_subs_to_add)
zephyr_subs = zephyr._z.getSubscriptions()
missing = 0
for elt in zephyr_subs_to_add:
@ -182,8 +148,8 @@ for tries in range(10):
if missing == 0:
actually_subscribed = True
break
except zephyr_ctypes.ZephyrError as e:
if e.code == zephyr_ctypes.ZERR_SERVNAK:
except OSError as e:
if "SERVNAK received" in e.args:
logger.error("SERVNAK repeatedly received, punting rest of test")
else:
logger.exception("Exception subscribing to zephyrs")
@ -219,16 +185,15 @@ notices = []
# receive queue with 30+ messages, which might result in messages
# being dropped.
def receive_zephyrs() -> None:
while zephyr_ctypes.ZPending() != 0:
notice = zephyr_ctypes.ZNotice_t()
sender = zephyr_ctypes.sockaddr_in()
while True:
try:
zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
except zephyr_ctypes.ZephyrError:
notice = zephyr.receive(block=False)
except Exception:
logging.exception("Exception receiving zephyrs:")
notice = None
if notice is None:
break
if notice.z_opcode != b"":
zephyr_ctypes.ZFreeNotice(byref(notice))
if notice.opcode != "":
continue
notices.append(notice)
@ -329,16 +294,10 @@ def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set
# The h_foo variables are about the messages we _received_ in Zulip
# The z_foo variables are about the messages we _received_ in Zephyr
h_contents = [message["content"] for message in messages]
z_contents = [
notice.z_message[: notice.z_message_len].split(b"\0")[1].decode(errors="replace")
for notice in notices
]
z_contents = [notice.message.split("\0")[1] for notice in notices]
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
for notice in notices:
zephyr_ctypes.ZFreeNotice(byref(notice))
if z_success and h_success:
logger.info("Success!")
print_status_and_exit(0)

View file

@ -18,7 +18,7 @@ api_key_path = f"/home/zulip/api-keys/{program_name}"
open(api_key_path, "w").write(api_key + "\n")
# Setup supervisord configuration
supervisor_path = f"/etc/supervisor/conf.d/zmirror/{program_name}.conf"
supervisor_path = f"/etc/supervisor/conf.d/zulip/{program_name}.conf"
template = os.path.join(os.path.dirname(__file__), "zmirror_private.conf.template")
template_data = open(template).read()
session_path = f"/home/zulip/zephyr_sessions/{program_name}"

View file

@ -1,207 +0,0 @@
from ctypes import (
CDLL,
CFUNCTYPE,
POINTER,
Structure,
Union,
c_char,
c_char_p,
c_int,
c_long,
c_uint,
c_uint8,
c_uint16,
c_uint32,
c_ushort,
c_void_p,
)
libc = CDLL("libc.so.6")
com_err = CDLL("libcom_err.so.2")
libzephyr = CDLL("libzephyr.so.4")
# --- glibc/bits/sockaddr.h ---
sa_family_t = c_ushort
# --- glibc/sysdeps/unix/sysv/linux/bits/socket.h ---
class sockaddr(Structure):
_fields_ = [
("sa_family", sa_family_t),
("sa_data", c_char * 14),
]
# --- glibc/inet/netinet/in.h ---
in_port_t = c_uint16
in_addr_t = c_uint32
class in_addr(Structure):
_fields_ = [
("s_addr", in_addr_t),
]
class sockaddr_in(Structure):
_fields_ = [
("sin_family", sa_family_t),
("sin_port", in_port_t),
("sin_addr", in_addr),
("sin_zero", c_uint8 * 8),
]
class in6_addr(Structure):
_fields_ = [
("s6_addr", c_uint8 * 16),
]
class sockaddr_in6(Structure):
_fields_ = [
("sin6_family", sa_family_t),
("sin6_port", in_port_t),
("sin6_flowinfo", c_uint32),
("sin6_addr", in6_addr),
("sin6_scope_id", c_uint32),
]
# --- glibc/stdlib/stdlib.h ---
free = CFUNCTYPE(None, c_void_p)(("free", libc))
# --- e2fsprogs/lib/et/com_err.h ---
error_message = CFUNCTYPE(c_char_p, c_long)(("error_message", com_err))
# --- zephyr/h/zephyr/zephyr.h ---
Z_MAXOTHERFIELDS = 10
ZNotice_Kind_t = c_int
class _ZTimeval(Structure):
_fields_ = [
("tv_sec", c_int),
("tv_usec", c_int),
]
class ZUnique_Id_t(Structure):
_fields_ = [
("zuid_addr", in_addr),
("tv", _ZTimeval),
]
ZChecksum_t = c_uint
class _ZSenderSockaddr(Union):
_fields_ = [
("sa", sockaddr),
("ip4", sockaddr_in),
("ip6", sockaddr_in6),
]
class ZNotice_t(Structure):
_fields_ = [
("z_packet", c_char_p),
("z_version", c_char_p),
("z_kind", ZNotice_Kind_t),
("z_uid", ZUnique_Id_t),
("z_sender_sockaddr", _ZSenderSockaddr),
("z_time", _ZTimeval),
("z_port", c_ushort),
("z_charset", c_ushort),
("z_auth", c_int),
("z_checked_auth", c_int),
("z_authent_len", c_int),
("z_ascii_authent", c_char_p),
("z_class", c_char_p),
("z_class_inst", c_char_p),
("z_opcode", c_char_p),
("z_sender", c_char_p),
("z_recipient", c_char_p),
("z_default_format", c_char_p),
("z_multinotice", c_char_p),
("z_multiuid", ZUnique_Id_t),
("z_checksum", ZChecksum_t),
("z_ascii_checksum", c_char_p),
("z_num_other_fields", c_int),
("z_other_fields", c_char_p * Z_MAXOTHERFIELDS),
("z_message", POINTER(c_char)),
("z_message_len", c_int),
("z_num_hdr_fields", c_uint),
("z_hdr_fields", POINTER(c_char_p)),
]
class ZSubscription_t(Structure):
_fields_ = [
("zsub_recipient", c_char_p),
("zsub_class", c_char_p),
("zsub_classinst", c_char_p),
]
Code_t = c_int
ZInitialize = CFUNCTYPE(Code_t)(("ZInitialize", libzephyr))
ZRetrieveSubscriptions = CFUNCTYPE(Code_t, c_ushort, POINTER(c_int))(
("ZRetrieveSubscriptions", libzephyr)
)
ZGetSubscriptions = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), POINTER(c_int))(
("ZGetSubscriptions", libzephyr)
)
ZOpenPort = CFUNCTYPE(Code_t, POINTER(c_ushort))(("ZOpenPort", libzephyr))
ZFlushSubscriptions = CFUNCTYPE(Code_t)(("ZFlushSubscriptions", libzephyr))
ZFreeNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t))(("ZFreeNotice", libzephyr))
ZSubscribeTo = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), c_int, c_uint)(
("ZSubscribeTo", libzephyr)
)
ZCancelSubscriptions = CFUNCTYPE(Code_t, c_uint)(("ZCancelSubscriptions", libzephyr))
ZPending = CFUNCTYPE(c_int)(("ZPending", libzephyr))
ZReceiveNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t), POINTER(sockaddr_in))(
("ZReceiveNotice", libzephyr)
)
ZDumpSession = CFUNCTYPE(Code_t, POINTER(POINTER(c_char)), POINTER(c_int))(
("ZDumpSession", libzephyr)
)
ZLoadSession = CFUNCTYPE(Code_t, POINTER(c_char), c_int)(("ZLoadSession", libzephyr))
ZGetFD = CFUNCTYPE(c_int)(("ZGetFD", libzephyr))
ZERR_NONE = 0
# --- zephyr/lib/zephyr_err.et ---
ERROR_TABLE_BASE_zeph = -772103680
ZERR_SERVNAK = ERROR_TABLE_BASE_zeph + 16
# --- convenience helpers ---
class ZephyrError(Exception):
def __init__(self, code: int) -> None:
self.code = code
def __str__(self) -> str:
return error_message(self.code).decode()
def check(code: int) -> None:
if code != ZERR_NONE:
raise ZephyrError(code)

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python3
import asyncio
import os
import signal
import subprocess
@ -33,22 +32,21 @@ if options.sync_subscriptions:
sys.exit(0)
if options.forward_class_messages and not options.noshard:
# Needed to get access to zephyr.lib.parallel
sys.path.append("/home/zulip/zulip")
if options.on_startup_command is not None:
subprocess.call([options.on_startup_command])
from zerver.lib.parallel import run_parallel
print("Starting parallel zephyr class mirroring bot")
shards = list("0123456789abcdef")
jobs = list("0123456789abcdef")
async def run_shard(shard: str) -> int:
process = await asyncio.create_subprocess_exec(*args, f"--shard={shard}")
return await process.wait()
def run_job(shard: str) -> int:
subprocess.call(args + [f"--shard={shard}"])
return 0
async def run_shards():
for coro in asyncio.as_completed(map(run_shard, shards)):
await coro
print("A mirroring shard died!")
asyncio.run(run_shards())
for (status, job) in run_parallel(run_job, jobs, threads=16):
print("A mirroring shard died!")
sys.exit(0)
backoff = RandomExponentialBackoff(timeout_success_equivalent=300)

View file

@ -13,23 +13,18 @@ import sys
import tempfile
import textwrap
import time
from ctypes import POINTER, byref, c_char, c_int, c_ushort
from queue import Queue
from threading import Thread
from types import FrameType
from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from typing_extensions import Literal, TypedDict
import zephyr_ctypes
import zulip
from zulip import RandomExponentialBackoff
DEFAULT_SITE = "https://api.zulip.com"
class States:
Startup, ZulipToZephyr, ZephyrToZulip = list(range(3))
Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4))
CURRENT_STATE = States.Startup
@ -37,16 +32,6 @@ CURRENT_STATE = States.Startup
logger: logging.Logger
def make_zulip_client() -> zulip.Client:
return zulip.Client(
email=zulip_account_email,
api_key=api_key,
verbose=True,
client="zephyr_mirror",
site=options.site,
)
def to_zulip_username(zephyr_username: str) -> str:
if "@" in zephyr_username:
(user, realm) = zephyr_username.split("@")
@ -132,7 +117,7 @@ class ZephyrDict(TypedDict, total=False):
zsig: str
def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]:
def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
message: Dict[str, Any]
message = {}
if options.forward_class_messages:
@ -166,7 +151,7 @@ def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]:
return zulip_client.send_message(message)
def send_error_zulip(zulip_client: zulip.Client, error_msg: str) -> None:
def send_error_zulip(error_msg: str) -> None:
message = {
"type": "private",
"sender": zulip_account_email,
@ -181,23 +166,8 @@ current_zephyr_subs = set()
def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
try:
zephyr_ctypes.check(
zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * len(subs))(
*(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
for cls, instance, recipient in subs
)
),
len(subs),
0,
)
)
except zephyr_ctypes.ZephyrError:
zephyr._z.subAll(subs)
except OSError:
# Since we haven't added the subscription to
# current_zephyr_subs yet, we can just return (so that we'll
# continue processing normal messages) and we'll end up
@ -206,41 +176,26 @@ def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
logger.exception("Error subscribing to streams (will retry automatically):")
logger.warning(f"Streams were: {[cls for cls, instance, recipient in subs]}")
return
try:
nsubs = c_int()
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
actual_zephyr_subs = {zsub.zsub_class.decode() for zsub in zsubs}
except zephyr_ctypes.ZephyrError:
actual_zephyr_subs = [cls for (cls, _, _) in zephyr._z.getSubscriptions()]
except OSError:
logger.exception("Error getting current Zephyr subscriptions")
# Don't add anything to current_zephyr_subs so that we'll
# retry the next time we check for streams to subscribe to
# (within 15 seconds).
return
finally:
zephyr_ctypes.ZFlushSubscriptions()
for (cls, instance, recipient) in subs:
if cls not in actual_zephyr_subs:
logger.error(f"Zephyr failed to subscribe us to {cls}; will retry")
# We'll retry automatically when we next check for
# streams to subscribe to (within 15 seconds), but
# it's worth doing 1 retry immediately to avoid
# missing 15 seconds of messages on the affected
# classes
zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * 1)(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
),
1,
0,
)
try:
# We'll retry automatically when we next check for
# streams to subscribe to (within 15 seconds), but
# it's worth doing 1 retry immediately to avoid
# missing 15 seconds of messages on the affected
# classes
zephyr._z.sub(cls, instance, recipient)
except OSError:
pass
else:
current_zephyr_subs.add(cls)
@ -291,8 +246,8 @@ def maybe_restart_mirroring_script() -> None:
logger.warning("zephyr mirroring script has been updated; restarting...")
maybe_kill_child()
try:
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
except zephyr_ctypes.ZephyrError:
zephyr._z.cancelSubs()
except OSError:
# We don't care whether we failed to cancel subs properly, but we should log it
logger.exception("")
backoff = RandomExponentialBackoff(
@ -308,29 +263,27 @@ def maybe_restart_mirroring_script() -> None:
raise Exception("Failed to reload too many times, aborting!")
def process_loop(zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]) -> NoReturn:
def process_loop(log: Optional[IO[str]]) -> NoReturn:
restart_check_count = 0
last_check_time = time.time()
recieve_backoff = RandomExponentialBackoff()
while True:
select.select([zephyr_ctypes.ZGetFD()], [], [], 15)
select.select([zephyr._z.getFD()], [], [], 15)
try:
process_backoff = RandomExponentialBackoff()
# Fetch notices from the queue until its empty
while zephyr_ctypes.ZPending() != 0:
notice = zephyr_ctypes.ZNotice_t()
sender = zephyr_ctypes.sockaddr_in()
zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
while True:
notice = zephyr.receive(block=False)
recieve_backoff.succeed()
if notice is None:
break
try:
recieve_backoff.succeed()
process_notice(notice, zulip_queue, log)
process_notice(notice, log)
process_backoff.succeed()
except zephyr_ctypes.ZephyrError:
except Exception:
logger.exception("Error relaying zephyr:")
process_backoff.fail()
finally:
zephyr_ctypes.ZFreeNotice(byref(notice))
except zephyr_ctypes.ZephyrError:
except Exception:
logger.exception("Error checking for new zephyrs:")
recieve_backoff.fail()
continue
@ -442,47 +395,38 @@ def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str:
return decrypted
def process_notice(
notice: zephyr_ctypes.ZNotice_t, zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]
) -> None:
assert notice.z_sender is not None
(zsig, body) = parse_zephyr_body(
notice.z_message[: notice.z_message_len].decode(errors="replace"),
notice.z_default_format.decode(errors="replace"),
)
def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
assert notice.sender is not None
(zsig, body) = parse_zephyr_body(notice.message, notice.format)
is_personal = False
is_huddle = False
if notice.z_opcode == b"PING":
if notice.opcode == "PING":
# skip PING messages
return
zephyr_class = notice.z_class.decode()
zephyr_instance = notice.z_class_inst.decode()
zephyr_sender = notice.z_sender.decode()
zephyr_class = notice.cls.lower()
if zephyr_class.lower() == options.nagios_class:
if zephyr_class == options.nagios_class:
# Mark that we got the message and proceed
with open(options.nagios_path, "w") as f:
f.write("0\n")
return
if notice.z_recipient != b"":
if notice.recipient != "":
is_personal = True
# Drop messages not to the listed subscriptions
if is_personal and not options.forward_personals:
return
if (zephyr_class.lower() not in current_zephyr_subs) and not is_personal:
logger.debug(f"Skipping ... {zephyr_class}/{zephyr_instance}/{is_personal}")
if (zephyr_class not in current_zephyr_subs) and not is_personal:
logger.debug(f"Skipping ... {zephyr_class}/{notice.instance}/{is_personal}")
return
if notice.z_default_format.startswith(b"Zephyr error: See") or notice.z_default_format.endswith(
b"@(@color(blue))"
):
if notice.format.startswith("Zephyr error: See") or notice.format.endswith("@(@color(blue))"):
logger.debug("Skipping message we got from Zulip!")
return
if (
zephyr_class.lower() == "mail"
and zephyr_instance.lower() == "inbox"
zephyr_class == "mail"
and notice.instance.lower() == "inbox"
and is_personal
and not options.forward_mail_zephyrs
):
@ -496,21 +440,21 @@ def process_notice(
huddle_recipients = [
to_zulip_username(x.strip()) for x in body.split("\n")[0][4:].split()
]
if zephyr_sender not in huddle_recipients:
huddle_recipients.append(to_zulip_username(zephyr_sender))
if notice.sender not in huddle_recipients:
huddle_recipients.append(to_zulip_username(notice.sender))
body = body.split("\n", 1)[1]
if (
options.forward_class_messages
and notice.z_opcode is not None
and notice.z_opcode.lower() == b"crypt"
and notice.opcode is not None
and notice.opcode.lower() == "crypt"
):
body = decrypt_zephyr(zephyr_class.lower(), zephyr_instance.lower(), body)
body = decrypt_zephyr(zephyr_class, notice.instance.lower(), body)
zeph: ZephyrDict
zeph = {
"time": str(notice.z_time.tv_sec + notice.z_time.tv_usec / 1e6),
"sender": zephyr_sender,
"time": str(notice.time),
"sender": notice.sender,
"zsig": zsig, # logged here but not used by app
"content": body,
}
@ -518,47 +462,46 @@ def process_notice(
zeph["type"] = "private"
zeph["recipient"] = huddle_recipients
elif is_personal:
assert notice.z_recipient is not None
assert notice.recipient is not None
zeph["type"] = "private"
zeph["recipient"] = to_zulip_username(notice.z_recipient.decode())
zeph["recipient"] = to_zulip_username(notice.recipient)
else:
zeph["type"] = "stream"
zeph["stream"] = zephyr_class.lower()
if zephyr_instance.strip() != "":
zeph["subject"] = zephyr_instance
zeph["stream"] = zephyr_class
if notice.instance.strip() != "":
zeph["subject"] = notice.instance
else:
zeph["subject"] = f'(instance "{zephyr_instance}")'
zeph["subject"] = f'(instance "{notice.instance}")'
# Add instances in for instanced personals
if is_personal:
if zephyr_class.lower() != "message" and zephyr_instance.lower() != "personal":
heading = f"[-c {zephyr_class} -i {zephyr_instance}]\n"
elif zephyr_class.lower() != "message":
heading = f"[-c {zephyr_class}]\n"
elif zephyr_instance.lower() != "personal":
heading = f"[-i {zephyr_instance}]\n"
if notice.cls.lower() != "message" and notice.instance.lower() != "personal":
heading = f"[-c {notice.cls} -i {notice.instance}]\n"
elif notice.cls.lower() != "message":
heading = f"[-c {notice.cls}]\n"
elif notice.instance.lower() != "personal":
heading = f"[-i {notice.instance}]\n"
else:
heading = ""
zeph["content"] = heading + zeph["content"]
logger.info(f"Received a message on {zephyr_class}/{zephyr_instance} from {zephyr_sender}...")
logger.info(f"Received a message on {zephyr_class}/{notice.instance} from {notice.sender}...")
if log is not None:
log.write(json.dumps(zeph) + "\n")
log.flush()
zulip_queue.put(zeph)
def send_zulip_worker(zulip_queue: "Queue[ZephyrDict]", zulip_client: zulip.Client) -> None:
while True:
zeph = zulip_queue.get()
if os.fork() == 0:
global CURRENT_STATE
CURRENT_STATE = States.ChildSending
# Actually send the message in a child process, to avoid blocking.
try:
res = send_zulip(zulip_client, zeph)
res = send_zulip(zeph)
if res.get("result") != "success":
logger.error(f"Error relaying zephyr:\n{zeph}\n{res}")
except Exception:
logger.exception("Error relaying zephyr:")
zulip_queue.task_done()
finally:
os._exit(0)
def quit_failed_initialization(message: str) -> str:
@ -571,14 +514,12 @@ def zephyr_init_autoretry() -> None:
backoff = zulip.RandomExponentialBackoff()
while backoff.keep_going():
try:
# ZCancelSubscriptions sometimes gets a SERVNAK from the server
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
zephyr_port = c_ushort()
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
# zephyr.init() tries to clear old subscriptions, and thus
# sometimes gets a SERVNAK from the server
zephyr.init()
backoff.succeed()
return
except zephyr_ctypes.ZephyrError:
except OSError:
logger.exception("Error initializing Zephyr library (retrying). Traceback:")
backoff.fail()
@ -591,10 +532,11 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
try:
with open(session_path, "rb") as f:
session = f.read()
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
zephyr_ctypes.check(zephyr_ctypes.ZLoadSession(session, len(session)))
zephyr._z.initialize()
zephyr._z.load_session(session)
zephyr.__inited = True
return
except zephyr_ctypes.ZephyrError:
except OSError:
logger.exception("Error loading saved Zephyr session (retrying). Traceback:")
backoff.fail()
@ -602,26 +544,13 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
cls, instance, recipient = sub
backoff = zulip.RandomExponentialBackoff()
while backoff.keep_going():
try:
zephyr_ctypes.check(
zephyr_ctypes.ZSubscribeTo(
(zephyr_ctypes.ZSubscription_t * 1)(
zephyr_ctypes.ZSubscription_t(
zsub_class=cls.encode(),
zsub_classinst=instance.encode(),
zsub_recipient=recipient.encode(),
)
),
1,
0,
)
)
zephyr.Subscriptions().add(sub)
backoff.succeed()
return
except zephyr_ctypes.ZephyrError:
except OSError:
# Probably a SERVNAK from the zephyr server, but log the
# traceback just in case it's something else
logger.exception("Error subscribing to personals (retrying). Traceback:")
@ -631,8 +560,6 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
def zephyr_to_zulip(options: optparse.Values) -> None:
zulip_client = make_zulip_client()
if options.use_sessions and os.path.exists(options.session_path):
logger.info("Loading old session")
zephyr_load_session_autoretry(options.session_path)
@ -648,14 +575,8 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
if options.nagios_class:
zephyr_subscribe_autoretry((options.nagios_class, "*", "*"))
if options.use_sessions:
buf = POINTER(c_char)()
buf_len = c_int()
zephyr_ctypes.check(zephyr_ctypes.ZDumpSession(byref(buf), byref(buf_len)))
try:
with open(options.session_path, "wb") as f:
f.write(buf[: buf_len.value]) # type: ignore[arg-type] # bytes, but mypy infers List[c_char]
finally:
zephyr_ctypes.free(buf)
with open(options.session_path, "wb") as f:
f.write(zephyr._z.dump_session())
if options.logs_to_resend is not None:
with open(options.logs_to_resend) as log:
@ -672,22 +593,18 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
"sending saved message to %s from %s..."
% (zeph.get("stream", zeph.get("recipient")), zeph["sender"])
)
send_zulip(zulip_client, zeph)
send_zulip(zeph)
except Exception:
logger.exception("Could not send saved zephyr:")
time.sleep(2)
logger.info("Successfully initialized; Starting receive loop.")
# Actually send the messages in a thread, to avoid blocking.
zulip_queue: "Queue[ZephyrDict]" = Queue()
Thread(target=lambda: send_zulip_worker(zulip_queue, zulip_client)).start()
if options.resend_log_path is not None:
with open(options.resend_log_path, "a") as log:
process_loop(zulip_queue, log)
process_loop(log)
else:
process_loop(zulip_queue, None)
process_loop(None)
def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
@ -758,7 +675,7 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op
return encrypted
def forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
def forward_to_zephyr(message: Dict[str, Any]) -> None:
# 'Any' can be of any type of text
support_heading = "Hi there! This is an automated message from Zulip."
support_closing = """If you have any questions, please be in touch through the \
@ -832,7 +749,6 @@ Feedback button or at support@zulip.com."""
result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content)
if result is None:
send_error_zulip(
zulip_client,
"""%s
Your Zulip-Zephyr mirror bot was unable to forward that last message \
@ -842,7 +758,7 @@ key (perhaps because your AFS tokens expired). That means that while \
Zulip users (like you) received it, Zephyr users did not.
%s"""
% (support_heading, support_closing),
% (support_heading, support_closing)
)
return
@ -859,7 +775,6 @@ Zulip users (like you) received it, Zephyr users did not.
return
elif code == 0:
send_error_zulip(
zulip_client,
"""%s
Your last message was successfully mirrored to zephyr, but zwrite \
@ -868,7 +783,7 @@ returned the following warning:
%s
%s"""
% (support_heading, stderr, support_closing),
% (support_heading, stderr, support_closing)
)
return
elif code != 0 and (
@ -882,7 +797,6 @@ returned the following warning:
if options.ignore_expired_tickets:
return
send_error_zulip(
zulip_client,
"""%s
Your last message was forwarded from Zulip to Zephyr unauthenticated, \
@ -892,7 +806,7 @@ are running the Zulip-Zephyr mirroring bot, so we can send \
authenticated Zephyr messages for you again.
%s"""
% (support_heading, support_closing),
% (support_heading, support_closing)
)
return
@ -900,7 +814,6 @@ authenticated Zephyr messages for you again.
# probably because the recipient isn't subscribed to personals,
# but regardless, we should just notify the user.
send_error_zulip(
zulip_client,
"""%s
Your Zulip-Zephyr mirror bot was unable to forward that last message \
@ -910,12 +823,12 @@ received it, Zephyr users did not. The error message from zwrite was:
%s
%s"""
% (support_heading, stderr, support_closing),
% (support_heading, stderr, support_closing)
)
return
def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
# The key string can be used to direct any type of text.
if message["sender_email"] == zulip_account_email:
if not (
@ -938,7 +851,7 @@ def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client)
)
return
try:
forward_to_zephyr(message, zulip_client)
forward_to_zephyr(message)
except Exception:
# Don't let an exception forwarding one message crash the
# whole process
@ -946,16 +859,12 @@ def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client)
def zulip_to_zephyr(options: optparse.Values) -> NoReturn:
zulip_client = make_zulip_client()
# Sync messages from zulip to zephyr
logger.info("Starting syncing messages.")
backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
while True:
try:
zulip_client.call_on_each_message(
lambda message: maybe_forward_to_zephyr(message, zulip_client)
)
zulip_client.call_on_each_message(maybe_forward_to_zephyr)
except Exception:
logger.exception("Error syncing messages:")
backoff.fail()
@ -977,8 +886,6 @@ def subscribed_to_mail_messages() -> bool:
def add_zulip_subscriptions(verbose: bool) -> None:
zulip_client = make_zulip_client()
zephyr_subscriptions = set()
skipped = set()
for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose):
@ -1239,14 +1146,14 @@ def parse_args() -> Tuple[optparse.Values, List[str]]:
def die_gracefully(signal: int, frame: FrameType) -> None:
if CURRENT_STATE == States.ZulipToZephyr:
if CURRENT_STATE == States.ZulipToZephyr or CURRENT_STATE == States.ChildSending:
# this is a child process, so we want os._exit (no clean-up necessary)
os._exit(1)
if CURRENT_STATE == States.ZephyrToZulip and not options.use_sessions:
try:
# zephyr=>zulip processes may have added subs, so run ZCancelSubscriptions
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
# zephyr=>zulip processes may have added subs, so run cancelSubs
zephyr._z.cancelSubs()
except OSError:
# We don't care whether we failed to cancel subs properly, but we should log it
logger.exception("")
@ -1300,6 +1207,15 @@ or specify the --api-key-file option."""
sys.exit(1)
zulip_account_email = options.user + "@mit.edu"
import zulip
zulip_client = zulip.Client(
email=zulip_account_email,
api_key=api_key,
verbose=True,
client="zephyr_mirror",
site=options.site,
)
start_time = time.time()
@ -1358,6 +1274,8 @@ or specify the --api-key-file option."""
child_pid = None
CURRENT_STATE = States.ZephyrToZulip
import zephyr
logger_name = "zephyr=>zulip"
if options.shard is not None:
logger_name += f"({options.shard})"

View file

@ -42,12 +42,12 @@ setup(
"License :: OSI Approved :: Apache Software License",
"Topic :: Communications :: Chat",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
],
python_requires=">=3.7",
python_requires=">=3.6",
url="https://www.zulip.org/",
project_urls={
"Source": "https://github.com/zulip/python-zulip-api/",

View file

@ -34,12 +34,12 @@ setup(
"License :: OSI Approved :: Apache Software License",
"Topic :: Communications :: Chat",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
],
python_requires=">=3.7",
python_requires=">=3.6",
url="https://www.zulip.org/",
project_urls={
"Source": "https://github.com/zulip/python-zulip-api/",

View file

@ -1 +1,2 @@
chess==1.*
python-chess==0.31.* ; python_version < '3.7'
chess==1.* ; python_version >= '3.7'

View file

@ -22,12 +22,12 @@ setup(
"License :: OSI Approved :: Apache Software License",
"Topic :: Communications :: Chat",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
],
python_requires=">=3.7",
python_requires=">=3.6",
url="https://www.zulip.org/",
project_urls={
"Source": "https://github.com/zulip/python-zulip-api/",