diff --git a/zulip/integrations/zephyr/check-mirroring b/zulip/integrations/zephyr/check-mirroring index 51f3821..6a61d16 100755 --- a/zulip/integrations/zephyr/check-mirroring +++ b/zulip/integrations/zephyr/check-mirroring @@ -9,7 +9,7 @@ import hashlib import zephyr import zulip -from typing import Any, Dict, List, Set, Tuple +from typing import Dict, List, Set, Tuple parser = optparse.OptionParser() parser.add_option('--verbose', @@ -101,11 +101,11 @@ def send_zulip(message: Dict[str, str]) -> None: # Returns True if and only if we "Detected server failure" sending the zephyr. def send_zephyr(zwrite_args: List[str], content: str) -> bool: p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate(input=content.encode("utf-8")) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) + stdout, stderr = p.communicate(input=content) if p.returncode != 0: - # FIXME: This should really look for a byte form of the string in stdout - if "Detected server failure while receiving acknowledgement for" in stdout: # type: ignore + if "Detected server failure while receiving acknowledgement for" in stdout: logger.warning("Got server failure error sending zephyr; retrying") logger.warning(stderr) return True @@ -151,7 +151,7 @@ for tries in range(10): actually_subscribed = True break except OSError as e: - if "SERVNAK received" in e: # type: ignore # https://github.com/python/mypy/issues/2118 + if "SERVNAK received" in e.args: logger.error("SERVNAK repeatedly received, punting rest of test") else: logger.exception("Exception subscribing to zephyrs") @@ -163,7 +163,7 @@ if not actually_subscribed: # Prepare keys zhkeys = {} # type: Dict[str, Tuple[str, str]] hzkeys = {} # type: Dict[str, Tuple[str, str]] -def gen_key(key_dict: Dict[str, Any]) -> str: +def gen_key(key_dict: Dict[str, Tuple[str, str]]) -> str: bits = str(random.getrandbits(32)) while bits in key_dict: # Avoid the unlikely event that we get the same bits twice diff --git a/zulip/integrations/zephyr/zephyr_mirror_backend.py b/zulip/integrations/zephyr/zephyr_mirror_backend.py index 4ac1c98..0395984 100755 --- a/zulip/integrations/zephyr/zephyr_mirror_backend.py +++ b/zulip/integrations/zephyr/zephyr_mirror_backend.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 -from typing import Any, Dict, IO, List, NoReturn, Optional, Set, Text, Tuple, cast +from typing import Any, Dict, IO, List, NoReturn, Optional, Set, Tuple, Union from types import FrameType import sys @@ -17,6 +17,8 @@ import hashlib import tempfile import select +from typing_extensions import Literal, TypedDict + from zulip import RandomExponentialBackoff DEFAULT_SITE = "https://api.zulip.com" @@ -25,7 +27,7 @@ class States: Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4)) CURRENT_STATE = States.Startup -logger = cast(logging.Logger, None) # type: logging.Logger # FIXME cast should not be needed? +logger: logging.Logger def to_zulip_username(zephyr_username: str) -> str: if "@" in zephyr_username: @@ -98,7 +100,18 @@ def unwrap_lines(body: str) -> str: result += previous_line return result -def send_zulip(zeph: Dict[str, str]) -> Dict[str, str]: +class ZephyrDict(TypedDict, total=False): + type: Literal["private", "stream"] + time: str + sender: str + stream: str + subject: str + recipient: Union[str, List[str]] + content: str + zsig: str + +def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]: + message: Dict[str, Any] message = {} if options.forward_class_messages: message["forged"] = "yes" @@ -177,7 +190,7 @@ def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None: def update_subscriptions() -> None: try: f = open(options.stream_file_path) - public_streams = json.loads(f.read()) + public_streams: List[str] = json.loads(f.read()) f.close() except Exception: logger.exception("Error reading public streams:") @@ -185,10 +198,10 @@ def update_subscriptions() -> None: classes_to_subscribe = set() for stream in public_streams: - zephyr_class = stream.encode("utf-8") + zephyr_class = stream if ( options.shard is not None - and not hashlib.sha1(zephyr_class).hexdigest().startswith(options.shard) + and not hashlib.sha1(zephyr_class.encode("utf-8")).hexdigest().startswith(options.shard) ): # This stream is being handled by a different zephyr_mirror job. continue @@ -232,7 +245,7 @@ def maybe_restart_mirroring_script() -> None: backoff.fail() raise Exception("Failed to reload too many times, aborting!") -def process_loop(log: Optional[IO[Any]]) -> NoReturn: +def process_loop(log: Optional[IO[str]]) -> NoReturn: restart_check_count = 0 last_check_time = time.time() recieve_backoff = RandomExponentialBackoff() @@ -293,7 +306,7 @@ def parse_zephyr_body(zephyr_data: str, notice_format: str) -> Tuple[str, str]: body = body.replace('\x00', '') return (zsig, body) -def parse_crypt_table(zephyr_class: Text, instance: str) -> Optional[str]: +def parse_crypt_table(zephyr_class: str, instance: str) -> Optional[str]: try: crypt_table = open(os.path.join(os.environ["HOME"], ".crypt-table")) except OSError: @@ -314,7 +327,7 @@ def parse_crypt_table(zephyr_class: Text, instance: str) -> Optional[str]: return groups["keypath"] return None -def decrypt_zephyr(zephyr_class: Text, instance: str, body: str) -> str: +def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str: keypath = parse_crypt_table(zephyr_class, instance) if keypath is None: # We can't decrypt it, so we just return the original body @@ -338,13 +351,16 @@ def decrypt_zephyr(zephyr_class: Text, instance: str, body: str) -> str: keypath], stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - decrypted, _ = p.communicate(input=body) # type: ignore # Optional[bytes] vs string + stderr=subprocess.PIPE, + universal_newlines=True, + errors="replace") + decrypted, _ = p.communicate(input=body) # Restore our ignoring signals signal.signal(signal.SIGCHLD, signal.SIG_IGN) - return decrypted # type: ignore # bytes, expecting str + return decrypted -def process_notice(notice: Any, log: Optional[IO[Any]]) -> None: +def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None: + assert notice.sender is not None (zsig, body) = parse_zephyr_body(notice.message, notice.format) is_personal = False is_huddle = False @@ -392,9 +408,10 @@ def process_notice(notice: Any, log: Optional[IO[Any]]) -> None: huddle_recipients.append(to_zulip_username(notice.sender)) body = body.split("\n", 1)[1] - if options.forward_class_messages and notice.opcode.lower() == "crypt": + if options.forward_class_messages and notice.opcode is not None and notice.opcode.lower() == "crypt": body = decrypt_zephyr(zephyr_class, notice.instance.lower(), body) + zeph: ZephyrDict zeph = {'time': str(notice.time), 'sender': notice.sender, 'zsig': zsig, # logged here but not used by app @@ -403,6 +420,7 @@ def process_notice(notice: Any, log: Optional[IO[Any]]) -> None: zeph['type'] = 'private' zeph['recipient'] = huddle_recipients elif is_personal: + assert notice.recipient is not None zeph['type'] = 'private' zeph['recipient'] = to_zulip_username(notice.recipient) else: @@ -425,8 +443,6 @@ def process_notice(notice: Any, log: Optional[IO[Any]]) -> None: heading = "" zeph["content"] = heading + zeph["content"] - zeph = decode_unicode_byte_strings(zeph) - logger.info("Received a message on %s/%s from %s..." % (zephyr_class, notice.instance, notice.sender)) if log is not None: @@ -446,17 +462,6 @@ def process_notice(notice: Any, log: Optional[IO[Any]]) -> None: finally: os._exit(0) -def decode_unicode_byte_strings(zeph: Dict[str, Any]) -> Dict[str, str]: - # 'Any' can be of any type of text that is converted to str. - for field in zeph.keys(): - if isinstance(zeph[field], str): - try: - decoded = zeph[field].decode("utf-8") - except Exception: - decoded = zeph[field].decode("iso-8859-1") - zeph[field] = decoded - return zeph - def quit_failed_initialization(message: str) -> str: logger.error(message) maybe_kill_child() @@ -508,7 +513,7 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None: quit_failed_initialization("Could not subscribe to personals, quitting!") -def zephyr_to_zulip(options: Any) -> None: +def zephyr_to_zulip(options: optparse.Values) -> None: if options.use_sessions and os.path.exists(options.session_path): logger.info("Loading old session") zephyr_load_session_autoretry(options.session_path) @@ -532,12 +537,6 @@ def zephyr_to_zulip(options: Any) -> None: for ln in log: try: zeph = json.loads(ln) - # New messages added to the log shouldn't have any - # elements of type str (they should already all be - # unicode), but older messages in the log are - # still of type str, so convert them before we - # send the message - zeph = decode_unicode_byte_strings(zeph) # Handle importing older zephyrs in the logs # where it isn't called a "stream" yet if "class" in zeph: @@ -562,19 +561,20 @@ def zephyr_to_zulip(options: Any) -> None: def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]: p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate(input=content.encode("utf-8")) + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=True) + stdout, stderr = p.communicate(input=content) if p.returncode: logger.error("zwrite command '%s' failed with return code %d:" % ( " ".join(zwrite_args), p.returncode,)) if stdout: - logger.info("stdout: " + stdout) # type: ignore # str + bytes + logger.info("stdout: " + stdout) elif stderr: logger.warning("zwrite command '%s' printed the following warning:" % ( " ".join(zwrite_args),)) if stderr: - logger.warning("stderr: " + stderr) # type: ignore # str + bytes - return (p.returncode, stderr) # type: ignore # bytes vs str + logger.warning("stderr: " + stderr) + return (p.returncode, stderr) def send_authed_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]: return send_zephyr(zwrite_args, content) @@ -603,9 +603,10 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op keypath], stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - encrypted, _ = p.communicate(input=content) # type: ignore # Optional[bytes] vs string - return encrypted # type: ignore # bytes, expecting Optional[str] + stderr=subprocess.PIPE, + universal_newlines=True) + encrypted, _ = p.communicate(input=content) + return encrypted def forward_to_zephyr(message: Dict[str, Any]) -> None: # 'Any' can be of any type of text @@ -688,8 +689,7 @@ Zulip users (like you) received it, Zephyr users did not. if options.test_mode: logger.debug("Would have forwarded: %s\n%s" % - (zwrite_args, wrapped_content.encode("utf-8"))) # type: ignore - # NOTE: mypy indicates %s outputs the encoded wrapped_content as per %r + (zwrite_args, wrapped_content)) return (code, stderr) = send_authed_zephyr(zwrite_args, wrapped_content) @@ -769,7 +769,7 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None: # whole process logger.exception("Error forwarding message:") -def zulip_to_zephyr(options: int) -> NoReturn: +def zulip_to_zephyr(options: optparse.Values) -> NoReturn: # Sync messages from zulip to zephyr logger.info("Starting syncing messages.") backoff = RandomExponentialBackoff(timeout_success_equivalent=120) @@ -950,7 +950,7 @@ def configure_logger(logger: logging.Logger, direction_name: Optional[str]) -> N for handler in root_logger.handlers: handler.setFormatter(formatter) -def parse_args() -> Tuple[Any, ...]: +def parse_args() -> Tuple[optparse.Values, List[str]]: parser = optparse.OptionParser() parser.add_option('--forward-class-messages', default=False, @@ -1059,10 +1059,7 @@ if __name__ == "__main__": signal.signal(signal.SIGINT, die_gracefully) - # The properties available on 'options' are dynamically - # determined, so we have to treat it as an Any for type - # annotations. - (options, args) = parse_args() # type: Any, List[str] + (options, args) = parse_args() logger = open_logger() configure_logger(logger, "parent")