python-zulip-api/integrations/slack/zulip_slack.py
2017-05-09 10:48:08 -07:00

128 lines
4.3 KiB
Python
Executable file

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#
# slacker is a dependency for this script.
#
from __future__ import absolute_import
from __future__ import print_function
import sys
import string
import random
from six.moves import range
from typing import List, Dict
import zulip
from slacker import Slacker, Response, Error as SlackError
import zulip_slack_config as config
client = zulip.Client(email=config.ZULIP_USER, api_key=config.ZULIP_API_KEY, site=config.ZULIP_SITE)
class FromSlackImporter(object):
def __init__(self, slack_token, get_archived_channels=True):
# type: (str, bool) -> None
self.slack = Slacker(slack_token)
self.get_archived_channels = get_archived_channels
self._check_slack_token()
def get_slack_users_email(self):
# type: () -> Dict[str, Dict[str, str]]
r = self.slack.users.list()
self._check_if_response_is_successful(r)
results_dict = {}
for user in r.body['members']:
if user['profile'].get('email') and user.get('deleted') is False:
results_dict[user['id']] = {'email': user['profile']['email'], 'name': user['profile']['real_name']}
return results_dict
def get_slack_public_channels_names(self):
# type: () -> List[Dict[str, str]]
r = self.slack.channels.list()
self._check_if_response_is_successful(r)
return [{'name': channel['name'], 'members': channel['members']} for channel in r.body['channels']]
def get_slack_private_channels_names(self):
# type: () -> List[str]
r = self.slack.groups.list()
self._check_if_response_is_successful(r)
return [
channel['name'] for channel in r.body['groups']
if not channel['is_archived'] or self.get_archived_channels
]
def _check_slack_token(self):
# type: () -> None
try:
r = self.slack.api.test()
self._check_if_response_is_successful(r)
except SlackError as e:
print(e)
sys.exit(1)
except Exception as e:
print(e)
sys.exit(1)
def _check_if_response_is_successful(self, response):
# type: (Response) -> None
print(response)
if not response.successful:
print(response.error)
sys.exit(1)
def _generate_random_password(size=10):
# type: (int) -> str
return ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(size))
def get_and_add_users(slack_importer):
# type: (Slacker) -> Dict[str, Dict[str, str]]
users = slack_importer.get_slack_users_email()
added_users = {}
print('######### IMPORTING USERS STARTED #########\n')
for user_id, user in users.items():
r = client.create_user({
'email': user['email'],
'full_name': user['name'],
'short_name': user['name']
})
if not r.get('msg'):
added_users[user_id] = user
print(u"{} -> {}\nCreated\n".format(user['name'], user['email']))
else:
print(u"{} -> {}\n{}\n".format(user['name'], user['email'], r.get('msg')))
print('######### IMPORTING USERS FINISHED #########\n')
return added_users
def create_streams_and_add_subscribers(slack_importer, added_users):
# type: (Slacker, Dict[str, Dict[str, str]]) -> None
channels_list = slack_importer.get_slack_public_channels_names()
print('######### IMPORTING STREAMS STARTED #########\n')
for stream in channels_list:
subscribed_users = [added_users[member]['email'] for member in stream['members'] if member in added_users.keys()]
if subscribed_users:
r = client.add_subscriptions([{"name": stream['name']}], principals=subscribed_users)
if not r.get('msg'):
print(u"{} -> created\n".format(stream['name']))
else:
print(u"{} -> {}\n".format(stream['name'], r.get('msg')))
else:
print(u"{} -> wasn't created\nNo subscribers\n".format(stream['name']))
print('######### IMPORTING STREAMS FINISHED #########\n')
def main():
# type: () -> None
importer = FromSlackImporter(config.SLACK_TOKEN)
added_users = get_and_add_users(importer)
create_streams_and_add_subscribers(importer, added_users)
if __name__ == '__main__':
main()