zulip_bots: Consolidate game-playing bots.

This commit is contained in:
fredfishgames 2018-01-19 15:55:18 +00:00 committed by showell
parent 25820c9bb2
commit ec5be8fc7e
6 changed files with 1398 additions and 0 deletions

View file

@ -14,6 +14,7 @@ zulip_bots # This directory
├───zulip_bots # `zulip_bots` package.
│ ├───bots/ # Actively maintained and tested bots.
│ ├───bots_unmaintained/ # Unmaintained, potentially broken bots.
│ ├───game_handler.py # Handles game-related bots.
│ ├───lib.py # Backbone of run.py
│ ├───provision.py # Creates a development environment.
│ ├───run.py # Used to run bots.

View file

@ -0,0 +1,72 @@
from zulip_bots.game_handler import GameAdapter, BadMoveException
from typing import List, Any
class GameHandlerBotMessageHandler(object):
tokens = [':blue_circle:', ':red_circle:']
def parse_board(self, board: Any) -> str:
return 'foo'
def get_player_color(self, turn: int) -> str:
return self.tokens[turn]
def alert_move_message(self, original_player: str, move_info: str) -> str:
column_number = move_info.replace('move ', '')
return original_player + ' moved in column ' + column_number
def game_start_message(self) -> str:
return 'Type `move <column>` to place a token.\n \
The first player to get 4 in a row wins!\n \
Good Luck!'
class MockModel(object):
def __init__(self) -> None:
self.current_board = 'mock board'
def make_move(
self,
move: str,
player: int,
is_computer: bool=False
) -> Any:
if not is_computer:
if int(move.replace('move ', '')) < 9:
return 'mock board'
else:
raise BadMoveException('Invalid Move.')
return 'mock board'
def determine_game_over(self, players: List[str]) -> None:
return None
class GameHandlerBotHandler(GameAdapter):
'''
DO NOT USE THIS BOT
This bot is used to test game_handler.py
'''
def __init__(self) -> None:
game_name = 'foo test game'
bot_name = 'game_handler_bot'
move_help_message = '* To make your move during a game, type\n' \
'```move <column-number>```'
move_regex = 'move (\d)$'
model = MockModel
gameMessageHandler = GameHandlerBotMessageHandler
super(GameHandlerBotHandler, self).__init__(
game_name,
bot_name,
move_help_message,
move_regex,
model,
gameMessageHandler,
max_players=2,
supports_computer=True
)
handler_class = GameHandlerBotHandler

View file

@ -0,0 +1,501 @@
from zulip_bots.test_lib import BotTestCase
from zulip_bots.game_handler import GameInstance
from contextlib import contextmanager
from mock import MagicMock, patch
from typing import Any, Dict, List
class TestGameHandlerBot(BotTestCase):
bot_name = 'game_handler_bot'
def make_request_message(
self,
content: str,
user: str='foo@example.com',
user_name: str='foo',
type: str='private',
stream: str='',
subject: str=''
) -> Dict[str, str]:
message = dict(
sender_email=user,
sender_full_name=user_name,
content=content,
type=type,
display_recipient=stream,
subject=subject,
)
return message
# Function that serves similar purpose to BotTestCase.verify_dialog, but allows for multiple responses to be handled
def verify_response(
self,
request: str,
expected_response: str,
response_number: int,
bot: Any=None,
user_name: str='foo',
stream: str='',
subject: str='',
max_messages: int=20
) -> None:
'''
This function serves a similar purpose
to BotTestCase.verify_dialog, but allows
for multiple responses to be validated,
and for mocking of the bot's internal data
'''
if bot is None:
bot, bot_handler = self._get_handlers()
else:
_b, bot_handler = self._get_handlers()
type = 'private' if stream == '' else 'stream'
message = self.make_request_message(
request, user_name + '@example.com', user_name, type, stream, subject)
bot_handler.reset_transcript()
bot.handle_message(message, bot_handler)
responses = [
message
for (method, message)
in bot_handler.transcript
]
first_response = responses[response_number]
self.assertEqual(expected_response, first_response['content'])
self.assertLessEqual(len(responses), max_messages)
def add_user_to_cache(self, name: str, bot: Any=None) -> Any:
if bot is None:
bot, bot_handler = self._get_handlers()
message = {
'sender_email': '{}@example.com'.format(name),
'sender_full_name': '{}'.format(name)
}
bot.add_user_to_cache(message)
return bot
def setup_game(self, id: str='', bot: Any=None, players: List[str]=['foo', 'baz'], subject: str='test game', stream: str='test') -> Any:
if bot is None:
bot, bot_handler = self._get_handlers()
for p in players:
self.add_user_to_cache(p, bot)
players_emails = [p + '@example.com' for p in players]
game_id = 'abc123'
if id != '':
game_id = id
instance = GameInstance(bot, False, subject,
game_id, players_emails, stream)
bot.instances.update({game_id: instance})
instance.turn = -1
instance.start()
return bot
def setup_computer_game(self) -> Any:
bot = self.add_user_to_cache('foo')
bot.email = 'test-bot@example.com'
self.add_user_to_cache('test-bot', bot)
instance = GameInstance(bot, False, 'test game', 'abc123', [
'foo@example.com', 'test-bot@example.com'], 'test')
bot.instances.update({'abc123': instance})
instance.start()
return bot
def help_message(self) -> str:
return '''** foo test game Bot Help:**
*Preface all commands with @**test-bot***
* To start a game in a stream (*recommended*), type
`start game`
* To start a game against another player, type
`start game with @<player-name>`
* To start a game with the computer, type
`start game with` @**test-bot**
* To quit a game at any time, type
`quit`
* To end a game with a draw, type
`draw`
* To forfeit a game, type
`forfeit`
* To see the leaderboard, type
`leaderboard`
* To withdraw an invitation, type
`cancel game`
* To make your move during a game, type
```move <column-number>```'''
def test_help_message(self) -> None:
self.verify_response('help', self.help_message(), 0)
self.verify_response('foo bar baz', self.help_message(), 0)
def test_exception_handling(self) -> None:
with patch('logging.exception'), \
patch('zulip_bots.game_handler.GameAdapter.command_quit',
side_effect=Exception):
self.verify_response('quit', 'Error .', 0)
def test_not_in_game_messages(self) -> None:
self.verify_response(
'move 3', 'You are not in a game at the moment. Type `help` for help.', 0, max_messages=1)
self.verify_response(
'quit', 'You are not in a game. Type `help` for all commands.', 0, max_messages=1)
def test_start_game_with_name(self) -> None:
bot = self.add_user_to_cache('baz')
self.verify_response('start game with @**baz**',
'You\'ve sent an invitation to play foo test game with @**baz**', 1, bot=bot)
self.assertEqual(len(bot.invites), 1)
def test_start_game_with_email(self) -> None:
bot = self.add_user_to_cache('baz')
self.verify_response('start game with baz@example.com',
'You\'ve sent an invitation to play foo test game with @**baz**', 1, bot=bot)
self.assertEqual(len(bot.invites), 1)
def test_join_game_and_start_in_stream(self) -> None:
bot = self.add_user_to_cache('baz')
self.add_user_to_cache('foo', bot)
bot.invites = {
'abc': {
'stream': 'test',
'subject': 'test game',
'host': 'foo@example.com'
}
}
self.verify_response('join', '@**baz** has joined the game', 0, bot=bot,
stream='test', subject='test game', user_name='baz')
self.assertEqual(len(bot.instances.keys()), 1)
def test_start_game_in_stream(self) -> None:
self.verify_response(
'start game',
'**foo** wants to play **foo test game**. Type @**test-bot** join to play them!',
0,
stream='test',
subject='test game'
)
def test_start_invite_game_in_stream(self) -> None:
bot = self.add_user_to_cache('baz')
self.verify_response(
'start game with @**baz**',
'If you were invited, and you\'re here, type "@**test-bot** accept" to accept the invite!',
2,
bot=bot,
stream='test',
subject='game test'
)
def test_join_no_game(self) -> None:
self.verify_response('join', 'There is not a game in this subject. Type `help` for all commands.',
0, stream='test', subject='test game', user_name='baz', max_messages=1)
def test_accept_invitation(self) -> None:
bot = self.add_user_to_cache('baz')
self.add_user_to_cache('foo', bot)
bot.invites = {
'abc': {
'subject': '###private###',
'stream': 'games',
'host': 'foo@example.com',
'baz@example.com': 'p'
}
}
self.verify_response(
'accept', 'Accepted invitation to play **foo test game** from @**foo**.', 0, bot, 'baz')
def test_decline_invitation(self) -> None:
bot = self.add_user_to_cache('baz')
self.add_user_to_cache('foo', bot)
bot.invites = {
'abc': {
'subject': '###private###',
'host': 'foo@example.com',
'baz@example.com': 'p'
}
}
self.verify_response(
'decline', 'Declined invitation to play **foo test game** from @**foo**.', 0, bot, 'baz')
def test_quit_invite(self) -> None:
bot = self.add_user_to_cache('foo')
bot.invites = {
'abc': {
'subject': '###private###',
'host': 'foo@example.com'
}
}
self.verify_response(
'quit', 'Game cancelled.\nfoo@example.com quit.', 0, bot, 'foo')
def test_user_already_in_game_errors(self) -> None:
bot = self.setup_game()
self.verify_response('start game with @**baz**',
'You are already in a game. Type `quit` to leave.', 0, bot=bot, max_messages=1)
self.verify_response(
'start game', 'You are already in a game. Type `quit` to leave.', 0, bot=bot, stream='test', max_messages=1)
self.verify_response(
'accept', 'You are already in a game. Type `quit` to leave.', 0, bot=bot, max_messages=1)
self.verify_response(
'decline', 'You are already in a game. Type `quit` to leave.', 0, bot=bot, max_messages=1)
self.verify_response(
'join', 'You are already in a game. Type `quit` to leave.', 0, bot=bot, max_messages=1)
def test_register_command(self) -> None:
bot = self.add_user_to_cache('foo')
self.verify_response(
'register', 'Hello @**foo**. Thanks for registering!', 0, bot, 'foo')
self.assertIn('foo@example.com', bot.user_cache.keys())
def test_no_active_invite_errors(self) -> None:
self.verify_response(
'accept', 'No active invites. Type `help` for commands.', 0)
self.verify_response(
'decline', 'No active invites. Type `help` for commands.', 0)
def test_wrong_number_of_players_message(self) -> None:
bot = self.add_user_to_cache('baz')
bot.min_players = 5
self.verify_response('start game with @**baz**',
'You must have at least 5 players to play.\nGame cancelled.', 0, bot=bot)
bot.min_players = 2
bot.max_players = 1
self.verify_response('start game with @**baz**',
'The maximum number of players for this game is 1.', 0, bot=bot)
bot.max_players = 1
bot.invites = {
'abc': {
'stream': 'test',
'subject': 'test game',
'host': 'foo@example.com'
}
}
self.verify_response('join', 'This game is full.', 0, bot=bot,
stream='test', subject='test game', user_name='baz')
def test_public_accept(self) -> None:
bot = self.add_user_to_cache('baz')
self.add_user_to_cache('foo', bot)
bot.invites = {
'abc': {
'stream': 'test',
'subject': 'test game',
'host': 'baz@example.com',
'foo@example.com': 'p'
}
}
self.verify_response('accept', '@**foo** has accepted the invitation.',
0, bot=bot, stream='test', subject='test game')
def test_start_game_with_computer(self) -> None:
self.verify_response('start game with @**test-bot**',
'Wait... That\'s me!', 4, stream='test', subject='test game')
def test_sent_by_bot(self) -> None:
with self.assertRaises(IndexError):
self.verify_response(
'foo', '', 0, user_name='test-bot', stream='test', subject='test game')
def test_forfeit(self) -> None:
bot = self.setup_game()
self.verify_response('forfeit', '**foo** forfeited!',
0, bot=bot, stream='test', subject='test game')
def test_draw(self) -> None:
bot = self.setup_game()
self.verify_response('draw', '**foo** has voted for a draw!\nType `draw` to accept',
0, bot=bot, stream='test', subject='test game')
self.verify_response('draw', 'It was a draw!', 0, bot=bot, stream='test',
subject='test game', user_name='baz')
def test_normal_turns(self) -> None:
bot = self.setup_game()
self.verify_response('move 3', '**foo** moved in column 3\n\nfoo\n\nIt\'s @**baz**\'s (:red_circle:) turn.',
0, bot=bot, stream='test', subject='test game')
self.verify_response('move 3', '**baz** moved in column 3\n\nfoo\n\nIt\'s @**foo**\'s (:blue_circle:) turn.',
0, bot=bot, stream='test', subject='test game', user_name='baz')
def test_wrong_turn(self) -> None:
bot = self.setup_game()
self.verify_response('move 5', 'It\'s @**foo**\'s (:blue_circle:) turn.', 0,
bot=bot, stream='test', subject='test game', user_name='baz')
def test_private_message_error(self) -> None:
self.verify_response(
'start game', 'If you are starting a game in private messages, you must invite players. Type `help` for commands.', 0, max_messages=1)
bot = self.add_user_to_cache('bar')
bot.invites = {
'abcdefg': {
'host': 'bar@example.com',
'stream': 'test',
'subject': 'test game'
}
}
self.verify_response(
'join', 'You cannot join games in private messages. Type `help` for all commands.', 0, bot=bot, max_messages=1)
def test_game_already_in_subject(self) -> None:
bot = self.add_user_to_cache('foo')
bot.invites = {
'abcdefg': {
'host': 'foo@example.com',
'stream': 'test',
'subject': 'test game'
}
}
self.verify_response('start game', 'There is already a game in this stream.', 0,
bot=bot, stream='test', subject='test game', user_name='baz', max_messages=1)
# def test_not_authorized(self) -> None:
# bot = self.setup_game()
# self.verify_response('move 3', 'You are not authorized to send messages in this stream', 0, bot=bot,
# user_name='bar', stream='test', subject='test game', max_messages=1)
def test_unknown_user(self) -> None:
self.verify_response('start game with @**bar**',
'I don\'t know @**bar**. Tell them to say @**test-bot** register', 0)
self.verify_response('start game with bar@example.com',
'I don\'t know bar@example.com. Tell them to use @**test-bot** register', 0)
def test_is_user_not_player(self) -> None:
bot = self.add_user_to_cache('foo')
self.add_user_to_cache('baz', bot)
bot.invites = {
'abcdefg': {
'host': 'foo@example.com',
'baz@example.com': 'a'
}
}
self.assertFalse(bot.is_user_not_player('foo@example.com'))
self.assertFalse(bot.is_user_not_player('baz@example.com'))
def test_move_help_message(self) -> None:
bot = self.setup_game()
self.verify_response('move 123', '* To make your move during a game, type\n```move <column-number>```',
0, bot=bot, stream='test', subject='test game')
def test_invalid_move_message(self) -> None:
bot = self.setup_game()
self.verify_response('move 9', 'Invalid Move.', 0,
bot=bot, stream='test', subject='test game', max_messages=1)
def test_get_game_id_by_email(self) -> None:
bot = self.setup_game()
self.assertEqual(bot.get_game_id_by_email('foo@example.com'), 'abc123')
def test_game_over_and_leaderboard(self) -> None:
bot = self.setup_game()
bot.put_user_cache()
with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='foo@example.com'):
self.verify_response('move 3', 'foo@example.com won! :tada:',
1, bot=bot, stream='test', subject='test game')
leaderboard = '**Most wins**\n\n\
Player | Games Won | Games Drawn | Games Lost | Total Games\n\
--- | --- | --- | --- | --- \n\
**foo** | 1 | 0 | 0 | 1\n\
**baz** | 0 | 0 | 1 | 1\n\
**test-bot** | 0 | 0 | 0 | 0'
self.verify_response('leaderboard', leaderboard, 0, bot=bot)
def test_current_turn_winner(self) -> None:
bot = self.setup_game()
with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='current turn'):
self.verify_response('move 3', 'foo@example.com won! :tada:',
1, bot=bot, stream='test', subject='test game')
def test_computer_turn(self) -> None:
bot = self.setup_computer_game()
self.verify_response('move 3', '**foo** moved in column 3\n\nfoo\n\nIt\'s @**test-bot**\'s (:red_circle:) turn.',
0, bot=bot, stream='test', subject='test game')
with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='test-bot@example.com'):
self.verify_response('move 5', 'I won! Well Played!',
2, bot=bot, stream='test', subject='test game')
def test_computer_endgame_responses(self) -> None:
bot = self.setup_computer_game()
with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='foo@example.com'):
self.verify_response('move 5', 'You won! Nice!',
2, bot=bot, stream='test', subject='test game')
bot = self.setup_computer_game()
with patch('zulip_bots.bots.game_handler_bot.game_handler_bot.MockModel.determine_game_over', return_value='draw'):
self.verify_response('move 5', 'It was a draw! Well Played!',
2, bot=bot, stream='test', subject='test game')
def test_add_user_statistics(self) -> None:
bot = self.add_user_to_cache('foo')
bot.add_user_statistics('foo@example.com', {'foo': 3})
self.assertEqual(bot.user_cache['foo@example.com']['stats']['foo'], 3)
def test_get_players(self) -> None:
bot = self.setup_game()
players = bot.get_players('abc123')
self.assertEqual(players, ['foo@example.com', 'baz@example.com'])
def test_none_function_responses(self) -> None:
bot, bot_handler = self._get_handlers()
self.assertEqual(bot.get_players('abc'), [])
self.assertEqual(bot.get_user_by_name('no one'), {})
self.assertEqual(bot.get_user_by_email('no one'), {})
def test_get_game_info(self) -> None:
bot = self.add_user_to_cache('foo')
self.add_user_to_cache('baz', bot)
bot.invites = {
'abcdefg': {
'host': 'foo@example.com',
'baz@example.com': 'a',
'stream': 'test',
'subject': 'test game'
}
}
self.assertEqual(bot.get_game_info('abcdefg'), {
'game_id': 'abcdefg',
'type': 'invite',
'stream': 'test',
'subject': 'test game',
'players': ['foo@example.com', 'baz@example.com']
})
def test_parse_message(self) -> None:
bot = self.setup_game()
self.verify_response('move 3', 'Join your game using the link below!\n\n> **Game `abc123`**\n\
> foo@example.com\n\
> foo test game\n\
> 2/2 players\n\
> **[Join Game](/#narrow/stream/test/topic/test game)**', 0, bot=bot)
bot = self.setup_game()
self.verify_response('move 3', '''Your current game is not in this subject. \n\
To move subjects, send your message again, otherwise join the game using the link below.
> **Game `abc123`**
> foo@example.com
> foo test game
> 2/2 players
> **[Join Game](/#narrow/stream/test/topic/test game)**''', 0, bot=bot, stream='test 2', subject='game 2')
self.verify_response('move 3', 'foo', 0, bot=bot,
stream='test 2', subject='game 2')
def test_change_game_subject(self) -> None:
bot = self.setup_game('abc123')
self.setup_game('abcdefg', bot, ['bar', 'abc'], 'test game 2', 'test2')
self.verify_response('move 3', '''Your current game is not in this subject. \n\
To move subjects, send your message again, otherwise join the game using the link below.
> **Game `abcdefg`**
> bar@example.com
> foo test game
> 2/2 players
> **[Join Game](/#narrow/stream/test2/topic/test game 2)**''', 0, bot=bot, user_name='bar', stream='test game', subject='test2')
self.verify_response('move 3', 'There is already a game in this subject.',
0, bot=bot, user_name='bar', stream='test game', subject='test')
bot.invites = {
'foo bar baz': {
'host': 'foo@example.com',
'baz@example.com': 'a',
'stream': 'test',
'subject': 'test game'
}
}
bot.change_game_subject('foo bar baz', 'test2',
'game2', self.make_request_message('foo'))
self.assertEqual(bot.invites['foo bar baz']['stream'], 'test2')

View file

@ -0,0 +1,822 @@
import json
import re
import random
import logging
from copy import deepcopy
from typing import Any, Dict, Tuple, List
from zulip_bots.test_lib import BotTestCase
import operator
import random
class BadMoveException(Exception):
def __init__(self, message: str) -> None:
self.message = message
def __str__(self) -> str:
return self.message
class GameAdapter(object):
'''
Class that serves as a template to easily
create multiplayer games.
This class handles all commands, and creates
GameInstances which run the actual game logic.
'''
def __init__(
self,
game_name: str,
bot_name: str,
move_help_message: str,
move_regex: str,
model: Any,
gameMessageHandler: Any,
max_players: int=2,
min_players: int=2,
supports_computer: bool=False
) -> None:
self.game_name = game_name
self.bot_name = bot_name
self.move_help_message = move_help_message
self.move_regex = re.compile(move_regex)
self.model = model
self.max_players = max_players
self.min_players = min_players
self.supports_computer = supports_computer
self.gameMessageHandler = gameMessageHandler()
self.invites = {} # type: Dict[str, Dict[str, str]]
self.instances = {} # type: Dict[str, Any]
self.user_cache = {} # type: Dict[str, Dict[str, Any]]
self.pending_subject_changes = [] # type: List[str]
self.stream = 'games'
# Values are [won, lost, drawn, total] new values can be added, but MUST be added to the end of the list.
def add_user_statistics(self, user: str, values: Dict[str, int]) -> None:
self.get_user_cache()
current_values = {} # type: Dict[str, int]
if 'stats' in self.get_user_by_email(user).keys():
current_values = self.user_cache[user]['stats']
for key, value in values.items():
if key not in current_values.keys():
current_values.update({key: 0})
current_values[key] += value
self.user_cache[user].update({'stats': current_values})
self.put_user_cache()
def help_message(self) -> str:
return '''** {} Bot Help:**
*Preface all commands with @**{}***
* To start a game in a stream (*recommended*), type
`start game`
* To start a game against another player, type
`start game with @<player-name>`{}
* To quit a game at any time, type
`quit`
* To end a game with a draw, type
`draw`
* To forfeit a game, type
`forfeit`
* To see the leaderboard, type
`leaderboard`
* To withdraw an invitation, type
`cancel game`
{}'''.format(self.game_name, self.get_bot_username(), self.play_with_computer_help(), self.move_help_message)
def already_in_game_message(self) -> str:
return 'You are already in a game. Type `quit` to leave.'
def confirm_new_invitation(self, opponent: str) -> str:
return 'You\'ve sent an invitation to play ' + self.game_name + ' with @**' +\
self.get_user_by_email(opponent)['full_name'] + '**'
def play_with_computer_help(self) -> str:
if self.supports_computer:
return '\n* To start a game with the computer, type\n`start game with` @**{}**'.format(self.get_bot_username())
return ''
def alert_new_invitation(self, game_id: str) -> str:
# Since the first player invites, the challenger is always the first player
return '**' + self.get_host(game_id) + ' has invited you to play a game of ' + self.game_name + '.**\n' +\
self.get_formatted_game_object(game_id) + '\n\n' +\
'Type ```accept``` to accept the game invitation\n' +\
'Type ```decline``` to decline the game invitation.'
def confirm_invitation_accepted(self, game_id: str) -> str:
host = self.invites[game_id]['host']
return 'Accepted invitation to play **{}** from @**{}**.'.format(self.game_name, self.get_username_by_email(host))
def confirm_invitation_declined(self, game_id: str) -> str:
host = self.invites[game_id]['host']
return 'Declined invitation to play **{}** from @**{}**.'.format(self.game_name, self.get_username_by_email(host))
def send_message(self, to: str, content: str, is_private: bool, subject: str='') -> None:
self.bot_handler.send_message(dict(
type='private' if is_private else 'stream',
to=to,
content=content,
subject=subject
))
def send_reply(self, original_message: Dict[str, Any], content: str) -> None:
self.bot_handler.send_reply(original_message, content)
def usage(self) -> str:
return '''
Bot that allows users to play another user
or the computer in a game of ''' + self.game_name + '''
To see the entire list of commands, type
@bot-name help
'''
def initialize(self, bot_handler: Any) -> None:
self.bot_handler = bot_handler
self.get_user_cache()
self.email = self.bot_handler.email
self.full_name = self.bot_handler.full_name
def handle_message(self, message: Dict[str, Any], bot_handler: Any) -> None:
try:
self.bot_handler = bot_handler
content = message['content'].strip()
sender = message['sender_email'].lower()
message['sender_email'] = message['sender_email'].lower()
if self.email not in self.user_cache.keys() and self.supports_computer:
self.add_user_to_cache({
'sender_email': self.email,
'sender_full_name': self.full_name
})
if sender == self.email:
return
if sender not in self.user_cache.keys():
self.add_user_to_cache(message)
logging.info('Added {} to user cache'.format(sender))
if content.lower() == 'help' or content == '':
self.send_reply(message, self.help_message())
return
elif content.lower().startswith('start game with '):
self.command_start_game_with(message, sender, content)
elif content.lower().startswith('start game'):
self.command_start_game(message, sender, content)
elif content.lower() == 'accept':
self.command_accept(message, sender, content)
elif content.lower() == 'decline':
self.command_decline(message, sender, content)
elif content.lower() == 'quit':
self.command_quit(message, sender, content)
elif content.lower() == 'register':
self.send_reply(
message, 'Hello @**{}**. Thanks for registering!'.format(message['sender_full_name']))
elif content.lower() == 'leaderboard':
self.command_leaderboard(message, sender, content)
elif content.lower() == 'join':
self.command_join(message, sender, content)
elif self.is_user_in_game(sender) is not '':
self.parse_message(message)
elif self.move_regex.match(content) is not None or content.lower() == 'draw' or content.lower() == 'forfeit':
self.send_reply(
message, 'You are not in a game at the moment. Type `help` for help.')
else:
self.send_reply(message, self.help_message())
except Exception as e:
logging.exception(str(e))
self.bot_handler.send_reply(message, 'Error {}.'.format(e))
def is_user_in_game(self, user_email: str) -> str:
for instance in self.instances.values():
if user_email in instance.players:
return instance.game_id
return ''
def command_start_game_with(self, message: Dict[str, Any], sender: str, content: str) -> None:
if not self.is_user_not_player(sender, message):
self.send_reply(
message, self.already_in_game_message())
return
users = content.replace('start game with ', '').strip().split(', ')
self.create_game_lobby(message, users)
def command_start_game(self, message: Dict[str, Any], sender: str, content: str) -> None:
if message['type'] == 'private':
self.send_reply(
message, 'If you are starting a game in private messages, you must invite players. Type `help` for commands.')
if not self.is_user_not_player(sender, message):
self.send_reply(
message, self.already_in_game_message())
return
self.create_game_lobby(message)
def command_accept(self, message: Dict[str, Any], sender: str, content: str) -> None:
if not self.is_user_not_player(sender, message):
self.send_reply(
message, self.already_in_game_message())
return
game_id = self.set_invite_by_user(sender, True, message)
if game_id is '':
self.send_reply(
message, 'No active invites. Type `help` for commands.')
return
if message['type'] == 'private':
self.send_reply(message, self.confirm_invitation_accepted(game_id))
self.broadcast(
game_id, '@**{}** has accepted the invitation.'.format(self.get_username_by_email(sender)))
self.start_game_if_ready(game_id)
def create_game_lobby(self, message: Dict[str, Any], users: List[str]=[]) -> None:
if self.is_game_in_subject(message['subject'], message['display_recipient']):
self.send_reply(message, 'There is already a game in this stream.')
return
if len(users) > 0:
users = self.verify_users(users, message=message)
if len(users) + 1 < self.min_players:
self.send_reply(
message, 'You must have at least {} players to play.\nGame cancelled.'.format(self.min_players))
return
if len(users) + 1 > self.max_players:
self.send_reply(
message, 'The maximum number of players for this game is {}.'.format(self.max_players))
return
game_id = self.generate_game_id()
stream_subject = '###private###'
if message['type'] == 'stream':
stream_subject = message['subject']
self.invites[game_id] = {'host': message['sender_email'].lower(
), 'subject': stream_subject, 'stream': message['display_recipient']}
if message['type'] == 'private':
self.invites[game_id]['stream'] = 'games'
for user in users:
self.send_invite(game_id, user, message)
if message['type'] == 'stream':
if len(users) > 0:
self.broadcast(game_id, 'If you were invited, and you\'re here, type "@**{}** accept" to accept the invite!'.format(
self.get_bot_username()), include_private=False)
if len(users) + 1 < self.max_players:
self.broadcast(
game_id, '**{}** wants to play **{}**. Type @**{}** join to play them!'.format(
self.get_username_by_email(message['sender_email']),
self.game_name,
self.get_bot_username())
)
if self.email in users:
self.broadcast(game_id, 'Wait... That\'s me!',
include_private=True)
if message['type'] == 'stream':
self.broadcast(
game_id, '@**{}** accept'.format(self.get_bot_username()), include_private=False)
game_id = self.set_invite_by_user(
self.email, True, {'type': 'stream'})
self.start_game_if_ready(game_id)
def command_decline(self, message: Dict[str, Any], sender: str, content: str) -> None:
if not self.is_user_not_player(sender, message):
self.send_reply(
message, self.already_in_game_message())
return
game_id = self.set_invite_by_user(sender, False, message)
if game_id is '':
self.send_reply(
message, 'No active invites. Type `help` for commands.')
return
self.send_reply(message, self.confirm_invitation_declined(game_id))
self.broadcast(
game_id, '@**{}** has declined the invitation.'.format(self.get_username_by_email(sender)))
if len(self.get_players(game_id, parameter='')) < self.min_players:
self.cancel_game(game_id)
def command_quit(self, message: Dict[str, Any], sender: str, content: str) -> None:
game_id = self.get_game_id_by_email(sender)
if game_id is '':
self.send_reply(
message, 'You are not in a game. Type `help` for all commands.')
self.cancel_game(game_id, reason='{} quit.'.format(sender))
def command_join(self, message: Dict[str, Any], sender: str, content: str) -> None:
if not self.is_user_not_player(sender, message):
self.send_reply(
message, self.already_in_game_message())
return
if message['type'] == 'private':
self.send_reply(
message, 'You cannot join games in private messages. Type `help` for all commands.')
return
game_id = self.get_invite_in_subject(
message['subject'], message['display_recipient'])
if game_id is '':
self.send_reply(
message, 'There is not a game in this subject. Type `help` for all commands.')
return
self.join_game(game_id, sender, message)
def command_leaderboard(self, message: Dict[str, Any], sender: str, content: str) -> None:
stats = self.get_sorted_player_statistics()
num = 5 if len(stats) > 5 else len(stats)
top_stats = stats[0:num]
response = '**Most wins**\n\n'
raw_headers = ['games_won', 'games_drawn', 'games_lost', 'total_games']
headers = ['Player'] + \
[key.replace('_', ' ').title() for key in raw_headers]
response += ' | '.join(headers)
response += '\n' + ' | '.join([' --- ' for header in headers])
for player, stat in top_stats:
response += '\n **{}** | '.format(
self.get_username_by_email(player))
values = [str(stat[key]) for key in raw_headers]
response += ' | '.join(values)
self.send_reply(message, response)
return
def get_sorted_player_statistics(self) -> List[Tuple[str, Dict[str, int]]]:
players = []
for user_name, u in self.user_cache.items():
if 'stats' in u.keys():
players.append((user_name, u['stats']))
return sorted(
players,
key=lambda player: (player[1]['games_won'],
player[1]['games_drawn'],
player[1]['total_games']),
reverse=True
)
def send_invite(self, game_id: str, user_email: str, message: Dict[str, Any]={}) -> None:
self.invites[game_id].update({user_email.lower(): 'p'})
self.send_message(user_email, self.alert_new_invitation(game_id), True)
if message != {}:
self.send_reply(message, self.confirm_new_invitation(user_email))
def cancel_game(self, game_id: str, reason: str='') -> None:
if game_id in self.invites.keys():
self.broadcast(game_id, 'Game cancelled.\n' + reason)
del self.invites[game_id]
return
if game_id in self.instances.keys():
self.instances[game_id].broadcast('Game ended.\n' + reason)
del self.instances[game_id]
return
def start_game_if_ready(self, game_id: str) -> None:
players = self.get_players(game_id)
if len(players) == self.max_players:
self.start_game(game_id)
def start_game(self, game_id: str) -> None:
players = self.get_players(game_id)
subject = game_id
stream = self.invites[game_id]['stream']
if self.invites[game_id]['subject'] != '###private###':
subject = self.invites[game_id]['subject']
self.instances[game_id] = GameInstance(
self, False, subject, game_id, players, stream)
self.broadcast(game_id, 'The game has started in #{} {}'.format(
stream, self.instances[game_id].subject) + '\n' + self.get_formatted_game_object(game_id))
del self.invites[game_id]
self.instances[game_id].start()
def get_formatted_game_object(self, game_id: str) -> str:
object = '''> **Game `{}`**
> {}
> {}
> {}/{} players'''.format(game_id, self.get_host(game_id), self.game_name, self.get_number_of_players(game_id), self.max_players)
if game_id in self.instances.keys():
instance = self.instances[game_id]
object += '\n> **[Join Game](/#narrow/stream/{}/topic/{})**'.format(
instance.stream, instance.subject)
return object
def join_game(self, game_id: str, user_email: str, message: Dict[str, Any]={}) -> None:
if len(self.get_players(game_id)) >= self.max_players:
if message != {}:
self.send_reply(message, 'This game is full.')
return
self.invites[game_id].update({user_email: 'a'})
self.broadcast(
game_id, '@**{}** has joined the game'.format(self.get_username_by_email(user_email)))
self.start_game_if_ready(game_id)
def get_players(self, game_id: str, parameter: str='a') -> List[str]:
if game_id in self.invites.keys():
players = [] # type: List[str]
if (self.invites[game_id]['subject'] == '###private###' and 'p' in parameter) or 'p' not in parameter:
players = [self.invites[game_id]['host']]
for player, accepted in self.invites[game_id].items():
if player == 'host' or player == 'subject' or player == 'stream':
continue
if parameter in accepted:
players.append(player)
return players
if game_id in self.instances.keys() and 'p' not in parameter:
players = self.instances[game_id].players
return players
return []
def get_game_info(self, game_id: str) -> Dict[str, Any]:
game_info = {} # type: Dict[str, Any]
if game_id in self.instances.keys():
instance = self.instances[game_id]
game_info = {
'game_id': game_id,
'type': 'instance',
'stream': instance.stream,
'subject': instance.subject,
'players': self.get_players(game_id)
}
if game_id in self.invites.keys():
invite = self.invites[game_id]
game_info = {
'game_id': game_id,
'type': 'invite',
'stream': invite['stream'],
'subject': invite['subject'],
'players': self.get_players(game_id)
}
return game_info
def get_user_by_name(self, name: str) -> Dict[str, Any]:
name = name.strip()
for user in self.user_cache.values():
if 'full_name' in user.keys():
if user['full_name'].lower() == name.lower():
return user
return {}
def get_number_of_players(self, game_id: str) -> int:
num = len(self.get_players(game_id))
return num
def get_host(self, game_id: str) -> str:
return self.get_players(game_id)[0]
def parse_message(self, message: Dict[str, Any]) -> None:
game_id = self.is_user_in_game(message['sender_email'])
game = self.get_game_info(game_id)
if message['type'] == 'private':
self.send_reply(message, 'Join your game using the link below!\n\n{}'.format(
self.get_formatted_game_object(game_id)))
return
if game['subject'] != message['subject'] or game['stream'] != message['display_recipient']:
if game_id not in self.pending_subject_changes:
self.send_reply(message, 'Your current game is not in this subject. \n\
To move subjects, send your message again, otherwise join the game using the link below.\n\n\
{}'.format(self.get_formatted_game_object(game_id)))
self.pending_subject_changes.append(game_id)
return
self.pending_subject_changes.remove(game_id)
self.change_game_subject(
game_id, message['display_recipient'], message['subject'], message)
self.instances[game_id].handle_message(
message['content'], message['sender_email'])
def change_game_subject(
self,
game_id: str,
stream_name: str,
subject_name: str,
message: Dict[str, Any]={}
) -> None:
if self.get_game_instance_by_subject(stream_name, subject_name) is not None:
if message != {}:
self.send_reply(
message, 'There is already a game in this subject.')
return
if game_id in self.instances.keys():
self.instances[game_id].change_subject(stream_name, subject_name)
if game_id in self.invites.keys():
invite = self.invites[game_id]
invite['stream'] = stream_name
invite['subject'] = stream_name
def set_invite_by_user(self, user_email: str, is_accepted: bool, message: Dict[str, Any]) -> str:
user_email = user_email.lower()
for game, users in self.invites.items():
if user_email in users.keys():
if is_accepted:
if message['type'] == 'private':
users[user_email] = 'pa'
else:
users[user_email] = 'a'
else:
users.pop(user_email)
return game
return ''
def add_user_to_cache(self, message: Dict[str, Any]) -> None:
user = {
'email': message['sender_email'].lower(),
'full_name': message['sender_full_name'],
'stats': {
'total_games': 0,
'games_won': 0,
'games_lost': 0,
'games_drawn': 0
}
}
self.user_cache.update({message['sender_email'].lower(): user})
self.put_user_cache()
def put_user_cache(self) -> Dict[str, Any]:
user_cache_str = json.dumps(self.user_cache)
self.bot_handler.storage.put('users', user_cache_str)
return self.user_cache
def get_user_cache(self) -> Dict[str, Any]:
try:
user_cache_str = self.bot_handler.storage.get('users')
except KeyError as e:
return {}
self.user_cache = json.loads(user_cache_str)
return self.user_cache
def verify_users(self, users: List[str], message: Dict[str, Any]={}) -> List[str]:
verified_users = []
failed = False
for u in users:
user = u.strip().lstrip('@**').rstrip('**')
if user == self.get_bot_username() or user == self.email:
self.send_reply(
message, 'You cannot play against the computer in this game.')
if '@' not in user:
user_obj = self.get_user_by_name(user)
if user_obj == {}:
self.send_reply(
message, 'I don\'t know {}. Tell them to say @**{}** register'.format(u, self.get_bot_username()))
failed = True
continue
user = user_obj['email']
if self.is_user_not_player(user, message):
verified_users.append(user)
else:
failed = True
if failed:
return []
else:
return verified_users
def get_game_instance_by_subject(self, subject_name: str, stream_name: str) -> Any:
for instance in self.instances.values():
if instance.subject == subject_name and instance.stream == stream_name:
return instance
return None
def get_invite_in_subject(self, subject_name: str, stream_name: str) -> str:
for key, invite in self.invites.items():
if invite['subject'] == subject_name and invite['stream'] == stream_name:
return key
return ''
def is_game_in_subject(self, subject_name: str, stream_name: str) -> bool:
return self.get_invite_in_subject(subject_name, stream_name) is not '' or \
self.get_game_instance_by_subject(
subject_name, stream_name) is not None
def is_user_not_player(self, user_email: str, message: Dict[str, Any]={}) -> bool:
user = self.get_user_by_email(user_email)
if user == {}:
if message != {}:
self.send_reply(message, 'I don\'t know {}. Tell them to use @**{}** register'.format(
user_email, self.get_bot_username()))
return False
for instance in self.instances.values():
if user_email in instance.players:
return False
for invite in self.invites.values():
for u in invite.keys():
if u == 'host':
if user_email == invite['host']:
return False
if u == user_email and 'a' in invite[u]:
return False
return True
def generate_game_id(self) -> str:
id = ''
valid_characters = 'abcdefghijklmnopqrstuvwxyz0123456789'
for i in range(6):
id += valid_characters[random.randrange(0, len(valid_characters))]
return id
def broadcast(self, game_id: str, content: str, include_private: bool=True) -> bool:
if include_private:
private_recipients = self.get_players(game_id, parameter='p')
if private_recipients is not None:
for user in private_recipients:
self.send_message(user, content, True)
if game_id in self.invites.keys():
if self.invites[game_id]['subject'] != '###private###':
self.send_message(
self.invites[game_id]['stream'], content, False, self.invites[game_id]['subject'])
return True
if game_id in self.instances.keys():
self.send_message(
self.instances[game_id].stream, content, False, self.instances[game_id].subject)
return True
return False
def get_username_by_email(self, user_email: str) -> str:
return self.get_user_by_email(user_email)['full_name']
def get_user_by_email(self, user_email: str) -> Dict[str, Any]:
if user_email in self.user_cache:
return self.user_cache[user_email]
return {}
def get_game_id_by_email(self, user_email: str) -> str:
for instance in self.instances.values():
if user_email in instance.players:
return instance.game_id
for game_id in self.invites.keys():
players = self.get_players(game_id)
if user_email in players:
return game_id
return ''
def get_bot_username(self) -> str:
return self.bot_handler.full_name
class GameInstance(object):
'''
The GameInstance class handles the game logic for a certain game,
and is associated with a certain stream.
It gets player info from GameAdapter
It only runs when the game is being played, not in the invite
or waiting states.
'''
def __init__(self, gameAdapter: GameAdapter, is_private: bool, subject: str, game_id: str, players: List[str], stream: str) -> None:
self.gameAdapter = gameAdapter
self.is_private = is_private
self.subject = subject
self.game_id = game_id
self.players = players
self.stream = stream
self.model = deepcopy(self.gameAdapter.model())
self.board = self.model.current_board
self.turn = random.randrange(0, len(players)) - 1
self.current_draw = {} # type: Dict[str, bool]
self.current_messages = [] # type: List[str]
self.is_changing_subject = False
def start(self) -> None:
self.current_messages.append(self.get_start_message())
self.current_messages.append(self.parse_current_board())
self.next_turn()
def change_subject(self, stream: str, subject: str) -> None:
self.stream = stream
self.subject = subject
self.current_messages.append(self.parse_current_board())
self.broadcast_current_message()
def get_player_text(self) -> str:
player_text = ''
for player in self.players:
player_text += ' @**{}**'.format(
self.gameAdapter.get_user_by_email(player)['full_name'])
return player_text
def get_start_message(self) -> str:
start_message = 'Game `{}` started.\n*Remember to start your message with* @**{}**'.format(
self.game_id, self.gameAdapter.get_bot_username())
if not self.is_private:
player_text = '\n**Players**'
player_text += self.get_player_text()
start_message += player_text
start_message += '\n' + self.gameAdapter.gameMessageHandler.game_start_message()
return start_message
def handle_message(self, content: str, player_email: str) -> None:
if content == 'forfeit':
player_name = self.gameAdapter.get_username_by_email(player_email)
self.broadcast('**{}** forfeited!'.format(player_name))
self.end_game('except:' + player_email)
return
if content == 'draw':
if player_email in self.current_draw.keys():
self.current_draw[player_email] = True
else:
self.current_draw = {p: False for p in self.players}
self.broadcast('**{}** has voted for a draw!\nType `draw` to accept'.format(
self.gameAdapter.get_username_by_email(player_email)))
self.current_draw[player_email] = True
if self.check_draw():
self.end_game('draw')
return
if self.is_turn_of(player_email):
self.handle_current_player_command(content)
else:
self.broadcast('It\'s @**{}**\'s ({}) turn.'.format(
self.gameAdapter.get_username_by_email(
self.players[self.turn]),
self.gameAdapter.gameMessageHandler.get_player_color(self.turn)))
def broadcast(self, content: str) -> None:
self.gameAdapter.broadcast(self.game_id, content)
def check_draw(self) -> bool:
for d in self.current_draw.values():
if not d:
return False
return len(self.current_draw.values()) > 0
def handle_current_player_command(self, content: str) -> None:
re_result = self.gameAdapter.move_regex.match(content)
if re_result is None:
self.broadcast(self.gameAdapter.move_help_message)
return
self.make_move(content, False)
def make_move(self, content: str, is_computer: bool) -> None:
try:
board = self.model.make_move(content, self.turn, is_computer)
except BadMoveException as e:
self.broadcast(e.message)
return
if not is_computer:
self.current_messages.append(self.gameAdapter.gameMessageHandler.alert_move_message(
'**{}**'.format(self.gameAdapter.get_username_by_email(self.players[self.turn])), content))
self.current_messages.append(self.parse_current_board())
game_over = self.model.determine_game_over(self.players)
if game_over:
self.broadcast_current_message()
if game_over == 'current turn':
game_over = self.players[self.turn]
self.end_game(game_over)
return
self.next_turn()
def is_turn_of(self, player_email: str) -> bool:
return self.players[self.turn].lower() == player_email.lower()
def next_turn(self) -> None:
self.turn += 1
if self.turn >= len(self.players):
self.turn = 0
self.current_messages.append('It\'s @**{}**\'s ({}) turn.'.format(
self.gameAdapter.get_username_by_email(self.players[self.turn]),
self.gameAdapter.gameMessageHandler.get_player_color(self.turn)
))
self.broadcast_current_message()
if self.players[self.turn] == self.gameAdapter.email:
self.make_move('', True)
def broadcast_current_message(self) -> None:
content = '\n\n'.join(self.current_messages)
self.broadcast(content)
self.current_messages = []
def parse_current_board(self) -> Any:
return self.gameAdapter.gameMessageHandler.parse_board(self.model.current_board)
def end_game(self, winner: str) -> None:
loser = ''
if winner == 'draw':
self.broadcast('It was a draw!')
elif winner.startswith('except:'):
loser = winner.lstrip('except:')
else:
self.broadcast('{} won! :tada:'.format(winner))
for u in self.players:
values = {'total_games': 1, 'games_won': 0,
'games_lost': 0, 'games_drawn': 0}
if loser == '':
if u == winner:
values.update({'games_won': 1})
elif winner == 'draw':
values.update({'games_drawn': 1})
else:
values.update({'games_lost': 1})
else:
if u == loser:
values.update({'games_lost': 1})
else:
values.update({'games_won': 1})
self.gameAdapter.add_user_statistics(u, values)
if self.gameAdapter.email in self.players:
self.send_win_responses(winner)
self.gameAdapter.cancel_game(self.game_id)
def send_win_responses(self, winner: str) -> None:
if winner == self.gameAdapter.email:
self.broadcast('I won! Well Played!')
elif winner == 'draw':
self.broadcast('It was a draw! Well Played!')
else:
self.broadcast('You won! Nice!')

View file

@ -21,6 +21,8 @@ class StubBotHandler:
def __init__(self):
# type: () -> None
self.storage = SimpleStorage()
self.full_name = 'test-bot'
self.email = 'test-bot@example.com'
self.message_server = SimpleMessageServer()
self.reset_transcript()