Compare commits
10 commits
63c259b2bc
...
02586f1d34
Author | SHA1 | Date | |
---|---|---|---|
02586f1d34 | |||
7831d979c9 | |||
c94da617ed | |||
4a3d225a38 | |||
582e9733a9 | |||
eef02fbb76 | |||
41ec1a9a29 | |||
92120914f8 | |||
a534446315 | |||
56f805a5d7 |
44
.github/workflows/zulip-ci.yml
vendored
44
.github/workflows/zulip-ci.yml
vendored
|
@ -17,33 +17,24 @@ jobs:
|
|||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
# Base images are built using `tools/ci/Dockerfile.prod.template`.
|
||||
# The comments at the top explain how to build and upload these images.
|
||||
# Bionic ships with Python 3.6.
|
||||
- docker_image: zulip/ci:bionic
|
||||
name: Ubuntu 18.04 Bionic (Python 3.6, backend)
|
||||
os: bionic
|
||||
is_bionic: true
|
||||
include_frontend_tests: false
|
||||
# Configure this test to run with the Zulip 3.2 release.
|
||||
legacy_client_interface: 3
|
||||
server_version: refs/tags/3.2
|
||||
# Focal ships with Python 3.8.2.
|
||||
# Focal ships with Python 3.8.10.
|
||||
- docker_image: zulip/ci:focal
|
||||
name: Ubuntu 20.04 Focal (Python 3.8, backend)
|
||||
name: Ubuntu 20.04 (Python 3.8, backend)
|
||||
os: focal
|
||||
is_focal: true
|
||||
include_frontend_tests: false
|
||||
legacy_client_interface: 4
|
||||
server_version: refs/tags/4.0
|
||||
legacy_client_interface: "3"
|
||||
server_version: refs/tags/3.2
|
||||
# Bullseye ships with Python 3.9.2.
|
||||
- docker_image: zulip/ci:bullseye
|
||||
name: Debian 11 Bullseye (Python 3.9, backend)
|
||||
name: Debian 11 (Python 3.9, backend)
|
||||
os: bullseye
|
||||
is_bullseye: true
|
||||
include_frontend_tests: false
|
||||
legacy_client_interface: 4
|
||||
legacy_client_interface: "4"
|
||||
server_version: refs/tags/4.0
|
||||
# Ubuntu 22.04 ships with Python 3.10.6.
|
||||
- docker_image: zulip/ci:jammy
|
||||
name: Ubuntu 22.04 (Python 3.10, backend)
|
||||
os: jammy
|
||||
legacy_client_interface: "6"
|
||||
server_version: refs/tags/6.0
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
name: ${{ matrix.name }} (Zulip ${{matrix.server_version}})
|
||||
|
@ -58,25 +49,18 @@ jobs:
|
|||
HOME: /home/github/
|
||||
|
||||
steps:
|
||||
- name: 'Checkout python-zulip-api'
|
||||
- name: "Check out python-zulip-api"
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
path: api
|
||||
|
||||
- name: 'Checkout Zulip server ${{ matrix.server_version }}'
|
||||
- name: "Check out Zulip server ${{ matrix.server_version }}"
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
repository: zulip/zulip
|
||||
ref: ${{ matrix.server_version }}
|
||||
path: server
|
||||
|
||||
- name: Do Bionic hack
|
||||
if: ${{ matrix.is_bionic }}
|
||||
run: |
|
||||
# Temporary hack till `sudo service redis-server start` gets fixes in Bionic. See
|
||||
# https://chat.zulip.org/#narrow/stream/3-backend/topic/Ubuntu.20bionic.20CircleCI
|
||||
sudo sed -i '/^bind/s/bind.*/bind 0.0.0.0/' /etc/redis/redis.conf
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
cd server
|
||||
|
|
9
.github/workflows/zulip-tests.yml
vendored
9
.github/workflows/zulip-tests.yml
vendored
|
@ -13,10 +13,10 @@ jobs:
|
|||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Setup Python 3.6
|
||||
- name: Set up Python 3.7
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: 3.6
|
||||
python-version: "3.7"
|
||||
|
||||
- name: Install dependencies
|
||||
run: tools/provision --force
|
||||
|
@ -32,10 +32,7 @@ jobs:
|
|||
fail-fast: false
|
||||
matrix:
|
||||
os: [ubuntu-latest, windows-latest]
|
||||
python-version: [3.6, 3.7, 3.8, 3.9]
|
||||
exclude:
|
||||
- os: windows-latest
|
||||
python-version: 3.6 # cryptography install fails on Windows with python 3.6 since pip is quite old.
|
||||
python-version: ["3.7", "3.8", "3.9", "3.10"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
|
2
.mailmap
2
.mailmap
|
@ -1,3 +1,4 @@
|
|||
Aman Agrawal <f2016561@pilani.bits-pilani.ac.in>
|
||||
Anders Kaseorg <anders@zulip.com> <anders@zulipchat.com>
|
||||
Anders Kaseorg <anders@zulip.com> <andersk@mit.edu>
|
||||
Rishi Gupta <rishig@zulip.com> <rishig@zulipchat.com>
|
||||
|
@ -7,3 +8,4 @@ Steve Howell <showell@zulip.com> <steve@zulip.com>
|
|||
Tim Abbott <tabbott@zulip.com> <tabbott@humbughq.com>
|
||||
Tim Abbott <tabbott@zulip.com> <tabbott@mit.edu>
|
||||
Tim Abbott <tabbott@zulip.com> <tabbott@zulipchat.com>
|
||||
Zixuan James Li <p359101898@gmail.com> <359101898@qq.com>
|
||||
|
|
|
@ -1,8 +1,14 @@
|
|||
[tool.black]
|
||||
line-length = 100
|
||||
target-version = ["py36"]
|
||||
target-version = ["py37"]
|
||||
|
||||
[tool.isort]
|
||||
src_paths = ["tools", "zulip", "zulip_bots", "zulip_botserver"]
|
||||
src_paths = [
|
||||
"tools",
|
||||
"zulip",
|
||||
"zulip/integrations/zephyr",
|
||||
"zulip_bots",
|
||||
"zulip_botserver",
|
||||
]
|
||||
profile = "black"
|
||||
line_length = 100
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
The [Zulip API](https://zulip.com/api) Python bindings require the
|
||||
following dependencies:
|
||||
|
||||
* **Python (version >= 3.6)**
|
||||
* **Python (version >= 3.7)**
|
||||
* requests (version >= 0.12.1)
|
||||
|
||||
**Note**: If you'd like to use the Zulip bindings with Python 2, we
|
||||
|
|
|
@ -14,9 +14,6 @@ from concurrent.futures import ThreadPoolExecutor
|
|||
from io import BytesIO
|
||||
from typing import Any, Dict, List, Match, Optional, Set, Tuple, Type, Union
|
||||
|
||||
if os.name != "nt":
|
||||
import magic
|
||||
import magic.compat
|
||||
import nio
|
||||
from nio.responses import (
|
||||
DownloadError,
|
||||
|
@ -352,17 +349,15 @@ class ZulipToMatrix:
|
|||
if result["result"] != "success":
|
||||
success = False
|
||||
continue
|
||||
|
||||
try:
|
||||
file_content: bytes = urllib.request.urlopen(self.server_url + result["url"]).read()
|
||||
with urllib.request.urlopen(self.server_url + result["url"]) as response:
|
||||
file_content: bytes = response.read()
|
||||
mimetype: str = response.headers.get_content_type()
|
||||
except Exception:
|
||||
success = False
|
||||
continue
|
||||
|
||||
mimetype: str
|
||||
if os.name == "nt":
|
||||
mimetype = "m.file"
|
||||
else:
|
||||
mimetype = magic.from_buffer(file_content, mime=True)
|
||||
filename: str = file.split("/")[-1]
|
||||
|
||||
response, _ = await self.matrix_client.upload(
|
||||
|
|
|
@ -1,3 +1 @@
|
|||
matrix-nio
|
||||
python-magic
|
||||
python-magic-bin; platform_system == "Windows"
|
||||
|
|
|
@ -41,7 +41,7 @@ topic = matrix
|
|||
ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}"
|
||||
|
||||
|
||||
# For Python 3.6 compatibility.
|
||||
# For Python 3.7 compatibility.
|
||||
# (Since 3.8, there is unittest.IsolatedAsyncioTestCase!)
|
||||
# source: https://stackoverflow.com/a/46324983
|
||||
def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:
|
||||
|
|
|
@ -3,12 +3,20 @@ config = {
|
|||
"email": "zulip-bot@email.com",
|
||||
"api_key": "put api key here",
|
||||
"site": "https://chat.zulip.org",
|
||||
"stream": "test here",
|
||||
"topic": "<- slack-bridge",
|
||||
},
|
||||
"slack": {
|
||||
"username": "slack_username",
|
||||
"token": "xoxb-your-slack-token",
|
||||
"channel": "C5Z5N7R8A -- must be channel id",
|
||||
},
|
||||
# Mapping between Slack channels and Zulip stream-topic's.
|
||||
# You can specify multiple pairs.
|
||||
"channel_mapping": {
|
||||
# Slack channel; must be channel ID
|
||||
"C5Z5N7R8A": {
|
||||
# Zulip stream
|
||||
"stream": "test here",
|
||||
# Zulip topic
|
||||
"topic": "<- slack-bridge",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import os
|
|||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
from typing import Any, Callable, Dict
|
||||
from typing import Any, Callable, Dict, Optional, Tuple
|
||||
|
||||
import bridge_with_slack_config
|
||||
import slack_sdk
|
||||
|
@ -17,18 +17,28 @@ import zulip
|
|||
ZULIP_MESSAGE_TEMPLATE = "**{username}**: {message}"
|
||||
SLACK_MESSAGE_TEMPLATE = "<{username}> {message}"
|
||||
|
||||
StreamTopicT = Tuple[str, str]
|
||||
|
||||
def check_zulip_message_validity(msg: Dict[str, Any], config: Dict[str, Any]) -> bool:
|
||||
|
||||
def get_slack_channel_for_zulip_message(
|
||||
msg: Dict[str, Any], zulip_to_slack_map: Dict[StreamTopicT, Any], bot_email: str
|
||||
) -> Optional[str]:
|
||||
is_a_stream = msg["type"] == "stream"
|
||||
in_the_specified_stream = msg["display_recipient"] == config["stream"]
|
||||
at_the_specified_subject = msg["subject"] == config["topic"]
|
||||
if not is_a_stream:
|
||||
return None
|
||||
|
||||
# We do this to identify the messages generated from Matrix -> Zulip
|
||||
# and we make sure we don't forward it again to the Matrix.
|
||||
not_from_zulip_bot = msg["sender_email"] != config["email"]
|
||||
if is_a_stream and not_from_zulip_bot and in_the_specified_stream and at_the_specified_subject:
|
||||
return True
|
||||
return False
|
||||
stream_name = msg["display_recipient"]
|
||||
topic_name = msg["subject"]
|
||||
stream_topic: StreamTopicT = (stream_name, topic_name)
|
||||
if stream_topic not in zulip_to_slack_map:
|
||||
return None
|
||||
|
||||
# We do this to identify the messages generated from Slack -> Zulip
|
||||
# and we make sure we don't forward it again to the Slack.
|
||||
from_zulip_bot = msg["sender_email"] == bot_email
|
||||
if from_zulip_bot:
|
||||
return None
|
||||
return zulip_to_slack_map[stream_topic]
|
||||
|
||||
|
||||
class SlackBridge:
|
||||
|
@ -37,14 +47,17 @@ class SlackBridge:
|
|||
self.zulip_config = config["zulip"]
|
||||
self.slack_config = config["slack"]
|
||||
|
||||
self.slack_to_zulip_map: Dict[str, Dict[str, str]] = config["channel_mapping"]
|
||||
self.zulip_to_slack_map: Dict[StreamTopicT, str] = {
|
||||
(z["stream"], z["topic"]): s for s, z in config["channel_mapping"].items()
|
||||
}
|
||||
|
||||
# zulip-specific
|
||||
self.zulip_client = zulip.Client(
|
||||
email=self.zulip_config["email"],
|
||||
api_key=self.zulip_config["api_key"],
|
||||
site=self.zulip_config["site"],
|
||||
)
|
||||
self.zulip_stream = self.zulip_config["stream"]
|
||||
self.zulip_subject = self.zulip_config["topic"]
|
||||
|
||||
# slack-specific
|
||||
self.channel = self.slack_config["channel"]
|
||||
|
@ -68,14 +81,16 @@ class SlackBridge:
|
|||
|
||||
def zulip_to_slack(self) -> Callable[[Dict[str, Any]], None]:
|
||||
def _zulip_to_slack(msg: Dict[str, Any]) -> None:
|
||||
message_valid = check_zulip_message_validity(msg, self.zulip_config)
|
||||
if message_valid:
|
||||
slack_channel = get_slack_channel_for_zulip_message(
|
||||
msg, self.zulip_to_slack_map, self.zulip_config["email"]
|
||||
)
|
||||
if slack_channel is not None:
|
||||
self.wrap_slack_mention_with_bracket(msg)
|
||||
slack_text = SLACK_MESSAGE_TEMPLATE.format(
|
||||
username=msg["sender_full_name"], message=msg["content"]
|
||||
)
|
||||
self.slack_webclient.chat_postMessage(
|
||||
channel=self.channel,
|
||||
channel=slack_channel,
|
||||
text=slack_text,
|
||||
)
|
||||
|
||||
|
@ -91,7 +106,7 @@ class SlackBridge:
|
|||
|
||||
@rtm.on("message")
|
||||
def slack_to_zulip(client: RTMClient, event: Dict[str, Any]) -> None:
|
||||
if event["channel"] != self.channel:
|
||||
if event["channel"] not in self.slack_to_zulip_map:
|
||||
return
|
||||
user_id = event["user"]
|
||||
user = self.slack_id_to_name[user_id]
|
||||
|
@ -100,8 +115,12 @@ class SlackBridge:
|
|||
return
|
||||
self.replace_slack_id_with_name(event)
|
||||
content = ZULIP_MESSAGE_TEMPLATE.format(username=user, message=event["text"])
|
||||
zulip_endpoint = self.slack_to_zulip_map[event["channel"]]
|
||||
msg_data = dict(
|
||||
type="stream", to=self.zulip_stream, subject=self.zulip_subject, content=content
|
||||
type="stream",
|
||||
to=zulip_endpoint["stream"],
|
||||
subject=zulip_endpoint["topic"],
|
||||
content=content,
|
||||
)
|
||||
self.zulip_client.send_message(msg_data)
|
||||
|
||||
|
@ -118,10 +137,16 @@ if __name__ == "__main__":
|
|||
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
|
||||
parser = argparse.ArgumentParser(usage=usage)
|
||||
|
||||
print("Starting slack mirroring bot")
|
||||
print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM")
|
||||
config: Dict[str, Any] = bridge_with_slack_config.config
|
||||
if "channel_mapping" not in config:
|
||||
print(
|
||||
'The key "channel_mapping" is not found in bridge_with_slack_config.py.\n'
|
||||
"Your config file may be outdated."
|
||||
)
|
||||
exit(1)
|
||||
|
||||
config = bridge_with_slack_config.config
|
||||
print("Starting slack mirroring bot")
|
||||
print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM(S) & SLACK CHANNEL(S)!")
|
||||
|
||||
# We have to define rtm outside of SlackBridge because the rtm variable is used as a method decorator.
|
||||
rtm = RTMClient(token=config["slack"]["token"])
|
||||
|
|
|
@ -6,10 +6,10 @@ import random
|
|||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from ctypes import byref, c_int, c_ushort
|
||||
from typing import Dict, List, Set, Tuple
|
||||
|
||||
import zephyr
|
||||
|
||||
import zephyr_ctypes
|
||||
import zulip
|
||||
|
||||
parser = optparse.OptionParser()
|
||||
|
@ -136,9 +136,43 @@ for (stream, test) in test_streams:
|
|||
actually_subscribed = False
|
||||
for tries in range(10):
|
||||
try:
|
||||
zephyr.init()
|
||||
zephyr._z.subAll(zephyr_subs_to_add)
|
||||
zephyr_subs = zephyr._z.getSubscriptions()
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
|
||||
zephyr_port = c_ushort()
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
|
||||
|
||||
zephyr_ctypes.check(
|
||||
zephyr_ctypes.ZSubscribeTo(
|
||||
(zephyr_ctypes.ZSubscription_t * len(zephyr_subs_to_add))(
|
||||
*(
|
||||
zephyr_ctypes.ZSubscription_t(
|
||||
zsub_class=cls.encode(),
|
||||
zsub_classinst=instance.encode(),
|
||||
zsub_recipient=recipient.encode(),
|
||||
)
|
||||
for cls, instance, recipient in zephyr_subs_to_add
|
||||
)
|
||||
),
|
||||
len(zephyr_subs_to_add),
|
||||
0,
|
||||
)
|
||||
)
|
||||
|
||||
try:
|
||||
nsubs = c_int()
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
|
||||
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
|
||||
zephyr_subs = {
|
||||
(
|
||||
zsub.zsub_class.decode(),
|
||||
zsub.zsub_classinst.decode(),
|
||||
zsub.zsub_recipient.decode(),
|
||||
)
|
||||
for zsub in zsubs
|
||||
}
|
||||
finally:
|
||||
zephyr_ctypes.ZFlushSubscriptions()
|
||||
|
||||
missing = 0
|
||||
for elt in zephyr_subs_to_add:
|
||||
|
@ -148,8 +182,8 @@ for tries in range(10):
|
|||
if missing == 0:
|
||||
actually_subscribed = True
|
||||
break
|
||||
except OSError as e:
|
||||
if "SERVNAK received" in e.args:
|
||||
except zephyr_ctypes.ZephyrError as e:
|
||||
if e.code == zephyr_ctypes.ZERR_SERVNAK:
|
||||
logger.error("SERVNAK repeatedly received, punting rest of test")
|
||||
else:
|
||||
logger.exception("Exception subscribing to zephyrs")
|
||||
|
@ -185,15 +219,16 @@ notices = []
|
|||
# receive queue with 30+ messages, which might result in messages
|
||||
# being dropped.
|
||||
def receive_zephyrs() -> None:
|
||||
while True:
|
||||
while zephyr_ctypes.ZPending() != 0:
|
||||
notice = zephyr_ctypes.ZNotice_t()
|
||||
sender = zephyr_ctypes.sockaddr_in()
|
||||
try:
|
||||
notice = zephyr.receive(block=False)
|
||||
except Exception:
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
|
||||
except zephyr_ctypes.ZephyrError:
|
||||
logging.exception("Exception receiving zephyrs:")
|
||||
notice = None
|
||||
if notice is None:
|
||||
break
|
||||
if notice.opcode != "":
|
||||
if notice.z_opcode != b"":
|
||||
zephyr_ctypes.ZFreeNotice(byref(notice))
|
||||
continue
|
||||
notices.append(notice)
|
||||
|
||||
|
@ -294,10 +329,16 @@ def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set
|
|||
# The h_foo variables are about the messages we _received_ in Zulip
|
||||
# The z_foo variables are about the messages we _received_ in Zephyr
|
||||
h_contents = [message["content"] for message in messages]
|
||||
z_contents = [notice.message.split("\0")[1] for notice in notices]
|
||||
z_contents = [
|
||||
notice.z_message[: notice.z_message_len].split(b"\0")[1].decode(errors="replace")
|
||||
for notice in notices
|
||||
]
|
||||
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
|
||||
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
|
||||
|
||||
for notice in notices:
|
||||
zephyr_ctypes.ZFreeNotice(byref(notice))
|
||||
|
||||
if z_success and h_success:
|
||||
logger.info("Success!")
|
||||
print_status_and_exit(0)
|
||||
|
|
|
@ -18,7 +18,7 @@ api_key_path = f"/home/zulip/api-keys/{program_name}"
|
|||
open(api_key_path, "w").write(api_key + "\n")
|
||||
|
||||
# Setup supervisord configuration
|
||||
supervisor_path = f"/etc/supervisor/conf.d/zulip/{program_name}.conf"
|
||||
supervisor_path = f"/etc/supervisor/conf.d/zmirror/{program_name}.conf"
|
||||
template = os.path.join(os.path.dirname(__file__), "zmirror_private.conf.template")
|
||||
template_data = open(template).read()
|
||||
session_path = f"/home/zulip/zephyr_sessions/{program_name}"
|
||||
|
|
207
zulip/integrations/zephyr/zephyr_ctypes.py
Normal file
207
zulip/integrations/zephyr/zephyr_ctypes.py
Normal file
|
@ -0,0 +1,207 @@
|
|||
from ctypes import (
|
||||
CDLL,
|
||||
CFUNCTYPE,
|
||||
POINTER,
|
||||
Structure,
|
||||
Union,
|
||||
c_char,
|
||||
c_char_p,
|
||||
c_int,
|
||||
c_long,
|
||||
c_uint,
|
||||
c_uint8,
|
||||
c_uint16,
|
||||
c_uint32,
|
||||
c_ushort,
|
||||
c_void_p,
|
||||
)
|
||||
|
||||
libc = CDLL("libc.so.6")
|
||||
com_err = CDLL("libcom_err.so.2")
|
||||
libzephyr = CDLL("libzephyr.so.4")
|
||||
|
||||
|
||||
# --- glibc/bits/sockaddr.h ---
|
||||
|
||||
sa_family_t = c_ushort
|
||||
|
||||
|
||||
# --- glibc/sysdeps/unix/sysv/linux/bits/socket.h ---
|
||||
|
||||
|
||||
class sockaddr(Structure):
|
||||
_fields_ = [
|
||||
("sa_family", sa_family_t),
|
||||
("sa_data", c_char * 14),
|
||||
]
|
||||
|
||||
|
||||
# --- glibc/inet/netinet/in.h ---
|
||||
|
||||
in_port_t = c_uint16
|
||||
in_addr_t = c_uint32
|
||||
|
||||
|
||||
class in_addr(Structure):
|
||||
_fields_ = [
|
||||
("s_addr", in_addr_t),
|
||||
]
|
||||
|
||||
|
||||
class sockaddr_in(Structure):
|
||||
_fields_ = [
|
||||
("sin_family", sa_family_t),
|
||||
("sin_port", in_port_t),
|
||||
("sin_addr", in_addr),
|
||||
("sin_zero", c_uint8 * 8),
|
||||
]
|
||||
|
||||
|
||||
class in6_addr(Structure):
|
||||
_fields_ = [
|
||||
("s6_addr", c_uint8 * 16),
|
||||
]
|
||||
|
||||
|
||||
class sockaddr_in6(Structure):
|
||||
_fields_ = [
|
||||
("sin6_family", sa_family_t),
|
||||
("sin6_port", in_port_t),
|
||||
("sin6_flowinfo", c_uint32),
|
||||
("sin6_addr", in6_addr),
|
||||
("sin6_scope_id", c_uint32),
|
||||
]
|
||||
|
||||
|
||||
# --- glibc/stdlib/stdlib.h ---
|
||||
|
||||
free = CFUNCTYPE(None, c_void_p)(("free", libc))
|
||||
|
||||
|
||||
# --- e2fsprogs/lib/et/com_err.h ---
|
||||
|
||||
error_message = CFUNCTYPE(c_char_p, c_long)(("error_message", com_err))
|
||||
|
||||
|
||||
# --- zephyr/h/zephyr/zephyr.h ---
|
||||
|
||||
Z_MAXOTHERFIELDS = 10
|
||||
|
||||
ZNotice_Kind_t = c_int
|
||||
|
||||
|
||||
class _ZTimeval(Structure):
|
||||
_fields_ = [
|
||||
("tv_sec", c_int),
|
||||
("tv_usec", c_int),
|
||||
]
|
||||
|
||||
|
||||
class ZUnique_Id_t(Structure):
|
||||
_fields_ = [
|
||||
("zuid_addr", in_addr),
|
||||
("tv", _ZTimeval),
|
||||
]
|
||||
|
||||
|
||||
ZChecksum_t = c_uint
|
||||
|
||||
|
||||
class _ZSenderSockaddr(Union):
|
||||
_fields_ = [
|
||||
("sa", sockaddr),
|
||||
("ip4", sockaddr_in),
|
||||
("ip6", sockaddr_in6),
|
||||
]
|
||||
|
||||
|
||||
class ZNotice_t(Structure):
|
||||
_fields_ = [
|
||||
("z_packet", c_char_p),
|
||||
("z_version", c_char_p),
|
||||
("z_kind", ZNotice_Kind_t),
|
||||
("z_uid", ZUnique_Id_t),
|
||||
("z_sender_sockaddr", _ZSenderSockaddr),
|
||||
("z_time", _ZTimeval),
|
||||
("z_port", c_ushort),
|
||||
("z_charset", c_ushort),
|
||||
("z_auth", c_int),
|
||||
("z_checked_auth", c_int),
|
||||
("z_authent_len", c_int),
|
||||
("z_ascii_authent", c_char_p),
|
||||
("z_class", c_char_p),
|
||||
("z_class_inst", c_char_p),
|
||||
("z_opcode", c_char_p),
|
||||
("z_sender", c_char_p),
|
||||
("z_recipient", c_char_p),
|
||||
("z_default_format", c_char_p),
|
||||
("z_multinotice", c_char_p),
|
||||
("z_multiuid", ZUnique_Id_t),
|
||||
("z_checksum", ZChecksum_t),
|
||||
("z_ascii_checksum", c_char_p),
|
||||
("z_num_other_fields", c_int),
|
||||
("z_other_fields", c_char_p * Z_MAXOTHERFIELDS),
|
||||
("z_message", POINTER(c_char)),
|
||||
("z_message_len", c_int),
|
||||
("z_num_hdr_fields", c_uint),
|
||||
("z_hdr_fields", POINTER(c_char_p)),
|
||||
]
|
||||
|
||||
|
||||
class ZSubscription_t(Structure):
|
||||
_fields_ = [
|
||||
("zsub_recipient", c_char_p),
|
||||
("zsub_class", c_char_p),
|
||||
("zsub_classinst", c_char_p),
|
||||
]
|
||||
|
||||
|
||||
Code_t = c_int
|
||||
|
||||
ZInitialize = CFUNCTYPE(Code_t)(("ZInitialize", libzephyr))
|
||||
ZRetrieveSubscriptions = CFUNCTYPE(Code_t, c_ushort, POINTER(c_int))(
|
||||
("ZRetrieveSubscriptions", libzephyr)
|
||||
)
|
||||
ZGetSubscriptions = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), POINTER(c_int))(
|
||||
("ZGetSubscriptions", libzephyr)
|
||||
)
|
||||
ZOpenPort = CFUNCTYPE(Code_t, POINTER(c_ushort))(("ZOpenPort", libzephyr))
|
||||
ZFlushSubscriptions = CFUNCTYPE(Code_t)(("ZFlushSubscriptions", libzephyr))
|
||||
ZFreeNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t))(("ZFreeNotice", libzephyr))
|
||||
ZSubscribeTo = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), c_int, c_uint)(
|
||||
("ZSubscribeTo", libzephyr)
|
||||
)
|
||||
ZCancelSubscriptions = CFUNCTYPE(Code_t, c_uint)(("ZCancelSubscriptions", libzephyr))
|
||||
ZPending = CFUNCTYPE(c_int)(("ZPending", libzephyr))
|
||||
ZReceiveNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t), POINTER(sockaddr_in))(
|
||||
("ZReceiveNotice", libzephyr)
|
||||
)
|
||||
ZDumpSession = CFUNCTYPE(Code_t, POINTER(POINTER(c_char)), POINTER(c_int))(
|
||||
("ZDumpSession", libzephyr)
|
||||
)
|
||||
ZLoadSession = CFUNCTYPE(Code_t, POINTER(c_char), c_int)(("ZLoadSession", libzephyr))
|
||||
ZGetFD = CFUNCTYPE(c_int)(("ZGetFD", libzephyr))
|
||||
|
||||
ZERR_NONE = 0
|
||||
|
||||
|
||||
# --- zephyr/lib/zephyr_err.et ---
|
||||
|
||||
ERROR_TABLE_BASE_zeph = -772103680
|
||||
ZERR_SERVNAK = ERROR_TABLE_BASE_zeph + 16
|
||||
|
||||
|
||||
# --- convenience helpers ---
|
||||
|
||||
|
||||
class ZephyrError(Exception):
|
||||
def __init__(self, code: int) -> None:
|
||||
self.code = code
|
||||
|
||||
def __str__(self) -> str:
|
||||
return error_message(self.code).decode()
|
||||
|
||||
|
||||
def check(code: int) -> None:
|
||||
if code != ZERR_NONE:
|
||||
raise ZephyrError(code)
|
|
@ -1,5 +1,6 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
|
@ -32,21 +33,22 @@ if options.sync_subscriptions:
|
|||
sys.exit(0)
|
||||
|
||||
if options.forward_class_messages and not options.noshard:
|
||||
# Needed to get access to zephyr.lib.parallel
|
||||
sys.path.append("/home/zulip/zulip")
|
||||
if options.on_startup_command is not None:
|
||||
subprocess.call([options.on_startup_command])
|
||||
from zerver.lib.parallel import run_parallel
|
||||
|
||||
print("Starting parallel zephyr class mirroring bot")
|
||||
jobs = list("0123456789abcdef")
|
||||
shards = list("0123456789abcdef")
|
||||
|
||||
def run_job(shard: str) -> int:
|
||||
subprocess.call(args + [f"--shard={shard}"])
|
||||
return 0
|
||||
async def run_shard(shard: str) -> int:
|
||||
process = await asyncio.create_subprocess_exec(*args, f"--shard={shard}")
|
||||
return await process.wait()
|
||||
|
||||
for (status, job) in run_parallel(run_job, jobs, threads=16):
|
||||
async def run_shards():
|
||||
for coro in asyncio.as_completed(map(run_shard, shards)):
|
||||
await coro
|
||||
print("A mirroring shard died!")
|
||||
|
||||
asyncio.run(run_shards())
|
||||
sys.exit(0)
|
||||
|
||||
backoff = RandomExponentialBackoff(timeout_success_equivalent=300)
|
||||
|
|
|
@ -13,18 +13,23 @@ import sys
|
|||
import tempfile
|
||||
import textwrap
|
||||
import time
|
||||
from ctypes import POINTER, byref, c_char, c_int, c_ushort
|
||||
from queue import Queue
|
||||
from threading import Thread
|
||||
from types import FrameType
|
||||
from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
|
||||
|
||||
from typing_extensions import Literal, TypedDict
|
||||
|
||||
import zephyr_ctypes
|
||||
import zulip
|
||||
from zulip import RandomExponentialBackoff
|
||||
|
||||
DEFAULT_SITE = "https://api.zulip.com"
|
||||
|
||||
|
||||
class States:
|
||||
Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4))
|
||||
Startup, ZulipToZephyr, ZephyrToZulip = list(range(3))
|
||||
|
||||
|
||||
CURRENT_STATE = States.Startup
|
||||
|
@ -32,6 +37,16 @@ CURRENT_STATE = States.Startup
|
|||
logger: logging.Logger
|
||||
|
||||
|
||||
def make_zulip_client() -> zulip.Client:
|
||||
return zulip.Client(
|
||||
email=zulip_account_email,
|
||||
api_key=api_key,
|
||||
verbose=True,
|
||||
client="zephyr_mirror",
|
||||
site=options.site,
|
||||
)
|
||||
|
||||
|
||||
def to_zulip_username(zephyr_username: str) -> str:
|
||||
if "@" in zephyr_username:
|
||||
(user, realm) = zephyr_username.split("@")
|
||||
|
@ -117,7 +132,7 @@ class ZephyrDict(TypedDict, total=False):
|
|||
zsig: str
|
||||
|
||||
|
||||
def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
|
||||
def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]:
|
||||
message: Dict[str, Any]
|
||||
message = {}
|
||||
if options.forward_class_messages:
|
||||
|
@ -151,7 +166,7 @@ def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
|
|||
return zulip_client.send_message(message)
|
||||
|
||||
|
||||
def send_error_zulip(error_msg: str) -> None:
|
||||
def send_error_zulip(zulip_client: zulip.Client, error_msg: str) -> None:
|
||||
message = {
|
||||
"type": "private",
|
||||
"sender": zulip_account_email,
|
||||
|
@ -166,8 +181,23 @@ current_zephyr_subs = set()
|
|||
|
||||
def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
|
||||
try:
|
||||
zephyr._z.subAll(subs)
|
||||
except OSError:
|
||||
zephyr_ctypes.check(
|
||||
zephyr_ctypes.ZSubscribeTo(
|
||||
(zephyr_ctypes.ZSubscription_t * len(subs))(
|
||||
*(
|
||||
zephyr_ctypes.ZSubscription_t(
|
||||
zsub_class=cls.encode(),
|
||||
zsub_classinst=instance.encode(),
|
||||
zsub_recipient=recipient.encode(),
|
||||
)
|
||||
for cls, instance, recipient in subs
|
||||
)
|
||||
),
|
||||
len(subs),
|
||||
0,
|
||||
)
|
||||
)
|
||||
except zephyr_ctypes.ZephyrError:
|
||||
# Since we haven't added the subscription to
|
||||
# current_zephyr_subs yet, we can just return (so that we'll
|
||||
# continue processing normal messages) and we'll end up
|
||||
|
@ -176,26 +206,41 @@ def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
|
|||
logger.exception("Error subscribing to streams (will retry automatically):")
|
||||
logger.warning(f"Streams were: {[cls for cls, instance, recipient in subs]}")
|
||||
return
|
||||
|
||||
try:
|
||||
actual_zephyr_subs = [cls for (cls, _, _) in zephyr._z.getSubscriptions()]
|
||||
except OSError:
|
||||
nsubs = c_int()
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
|
||||
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
|
||||
actual_zephyr_subs = {zsub.zsub_class.decode() for zsub in zsubs}
|
||||
except zephyr_ctypes.ZephyrError:
|
||||
logger.exception("Error getting current Zephyr subscriptions")
|
||||
# Don't add anything to current_zephyr_subs so that we'll
|
||||
# retry the next time we check for streams to subscribe to
|
||||
# (within 15 seconds).
|
||||
return
|
||||
finally:
|
||||
zephyr_ctypes.ZFlushSubscriptions()
|
||||
|
||||
for (cls, instance, recipient) in subs:
|
||||
if cls not in actual_zephyr_subs:
|
||||
logger.error(f"Zephyr failed to subscribe us to {cls}; will retry")
|
||||
try:
|
||||
# We'll retry automatically when we next check for
|
||||
# streams to subscribe to (within 15 seconds), but
|
||||
# it's worth doing 1 retry immediately to avoid
|
||||
# missing 15 seconds of messages on the affected
|
||||
# classes
|
||||
zephyr._z.sub(cls, instance, recipient)
|
||||
except OSError:
|
||||
pass
|
||||
zephyr_ctypes.ZSubscribeTo(
|
||||
(zephyr_ctypes.ZSubscription_t * 1)(
|
||||
zephyr_ctypes.ZSubscription_t(
|
||||
zsub_class=cls.encode(),
|
||||
zsub_classinst=instance.encode(),
|
||||
zsub_recipient=recipient.encode(),
|
||||
)
|
||||
),
|
||||
1,
|
||||
0,
|
||||
)
|
||||
else:
|
||||
current_zephyr_subs.add(cls)
|
||||
|
||||
|
@ -246,8 +291,8 @@ def maybe_restart_mirroring_script() -> None:
|
|||
logger.warning("zephyr mirroring script has been updated; restarting...")
|
||||
maybe_kill_child()
|
||||
try:
|
||||
zephyr._z.cancelSubs()
|
||||
except OSError:
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
|
||||
except zephyr_ctypes.ZephyrError:
|
||||
# We don't care whether we failed to cancel subs properly, but we should log it
|
||||
logger.exception("")
|
||||
backoff = RandomExponentialBackoff(
|
||||
|
@ -263,27 +308,29 @@ def maybe_restart_mirroring_script() -> None:
|
|||
raise Exception("Failed to reload too many times, aborting!")
|
||||
|
||||
|
||||
def process_loop(log: Optional[IO[str]]) -> NoReturn:
|
||||
def process_loop(zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]) -> NoReturn:
|
||||
restart_check_count = 0
|
||||
last_check_time = time.time()
|
||||
recieve_backoff = RandomExponentialBackoff()
|
||||
while True:
|
||||
select.select([zephyr._z.getFD()], [], [], 15)
|
||||
select.select([zephyr_ctypes.ZGetFD()], [], [], 15)
|
||||
try:
|
||||
process_backoff = RandomExponentialBackoff()
|
||||
# Fetch notices from the queue until its empty
|
||||
while True:
|
||||
notice = zephyr.receive(block=False)
|
||||
recieve_backoff.succeed()
|
||||
if notice is None:
|
||||
break
|
||||
while zephyr_ctypes.ZPending() != 0:
|
||||
notice = zephyr_ctypes.ZNotice_t()
|
||||
sender = zephyr_ctypes.sockaddr_in()
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
|
||||
try:
|
||||
process_notice(notice, log)
|
||||
recieve_backoff.succeed()
|
||||
process_notice(notice, zulip_queue, log)
|
||||
process_backoff.succeed()
|
||||
except Exception:
|
||||
except zephyr_ctypes.ZephyrError:
|
||||
logger.exception("Error relaying zephyr:")
|
||||
process_backoff.fail()
|
||||
except Exception:
|
||||
finally:
|
||||
zephyr_ctypes.ZFreeNotice(byref(notice))
|
||||
except zephyr_ctypes.ZephyrError:
|
||||
logger.exception("Error checking for new zephyrs:")
|
||||
recieve_backoff.fail()
|
||||
continue
|
||||
|
@ -395,38 +442,47 @@ def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str:
|
|||
return decrypted
|
||||
|
||||
|
||||
def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
|
||||
assert notice.sender is not None
|
||||
(zsig, body) = parse_zephyr_body(notice.message, notice.format)
|
||||
def process_notice(
|
||||
notice: zephyr_ctypes.ZNotice_t, zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]
|
||||
) -> None:
|
||||
assert notice.z_sender is not None
|
||||
(zsig, body) = parse_zephyr_body(
|
||||
notice.z_message[: notice.z_message_len].decode(errors="replace"),
|
||||
notice.z_default_format.decode(errors="replace"),
|
||||
)
|
||||
is_personal = False
|
||||
is_huddle = False
|
||||
|
||||
if notice.opcode == "PING":
|
||||
if notice.z_opcode == b"PING":
|
||||
# skip PING messages
|
||||
return
|
||||
|
||||
zephyr_class = notice.cls.lower()
|
||||
zephyr_class = notice.z_class.decode()
|
||||
zephyr_instance = notice.z_class_inst.decode()
|
||||
zephyr_sender = notice.z_sender.decode()
|
||||
|
||||
if zephyr_class == options.nagios_class:
|
||||
if zephyr_class.lower() == options.nagios_class:
|
||||
# Mark that we got the message and proceed
|
||||
with open(options.nagios_path, "w") as f:
|
||||
f.write("0\n")
|
||||
return
|
||||
|
||||
if notice.recipient != "":
|
||||
if notice.z_recipient != b"":
|
||||
is_personal = True
|
||||
# Drop messages not to the listed subscriptions
|
||||
if is_personal and not options.forward_personals:
|
||||
return
|
||||
if (zephyr_class not in current_zephyr_subs) and not is_personal:
|
||||
logger.debug(f"Skipping ... {zephyr_class}/{notice.instance}/{is_personal}")
|
||||
if (zephyr_class.lower() not in current_zephyr_subs) and not is_personal:
|
||||
logger.debug(f"Skipping ... {zephyr_class}/{zephyr_instance}/{is_personal}")
|
||||
return
|
||||
if notice.format.startswith("Zephyr error: See") or notice.format.endswith("@(@color(blue))"):
|
||||
if notice.z_default_format.startswith(b"Zephyr error: See") or notice.z_default_format.endswith(
|
||||
b"@(@color(blue))"
|
||||
):
|
||||
logger.debug("Skipping message we got from Zulip!")
|
||||
return
|
||||
if (
|
||||
zephyr_class == "mail"
|
||||
and notice.instance.lower() == "inbox"
|
||||
zephyr_class.lower() == "mail"
|
||||
and zephyr_instance.lower() == "inbox"
|
||||
and is_personal
|
||||
and not options.forward_mail_zephyrs
|
||||
):
|
||||
|
@ -440,21 +496,21 @@ def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
|
|||
huddle_recipients = [
|
||||
to_zulip_username(x.strip()) for x in body.split("\n")[0][4:].split()
|
||||
]
|
||||
if notice.sender not in huddle_recipients:
|
||||
huddle_recipients.append(to_zulip_username(notice.sender))
|
||||
if zephyr_sender not in huddle_recipients:
|
||||
huddle_recipients.append(to_zulip_username(zephyr_sender))
|
||||
body = body.split("\n", 1)[1]
|
||||
|
||||
if (
|
||||
options.forward_class_messages
|
||||
and notice.opcode is not None
|
||||
and notice.opcode.lower() == "crypt"
|
||||
and notice.z_opcode is not None
|
||||
and notice.z_opcode.lower() == b"crypt"
|
||||
):
|
||||
body = decrypt_zephyr(zephyr_class, notice.instance.lower(), body)
|
||||
body = decrypt_zephyr(zephyr_class.lower(), zephyr_instance.lower(), body)
|
||||
|
||||
zeph: ZephyrDict
|
||||
zeph = {
|
||||
"time": str(notice.time),
|
||||
"sender": notice.sender,
|
||||
"time": str(notice.z_time.tv_sec + notice.z_time.tv_usec / 1e6),
|
||||
"sender": zephyr_sender,
|
||||
"zsig": zsig, # logged here but not used by app
|
||||
"content": body,
|
||||
}
|
||||
|
@ -462,46 +518,47 @@ def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
|
|||
zeph["type"] = "private"
|
||||
zeph["recipient"] = huddle_recipients
|
||||
elif is_personal:
|
||||
assert notice.recipient is not None
|
||||
assert notice.z_recipient is not None
|
||||
zeph["type"] = "private"
|
||||
zeph["recipient"] = to_zulip_username(notice.recipient)
|
||||
zeph["recipient"] = to_zulip_username(notice.z_recipient.decode())
|
||||
else:
|
||||
zeph["type"] = "stream"
|
||||
zeph["stream"] = zephyr_class
|
||||
if notice.instance.strip() != "":
|
||||
zeph["subject"] = notice.instance
|
||||
zeph["stream"] = zephyr_class.lower()
|
||||
if zephyr_instance.strip() != "":
|
||||
zeph["subject"] = zephyr_instance
|
||||
else:
|
||||
zeph["subject"] = f'(instance "{notice.instance}")'
|
||||
zeph["subject"] = f'(instance "{zephyr_instance}")'
|
||||
|
||||
# Add instances in for instanced personals
|
||||
if is_personal:
|
||||
if notice.cls.lower() != "message" and notice.instance.lower() != "personal":
|
||||
heading = f"[-c {notice.cls} -i {notice.instance}]\n"
|
||||
elif notice.cls.lower() != "message":
|
||||
heading = f"[-c {notice.cls}]\n"
|
||||
elif notice.instance.lower() != "personal":
|
||||
heading = f"[-i {notice.instance}]\n"
|
||||
if zephyr_class.lower() != "message" and zephyr_instance.lower() != "personal":
|
||||
heading = f"[-c {zephyr_class} -i {zephyr_instance}]\n"
|
||||
elif zephyr_class.lower() != "message":
|
||||
heading = f"[-c {zephyr_class}]\n"
|
||||
elif zephyr_instance.lower() != "personal":
|
||||
heading = f"[-i {zephyr_instance}]\n"
|
||||
else:
|
||||
heading = ""
|
||||
zeph["content"] = heading + zeph["content"]
|
||||
|
||||
logger.info(f"Received a message on {zephyr_class}/{notice.instance} from {notice.sender}...")
|
||||
logger.info(f"Received a message on {zephyr_class}/{zephyr_instance} from {zephyr_sender}...")
|
||||
if log is not None:
|
||||
log.write(json.dumps(zeph) + "\n")
|
||||
log.flush()
|
||||
|
||||
if os.fork() == 0:
|
||||
global CURRENT_STATE
|
||||
CURRENT_STATE = States.ChildSending
|
||||
# Actually send the message in a child process, to avoid blocking.
|
||||
zulip_queue.put(zeph)
|
||||
|
||||
|
||||
def send_zulip_worker(zulip_queue: "Queue[ZephyrDict]", zulip_client: zulip.Client) -> None:
|
||||
while True:
|
||||
zeph = zulip_queue.get()
|
||||
try:
|
||||
res = send_zulip(zeph)
|
||||
res = send_zulip(zulip_client, zeph)
|
||||
if res.get("result") != "success":
|
||||
logger.error(f"Error relaying zephyr:\n{zeph}\n{res}")
|
||||
except Exception:
|
||||
logger.exception("Error relaying zephyr:")
|
||||
finally:
|
||||
os._exit(0)
|
||||
zulip_queue.task_done()
|
||||
|
||||
|
||||
def quit_failed_initialization(message: str) -> str:
|
||||
|
@ -514,12 +571,14 @@ def zephyr_init_autoretry() -> None:
|
|||
backoff = zulip.RandomExponentialBackoff()
|
||||
while backoff.keep_going():
|
||||
try:
|
||||
# zephyr.init() tries to clear old subscriptions, and thus
|
||||
# sometimes gets a SERVNAK from the server
|
||||
zephyr.init()
|
||||
# ZCancelSubscriptions sometimes gets a SERVNAK from the server
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
|
||||
zephyr_port = c_ushort()
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
|
||||
backoff.succeed()
|
||||
return
|
||||
except OSError:
|
||||
except zephyr_ctypes.ZephyrError:
|
||||
logger.exception("Error initializing Zephyr library (retrying). Traceback:")
|
||||
backoff.fail()
|
||||
|
||||
|
@ -532,11 +591,10 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
|
|||
try:
|
||||
with open(session_path, "rb") as f:
|
||||
session = f.read()
|
||||
zephyr._z.initialize()
|
||||
zephyr._z.load_session(session)
|
||||
zephyr.__inited = True
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZLoadSession(session, len(session)))
|
||||
return
|
||||
except OSError:
|
||||
except zephyr_ctypes.ZephyrError:
|
||||
logger.exception("Error loading saved Zephyr session (retrying). Traceback:")
|
||||
backoff.fail()
|
||||
|
||||
|
@ -544,13 +602,26 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
|
|||
|
||||
|
||||
def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
|
||||
cls, instance, recipient = sub
|
||||
backoff = zulip.RandomExponentialBackoff()
|
||||
while backoff.keep_going():
|
||||
try:
|
||||
zephyr.Subscriptions().add(sub)
|
||||
zephyr_ctypes.check(
|
||||
zephyr_ctypes.ZSubscribeTo(
|
||||
(zephyr_ctypes.ZSubscription_t * 1)(
|
||||
zephyr_ctypes.ZSubscription_t(
|
||||
zsub_class=cls.encode(),
|
||||
zsub_classinst=instance.encode(),
|
||||
zsub_recipient=recipient.encode(),
|
||||
)
|
||||
),
|
||||
1,
|
||||
0,
|
||||
)
|
||||
)
|
||||
backoff.succeed()
|
||||
return
|
||||
except OSError:
|
||||
except zephyr_ctypes.ZephyrError:
|
||||
# Probably a SERVNAK from the zephyr server, but log the
|
||||
# traceback just in case it's something else
|
||||
logger.exception("Error subscribing to personals (retrying). Traceback:")
|
||||
|
@ -560,6 +631,8 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
|
|||
|
||||
|
||||
def zephyr_to_zulip(options: optparse.Values) -> None:
|
||||
zulip_client = make_zulip_client()
|
||||
|
||||
if options.use_sessions and os.path.exists(options.session_path):
|
||||
logger.info("Loading old session")
|
||||
zephyr_load_session_autoretry(options.session_path)
|
||||
|
@ -575,8 +648,14 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
|
|||
if options.nagios_class:
|
||||
zephyr_subscribe_autoretry((options.nagios_class, "*", "*"))
|
||||
if options.use_sessions:
|
||||
buf = POINTER(c_char)()
|
||||
buf_len = c_int()
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZDumpSession(byref(buf), byref(buf_len)))
|
||||
try:
|
||||
with open(options.session_path, "wb") as f:
|
||||
f.write(zephyr._z.dump_session())
|
||||
f.write(buf[: buf_len.value]) # type: ignore[arg-type] # bytes, but mypy infers List[c_char]
|
||||
finally:
|
||||
zephyr_ctypes.free(buf)
|
||||
|
||||
if options.logs_to_resend is not None:
|
||||
with open(options.logs_to_resend) as log:
|
||||
|
@ -593,18 +672,22 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
|
|||
"sending saved message to %s from %s..."
|
||||
% (zeph.get("stream", zeph.get("recipient")), zeph["sender"])
|
||||
)
|
||||
send_zulip(zeph)
|
||||
send_zulip(zulip_client, zeph)
|
||||
except Exception:
|
||||
logger.exception("Could not send saved zephyr:")
|
||||
time.sleep(2)
|
||||
|
||||
logger.info("Successfully initialized; Starting receive loop.")
|
||||
|
||||
# Actually send the messages in a thread, to avoid blocking.
|
||||
zulip_queue: "Queue[ZephyrDict]" = Queue()
|
||||
Thread(target=lambda: send_zulip_worker(zulip_queue, zulip_client)).start()
|
||||
|
||||
if options.resend_log_path is not None:
|
||||
with open(options.resend_log_path, "a") as log:
|
||||
process_loop(log)
|
||||
process_loop(zulip_queue, log)
|
||||
else:
|
||||
process_loop(None)
|
||||
process_loop(zulip_queue, None)
|
||||
|
||||
|
||||
def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
|
||||
|
@ -675,7 +758,7 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op
|
|||
return encrypted
|
||||
|
||||
|
||||
def forward_to_zephyr(message: Dict[str, Any]) -> None:
|
||||
def forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
|
||||
# 'Any' can be of any type of text
|
||||
support_heading = "Hi there! This is an automated message from Zulip."
|
||||
support_closing = """If you have any questions, please be in touch through the \
|
||||
|
@ -749,6 +832,7 @@ Feedback button or at support@zulip.com."""
|
|||
result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content)
|
||||
if result is None:
|
||||
send_error_zulip(
|
||||
zulip_client,
|
||||
"""%s
|
||||
|
||||
Your Zulip-Zephyr mirror bot was unable to forward that last message \
|
||||
|
@ -758,7 +842,7 @@ key (perhaps because your AFS tokens expired). That means that while \
|
|||
Zulip users (like you) received it, Zephyr users did not.
|
||||
|
||||
%s"""
|
||||
% (support_heading, support_closing)
|
||||
% (support_heading, support_closing),
|
||||
)
|
||||
return
|
||||
|
||||
|
@ -775,6 +859,7 @@ Zulip users (like you) received it, Zephyr users did not.
|
|||
return
|
||||
elif code == 0:
|
||||
send_error_zulip(
|
||||
zulip_client,
|
||||
"""%s
|
||||
|
||||
Your last message was successfully mirrored to zephyr, but zwrite \
|
||||
|
@ -783,7 +868,7 @@ returned the following warning:
|
|||
%s
|
||||
|
||||
%s"""
|
||||
% (support_heading, stderr, support_closing)
|
||||
% (support_heading, stderr, support_closing),
|
||||
)
|
||||
return
|
||||
elif code != 0 and (
|
||||
|
@ -797,6 +882,7 @@ returned the following warning:
|
|||
if options.ignore_expired_tickets:
|
||||
return
|
||||
send_error_zulip(
|
||||
zulip_client,
|
||||
"""%s
|
||||
|
||||
Your last message was forwarded from Zulip to Zephyr unauthenticated, \
|
||||
|
@ -806,7 +892,7 @@ are running the Zulip-Zephyr mirroring bot, so we can send \
|
|||
authenticated Zephyr messages for you again.
|
||||
|
||||
%s"""
|
||||
% (support_heading, support_closing)
|
||||
% (support_heading, support_closing),
|
||||
)
|
||||
return
|
||||
|
||||
|
@ -814,6 +900,7 @@ authenticated Zephyr messages for you again.
|
|||
# probably because the recipient isn't subscribed to personals,
|
||||
# but regardless, we should just notify the user.
|
||||
send_error_zulip(
|
||||
zulip_client,
|
||||
"""%s
|
||||
|
||||
Your Zulip-Zephyr mirror bot was unable to forward that last message \
|
||||
|
@ -823,12 +910,12 @@ received it, Zephyr users did not. The error message from zwrite was:
|
|||
%s
|
||||
|
||||
%s"""
|
||||
% (support_heading, stderr, support_closing)
|
||||
% (support_heading, stderr, support_closing),
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
|
||||
def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
|
||||
# The key string can be used to direct any type of text.
|
||||
if message["sender_email"] == zulip_account_email:
|
||||
if not (
|
||||
|
@ -851,7 +938,7 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
|
|||
)
|
||||
return
|
||||
try:
|
||||
forward_to_zephyr(message)
|
||||
forward_to_zephyr(message, zulip_client)
|
||||
except Exception:
|
||||
# Don't let an exception forwarding one message crash the
|
||||
# whole process
|
||||
|
@ -859,12 +946,16 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
|
|||
|
||||
|
||||
def zulip_to_zephyr(options: optparse.Values) -> NoReturn:
|
||||
zulip_client = make_zulip_client()
|
||||
|
||||
# Sync messages from zulip to zephyr
|
||||
logger.info("Starting syncing messages.")
|
||||
backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
|
||||
while True:
|
||||
try:
|
||||
zulip_client.call_on_each_message(maybe_forward_to_zephyr)
|
||||
zulip_client.call_on_each_message(
|
||||
lambda message: maybe_forward_to_zephyr(message, zulip_client)
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Error syncing messages:")
|
||||
backoff.fail()
|
||||
|
@ -886,6 +977,8 @@ def subscribed_to_mail_messages() -> bool:
|
|||
|
||||
|
||||
def add_zulip_subscriptions(verbose: bool) -> None:
|
||||
zulip_client = make_zulip_client()
|
||||
|
||||
zephyr_subscriptions = set()
|
||||
skipped = set()
|
||||
for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose):
|
||||
|
@ -1146,14 +1239,14 @@ def parse_args() -> Tuple[optparse.Values, List[str]]:
|
|||
|
||||
|
||||
def die_gracefully(signal: int, frame: FrameType) -> None:
|
||||
if CURRENT_STATE == States.ZulipToZephyr or CURRENT_STATE == States.ChildSending:
|
||||
if CURRENT_STATE == States.ZulipToZephyr:
|
||||
# this is a child process, so we want os._exit (no clean-up necessary)
|
||||
os._exit(1)
|
||||
|
||||
if CURRENT_STATE == States.ZephyrToZulip and not options.use_sessions:
|
||||
try:
|
||||
# zephyr=>zulip processes may have added subs, so run cancelSubs
|
||||
zephyr._z.cancelSubs()
|
||||
# zephyr=>zulip processes may have added subs, so run ZCancelSubscriptions
|
||||
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
|
||||
except OSError:
|
||||
# We don't care whether we failed to cancel subs properly, but we should log it
|
||||
logger.exception("")
|
||||
|
@ -1207,15 +1300,6 @@ or specify the --api-key-file option."""
|
|||
sys.exit(1)
|
||||
|
||||
zulip_account_email = options.user + "@mit.edu"
|
||||
import zulip
|
||||
|
||||
zulip_client = zulip.Client(
|
||||
email=zulip_account_email,
|
||||
api_key=api_key,
|
||||
verbose=True,
|
||||
client="zephyr_mirror",
|
||||
site=options.site,
|
||||
)
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
|
@ -1274,8 +1358,6 @@ or specify the --api-key-file option."""
|
|||
child_pid = None
|
||||
CURRENT_STATE = States.ZephyrToZulip
|
||||
|
||||
import zephyr
|
||||
|
||||
logger_name = "zephyr=>zulip"
|
||||
if options.shard is not None:
|
||||
logger_name += f"({options.shard})"
|
||||
|
|
|
@ -42,12 +42,12 @@ setup(
|
|||
"License :: OSI Approved :: Apache Software License",
|
||||
"Topic :: Communications :: Chat",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.6",
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
],
|
||||
python_requires=">=3.6",
|
||||
python_requires=">=3.7",
|
||||
url="https://www.zulip.org/",
|
||||
project_urls={
|
||||
"Source": "https://github.com/zulip/python-zulip-api/",
|
||||
|
|
|
@ -34,12 +34,12 @@ setup(
|
|||
"License :: OSI Approved :: Apache Software License",
|
||||
"Topic :: Communications :: Chat",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.6",
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
],
|
||||
python_requires=">=3.6",
|
||||
python_requires=">=3.7",
|
||||
url="https://www.zulip.org/",
|
||||
project_urls={
|
||||
"Source": "https://github.com/zulip/python-zulip-api/",
|
||||
|
|
|
@ -1,2 +1 @@
|
|||
python-chess==0.31.* ; python_version < '3.7'
|
||||
chess==1.* ; python_version >= '3.7'
|
||||
chess==1.*
|
||||
|
|
|
@ -22,12 +22,12 @@ setup(
|
|||
"License :: OSI Approved :: Apache Software License",
|
||||
"Topic :: Communications :: Chat",
|
||||
"Programming Language :: Python :: 3",
|
||||
"Programming Language :: Python :: 3.6",
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
],
|
||||
python_requires=">=3.6",
|
||||
python_requires=">=3.7",
|
||||
url="https://www.zulip.org/",
|
||||
project_urls={
|
||||
"Source": "https://github.com/zulip/python-zulip-api/",
|
||||
|
|
Loading…
Reference in a new issue