pyupgrade: Replace Text with str.

We uses `pyupgrade --py3-plus` to automatically replace all occurence
of `Text`. But manual fix is required to remove the unused imports. Note
that with this configuration pyupgrade also convert string literals to
.format(...) style, which is manually not included in the commit as well.
This commit is contained in:
PIG208 2021-05-28 19:13:13 +08:00 committed by Tim Abbott
parent a54cccc012
commit e27ac0ddbe
11 changed files with 42 additions and 46 deletions

View file

@ -13,7 +13,6 @@ import os
import os.path
import subprocess
import sys
from typing import Text
sys.path.insert(0, os.path.dirname(__file__))
import zulip_git_config as config
@ -33,7 +32,7 @@ client = zulip.Client(
)
def git_repository_name() -> Text:
def git_repository_name() -> str:
output = subprocess.check_output(["git", "rev-parse", "--is-bare-repository"])
if output.strip() == "true":
return os.path.basename(os.getcwd())[: -len(".git")]

View file

@ -1,5 +1,5 @@
#
from typing import Dict, Optional, Text
from typing import Dict, Optional
# Name of the stream to send notifications to, default is "commits"
STREAM_NAME = "commits"
@ -23,7 +23,7 @@ ZULIP_API_KEY = "0123456789abcdef0123456789abcdef"
# * stream "commits"
# * topic "master"
# And similarly for branch "test-post-receive" (for use when testing).
def commit_notice_destination(repo: Text, branch: Text, commit: Text) -> Optional[Dict[Text, Text]]:
def commit_notice_destination(repo: str, branch: str, commit: str) -> Optional[Dict[str, str]]:
if branch in ["master", "test-post-receive"]:
return dict(stream=STREAM_NAME, subject="%s" % (branch,))
@ -36,7 +36,7 @@ def commit_notice_destination(repo: Text, branch: Text, commit: Text) -> Optiona
# graphical repository viewer, e.g.
#
# return '!avatar(%s) [%s](https://example.com/commits/%s)\n' % (author, subject, commit_id)
def format_commit_message(author: Text, subject: Text, commit_id: Text) -> Text:
def format_commit_message(author: str, subject: str, commit_id: str) -> str:
return "!avatar(%s) %s\n" % (author, subject)

View file

@ -6,7 +6,6 @@
# `hg push`). See https://zulip.com/integrations for installation instructions.
import sys
from typing import Text
from mercurial import repository as repo
from mercurial import ui
@ -17,8 +16,8 @@ VERSION = "0.9"
def format_summary_line(
web_url: str, user: str, base: int, tip: int, branch: str, node: Text
) -> Text:
web_url: str, user: str, base: int, tip: int, branch: str, node: str
) -> str:
"""
Format the first line of the message, which contains summary
information about the changeset and links to the changelog if a
@ -71,7 +70,7 @@ def format_commit_lines(web_url: str, repo: repo, base: int, tip: int) -> str:
def send_zulip(
email: str, api_key: str, site: str, stream: str, subject: str, content: Text
email: str, api_key: str, site: str, stream: str, subject: str, content: str
) -> None:
"""
Send a message to Zulip using the provided credentials, which should be for
@ -100,7 +99,7 @@ def get_config(ui: ui, item: str) -> str:
sys.exit(1)
def hook(ui: ui, repo: repo, **kwargs: Text) -> None:
def hook(ui: ui, repo: repo, **kwargs: str) -> None:
"""
Invoked by configuring a [hook] entry in .hg/hgrc.
"""

View file

@ -1,5 +1,5 @@
# https://github.com/python/mypy/issues/1141
from typing import Dict, Optional, Text
from typing import Dict, Optional
# Change these values to configure authentication for the plugin
ZULIP_USER = "openshift-bot@example.com"
@ -19,7 +19,7 @@ ZULIP_API_KEY = "0123456789abcdef0123456789abcdef"
# * stream "deployments"
# * topic "master"
# And similarly for branch "test-post-receive" (for use when testing).
def deployment_notice_destination(branch: str) -> Optional[Dict[str, Text]]:
def deployment_notice_destination(branch: str) -> Optional[Dict[str, str]]:
if branch in ["master", "test-post-receive"]:
return dict(stream="deployments", subject="%s" % (branch,))

View file

@ -1,4 +1,4 @@
from typing import Dict, Optional, Text
from typing import Dict, Optional
# Change these values to configure authentication for the plugin
ZULIP_USER = "p4-bot@example.com"
@ -28,7 +28,7 @@ P4_WEB: Optional[str] = None
# "master-plan" and "secret" subdirectories of //depot/ to:
# * stream "depot_subdirectory-commits"
# * subject "change_root"
def commit_notice_destination(path: Text, changelist: int) -> Optional[Dict[Text, Text]]:
def commit_notice_destination(path: str, changelist: int) -> Optional[Dict[str, str]]:
dirs = path.split("/")
if len(dirs) >= 4 and dirs[3] not in ("*", "..."):
directory = dirs[3]

View file

@ -1,4 +1,4 @@
from typing import Dict, Optional, Text
from typing import Dict, Optional
# Change these values to configure authentication for the plugin
ZULIP_USER = "svn-bot@example.com"
@ -18,7 +18,7 @@ ZULIP_API_KEY = "0123456789abcdef0123456789abcdef"
# and "my-super-secret-repository" repos to
# * stream "commits"
# * topic "branch_name"
def commit_notice_destination(path: Text, commit: Text) -> Optional[Dict[Text, Text]]:
def commit_notice_destination(path: str, commit: str) -> Optional[Dict[str, str]]:
repo = path.split("/")[-1]
if repo not in ["evil-master-plan", "my-super-secret-repository"]:
return dict(stream="commits", subject="%s" % (repo,))

View file

@ -22,7 +22,6 @@ from typing import (
Mapping,
Optional,
Sequence,
Text,
Tuple,
Union,
)
@ -313,7 +312,7 @@ def get_default_config_filename() -> Optional[str]:
return config_file
def validate_boolean_field(field: Optional[Text]) -> Union[bool, None]:
def validate_boolean_field(field: Optional[str]) -> Union[bool, None]:
if not isinstance(field, str):
return None
@ -563,7 +562,7 @@ class Client:
req_files = []
for (key, val) in orig_request.items():
if isinstance(val, str) or isinstance(val, Text):
if isinstance(val, str) or isinstance(val, str):
request[key] = val
else:
request[key] = json.dumps(val)

View file

@ -3,12 +3,11 @@ for extract.py
"""
from json.decoder import JSONDecodeError
from typing import Text
from zulip_bots.bots.monkeytestit.lib import extract, report
def execute(message: Text, apikey: Text) -> Text:
def execute(message: str, apikey: str) -> str:
"""Parses message and returns a dictionary
:param message: The message
@ -69,7 +68,7 @@ def execute(message: Text, apikey: Text) -> Text:
return "Unknown command. Available commands: `check <website> " "[params]`"
def failed(message: Text) -> Text:
def failed(message: str) -> str:
"""Simply attaches a failed marker to a message
:param message: The message

View file

@ -1,10 +1,10 @@
"""Used to mainly compose a decorated report for the user
"""
from typing import Dict, List, Text
from typing import Dict, List
def compose(results: Dict) -> Text:
def compose(results: Dict) -> str:
"""Composes a report based on test results
An example would be:
@ -38,7 +38,7 @@ def compose(results: Dict) -> Text:
return response
def print_more_info_url(results: Dict) -> Text:
def print_more_info_url(results: Dict) -> str:
"""Creates info for the test URL from monkeytest.it
Example:
@ -51,7 +51,7 @@ def print_more_info_url(results: Dict) -> Text:
return "More info: {}".format(results["results_url"])
def print_test_id(results: Dict) -> Text:
def print_test_id(results: Dict) -> str:
"""Prints the test-id with attached to the url
:param results: A dictionary containing the results of a check
@ -60,7 +60,7 @@ def print_test_id(results: Dict) -> Text:
return "Test: https://monkeytest.it/test/{}".format(results["test_id"])
def print_failures_checkers(results: Dict) -> Text:
def print_failures_checkers(results: Dict) -> str:
"""Creates info for failures in enabled checkers
Example:
@ -105,7 +105,7 @@ def get_enabled_checkers(results: Dict) -> List:
return enabled_checkers
def print_enabled_checkers(results: Dict) -> Text:
def print_enabled_checkers(results: Dict) -> str:
"""Creates info for enabled checkers. This joins the list of enabled
checkers and format it with the current string response
@ -118,7 +118,7 @@ def print_enabled_checkers(results: Dict) -> Text:
return "Enabled checkers: {}".format(", ".join(get_enabled_checkers(results)))
def print_status(results: Dict) -> Text:
def print_status(results: Dict) -> str:
"""Creates info for the check status.
Example: Status: tests_failed

View file

@ -3,12 +3,12 @@ import importlib.abc
import importlib.util
import os
from pathlib import Path
from typing import Any, Optional, Text, Tuple
from typing import Any, Optional, Tuple
current_dir = os.path.dirname(os.path.abspath(__file__))
def import_module_from_source(path: Text, name: Text) -> Any:
def import_module_from_source(path: str, name: str) -> Any:
spec = importlib.util.spec_from_file_location(name, path)
module = importlib.util.module_from_spec(spec)
loader = spec.loader
@ -18,14 +18,14 @@ def import_module_from_source(path: Text, name: Text) -> Any:
return module
def import_module_by_name(name: Text) -> Any:
def import_module_by_name(name: str) -> Any:
try:
return importlib.import_module(name)
except ImportError:
return None
def resolve_bot_path(name: Text) -> Optional[Tuple[Path, Text]]:
def resolve_bot_path(name: str) -> Optional[Tuple[Path, str]]:
if os.path.isfile(name):
bot_path = Path(name)
bot_name = Path(bot_path).stem

View file

@ -7,7 +7,7 @@ import signal
import sys
import time
from contextlib import contextmanager
from typing import IO, Any, Dict, Iterator, List, Optional, Set, Text
from typing import IO, Any, Dict, Iterator, List, Optional, Set
from typing_extensions import Protocol
@ -79,13 +79,13 @@ class BotIdentity:
class BotStorage(Protocol):
def put(self, key: Text, value: Any) -> None:
def put(self, key: str, value: Any) -> None:
...
def get(self, key: Text) -> Any:
def get(self, key: str) -> Any:
...
def contains(self, key: Text) -> bool:
def contains(self, key: str) -> bool:
...
@ -100,13 +100,13 @@ class CachedStorage:
self._cache = init_data
self._dirty_keys: Set[str] = set()
def put(self, key: Text, value: Any) -> None:
def put(self, key: str, value: Any) -> None:
# In the cached storage, values being put to the storage is not flushed to the parent storage.
# It will be marked dirty until it get flushed.
self._cache[key] = value
self._dirty_keys.add(key)
def get(self, key: Text) -> Any:
def get(self, key: str) -> Any:
# Unless the key is not found in the cache, the cached storage will not lookup the parent storage.
if key in self._cache:
return self._cache[key]
@ -123,11 +123,11 @@ class CachedStorage:
key = self._dirty_keys.pop()
self._parent_storage.put(key, self._cache[key])
def flush_one(self, key: Text) -> None:
def flush_one(self, key: str) -> None:
self._dirty_keys.remove(key)
self._parent_storage.put(key, self._cache[key])
def contains(self, key: Text) -> bool:
def contains(self, key: str) -> bool:
if key in self._cache:
return True
else:
@ -139,15 +139,15 @@ class StateHandler:
self._client = client
self.marshal = lambda obj: json.dumps(obj)
self.demarshal = lambda obj: json.loads(obj)
self.state_ = dict() # type: Dict[Text, Any]
self.state_ = dict()
def put(self, key: Text, value: Any) -> None:
def put(self, key: str, value: Any) -> None:
self.state_[key] = self.marshal(value)
response = self._client.update_storage({"storage": {key: self.state_[key]}})
if response["result"] != "success":
raise StateHandlerError("Error updating state: {}".format(str(response)))
def get(self, key: Text) -> Any:
def get(self, key: str) -> Any:
if key in self.state_:
return self.demarshal(self.state_[key])
@ -159,12 +159,12 @@ class StateHandler:
self.state_[key] = marshalled_value
return self.demarshal(marshalled_value)
def contains(self, key: Text) -> bool:
def contains(self, key: str) -> bool:
return key in self.state_
@contextmanager
def use_storage(storage: BotStorage, keys: List[Text]) -> Iterator[BotStorage]:
def use_storage(storage: BotStorage, keys: List[str]) -> Iterator[BotStorage]:
# The context manager for StateHandler that minimizes the number of round-trips to the server.
# It will fetch all the data using the specified keys and store them to
# a CachedStorage that will not communicate with the server until manually