diff --git a/zulip_bots/README.md b/zulip_bots/README.md index d1606af..28cf561 100644 --- a/zulip_bots/README.md +++ b/zulip_bots/README.md @@ -14,6 +14,7 @@ zulip_bots # This directory ├───zulip_bots # `zulip_bots` package. │ ├───bots/ # Actively maintained and tested bots. │ ├───bots_unmaintained/ # Unmaintained, potentially broken bots. +│ ├───game_handler.py # Handles game-related bots. │ ├───lib.py # Backbone of run.py │ ├───provision.py # Creates a development environment. │ ├───run.py # Used to run bots. diff --git a/zulip_bots/zulip_bots/bots/game_handler_bot/__init__.py b/zulip_bots/zulip_bots/bots/game_handler_bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zulip_bots/zulip_bots/bots/game_handler_bot/game_handler_bot.py b/zulip_bots/zulip_bots/bots/game_handler_bot/game_handler_bot.py new file mode 100644 index 0000000..8540a2c --- /dev/null +++ b/zulip_bots/zulip_bots/bots/game_handler_bot/game_handler_bot.py @@ -0,0 +1,72 @@ +from zulip_bots.game_handler import GameAdapter, BadMoveException +from typing import List, Any + + +class GameHandlerBotMessageHandler(object): + tokens = [':blue_circle:', ':red_circle:'] + + def parse_board(self, board: Any) -> str: + return 'foo' + + def get_player_color(self, turn: int) -> str: + return self.tokens[turn] + + def alert_move_message(self, original_player: str, move_info: str) -> str: + column_number = move_info.replace('move ', '') + return original_player + ' moved in column ' + column_number + + def game_start_message(self) -> str: + return 'Type `move ` to place a token.\n \ +The first player to get 4 in a row wins!\n \ +Good Luck!' + + +class MockModel(object): + def __init__(self) -> None: + self.current_board = 'mock board' + + def make_move( + self, + move: str, + player: int, + is_computer: bool=False + ) -> Any: + if not is_computer: + if int(move.replace('move ', '')) < 9: + return 'mock board' + else: + raise BadMoveException('Invalid Move.') + return 'mock board' + + def determine_game_over(self, players: List[str]) -> None: + return None + + +class GameHandlerBotHandler(GameAdapter): + ''' + DO NOT USE THIS BOT + This bot is used to test game_handler.py + ''' + + def __init__(self) -> None: + game_name = 'foo test game' + bot_name = 'game_handler_bot' + move_help_message = '* To make your move during a game, type\n' \ + '```move ```' + move_regex = 'move (\d)$' + model = MockModel + gameMessageHandler = GameHandlerBotMessageHandler + + super(GameHandlerBotHandler, self).__init__( + game_name, + bot_name, + move_help_message, + move_regex, + model, + gameMessageHandler, + max_players=2, + supports_computer=True + ) + + +handler_class = GameHandlerBotHandler diff --git a/zulip_bots/zulip_bots/bots/game_handler_bot/test_game_handler_bot.py b/zulip_bots/zulip_bots/bots/game_handler_bot/test_game_handler_bot.py new file mode 100644 index 0000000..35c3565 --- /dev/null +++ b/zulip_bots/zulip_bots/bots/game_handler_bot/test_game_handler_bot.py @@ -0,0 +1,501 @@ +from zulip_bots.test_lib import BotTestCase +from zulip_bots.game_handler import GameInstance + +from contextlib import contextmanager +from mock import MagicMock, patch + +from typing import Any, Dict, List + + +class TestGameHandlerBot(BotTestCase): + bot_name = 'game_handler_bot' + + def make_request_message( + self, + content: str, + user: str='foo@example.com', + user_name: str='foo', + type: str='private', + stream: str='', + subject: str='' + ) -> Dict[str, str]: + message = dict( + sender_email=user, + sender_full_name=user_name, + content=content, + type=type, + display_recipient=stream, + subject=subject, + ) + return message + + # Function that serves similar purpose to BotTestCase.verify_dialog, but allows for multiple responses to be handled + def verify_response( + self, + request: str, + expected_response: str, + response_number: int, + bot: Any=None, + user_name: str='foo', + stream: str='', + subject: str='', + max_messages: int=20 + ) -> None: + ''' + This function serves a similar purpose + to BotTestCase.verify_dialog, but allows + for multiple responses to be validated, + and for mocking of the bot's internal data + ''' + if bot is None: + bot, bot_handler = self._get_handlers() + else: + _b, bot_handler = self._get_handlers() + type = 'private' if stream == '' else 'stream' + message = self.make_request_message( + request, user_name + '@example.com', user_name, type, stream, subject) + bot_handler.reset_transcript() + bot.handle_message(message, bot_handler) + + responses = [ + message + for (method, message) + in bot_handler.transcript + ] + first_response = responses[response_number] + self.assertEqual(expected_response, first_response['content']) + self.assertLessEqual(len(responses), max_messages) + + def add_user_to_cache(self, name: str, bot: Any=None) -> Any: + if bot is None: + bot, bot_handler = self._get_handlers() + message = { + 'sender_email': '{}@example.com'.format(name), + 'sender_full_name': '{}'.format(name) + } + bot.add_user_to_cache(message) + return bot + + def setup_game(self, id: str='', bot: Any=None, players: List[str]=['foo', 'baz'], subject: str='test game', stream: str='test') -> Any: + if bot is None: + bot, bot_handler = self._get_handlers() + for p in players: + self.add_user_to_cache(p, bot) + players_emails = [p + '@example.com' for p in players] + game_id = 'abc123' + if id != '': + game_id = id + instance = GameInstance(bot, False, subject, + game_id, players_emails, stream) + bot.instances.update({game_id: instance}) + instance.turn = -1 + instance.start() + return bot + + def setup_computer_game(self) -> Any: + bot = self.add_user_to_cache('foo') + bot.email = 'test-bot@example.com' + self.add_user_to_cache('test-bot', bot) + instance = GameInstance(bot, False, 'test game', 'abc123', [ + 'foo@example.com', 'test-bot@example.com'], 'test') + bot.instances.update({'abc123': instance}) + instance.start() + return bot + + def help_message(self) -> str: + return '''** foo test game Bot Help:** +*Preface all commands with @**test-bot*** +* To start a game in a stream (*recommended*), type +`start game` +* To start a game against another player, type +`start game with @` +* To start a game with the computer, type +`start game with` @**test-bot** +* To quit a game at any time, type +`quit` +* To end a game with a draw, type +`draw` +* To forfeit a game, type +`forfeit` +* To see the leaderboard, type +`leaderboard` +* To withdraw an invitation, type +`cancel game` +* To make your move during a game, type +```move ```''' + + def test_help_message(self) -> None: + self.verify_response('help', self.help_message(), 0) + self.verify_response('foo bar baz', self.help_message(), 0) + + def test_exception_handling(self) -> None: + with patch('logging.exception'), \ + patch('zulip_bots.game_handler.GameAdapter.command_quit', + side_effect=Exception): + self.verify_response('quit', 'Error .', 0) + + def test_not_in_game_messages(self) -> None: + self.verify_response( + 'move 3', 'You are not in a game at the moment. Type `help` for help.', 0, max_messages=1) + self.verify_response( + 'quit', 'You are not in a game. Type `help` for all commands.', 0, max_messages=1) + + def test_start_game_with_name(self) -> None: + bot = self.add_user_to_cache('baz') + self.verify_response('start game with @**baz**', + 'You\'ve sent an invitation to play foo test game with @**baz**', 1, bot=bot) + self.assertEqual(len(bot.invites), 1) + + def test_start_game_with_email(self) -> None: + bot = self.add_user_to_cache('baz') + self.verify_response('start game with baz@example.com', + 'You\'ve sent an invitation to play foo test game with @**baz**', 1, bot=bot) + self.assertEqual(len(bot.invites), 1) + + def test_join_game_and_start_in_stream(self) -> None: + bot = self.add_user_to_cache('baz') + self.add_user_to_cache('foo', bot) + bot.invites = { + 'abc': { + 'stream': 'test', + 'subject': 'test game', + 'host': 'foo@example.com' + } + } + self.verify_response('join', '@**baz** has joined the game', 0, bot=bot, + stream='test', subject='test game', user_name='baz') + self.assertEqual(len(bot.instances.keys()), 1) + + def test_start_game_in_stream(self) -> None: + self.verify_response( + 'start game', + '**foo** wants to play **foo test game**. Type @**test-bot** join to play them!', + 0, + stream='test', + subject='test game' + ) + + def test_start_invite_game_in_stream(self) -> None: + bot = self.add_user_to_cache('baz') + self.verify_response( + 'start game with @**baz**', + 'If you were invited, and you\'re here, type "@**test-bot** accept" to accept the invite!', + 2, + bot=bot, + stream='test', + subject='game test' + ) + + def test_join_no_game(self) -> None: + self.verify_response('join', 'There is not a game in this subject. Type `help` for all commands.', + 0, stream='test', subject='test game', user_name='baz', max_messages=1) + + def test_accept_invitation(self) -> None: + bot = self.add_user_to_cache('baz') + self.add_user_to_cache('foo', bot) + bot.invites = { + 'abc': { + 'subject': '###private###', + 'stream': 'games', + 'host': 'foo@example.com', + 'baz@example.com': 'p' + } + } + self.verify_response( + 'accept', 'Accepted invitation to play **foo test game** from @**foo**.', 0, bot, 'baz') + + def test_decline_invitation(self) -> None: + bot = self.add_user_to_cache('baz') + self.add_user_to_cache('foo', bot) + bot.invites = { + 'abc': { + 'subject': '###private###', + 'host': 'foo@example.com', + 'baz@example.com': 'p' + } + } + self.verify_response( + 'decline', 'Declined invitation to play **foo test game** from @**foo**.', 0, bot, 'baz') + + def test_quit_invite(self) -> None: + bot = self.add_user_to_cache('foo') + bot.invites = { + 'abc': { + 'subject': '###private###', + 'host': 'foo@example.com' + } + } + self.verify_response( + 'quit', 'Game cancelled.\nfoo@example.com quit.', 0, bot, 'foo') + + def test_user_already_in_game_errors(self) -> None: + bot = self.setup_game() + self.verify_response('start game with @**baz**', + 'You are already in a game. Type `quit` to leave.', 0, bot=bot, max_messages=1) + self.verify_response( + 'start game', 'You are already in a game. Type `quit` to leave.', 0, bot=bot, stream='test', max_messages=1) + self.verify_response( + 'accept', 'You are already in a game. Type `quit` to leave.', 0, bot=bot, max_messages=1) + self.verify_response( + 'decline', 'You are already in a game. Type `quit` to leave.', 0, bot=bot, max_messages=1) + self.verify_response( + 'join', 'You are already in a game. Type `quit` to leave.', 0, bot=bot, max_messages=1) + + def test_register_command(self) -> None: + bot = self.add_user_to_cache('foo') + self.verify_response( + 'register', 'Hello @**foo**. Thanks for registering!', 0, bot, 'foo') + self.assertIn('foo@example.com', bot.user_cache.keys()) + + def test_no_active_invite_errors(self) -> None: + self.verify_response( + 'accept', 'No active invites. Type `help` for commands.', 0) + self.verify_response( + 'decline', 'No active invites. Type `help` for commands.', 0) + + def test_wrong_number_of_players_message(self) -> None: + bot = self.add_user_to_cache('baz') + bot.min_players = 5 + self.verify_response('start game with @**baz**', + 'You must have at least 5 players to play.\nGame cancelled.', 0, bot=bot) + bot.min_players = 2 + bot.max_players = 1 + self.verify_response('start game with @**baz**', + 'The maximum number of players for this game is 1.', 0, bot=bot) + bot.max_players = 1 + bot.invites = { + 'abc': { + 'stream': 'test', + 'subject': 'test game', + 'host': 'foo@example.com' + } + } + self.verify_response('join', 'This game is full.', 0, bot=bot, + stream='test', subject='test game', user_name='baz') + + def test_public_accept(self) -> None: + bot = self.add_user_to_cache('baz') + self.add_user_to_cache('foo', bot) + bot.invites = { + 'abc': { + 'stream': 'test', + 'subject': 'test game', + 'host': 'baz@example.com', + 'foo@example.com': 'p' + } + } + self.verify_response('accept', '@**foo** has accepted the invitation.', + 0, bot=bot, stream='test', subject='test game') + + def test_start_game_with_computer(self) -> None: + self.verify_response('start game with @**test-bot**', + 'Wait... That\'s me!', 4, stream='test', subject='test game') + + def test_sent_by_bot(self) -> None: + with self.assertRaises(IndexError): + self.verify_response( + 'foo', '', 0, user_name='test-bot', stream='test', subject='test game') + + def test_forfeit(self) -> None: + bot = self.setup_game() + self.verify_response('forfeit', '**foo** forfeited!', + 0, bot=bot, stream='test', subject='test game') + + def test_draw(self) -> None: + bot = self.setup_game() + self.verify_response('draw', '**foo** has voted for a draw!\nType `draw` to accept', + 0, bot=bot, stream='test', subject='test game') + self.verify_response('draw', 'It was a draw!', 0, bot=bot, stream='test', + subject='test game', user_name='baz') + + def test_normal_turns(self) -> None: + bot = self.setup_game() + self.verify_response('move 3', '**foo** moved in column 3\n\nfoo\n\nIt\'s @**baz**\'s (:red_circle:) turn.', + 0, bot=bot, stream='test', subject='test game') + self.verify_response('move 3', '**baz** moved in column 3\n\nfoo\n\nIt\'s @**foo**\'s (:blue_circle:) turn.', + 0, bot=bot, stream='test', subject='test game', user_name='baz') + + def test_wrong_turn(self) -> None: + bot = self.setup_game() + self.verify_response('move 5', 'It\'s @**foo**\'s (:blue_circle:) turn.', 0, + bot=bot, stream='test', subject='test game', user_name='baz') + + def test_private_message_error(self) -> None: + self.verify_response( + 'start game', 'If you are starting a game in private messages, you must invite players. Type `help` for commands.', 0, max_messages=1) + bot = self.add_user_to_cache('bar') + bot.invites = { + 'abcdefg': { + 'host': 'bar@example.com', + 'stream': 'test', + 'subject': 'test game' + } + } + self.verify_response( + 'join', 'You cannot join games in private messages. Type `help` for all commands.', 0, bot=bot, max_messages=1) + + def test_game_already_in_subject(self) -> None: + bot = self.add_user_to_cache('foo') + bot.invites = { + 'abcdefg': { + 'host': 'foo@example.com', + 'stream': 'test', + 'subject': 'test game' + } + } + self.verify_response('start game', 'There is already a game in this stream.', 0, + bot=bot, stream='test', subject='test game', user_name='baz', max_messages=1) + + # def test_not_authorized(self) -> None: + # bot = self.setup_game() + # self.verify_response('move 3', 'You are not authorized to send messages in this stream', 0, bot=bot, + # user_name='bar', stream='test', subject='test game', max_messages=1) + + def test_unknown_user(self) -> None: + self.verify_response('start game with @**bar**', + 'I don\'t know @**bar**. Tell them to say @**test-bot** register', 0) + self.verify_response('start game with bar@example.com', + 'I don\'t know bar@example.com. Tell them to use @**test-bot** register', 0) + + def test_is_user_not_player(self) -> None: + bot = self.add_user_to_cache('foo') + self.add_user_to_cache('baz', bot) + bot.invites = { + 'abcdefg': { + 'host': 'foo@example.com', + 'baz@example.com': 'a' + } + } + self.assertFalse(bot.is_user_not_player('foo@example.com')) + self.assertFalse(bot.is_user_not_player('baz@example.com')) + + def test_move_help_message(self) -> None: + bot = self.setup_game() + self.verify_response('move 123', '* To make your move during a game, type\n```move ```', + 0, bot=bot, stream='test', subject='test game') + + def test_invalid_move_message(self) -> None: + bot = self.setup_game() + self.verify_response('move 9', 'Invalid Move.', 0, + bot=bot, stream='test', subject='test game', max_messages=1) + + def test_get_game_id_by_email(self) -> None: + bot = self.setup_game() + self.assertEqual(bot.get_game_id_by_email('foo@example.com'), 'abc123') + + def test_game_over_and_leaderboard(self) -> None: + bot = self.setup_game() + bot.put_user_cache() + with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='foo@example.com'): + self.verify_response('move 3', 'foo@example.com won! :tada:', + 1, bot=bot, stream='test', subject='test game') + leaderboard = '**Most wins**\n\n\ +Player | Games Won | Games Drawn | Games Lost | Total Games\n\ + --- | --- | --- | --- | --- \n\ + **foo** | 1 | 0 | 0 | 1\n\ + **baz** | 0 | 0 | 1 | 1\n\ + **test-bot** | 0 | 0 | 0 | 0' + self.verify_response('leaderboard', leaderboard, 0, bot=bot) + + def test_current_turn_winner(self) -> None: + bot = self.setup_game() + with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='current turn'): + self.verify_response('move 3', 'foo@example.com won! :tada:', + 1, bot=bot, stream='test', subject='test game') + + def test_computer_turn(self) -> None: + bot = self.setup_computer_game() + self.verify_response('move 3', '**foo** moved in column 3\n\nfoo\n\nIt\'s @**test-bot**\'s (:red_circle:) turn.', + 0, bot=bot, stream='test', subject='test game') + with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='test-bot@example.com'): + self.verify_response('move 5', 'I won! Well Played!', + 2, bot=bot, stream='test', subject='test game') + + def test_computer_endgame_responses(self) -> None: + bot = self.setup_computer_game() + with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='foo@example.com'): + self.verify_response('move 5', 'You won! Nice!', + 2, bot=bot, stream='test', subject='test game') + bot = self.setup_computer_game() + with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='draw'): + self.verify_response('move 5', 'It was a draw! Well Played!', + 2, bot=bot, stream='test', subject='test game') + + def test_add_user_statistics(self) -> None: + bot = self.add_user_to_cache('foo') + bot.add_user_statistics('foo@example.com', {'foo': 3}) + self.assertEqual(bot.user_cache['foo@example.com']['stats']['foo'], 3) + + def test_get_players(self) -> None: + bot = self.setup_game() + players = bot.get_players('abc123') + self.assertEqual(players, ['foo@example.com', 'baz@example.com']) + + def test_none_function_responses(self) -> None: + bot, bot_handler = self._get_handlers() + self.assertEqual(bot.get_players('abc'), []) + self.assertEqual(bot.get_user_by_name('no one'), {}) + self.assertEqual(bot.get_user_by_email('no one'), {}) + + def test_get_game_info(self) -> None: + bot = self.add_user_to_cache('foo') + self.add_user_to_cache('baz', bot) + bot.invites = { + 'abcdefg': { + 'host': 'foo@example.com', + 'baz@example.com': 'a', + 'stream': 'test', + 'subject': 'test game' + } + } + self.assertEqual(bot.get_game_info('abcdefg'), { + 'game_id': 'abcdefg', + 'type': 'invite', + 'stream': 'test', + 'subject': 'test game', + 'players': ['foo@example.com', 'baz@example.com'] + }) + + def test_parse_message(self) -> None: + bot = self.setup_game() + self.verify_response('move 3', 'Join your game using the link below!\n\n> **Game `abc123`**\n\ +> foo@example.com\n\ +> foo test game\n\ +> 2/2 players\n\ +> **[Join Game](/#narrow/stream/test/topic/test game)**', 0, bot=bot) + bot = self.setup_game() + self.verify_response('move 3', '''Your current game is not in this subject. \n\ +To move subjects, send your message again, otherwise join the game using the link below. + +> **Game `abc123`** +> foo@example.com +> foo test game +> 2/2 players +> **[Join Game](/#narrow/stream/test/topic/test game)**''', 0, bot=bot, stream='test 2', subject='game 2') + self.verify_response('move 3', 'foo', 0, bot=bot, + stream='test 2', subject='game 2') + + def test_change_game_subject(self) -> None: + bot = self.setup_game('abc123') + self.setup_game('abcdefg', bot, ['bar', 'abc'], 'test game 2', 'test2') + self.verify_response('move 3', '''Your current game is not in this subject. \n\ +To move subjects, send your message again, otherwise join the game using the link below. + +> **Game `abcdefg`** +> bar@example.com +> foo test game +> 2/2 players +> **[Join Game](/#narrow/stream/test2/topic/test game 2)**''', 0, bot=bot, user_name='bar', stream='test game', subject='test2') + self.verify_response('move 3', 'There is already a game in this subject.', + 0, bot=bot, user_name='bar', stream='test game', subject='test') + bot.invites = { + 'foo bar baz': { + 'host': 'foo@example.com', + 'baz@example.com': 'a', + 'stream': 'test', + 'subject': 'test game' + } + } + bot.change_game_subject('foo bar baz', 'test2', + 'game2', self.make_request_message('foo')) + self.assertEqual(bot.invites['foo bar baz']['stream'], 'test2') diff --git a/zulip_bots/zulip_bots/game_handler.py b/zulip_bots/zulip_bots/game_handler.py new file mode 100644 index 0000000..44b43b0 --- /dev/null +++ b/zulip_bots/zulip_bots/game_handler.py @@ -0,0 +1,822 @@ +import json +import re +import random +import logging +from copy import deepcopy +from typing import Any, Dict, Tuple, List +from zulip_bots.test_lib import BotTestCase +import operator +import random + + +class BadMoveException(Exception): + def __init__(self, message: str) -> None: + self.message = message + + def __str__(self) -> str: + return self.message + + +class GameAdapter(object): + ''' + Class that serves as a template to easily + create multiplayer games. + This class handles all commands, and creates + GameInstances which run the actual game logic. + ''' + + def __init__( + self, + game_name: str, + bot_name: str, + move_help_message: str, + move_regex: str, + model: Any, + gameMessageHandler: Any, + max_players: int=2, + min_players: int=2, + supports_computer: bool=False + ) -> None: + self.game_name = game_name + self.bot_name = bot_name + self.move_help_message = move_help_message + self.move_regex = re.compile(move_regex) + self.model = model + self.max_players = max_players + self.min_players = min_players + self.supports_computer = supports_computer + self.gameMessageHandler = gameMessageHandler() + self.invites = {} # type: Dict[str, Dict[str, str]] + self.instances = {} # type: Dict[str, Any] + self.user_cache = {} # type: Dict[str, Dict[str, Any]] + self.pending_subject_changes = [] # type: List[str] + self.stream = 'games' + + # Values are [won, lost, drawn, total] new values can be added, but MUST be added to the end of the list. + def add_user_statistics(self, user: str, values: Dict[str, int]) -> None: + self.get_user_cache() + current_values = {} # type: Dict[str, int] + if 'stats' in self.get_user_by_email(user).keys(): + current_values = self.user_cache[user]['stats'] + for key, value in values.items(): + if key not in current_values.keys(): + current_values.update({key: 0}) + current_values[key] += value + self.user_cache[user].update({'stats': current_values}) + self.put_user_cache() + + def help_message(self) -> str: + return '''** {} Bot Help:** +*Preface all commands with @**{}*** +* To start a game in a stream (*recommended*), type +`start game` +* To start a game against another player, type +`start game with @`{} +* To quit a game at any time, type +`quit` +* To end a game with a draw, type +`draw` +* To forfeit a game, type +`forfeit` +* To see the leaderboard, type +`leaderboard` +* To withdraw an invitation, type +`cancel game` +{}'''.format(self.game_name, self.get_bot_username(), self.play_with_computer_help(), self.move_help_message) + + def already_in_game_message(self) -> str: + return 'You are already in a game. Type `quit` to leave.' + + def confirm_new_invitation(self, opponent: str) -> str: + return 'You\'ve sent an invitation to play ' + self.game_name + ' with @**' +\ + self.get_user_by_email(opponent)['full_name'] + '**' + + def play_with_computer_help(self) -> str: + if self.supports_computer: + return '\n* To start a game with the computer, type\n`start game with` @**{}**'.format(self.get_bot_username()) + return '' + + def alert_new_invitation(self, game_id: str) -> str: + # Since the first player invites, the challenger is always the first player + return '**' + self.get_host(game_id) + ' has invited you to play a game of ' + self.game_name + '.**\n' +\ + self.get_formatted_game_object(game_id) + '\n\n' +\ + 'Type ```accept``` to accept the game invitation\n' +\ + 'Type ```decline``` to decline the game invitation.' + + def confirm_invitation_accepted(self, game_id: str) -> str: + host = self.invites[game_id]['host'] + return 'Accepted invitation to play **{}** from @**{}**.'.format(self.game_name, self.get_username_by_email(host)) + + def confirm_invitation_declined(self, game_id: str) -> str: + host = self.invites[game_id]['host'] + return 'Declined invitation to play **{}** from @**{}**.'.format(self.game_name, self.get_username_by_email(host)) + + def send_message(self, to: str, content: str, is_private: bool, subject: str='') -> None: + self.bot_handler.send_message(dict( + type='private' if is_private else 'stream', + to=to, + content=content, + subject=subject + )) + + def send_reply(self, original_message: Dict[str, Any], content: str) -> None: + self.bot_handler.send_reply(original_message, content) + + def usage(self) -> str: + return ''' + Bot that allows users to play another user + or the computer in a game of ''' + self.game_name + ''' + + To see the entire list of commands, type + @bot-name help + ''' + + def initialize(self, bot_handler: Any) -> None: + self.bot_handler = bot_handler + self.get_user_cache() + self.email = self.bot_handler.email + self.full_name = self.bot_handler.full_name + + def handle_message(self, message: Dict[str, Any], bot_handler: Any) -> None: + try: + self.bot_handler = bot_handler + content = message['content'].strip() + sender = message['sender_email'].lower() + message['sender_email'] = message['sender_email'].lower() + + if self.email not in self.user_cache.keys() and self.supports_computer: + self.add_user_to_cache({ + 'sender_email': self.email, + 'sender_full_name': self.full_name + }) + + if sender == self.email: + return + + if sender not in self.user_cache.keys(): + self.add_user_to_cache(message) + logging.info('Added {} to user cache'.format(sender)) + + if content.lower() == 'help' or content == '': + self.send_reply(message, self.help_message()) + return + + elif content.lower().startswith('start game with '): + self.command_start_game_with(message, sender, content) + + elif content.lower().startswith('start game'): + self.command_start_game(message, sender, content) + + elif content.lower() == 'accept': + self.command_accept(message, sender, content) + + elif content.lower() == 'decline': + self.command_decline(message, sender, content) + + elif content.lower() == 'quit': + self.command_quit(message, sender, content) + + elif content.lower() == 'register': + self.send_reply( + message, 'Hello @**{}**. Thanks for registering!'.format(message['sender_full_name'])) + + elif content.lower() == 'leaderboard': + self.command_leaderboard(message, sender, content) + + elif content.lower() == 'join': + self.command_join(message, sender, content) + + elif self.is_user_in_game(sender) is not '': + self.parse_message(message) + + elif self.move_regex.match(content) is not None or content.lower() == 'draw' or content.lower() == 'forfeit': + self.send_reply( + message, 'You are not in a game at the moment. Type `help` for help.') + else: + self.send_reply(message, self.help_message()) + except Exception as e: + logging.exception(str(e)) + self.bot_handler.send_reply(message, 'Error {}.'.format(e)) + + def is_user_in_game(self, user_email: str) -> str: + for instance in self.instances.values(): + if user_email in instance.players: + return instance.game_id + return '' + + def command_start_game_with(self, message: Dict[str, Any], sender: str, content: str) -> None: + if not self.is_user_not_player(sender, message): + self.send_reply( + message, self.already_in_game_message()) + return + users = content.replace('start game with ', '').strip().split(', ') + self.create_game_lobby(message, users) + + def command_start_game(self, message: Dict[str, Any], sender: str, content: str) -> None: + if message['type'] == 'private': + self.send_reply( + message, 'If you are starting a game in private messages, you must invite players. Type `help` for commands.') + if not self.is_user_not_player(sender, message): + self.send_reply( + message, self.already_in_game_message()) + return + self.create_game_lobby(message) + + def command_accept(self, message: Dict[str, Any], sender: str, content: str) -> None: + if not self.is_user_not_player(sender, message): + self.send_reply( + message, self.already_in_game_message()) + return + game_id = self.set_invite_by_user(sender, True, message) + if game_id is '': + self.send_reply( + message, 'No active invites. Type `help` for commands.') + return + if message['type'] == 'private': + self.send_reply(message, self.confirm_invitation_accepted(game_id)) + self.broadcast( + game_id, '@**{}** has accepted the invitation.'.format(self.get_username_by_email(sender))) + self.start_game_if_ready(game_id) + + def create_game_lobby(self, message: Dict[str, Any], users: List[str]=[]) -> None: + if self.is_game_in_subject(message['subject'], message['display_recipient']): + self.send_reply(message, 'There is already a game in this stream.') + return + if len(users) > 0: + users = self.verify_users(users, message=message) + if len(users) + 1 < self.min_players: + self.send_reply( + message, 'You must have at least {} players to play.\nGame cancelled.'.format(self.min_players)) + return + if len(users) + 1 > self.max_players: + self.send_reply( + message, 'The maximum number of players for this game is {}.'.format(self.max_players)) + return + game_id = self.generate_game_id() + stream_subject = '###private###' + if message['type'] == 'stream': + stream_subject = message['subject'] + self.invites[game_id] = {'host': message['sender_email'].lower( + ), 'subject': stream_subject, 'stream': message['display_recipient']} + if message['type'] == 'private': + self.invites[game_id]['stream'] = 'games' + for user in users: + self.send_invite(game_id, user, message) + if message['type'] == 'stream': + if len(users) > 0: + self.broadcast(game_id, 'If you were invited, and you\'re here, type "@**{}** accept" to accept the invite!'.format( + self.get_bot_username()), include_private=False) + if len(users) + 1 < self.max_players: + self.broadcast( + game_id, '**{}** wants to play **{}**. Type @**{}** join to play them!'.format( + self.get_username_by_email(message['sender_email']), + self.game_name, + self.get_bot_username()) + ) + if self.email in users: + self.broadcast(game_id, 'Wait... That\'s me!', + include_private=True) + if message['type'] == 'stream': + self.broadcast( + game_id, '@**{}** accept'.format(self.get_bot_username()), include_private=False) + game_id = self.set_invite_by_user( + self.email, True, {'type': 'stream'}) + self.start_game_if_ready(game_id) + + def command_decline(self, message: Dict[str, Any], sender: str, content: str) -> None: + if not self.is_user_not_player(sender, message): + self.send_reply( + message, self.already_in_game_message()) + return + game_id = self.set_invite_by_user(sender, False, message) + if game_id is '': + self.send_reply( + message, 'No active invites. Type `help` for commands.') + return + self.send_reply(message, self.confirm_invitation_declined(game_id)) + self.broadcast( + game_id, '@**{}** has declined the invitation.'.format(self.get_username_by_email(sender))) + if len(self.get_players(game_id, parameter='')) < self.min_players: + self.cancel_game(game_id) + + def command_quit(self, message: Dict[str, Any], sender: str, content: str) -> None: + game_id = self.get_game_id_by_email(sender) + if game_id is '': + self.send_reply( + message, 'You are not in a game. Type `help` for all commands.') + self.cancel_game(game_id, reason='{} quit.'.format(sender)) + + def command_join(self, message: Dict[str, Any], sender: str, content: str) -> None: + if not self.is_user_not_player(sender, message): + self.send_reply( + message, self.already_in_game_message()) + return + if message['type'] == 'private': + self.send_reply( + message, 'You cannot join games in private messages. Type `help` for all commands.') + return + game_id = self.get_invite_in_subject( + message['subject'], message['display_recipient']) + if game_id is '': + self.send_reply( + message, 'There is not a game in this subject. Type `help` for all commands.') + return + self.join_game(game_id, sender, message) + + def command_leaderboard(self, message: Dict[str, Any], sender: str, content: str) -> None: + stats = self.get_sorted_player_statistics() + num = 5 if len(stats) > 5 else len(stats) + top_stats = stats[0:num] + response = '**Most wins**\n\n' + raw_headers = ['games_won', 'games_drawn', 'games_lost', 'total_games'] + headers = ['Player'] + \ + [key.replace('_', ' ').title() for key in raw_headers] + response += ' | '.join(headers) + response += '\n' + ' | '.join([' --- ' for header in headers]) + for player, stat in top_stats: + response += '\n **{}** | '.format( + self.get_username_by_email(player)) + values = [str(stat[key]) for key in raw_headers] + response += ' | '.join(values) + self.send_reply(message, response) + return + + def get_sorted_player_statistics(self) -> List[Tuple[str, Dict[str, int]]]: + players = [] + for user_name, u in self.user_cache.items(): + if 'stats' in u.keys(): + players.append((user_name, u['stats'])) + return sorted( + players, + key=lambda player: (player[1]['games_won'], + player[1]['games_drawn'], + player[1]['total_games']), + reverse=True + ) + + def send_invite(self, game_id: str, user_email: str, message: Dict[str, Any]={}) -> None: + self.invites[game_id].update({user_email.lower(): 'p'}) + self.send_message(user_email, self.alert_new_invitation(game_id), True) + if message != {}: + self.send_reply(message, self.confirm_new_invitation(user_email)) + + def cancel_game(self, game_id: str, reason: str='') -> None: + if game_id in self.invites.keys(): + self.broadcast(game_id, 'Game cancelled.\n' + reason) + del self.invites[game_id] + return + if game_id in self.instances.keys(): + self.instances[game_id].broadcast('Game ended.\n' + reason) + del self.instances[game_id] + return + + def start_game_if_ready(self, game_id: str) -> None: + players = self.get_players(game_id) + if len(players) == self.max_players: + self.start_game(game_id) + + def start_game(self, game_id: str) -> None: + players = self.get_players(game_id) + subject = game_id + stream = self.invites[game_id]['stream'] + if self.invites[game_id]['subject'] != '###private###': + subject = self.invites[game_id]['subject'] + self.instances[game_id] = GameInstance( + self, False, subject, game_id, players, stream) + self.broadcast(game_id, 'The game has started in #{} {}'.format( + stream, self.instances[game_id].subject) + '\n' + self.get_formatted_game_object(game_id)) + del self.invites[game_id] + self.instances[game_id].start() + + def get_formatted_game_object(self, game_id: str) -> str: + object = '''> **Game `{}`** +> {} +> {} +> {}/{} players'''.format(game_id, self.get_host(game_id), self.game_name, self.get_number_of_players(game_id), self.max_players) + if game_id in self.instances.keys(): + instance = self.instances[game_id] + object += '\n> **[Join Game](/#narrow/stream/{}/topic/{})**'.format( + instance.stream, instance.subject) + return object + + def join_game(self, game_id: str, user_email: str, message: Dict[str, Any]={}) -> None: + if len(self.get_players(game_id)) >= self.max_players: + if message != {}: + self.send_reply(message, 'This game is full.') + return + self.invites[game_id].update({user_email: 'a'}) + self.broadcast( + game_id, '@**{}** has joined the game'.format(self.get_username_by_email(user_email))) + self.start_game_if_ready(game_id) + + def get_players(self, game_id: str, parameter: str='a') -> List[str]: + if game_id in self.invites.keys(): + players = [] # type: List[str] + if (self.invites[game_id]['subject'] == '###private###' and 'p' in parameter) or 'p' not in parameter: + players = [self.invites[game_id]['host']] + for player, accepted in self.invites[game_id].items(): + if player == 'host' or player == 'subject' or player == 'stream': + continue + if parameter in accepted: + players.append(player) + return players + if game_id in self.instances.keys() and 'p' not in parameter: + players = self.instances[game_id].players + return players + return [] + + def get_game_info(self, game_id: str) -> Dict[str, Any]: + game_info = {} # type: Dict[str, Any] + if game_id in self.instances.keys(): + instance = self.instances[game_id] + game_info = { + 'game_id': game_id, + 'type': 'instance', + 'stream': instance.stream, + 'subject': instance.subject, + 'players': self.get_players(game_id) + } + if game_id in self.invites.keys(): + invite = self.invites[game_id] + game_info = { + 'game_id': game_id, + 'type': 'invite', + 'stream': invite['stream'], + 'subject': invite['subject'], + 'players': self.get_players(game_id) + } + return game_info + + def get_user_by_name(self, name: str) -> Dict[str, Any]: + name = name.strip() + for user in self.user_cache.values(): + if 'full_name' in user.keys(): + if user['full_name'].lower() == name.lower(): + return user + return {} + + def get_number_of_players(self, game_id: str) -> int: + num = len(self.get_players(game_id)) + return num + + def get_host(self, game_id: str) -> str: + return self.get_players(game_id)[0] + + def parse_message(self, message: Dict[str, Any]) -> None: + game_id = self.is_user_in_game(message['sender_email']) + game = self.get_game_info(game_id) + if message['type'] == 'private': + self.send_reply(message, 'Join your game using the link below!\n\n{}'.format( + self.get_formatted_game_object(game_id))) + return + if game['subject'] != message['subject'] or game['stream'] != message['display_recipient']: + if game_id not in self.pending_subject_changes: + self.send_reply(message, 'Your current game is not in this subject. \n\ +To move subjects, send your message again, otherwise join the game using the link below.\n\n\ +{}'.format(self.get_formatted_game_object(game_id))) + self.pending_subject_changes.append(game_id) + return + self.pending_subject_changes.remove(game_id) + self.change_game_subject( + game_id, message['display_recipient'], message['subject'], message) + self.instances[game_id].handle_message( + message['content'], message['sender_email']) + + def change_game_subject( + self, + game_id: str, + stream_name: str, + subject_name: str, + message: Dict[str, Any]={} + ) -> None: + if self.get_game_instance_by_subject(stream_name, subject_name) is not None: + if message != {}: + self.send_reply( + message, 'There is already a game in this subject.') + return + if game_id in self.instances.keys(): + self.instances[game_id].change_subject(stream_name, subject_name) + if game_id in self.invites.keys(): + invite = self.invites[game_id] + invite['stream'] = stream_name + invite['subject'] = stream_name + + def set_invite_by_user(self, user_email: str, is_accepted: bool, message: Dict[str, Any]) -> str: + user_email = user_email.lower() + for game, users in self.invites.items(): + if user_email in users.keys(): + if is_accepted: + if message['type'] == 'private': + users[user_email] = 'pa' + else: + users[user_email] = 'a' + else: + users.pop(user_email) + return game + return '' + + def add_user_to_cache(self, message: Dict[str, Any]) -> None: + user = { + 'email': message['sender_email'].lower(), + 'full_name': message['sender_full_name'], + 'stats': { + 'total_games': 0, + 'games_won': 0, + 'games_lost': 0, + 'games_drawn': 0 + } + } + self.user_cache.update({message['sender_email'].lower(): user}) + self.put_user_cache() + + def put_user_cache(self) -> Dict[str, Any]: + user_cache_str = json.dumps(self.user_cache) + self.bot_handler.storage.put('users', user_cache_str) + return self.user_cache + + def get_user_cache(self) -> Dict[str, Any]: + try: + user_cache_str = self.bot_handler.storage.get('users') + except KeyError as e: + return {} + self.user_cache = json.loads(user_cache_str) + return self.user_cache + + def verify_users(self, users: List[str], message: Dict[str, Any]={}) -> List[str]: + verified_users = [] + failed = False + for u in users: + user = u.strip().lstrip('@**').rstrip('**') + if user == self.get_bot_username() or user == self.email: + self.send_reply( + message, 'You cannot play against the computer in this game.') + if '@' not in user: + user_obj = self.get_user_by_name(user) + if user_obj == {}: + self.send_reply( + message, 'I don\'t know {}. Tell them to say @**{}** register'.format(u, self.get_bot_username())) + failed = True + continue + user = user_obj['email'] + if self.is_user_not_player(user, message): + verified_users.append(user) + else: + failed = True + if failed: + return [] + else: + return verified_users + + def get_game_instance_by_subject(self, subject_name: str, stream_name: str) -> Any: + for instance in self.instances.values(): + if instance.subject == subject_name and instance.stream == stream_name: + return instance + return None + + def get_invite_in_subject(self, subject_name: str, stream_name: str) -> str: + for key, invite in self.invites.items(): + if invite['subject'] == subject_name and invite['stream'] == stream_name: + return key + return '' + + def is_game_in_subject(self, subject_name: str, stream_name: str) -> bool: + return self.get_invite_in_subject(subject_name, stream_name) is not '' or \ + self.get_game_instance_by_subject( + subject_name, stream_name) is not None + + def is_user_not_player(self, user_email: str, message: Dict[str, Any]={}) -> bool: + user = self.get_user_by_email(user_email) + if user == {}: + if message != {}: + self.send_reply(message, 'I don\'t know {}. Tell them to use @**{}** register'.format( + user_email, self.get_bot_username())) + return False + for instance in self.instances.values(): + if user_email in instance.players: + return False + for invite in self.invites.values(): + for u in invite.keys(): + if u == 'host': + if user_email == invite['host']: + return False + if u == user_email and 'a' in invite[u]: + return False + return True + + def generate_game_id(self) -> str: + id = '' + valid_characters = 'abcdefghijklmnopqrstuvwxyz0123456789' + for i in range(6): + id += valid_characters[random.randrange(0, len(valid_characters))] + return id + + def broadcast(self, game_id: str, content: str, include_private: bool=True) -> bool: + if include_private: + private_recipients = self.get_players(game_id, parameter='p') + if private_recipients is not None: + for user in private_recipients: + self.send_message(user, content, True) + if game_id in self.invites.keys(): + if self.invites[game_id]['subject'] != '###private###': + self.send_message( + self.invites[game_id]['stream'], content, False, self.invites[game_id]['subject']) + return True + if game_id in self.instances.keys(): + self.send_message( + self.instances[game_id].stream, content, False, self.instances[game_id].subject) + return True + return False + + def get_username_by_email(self, user_email: str) -> str: + return self.get_user_by_email(user_email)['full_name'] + + def get_user_by_email(self, user_email: str) -> Dict[str, Any]: + if user_email in self.user_cache: + return self.user_cache[user_email] + return {} + + def get_game_id_by_email(self, user_email: str) -> str: + for instance in self.instances.values(): + if user_email in instance.players: + return instance.game_id + for game_id in self.invites.keys(): + players = self.get_players(game_id) + if user_email in players: + return game_id + return '' + + def get_bot_username(self) -> str: + return self.bot_handler.full_name + + +class GameInstance(object): + ''' + The GameInstance class handles the game logic for a certain game, + and is associated with a certain stream. + + It gets player info from GameAdapter + + It only runs when the game is being played, not in the invite + or waiting states. + ''' + + def __init__(self, gameAdapter: GameAdapter, is_private: bool, subject: str, game_id: str, players: List[str], stream: str) -> None: + self.gameAdapter = gameAdapter + self.is_private = is_private + self.subject = subject + self.game_id = game_id + self.players = players + self.stream = stream + self.model = deepcopy(self.gameAdapter.model()) + self.board = self.model.current_board + self.turn = random.randrange(0, len(players)) - 1 + self.current_draw = {} # type: Dict[str, bool] + self.current_messages = [] # type: List[str] + self.is_changing_subject = False + + def start(self) -> None: + self.current_messages.append(self.get_start_message()) + self.current_messages.append(self.parse_current_board()) + self.next_turn() + + def change_subject(self, stream: str, subject: str) -> None: + self.stream = stream + self.subject = subject + self.current_messages.append(self.parse_current_board()) + self.broadcast_current_message() + + def get_player_text(self) -> str: + player_text = '' + for player in self.players: + player_text += ' @**{}**'.format( + self.gameAdapter.get_user_by_email(player)['full_name']) + return player_text + + def get_start_message(self) -> str: + start_message = 'Game `{}` started.\n*Remember to start your message with* @**{}**'.format( + self.game_id, self.gameAdapter.get_bot_username()) + if not self.is_private: + player_text = '\n**Players**' + player_text += self.get_player_text() + start_message += player_text + start_message += '\n' + self.gameAdapter.gameMessageHandler.game_start_message() + return start_message + + def handle_message(self, content: str, player_email: str) -> None: + if content == 'forfeit': + player_name = self.gameAdapter.get_username_by_email(player_email) + self.broadcast('**{}** forfeited!'.format(player_name)) + self.end_game('except:' + player_email) + return + if content == 'draw': + if player_email in self.current_draw.keys(): + self.current_draw[player_email] = True + else: + self.current_draw = {p: False for p in self.players} + self.broadcast('**{}** has voted for a draw!\nType `draw` to accept'.format( + self.gameAdapter.get_username_by_email(player_email))) + self.current_draw[player_email] = True + if self.check_draw(): + self.end_game('draw') + return + if self.is_turn_of(player_email): + self.handle_current_player_command(content) + else: + self.broadcast('It\'s @**{}**\'s ({}) turn.'.format( + self.gameAdapter.get_username_by_email( + self.players[self.turn]), + self.gameAdapter.gameMessageHandler.get_player_color(self.turn))) + + def broadcast(self, content: str) -> None: + self.gameAdapter.broadcast(self.game_id, content) + + def check_draw(self) -> bool: + for d in self.current_draw.values(): + if not d: + return False + return len(self.current_draw.values()) > 0 + + def handle_current_player_command(self, content: str) -> None: + re_result = self.gameAdapter.move_regex.match(content) + if re_result is None: + self.broadcast(self.gameAdapter.move_help_message) + return + self.make_move(content, False) + + def make_move(self, content: str, is_computer: bool) -> None: + try: + board = self.model.make_move(content, self.turn, is_computer) + except BadMoveException as e: + self.broadcast(e.message) + return + if not is_computer: + self.current_messages.append(self.gameAdapter.gameMessageHandler.alert_move_message( + '**{}**'.format(self.gameAdapter.get_username_by_email(self.players[self.turn])), content)) + self.current_messages.append(self.parse_current_board()) + game_over = self.model.determine_game_over(self.players) + if game_over: + self.broadcast_current_message() + if game_over == 'current turn': + game_over = self.players[self.turn] + self.end_game(game_over) + return + self.next_turn() + + def is_turn_of(self, player_email: str) -> bool: + return self.players[self.turn].lower() == player_email.lower() + + def next_turn(self) -> None: + self.turn += 1 + if self.turn >= len(self.players): + self.turn = 0 + self.current_messages.append('It\'s @**{}**\'s ({}) turn.'.format( + self.gameAdapter.get_username_by_email(self.players[self.turn]), + self.gameAdapter.gameMessageHandler.get_player_color(self.turn) + )) + self.broadcast_current_message() + if self.players[self.turn] == self.gameAdapter.email: + self.make_move('', True) + + def broadcast_current_message(self) -> None: + content = '\n\n'.join(self.current_messages) + self.broadcast(content) + self.current_messages = [] + + def parse_current_board(self) -> Any: + return self.gameAdapter.gameMessageHandler.parse_board(self.model.current_board) + + def end_game(self, winner: str) -> None: + loser = '' + if winner == 'draw': + self.broadcast('It was a draw!') + elif winner.startswith('except:'): + loser = winner.lstrip('except:') + else: + self.broadcast('{} won! :tada:'.format(winner)) + for u in self.players: + values = {'total_games': 1, 'games_won': 0, + 'games_lost': 0, 'games_drawn': 0} + if loser == '': + if u == winner: + values.update({'games_won': 1}) + elif winner == 'draw': + values.update({'games_drawn': 1}) + else: + values.update({'games_lost': 1}) + else: + if u == loser: + values.update({'games_lost': 1}) + else: + values.update({'games_won': 1}) + self.gameAdapter.add_user_statistics(u, values) + if self.gameAdapter.email in self.players: + self.send_win_responses(winner) + self.gameAdapter.cancel_game(self.game_id) + + def send_win_responses(self, winner: str) -> None: + if winner == self.gameAdapter.email: + self.broadcast('I won! Well Played!') + elif winner == 'draw': + self.broadcast('It was a draw! Well Played!') + else: + self.broadcast('You won! Nice!') diff --git a/zulip_bots/zulip_bots/test_lib.py b/zulip_bots/zulip_bots/test_lib.py index 18dc315..bc44197 100755 --- a/zulip_bots/zulip_bots/test_lib.py +++ b/zulip_bots/zulip_bots/test_lib.py @@ -21,6 +21,8 @@ class StubBotHandler: def __init__(self): # type: () -> None self.storage = SimpleStorage() + self.full_name = 'test-bot' + self.email = 'test-bot@example.com' self.message_server = SimpleMessageServer() self.reset_transcript()