Compare commits
10 commits
63c259b2bc
...
02586f1d34
Author | SHA1 | Date | |
---|---|---|---|
02586f1d34 | |||
7831d979c9 | |||
c94da617ed | |||
4a3d225a38 | |||
582e9733a9 | |||
eef02fbb76 | |||
41ec1a9a29 | |||
92120914f8 | |||
a534446315 | |||
56f805a5d7 |
44
.github/workflows/zulip-ci.yml
vendored
44
.github/workflows/zulip-ci.yml
vendored
|
@ -17,33 +17,24 @@ jobs:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
include:
|
include:
|
||||||
# Base images are built using `tools/ci/Dockerfile.prod.template`.
|
# Focal ships with Python 3.8.10.
|
||||||
# The comments at the top explain how to build and upload these images.
|
|
||||||
# Bionic ships with Python 3.6.
|
|
||||||
- docker_image: zulip/ci:bionic
|
|
||||||
name: Ubuntu 18.04 Bionic (Python 3.6, backend)
|
|
||||||
os: bionic
|
|
||||||
is_bionic: true
|
|
||||||
include_frontend_tests: false
|
|
||||||
# Configure this test to run with the Zulip 3.2 release.
|
|
||||||
legacy_client_interface: 3
|
|
||||||
server_version: refs/tags/3.2
|
|
||||||
# Focal ships with Python 3.8.2.
|
|
||||||
- docker_image: zulip/ci:focal
|
- docker_image: zulip/ci:focal
|
||||||
name: Ubuntu 20.04 Focal (Python 3.8, backend)
|
name: Ubuntu 20.04 (Python 3.8, backend)
|
||||||
os: focal
|
os: focal
|
||||||
is_focal: true
|
legacy_client_interface: "3"
|
||||||
include_frontend_tests: false
|
server_version: refs/tags/3.2
|
||||||
legacy_client_interface: 4
|
|
||||||
server_version: refs/tags/4.0
|
|
||||||
# Bullseye ships with Python 3.9.2.
|
# Bullseye ships with Python 3.9.2.
|
||||||
- docker_image: zulip/ci:bullseye
|
- docker_image: zulip/ci:bullseye
|
||||||
name: Debian 11 Bullseye (Python 3.9, backend)
|
name: Debian 11 (Python 3.9, backend)
|
||||||
os: bullseye
|
os: bullseye
|
||||||
is_bullseye: true
|
legacy_client_interface: "4"
|
||||||
include_frontend_tests: false
|
|
||||||
legacy_client_interface: 4
|
|
||||||
server_version: refs/tags/4.0
|
server_version: refs/tags/4.0
|
||||||
|
# Ubuntu 22.04 ships with Python 3.10.6.
|
||||||
|
- docker_image: zulip/ci:jammy
|
||||||
|
name: Ubuntu 22.04 (Python 3.10, backend)
|
||||||
|
os: jammy
|
||||||
|
legacy_client_interface: "6"
|
||||||
|
server_version: refs/tags/6.0
|
||||||
|
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
name: ${{ matrix.name }} (Zulip ${{matrix.server_version}})
|
name: ${{ matrix.name }} (Zulip ${{matrix.server_version}})
|
||||||
|
@ -58,25 +49,18 @@ jobs:
|
||||||
HOME: /home/github/
|
HOME: /home/github/
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: 'Checkout python-zulip-api'
|
- name: "Check out python-zulip-api"
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v2
|
||||||
with:
|
with:
|
||||||
path: api
|
path: api
|
||||||
|
|
||||||
- name: 'Checkout Zulip server ${{ matrix.server_version }}'
|
- name: "Check out Zulip server ${{ matrix.server_version }}"
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v2
|
||||||
with:
|
with:
|
||||||
repository: zulip/zulip
|
repository: zulip/zulip
|
||||||
ref: ${{ matrix.server_version }}
|
ref: ${{ matrix.server_version }}
|
||||||
path: server
|
path: server
|
||||||
|
|
||||||
- name: Do Bionic hack
|
|
||||||
if: ${{ matrix.is_bionic }}
|
|
||||||
run: |
|
|
||||||
# Temporary hack till `sudo service redis-server start` gets fixes in Bionic. See
|
|
||||||
# https://chat.zulip.org/#narrow/stream/3-backend/topic/Ubuntu.20bionic.20CircleCI
|
|
||||||
sudo sed -i '/^bind/s/bind.*/bind 0.0.0.0/' /etc/redis/redis.conf
|
|
||||||
|
|
||||||
- name: Install dependencies
|
- name: Install dependencies
|
||||||
run: |
|
run: |
|
||||||
cd server
|
cd server
|
||||||
|
|
11
.github/workflows/zulip-tests.yml
vendored
11
.github/workflows/zulip-tests.yml
vendored
|
@ -13,10 +13,10 @@ jobs:
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
|
||||||
- name: Setup Python 3.6
|
- name: Set up Python 3.7
|
||||||
uses: actions/setup-python@v2
|
uses: actions/setup-python@v2
|
||||||
with:
|
with:
|
||||||
python-version: 3.6
|
python-version: "3.7"
|
||||||
|
|
||||||
- name: Install dependencies
|
- name: Install dependencies
|
||||||
run: tools/provision --force
|
run: tools/provision --force
|
||||||
|
@ -32,15 +32,12 @@ jobs:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
os: [ubuntu-latest, windows-latest]
|
os: [ubuntu-latest, windows-latest]
|
||||||
python-version: [3.6, 3.7, 3.8, 3.9]
|
python-version: ["3.7", "3.8", "3.9", "3.10"]
|
||||||
exclude:
|
|
||||||
- os: windows-latest
|
|
||||||
python-version: 3.6 # cryptography install fails on Windows with python 3.6 since pip is quite old.
|
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v2
|
||||||
|
|
||||||
- name: Setup Python ${{ matrix.python-version }}
|
- name: Set up Python ${{ matrix.python-version }}
|
||||||
uses: actions/setup-python@v2
|
uses: actions/setup-python@v2
|
||||||
with:
|
with:
|
||||||
python-version: ${{ matrix.python-version }}
|
python-version: ${{ matrix.python-version }}
|
||||||
|
|
2
.mailmap
2
.mailmap
|
@ -1,3 +1,4 @@
|
||||||
|
Aman Agrawal <f2016561@pilani.bits-pilani.ac.in>
|
||||||
Anders Kaseorg <anders@zulip.com> <anders@zulipchat.com>
|
Anders Kaseorg <anders@zulip.com> <anders@zulipchat.com>
|
||||||
Anders Kaseorg <anders@zulip.com> <andersk@mit.edu>
|
Anders Kaseorg <anders@zulip.com> <andersk@mit.edu>
|
||||||
Rishi Gupta <rishig@zulip.com> <rishig@zulipchat.com>
|
Rishi Gupta <rishig@zulip.com> <rishig@zulipchat.com>
|
||||||
|
@ -7,3 +8,4 @@ Steve Howell <showell@zulip.com> <steve@zulip.com>
|
||||||
Tim Abbott <tabbott@zulip.com> <tabbott@humbughq.com>
|
Tim Abbott <tabbott@zulip.com> <tabbott@humbughq.com>
|
||||||
Tim Abbott <tabbott@zulip.com> <tabbott@mit.edu>
|
Tim Abbott <tabbott@zulip.com> <tabbott@mit.edu>
|
||||||
Tim Abbott <tabbott@zulip.com> <tabbott@zulipchat.com>
|
Tim Abbott <tabbott@zulip.com> <tabbott@zulipchat.com>
|
||||||
|
Zixuan James Li <p359101898@gmail.com> <359101898@qq.com>
|
||||||
|
|
|
@ -1,8 +1,14 @@
|
||||||
[tool.black]
|
[tool.black]
|
||||||
line-length = 100
|
line-length = 100
|
||||||
target-version = ["py36"]
|
target-version = ["py37"]
|
||||||
|
|
||||||
[tool.isort]
|
[tool.isort]
|
||||||
src_paths = ["tools", "zulip", "zulip_bots", "zulip_botserver"]
|
src_paths = [
|
||||||
|
"tools",
|
||||||
|
"zulip",
|
||||||
|
"zulip/integrations/zephyr",
|
||||||
|
"zulip_bots",
|
||||||
|
"zulip_botserver",
|
||||||
|
]
|
||||||
profile = "black"
|
profile = "black"
|
||||||
line_length = 100
|
line_length = 100
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
The [Zulip API](https://zulip.com/api) Python bindings require the
|
The [Zulip API](https://zulip.com/api) Python bindings require the
|
||||||
following dependencies:
|
following dependencies:
|
||||||
|
|
||||||
* **Python (version >= 3.6)**
|
* **Python (version >= 3.7)**
|
||||||
* requests (version >= 0.12.1)
|
* requests (version >= 0.12.1)
|
||||||
|
|
||||||
**Note**: If you'd like to use the Zulip bindings with Python 2, we
|
**Note**: If you'd like to use the Zulip bindings with Python 2, we
|
||||||
|
|
|
@ -14,9 +14,6 @@ from concurrent.futures import ThreadPoolExecutor
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from typing import Any, Dict, List, Match, Optional, Set, Tuple, Type, Union
|
from typing import Any, Dict, List, Match, Optional, Set, Tuple, Type, Union
|
||||||
|
|
||||||
if os.name != "nt":
|
|
||||||
import magic
|
|
||||||
import magic.compat
|
|
||||||
import nio
|
import nio
|
||||||
from nio.responses import (
|
from nio.responses import (
|
||||||
DownloadError,
|
DownloadError,
|
||||||
|
@ -352,17 +349,15 @@ class ZulipToMatrix:
|
||||||
if result["result"] != "success":
|
if result["result"] != "success":
|
||||||
success = False
|
success = False
|
||||||
continue
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
file_content: bytes = urllib.request.urlopen(self.server_url + result["url"]).read()
|
with urllib.request.urlopen(self.server_url + result["url"]) as response:
|
||||||
|
file_content: bytes = response.read()
|
||||||
|
mimetype: str = response.headers.get_content_type()
|
||||||
except Exception:
|
except Exception:
|
||||||
success = False
|
success = False
|
||||||
continue
|
continue
|
||||||
|
|
||||||
mimetype: str
|
|
||||||
if os.name == "nt":
|
|
||||||
mimetype = "m.file"
|
|
||||||
else:
|
|
||||||
mimetype = magic.from_buffer(file_content, mime=True)
|
|
||||||
filename: str = file.split("/")[-1]
|
filename: str = file.split("/")[-1]
|
||||||
|
|
||||||
response, _ = await self.matrix_client.upload(
|
response, _ = await self.matrix_client.upload(
|
||||||
|
|
|
@ -1,3 +1 @@
|
||||||
matrix-nio
|
matrix-nio
|
||||||
python-magic
|
|
||||||
python-magic-bin; platform_system == "Windows"
|
|
||||||
|
|
|
@ -41,7 +41,7 @@ topic = matrix
|
||||||
ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}"
|
ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}"
|
||||||
|
|
||||||
|
|
||||||
# For Python 3.6 compatibility.
|
# For Python 3.7 compatibility.
|
||||||
# (Since 3.8, there is unittest.IsolatedAsyncioTestCase!)
|
# (Since 3.8, there is unittest.IsolatedAsyncioTestCase!)
|
||||||
# source: https://stackoverflow.com/a/46324983
|
# source: https://stackoverflow.com/a/46324983
|
||||||
def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:
|
def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:
|
||||||
|
|
|
@ -3,12 +3,20 @@ config = {
|
||||||
"email": "zulip-bot@email.com",
|
"email": "zulip-bot@email.com",
|
||||||
"api_key": "put api key here",
|
"api_key": "put api key here",
|
||||||
"site": "https://chat.zulip.org",
|
"site": "https://chat.zulip.org",
|
||||||
"stream": "test here",
|
|
||||||
"topic": "<- slack-bridge",
|
|
||||||
},
|
},
|
||||||
"slack": {
|
"slack": {
|
||||||
"username": "slack_username",
|
"username": "slack_username",
|
||||||
"token": "xoxb-your-slack-token",
|
"token": "xoxb-your-slack-token",
|
||||||
"channel": "C5Z5N7R8A -- must be channel id",
|
},
|
||||||
|
# Mapping between Slack channels and Zulip stream-topic's.
|
||||||
|
# You can specify multiple pairs.
|
||||||
|
"channel_mapping": {
|
||||||
|
# Slack channel; must be channel ID
|
||||||
|
"C5Z5N7R8A": {
|
||||||
|
# Zulip stream
|
||||||
|
"stream": "test here",
|
||||||
|
# Zulip topic
|
||||||
|
"topic": "<- slack-bridge",
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ import os
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
from typing import Any, Callable, Dict
|
from typing import Any, Callable, Dict, Optional, Tuple
|
||||||
|
|
||||||
import bridge_with_slack_config
|
import bridge_with_slack_config
|
||||||
import slack_sdk
|
import slack_sdk
|
||||||
|
@ -17,18 +17,28 @@ import zulip
|
||||||
ZULIP_MESSAGE_TEMPLATE = "**{username}**: {message}"
|
ZULIP_MESSAGE_TEMPLATE = "**{username}**: {message}"
|
||||||
SLACK_MESSAGE_TEMPLATE = "<{username}> {message}"
|
SLACK_MESSAGE_TEMPLATE = "<{username}> {message}"
|
||||||
|
|
||||||
|
StreamTopicT = Tuple[str, str]
|
||||||
|
|
||||||
def check_zulip_message_validity(msg: Dict[str, Any], config: Dict[str, Any]) -> bool:
|
|
||||||
|
def get_slack_channel_for_zulip_message(
|
||||||
|
msg: Dict[str, Any], zulip_to_slack_map: Dict[StreamTopicT, Any], bot_email: str
|
||||||
|
) -> Optional[str]:
|
||||||
is_a_stream = msg["type"] == "stream"
|
is_a_stream = msg["type"] == "stream"
|
||||||
in_the_specified_stream = msg["display_recipient"] == config["stream"]
|
if not is_a_stream:
|
||||||
at_the_specified_subject = msg["subject"] == config["topic"]
|
return None
|
||||||
|
|
||||||
# We do this to identify the messages generated from Matrix -> Zulip
|
stream_name = msg["display_recipient"]
|
||||||
# and we make sure we don't forward it again to the Matrix.
|
topic_name = msg["subject"]
|
||||||
not_from_zulip_bot = msg["sender_email"] != config["email"]
|
stream_topic: StreamTopicT = (stream_name, topic_name)
|
||||||
if is_a_stream and not_from_zulip_bot and in_the_specified_stream and at_the_specified_subject:
|
if stream_topic not in zulip_to_slack_map:
|
||||||
return True
|
return None
|
||||||
return False
|
|
||||||
|
# We do this to identify the messages generated from Slack -> Zulip
|
||||||
|
# and we make sure we don't forward it again to the Slack.
|
||||||
|
from_zulip_bot = msg["sender_email"] == bot_email
|
||||||
|
if from_zulip_bot:
|
||||||
|
return None
|
||||||
|
return zulip_to_slack_map[stream_topic]
|
||||||
|
|
||||||
|
|
||||||
class SlackBridge:
|
class SlackBridge:
|
||||||
|
@ -37,14 +47,17 @@ class SlackBridge:
|
||||||
self.zulip_config = config["zulip"]
|
self.zulip_config = config["zulip"]
|
||||||
self.slack_config = config["slack"]
|
self.slack_config = config["slack"]
|
||||||
|
|
||||||
|
self.slack_to_zulip_map: Dict[str, Dict[str, str]] = config["channel_mapping"]
|
||||||
|
self.zulip_to_slack_map: Dict[StreamTopicT, str] = {
|
||||||
|
(z["stream"], z["topic"]): s for s, z in config["channel_mapping"].items()
|
||||||
|
}
|
||||||
|
|
||||||
# zulip-specific
|
# zulip-specific
|
||||||
self.zulip_client = zulip.Client(
|
self.zulip_client = zulip.Client(
|
||||||
email=self.zulip_config["email"],
|
email=self.zulip_config["email"],
|
||||||
api_key=self.zulip_config["api_key"],
|
api_key=self.zulip_config["api_key"],
|
||||||
site=self.zulip_config["site"],
|
site=self.zulip_config["site"],
|
||||||
)
|
)
|
||||||
self.zulip_stream = self.zulip_config["stream"]
|
|
||||||
self.zulip_subject = self.zulip_config["topic"]
|
|
||||||
|
|
||||||
# slack-specific
|
# slack-specific
|
||||||
self.channel = self.slack_config["channel"]
|
self.channel = self.slack_config["channel"]
|
||||||
|
@ -68,14 +81,16 @@ class SlackBridge:
|
||||||
|
|
||||||
def zulip_to_slack(self) -> Callable[[Dict[str, Any]], None]:
|
def zulip_to_slack(self) -> Callable[[Dict[str, Any]], None]:
|
||||||
def _zulip_to_slack(msg: Dict[str, Any]) -> None:
|
def _zulip_to_slack(msg: Dict[str, Any]) -> None:
|
||||||
message_valid = check_zulip_message_validity(msg, self.zulip_config)
|
slack_channel = get_slack_channel_for_zulip_message(
|
||||||
if message_valid:
|
msg, self.zulip_to_slack_map, self.zulip_config["email"]
|
||||||
|
)
|
||||||
|
if slack_channel is not None:
|
||||||
self.wrap_slack_mention_with_bracket(msg)
|
self.wrap_slack_mention_with_bracket(msg)
|
||||||
slack_text = SLACK_MESSAGE_TEMPLATE.format(
|
slack_text = SLACK_MESSAGE_TEMPLATE.format(
|
||||||
username=msg["sender_full_name"], message=msg["content"]
|
username=msg["sender_full_name"], message=msg["content"]
|
||||||
)
|
)
|
||||||
self.slack_webclient.chat_postMessage(
|
self.slack_webclient.chat_postMessage(
|
||||||
channel=self.channel,
|
channel=slack_channel,
|
||||||
text=slack_text,
|
text=slack_text,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -91,7 +106,7 @@ class SlackBridge:
|
||||||
|
|
||||||
@rtm.on("message")
|
@rtm.on("message")
|
||||||
def slack_to_zulip(client: RTMClient, event: Dict[str, Any]) -> None:
|
def slack_to_zulip(client: RTMClient, event: Dict[str, Any]) -> None:
|
||||||
if event["channel"] != self.channel:
|
if event["channel"] not in self.slack_to_zulip_map:
|
||||||
return
|
return
|
||||||
user_id = event["user"]
|
user_id = event["user"]
|
||||||
user = self.slack_id_to_name[user_id]
|
user = self.slack_id_to_name[user_id]
|
||||||
|
@ -100,8 +115,12 @@ class SlackBridge:
|
||||||
return
|
return
|
||||||
self.replace_slack_id_with_name(event)
|
self.replace_slack_id_with_name(event)
|
||||||
content = ZULIP_MESSAGE_TEMPLATE.format(username=user, message=event["text"])
|
content = ZULIP_MESSAGE_TEMPLATE.format(username=user, message=event["text"])
|
||||||
|
zulip_endpoint = self.slack_to_zulip_map[event["channel"]]
|
||||||
msg_data = dict(
|
msg_data = dict(
|
||||||
type="stream", to=self.zulip_stream, subject=self.zulip_subject, content=content
|
type="stream",
|
||||||
|
to=zulip_endpoint["stream"],
|
||||||
|
subject=zulip_endpoint["topic"],
|
||||||
|
content=content,
|
||||||
)
|
)
|
||||||
self.zulip_client.send_message(msg_data)
|
self.zulip_client.send_message(msg_data)
|
||||||
|
|
||||||
|
@ -118,10 +137,16 @@ if __name__ == "__main__":
|
||||||
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
|
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
|
||||||
parser = argparse.ArgumentParser(usage=usage)
|
parser = argparse.ArgumentParser(usage=usage)
|
||||||
|
|
||||||
print("Starting slack mirroring bot")
|
config: Dict[str, Any] = bridge_with_slack_config.config
|
||||||
print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM")
|
if "channel_mapping" not in config:
|
||||||
|
print(
|
||||||
|
'The key "channel_mapping" is not found in bridge_with_slack_config.py.\n'
|
||||||
|
"Your config file may be outdated."
|
||||||
|
)
|
||||||
|
exit(1)
|
||||||
|
|
||||||
config = bridge_with_slack_config.config
|
print("Starting slack mirroring bot")
|
||||||
|
print("MAKE SURE THE BOT IS SUBSCRIBED TO THE RELEVANT ZULIP STREAM(S) & SLACK CHANNEL(S)!")
|
||||||
|
|
||||||
# We have to define rtm outside of SlackBridge because the rtm variable is used as a method decorator.
|
# We have to define rtm outside of SlackBridge because the rtm variable is used as a method decorator.
|
||||||
rtm = RTMClient(token=config["slack"]["token"])
|
rtm = RTMClient(token=config["slack"]["token"])
|
||||||
|
|
|
@ -6,10 +6,10 @@ import random
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from ctypes import byref, c_int, c_ushort
|
||||||
from typing import Dict, List, Set, Tuple
|
from typing import Dict, List, Set, Tuple
|
||||||
|
|
||||||
import zephyr
|
import zephyr_ctypes
|
||||||
|
|
||||||
import zulip
|
import zulip
|
||||||
|
|
||||||
parser = optparse.OptionParser()
|
parser = optparse.OptionParser()
|
||||||
|
@ -136,9 +136,43 @@ for (stream, test) in test_streams:
|
||||||
actually_subscribed = False
|
actually_subscribed = False
|
||||||
for tries in range(10):
|
for tries in range(10):
|
||||||
try:
|
try:
|
||||||
zephyr.init()
|
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
|
||||||
zephyr._z.subAll(zephyr_subs_to_add)
|
zephyr_port = c_ushort()
|
||||||
zephyr_subs = zephyr._z.getSubscriptions()
|
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
|
||||||
|
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
|
||||||
|
|
||||||
|
zephyr_ctypes.check(
|
||||||
|
zephyr_ctypes.ZSubscribeTo(
|
||||||
|
(zephyr_ctypes.ZSubscription_t * len(zephyr_subs_to_add))(
|
||||||
|
*(
|
||||||
|
zephyr_ctypes.ZSubscription_t(
|
||||||
|
zsub_class=cls.encode(),
|
||||||
|
zsub_classinst=instance.encode(),
|
||||||
|
zsub_recipient=recipient.encode(),
|
||||||
|
)
|
||||||
|
for cls, instance, recipient in zephyr_subs_to_add
|
||||||
|
)
|
||||||
|
),
|
||||||
|
len(zephyr_subs_to_add),
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
nsubs = c_int()
|
||||||
|
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
|
||||||
|
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
|
||||||
|
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
|
||||||
|
zephyr_subs = {
|
||||||
|
(
|
||||||
|
zsub.zsub_class.decode(),
|
||||||
|
zsub.zsub_classinst.decode(),
|
||||||
|
zsub.zsub_recipient.decode(),
|
||||||
|
)
|
||||||
|
for zsub in zsubs
|
||||||
|
}
|
||||||
|
finally:
|
||||||
|
zephyr_ctypes.ZFlushSubscriptions()
|
||||||
|
|
||||||
missing = 0
|
missing = 0
|
||||||
for elt in zephyr_subs_to_add:
|
for elt in zephyr_subs_to_add:
|
||||||
|
@ -148,8 +182,8 @@ for tries in range(10):
|
||||||
if missing == 0:
|
if missing == 0:
|
||||||
actually_subscribed = True
|
actually_subscribed = True
|
||||||
break
|
break
|
||||||
except OSError as e:
|
except zephyr_ctypes.ZephyrError as e:
|
||||||
if "SERVNAK received" in e.args:
|
if e.code == zephyr_ctypes.ZERR_SERVNAK:
|
||||||
logger.error("SERVNAK repeatedly received, punting rest of test")
|
logger.error("SERVNAK repeatedly received, punting rest of test")
|
||||||
else:
|
else:
|
||||||
logger.exception("Exception subscribing to zephyrs")
|
logger.exception("Exception subscribing to zephyrs")
|
||||||
|
@ -185,15 +219,16 @@ notices = []
|
||||||
# receive queue with 30+ messages, which might result in messages
|
# receive queue with 30+ messages, which might result in messages
|
||||||
# being dropped.
|
# being dropped.
|
||||||
def receive_zephyrs() -> None:
|
def receive_zephyrs() -> None:
|
||||||
while True:
|
while zephyr_ctypes.ZPending() != 0:
|
||||||
|
notice = zephyr_ctypes.ZNotice_t()
|
||||||
|
sender = zephyr_ctypes.sockaddr_in()
|
||||||
try:
|
try:
|
||||||
notice = zephyr.receive(block=False)
|
zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
|
||||||
except Exception:
|
except zephyr_ctypes.ZephyrError:
|
||||||
logging.exception("Exception receiving zephyrs:")
|
logging.exception("Exception receiving zephyrs:")
|
||||||
notice = None
|
|
||||||
if notice is None:
|
|
||||||
break
|
break
|
||||||
if notice.opcode != "":
|
if notice.z_opcode != b"":
|
||||||
|
zephyr_ctypes.ZFreeNotice(byref(notice))
|
||||||
continue
|
continue
|
||||||
notices.append(notice)
|
notices.append(notice)
|
||||||
|
|
||||||
|
@ -294,10 +329,16 @@ def process_keys(content_list: List[str]) -> Tuple[Dict[str, int], Set[str], Set
|
||||||
# The h_foo variables are about the messages we _received_ in Zulip
|
# The h_foo variables are about the messages we _received_ in Zulip
|
||||||
# The z_foo variables are about the messages we _received_ in Zephyr
|
# The z_foo variables are about the messages we _received_ in Zephyr
|
||||||
h_contents = [message["content"] for message in messages]
|
h_contents = [message["content"] for message in messages]
|
||||||
z_contents = [notice.message.split("\0")[1] for notice in notices]
|
z_contents = [
|
||||||
|
notice.z_message[: notice.z_message_len].split(b"\0")[1].decode(errors="replace")
|
||||||
|
for notice in notices
|
||||||
|
]
|
||||||
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
|
(h_key_counts, h_missing_z, h_missing_h, h_duplicates, h_success) = process_keys(h_contents)
|
||||||
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
|
(z_key_counts, z_missing_z, z_missing_h, z_duplicates, z_success) = process_keys(z_contents)
|
||||||
|
|
||||||
|
for notice in notices:
|
||||||
|
zephyr_ctypes.ZFreeNotice(byref(notice))
|
||||||
|
|
||||||
if z_success and h_success:
|
if z_success and h_success:
|
||||||
logger.info("Success!")
|
logger.info("Success!")
|
||||||
print_status_and_exit(0)
|
print_status_and_exit(0)
|
||||||
|
|
|
@ -18,7 +18,7 @@ api_key_path = f"/home/zulip/api-keys/{program_name}"
|
||||||
open(api_key_path, "w").write(api_key + "\n")
|
open(api_key_path, "w").write(api_key + "\n")
|
||||||
|
|
||||||
# Setup supervisord configuration
|
# Setup supervisord configuration
|
||||||
supervisor_path = f"/etc/supervisor/conf.d/zulip/{program_name}.conf"
|
supervisor_path = f"/etc/supervisor/conf.d/zmirror/{program_name}.conf"
|
||||||
template = os.path.join(os.path.dirname(__file__), "zmirror_private.conf.template")
|
template = os.path.join(os.path.dirname(__file__), "zmirror_private.conf.template")
|
||||||
template_data = open(template).read()
|
template_data = open(template).read()
|
||||||
session_path = f"/home/zulip/zephyr_sessions/{program_name}"
|
session_path = f"/home/zulip/zephyr_sessions/{program_name}"
|
||||||
|
|
207
zulip/integrations/zephyr/zephyr_ctypes.py
Normal file
207
zulip/integrations/zephyr/zephyr_ctypes.py
Normal file
|
@ -0,0 +1,207 @@
|
||||||
|
from ctypes import (
|
||||||
|
CDLL,
|
||||||
|
CFUNCTYPE,
|
||||||
|
POINTER,
|
||||||
|
Structure,
|
||||||
|
Union,
|
||||||
|
c_char,
|
||||||
|
c_char_p,
|
||||||
|
c_int,
|
||||||
|
c_long,
|
||||||
|
c_uint,
|
||||||
|
c_uint8,
|
||||||
|
c_uint16,
|
||||||
|
c_uint32,
|
||||||
|
c_ushort,
|
||||||
|
c_void_p,
|
||||||
|
)
|
||||||
|
|
||||||
|
libc = CDLL("libc.so.6")
|
||||||
|
com_err = CDLL("libcom_err.so.2")
|
||||||
|
libzephyr = CDLL("libzephyr.so.4")
|
||||||
|
|
||||||
|
|
||||||
|
# --- glibc/bits/sockaddr.h ---
|
||||||
|
|
||||||
|
sa_family_t = c_ushort
|
||||||
|
|
||||||
|
|
||||||
|
# --- glibc/sysdeps/unix/sysv/linux/bits/socket.h ---
|
||||||
|
|
||||||
|
|
||||||
|
class sockaddr(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("sa_family", sa_family_t),
|
||||||
|
("sa_data", c_char * 14),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# --- glibc/inet/netinet/in.h ---
|
||||||
|
|
||||||
|
in_port_t = c_uint16
|
||||||
|
in_addr_t = c_uint32
|
||||||
|
|
||||||
|
|
||||||
|
class in_addr(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("s_addr", in_addr_t),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class sockaddr_in(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("sin_family", sa_family_t),
|
||||||
|
("sin_port", in_port_t),
|
||||||
|
("sin_addr", in_addr),
|
||||||
|
("sin_zero", c_uint8 * 8),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class in6_addr(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("s6_addr", c_uint8 * 16),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class sockaddr_in6(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("sin6_family", sa_family_t),
|
||||||
|
("sin6_port", in_port_t),
|
||||||
|
("sin6_flowinfo", c_uint32),
|
||||||
|
("sin6_addr", in6_addr),
|
||||||
|
("sin6_scope_id", c_uint32),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
# --- glibc/stdlib/stdlib.h ---
|
||||||
|
|
||||||
|
free = CFUNCTYPE(None, c_void_p)(("free", libc))
|
||||||
|
|
||||||
|
|
||||||
|
# --- e2fsprogs/lib/et/com_err.h ---
|
||||||
|
|
||||||
|
error_message = CFUNCTYPE(c_char_p, c_long)(("error_message", com_err))
|
||||||
|
|
||||||
|
|
||||||
|
# --- zephyr/h/zephyr/zephyr.h ---
|
||||||
|
|
||||||
|
Z_MAXOTHERFIELDS = 10
|
||||||
|
|
||||||
|
ZNotice_Kind_t = c_int
|
||||||
|
|
||||||
|
|
||||||
|
class _ZTimeval(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("tv_sec", c_int),
|
||||||
|
("tv_usec", c_int),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class ZUnique_Id_t(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("zuid_addr", in_addr),
|
||||||
|
("tv", _ZTimeval),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
ZChecksum_t = c_uint
|
||||||
|
|
||||||
|
|
||||||
|
class _ZSenderSockaddr(Union):
|
||||||
|
_fields_ = [
|
||||||
|
("sa", sockaddr),
|
||||||
|
("ip4", sockaddr_in),
|
||||||
|
("ip6", sockaddr_in6),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class ZNotice_t(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("z_packet", c_char_p),
|
||||||
|
("z_version", c_char_p),
|
||||||
|
("z_kind", ZNotice_Kind_t),
|
||||||
|
("z_uid", ZUnique_Id_t),
|
||||||
|
("z_sender_sockaddr", _ZSenderSockaddr),
|
||||||
|
("z_time", _ZTimeval),
|
||||||
|
("z_port", c_ushort),
|
||||||
|
("z_charset", c_ushort),
|
||||||
|
("z_auth", c_int),
|
||||||
|
("z_checked_auth", c_int),
|
||||||
|
("z_authent_len", c_int),
|
||||||
|
("z_ascii_authent", c_char_p),
|
||||||
|
("z_class", c_char_p),
|
||||||
|
("z_class_inst", c_char_p),
|
||||||
|
("z_opcode", c_char_p),
|
||||||
|
("z_sender", c_char_p),
|
||||||
|
("z_recipient", c_char_p),
|
||||||
|
("z_default_format", c_char_p),
|
||||||
|
("z_multinotice", c_char_p),
|
||||||
|
("z_multiuid", ZUnique_Id_t),
|
||||||
|
("z_checksum", ZChecksum_t),
|
||||||
|
("z_ascii_checksum", c_char_p),
|
||||||
|
("z_num_other_fields", c_int),
|
||||||
|
("z_other_fields", c_char_p * Z_MAXOTHERFIELDS),
|
||||||
|
("z_message", POINTER(c_char)),
|
||||||
|
("z_message_len", c_int),
|
||||||
|
("z_num_hdr_fields", c_uint),
|
||||||
|
("z_hdr_fields", POINTER(c_char_p)),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class ZSubscription_t(Structure):
|
||||||
|
_fields_ = [
|
||||||
|
("zsub_recipient", c_char_p),
|
||||||
|
("zsub_class", c_char_p),
|
||||||
|
("zsub_classinst", c_char_p),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
Code_t = c_int
|
||||||
|
|
||||||
|
ZInitialize = CFUNCTYPE(Code_t)(("ZInitialize", libzephyr))
|
||||||
|
ZRetrieveSubscriptions = CFUNCTYPE(Code_t, c_ushort, POINTER(c_int))(
|
||||||
|
("ZRetrieveSubscriptions", libzephyr)
|
||||||
|
)
|
||||||
|
ZGetSubscriptions = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), POINTER(c_int))(
|
||||||
|
("ZGetSubscriptions", libzephyr)
|
||||||
|
)
|
||||||
|
ZOpenPort = CFUNCTYPE(Code_t, POINTER(c_ushort))(("ZOpenPort", libzephyr))
|
||||||
|
ZFlushSubscriptions = CFUNCTYPE(Code_t)(("ZFlushSubscriptions", libzephyr))
|
||||||
|
ZFreeNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t))(("ZFreeNotice", libzephyr))
|
||||||
|
ZSubscribeTo = CFUNCTYPE(Code_t, POINTER(ZSubscription_t), c_int, c_uint)(
|
||||||
|
("ZSubscribeTo", libzephyr)
|
||||||
|
)
|
||||||
|
ZCancelSubscriptions = CFUNCTYPE(Code_t, c_uint)(("ZCancelSubscriptions", libzephyr))
|
||||||
|
ZPending = CFUNCTYPE(c_int)(("ZPending", libzephyr))
|
||||||
|
ZReceiveNotice = CFUNCTYPE(Code_t, POINTER(ZNotice_t), POINTER(sockaddr_in))(
|
||||||
|
("ZReceiveNotice", libzephyr)
|
||||||
|
)
|
||||||
|
ZDumpSession = CFUNCTYPE(Code_t, POINTER(POINTER(c_char)), POINTER(c_int))(
|
||||||
|
("ZDumpSession", libzephyr)
|
||||||
|
)
|
||||||
|
ZLoadSession = CFUNCTYPE(Code_t, POINTER(c_char), c_int)(("ZLoadSession", libzephyr))
|
||||||
|
ZGetFD = CFUNCTYPE(c_int)(("ZGetFD", libzephyr))
|
||||||
|
|
||||||
|
ZERR_NONE = 0
|
||||||
|
|
||||||
|
|
||||||
|
# --- zephyr/lib/zephyr_err.et ---
|
||||||
|
|
||||||
|
ERROR_TABLE_BASE_zeph = -772103680
|
||||||
|
ZERR_SERVNAK = ERROR_TABLE_BASE_zeph + 16
|
||||||
|
|
||||||
|
|
||||||
|
# --- convenience helpers ---
|
||||||
|
|
||||||
|
|
||||||
|
class ZephyrError(Exception):
|
||||||
|
def __init__(self, code: int) -> None:
|
||||||
|
self.code = code
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return error_message(self.code).decode()
|
||||||
|
|
||||||
|
|
||||||
|
def check(code: int) -> None:
|
||||||
|
if code != ZERR_NONE:
|
||||||
|
raise ZephyrError(code)
|
|
@ -1,5 +1,6 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
import subprocess
|
import subprocess
|
||||||
|
@ -32,21 +33,22 @@ if options.sync_subscriptions:
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
if options.forward_class_messages and not options.noshard:
|
if options.forward_class_messages and not options.noshard:
|
||||||
# Needed to get access to zephyr.lib.parallel
|
|
||||||
sys.path.append("/home/zulip/zulip")
|
|
||||||
if options.on_startup_command is not None:
|
if options.on_startup_command is not None:
|
||||||
subprocess.call([options.on_startup_command])
|
subprocess.call([options.on_startup_command])
|
||||||
from zerver.lib.parallel import run_parallel
|
|
||||||
|
|
||||||
print("Starting parallel zephyr class mirroring bot")
|
print("Starting parallel zephyr class mirroring bot")
|
||||||
jobs = list("0123456789abcdef")
|
shards = list("0123456789abcdef")
|
||||||
|
|
||||||
def run_job(shard: str) -> int:
|
async def run_shard(shard: str) -> int:
|
||||||
subprocess.call(args + [f"--shard={shard}"])
|
process = await asyncio.create_subprocess_exec(*args, f"--shard={shard}")
|
||||||
return 0
|
return await process.wait()
|
||||||
|
|
||||||
for (status, job) in run_parallel(run_job, jobs, threads=16):
|
async def run_shards():
|
||||||
print("A mirroring shard died!")
|
for coro in asyncio.as_completed(map(run_shard, shards)):
|
||||||
|
await coro
|
||||||
|
print("A mirroring shard died!")
|
||||||
|
|
||||||
|
asyncio.run(run_shards())
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
backoff = RandomExponentialBackoff(timeout_success_equivalent=300)
|
backoff = RandomExponentialBackoff(timeout_success_equivalent=300)
|
||||||
|
|
|
@ -13,18 +13,23 @@ import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
import textwrap
|
import textwrap
|
||||||
import time
|
import time
|
||||||
|
from ctypes import POINTER, byref, c_char, c_int, c_ushort
|
||||||
|
from queue import Queue
|
||||||
|
from threading import Thread
|
||||||
from types import FrameType
|
from types import FrameType
|
||||||
from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
|
from typing import IO, Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
|
||||||
|
|
||||||
from typing_extensions import Literal, TypedDict
|
from typing_extensions import Literal, TypedDict
|
||||||
|
|
||||||
|
import zephyr_ctypes
|
||||||
|
import zulip
|
||||||
from zulip import RandomExponentialBackoff
|
from zulip import RandomExponentialBackoff
|
||||||
|
|
||||||
DEFAULT_SITE = "https://api.zulip.com"
|
DEFAULT_SITE = "https://api.zulip.com"
|
||||||
|
|
||||||
|
|
||||||
class States:
|
class States:
|
||||||
Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4))
|
Startup, ZulipToZephyr, ZephyrToZulip = list(range(3))
|
||||||
|
|
||||||
|
|
||||||
CURRENT_STATE = States.Startup
|
CURRENT_STATE = States.Startup
|
||||||
|
@ -32,6 +37,16 @@ CURRENT_STATE = States.Startup
|
||||||
logger: logging.Logger
|
logger: logging.Logger
|
||||||
|
|
||||||
|
|
||||||
|
def make_zulip_client() -> zulip.Client:
|
||||||
|
return zulip.Client(
|
||||||
|
email=zulip_account_email,
|
||||||
|
api_key=api_key,
|
||||||
|
verbose=True,
|
||||||
|
client="zephyr_mirror",
|
||||||
|
site=options.site,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def to_zulip_username(zephyr_username: str) -> str:
|
def to_zulip_username(zephyr_username: str) -> str:
|
||||||
if "@" in zephyr_username:
|
if "@" in zephyr_username:
|
||||||
(user, realm) = zephyr_username.split("@")
|
(user, realm) = zephyr_username.split("@")
|
||||||
|
@ -117,7 +132,7 @@ class ZephyrDict(TypedDict, total=False):
|
||||||
zsig: str
|
zsig: str
|
||||||
|
|
||||||
|
|
||||||
def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
|
def send_zulip(zulip_client: zulip.Client, zeph: ZephyrDict) -> Dict[str, Any]:
|
||||||
message: Dict[str, Any]
|
message: Dict[str, Any]
|
||||||
message = {}
|
message = {}
|
||||||
if options.forward_class_messages:
|
if options.forward_class_messages:
|
||||||
|
@ -151,7 +166,7 @@ def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
|
||||||
return zulip_client.send_message(message)
|
return zulip_client.send_message(message)
|
||||||
|
|
||||||
|
|
||||||
def send_error_zulip(error_msg: str) -> None:
|
def send_error_zulip(zulip_client: zulip.Client, error_msg: str) -> None:
|
||||||
message = {
|
message = {
|
||||||
"type": "private",
|
"type": "private",
|
||||||
"sender": zulip_account_email,
|
"sender": zulip_account_email,
|
||||||
|
@ -166,8 +181,23 @@ current_zephyr_subs = set()
|
||||||
|
|
||||||
def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
|
def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
|
||||||
try:
|
try:
|
||||||
zephyr._z.subAll(subs)
|
zephyr_ctypes.check(
|
||||||
except OSError:
|
zephyr_ctypes.ZSubscribeTo(
|
||||||
|
(zephyr_ctypes.ZSubscription_t * len(subs))(
|
||||||
|
*(
|
||||||
|
zephyr_ctypes.ZSubscription_t(
|
||||||
|
zsub_class=cls.encode(),
|
||||||
|
zsub_classinst=instance.encode(),
|
||||||
|
zsub_recipient=recipient.encode(),
|
||||||
|
)
|
||||||
|
for cls, instance, recipient in subs
|
||||||
|
)
|
||||||
|
),
|
||||||
|
len(subs),
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except zephyr_ctypes.ZephyrError:
|
||||||
# Since we haven't added the subscription to
|
# Since we haven't added the subscription to
|
||||||
# current_zephyr_subs yet, we can just return (so that we'll
|
# current_zephyr_subs yet, we can just return (so that we'll
|
||||||
# continue processing normal messages) and we'll end up
|
# continue processing normal messages) and we'll end up
|
||||||
|
@ -176,26 +206,41 @@ def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
|
||||||
logger.exception("Error subscribing to streams (will retry automatically):")
|
logger.exception("Error subscribing to streams (will retry automatically):")
|
||||||
logger.warning(f"Streams were: {[cls for cls, instance, recipient in subs]}")
|
logger.warning(f"Streams were: {[cls for cls, instance, recipient in subs]}")
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
actual_zephyr_subs = [cls for (cls, _, _) in zephyr._z.getSubscriptions()]
|
nsubs = c_int()
|
||||||
except OSError:
|
zephyr_ctypes.check(zephyr_ctypes.ZRetrieveSubscriptions(0, byref(nsubs)))
|
||||||
|
zsubs = (zephyr_ctypes.ZSubscription_t * nsubs.value)()
|
||||||
|
zephyr_ctypes.check(zephyr_ctypes.ZGetSubscriptions(zsubs, byref(nsubs)))
|
||||||
|
actual_zephyr_subs = {zsub.zsub_class.decode() for zsub in zsubs}
|
||||||
|
except zephyr_ctypes.ZephyrError:
|
||||||
logger.exception("Error getting current Zephyr subscriptions")
|
logger.exception("Error getting current Zephyr subscriptions")
|
||||||
# Don't add anything to current_zephyr_subs so that we'll
|
# Don't add anything to current_zephyr_subs so that we'll
|
||||||
# retry the next time we check for streams to subscribe to
|
# retry the next time we check for streams to subscribe to
|
||||||
# (within 15 seconds).
|
# (within 15 seconds).
|
||||||
return
|
return
|
||||||
|
finally:
|
||||||
|
zephyr_ctypes.ZFlushSubscriptions()
|
||||||
|
|
||||||
for (cls, instance, recipient) in subs:
|
for (cls, instance, recipient) in subs:
|
||||||
if cls not in actual_zephyr_subs:
|
if cls not in actual_zephyr_subs:
|
||||||
logger.error(f"Zephyr failed to subscribe us to {cls}; will retry")
|
logger.error(f"Zephyr failed to subscribe us to {cls}; will retry")
|
||||||
try:
|
# We'll retry automatically when we next check for
|
||||||
# We'll retry automatically when we next check for
|
# streams to subscribe to (within 15 seconds), but
|
||||||
# streams to subscribe to (within 15 seconds), but
|
# it's worth doing 1 retry immediately to avoid
|
||||||
# it's worth doing 1 retry immediately to avoid
|
# missing 15 seconds of messages on the affected
|
||||||
# missing 15 seconds of messages on the affected
|
# classes
|
||||||
# classes
|
zephyr_ctypes.ZSubscribeTo(
|
||||||
zephyr._z.sub(cls, instance, recipient)
|
(zephyr_ctypes.ZSubscription_t * 1)(
|
||||||
except OSError:
|
zephyr_ctypes.ZSubscription_t(
|
||||||
pass
|
zsub_class=cls.encode(),
|
||||||
|
zsub_classinst=instance.encode(),
|
||||||
|
zsub_recipient=recipient.encode(),
|
||||||
|
)
|
||||||
|
),
|
||||||
|
1,
|
||||||
|
0,
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
current_zephyr_subs.add(cls)
|
current_zephyr_subs.add(cls)
|
||||||
|
|
||||||
|
@ -246,8 +291,8 @@ def maybe_restart_mirroring_script() -> None:
|
||||||
logger.warning("zephyr mirroring script has been updated; restarting...")
|
logger.warning("zephyr mirroring script has been updated; restarting...")
|
||||||
maybe_kill_child()
|
maybe_kill_child()
|
||||||
try:
|
try:
|
||||||
zephyr._z.cancelSubs()
|
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
|
||||||
except OSError:
|
except zephyr_ctypes.ZephyrError:
|
||||||
# We don't care whether we failed to cancel subs properly, but we should log it
|
# We don't care whether we failed to cancel subs properly, but we should log it
|
||||||
logger.exception("")
|
logger.exception("")
|
||||||
backoff = RandomExponentialBackoff(
|
backoff = RandomExponentialBackoff(
|
||||||
|
@ -263,27 +308,29 @@ def maybe_restart_mirroring_script() -> None:
|
||||||
raise Exception("Failed to reload too many times, aborting!")
|
raise Exception("Failed to reload too many times, aborting!")
|
||||||
|
|
||||||
|
|
||||||
def process_loop(log: Optional[IO[str]]) -> NoReturn:
|
def process_loop(zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]) -> NoReturn:
|
||||||
restart_check_count = 0
|
restart_check_count = 0
|
||||||
last_check_time = time.time()
|
last_check_time = time.time()
|
||||||
recieve_backoff = RandomExponentialBackoff()
|
recieve_backoff = RandomExponentialBackoff()
|
||||||
while True:
|
while True:
|
||||||
select.select([zephyr._z.getFD()], [], [], 15)
|
select.select([zephyr_ctypes.ZGetFD()], [], [], 15)
|
||||||
try:
|
try:
|
||||||
process_backoff = RandomExponentialBackoff()
|
process_backoff = RandomExponentialBackoff()
|
||||||
# Fetch notices from the queue until its empty
|
# Fetch notices from the queue until its empty
|
||||||
while True:
|
while zephyr_ctypes.ZPending() != 0:
|
||||||
notice = zephyr.receive(block=False)
|
notice = zephyr_ctypes.ZNotice_t()
|
||||||
recieve_backoff.succeed()
|
sender = zephyr_ctypes.sockaddr_in()
|
||||||
if notice is None:
|
zephyr_ctypes.check(zephyr_ctypes.ZReceiveNotice(byref(notice), byref(sender)))
|
||||||
break
|
|
||||||
try:
|
try:
|
||||||
process_notice(notice, log)
|
recieve_backoff.succeed()
|
||||||
|
process_notice(notice, zulip_queue, log)
|
||||||
process_backoff.succeed()
|
process_backoff.succeed()
|
||||||
except Exception:
|
except zephyr_ctypes.ZephyrError:
|
||||||
logger.exception("Error relaying zephyr:")
|
logger.exception("Error relaying zephyr:")
|
||||||
process_backoff.fail()
|
process_backoff.fail()
|
||||||
except Exception:
|
finally:
|
||||||
|
zephyr_ctypes.ZFreeNotice(byref(notice))
|
||||||
|
except zephyr_ctypes.ZephyrError:
|
||||||
logger.exception("Error checking for new zephyrs:")
|
logger.exception("Error checking for new zephyrs:")
|
||||||
recieve_backoff.fail()
|
recieve_backoff.fail()
|
||||||
continue
|
continue
|
||||||
|
@ -395,38 +442,47 @@ def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str:
|
||||||
return decrypted
|
return decrypted
|
||||||
|
|
||||||
|
|
||||||
def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
|
def process_notice(
|
||||||
assert notice.sender is not None
|
notice: zephyr_ctypes.ZNotice_t, zulip_queue: "Queue[ZephyrDict]", log: Optional[IO[str]]
|
||||||
(zsig, body) = parse_zephyr_body(notice.message, notice.format)
|
) -> None:
|
||||||
|
assert notice.z_sender is not None
|
||||||
|
(zsig, body) = parse_zephyr_body(
|
||||||
|
notice.z_message[: notice.z_message_len].decode(errors="replace"),
|
||||||
|
notice.z_default_format.decode(errors="replace"),
|
||||||
|
)
|
||||||
is_personal = False
|
is_personal = False
|
||||||
is_huddle = False
|
is_huddle = False
|
||||||
|
|
||||||
if notice.opcode == "PING":
|
if notice.z_opcode == b"PING":
|
||||||
# skip PING messages
|
# skip PING messages
|
||||||
return
|
return
|
||||||
|
|
||||||
zephyr_class = notice.cls.lower()
|
zephyr_class = notice.z_class.decode()
|
||||||
|
zephyr_instance = notice.z_class_inst.decode()
|
||||||
|
zephyr_sender = notice.z_sender.decode()
|
||||||
|
|
||||||
if zephyr_class == options.nagios_class:
|
if zephyr_class.lower() == options.nagios_class:
|
||||||
# Mark that we got the message and proceed
|
# Mark that we got the message and proceed
|
||||||
with open(options.nagios_path, "w") as f:
|
with open(options.nagios_path, "w") as f:
|
||||||
f.write("0\n")
|
f.write("0\n")
|
||||||
return
|
return
|
||||||
|
|
||||||
if notice.recipient != "":
|
if notice.z_recipient != b"":
|
||||||
is_personal = True
|
is_personal = True
|
||||||
# Drop messages not to the listed subscriptions
|
# Drop messages not to the listed subscriptions
|
||||||
if is_personal and not options.forward_personals:
|
if is_personal and not options.forward_personals:
|
||||||
return
|
return
|
||||||
if (zephyr_class not in current_zephyr_subs) and not is_personal:
|
if (zephyr_class.lower() not in current_zephyr_subs) and not is_personal:
|
||||||
logger.debug(f"Skipping ... {zephyr_class}/{notice.instance}/{is_personal}")
|
logger.debug(f"Skipping ... {zephyr_class}/{zephyr_instance}/{is_personal}")
|
||||||
return
|
return
|
||||||
if notice.format.startswith("Zephyr error: See") or notice.format.endswith("@(@color(blue))"):
|
if notice.z_default_format.startswith(b"Zephyr error: See") or notice.z_default_format.endswith(
|
||||||
|
b"@(@color(blue))"
|
||||||
|
):
|
||||||
logger.debug("Skipping message we got from Zulip!")
|
logger.debug("Skipping message we got from Zulip!")
|
||||||
return
|
return
|
||||||
if (
|
if (
|
||||||
zephyr_class == "mail"
|
zephyr_class.lower() == "mail"
|
||||||
and notice.instance.lower() == "inbox"
|
and zephyr_instance.lower() == "inbox"
|
||||||
and is_personal
|
and is_personal
|
||||||
and not options.forward_mail_zephyrs
|
and not options.forward_mail_zephyrs
|
||||||
):
|
):
|
||||||
|
@ -440,21 +496,21 @@ def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
|
||||||
huddle_recipients = [
|
huddle_recipients = [
|
||||||
to_zulip_username(x.strip()) for x in body.split("\n")[0][4:].split()
|
to_zulip_username(x.strip()) for x in body.split("\n")[0][4:].split()
|
||||||
]
|
]
|
||||||
if notice.sender not in huddle_recipients:
|
if zephyr_sender not in huddle_recipients:
|
||||||
huddle_recipients.append(to_zulip_username(notice.sender))
|
huddle_recipients.append(to_zulip_username(zephyr_sender))
|
||||||
body = body.split("\n", 1)[1]
|
body = body.split("\n", 1)[1]
|
||||||
|
|
||||||
if (
|
if (
|
||||||
options.forward_class_messages
|
options.forward_class_messages
|
||||||
and notice.opcode is not None
|
and notice.z_opcode is not None
|
||||||
and notice.opcode.lower() == "crypt"
|
and notice.z_opcode.lower() == b"crypt"
|
||||||
):
|
):
|
||||||
body = decrypt_zephyr(zephyr_class, notice.instance.lower(), body)
|
body = decrypt_zephyr(zephyr_class.lower(), zephyr_instance.lower(), body)
|
||||||
|
|
||||||
zeph: ZephyrDict
|
zeph: ZephyrDict
|
||||||
zeph = {
|
zeph = {
|
||||||
"time": str(notice.time),
|
"time": str(notice.z_time.tv_sec + notice.z_time.tv_usec / 1e6),
|
||||||
"sender": notice.sender,
|
"sender": zephyr_sender,
|
||||||
"zsig": zsig, # logged here but not used by app
|
"zsig": zsig, # logged here but not used by app
|
||||||
"content": body,
|
"content": body,
|
||||||
}
|
}
|
||||||
|
@ -462,46 +518,47 @@ def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
|
||||||
zeph["type"] = "private"
|
zeph["type"] = "private"
|
||||||
zeph["recipient"] = huddle_recipients
|
zeph["recipient"] = huddle_recipients
|
||||||
elif is_personal:
|
elif is_personal:
|
||||||
assert notice.recipient is not None
|
assert notice.z_recipient is not None
|
||||||
zeph["type"] = "private"
|
zeph["type"] = "private"
|
||||||
zeph["recipient"] = to_zulip_username(notice.recipient)
|
zeph["recipient"] = to_zulip_username(notice.z_recipient.decode())
|
||||||
else:
|
else:
|
||||||
zeph["type"] = "stream"
|
zeph["type"] = "stream"
|
||||||
zeph["stream"] = zephyr_class
|
zeph["stream"] = zephyr_class.lower()
|
||||||
if notice.instance.strip() != "":
|
if zephyr_instance.strip() != "":
|
||||||
zeph["subject"] = notice.instance
|
zeph["subject"] = zephyr_instance
|
||||||
else:
|
else:
|
||||||
zeph["subject"] = f'(instance "{notice.instance}")'
|
zeph["subject"] = f'(instance "{zephyr_instance}")'
|
||||||
|
|
||||||
# Add instances in for instanced personals
|
# Add instances in for instanced personals
|
||||||
if is_personal:
|
if is_personal:
|
||||||
if notice.cls.lower() != "message" and notice.instance.lower() != "personal":
|
if zephyr_class.lower() != "message" and zephyr_instance.lower() != "personal":
|
||||||
heading = f"[-c {notice.cls} -i {notice.instance}]\n"
|
heading = f"[-c {zephyr_class} -i {zephyr_instance}]\n"
|
||||||
elif notice.cls.lower() != "message":
|
elif zephyr_class.lower() != "message":
|
||||||
heading = f"[-c {notice.cls}]\n"
|
heading = f"[-c {zephyr_class}]\n"
|
||||||
elif notice.instance.lower() != "personal":
|
elif zephyr_instance.lower() != "personal":
|
||||||
heading = f"[-i {notice.instance}]\n"
|
heading = f"[-i {zephyr_instance}]\n"
|
||||||
else:
|
else:
|
||||||
heading = ""
|
heading = ""
|
||||||
zeph["content"] = heading + zeph["content"]
|
zeph["content"] = heading + zeph["content"]
|
||||||
|
|
||||||
logger.info(f"Received a message on {zephyr_class}/{notice.instance} from {notice.sender}...")
|
logger.info(f"Received a message on {zephyr_class}/{zephyr_instance} from {zephyr_sender}...")
|
||||||
if log is not None:
|
if log is not None:
|
||||||
log.write(json.dumps(zeph) + "\n")
|
log.write(json.dumps(zeph) + "\n")
|
||||||
log.flush()
|
log.flush()
|
||||||
|
|
||||||
if os.fork() == 0:
|
zulip_queue.put(zeph)
|
||||||
global CURRENT_STATE
|
|
||||||
CURRENT_STATE = States.ChildSending
|
|
||||||
# Actually send the message in a child process, to avoid blocking.
|
def send_zulip_worker(zulip_queue: "Queue[ZephyrDict]", zulip_client: zulip.Client) -> None:
|
||||||
|
while True:
|
||||||
|
zeph = zulip_queue.get()
|
||||||
try:
|
try:
|
||||||
res = send_zulip(zeph)
|
res = send_zulip(zulip_client, zeph)
|
||||||
if res.get("result") != "success":
|
if res.get("result") != "success":
|
||||||
logger.error(f"Error relaying zephyr:\n{zeph}\n{res}")
|
logger.error(f"Error relaying zephyr:\n{zeph}\n{res}")
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error relaying zephyr:")
|
logger.exception("Error relaying zephyr:")
|
||||||
finally:
|
zulip_queue.task_done()
|
||||||
os._exit(0)
|
|
||||||
|
|
||||||
|
|
||||||
def quit_failed_initialization(message: str) -> str:
|
def quit_failed_initialization(message: str) -> str:
|
||||||
|
@ -514,12 +571,14 @@ def zephyr_init_autoretry() -> None:
|
||||||
backoff = zulip.RandomExponentialBackoff()
|
backoff = zulip.RandomExponentialBackoff()
|
||||||
while backoff.keep_going():
|
while backoff.keep_going():
|
||||||
try:
|
try:
|
||||||
# zephyr.init() tries to clear old subscriptions, and thus
|
# ZCancelSubscriptions sometimes gets a SERVNAK from the server
|
||||||
# sometimes gets a SERVNAK from the server
|
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
|
||||||
zephyr.init()
|
zephyr_port = c_ushort()
|
||||||
|
zephyr_ctypes.check(zephyr_ctypes.ZOpenPort(byref(zephyr_port)))
|
||||||
|
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
|
||||||
backoff.succeed()
|
backoff.succeed()
|
||||||
return
|
return
|
||||||
except OSError:
|
except zephyr_ctypes.ZephyrError:
|
||||||
logger.exception("Error initializing Zephyr library (retrying). Traceback:")
|
logger.exception("Error initializing Zephyr library (retrying). Traceback:")
|
||||||
backoff.fail()
|
backoff.fail()
|
||||||
|
|
||||||
|
@ -532,11 +591,10 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
|
||||||
try:
|
try:
|
||||||
with open(session_path, "rb") as f:
|
with open(session_path, "rb") as f:
|
||||||
session = f.read()
|
session = f.read()
|
||||||
zephyr._z.initialize()
|
zephyr_ctypes.check(zephyr_ctypes.ZInitialize())
|
||||||
zephyr._z.load_session(session)
|
zephyr_ctypes.check(zephyr_ctypes.ZLoadSession(session, len(session)))
|
||||||
zephyr.__inited = True
|
|
||||||
return
|
return
|
||||||
except OSError:
|
except zephyr_ctypes.ZephyrError:
|
||||||
logger.exception("Error loading saved Zephyr session (retrying). Traceback:")
|
logger.exception("Error loading saved Zephyr session (retrying). Traceback:")
|
||||||
backoff.fail()
|
backoff.fail()
|
||||||
|
|
||||||
|
@ -544,13 +602,26 @@ def zephyr_load_session_autoretry(session_path: str) -> None:
|
||||||
|
|
||||||
|
|
||||||
def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
|
def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
|
||||||
|
cls, instance, recipient = sub
|
||||||
backoff = zulip.RandomExponentialBackoff()
|
backoff = zulip.RandomExponentialBackoff()
|
||||||
while backoff.keep_going():
|
while backoff.keep_going():
|
||||||
try:
|
try:
|
||||||
zephyr.Subscriptions().add(sub)
|
zephyr_ctypes.check(
|
||||||
|
zephyr_ctypes.ZSubscribeTo(
|
||||||
|
(zephyr_ctypes.ZSubscription_t * 1)(
|
||||||
|
zephyr_ctypes.ZSubscription_t(
|
||||||
|
zsub_class=cls.encode(),
|
||||||
|
zsub_classinst=instance.encode(),
|
||||||
|
zsub_recipient=recipient.encode(),
|
||||||
|
)
|
||||||
|
),
|
||||||
|
1,
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
)
|
||||||
backoff.succeed()
|
backoff.succeed()
|
||||||
return
|
return
|
||||||
except OSError:
|
except zephyr_ctypes.ZephyrError:
|
||||||
# Probably a SERVNAK from the zephyr server, but log the
|
# Probably a SERVNAK from the zephyr server, but log the
|
||||||
# traceback just in case it's something else
|
# traceback just in case it's something else
|
||||||
logger.exception("Error subscribing to personals (retrying). Traceback:")
|
logger.exception("Error subscribing to personals (retrying). Traceback:")
|
||||||
|
@ -560,6 +631,8 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
|
||||||
|
|
||||||
|
|
||||||
def zephyr_to_zulip(options: optparse.Values) -> None:
|
def zephyr_to_zulip(options: optparse.Values) -> None:
|
||||||
|
zulip_client = make_zulip_client()
|
||||||
|
|
||||||
if options.use_sessions and os.path.exists(options.session_path):
|
if options.use_sessions and os.path.exists(options.session_path):
|
||||||
logger.info("Loading old session")
|
logger.info("Loading old session")
|
||||||
zephyr_load_session_autoretry(options.session_path)
|
zephyr_load_session_autoretry(options.session_path)
|
||||||
|
@ -575,8 +648,14 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
|
||||||
if options.nagios_class:
|
if options.nagios_class:
|
||||||
zephyr_subscribe_autoretry((options.nagios_class, "*", "*"))
|
zephyr_subscribe_autoretry((options.nagios_class, "*", "*"))
|
||||||
if options.use_sessions:
|
if options.use_sessions:
|
||||||
with open(options.session_path, "wb") as f:
|
buf = POINTER(c_char)()
|
||||||
f.write(zephyr._z.dump_session())
|
buf_len = c_int()
|
||||||
|
zephyr_ctypes.check(zephyr_ctypes.ZDumpSession(byref(buf), byref(buf_len)))
|
||||||
|
try:
|
||||||
|
with open(options.session_path, "wb") as f:
|
||||||
|
f.write(buf[: buf_len.value]) # type: ignore[arg-type] # bytes, but mypy infers List[c_char]
|
||||||
|
finally:
|
||||||
|
zephyr_ctypes.free(buf)
|
||||||
|
|
||||||
if options.logs_to_resend is not None:
|
if options.logs_to_resend is not None:
|
||||||
with open(options.logs_to_resend) as log:
|
with open(options.logs_to_resend) as log:
|
||||||
|
@ -593,18 +672,22 @@ def zephyr_to_zulip(options: optparse.Values) -> None:
|
||||||
"sending saved message to %s from %s..."
|
"sending saved message to %s from %s..."
|
||||||
% (zeph.get("stream", zeph.get("recipient")), zeph["sender"])
|
% (zeph.get("stream", zeph.get("recipient")), zeph["sender"])
|
||||||
)
|
)
|
||||||
send_zulip(zeph)
|
send_zulip(zulip_client, zeph)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Could not send saved zephyr:")
|
logger.exception("Could not send saved zephyr:")
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
||||||
logger.info("Successfully initialized; Starting receive loop.")
|
logger.info("Successfully initialized; Starting receive loop.")
|
||||||
|
|
||||||
|
# Actually send the messages in a thread, to avoid blocking.
|
||||||
|
zulip_queue: "Queue[ZephyrDict]" = Queue()
|
||||||
|
Thread(target=lambda: send_zulip_worker(zulip_queue, zulip_client)).start()
|
||||||
|
|
||||||
if options.resend_log_path is not None:
|
if options.resend_log_path is not None:
|
||||||
with open(options.resend_log_path, "a") as log:
|
with open(options.resend_log_path, "a") as log:
|
||||||
process_loop(log)
|
process_loop(zulip_queue, log)
|
||||||
else:
|
else:
|
||||||
process_loop(None)
|
process_loop(zulip_queue, None)
|
||||||
|
|
||||||
|
|
||||||
def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
|
def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
|
||||||
|
@ -675,7 +758,7 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op
|
||||||
return encrypted
|
return encrypted
|
||||||
|
|
||||||
|
|
||||||
def forward_to_zephyr(message: Dict[str, Any]) -> None:
|
def forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
|
||||||
# 'Any' can be of any type of text
|
# 'Any' can be of any type of text
|
||||||
support_heading = "Hi there! This is an automated message from Zulip."
|
support_heading = "Hi there! This is an automated message from Zulip."
|
||||||
support_closing = """If you have any questions, please be in touch through the \
|
support_closing = """If you have any questions, please be in touch through the \
|
||||||
|
@ -749,6 +832,7 @@ Feedback button or at support@zulip.com."""
|
||||||
result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content)
|
result = zcrypt_encrypt_content(zephyr_class, instance, wrapped_content)
|
||||||
if result is None:
|
if result is None:
|
||||||
send_error_zulip(
|
send_error_zulip(
|
||||||
|
zulip_client,
|
||||||
"""%s
|
"""%s
|
||||||
|
|
||||||
Your Zulip-Zephyr mirror bot was unable to forward that last message \
|
Your Zulip-Zephyr mirror bot was unable to forward that last message \
|
||||||
|
@ -758,7 +842,7 @@ key (perhaps because your AFS tokens expired). That means that while \
|
||||||
Zulip users (like you) received it, Zephyr users did not.
|
Zulip users (like you) received it, Zephyr users did not.
|
||||||
|
|
||||||
%s"""
|
%s"""
|
||||||
% (support_heading, support_closing)
|
% (support_heading, support_closing),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -775,6 +859,7 @@ Zulip users (like you) received it, Zephyr users did not.
|
||||||
return
|
return
|
||||||
elif code == 0:
|
elif code == 0:
|
||||||
send_error_zulip(
|
send_error_zulip(
|
||||||
|
zulip_client,
|
||||||
"""%s
|
"""%s
|
||||||
|
|
||||||
Your last message was successfully mirrored to zephyr, but zwrite \
|
Your last message was successfully mirrored to zephyr, but zwrite \
|
||||||
|
@ -783,7 +868,7 @@ returned the following warning:
|
||||||
%s
|
%s
|
||||||
|
|
||||||
%s"""
|
%s"""
|
||||||
% (support_heading, stderr, support_closing)
|
% (support_heading, stderr, support_closing),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
elif code != 0 and (
|
elif code != 0 and (
|
||||||
|
@ -797,6 +882,7 @@ returned the following warning:
|
||||||
if options.ignore_expired_tickets:
|
if options.ignore_expired_tickets:
|
||||||
return
|
return
|
||||||
send_error_zulip(
|
send_error_zulip(
|
||||||
|
zulip_client,
|
||||||
"""%s
|
"""%s
|
||||||
|
|
||||||
Your last message was forwarded from Zulip to Zephyr unauthenticated, \
|
Your last message was forwarded from Zulip to Zephyr unauthenticated, \
|
||||||
|
@ -806,7 +892,7 @@ are running the Zulip-Zephyr mirroring bot, so we can send \
|
||||||
authenticated Zephyr messages for you again.
|
authenticated Zephyr messages for you again.
|
||||||
|
|
||||||
%s"""
|
%s"""
|
||||||
% (support_heading, support_closing)
|
% (support_heading, support_closing),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -814,6 +900,7 @@ authenticated Zephyr messages for you again.
|
||||||
# probably because the recipient isn't subscribed to personals,
|
# probably because the recipient isn't subscribed to personals,
|
||||||
# but regardless, we should just notify the user.
|
# but regardless, we should just notify the user.
|
||||||
send_error_zulip(
|
send_error_zulip(
|
||||||
|
zulip_client,
|
||||||
"""%s
|
"""%s
|
||||||
|
|
||||||
Your Zulip-Zephyr mirror bot was unable to forward that last message \
|
Your Zulip-Zephyr mirror bot was unable to forward that last message \
|
||||||
|
@ -823,12 +910,12 @@ received it, Zephyr users did not. The error message from zwrite was:
|
||||||
%s
|
%s
|
||||||
|
|
||||||
%s"""
|
%s"""
|
||||||
% (support_heading, stderr, support_closing)
|
% (support_heading, stderr, support_closing),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
|
def maybe_forward_to_zephyr(message: Dict[str, Any], zulip_client: zulip.Client) -> None:
|
||||||
# The key string can be used to direct any type of text.
|
# The key string can be used to direct any type of text.
|
||||||
if message["sender_email"] == zulip_account_email:
|
if message["sender_email"] == zulip_account_email:
|
||||||
if not (
|
if not (
|
||||||
|
@ -851,7 +938,7 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
forward_to_zephyr(message)
|
forward_to_zephyr(message, zulip_client)
|
||||||
except Exception:
|
except Exception:
|
||||||
# Don't let an exception forwarding one message crash the
|
# Don't let an exception forwarding one message crash the
|
||||||
# whole process
|
# whole process
|
||||||
|
@ -859,12 +946,16 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
|
||||||
|
|
||||||
|
|
||||||
def zulip_to_zephyr(options: optparse.Values) -> NoReturn:
|
def zulip_to_zephyr(options: optparse.Values) -> NoReturn:
|
||||||
|
zulip_client = make_zulip_client()
|
||||||
|
|
||||||
# Sync messages from zulip to zephyr
|
# Sync messages from zulip to zephyr
|
||||||
logger.info("Starting syncing messages.")
|
logger.info("Starting syncing messages.")
|
||||||
backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
|
backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
zulip_client.call_on_each_message(maybe_forward_to_zephyr)
|
zulip_client.call_on_each_message(
|
||||||
|
lambda message: maybe_forward_to_zephyr(message, zulip_client)
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Error syncing messages:")
|
logger.exception("Error syncing messages:")
|
||||||
backoff.fail()
|
backoff.fail()
|
||||||
|
@ -886,6 +977,8 @@ def subscribed_to_mail_messages() -> bool:
|
||||||
|
|
||||||
|
|
||||||
def add_zulip_subscriptions(verbose: bool) -> None:
|
def add_zulip_subscriptions(verbose: bool) -> None:
|
||||||
|
zulip_client = make_zulip_client()
|
||||||
|
|
||||||
zephyr_subscriptions = set()
|
zephyr_subscriptions = set()
|
||||||
skipped = set()
|
skipped = set()
|
||||||
for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose):
|
for (cls, instance, recipient) in parse_zephyr_subs(verbose=verbose):
|
||||||
|
@ -1146,14 +1239,14 @@ def parse_args() -> Tuple[optparse.Values, List[str]]:
|
||||||
|
|
||||||
|
|
||||||
def die_gracefully(signal: int, frame: FrameType) -> None:
|
def die_gracefully(signal: int, frame: FrameType) -> None:
|
||||||
if CURRENT_STATE == States.ZulipToZephyr or CURRENT_STATE == States.ChildSending:
|
if CURRENT_STATE == States.ZulipToZephyr:
|
||||||
# this is a child process, so we want os._exit (no clean-up necessary)
|
# this is a child process, so we want os._exit (no clean-up necessary)
|
||||||
os._exit(1)
|
os._exit(1)
|
||||||
|
|
||||||
if CURRENT_STATE == States.ZephyrToZulip and not options.use_sessions:
|
if CURRENT_STATE == States.ZephyrToZulip and not options.use_sessions:
|
||||||
try:
|
try:
|
||||||
# zephyr=>zulip processes may have added subs, so run cancelSubs
|
# zephyr=>zulip processes may have added subs, so run ZCancelSubscriptions
|
||||||
zephyr._z.cancelSubs()
|
zephyr_ctypes.check(zephyr_ctypes.ZCancelSubscriptions(0))
|
||||||
except OSError:
|
except OSError:
|
||||||
# We don't care whether we failed to cancel subs properly, but we should log it
|
# We don't care whether we failed to cancel subs properly, but we should log it
|
||||||
logger.exception("")
|
logger.exception("")
|
||||||
|
@ -1207,15 +1300,6 @@ or specify the --api-key-file option."""
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
zulip_account_email = options.user + "@mit.edu"
|
zulip_account_email = options.user + "@mit.edu"
|
||||||
import zulip
|
|
||||||
|
|
||||||
zulip_client = zulip.Client(
|
|
||||||
email=zulip_account_email,
|
|
||||||
api_key=api_key,
|
|
||||||
verbose=True,
|
|
||||||
client="zephyr_mirror",
|
|
||||||
site=options.site,
|
|
||||||
)
|
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
||||||
|
@ -1274,8 +1358,6 @@ or specify the --api-key-file option."""
|
||||||
child_pid = None
|
child_pid = None
|
||||||
CURRENT_STATE = States.ZephyrToZulip
|
CURRENT_STATE = States.ZephyrToZulip
|
||||||
|
|
||||||
import zephyr
|
|
||||||
|
|
||||||
logger_name = "zephyr=>zulip"
|
logger_name = "zephyr=>zulip"
|
||||||
if options.shard is not None:
|
if options.shard is not None:
|
||||||
logger_name += f"({options.shard})"
|
logger_name += f"({options.shard})"
|
||||||
|
|
|
@ -42,12 +42,12 @@ setup(
|
||||||
"License :: OSI Approved :: Apache Software License",
|
"License :: OSI Approved :: Apache Software License",
|
||||||
"Topic :: Communications :: Chat",
|
"Topic :: Communications :: Chat",
|
||||||
"Programming Language :: Python :: 3",
|
"Programming Language :: Python :: 3",
|
||||||
"Programming Language :: Python :: 3.6",
|
|
||||||
"Programming Language :: Python :: 3.7",
|
"Programming Language :: Python :: 3.7",
|
||||||
"Programming Language :: Python :: 3.8",
|
"Programming Language :: Python :: 3.8",
|
||||||
"Programming Language :: Python :: 3.9",
|
"Programming Language :: Python :: 3.9",
|
||||||
|
"Programming Language :: Python :: 3.10",
|
||||||
],
|
],
|
||||||
python_requires=">=3.6",
|
python_requires=">=3.7",
|
||||||
url="https://www.zulip.org/",
|
url="https://www.zulip.org/",
|
||||||
project_urls={
|
project_urls={
|
||||||
"Source": "https://github.com/zulip/python-zulip-api/",
|
"Source": "https://github.com/zulip/python-zulip-api/",
|
||||||
|
|
|
@ -34,12 +34,12 @@ setup(
|
||||||
"License :: OSI Approved :: Apache Software License",
|
"License :: OSI Approved :: Apache Software License",
|
||||||
"Topic :: Communications :: Chat",
|
"Topic :: Communications :: Chat",
|
||||||
"Programming Language :: Python :: 3",
|
"Programming Language :: Python :: 3",
|
||||||
"Programming Language :: Python :: 3.6",
|
|
||||||
"Programming Language :: Python :: 3.7",
|
"Programming Language :: Python :: 3.7",
|
||||||
"Programming Language :: Python :: 3.8",
|
"Programming Language :: Python :: 3.8",
|
||||||
"Programming Language :: Python :: 3.9",
|
"Programming Language :: Python :: 3.9",
|
||||||
|
"Programming Language :: Python :: 3.10",
|
||||||
],
|
],
|
||||||
python_requires=">=3.6",
|
python_requires=">=3.7",
|
||||||
url="https://www.zulip.org/",
|
url="https://www.zulip.org/",
|
||||||
project_urls={
|
project_urls={
|
||||||
"Source": "https://github.com/zulip/python-zulip-api/",
|
"Source": "https://github.com/zulip/python-zulip-api/",
|
||||||
|
|
|
@ -1,2 +1 @@
|
||||||
python-chess==0.31.* ; python_version < '3.7'
|
chess==1.*
|
||||||
chess==1.* ; python_version >= '3.7'
|
|
||||||
|
|
|
@ -22,12 +22,12 @@ setup(
|
||||||
"License :: OSI Approved :: Apache Software License",
|
"License :: OSI Approved :: Apache Software License",
|
||||||
"Topic :: Communications :: Chat",
|
"Topic :: Communications :: Chat",
|
||||||
"Programming Language :: Python :: 3",
|
"Programming Language :: Python :: 3",
|
||||||
"Programming Language :: Python :: 3.6",
|
|
||||||
"Programming Language :: Python :: 3.7",
|
"Programming Language :: Python :: 3.7",
|
||||||
"Programming Language :: Python :: 3.8",
|
"Programming Language :: Python :: 3.8",
|
||||||
"Programming Language :: Python :: 3.9",
|
"Programming Language :: Python :: 3.9",
|
||||||
|
"Programming Language :: Python :: 3.10",
|
||||||
],
|
],
|
||||||
python_requires=">=3.6",
|
python_requires=">=3.7",
|
||||||
url="https://www.zulip.org/",
|
url="https://www.zulip.org/",
|
||||||
project_urls={
|
project_urls={
|
||||||
"Source": "https://github.com/zulip/python-zulip-api/",
|
"Source": "https://github.com/zulip/python-zulip-api/",
|
||||||
|
|
Loading…
Reference in a new issue