python-zulip-api/zulip/integrations/bridge_with_matrix/test_matrix.py

265 lines
9.6 KiB
Python
Raw Permalink Normal View History

2021-07-15 15:15:19 -04:00
import asyncio
import os
import shutil
import sys
from contextlib import contextmanager
2021-05-28 05:00:04 -04:00
from subprocess import PIPE, Popen
from tempfile import mkdtemp
2021-07-15 15:15:19 -04:00
from typing import Any, Awaitable, Callable, Iterator, List
2021-05-28 05:00:04 -04:00
from unittest import TestCase, mock
2021-07-15 15:15:19 -04:00
import nio
from .matrix_bridge import MatrixToZulip, ZulipToMatrix, read_configuration
script_file = "matrix_bridge.py"
script_dir = os.path.dirname(__file__)
script = os.path.join(script_dir, script_file)
sample_config_path = "matrix_test.conf"
sample_config_text = """[matrix]
host = https://matrix.org
mxid = @username:matrix.org
password = password
room_id = #zulip:matrix.org
[zulip]
email = glitch-bot@chat.zulip.org
api_key = aPiKeY
site = https://chat.zulip.org
stream = test here
topic = matrix
2021-07-15 15:15:19 -04:00
[additional_bridge1]
room_id = #example:matrix.org
stream = new test
topic = matrix
"""
2021-07-15 15:15:19 -04:00
ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}"
# For Python 3.7 compatibility.
2021-07-15 15:15:19 -04:00
# (Since 3.8, there is unittest.IsolatedAsyncioTestCase!)
# source: https://stackoverflow.com/a/46324983
def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:
def wrapper(*args: Any, **kwargs: Any) -> Any:
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro(*args, **kwargs))
finally:
loop.close()
return wrapper
@contextmanager
def new_temp_dir() -> Iterator[str]:
path = mkdtemp()
yield path
shutil.rmtree(path)
class MatrixBridgeScriptTests(TestCase):
def output_from_script(self, options: List[str]) -> List[str]:
popen = Popen(
[sys.executable, script] + options, stdin=PIPE, stdout=PIPE, universal_newlines=True
)
return popen.communicate()[0].strip().split("\n")
def test_no_args(self) -> None:
output_lines = self.output_from_script([])
expected_lines = [
"Options required: -c or --config to run, OR --write-sample-config.",
f"usage: {script_file} [-h]",
]
for expected, output in zip(expected_lines, output_lines):
self.assertIn(expected, output)
def test_help_usage_and_description(self) -> None:
output_lines = self.output_from_script(["-h"])
usage = f"usage: {script_file} [-h]"
2021-07-15 15:15:19 -04:00
description = "Bridge between Zulip topics and Matrix channels."
self.assertIn(usage, output_lines[0])
blank_lines = [num for num, line in enumerate(output_lines) if line == ""]
# There should be blank lines in the output
self.assertTrue(blank_lines)
# There should be finite output
self.assertTrue(len(output_lines) > blank_lines[0])
# Minimal description should be in the first line of the 2nd "paragraph"
self.assertIn(description, output_lines[blank_lines[0] + 1])
def test_write_sample_config(self) -> None:
with new_temp_dir() as tempdir:
path = os.path.join(tempdir, sample_config_path)
output_lines = self.output_from_script(["--write-sample-config", path])
self.assertEqual(output_lines, [f"Wrote sample configuration to '{path}'"])
with open(path) as sample_file:
self.assertEqual(sample_file.read(), sample_config_text)
def test_write_sample_config_from_zuliprc(self) -> None:
zuliprc_template = ["[api]", "email={email}", "key={key}", "site={site}"]
zulip_params = {
"email": "foo@bar",
"key": "some_api_key",
"site": "https://some.chat.serverplace",
}
with new_temp_dir() as tempdir:
path = os.path.join(tempdir, sample_config_path)
zuliprc_path = os.path.join(tempdir, "zuliprc")
with open(zuliprc_path, "w") as zuliprc_file:
zuliprc_file.write("\n".join(zuliprc_template).format(**zulip_params))
output_lines = self.output_from_script(
["--write-sample-config", path, "--from-zuliprc", zuliprc_path]
)
self.assertEqual(
output_lines,
[
"Wrote sample configuration to '{}' using zuliprc file '{}'".format(
path, zuliprc_path
)
],
)
with open(path) as sample_file:
sample_lines = [line.strip() for line in sample_file.readlines()]
expected_lines = sample_config_text.split("\n")
expected_lines[7] = "email = {}".format(zulip_params["email"])
expected_lines[8] = "api_key = {}".format(zulip_params["key"])
expected_lines[9] = "site = {}".format(zulip_params["site"])
self.assertEqual(sample_lines, expected_lines[:-1])
def test_detect_zuliprc_does_not_exist(self) -> None:
with new_temp_dir() as tempdir:
path = os.path.join(tempdir, sample_config_path)
zuliprc_path = os.path.join(tempdir, "zuliprc")
# No writing of zuliprc file here -> triggers check for zuliprc absence
output_lines = self.output_from_script(
["--write-sample-config", path, "--from-zuliprc", zuliprc_path]
)
self.assertEqual(
output_lines,
[
"Could not write sample config: Zuliprc file '{}' does not exist.".format(
zuliprc_path
)
],
)
2021-07-15 15:15:19 -04:00
def test_parse_multiple_bridges(self) -> None:
with new_temp_dir() as tempdir:
path = os.path.join(tempdir, sample_config_path)
output_lines = self.output_from_script(["--write-sample-config", path])
self.assertEqual(output_lines, [f"Wrote sample configuration to '{path}'"])
2021-07-15 15:15:19 -04:00
config = read_configuration(path)
2021-07-15 15:15:19 -04:00
self.assertIn("zulip", config)
self.assertIn("matrix", config)
self.assertIn("bridges", config["zulip"])
self.assertIn("bridges", config["matrix"])
self.assertEqual(
{
("test here", "matrix"): "#zulip:matrix.org",
("new test", "matrix"): "#example:matrix.org",
},
config["zulip"]["bridges"],
)
self.assertEqual(
{
"#zulip:matrix.org": ("test here", "matrix"),
"#example:matrix.org": ("new test", "matrix"),
},
config["matrix"]["bridges"],
)
2021-07-15 15:15:19 -04:00
class MatrixBridgeMatrixToZulipTests(TestCase):
user_name = "John Smith"
user_uid = "@johnsmith:matrix.org"
room = mock.MagicMock()
room.user_name = lambda _: "John Smith"
2021-07-15 15:15:19 -04:00
def setUp(self) -> None:
self.matrix_to_zulip = mock.MagicMock()
self.matrix_to_zulip.get_message_content_from_event = (
lambda event: MatrixToZulip.get_message_content_from_event(
self.matrix_to_zulip, event, self.room
)
)
2021-07-15 15:15:19 -04:00
@async_test
async def test_get_message_content_from_event(self) -> None:
class RoomMemberEvent(nio.RoomMemberEvent):
def __init__(self, sender: str = self.user_uid) -> None:
self.sender = sender
2021-07-15 15:15:19 -04:00
class RoomMessageFormatted(nio.RoomMessageFormatted):
def __init__(self, sender: str = self.user_uid) -> None:
self.sender = sender
self.body = "this is a message"
2021-07-15 15:15:19 -04:00
self.assertIsNone(
await self.matrix_to_zulip.get_message_content_from_event(RoomMemberEvent())
)
self.assertEqual(
await self.matrix_to_zulip.get_message_content_from_event(RoomMessageFormatted()),
ZULIP_MESSAGE_TEMPLATE.format(
username=self.user_name, uid=self.user_uid, message="this is a message"
),
)
2021-07-15 15:15:19 -04:00
class MatrixBridgeZulipToMatrixTests(TestCase):
room = mock.MagicMock()
valid_zulip_config = dict(
stream="some stream",
topic="some topic",
email="some@email",
bridges={("some stream", "some topic"): room},
)
valid_msg = dict(
sender_email="John@Smith.smith", # must not be equal to config:email
sender_id=42,
type="stream", # Can only mirror Zulip streams
display_recipient=valid_zulip_config["stream"],
subject=valid_zulip_config["topic"],
)
2021-07-15 15:15:19 -04:00
def setUp(self) -> None:
self.zulip_to_matrix = mock.MagicMock()
self.zulip_to_matrix.zulip_config = self.valid_zulip_config
self.zulip_to_matrix.get_matrix_room_for_zulip_message = (
lambda msg: ZulipToMatrix.get_matrix_room_for_zulip_message(self.zulip_to_matrix, msg)
)
2021-07-15 15:15:19 -04:00
def test_get_matrix_room_for_zulip_message_success(self) -> None:
self.assertEqual(
self.zulip_to_matrix.get_matrix_room_for_zulip_message(self.valid_msg), self.room
)
2021-07-15 15:15:19 -04:00
def test_get_matrix_room_for_zulip_message_failure(self) -> None:
self.assertIsNone(
self.zulip_to_matrix.get_matrix_room_for_zulip_message(
dict(self.valid_msg, type="private")
)
)
self.assertIsNone(
self.zulip_to_matrix.get_matrix_room_for_zulip_message(
dict(self.valid_msg, sender_email="some@email")
)
)
self.assertIsNone(
self.zulip_to_matrix.get_matrix_room_for_zulip_message(
dict(self.valid_msg, display_recipient="other stream")
)
)
self.assertIsNone(
self.zulip_to_matrix.get_matrix_room_for_zulip_message(
dict(self.valid_msg, subject="other topic")
)
)