Milimetric has submitted this change and it was merged.

Change subject: Add reportupdater: a more reliable scheduler
......................................................................


Add reportupdater: a more reliable scheduler

Bug: T89251
Change-Id: I5885cd85499501741b78fbbc95225939dc46b329
---
M generate.py
A reportupdater/README.md
A reportupdater/__init__.py
A reportupdater/executor.py
A reportupdater/reader.py
A reportupdater/report.py
A reportupdater/reportupdater.py
A reportupdater/selector.py
A reportupdater/update_reports.py
A reportupdater/utils.py
A reportupdater/writer.py
A test/executor_test.py
A test/fixtures/config/reportupdater_test1.yaml
A test/fixtures/config/reportupdater_test2.yaml
A test/fixtures/config/reportupdater_test3.yaml
A test/fixtures/output/reader_test.csv
A test/fixtures/output/reader_test_error.csv
A test/fixtures/output/selector_test1.csv
A test/fixtures/output/selector_test2.csv
A test/fixtures/output/writer_test1.csv
A test/fixtures/sql/reader_test.sql
A test/fixtures/sql/reportupdater_test1.sql
A test/fixtures/sql/reportupdater_test2.sql
A test/fixtures/sql/reportupdater_test3.sql
A test/reader_test.py
A test/report_test.py
A test/reportupdater_test.py
A test/selector_test.py
A test/test_utils.py
A test/writer_test.py
M tox.ini
31 files changed, 2,179 insertions(+), 3 deletions(-)

Approvals:
  Milimetric: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/generate.py b/generate.py
index d2f13cd..0e853e6 100644
--- a/generate.py
+++ b/generate.py
@@ -17,6 +17,7 @@
 import json
 
 from traceback import format_exc
+from reportupdater import reportupdater
 
 
 class DataGenerator(object):
@@ -130,15 +131,32 @@
             return {}
 
     def execute(self):
+        # Call reportupdater.
+        # It will only be called if the 'reportupdater-reports' section
+        # is found at the root level of the config file. Reportupdater
+        # will not interfere with generate.py execution, and vice versa.
+        if 'reportupdater-reports' in self.config:
+            reportupdater.run(
+                config=self.config,
+                sql_folder=os.path.abspath(self.folder),
+                output_folder=os.path.abspath(self.config['output']['path'])
+            )
+        # End of reportupdater call.
         history = self.get_history()
         """Generates a CSV report by executing Python code and SQL queries."""
         if self.graph:
             name = self.graph
             graphs = {name: self.config['graphs'][name]}
         else:
-            graphs = self.config['graphs']
+            graphs = self.config['graphs'] or {}
 
+        reportupdater_reports = self.config.get('reportupdater-reports', {})
         for key, value in graphs.iteritems():
+            # Ensure that reports specified to be executed
+            # by reportupdater are not also run by generate.py.
+            if key in reportupdater_reports:
+                print '%s should have been executed by reportupdater, 
skipping.' % key
+                continue
             # title = value['title']
             freq = value['frequency']
             try:
diff --git a/reportupdater/README.md b/reportupdater/README.md
new file mode 100644
index 0000000..bd0f39c
--- /dev/null
+++ b/reportupdater/README.md
@@ -0,0 +1,3 @@
+reportupdater
+=============
+For documentation see: https://wikitech.wikimedia.org/wiki/Analytics/Reportupdater
diff --git a/reportupdater/__init__.py b/reportupdater/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/reportupdater/__init__.py
diff --git a/reportupdater/executor.py b/reportupdater/executor.py
new file mode 100644
index 0000000..06e7a4d
--- /dev/null
+++ b/reportupdater/executor.py
@@ -0,0 +1,129 @@
+
+# This module executes the report sql template
+# instantiated with the time values passed by the selector.
+# It handles the connection with the database,
+# formats the results data and stores it inside the report object.
+
+
+import MySQLdb
+import logging
+from datetime import datetime, date
+from selector import Selector
+from collections import defaultdict
+from utils import TIMESTAMP_FORMAT, raise_critical
+
+
+class Executor(object):
+
+
+    def __init__(self, selector, config):
+        if not isinstance(selector, Selector):
+            raise_critical(ValueError, 'Selector is not valid.')
+        if not isinstance(config, dict):
+            raise_critical(ValueError, 'Config is not a dict.')
+        self.selector = selector
+        self.config = config
+
+
+    def run(self):
+        if 'databases' not in self.config:
+            raise_critical(KeyError, 'Databases is not in config.')
+        if not isinstance(self.config['databases'], dict):
+            raise_critical(ValueError, 'Databases is not a dict.')
+        connections = {}
+        for report in self.selector.run():
+            logging.debug('Executing "{report}"...'.format(report=str(report)))
+            try:
+                sql_query = self.instantiate_sql(report)
+                if report.db_key not in connections:
+                    connections[report.db_key] = 
self.create_connection(report.db_key)
+                connection = connections[report.db_key]
+                report.results = self.execute_sql(sql_query, connection, 
report.is_funnel)
+                yield report
+            except Exception, e:
+                message = ('Report "{report_key}" could not be executed '
+                           'because of error: {error}')
+                logging.error(message.format(report_key=report.key, 
error=str(e)))
+
+
+    def instantiate_sql(self, report):
+        if report.is_timeboxed:
+            try:
+                return report.sql_template.format(
+                    from_timestamp=report.start.strftime(TIMESTAMP_FORMAT),
+                    to_timestamp=report.end.strftime(TIMESTAMP_FORMAT),
+                )
+            except KeyError:
+                raise ValueError('SQL template contains unknown placeholders.')
+        else:
+            return report.sql_template
+
+
+    def create_connection(self, db_key):
+        databases = self.config['databases']
+        if db_key not in databases:
+            raise KeyError('DB key is not in config databases.')
+        db_config = databases[db_key]
+        if not isinstance(db_config, dict):
+            raise ValueError('DB config is not a dict.')
+
+        if 'host' not in db_config:
+            raise KeyError('Host is not in DB config.')
+        if 'port' not in db_config:
+            raise KeyError('Port is not in DB config.')
+        if 'creds_file' not in db_config:
+            raise KeyError('Creds file is not in DB config.')
+        if 'db' not in db_config:
+            raise KeyError('DB name is not in DB config.')
+
+        db_host = db_config['host']
+        db_port = db_config['port']
+        db_creds_file = db_config['creds_file']
+        db_name = db_config['db']
+
+        if not isinstance(db_host, str):
+            raise ValueError('Host is not a string.')
+        if not isinstance(db_port, int):
+            raise ValueError('Port is not an integer.')
+        if not isinstance(db_creds_file, str):
+            raise ValueError('Creds file is not a string.')
+        if not isinstance(db_name, str):
+            raise ValueError('DB name is not a string.')
+        try:
+            return MySQLdb.connect(
+                host=db_host,
+                port=db_port,
+                read_default_file=db_creds_file,
+                db=db_name,
+                charset='utf8',
+                use_unicode=True
+            )
+        except Exception, e:
+            raise RuntimeError('MySQLdb can not connect to database (' + 
str(e) + ').')
+
+
+    def execute_sql(self, sql_query, connection, is_funnel=False):
+        cursor = connection.cursor()
+        try:
+            cursor.execute(sql_query)
+            rows = cursor.fetchall()
+            header = [field[0] for field in cursor.description]
+        except Exception, e:
+            raise RuntimeError('MySQLdb can not execute query (' + str(e) + 
').')
+        finally:
+            cursor.close()
+        if is_funnel:
+            data = defaultdict(list)
+        else:
+            data = {}
+        for row in rows:
+            sql_date = row[0]
+            if not isinstance(sql_date, date):
+                raise ValueError('Query results do not have date values in 
first column.')
+            # normalize to datetime
+            row[0] = datetime(sql_date.year, sql_date.month, sql_date.day, 0, 
0, 0, 0)
+            if is_funnel:
+                data[row[0]].append(row)
+            else:
+                data[row[0]] = row
+        return {'header': header, 'data': data}
diff --git a/reportupdater/reader.py b/reportupdater/reader.py
new file mode 100644
index 0000000..5c494ba
--- /dev/null
+++ b/reportupdater/reader.py
@@ -0,0 +1,122 @@
+
+# This module implements the first step in the pipeline.
+# Reads the report information from the config file.
+# For each report section contained in the config,
+# creates a Report object, that will be passed
+# to the rest of the pipeline.
+#
+# This step tries to check all possible input type and format
+# issues to minimize the impact of a possible config error.
+
+
+import os
+import io
+import logging
+from datetime import datetime, date
+from report import Report
+from utils import DATE_FORMAT, raise_critical
+
+
+class Reader(object):
+
+
+    def __init__(self, config):
+        if not isinstance(config, dict):
+            raise_critical(ValueError, 'Config is not a dict.')
+        self.config = config
+
+
+    def run(self):
+        if 'reportupdater-reports' not in self.config:
+            raise_critical(KeyError, 'Reportupdater-reports is not in config.')
+        reports = self.config['reportupdater-reports']
+        if not isinstance(reports, dict):
+            raise_critical(ValueError, 'Reportupdater-reports is not a dict.')
+        for report_key, report_config in reports.iteritems():
+            logging.debug('Reading 
"{report_key}"...'.format(report_key=report_key))
+            try:
+                report = self.create_report(report_key, report_config)
+                yield report
+            except Exception, e:
+                message = ('Report "{report_key}" could not be read from 
config '
+                           'because of error: {error}')
+                logging.error(message.format(report_key=report_key, 
error=str(e)))
+
+
+    def create_report(self, report_key, report_config):
+        if not isinstance(report_key, str):
+            raise TypeError('Report key is not a string.')
+        if not isinstance(report_config, dict):
+            raise TypeError('Report config is not a dict.')
+        report = Report()
+        report.key = report_key
+        report.frequency, report.granularity = 
self.get_frequency_and_granularity(report_config)
+        report.is_timeboxed = self.get_is_timeboxed(report_config)
+        report.is_funnel = self.get_is_funnel(report_config)
+        report.first_date = self.get_first_date(report_config, 
report.is_timeboxed)
+        report.db_key = self.get_db_key()
+        report.sql_template = self.get_sql_template(report_key)
+        return report
+
+
+    def get_frequency_and_granularity(self, report_config):
+        if 'frequency' not in report_config:
+            raise KeyError('Report frequency is not specified.')
+        if report_config['frequency'] == 'hourly':
+            return 'hours', 'days'
+        elif report_config['frequency'] == 'daily':
+            return 'days', 'months'
+        else:
+            raise ValueError('Report frequency is not valid.')
+
+
+    def get_is_timeboxed(self, report_config):
+        return 'timeboxed' in report_config and report_config['timeboxed'] is 
True
+
+
+    def get_is_funnel(self, report_config):
+        return 'funnel' in report_config and report_config['funnel'] is True
+
+
+    def get_first_date(self, report_config, is_timeboxed):
+        if 'starts' in report_config:
+            first_date = report_config['starts']
+            if isinstance(first_date, date):
+                first_date = datetime(first_date.year, first_date.month, 
first_date.day)
+            else:
+                try:
+                    first_date = datetime.strptime(first_date, DATE_FORMAT)
+                except TypeError:
+                    raise TypeError('Report starts is not a string.')
+                except ValueError:
+                    raise ValueError('Report starts does not match date 
format')
+            return first_date
+        elif is_timeboxed:
+            raise ValueError('Timeboxed report does not specify starts.')
+        else:
+            return None
+
+
+    def get_db_key(self):
+        if 'defaults' not in self.config:
+            raise KeyError('Defaults is not in config.')
+        if 'db' not in self.config['defaults']:
+            raise KeyError('DB default is not in defaults config.')
+        db_key = self.config['defaults']['db']
+        if not isinstance(db_key, str):
+            raise ValueError('DB default is not a string.')
+        return self.config['defaults']['db']
+
+
+    def get_sql_template(self, report_key):
+        if 'sql_folder' not in self.config:
+            raise KeyError('SQL folder is not in config.')
+        sql_folder = self.config['sql_folder']
+        if not isinstance(sql_folder, str):
+            raise ValueError('SQL folder is not a string.')
+        sql_template_path = os.path.join(sql_folder, report_key + '.sql')
+        try:
+            with io.open(sql_template_path, encoding='utf-8') as 
sql_template_file:
+                return sql_template_file.read()
+        except IOError, e:
+            raise IOError('Could not read the SQL template (' + str(e) + ').')
diff --git a/reportupdater/report.py b/reportupdater/report.py
new file mode 100644
index 0000000..42b91b3
--- /dev/null
+++ b/reportupdater/report.py
@@ -0,0 +1,77 @@
+
+# This module implements the report object that serves as
+# communication unit between the several pipeline layers.
+# It holds all the information related to a report,
+# such as granularity, start and end dates and results.
+# It is not intended to hold any logic.
+
+
+import re
+from datetime import datetime
+from utils import DATE_FORMAT
+
+
+class Report(object):
+
+
+    def __init__(self):
+        self.key = None
+        self.frequency = None
+        self.granularity = None
+        self.is_timeboxed = False
+        self.is_funnel = False
+        self.first_date = None
+        self.start = None
+        self.end = None
+        self.db_key = None
+        self.sql_template = None
+        self.results = {'header': [], 'data': {}}
+
+
+    def __str__(self):
+        return (
+            '<Report' +
+            ' key=' + str(self.key) +
+            ' frequency=' + str(self.frequency) +
+            ' granularity=' + str(self.granularity) +
+            ' is_timeboxed=' + str(self.is_timeboxed) +
+            ' is_funnel=' + str(self.is_funnel) +
+            ' first_date=' + self.format_date(self.first_date) +
+            ' start=' + self.format_date(self.start) +
+            ' end=' + self.format_date(self.end) +
+            ' db_key=' + str(self.db_key) +
+            ' sql_template=' + self.format_sql(self.sql_template) +
+            ' results=' + self.format_results(self.results) +
+            '>'
+        )
+
+
+    def format_date(self, to_format):
+        if to_format:
+            if isinstance(to_format, datetime):
+                return to_format.strftime(DATE_FORMAT)
+            else:
+                return 'invalid date'
+        else:
+            return str(None)
+
+
+    def format_results(self, to_format):
+        if not isinstance(to_format, dict):
+            return 'invalid results'
+        header = str(to_format.get('header', 'invalid header'))
+        data = to_format.get('data', None)
+        if isinstance(data, dict):
+            data_lines = str(len(data)) + ' rows'
+        else:
+            data_lines = 'invalid data'
+        return str({'header': header, 'data': data_lines})
+
+
+    def format_sql(self, to_format):
+        if to_format is None:
+            return str(None)
+        sql = re.sub(r'\s+', ' ', to_format).strip()
+        if len(sql) > 100:
+            sql = sql[0:100] + '...'
+        return sql
diff --git a/reportupdater/reportupdater.py b/reportupdater/reportupdater.py
new file mode 100644
index 0000000..3884ede
--- /dev/null
+++ b/reportupdater/reportupdater.py
@@ -0,0 +1,169 @@
+
+# This is the main module of the project.
+#
+# Its 'run' method will execute the whole pipeline:
+#   1. Read the report information from config file
+#   2. Select or triage the reports that have to be executed
+#   3. Execute those reports against the database
+#   4. Write / update the files with the results
+#
+# In addition to that, this module uses a pid file
+# to avoid concurrent execution, preventing a new instance
+# from running while another instance is already running.
+#
+# Also, it stores and controls the last execution time,
+# used for report scheduling in the select step.
+
+
+import os
+import io
+import errno
+import yaml
+import logging
+from datetime import datetime
+from reader import Reader
+from selector import Selector
+from executor import Executor
+from writer import Writer
+from utils import DATE_AND_TIME_FORMAT
+
+
+PROJECT_ROOT = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..')
+PID_FILE_PATH = os.path.join(PROJECT_ROOT, '.reportupdater.pid')
+
+
+def run(**kwargs):
+    params = get_params(kwargs)
+    configure_logging(params)
+
+    if only_instance_running():
+        logging.info('Starting execution.')
+        write_pid_file()  # create lock to avoid concurrent executions
+
+        current_exec_time = utcnow()
+        last_exec_time = replace_exec_time(current_exec_time, 
params['history_path'])
+
+        if 'config' in params:
+            config = params['config']
+        else:
+            config = load_config(params['config_path'])
+        config['current_exec_time'] = current_exec_time
+        config['last_exec_time'] = last_exec_time
+        config['sql_folder'] = params['sql_folder']
+        config['output_folder'] = params['output_folder']
+
+        reader = Reader(config)
+        selector = Selector(reader, config)
+        executor = Executor(selector, config)
+        writer = Writer(executor, config)
+        writer.run()
+
+        delete_pid_file()  # free lock for other instances to execute
+        logging.info('Execution complete.')
+    else:
+        logging.warning('Another instance is already running. Exiting.')
+
+
+def get_params(passed_params):
+    default_params = {
+        'history_path': os.path.join(PROJECT_ROOT, '.reportupdater.history'),
+        'config_path': os.path.join(PROJECT_ROOT, 'config.yaml'),
+        'sql_folder': os.path.join(PROJECT_ROOT, 'sql'),
+        'output_folder': os.path.join(PROJECT_ROOT, 'output'),
+        'log_level': logging.WARNING
+    }
+    default_params.update(passed_params)
+    return default_params
+
+
+def configure_logging(params):
+    logger = logging.getLogger()
+    if 'log_file' in params:
+        handler = logging.FileHandler(params['log_file'])
+    else:
+        handler = logging.StreamHandler()
+    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    logger.setLevel(params['log_level'])
+
+
+def only_instance_running():
+    if os.path.isfile(PID_FILE_PATH):
+        try:
+            with io.open(PID_FILE_PATH, 'r') as pid_file:
+                pid = int(pid_file.read().strip())
+        except IOError:
+            # Permission error.
+            # Another instance run by another user is still executing.
+            logging.warning('An instance run by another user was found.')
+            return False
+        if pid_exists(pid):
+            # Another instance is still executing.
+            return False
+        else:
+            # Another instance terminated unexpectedly,
+            # leaving the stale pid file there.
+            return True
+    else:
+        return True
+
+
+def pid_exists(pid):
+    try:
+        # Sending signal 0 to a pid will raise an OSError exception
+        # if the pid is not running, and do nothing otherwise.
+        os.kill(pid, 0)
+    except OSError as err:
+        if err.errno == errno.ESRCH:
+            # No such process.
+            return False
+        elif err.errno == errno.EPERM:
+            # The process exists, but we lack permission to signal it.
+            return True
+        else:
+            raise
+    else:
+        return True
+
+
+def write_pid_file():
+    logging.info('Writing the pid file.')
+    pid = os.getpid()
+    with io.open(PID_FILE_PATH, 'w') as pid_file:
+        pid_file.write(unicode(pid))
+
+
+def delete_pid_file():
+    logging.info('Deleting the pid file.')
+    try:
+        os.remove(PID_FILE_PATH)
+    except OSError, e:
+        logging.error('Unable to delete the pid file (' + str(e) + ').')
+
+
+def replace_exec_time(current_time, history_path):
+    # Writes the current execution time to the history file.
+    # If the file contains the last execution time, it is returned.
+    if os.path.exists(history_path):
+        with io.open(history_path) as history_file:
+            last_time_str = history_file.read().strip()
+            last_time = datetime.strptime(last_time_str, DATE_AND_TIME_FORMAT)
+    else:
+        last_time = None
+    with io.open(history_path, 'w') as history_file:
+        current_time_str = current_time.strftime(DATE_AND_TIME_FORMAT)
+        history_file.write(unicode(current_time_str))
+    return last_time
+
+
+def load_config(config_path):
+    try:
+        with io.open(config_path, encoding='utf-8') as config_file:
+            return yaml.load(config_file)
+    except IOError, e:
+        raise IOError('Can not read the config file because of: (' + str(e) + 
').')
+
+
+def utcnow():
+    return datetime.utcnow()
diff --git a/reportupdater/selector.py b/reportupdater/selector.py
new file mode 100644
index 0000000..e8eee6a
--- /dev/null
+++ b/reportupdater/selector.py
@@ -0,0 +1,117 @@
+
+# This module is in charge of triaging which reports
+# must be executed depending on:
+#   1. The time that has passed since the last execution
+#   2. If the report data is up to date or not
+#
+# It also divides timeboxed reports into intervals of one time unit.
+# For example, if the report in question has a monthly granularity,
+# it divides a 3-month report into three 1-month reports.
+
+
+import logging
+from copy import deepcopy
+from datetime import datetime
+from dateutil.relativedelta import relativedelta
+from reader import Reader
+from utils import raise_critical, get_previous_results
+
+
+class Selector(object):
+
+
+    def __init__(self, reader, config):
+        if not isinstance(reader, Reader):
+            raise_critical(ValueError, 'Reader is not valid.')
+        if not isinstance(config, dict):
+            raise_critical(ValueError, 'Config is not a dict.')
+        self.reader = reader
+        self.config = config
+
+
+    def run(self):
+        if 'current_exec_time' not in self.config:
+            raise_critical(KeyError, 'Current exec time is not in config.')
+        if 'last_exec_time' not in self.config:
+            raise_critical(KeyError, 'Last exec time is not in config.')
+        now = self.config['current_exec_time']
+        last_exec_time = self.config['last_exec_time']
+        if not isinstance(now, datetime):
+            raise_critical(ValueError, 'Current exec time is not a date.')
+        if last_exec_time and last_exec_time > now:
+            raise_critical(ValueError, 'Last exec time is greater than current 
exec time.')
+
+        for report in self.reader.run():
+            logging.debug('Triaging "{report}"...'.format(report=str(report)))
+            try:
+                if self.is_time_to_execute(last_exec_time, now, 
report.frequency):
+                    if report.is_timeboxed:
+                        for report in self.get_interval_reports(report, now):
+                            yield report
+                    else:
+                        yield report
+            except Exception, e:
+                message = ('Report "{report_key}" could not be triaged for 
execution '
+                           'because of error: {error}')
+                logging.error(message.format(report_key=report.key, 
error=str(e)))
+
+
+    def is_time_to_execute(self, last_exec_time, now, frequency):
+        if last_exec_time:
+            t1 = self.truncate_date(last_exec_time, frequency)
+        else:
+            t1 = None
+        t2 = self.truncate_date(now, frequency)
+        return t1 != t2
+
+
+    def get_interval_reports(self, report, now):
+        if 'output_folder' not in self.config:
+            raise KeyError('Output folder is not in config.')
+        output_folder = self.config['output_folder']
+        if not isinstance(output_folder, str):
+            raise ValueError('Output folder is not a string.')
+
+        first_date = self.truncate_date(report.first_date, report.granularity)
+        current_date = self.truncate_date(now, report.granularity)
+        increment = self.get_increment(report.granularity)
+        previous_results = get_previous_results(report, output_folder)
+        already_done_dates = previous_results['data'].keys()
+
+        for start in self.get_all_start_dates(first_date, current_date, 
increment):
+            if start == current_date or start not in already_done_dates:
+                report_copy = deepcopy(report)
+                report_copy.start = start
+                report_copy.end = start + increment
+                yield report_copy
+
+
+    def truncate_date(self, date, period):
+        if period == 'hours':
+            return date.replace(minute=0, second=0, microsecond=0)
+        if period == 'days':
+            return date.replace(hour=0, minute=0, second=0, microsecond=0)
+        elif period == 'months':
+            return date.replace(day=1, hour=0, minute=0, second=0, 
microsecond=0)
+        else:
+            raise ValueError('Period is not valid.')
+
+
+    def get_increment(self, period):
+        if period == 'days':
+            return relativedelta(days=1)
+        elif period == 'months':
+            return relativedelta(months=1)
+        else:
+            raise ValueError('Period is not valid.')
+
+
+    def get_all_start_dates(self, first_date, current_date, increment):
+        if first_date > current_date:
+            raise ValueError('First date is greater than current date.')
+        if increment.days < 0 or increment.months < 0:
+            raise ValueError('Increment is negative.')
+        current_start = first_date
+        while current_start <= current_date:
+            yield current_start
+            current_start += increment
diff --git a/reportupdater/update_reports.py b/reportupdater/update_reports.py
new file mode 100755
index 0000000..afb89e8
--- /dev/null
+++ b/reportupdater/update_reports.py
@@ -0,0 +1,33 @@
+#!/usr/bin/python
+
+import logging
+import argparse
+import reportupdater
+
+
+LOGGING_LEVELS = {
+    'debug': logging.DEBUG,
+    'info': logging.INFO,
+    'warning': logging.WARNING,
+    'error': logging.ERROR,
+    'critical': logging.CRITICAL
+}
+
+
+def main():
+    parser = argparse.ArgumentParser(description='Write/Update SQL reports 
into CSV files.')
+    parser.add_argument('config_path', help='Yaml configuration file path.')
+    parser.add_argument('sql_folder', help='Folder with *.sql files.')
+    parser.add_argument('output_folder', help='Folder to write the *.csv files 
to.')
+    parser.add_argument('-l', '--log-level', 
help='(debug|info|warning|error|critical)')
+    args = vars(parser.parse_args())
+    if 'log_level' in args:
+        if args['log_level'] in LOGGING_LEVELS:
+            args['log_level'] = LOGGING_LEVELS[args['log_level']]
+        else:
+            del args['log_level']
+    reportupdater.run(**args)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/reportupdater/utils.py b/reportupdater/utils.py
new file mode 100644
index 0000000..9c76348
--- /dev/null
+++ b/reportupdater/utils.py
@@ -0,0 +1,59 @@
+
+# This is a helper file that contains various utils.
+# Date formatters, logging facilities and a result parser.
+
+
+import os
+import io
+import csv
+import logging
+from datetime import datetime
+from collections import defaultdict
+
+
+DATE_FORMAT = '%Y-%m-%d'
+TIMESTAMP_FORMAT = '%Y%m%d%H%M%S'
+DATE_AND_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
+
+
+def raise_critical(error_class, message):
+    logging.critical(message)
+    raise error_class(message)
+
+
+def get_previous_results(report, output_folder):
+    # Reads a report file to get its results
+    # and returns them in the expected dict(date->row) format.
+    previous_results = {'header': [], 'data': {}}
+    output_file_path = os.path.join(output_folder, report.key + '.csv')
+    if os.path.exists(output_file_path):
+        try:
+            with io.open(output_file_path, encoding='utf-8') as output_file:
+                rows = list(csv.reader(output_file))
+        except IOError, e:
+            raise IOError('Could not read the output file (' + str(e) + ').')
+        header = []
+        if report.is_funnel:
+            # If the report is for a funnel visualization,
+            # one same date may contain several lines in the csv.
+            # So, all lines for the same date, are listed in the
+            # same dict entry under the date key.
+            data = defaultdict(list)
+        else:
+            data = {}
+        for row in rows:
+            if not header:
+                header = row  # skip header
+            else:
+                try:
+                    date = datetime.strptime(row[0], DATE_FORMAT)
+                except ValueError:
+                    raise ValueError('Output file date does not match date 
format.')
+                row[0] = date
+                if report.is_funnel:
+                    data[date].append(row)
+                else:
+                    data[date] = row
+        previous_results['header'] = header
+        previous_results['data'] = data
+    return previous_results
diff --git a/reportupdater/writer.py b/reportupdater/writer.py
new file mode 100644
index 0000000..cba2219
--- /dev/null
+++ b/reportupdater/writer.py
@@ -0,0 +1,79 @@
+
+# This module is the last step of the pipeline.
+# It gets the results passed from the executor,
+# and updates the report's corresponding file.
+#
+# In the case of timeboxed reports, it handles the
+# update of previous results consistently.
+
+
+import os
+import io
+import csv
+import logging
+from executor import Executor
+from utils import raise_critical, get_previous_results, DATE_FORMAT
+
+
+class Writer(object):
+
+
+    def __init__(self, executor, config):
+        if not isinstance(executor, Executor):
+            raise_critical(ValueError, 'Executor is not valid.')
+        if not isinstance(config, dict):
+            raise_critical(ValueError, 'Config is not a dict.')
+        self.executor = executor
+        self.config = config
+
+
+    def run(self):
+        if 'output_folder' not in self.config:
+            raise KeyError('Output folder is not in config.')
+        output_folder = self.config['output_folder']
+        if not isinstance(output_folder, str):
+            raise ValueError('Output folder is not a string.')
+
+        for report in self.executor.run():
+            logging.debug('Writing "{report}"...'.format(report=str(report)))
+            previous_results = get_previous_results(report, output_folder)
+            previous_header = previous_results['header']
+            header = report.results['header']
+            if len(previous_header) > 0 and header != previous_header:
+                raise ValueError('Results header does not match previous headers.')
+
+            updated_data = previous_results['data']
+            for date, rows in report.results['data'].iteritems():
+                updated_data[date] = rows
+            try:
+                self.write_results(header, updated_data, report, output_folder)
+                logging.info('Report {report_key} has been updated.'.format(report_key=report.key))
+            except Exception, e:
+                message = ('Report "{report_key}" could not be written '
+                           'because of error: {error}')
+                logging.error(message.format(report_key=report.key, error=str(e)))
+
+
+    def write_results(self, header, data, report, output_folder):
+        dates = sorted(data.keys())
+        rows = [data[date] for date in dates]
+        if report.is_funnel:
+            rows = [row for sublist in rows for row in sublist]  # flatten
+        output_path = os.path.join(self.config['output_folder'], report.key + '.csv')
+        temp_output_path = output_path + '.tmp'
+
+        try:
+            # wb mode needed to avoid unicode conflict between io and csv
+            temp_output_file = io.open(temp_output_path, 'wb')
+        except Exception, e:
+            raise RuntimeError('Could not open the temporary output file (' + str(e) + ').')
+        csv_writer = csv.writer(temp_output_file)
+        csv_writer.writerow(header)
+        for row in rows:
+            row[0] = row[0].strftime(DATE_FORMAT)
+            csv_writer.writerow(row)
+        temp_output_file.close()
+        try:
+            os.rename(temp_output_path, output_path)
+        except Exception, e:
+            raise RuntimeError('Could not rename the output file (' + str(e) + ').')
diff --git a/test/executor_test.py b/test/executor_test.py
new file mode 100644
index 0000000..492ed50
--- /dev/null
+++ b/test/executor_test.py
@@ -0,0 +1,239 @@
+
+from reportupdater.executor import Executor
+from reportupdater.selector import Selector
+from reportupdater.reader import Reader
+from reportupdater.report import Report
+from reportupdater.utils import TIMESTAMP_FORMAT
+from test_utils import ConnectionMock
+from unittest import TestCase
+from mock import MagicMock
+from datetime import datetime, date
+import MySQLdb
+
+
+class ExecutorTest(TestCase):
+
+
+    def setUp(self):
+        self.db_key = 'executor_test'
+        self.db_config = {
+            'host': 'some.host',
+            'port': 12345,
+            'creds_file': '/some/creds/file',
+            'db': 'database'
+        }
+        self.config = {
+            'databases': {
+                self.db_key: self.db_config
+            }
+        }
+        reader = Reader(self.config)
+        selector = Selector(reader, self.config)
+        self.executor = Executor(selector, self.config)
+
+        self.report = Report()
+        self.report.is_timeboxed = True
+        self.report.start = datetime(2015, 1, 1)
+        self.report.end = datetime(2015, 1, 2)
+        self.report.db_key = self.db_key
+        self.report.sql_template = ('SELECT date, value FROM table '
+                                    'WHERE date >= {from_timestamp} '
+                                    'AND date < {to_timestamp};')
+
+
+    def test_instantiate_sql_when_report_is_timeboxed_and_format_raises_error(self):
+        self.report.sql_template = 'SOME sql WITH AN {unknown} placeholder;'
+        with self.assertRaises(ValueError):
+            self.executor.instantiate_sql(self.report)
+
+
+    def test_instantiate_sql_when_report_is_timeboxed(self):
+        result = self.executor.instantiate_sql(self.report)
+        expected = self.report.sql_template.format(
+            from_timestamp=self.report.start.strftime(TIMESTAMP_FORMAT),
+            to_timestamp=self.report.end.strftime(TIMESTAMP_FORMAT)
+        )
+        self.assertEqual(result, expected)
+
+
+    def test_instantiate_sql_when_report_is_not_timeboxed(self):
+        self.report.is_timeboxed = False
+        self.report.sql_template = 'SOME sql CODE;'
+        sql_query = self.executor.instantiate_sql(self.report)
+        self.assertEqual(sql_query, self.report.sql_template)
+
+
+    def test_create_connection_when_db_key_is_not_in_db_config(self):
+        del self.config['databases'][self.db_key]
+        with self.assertRaises(KeyError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_db_config_is_not_a_dict(self):
+        self.config['databases'][self.db_key] = 'not a dict'
+        with self.assertRaises(ValueError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_host_is_not_in_config(self):
+        del self.db_config['host']
+        with self.assertRaises(KeyError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_port_is_not_in_config(self):
+        del self.db_config['port']
+        with self.assertRaises(KeyError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_creds_file_is_not_in_config(self):
+        del self.db_config['creds_file']
+        with self.assertRaises(KeyError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_db_is_not_in_config(self):
+        del self.db_config['db']
+        with self.assertRaises(KeyError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_host_is_not_a_string(self):
+        self.db_config['host'] = ('not', 'a', 'string')
+        with self.assertRaises(ValueError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_port_is_not_an_integer(self):
+        self.db_config['port'] = ('not', 'an', 'integer')
+        with self.assertRaises(ValueError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_creds_file_is_not_a_string(self):
+        self.db_config['creds_file'] = ('not', 'a', 'string')
+        with self.assertRaises(ValueError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_db_is_not_a_string(self):
+        self.db_config['db'] = ('not', 'a', 'string')
+        with self.assertRaises(ValueError):
+            self.executor.create_connection(self.db_key)
+
+
+    def test_create_connection_when_mysqldb_connect_raises_error(self):
+        mysqldb_connect_stash = MySQLdb.connect
+        MySQLdb.connect = MagicMock(side_effect=Exception())
+        with self.assertRaises(RuntimeError):
+            self.executor.create_connection(self.db_key)
+        MySQLdb.connect = mysqldb_connect_stash
+
+
+    def test_create_connection(self):
+        mysqldb_connect_stash = MySQLdb.connect
+        MySQLdb.connect = MagicMock(return_value='connection')
+        connection = self.executor.create_connection(self.db_key)
+        self.assertEqual(connection, 'connection')
+        MySQLdb.connect = mysqldb_connect_stash
+
+
+    def test_execute_sql_when_mysqldb_execution_raises_error(self):
+        def execute_callback(sql_query):
+            raise Exception()
+        connection = ConnectionMock(execute_callback, None, [])
+        with self.assertRaises(RuntimeError):
+            self.executor.execute_sql('SOME sql;', connection)
+
+
+    def test_execute_sql_when_first_column_is_not_a_date(self):
+        def fetchall_callback():
+            return [
+                [date(2015, 1, 1), '1'],
+                ['bad formated date', '2']
+            ]
+        connection = ConnectionMock(None, fetchall_callback, [])
+        with self.assertRaises(ValueError):
+            self.executor.execute_sql('SOME sql;', connection)
+
+
+    def test_execute_sql_with_funnel_data(self):
+        def fetchall_callback():
+            return [
+                [date(2015, 1, 1), '1'],
+                [date(2015, 1, 1), '2'],
+                [date(2015, 1, 1), '3'],
+                [date(2015, 1, 2), '4'],
+                [date(2015, 1, 2), '5']
+            ]
+        connection = ConnectionMock(None, fetchall_callback, [])
+        result = self.executor.execute_sql('SOME sql;', connection, is_funnel=True)
+        expected = {
+            'header': [],
+            'data': {
+                datetime(2015, 1, 1): [
+                    [datetime(2015, 1, 1), '1'],
+                    [datetime(2015, 1, 1), '2'],
+                    [datetime(2015, 1, 1), '3']
+                ],
+                datetime(2015, 1, 2): [
+                    [datetime(2015, 1, 2), '4'],
+                    [datetime(2015, 1, 2), '5']
+                ]
+            }
+        }
+        self.assertEqual(result, expected)
+
+
+    def test_execute_sql(self):
+        def fetchall_callback():
+            return [
+                [date(2015, 1, 1), '1'],
+                [date(2015, 1, 2), '2']
+            ]
+        connection = ConnectionMock(None, fetchall_callback, [])
+        result = self.executor.execute_sql('SOME sql;', connection)
+        expected = {
+            'header': [],
+            'data': {
+                datetime(2015, 1, 1): [datetime(2015, 1, 1), '1'],
+                datetime(2015, 1, 2): [datetime(2015, 1, 2), '2'],
+            }
+        }
+        self.assertEqual(result, expected)
+
+
+    def test_run_when_databases_is_not_in_config(self):
+        del self.config['databases']
+        with self.assertRaises(KeyError):
+            list(self.executor.run())
+
+
+    def test_run_when_config_databases_is_not_a_dict(self):
+        self.config['databases'] = 'not a dict'
+        with self.assertRaises(ValueError):
+            list(self.executor.run())
+
+
+    def test_run_when_helper_method_raises_error(self):
+        selected = [self.report]
+        self.executor.selector.run = MagicMock(return_value=selected)
+        self.executor.instantiate_sql = MagicMock(side_effect=Exception())
+        executed = list(self.executor.run())
+        self.assertEqual(len(executed), 0)
+
+
+    def test_run(self):
+        selected = [self.report]
+        self.executor.selector.run = MagicMock(return_value=selected)
+        self.executor.create_connection = MagicMock(return_value='connection')
+        results = {
+            'header': ['some', 'sql', 'header'],
+            'data': {datetime(2015, 1, 1): [date(2015, 1, 1), 'some', 'value']}
+        }
+        self.executor.execute_sql = MagicMock(return_value=results)
+        executed = list(self.executor.run())
+        self.assertEqual(len(executed), 1)
+        report = executed[0]
+        self.assertEqual(report.results, results)
diff --git a/test/fixtures/config/reportupdater_test1.yaml b/test/fixtures/config/reportupdater_test1.yaml
new file mode 100644
index 0000000..7442fa5
--- /dev/null
+++ b/test/fixtures/config/reportupdater_test1.yaml
@@ -0,0 +1,13 @@
+databases:
+    reportupdater_db:
+        host: "some.db.host"
+        port: 1234
+        creds_file: /creds/.my.cnf
+        db: some_db_name
+defaults:
+    db: reportupdater_db
+reportupdater-reports:
+    reportupdater_test1:
+        frequency: hourly
+        timeboxed: true
+        starts: 2015-01-01
diff --git a/test/fixtures/config/reportupdater_test2.yaml b/test/fixtures/config/reportupdater_test2.yaml
new file mode 100644
index 0000000..3154dad
--- /dev/null
+++ b/test/fixtures/config/reportupdater_test2.yaml
@@ -0,0 +1,13 @@
+databases:
+    reportupdater_db:
+        host: "some.db.host"
+        port: 1234
+        creds_file: /creds/.my.cnf
+        db: some_db_name
+defaults:
+    db: reportupdater_db
+reportupdater-reports:
+    reportupdater_test2:
+        frequency: daily
+        timeboxed: true
+        starts: 2015-01-01
diff --git a/test/fixtures/config/reportupdater_test3.yaml b/test/fixtures/config/reportupdater_test3.yaml
new file mode 100644
index 0000000..43247d3
--- /dev/null
+++ b/test/fixtures/config/reportupdater_test3.yaml
@@ -0,0 +1,14 @@
+databases:
+    reportupdater_db:
+        host: "some.db.host"
+        port: 1234
+        creds_file: /creds/.my.cnf
+        db: some_db_name
+defaults:
+    db: reportupdater_db
+reportupdater-reports:
+    reportupdater_test3:
+        frequency: hourly
+        timeboxed: true
+        funnel: true
+        starts: 2015-01-01
diff --git a/test/fixtures/output/reader_test.csv b/test/fixtures/output/reader_test.csv
new file mode 100644
index 0000000..8a362e4
--- /dev/null
+++ b/test/fixtures/output/reader_test.csv
@@ -0,0 +1,4 @@
+date,value
+2015-01-01,1
+2015-01-02,2
+2015-01-03,3
diff --git a/test/fixtures/output/reader_test_error.csv b/test/fixtures/output/reader_test_error.csv
new file mode 100644
index 0000000..b15c649
--- /dev/null
+++ b/test/fixtures/output/reader_test_error.csv
@@ -0,0 +1,4 @@
+date,value
+2015-01-01,1
+badFormatedDate,2
+2015-01-03,3
diff --git a/test/fixtures/output/selector_test1.csv b/test/fixtures/output/selector_test1.csv
new file mode 100644
index 0000000..2f0e4f3
--- /dev/null
+++ b/test/fixtures/output/selector_test1.csv
@@ -0,0 +1,2 @@
+date,value
+2015-01-01,a
diff --git a/test/fixtures/output/selector_test2.csv b/test/fixtures/output/selector_test2.csv
new file mode 100644
index 0000000..d1828c0
--- /dev/null
+++ b/test/fixtures/output/selector_test2.csv
@@ -0,0 +1,3 @@
+date,value
+2015-01-01,a
+2015-01-02,b
diff --git a/test/fixtures/output/writer_test1.csv b/test/fixtures/output/writer_test1.csv
new file mode 100644
index 0000000..f0e10cc
--- /dev/null
+++ b/test/fixtures/output/writer_test1.csv
@@ -0,0 +1,2 @@
+date,val1,val2,val3
+2015-01-01,1,2,3
diff --git a/test/fixtures/sql/reader_test.sql b/test/fixtures/sql/reader_test.sql
new file mode 100644
index 0000000..e715b7e
--- /dev/null
+++ b/test/fixtures/sql/reader_test.sql
@@ -0,0 +1,5 @@
+SELECT date, value
+FROM table
+WHERE date >= {from_timestamp}
+AND date < {to_timestamp}
+;
diff --git a/test/fixtures/sql/reportupdater_test1.sql b/test/fixtures/sql/reportupdater_test1.sql
new file mode 100644
index 0000000..e715b7e
--- /dev/null
+++ b/test/fixtures/sql/reportupdater_test1.sql
@@ -0,0 +1,5 @@
+SELECT date, value
+FROM table
+WHERE date >= {from_timestamp}
+AND date < {to_timestamp}
+;
diff --git a/test/fixtures/sql/reportupdater_test2.sql b/test/fixtures/sql/reportupdater_test2.sql
new file mode 100644
index 0000000..e715b7e
--- /dev/null
+++ b/test/fixtures/sql/reportupdater_test2.sql
@@ -0,0 +1,5 @@
+SELECT date, value
+FROM table
+WHERE date >= {from_timestamp}
+AND date < {to_timestamp}
+;
diff --git a/test/fixtures/sql/reportupdater_test3.sql b/test/fixtures/sql/reportupdater_test3.sql
new file mode 100644
index 0000000..e715b7e
--- /dev/null
+++ b/test/fixtures/sql/reportupdater_test3.sql
@@ -0,0 +1,5 @@
+SELECT date, value
+FROM table
+WHERE date >= {from_timestamp}
+AND date < {to_timestamp}
+;
diff --git a/test/reader_test.py b/test/reader_test.py
new file mode 100644
index 0000000..c71ad0c
--- /dev/null
+++ b/test/reader_test.py
@@ -0,0 +1,259 @@
+
+import os
+import io
+from reportupdater.reader import Reader
+from reportupdater.utils import DATE_FORMAT
+from unittest import TestCase
+from mock import MagicMock
+from datetime import datetime
+
+
+class ReaderTest(TestCase):
+
+
+    def setUp(self):
+        self.report_key = 'reader_test'
+        self.report_config = {
+            'starts': '2015-01-01',
+            'timeboxed': True,
+            'frequency': 'hourly'
+        }
+        self.config = {
+            'sql_folder': 'test/fixtures/sql',
+            'output_folder': 'test/fixtures/output',
+            'reportupdater-reports': {
+                self.report_key: self.report_config
+            },
+            'defaults': {
+                'db': 'db_key'
+            }
+        }
+        self.reader = Reader(self.config)
+
+
+    def test_get_frequency_and_granularity_when_report_frequency_is_not_in_config(self):
+        report_config = {}
+        with self.assertRaises(KeyError):
+            self.reader.get_frequency_and_granularity(report_config)
+
+
+    def test_get_frequency_and_granularity_when_report_frequency_is_not_hourly_or_daily(self):
+        report_config = {'frequency': 'wrongly'}
+        with self.assertRaises(ValueError):
+            self.reader.get_frequency_and_granularity(report_config)
+
+
+    def test_get_frequency(self):
+        corresponding = {
+            'hourly': ('hours', 'days'),
+            'daily': ('days', 'months')
+        }
+        for frequency, expected in corresponding.iteritems():
+            report_config = {'frequency': frequency}
+            result = self.reader.get_frequency_and_granularity(report_config)
+            self.assertEqual(result, expected)
+
+
+    def test_get_is_timeboxed_when_report_timeboxed_is_not_in_config(self):
+        report_config = {}
+        is_timeboxed = self.reader.get_is_timeboxed(report_config)
+        self.assertFalse(is_timeboxed)
+
+
+    def test_get_is_timeboxed_when_report_timeboxed_is_not_true(self):
+        for value in [False, None, 0]:
+            report_config = {'timeboxed': value}
+            is_timeboxed = self.reader.get_is_timeboxed(report_config)
+            self.assertFalse(is_timeboxed)
+
+
+    def test_get_is_timeboxed_when_report_timeboxed_is_true(self):
+        report_config = {'timeboxed': True}
+        is_timeboxed = self.reader.get_is_timeboxed(report_config)
+        self.assertTrue(is_timeboxed)
+
+
+    def test_get_is_funnel_when_report_funnel_is_not_in_config(self):
+        report_config = {}
+        is_funnel = self.reader.get_is_funnel(report_config)
+        self.assertFalse(is_funnel)
+
+
+    def test_get_is_funnel_when_report_funnel_is_not_true(self):
+        for value in [False, None, 0]:
+            report_config = {'funnel': value}
+            is_funnel = self.reader.get_is_funnel(report_config)
+            self.assertFalse(is_funnel)
+
+
+    def test_get_is_funnel_when_report_funnel_is_true(self):
+        report_config = {'funnel': True}
+        is_funnel = self.reader.get_is_funnel(report_config)
+        self.assertTrue(is_funnel)
+
+
+    def test_get_first_date_when_report_starts_is_not_a_string(self):
+        report_config = {'starts': ('not', 'a', 'string')}
+        is_timeboxed = True
+        with self.assertRaises(TypeError):
+            self.reader.get_first_date(report_config, is_timeboxed)
+
+
+    def test_get_first_date_when_report_starts_does_not_match_date_format(self):
+        report_config = {'starts': 'no match'}
+        is_timeboxed = True
+        with self.assertRaises(ValueError):
+            self.reader.get_first_date(report_config, is_timeboxed)
+
+
+    def test_get_first_date_when_report_starts_is_not_in_timeboxed_config(self):
+        report_config = {}
+        is_timeboxed = True
+        with self.assertRaises(ValueError):
+            self.reader.get_first_date(report_config, is_timeboxed)
+
+
+    def test_get_first_date_when_report_starts_is_not_in_config(self):
+        report_config = {}
+        is_timeboxed = False
+        first_date = self.reader.get_first_date(report_config, is_timeboxed)
+        self.assertEqual(first_date, None)
+
+
+    def test_get_first_date(self):
+        date_str = '2015-01-01'
+        report_config = {'starts': date_str}
+        is_timeboxed = True
+        result = self.reader.get_first_date(report_config, is_timeboxed)
+        expected = datetime.strptime(date_str, DATE_FORMAT)
+        self.assertEqual(result, expected)
+
+
+    def test_get_db_key_when_defaults_is_not_in_config(self):
+        reader = Reader({})
+        with self.assertRaises(KeyError):
+            reader.get_db_key()
+
+
+    def test_get_db_key_when_defaults_db_is_not_in_config(self):
+        config = {'defaults': {}}
+        reader = Reader(config)
+        with self.assertRaises(KeyError):
+            reader.get_db_key()
+
+
+    def test_get_db_key_when_defaults_db_is_not_a_string(self):
+        config = {
+            'defaults': {
+                'db': None
+            }
+        }
+        reader = Reader(config)
+        with self.assertRaises(ValueError):
+            reader.get_db_key()
+
+
+    def test_get_db_key(self):
+        result = self.reader.get_db_key()
+        expected = self.config['defaults']['db']
+        self.assertEqual(result, expected)
+
+
+    def test_get_sql_template_when_sql_folder_is_not_in_config(self):
+        reader = Reader({})
+        with self.assertRaises(KeyError):
+            reader.get_sql_template('reader_test')
+
+
+    def test_get_sql_template_when_sql_folder_is_not_a_string(self):
+        config = {'sql_folder': ('not', 'a', 'string')}
+        reader = Reader(config)
+        with self.assertRaises(ValueError):
+            reader.get_sql_template('reader_test')
+
+
+    def test_get_sql_template_when_sql_folder_does_not_exist(self):
+        config = {'sql_folder': 'nonexistent'}
+        reader = Reader(config)
+        with self.assertRaises(IOError):
+            reader.get_sql_template('reader_test')
+
+
+    def test_get_sql_template_when_sql_file_does_not_exist(self):
+        with self.assertRaises(IOError):
+            self.reader.get_sql_template('wrong_report_key')
+
+
+    def test_get_sql_template(self):
+        result = self.reader.get_sql_template('reader_test')
+        sql_folder = self.config['sql_folder']
+        report_key = 'reader_test'
+        sql_template_path = os.path.join(sql_folder, report_key + '.sql')
+        with io.open(sql_template_path, encoding='utf-8') as sql_template_file:
+            expected = sql_template_file.read()
+        self.assertEqual(result, expected)
+
+
+    def test_create_report_when_report_key_is_not_a_string(self):
+        report_key = ('not', 'a', 'string')
+        with self.assertRaises(TypeError):
+            self.reader.create_report(report_key, self.report_config)
+
+
+    def test_create_report_when_report_config_is_not_a_dict(self):
+        report_config = None
+        with self.assertRaises(TypeError):
+            self.reader.create_report(self.report_key, report_config)
+
+
+    def test_create_report_when_helper_method_raises_error(self):
+        self.reader.get_first_date = MagicMock(side_effect=Exception())
+        with self.assertRaises(Exception):
+            self.reader.create_report(self.report_key, self.report_config)
+
+
+    def test_create_report(self):
+        self.reader.get_first_date = MagicMock(return_value='first_date')
+        self.reader.get_frequency_and_granularity = MagicMock(
+            return_value=('frequency', 'granularity'))
+        self.reader.get_is_timeboxed = MagicMock(return_value='is_timeboxed')
+        self.reader.get_is_funnel = MagicMock(return_value='is_funnel')
+        self.reader.get_db_key = MagicMock(return_value='db_key')
+        self.reader.get_sql_template = MagicMock(return_value='sql_template')
+        report = self.reader.create_report(self.report_key, self.report_config)
+        self.assertEqual(report.key, self.report_key)
+        self.assertEqual(report.first_date, 'first_date')
+        self.assertEqual(report.frequency, 'frequency')
+        self.assertEqual(report.granularity, 'granularity')
+        self.assertEqual(report.is_timeboxed, 'is_timeboxed')
+        self.assertEqual(report.is_funnel, 'is_funnel')
+        self.assertEqual(report.db_key, 'db_key')
+        self.assertEqual(report.sql_template, 'sql_template')
+        self.assertEqual(report.results, {'header': [], 'data': {}})
+        self.assertEqual(report.start, None)
+        self.assertEqual(report.end, None)
+
+
+    def test_run_when_reports_is_not_in_config(self):
+        reader = Reader({})
+        with self.assertRaises(KeyError):
+            list(reader.run())
+
+
+    def test_run_when_reports_is_not_a_dict(self):
+        config = {'reportupdater-reports': ('not', 'a', 'dict')}
+        reader = Reader(config)
+        with self.assertRaises(ValueError):
+            list(reader.run())
+
+
+    def test_run_when_create_report_raises_error(self):
+        self.reader.create_report = MagicMock(side_effect=Exception())
+        for report in self.reader.run():
+            self.assertTrue(False)
+
+
+    def test_run(self):
+        self.reader.create_report = MagicMock(return_value='report')
+        for report in self.reader.run():
+            self.assertEqual(report, 'report')
diff --git a/test/report_test.py b/test/report_test.py
new file mode 100644
index 0000000..643b3ac
--- /dev/null
+++ b/test/report_test.py
@@ -0,0 +1,79 @@
+
+from reportupdater.report import Report
+from unittest import TestCase
+from datetime import datetime
+
+
+class ReportTest(TestCase):
+
+
+    def setUp(self):
+        self.report = Report()
+        self.report.key = 'report_test'
+        self.report.frequency = 'hours'
+        self.report.granularity = 'days'
+        self.report.is_timeboxed = True
+        self.report.is_funnel = True
+        self.report.first_date = datetime(2015, 1, 1)
+        self.report.start = datetime(2015, 1, 2)
+        self.report.end = datetime(2015, 1, 3)
+        self.report.db_key = 'report_test'
+        self.report.sql_template = ('SELECT date, value FROM table '
+                                    'WHERE date >= {from_timestamp} '
+                                    'AND date < {to_timestamp}')
+        self.report.results = {
+            'header': ['date', 'value'],
+            'data': {
+                datetime(2015, 1, 1): ['2015-01-01', '100'],
+                datetime(2015, 1, 2): ['2015-01-02', '200']
+            }
+        }
+
+
+    def test_str_does_not_raise_error_when_first_date_is_not_expected(self):
+        self.report.first_date = None
+        str(self.report)
+        self.report.first_date = 'string instead of datetime'
+        str(self.report)
+        self.report.first_date = {}
+        str(self.report)
+
+
+    def test_str_does_not_raise_error_when_start_is_not_expected(self):
+        self.report.start = None
+        str(self.report)
+        self.report.start = 'string instead of datetime'
+        str(self.report)
+        self.report.start = {}
+        str(self.report)
+
+
+    def test_str_does_not_raise_error_when_end_is_not_expected(self):
+        self.report.end = None
+        str(self.report)
+        self.report.end = 'string instead of datetime'
+        str(self.report)
+        self.report.end = {}
+        str(self.report)
+
+
+    def test_str_does_not_raise_error_when_results_is_not_expected(self):
+        self.report.results = None
+        str(self.report)
+        self.report.results = 'string instead of dict'
+        str(self.report)
+        self.report.results = {
+            'header': []
+            # missing data entry
+        }
+        str(self.report)
+        self.report.results = {
+            # missing header entry
+            'data': {}
+        }
+        str(self.report)
+        self.report.results = {
+            'header': None,
+            'data': None
+        }
+        str(self.report)
diff --git a/test/reportupdater_test.py b/test/reportupdater_test.py
new file mode 100644
index 0000000..cc9e2c6
--- /dev/null
+++ b/test/reportupdater_test.py
@@ -0,0 +1,262 @@
+
+import os
+import io
+import time
+import MySQLdb
+from reportupdater import reportupdater
+from reportupdater.utils import DATE_AND_TIME_FORMAT, DATE_FORMAT
+from test_utils import ConnectionMock
+from unittest import TestCase
+from mock import MagicMock
+from datetime import datetime, date
+from dateutil.relativedelta import relativedelta
+from threading import Thread
+
+
+class ReportUpdaterTest(TestCase):
+
+
+    def setUp(self):
+        self.mysqldb_connect_stash = MySQLdb.connect
+        self.utcnow_stash = reportupdater.utcnow
+        self.paths_to_clean = [reportupdater.PID_FILE_PATH]
+        self.config_folder = 'test/fixtures/config'
+        self.sql_folder = 'test/fixtures/sql'
+        self.output_folder = 'test/fixtures/output'
+        self.history_path = 'test/fixtures/reportupdater_test.history'
+
+
+    def tearDown(self):
+        MySQLdb.connect = self.mysqldb_connect_stash
+        reportupdater.utcnow = self.utcnow_stash
+        for path in self.paths_to_clean:
+            try:
+                os.remove(path)
+            except:
+                pass
+
+
+    def 
test_when_current_exec_time_and_last_exec_time_are_within_the_same_hour(self):
+        last_exec_time = datetime(2015, 1, 2, 3, 4, 5)
+        self.write_time_to_history(last_exec_time)
+        reportupdater.utcnow = MagicMock(return_value=datetime(2015, 1, 2, 3, 
40, 50))
+        reportupdater.run(
+            config_path=os.path.join(self.config_folder, 
'reportupdater_test1.yaml'),
+            sql_folder=self.sql_folder,
+            output_folder=self.output_folder,
+            history_path=self.history_path
+        )
+        # The report should not be computed because it has already been 
computed
+        # within this hour. So the output file should not exist.
+        output_path = os.path.join(self.output_folder, 
'reportupdater_test1.csv')
+        self.assertFalse(os.path.exists(output_path))
+
+
+    def 
test_when_current_exec_time_and_last_exec_time_are_within_the_same_day(self):
+        last_exec_time = datetime(2015, 1, 2, 3, 4, 5)
+        self.write_time_to_history(last_exec_time)
+        reportupdater.utcnow = MagicMock(return_value=datetime(2015, 1, 2, 13, 
14, 15))
+        reportupdater.run(
+            config_path=os.path.join(self.config_folder, 
'reportupdater_test2.yaml'),
+            sql_folder=self.sql_folder,
+            output_folder=self.output_folder,
+            history_path=self.history_path
+        )
+        # The report should not be computed because it has already been 
computed
+        # within this day. So the output file should not exist.
+        output_path = os.path.join(self.output_folder, 
'reportupdater_test2.csv')
+        self.assertFalse(os.path.exists(output_path))
+
+
+    def test_when_two_threads_run_reportupdater_in_parallel(self):
+        # Mock database methods.
+        def fetchall_callback():
+            return []
+        header = ['date', 'value']
+        connection_mock = ConnectionMock(None, fetchall_callback, header)
+
+        def connect_with_lag(**kwargs):
+            # This makes the connection take some time to execute,
+            # thus giving time to the second thread to start.
+            time.sleep(0.3)
+            return connection_mock
+        MySQLdb.connect = MagicMock(wraps=connect_with_lag)
+
+        # The first thread should execute normally and output the results.
+        history_path1 = 'test/fixtures/reportupdater_test1.history'
+        output_path1 = os.path.join(self.output_folder, 
'reportupdater_test1.csv')
+        self.paths_to_clean.extend([history_path1, output_path1])
+        args1 = {
+            'config_path': os.path.join(self.config_folder, 
'reportupdater_test1.yaml'),
+            'sql_folder': self.sql_folder,
+            'output_folder': self.output_folder,
+            'history_path': history_path1
+        }
+        thread1 = Thread(target=reportupdater.run, kwargs=args1)
+        thread1.start()
+
+        # The second thread will start when the first thread is still running,
+        # so it should be discarded by the pidfile control
+        # and no output should be written.
+        # Note that the history file is different, so that
+        # the frequency control does not discard this thread.
+        time.sleep(0.1)
+        history_path2 = 'test/fixtures/reportupdater_test2.history'
+        output_path2 = os.path.join(self.output_folder, 
'reportupdater_test2.csv')
+        self.paths_to_clean.extend([history_path2, output_path2])
+        args2 = {
+            'config_path': os.path.join(self.config_folder, 
'reportupdater_test2.yaml'),
+            'sql_folder': self.sql_folder,
+            'output_folder': self.output_folder,
+            'history_path': history_path2
+        }
+        thread2 = Thread(target=reportupdater.run, kwargs=args2)
+        thread2.start()
+
+        # wait for the threads to finish and assert results
+        thread1.join()
+        output_path1 = os.path.join(self.output_folder, 
'reportupdater_test1.csv')
+        self.assertTrue(os.path.exists(output_path1))
+        thread2.join()
+        output_path2 = os.path.join(self.output_folder, 
'reportupdater_test2.csv')
+        self.assertFalse(os.path.exists(output_path2))
+
+
+    def test_hourly_timeboxed_report_without_previous_results(self):
+        def fetchall_callback():
+            # This method will return a subsequent row with each call.
+            try:
+                sql_date = self.last_date + relativedelta(days=+1)
+                value = self.last_value + 1
+            except AttributeError:
+                sql_date = date(2015, 1, 1)
+                value = 1
+            self.last_date = sql_date
+            self.last_value = value
+            return [[sql_date, str(value)]]
+        header = ['date', 'value']
+        connection_mock = ConnectionMock(None, fetchall_callback, header)
+        MySQLdb.connect = MagicMock(return_value=connection_mock)
+
+        config_path = os.path.join(self.config_folder, 
'reportupdater_test1.yaml')
+        output_path = os.path.join(self.output_folder, 
'reportupdater_test1.csv')
+        history_path = 'test/fixtures/reportupdater_test1.history'
+        self.paths_to_clean.extend([output_path, history_path])
+        reportupdater.run(
+            config_path=config_path,
+            sql_folder=self.sql_folder,
+            output_folder=self.output_folder,
+            history_path=history_path
+        )
+        self.assertTrue(os.path.exists(output_path))
+        with io.open(output_path, 'r', encoding='utf-8') as output_file:
+            output_lines = output_file.readlines()
+        self.assertTrue(len(output_lines) > 1)
+        header = output_lines.pop(0).strip()
+        self.assertEqual(header, 'date,value')
+        # Assert that all lines hold subsequent values.
+        expected_date = datetime(2015, 1, 1)
+        expected_value = 1
+        for line in output_lines:
+            expected_line = expected_date.strftime(DATE_FORMAT) + ',' + 
str(expected_value)
+            self.assertEqual(line.strip(), expected_line)
+            expected_date += relativedelta(days=+1)
+            expected_value += 1
+
+
+    def test_hourly_funnel_timeboxed_report_without_previous_results(self):
+        def fetchall_callback():
+            # This method will return a subsequent row with each call.
+            try:
+                sql_date = self.last_date + relativedelta(days=+1)
+            except AttributeError:
+                sql_date = date(2015, 1, 1)
+            self.last_date = sql_date
+            return [
+                [sql_date, '1'],
+                [sql_date, '2'],
+                [sql_date, '3']
+            ]
+        header = ['date', 'value']
+        connection_mock = ConnectionMock(None, fetchall_callback, header)
+        MySQLdb.connect = MagicMock(return_value=connection_mock)
+
+        config_path = os.path.join(self.config_folder, 
'reportupdater_test3.yaml')
+        output_path = os.path.join(self.output_folder, 
'reportupdater_test3.csv')
+        history_path = 'test/fixtures/reportupdater_test3.history'
+        self.paths_to_clean.extend([output_path, history_path])
+        reportupdater.run(
+            config_path=config_path,
+            sql_folder=self.sql_folder,
+            output_folder=self.output_folder,
+            history_path=history_path
+        )
+        self.assertTrue(os.path.exists(output_path))
+        with io.open(output_path, 'r', encoding='utf-8') as output_file:
+            output_lines = output_file.readlines()
+        self.assertTrue(len(output_lines) > 1)
+        header = output_lines.pop(0).strip()
+        self.assertEqual(header, 'date,value')
+        # Assert that all lines hold subsequent values.
+        expected_date = datetime(2015, 1, 1)
+        expected_value = 1
+        for line in output_lines:
+            expected_line = expected_date.strftime(DATE_FORMAT) + ',' + 
str(expected_value)
+            self.assertEqual(line.strip(), expected_line)
+            if expected_value < 3:
+                expected_value += 1
+            else:
+                expected_date += relativedelta(days=+1)
+                expected_value = 1
+
+
+    def test_daily_timeboxed_report_with_previous_results(self):
+        def fetchall_callback():
+            # This method will return a subsequent row with each call.
+            try:
+                date = self.last_date + relativedelta(months=+1)
+                value = self.last_value + 1
+            except AttributeError:
+                # Starts at Mar, Jan and Feb are in previous results
+                date = datetime(2015, 3, 1)
+                value = 3
+            self.last_date = date
+            self.last_value = value
+            return [[date.strftime(DATE_FORMAT), str(value)]]
+        header = ['date', 'value']
+        connection_mock = ConnectionMock(None, fetchall_callback, header)
+        MySQLdb.connect = MagicMock(return_value=connection_mock)
+
+        config_path = os.path.join(self.config_folder, 
'reportupdater_test2.yaml')
+        output_path = os.path.join(self.output_folder, 
'reportupdater_test2.csv')
+        history_path = 'test/fixtures/reportupdater_test2.history'
+        with io.open(output_path, 'w') as output_file:
+            
output_file.write(unicode('date,value\n2015-01-01,1\n2015-02-01,2\n'))
+        self.paths_to_clean.extend([output_path, history_path])
+        reportupdater.run(
+            config_path=config_path,
+            sql_folder=self.sql_folder,
+            output_folder=self.output_folder,
+            history_path=history_path
+        )
+        self.assertTrue(os.path.exists(output_path))
+        with io.open(output_path, 'r', encoding='utf-8') as output_file:
+            output_lines = output_file.readlines()
+        self.assertTrue(len(output_lines) > 1)
+        header = output_lines.pop(0).strip()
+        self.assertEqual(header, 'date,value')
+        # Assert that all lines hold subsequent values.
+        expected_date = datetime(2015, 1, 1)
+        expected_value = 1
+        for line in output_lines:
+            expected_line = expected_date.strftime(DATE_FORMAT) + ',' + 
str(expected_value)
+            self.assertEqual(line.strip(), expected_line)
+            expected_date += relativedelta(months=+1)
+            expected_value += 1
+
+
+    def write_time_to_history(self, last_exec_time):
+        last_exec_time_str = last_exec_time.strftime(DATE_AND_TIME_FORMAT)
+        with io.open(self.history_path, 'w') as history_file:
+            history_file.write(unicode(last_exec_time_str))
+        self.paths_to_clean.append(self.history_path)
diff --git a/test/selector_test.py b/test/selector_test.py
new file mode 100644
index 0000000..572f6aa
--- /dev/null
+++ b/test/selector_test.py
@@ -0,0 +1,236 @@
+
+import os
+from reportupdater.selector import Selector
+from reportupdater.reader import Reader
+from reportupdater.report import Report
+from unittest import TestCase
+from mock import MagicMock
+from datetime import datetime
+from dateutil.relativedelta import relativedelta
+
+
+class SelectorTest(TestCase):
+
+
+    def setUp(self):
+        self.config = {
+            'output_folder': 'test/fixtures/output',
+            'last_exec_time': datetime(2015, 1, 2, 23, 50, 30),
+            'current_exec_time': datetime(2015, 1, 3, 1, 20, 30)
+        }
+        reader = Reader(self.config)
+        self.selector = Selector(reader, self.config)
+
+        self.report = Report()
+        self.report.key = 'selector_test'
+        self.report.first_date = datetime(2015, 1, 1)
+        self.report.frequency = 'hours'
+        self.report.granularity = 'days'
+        self.report.is_timeboxed = True
+
+
+    def test_is_time_to_execute_when_last_exec_time_is_none(self):
+        last_exec_time = None
+        now = datetime.now()
+        frequency = 'hours'
+        is_time = self.selector.is_time_to_execute(last_exec_time, now, 
frequency)
+        self.assertTrue(is_time)
+
+
+    def test_is_time_to_execute_when_both_dates_are_in_the_same_hour(self):
+        last_exec_time = datetime(2015, 1, 1, 3, 30, 0)
+        now = datetime(2015, 1, 1, 3, 40, 0)
+        frequency = 'hours'
+        is_time = self.selector.is_time_to_execute(last_exec_time, now, 
frequency)
+        self.assertFalse(is_time)
+
+
+    def test_is_time_to_execute_when_both_dates_are_in_different_hours(self):
+        last_exec_time = datetime(2015, 1, 1, 3, 30, 0)
+        now = datetime(2015, 1, 1, 4, 20, 0)
+        frequency = 'hours'
+        is_time = self.selector.is_time_to_execute(last_exec_time, now, 
frequency)
+        self.assertTrue(is_time)
+
+
+    def test_is_time_to_execute_when_both_dates_are_in_the_same_day(self):
+        last_exec_time = datetime(2015, 1, 1, 3, 30, 0)
+        now = datetime(2015, 1, 1, 10, 40, 0)
+        frequency = 'days'
+        is_time = self.selector.is_time_to_execute(last_exec_time, now, 
frequency)
+        self.assertFalse(is_time)
+
+
+    def test_is_time_to_execute_when_both_dates_are_in_different_days(self):
+        last_exec_time = datetime(2015, 1, 1, 3, 30, 0)
+        now = datetime(2015, 1, 2, 4, 20, 0)
+        frequency = 'days'
+        is_time = self.selector.is_time_to_execute(last_exec_time, now, 
frequency)
+        self.assertTrue(is_time)
+
+
+    def test_get_interval_reports_when_previous_results_is_empty(self):
+        # Note no previous results csv exists for default report.
+        now = datetime(2015, 1, 2)
+        reports = list(self.selector.get_interval_reports(self.report, now))
+        self.assertEqual(len(reports), 2)
+        self.assertEqual(reports[0].start, datetime(2015, 1, 1))
+        self.assertEqual(reports[0].end, datetime(2015, 1, 2))
+        self.assertEqual(reports[1].start, datetime(2015, 1, 2))
+        self.assertEqual(reports[1].end, datetime(2015, 1, 3))
+
+
+    def test_get_interval_reports_when_previous_results_has_some_dates(self):
+        self.report.key = 'selector_test1'
+        # see: test/fixtures/output/selector_test1.csv
+        now = datetime(2015, 1, 2)
+        reports = list(self.selector.get_interval_reports(self.report, now))
+        self.assertEqual(len(reports), 1)
+        self.assertEqual(reports[0].start, datetime(2015, 1, 2))
+        self.assertEqual(reports[0].end, datetime(2015, 1, 3))
+
+
+    def test_get_interval_reports_when_previous_results_has_all_dates(self):
+        self.report.key = 'selector_test2'
+        # see: test/fixtures/output/selector_test2.csv
+        now = datetime(2015, 1, 2)
+        reports = list(self.selector.get_interval_reports(self.report, now))
+        self.assertEqual(len(reports), 1)
+        self.assertEqual(reports[0].start, datetime(2015, 1, 2))
+        self.assertEqual(reports[0].end, datetime(2015, 1, 3))
+
+
+    def test_truncate_date_when_period_is_hours(self):
+        date = datetime(2015, 1, 5, 10, 20, 30)
+        result = self.selector.truncate_date(date, 'hours')
+        expected = datetime(2015, 1, 5, 10, 0, 0)
+        self.assertEqual(result, expected)
+
+
+    def test_truncate_date_when_period_is_days(self):
+        date = datetime(2015, 1, 5, 10, 20, 30)
+        result = self.selector.truncate_date(date, 'days')
+        expected = datetime(2015, 1, 5, 0, 0, 0)
+        self.assertEqual(result, expected)
+
+
+    def test_truncate_date_when_period_is_months(self):
+        date = datetime(2015, 1, 5, 10, 20, 30)
+        result = self.selector.truncate_date(date, 'months')
+        expected = datetime(2015, 1, 1, 0, 0, 0)
+        self.assertEqual(result, expected)
+
+
+    def test_truncate_date_when_period_is_not_valid(self):
+        date = datetime(2015, 1, 5, 10, 20, 30)
+        with self.assertRaises(ValueError):
+            self.selector.truncate_date(date, 'notvalid')
+
+
+    def test_get_increment_when_period_is_days(self):
+        increment = self.selector.get_increment('days')
+        self.assertEqual(increment, relativedelta(days=1))
+
+
+    def test_get_increment_when_period_is_months(self):
+        increment = self.selector.get_increment('months')
+        self.assertEqual(increment, relativedelta(months=1))
+
+
+    def test_get_increment_when_period_is_invalid(self):
+        with self.assertRaises(ValueError):
+            self.selector.get_increment('notvalid')
+
+
+    def 
test_get_all_start_dates_when_first_date_is_greater_than_current_date(self):
+        first_date = datetime(2015, 1, 2)
+        current_date = datetime(2015, 1, 1)
+        increment = relativedelta(days=1)
+        with self.assertRaises(ValueError):
+            list(self.selector.get_all_start_dates(first_date, current_date, 
increment))
+
+
+    def test_get_all_start_dates_when_increment_is_negative(self):
+        first_date = datetime(2015, 1, 1)
+        current_date = datetime(2015, 1, 2)
+        increment = relativedelta(days=-1)
+        with self.assertRaises(ValueError):
+            list(self.selector.get_all_start_dates(first_date, current_date, 
increment))
+
+
+    def test_get_all_start_dates_when_first_date_equals_current_date(self):
+        date = datetime(2015, 1, 1)
+        increment = relativedelta(days=1)
+        all_dates = list(self.selector.get_all_start_dates(date, date, 
increment))
+        self.assertEqual(len(all_dates), 1)
+        self.assertEqual(all_dates[0], date)
+
+
+    def test_get_all_start_dates_when_increment_is_days(self):
+        first_date = datetime(2015, 1, 1)
+        current_date = datetime(2015, 1, 3)
+        increment = relativedelta(days=1)
+        result = list(self.selector.get_all_start_dates(first_date, 
current_date, increment))
+        expected = [
+            datetime(2015, 1, 1),
+            datetime(2015, 1, 2),
+            datetime(2015, 1, 3)
+        ]
+        self.assertEqual(result, expected)
+
+
+    def test_get_all_start_dates_when_increment_is_months(self):
+        first_date = datetime(2015, 1, 1)
+        current_date = datetime(2015, 3, 1)
+        increment = relativedelta(months=1)
+        result = list(self.selector.get_all_start_dates(first_date, 
current_date, increment))
+        expected = [
+            datetime(2015, 1, 1),
+            datetime(2015, 2, 1),
+            datetime(2015, 3, 1)
+        ]
+        self.assertEqual(result, expected)
+
+
+    def test_run_when_last_exec_time_is_greater_than_current_exec_time(self):
+        self.config['last_exec_time'] = datetime(2015, 1, 2)
+        self.config['current_exec_time'] = datetime(2015, 1, 1)
+        with self.assertRaises(ValueError):
+            list(self.selector.run())
+
+
+    def test_run_when_helper_method_raises_error(self):
+        read = [self.report]
+        self.selector.reader.run = MagicMock(return_value=read)
+        self.selector.is_time_to_execute = MagicMock(side_effect=Exception())
+        selected = list(self.selector.run())
+        self.assertEqual(len(selected), 0)
+
+
+    def test_run_when_not_is_time_to_execute(self):
+        read = [self.report]
+        self.selector.reader.run = MagicMock(return_value=read)
+        self.selector.is_time_to_execute = MagicMock(return_value=False)
+        selected = list(self.selector.run())
+        self.assertEqual(len(selected), 0)
+
+
+    def test_run_when_not_is_timeboxed(self):
+        self.report.is_timeboxed = False
+        read = [self.report]
+        self.selector.reader.run = MagicMock(return_value=read)
+        self.selector.is_time_to_execute = MagicMock(return_value=True)
+        selected = list(self.selector.run())
+        self.assertEqual(len(selected), 1)
+        self.assertEqual(selected[0], self.report)
+
+
+    def test_run_when_is_timeboxed(self):
+        self.report.is_timeboxed = True
+        read = [self.report]
+        self.selector.reader.run = MagicMock(return_value=read)
+        self.selector.is_time_to_execute = MagicMock(return_value=True)
+        self.selector.get_interval_reports = MagicMock(return_value=['report'])
+        selected = list(self.selector.run())
+        self.assertEqual(len(selected), 1)
+        self.assertEqual(selected[0], 'report')
diff --git a/test/test_utils.py b/test/test_utils.py
new file mode 100644
index 0000000..f179ce8
--- /dev/null
+++ b/test/test_utils.py
@@ -0,0 +1,29 @@
+
+class ConnectionCursorMock(object):
+
+    def __init__(self, execute_callback, fetchall_callback, header):
+        self.execute_callback = execute_callback
+        self.fetchall_callback = fetchall_callback
+        self.description = [[header_field] for header_field in header]
+
+    def execute(self, sql_query):
+        if self.execute_callback:
+            self.execute_callback(sql_query)
+
+    def fetchall(self):
+        if self.fetchall_callback:
+            return self.fetchall_callback()
+
+    def close(self):
+        pass
+
+
+class ConnectionMock(object):
+
+    def __init__(self, execute_callback, fetchall_callback, header):
+        self.execute_callback = execute_callback
+        self.fetchall_callback = fetchall_callback
+        self.header = header
+
+    def cursor(self):
+        return ConnectionCursorMock(self.execute_callback, 
self.fetchall_callback, self.header)
diff --git a/test/writer_test.py b/test/writer_test.py
new file mode 100644
index 0000000..04803c1
--- /dev/null
+++ b/test/writer_test.py
@@ -0,0 +1,190 @@
+
+import os
+import io
+from reportupdater.writer import Writer
+from reportupdater.executor import Executor
+from reportupdater.selector import Selector
+from reportupdater.reader import Reader
+from reportupdater.report import Report
+from unittest import TestCase
+from datetime import datetime
+from mock import MagicMock
+
+
+class WriterTest(TestCase):
+
+
+    def setUp(self):
+        self.config = {
+            'output_folder': 'test/fixtures/output'
+        }
+        reader = Reader(self.config)
+        selector = Selector(reader, self.config)
+        executor = Executor(selector, self.config)
+        self.writer = Writer(executor, self.config)
+
+        self.report = Report()
+        self.report.key = 'writer_test'
+        self.report.sql_template = 'SOME sql TEMPLATE;'
+        self.report.results = {
+            'header': ['date', 'value'],
+            'data': {
+                datetime(2015, 1, 1): [datetime(2015, 1, 1), '1']
+            }
+        }
+
+        self.io_open_stash = io.open
+        self.os_rename_stash = os.rename
+        self.paths_to_clean = []
+
+
+    def tearDown(self):
+        try:
+            os.remove('test/fixtures/output/writer_test.csv')
+        except:
+            pass
+        io.open = self.io_open_stash
+        os.rename = self.os_rename_stash
+        for path in self.paths_to_clean:
+            try:
+                os.remove(path)
+            except:
+                pass
+
+
+    def test_write_results_when_io_open_raises_error(self):
+        io.open = MagicMock(side_effect=Exception())
+        header = self.report.results['header']
+        data = self.report.results['data']
+        output_folder = self.config['output_folder']
+        with self.assertRaises(RuntimeError):
+            self.writer.write_results(header, data, self.report, output_folder)
+
+
+    def test_write_results_when_os_rename_raises_error(self):
+        os.rename = MagicMock(side_effect=Exception())
+        header = self.report.results['header']
+        data = self.report.results['data']
+        output_folder = self.config['output_folder']
+        with self.assertRaises(RuntimeError):
+            self.writer.write_results(header, data, self.report, output_folder)
+
+
+    def test_write_results_when_results_data_is_empty(self):
+        header = ['date', 'value']
+        data = {}
+        output_folder = self.config['output_folder']
+        self.writer.write_results(header, data, self.report, output_folder)
+        output_path = os.path.join(output_folder, self.report.key + '.csv')
+        self.paths_to_clean.append(output_path)
+        with io.open(output_path, 'r', encoding='utf-8') as output_file:
+            output = output_file.read().strip()
+        self.assertEqual(output, 'date,value')
+
+
+    def test_write_results_with_funnel_data(self):
+        self.report.is_funnel = True
+        header = ['date', 'value']
+        data = {
+            datetime(2015, 1, 2): [[datetime(2015, 1, 2), 'c'], 
[datetime(2015, 1, 2), 'd']],
+            datetime(2015, 1, 3): [[datetime(2015, 1, 3), 'e']],
+            datetime(2015, 1, 1): [[datetime(2015, 1, 1), 'a'], 
[datetime(2015, 1, 1), 'b']]
+        }
+        output_folder = self.config['output_folder']
+        self.writer.write_results(header, data, self.report, output_folder)
+        output_path = os.path.join(output_folder, self.report.key + '.csv')
+        self.paths_to_clean.append(output_path)
+        with io.open(output_path, 'r', encoding='utf-8') as output_file:
+            output_lines = output_file.readlines()
+        self.assertEqual(len(output_lines), 6)
+        self.assertEqual(output_lines[0], 'date,value\n')
+        self.assertEqual(output_lines[1], '2015-01-01,a\n')
+        self.assertEqual(output_lines[2], '2015-01-01,b\n')
+        self.assertEqual(output_lines[3], '2015-01-02,c\n')
+        self.assertEqual(output_lines[4], '2015-01-02,d\n')
+        self.assertEqual(output_lines[5], '2015-01-03,e\n')
+
+
+    def test_write_results(self):
+        header = ['date', 'value']
+        data = {
+            datetime(2015, 1, 2): [datetime(2015, 1, 2), 'a'],
+            datetime(2015, 1, 3): [datetime(2015, 1, 3), 'b'],
+            datetime(2015, 1, 1): [datetime(2015, 1, 1), 'c']
+        }
+        output_folder = self.config['output_folder']
+        self.writer.write_results(header, data, self.report, output_folder)
+        output_path = os.path.join(output_folder, self.report.key + '.csv')
+        self.paths_to_clean.append(output_path)
+        with io.open(output_path, 'r', encoding='utf-8') as output_file:
+            output_lines = output_file.readlines()
+        self.assertEqual(len(output_lines), 4)
+        self.assertEqual(output_lines[0], 'date,value\n')
+        self.assertEqual(output_lines[1], '2015-01-01,c\n')
+        self.assertEqual(output_lines[2], '2015-01-02,a\n')
+        self.assertEqual(output_lines[3], '2015-01-03,b\n')
+
+
+    def test_run_when_previous_results_header_is_empty(self):
+        # self.report has no previous results csv by default setup
+        executed = [self.report]
+        self.writer.executor.run = MagicMock(return_value=executed)
+        self.writer.write_results = MagicMock()
+        self.writer.run()
+        self.writer.write_results.assert_called_once_with(
+            self.report.results['header'],
+            self.report.results['data'],
+            self.report,
+            self.config['output_folder']
+        )
+
+
+    def 
test_run_when_previous_results_header_and_results_header_are_different(self):
+        self.report.key = 'writer_test1'
+        # previous header will be: ['date', 'val1', 'val2', 'val3']
+        # see: test/fixtures/output/writer_test1.csv
+        self.report.results = {
+            'header': ['date', 'val4', 'val5'],
+            'data': {
+                datetime(2015, 1, 1): [datetime(2015, 1, 1), 4, 5]
+            }
+        }
+        executed = [self.report]
+        self.writer.executor.run = MagicMock(return_value=executed)
+        with self.assertRaises(ValueError):
+            self.writer.run()
+
+
+    def test_run_when_helper_method_raises_error(self):
+        executed = [self.report]
+        self.writer.executor.run = MagicMock(return_value=executed)
+        self.writer.write_results = MagicMock(side_effect=Exception())
+        self.writer.run()
+        # just checking no error is raised
+
+
+    def test_run(self):
+        self.report.key = 'writer_test2'
+        output_path = os.path.join(self.config['output_folder'], 
self.report.key + '.csv')
+        self.paths_to_clean.append(output_path)
+        # Set up previous results.
+        # File cannot be a permanent fixture, because it is overwritten by 
the test.
+        with io.open(output_path, 'w') as output_file:
+            
output_file.write(unicode('date,value\n2015-01-01,a\n2015-01-02,b\n'))
+        # Set up current results.
+        self.report.results['header'] = ['date', 'value']
+        self.report.results['data'] = {
+            datetime(2015, 1, 1): [datetime(2015, 1, 1), 'a'],
+            datetime(2015, 1, 2): [datetime(2015, 1, 2), 'b'],
+            datetime(2015, 1, 3): [datetime(2015, 1, 3), 'c']
+        }
+        executed = [self.report]
+        self.writer.executor.run = MagicMock(return_value=executed)
+        self.writer.run()
+        with io.open(output_path, 'r', encoding='utf-8') as output_file:
+            output_lines = output_file.readlines()
+        self.assertEqual(len(output_lines), 4)
+        self.assertEqual(output_lines[0], 'date,value\n')
+        self.assertEqual(output_lines[1], '2015-01-01,a\n')
+        self.assertEqual(output_lines[2], '2015-01-02,b\n')
+        self.assertEqual(output_lines[3], '2015-01-03,c\n')
diff --git a/tox.ini b/tox.ini
index feaeeee..63c0beb 100644
--- a/tox.ini
+++ b/tox.ini
@@ -22,7 +22,8 @@
 # E221: sometimes it makes more readable code if you can align a series of 
statements
 # E203: sometimes it makes more readable code if you can align a series of 
statements
 # E711: there are valid reasons to do column != None in SQL Alchemy
-ignore = W293,F401,E221,E203,E711
+# E303: Too many blank lines may be ok if it makes code more readable
+ignore = W293,F401,E221,E203,E711,E303
 # line lengths should be limited but not to 80
 max-line-length = 120
-exclude = .venv,.tox,dist,doc,build,*.egg
+exclude = .venv,.tox,dist,doc,build,*.egg,env

-- 
To view, visit https://gerrit.wikimedia.org/r/192319
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I5885cd85499501741b78fbbc95225939dc46b329
Gerrit-PatchSet: 15
Gerrit-Project: analytics/limn-mobile-data
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: Yuvipanda <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to