#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Asana integration for Zulip
#
# Copyright © 2014 Zulip, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# The "zulip_asana_mirror" script is run continuously, possibly on a work computer
# or preferably on a server.
#
# When restarted, it will attempt to pick up where it left off.
#
# python-dateutil is a dependency for this script.

import base64
from datetime import datetime, timedelta
import json
import logging
import os
import time
import urllib2
import sys

try:
    import dateutil.parser
    import dateutil.tz
except ImportError, e:
    print >>sys.stderr, e
    print >>sys.stderr, "Please install the python-dateutil package."
    exit(1)

sys.path.insert(0, os.path.dirname(__file__))
import zulip_asana_config as config
VERSION = "0.9"

if config.ZULIP_API_PATH is not None:
    sys.path.append(config.ZULIP_API_PATH)
import zulip

if config.LOG_FILE:
    logging.basicConfig(filename=config.LOG_FILE, level=logging.WARNING)
else:
    logging.basicConfig(level=logging.INFO)

client = zulip.Client(email=config.ZULIP_USER, api_key=config.ZULIP_API_KEY,
                      site=config.ZULIP_SITE, client="ZulipAsana/" + VERSION)

def fetch_from_asana(path):
    """
    Request a resource through the Asana API, authenticating using
    HTTP basic auth.
    """
    auth = base64.encodestring('%s:' % (config.ASANA_API_KEY,))
    headers = {"Authorization": "Basic %s" % auth}

    url = "https://app.asana.com/api/1.0" + path
    request = urllib2.Request(url, None, headers)
    result = urllib2.urlopen(request)

    return json.load(result)

def send_zulip(topic, content):
    """
    Send a message to Zulip using the configured stream and bot credentials.
    """
    message = {"type": "stream",
               "sender": config.ZULIP_USER,
               "to": config.ZULIP_STREAM_NAME,
               "subject": topic,
               "content": content,
               }
    return client.send_message(message)

def datestring_to_datetime(datestring):
    """
    Given an ISO 8601 datestring, return the corresponding datetime object.
    """
    return dateutil.parser.parse(datestring).replace(
        tzinfo=dateutil.tz.gettz('Z'))

class TaskDict(dict):
    """
    A helper class to turn a dictionary with task information into an object
    where each of the keys is an attribute for easy access.
    """
    def __getattr__(self, field):
        return self.get(field)

def format_topic(task, projects):
    """
    Return a string that will be the Zulip message topic for this task.
    """
    # Tasks can be associated with multiple projects, but in practice they seem
    # to mostly be associated with one.
    project_name = projects[task.projects[0]["id"]]
    return "%s: %s" % (project_name, task.name)

def format_assignee(task, users):
    """
    Return a string describing the task's assignee.
    """
    if task.assignee:
        assignee_name = users[task.assignee["id"]]
        assignee_info = "**Assigned to**: %s (%s)" % (
            assignee_name, task.assignee_status)
    else:
        assignee_info = "**Status**: Unassigned"

    return assignee_info

def format_due_date(task):
    """
    Return a string describing the task's due date.
    """
    if task.due_on:
        due_date_info = "**Due on**: %s" % (task.due_on,)
    else:
        due_date_info = "**Due date**: None"

    return due_date_info

def format_task_creation_event(task, projects, users):
    """
    Format the topic and content for a newly-created task.
    """
    topic = format_topic(task, projects)
    assignee_info = format_assignee(task, users)
    due_date_info = format_due_date(task)

    content = """Task **%s** created:

~~~ quote
%s
~~~

%s
%s
""" % (task.name, task.notes, assignee_info, due_date_info)

    return topic, content

def format_task_completion_event(task, projects, users):
    """
    Format the topic and content for a completed task.
    """
    topic = format_topic(task, projects)
    assignee_info = format_assignee(task, users)
    due_date_info = format_due_date(task)

    content = """Task **%s** completed. :white_check_mark:

%s
%s
""" % (task.name, assignee_info, due_date_info)

    return topic, content

def since():
    """
    Return a newness threshold for task events to be processed.
    """
    # If we have a record of the last event processed and it is recent, use it,
    # else process everything from ASANA_INITIAL_HISTORY_HOURS ago.
    def default_since():
        return datetime.utcnow() - timedelta(
            hours=config.ASANA_INITIAL_HISTORY_HOURS)

    if os.path.exists(config.RESUME_FILE):
        try:
            with open(config.RESUME_FILE, "r") as f:
                datestring = f.readline().strip()
                timestamp = float(datestring)
                max_timestamp_processed = datetime.fromtimestamp(timestamp)
                logging.info("Reading from resume file: " + datestring)
        except (ValueError, IOError) as e:
            logging.warn("Could not open resume file: %s" % (
                e.message or e.strerror,))
            max_timestamp_processed = default_since()
    else:
        logging.info("No resume file, processing an initial history.")
        max_timestamp_processed = default_since()

    # Even if we can read a timestamp from RESUME_FILE, if it is old don't use
    # it.
    return max(max_timestamp_processed, default_since())

def process_new_events():
    """
    Forward new Asana task events to Zulip.
    """
    # In task queries, Asana only exposes IDs for projects and users, so we need
    # to look up the mappings.
    projects = dict((elt["id"], elt["name"]) for elt in \
                        fetch_from_asana("/projects")["data"])
    users = dict((elt["id"], elt["name"]) for elt in \
                     fetch_from_asana("/users")["data"])

    cutoff = since()
    max_timestamp_processed = cutoff
    time_operations = (("created_at", format_task_creation_event),
                       ("completed_at", format_task_completion_event))
    task_fields = ["assignee", "assignee_status", "created_at", "completed_at",
                   "modified_at", "due_on", "name", "notes", "projects"]

    # First, gather all of the tasks that need processing. We'll
    # process them in order.
    new_events = []
    for project_id in projects:
        project_url = "/projects/%d/tasks?opt_fields=%s" % (
            project_id, ",".join(task_fields))
        tasks = fetch_from_asana(project_url)["data"]

        for task in tasks:
            task = TaskDict(task)

            for time_field, operation in time_operations:
                if task[time_field]:
                    operation_time = datestring_to_datetime(task[time_field])
                    if operation_time > cutoff:
                        new_events.append(
                            (operation_time, time_field, operation, task))

    new_events.sort()

    now = datetime.utcnow()
    for operation_time, time_field, operation, task in new_events:
        # Unfortunately, creating an Asana task is not an atomic operation. If
        # the task was just created, or is missing basic information, it is
        # probably because the task is still being filled out -- wait until the
        # next round to process it.
        if (time_field == "created_at") and \
                (now - operation_time < timedelta(seconds=30)):
            # The task was just created, give the user some time to fill out
            # more information.
            return

        if (time_field == "created_at") and (not task.name) and \
                (now - operation_time < timedelta(seconds=60)):
            # The task is still unnamed; give the user up to a minute to name
            # it before assuming it will stay unnamed.
            return

        topic, content = operation(task, projects, users)
        logging.info("Sending Zulip for " + topic)
        result = send_zulip(topic, content)

        # If the Zulip message wasn't sent successfully, don't update the
        # max timestamp processed so the task has another chance to be
        # forwarded. Exit, giving temporary issues time to resolve.
        if not result.get("result"):
            logging.warn("Malformed result, exiting:")
            logging.warn(result)
            return

        if result["result"] != "success":
            logging.warn(result["msg"])
            return

        if operation_time > max_timestamp_processed:
            max_timestamp_processed = operation_time

    if max_timestamp_processed > cutoff:
        max_datestring = max_timestamp_processed.strftime("%s.%f")
        logging.info("Updating resume file: " + max_datestring)
        open(config.RESUME_FILE, 'w').write(max_datestring)

while True:
    try:
        process_new_events()
        time.sleep(5)
    except KeyboardInterrupt:
        logging.info("Shutting down...")
        logging.info("Set LOG_FILE to log to a file instead of stdout.")
        break
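
# An illustrative zulip_asana_config.py might look like the sketch below. The
# attribute names are exactly the ones this script reads via `config.*`; the
# values are placeholders, not defaults shipped with the integration.
#
#   ZULIP_USER = "asana-bot@example.com"          # bot account used as the message sender
#   ZULIP_API_KEY = "0123456789abcdef0123456789abcdef"
#   ZULIP_SITE = "https://zulip.example.com"      # your Zulip server
#   ZULIP_STREAM_NAME = "asana"                   # stream that receives task events
#   ZULIP_API_PATH = None                         # path to the zulip module, if not installed
#   ASANA_API_KEY = "your-asana-api-key"          # used for HTTP basic auth against Asana
#   ASANA_INITIAL_HISTORY_HOURS = 24              # how far back to look when there is no resume file
#   RESUME_FILE = "/var/tmp/zulip_asana_mirror.state"  # last-processed timestamp is stored here
#   LOG_FILE = None                               # set to a path to log to a file instead of stdout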