zephyr: Attempt to fix types.

The mirror has some chance of running on Python 3 now, once the
python-zephyr patch is rebased on 0.2.1, though it’s untested.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2021-03-10 13:55:13 -08:00 committed by Tim Abbott
parent 34012a4015
commit 503e8ed82d
2 changed files with 54 additions and 57 deletions

View file

@ -9,7 +9,7 @@ import hashlib
import zephyr
import zulip
from typing import Any, Dict, List, Set, Tuple
from typing import Dict, List, Set, Tuple
parser = optparse.OptionParser()
parser.add_option('--verbose',
@ -101,11 +101,11 @@ def send_zulip(message: Dict[str, str]) -> None:
# Returns True if and only if we "Detected server failure" sending the zephyr.
def send_zephyr(zwrite_args: List[str], content: str) -> bool:
p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate(input=content.encode("utf-8"))
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
stdout, stderr = p.communicate(input=content)
if p.returncode != 0:
# stdout is decoded text here (Popen was opened with universal_newlines=True), so a plain str substring check is correct
if "Detected server failure while receiving acknowledgement for" in stdout: # type: ignore
if "Detected server failure while receiving acknowledgement for" in stdout:
logger.warning("Got server failure error sending zephyr; retrying")
logger.warning(stderr)
return True
@ -151,7 +151,7 @@ for tries in range(10):
actually_subscribed = True
break
except OSError as e:
if "SERVNAK received" in e: # type: ignore # https://github.com/python/mypy/issues/2118
if "SERVNAK received" in e.args:
logger.error("SERVNAK repeatedly received, punting rest of test")
else:
logger.exception("Exception subscribing to zephyrs")
@ -163,7 +163,7 @@ if not actually_subscribed:
# Prepare keys
zhkeys = {} # type: Dict[str, Tuple[str, str]]
hzkeys = {} # type: Dict[str, Tuple[str, str]]
def gen_key(key_dict: Dict[str, Any]) -> str:
def gen_key(key_dict: Dict[str, Tuple[str, str]]) -> str:
bits = str(random.getrandbits(32))
while bits in key_dict:
# Avoid the unlikely event that we get the same bits twice

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3
from typing import Any, Dict, IO, List, NoReturn, Optional, Set, Text, Tuple, cast
from typing import Any, Dict, IO, List, NoReturn, Optional, Set, Tuple, Union
from types import FrameType
import sys
@ -17,6 +17,8 @@ import hashlib
import tempfile
import select
from typing_extensions import Literal, TypedDict
from zulip import RandomExponentialBackoff
DEFAULT_SITE = "https://api.zulip.com"
@ -25,7 +27,7 @@ class States:
Startup, ZulipToZephyr, ZephyrToZulip, ChildSending = list(range(4))
CURRENT_STATE = States.Startup
logger = cast(logging.Logger, None) # type: logging.Logger # FIXME cast should not be needed?
logger: logging.Logger
def to_zulip_username(zephyr_username: str) -> str:
if "@" in zephyr_username:
@ -98,7 +100,18 @@ def unwrap_lines(body: str) -> str:
result += previous_line
return result
def send_zulip(zeph: Dict[str, str]) -> Dict[str, str]:
class ZephyrDict(TypedDict, total=False):
type: Literal["private", "stream"]
time: str
sender: str
stream: str
subject: str
recipient: Union[str, List[str]]
content: str
zsig: str
def send_zulip(zeph: ZephyrDict) -> Dict[str, Any]:
message: Dict[str, Any]
message = {}
if options.forward_class_messages:
message["forged"] = "yes"
@ -177,7 +190,7 @@ def zephyr_bulk_subscribe(subs: List[Tuple[str, str, str]]) -> None:
def update_subscriptions() -> None:
try:
f = open(options.stream_file_path)
public_streams = json.loads(f.read())
public_streams: List[str] = json.loads(f.read())
f.close()
except Exception:
logger.exception("Error reading public streams:")
@ -185,10 +198,10 @@ def update_subscriptions() -> None:
classes_to_subscribe = set()
for stream in public_streams:
zephyr_class = stream.encode("utf-8")
zephyr_class = stream
if (
options.shard is not None
and not hashlib.sha1(zephyr_class).hexdigest().startswith(options.shard)
and not hashlib.sha1(zephyr_class.encode("utf-8")).hexdigest().startswith(options.shard)
):
# This stream is being handled by a different zephyr_mirror job.
continue
@ -232,7 +245,7 @@ def maybe_restart_mirroring_script() -> None:
backoff.fail()
raise Exception("Failed to reload too many times, aborting!")
def process_loop(log: Optional[IO[Any]]) -> NoReturn:
def process_loop(log: Optional[IO[str]]) -> NoReturn:
restart_check_count = 0
last_check_time = time.time()
recieve_backoff = RandomExponentialBackoff()
@ -293,7 +306,7 @@ def parse_zephyr_body(zephyr_data: str, notice_format: str) -> Tuple[str, str]:
body = body.replace('\x00', '')
return (zsig, body)
def parse_crypt_table(zephyr_class: Text, instance: str) -> Optional[str]:
def parse_crypt_table(zephyr_class: str, instance: str) -> Optional[str]:
try:
crypt_table = open(os.path.join(os.environ["HOME"], ".crypt-table"))
except OSError:
@ -314,7 +327,7 @@ def parse_crypt_table(zephyr_class: Text, instance: str) -> Optional[str]:
return groups["keypath"]
return None
def decrypt_zephyr(zephyr_class: Text, instance: str, body: str) -> str:
def decrypt_zephyr(zephyr_class: str, instance: str, body: str) -> str:
keypath = parse_crypt_table(zephyr_class, instance)
if keypath is None:
# We can't decrypt it, so we just return the original body
@ -338,13 +351,16 @@ def decrypt_zephyr(zephyr_class: Text, instance: str, body: str) -> str:
keypath],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
decrypted, _ = p.communicate(input=body) # type: ignore # Optional[bytes] vs string
stderr=subprocess.PIPE,
universal_newlines=True,
errors="replace")
decrypted, _ = p.communicate(input=body)
# Restore our ignoring signals
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
return decrypted # type: ignore # bytes, expecting str
return decrypted
def process_notice(notice: Any, log: Optional[IO[Any]]) -> None:
def process_notice(notice: "zephyr.ZNotice", log: Optional[IO[str]]) -> None:
assert notice.sender is not None
(zsig, body) = parse_zephyr_body(notice.message, notice.format)
is_personal = False
is_huddle = False
@ -392,9 +408,10 @@ def process_notice(notice: Any, log: Optional[IO[Any]]) -> None:
huddle_recipients.append(to_zulip_username(notice.sender))
body = body.split("\n", 1)[1]
if options.forward_class_messages and notice.opcode.lower() == "crypt":
if options.forward_class_messages and notice.opcode is not None and notice.opcode.lower() == "crypt":
body = decrypt_zephyr(zephyr_class, notice.instance.lower(), body)
zeph: ZephyrDict
zeph = {'time': str(notice.time),
'sender': notice.sender,
'zsig': zsig, # logged here but not used by app
@ -403,6 +420,7 @@ def process_notice(notice: Any, log: Optional[IO[Any]]) -> None:
zeph['type'] = 'private'
zeph['recipient'] = huddle_recipients
elif is_personal:
assert notice.recipient is not None
zeph['type'] = 'private'
zeph['recipient'] = to_zulip_username(notice.recipient)
else:
@ -425,8 +443,6 @@ def process_notice(notice: Any, log: Optional[IO[Any]]) -> None:
heading = ""
zeph["content"] = heading + zeph["content"]
zeph = decode_unicode_byte_strings(zeph)
logger.info("Received a message on %s/%s from %s..." %
(zephyr_class, notice.instance, notice.sender))
if log is not None:
@ -446,17 +462,6 @@ def process_notice(notice: Any, log: Optional[IO[Any]]) -> None:
finally:
os._exit(0)
def decode_unicode_byte_strings(zeph: Dict[str, Any]) -> Dict[str, str]:
# 'Any' can be of any type of text that is converted to str.
for field in zeph.keys():
if isinstance(zeph[field], str):
try:
decoded = zeph[field].decode("utf-8")
except Exception:
decoded = zeph[field].decode("iso-8859-1")
zeph[field] = decoded
return zeph
def quit_failed_initialization(message: str) -> str:
logger.error(message)
maybe_kill_child()
@ -508,7 +513,7 @@ def zephyr_subscribe_autoretry(sub: Tuple[str, str, str]) -> None:
quit_failed_initialization("Could not subscribe to personals, quitting!")
def zephyr_to_zulip(options: Any) -> None:
def zephyr_to_zulip(options: optparse.Values) -> None:
if options.use_sessions and os.path.exists(options.session_path):
logger.info("Loading old session")
zephyr_load_session_autoretry(options.session_path)
@ -532,12 +537,6 @@ def zephyr_to_zulip(options: Any) -> None:
for ln in log:
try:
zeph = json.loads(ln)
# New messages added to the log shouldn't have any
# elements of type str (they should already all be
# unicode), but older messages in the log are
# still of type str, so convert them before we
# send the message
zeph = decode_unicode_byte_strings(zeph)
# Handle importing older zephyrs in the logs
# where it isn't called a "stream" yet
if "class" in zeph:
@ -562,19 +561,20 @@ def zephyr_to_zulip(options: Any) -> None:
def send_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
p = subprocess.Popen(zwrite_args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate(input=content.encode("utf-8"))
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
stdout, stderr = p.communicate(input=content)
if p.returncode:
logger.error("zwrite command '%s' failed with return code %d:" % (
" ".join(zwrite_args), p.returncode,))
if stdout:
logger.info("stdout: " + stdout) # type: ignore # str + bytes
logger.info("stdout: " + stdout)
elif stderr:
logger.warning("zwrite command '%s' printed the following warning:" % (
" ".join(zwrite_args),))
if stderr:
logger.warning("stderr: " + stderr) # type: ignore # str + bytes
return (p.returncode, stderr) # type: ignore # bytes vs str
logger.warning("stderr: " + stderr)
return (p.returncode, stderr)
def send_authed_zephyr(zwrite_args: List[str], content: str) -> Tuple[int, str]:
return send_zephyr(zwrite_args, content)
@ -603,9 +603,10 @@ def zcrypt_encrypt_content(zephyr_class: str, instance: str, content: str) -> Op
keypath],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
encrypted, _ = p.communicate(input=content) # type: ignore # Optional[bytes] vs string
return encrypted # type: ignore # bytes, expecting Optional[str]
stderr=subprocess.PIPE,
universal_newlines=True)
encrypted, _ = p.communicate(input=content)
return encrypted
def forward_to_zephyr(message: Dict[str, Any]) -> None:
# 'Any' can be of any type of text
@ -688,8 +689,7 @@ Zulip users (like you) received it, Zephyr users did not.
if options.test_mode:
logger.debug("Would have forwarded: %s\n%s" %
(zwrite_args, wrapped_content.encode("utf-8"))) # type: ignore
# NOTE: mypy indicates %s outputs the encoded wrapped_content as per %r
(zwrite_args, wrapped_content))
return
(code, stderr) = send_authed_zephyr(zwrite_args, wrapped_content)
@ -769,7 +769,7 @@ def maybe_forward_to_zephyr(message: Dict[str, Any]) -> None:
# whole process
logger.exception("Error forwarding message:")
def zulip_to_zephyr(options: int) -> NoReturn:
def zulip_to_zephyr(options: optparse.Values) -> NoReturn:
# Sync messages from zulip to zephyr
logger.info("Starting syncing messages.")
backoff = RandomExponentialBackoff(timeout_success_equivalent=120)
@ -950,7 +950,7 @@ def configure_logger(logger: logging.Logger, direction_name: Optional[str]) -> N
for handler in root_logger.handlers:
handler.setFormatter(formatter)
def parse_args() -> Tuple[Any, ...]:
def parse_args() -> Tuple[optparse.Values, List[str]]:
parser = optparse.OptionParser()
parser.add_option('--forward-class-messages',
default=False,
@ -1059,10 +1059,7 @@ if __name__ == "__main__":
signal.signal(signal.SIGINT, die_gracefully)
# The properties available on 'options' are dynamically
# determined, so we have to treat it as an Any for type
# annotations.
(options, args) = parse_args() # type: Any, List[str]
(options, args) = parse_args()
logger = open_logger()
configure_logger(logger, "parent")