import asyncio import os import shutil import sys from contextlib import contextmanager from subprocess import PIPE, Popen from tempfile import mkdtemp from typing import Any, Awaitable, Callable, Iterator, List from unittest import TestCase, mock import nio from .matrix_bridge import MatrixToZulip, ZulipToMatrix, read_configuration script_file = "matrix_bridge.py" script_dir = os.path.dirname(__file__) script = os.path.join(script_dir, script_file) sample_config_path = "matrix_test.conf" sample_config_text = """[matrix] host = https://matrix.org mxid = @username:matrix.org password = password room_id = #zulip:matrix.org [zulip] email = glitch-bot@chat.zulip.org api_key = aPiKeY site = https://chat.zulip.org stream = test here topic = matrix [additional_bridge1] room_id = #example:matrix.org stream = new test topic = matrix """ ZULIP_MESSAGE_TEMPLATE: str = "**{username}** [{uid}]: {message}" # For Python 3.7 compatibility. # (Since 3.8, there is unittest.IsolatedAsyncioTestCase!) # source: https://stackoverflow.com/a/46324983 def async_test(coro: Callable[..., Awaitable[Any]]) -> Callable[..., Any]: def wrapper(*args: Any, **kwargs: Any) -> Any: loop = asyncio.new_event_loop() try: return loop.run_until_complete(coro(*args, **kwargs)) finally: loop.close() return wrapper @contextmanager def new_temp_dir() -> Iterator[str]: path = mkdtemp() yield path shutil.rmtree(path) class MatrixBridgeScriptTests(TestCase): def output_from_script(self, options: List[str]) -> List[str]: popen = Popen( [sys.executable, script] + options, stdin=PIPE, stdout=PIPE, universal_newlines=True ) return popen.communicate()[0].strip().split("\n") def test_no_args(self) -> None: output_lines = self.output_from_script([]) expected_lines = [ "Options required: -c or --config to run, OR --write-sample-config.", f"usage: {script_file} [-h]", ] for expected, output in zip(expected_lines, output_lines): self.assertIn(expected, output) def test_help_usage_and_description(self) -> None: output_lines = self.output_from_script(["-h"]) usage = f"usage: {script_file} [-h]" description = "Bridge between Zulip topics and Matrix channels." self.assertIn(usage, output_lines[0]) blank_lines = [num for num, line in enumerate(output_lines) if line == ""] # There should be blank lines in the output self.assertTrue(blank_lines) # There should be finite output self.assertTrue(len(output_lines) > blank_lines[0]) # Minimal description should be in the first line of the 2nd "paragraph" self.assertIn(description, output_lines[blank_lines[0] + 1]) def test_write_sample_config(self) -> None: with new_temp_dir() as tempdir: path = os.path.join(tempdir, sample_config_path) output_lines = self.output_from_script(["--write-sample-config", path]) self.assertEqual(output_lines, [f"Wrote sample configuration to '{path}'"]) with open(path) as sample_file: self.assertEqual(sample_file.read(), sample_config_text) def test_write_sample_config_from_zuliprc(self) -> None: zuliprc_template = ["[api]", "email={email}", "key={key}", "site={site}"] zulip_params = { "email": "foo@bar", "key": "some_api_key", "site": "https://some.chat.serverplace", } with new_temp_dir() as tempdir: path = os.path.join(tempdir, sample_config_path) zuliprc_path = os.path.join(tempdir, "zuliprc") with open(zuliprc_path, "w") as zuliprc_file: zuliprc_file.write("\n".join(zuliprc_template).format(**zulip_params)) output_lines = self.output_from_script( ["--write-sample-config", path, "--from-zuliprc", zuliprc_path] ) self.assertEqual( output_lines, [ "Wrote sample configuration to '{}' using zuliprc file '{}'".format( path, zuliprc_path ) ], ) with open(path) as sample_file: sample_lines = [line.strip() for line in sample_file.readlines()] expected_lines = sample_config_text.split("\n") expected_lines[7] = "email = {}".format(zulip_params["email"]) expected_lines[8] = "api_key = {}".format(zulip_params["key"]) expected_lines[9] = "site = {}".format(zulip_params["site"]) self.assertEqual(sample_lines, expected_lines[:-1]) def test_detect_zuliprc_does_not_exist(self) -> None: with new_temp_dir() as tempdir: path = os.path.join(tempdir, sample_config_path) zuliprc_path = os.path.join(tempdir, "zuliprc") # No writing of zuliprc file here -> triggers check for zuliprc absence output_lines = self.output_from_script( ["--write-sample-config", path, "--from-zuliprc", zuliprc_path] ) self.assertEqual( output_lines, [ "Could not write sample config: Zuliprc file '{}' does not exist.".format( zuliprc_path ) ], ) def test_parse_multiple_bridges(self) -> None: with new_temp_dir() as tempdir: path = os.path.join(tempdir, sample_config_path) output_lines = self.output_from_script(["--write-sample-config", path]) self.assertEqual(output_lines, [f"Wrote sample configuration to '{path}'"]) config = read_configuration(path) self.assertIn("zulip", config) self.assertIn("matrix", config) self.assertIn("bridges", config["zulip"]) self.assertIn("bridges", config["matrix"]) self.assertEqual( { ("test here", "matrix"): "#zulip:matrix.org", ("new test", "matrix"): "#example:matrix.org", }, config["zulip"]["bridges"], ) self.assertEqual( { "#zulip:matrix.org": ("test here", "matrix"), "#example:matrix.org": ("new test", "matrix"), }, config["matrix"]["bridges"], ) class MatrixBridgeMatrixToZulipTests(TestCase): user_name = "John Smith" user_uid = "@johnsmith:matrix.org" room = mock.MagicMock() room.user_name = lambda _: "John Smith" def setUp(self) -> None: self.matrix_to_zulip = mock.MagicMock() self.matrix_to_zulip.get_message_content_from_event = ( lambda event: MatrixToZulip.get_message_content_from_event( self.matrix_to_zulip, event, self.room ) ) @async_test async def test_get_message_content_from_event(self) -> None: class RoomMemberEvent(nio.RoomMemberEvent): def __init__(self, sender: str = self.user_uid) -> None: self.sender = sender class RoomMessageFormatted(nio.RoomMessageFormatted): def __init__(self, sender: str = self.user_uid) -> None: self.sender = sender self.body = "this is a message" self.assertIsNone( await self.matrix_to_zulip.get_message_content_from_event(RoomMemberEvent()) ) self.assertEqual( await self.matrix_to_zulip.get_message_content_from_event(RoomMessageFormatted()), ZULIP_MESSAGE_TEMPLATE.format( username=self.user_name, uid=self.user_uid, message="this is a message" ), ) class MatrixBridgeZulipToMatrixTests(TestCase): room = mock.MagicMock() valid_zulip_config = dict( stream="some stream", topic="some topic", email="some@email", bridges={("some stream", "some topic"): room}, ) valid_msg = dict( sender_email="John@Smith.smith", # must not be equal to config:email sender_id=42, type="stream", # Can only mirror Zulip streams display_recipient=valid_zulip_config["stream"], subject=valid_zulip_config["topic"], ) def setUp(self) -> None: self.zulip_to_matrix = mock.MagicMock() self.zulip_to_matrix.zulip_config = self.valid_zulip_config self.zulip_to_matrix.get_matrix_room_for_zulip_message = ( lambda msg: ZulipToMatrix.get_matrix_room_for_zulip_message(self.zulip_to_matrix, msg) ) def test_get_matrix_room_for_zulip_message_success(self) -> None: self.assertEqual( self.zulip_to_matrix.get_matrix_room_for_zulip_message(self.valid_msg), self.room ) def test_get_matrix_room_for_zulip_message_failure(self) -> None: self.assertIsNone( self.zulip_to_matrix.get_matrix_room_for_zulip_message( dict(self.valid_msg, type="private") ) ) self.assertIsNone( self.zulip_to_matrix.get_matrix_room_for_zulip_message( dict(self.valid_msg, sender_email="some@email") ) ) self.assertIsNone( self.zulip_to_matrix.get_matrix_room_for_zulip_message( dict(self.valid_msg, display_recipient="other stream") ) ) self.assertIsNone( self.zulip_to_matrix.get_matrix_room_for_zulip_message( dict(self.valid_msg, subject="other topic") ) )