rhunwicks closed pull request #3492: PandasConnector
URL: https://github.com/apache/incubator-superset/pull/3492
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer displays its
original diff once it has been merged, so the diff is reproduced below:

diff --git a/contrib/__init__.py b/contrib/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/cache/__init__.py b/contrib/cache/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/cache/dataframe.py b/contrib/cache/dataframe.py
new file mode 100644
index 0000000000..4d4014b824
--- /dev/null
+++ b/contrib/cache/dataframe.py
@@ -0,0 +1,195 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+from io import open
+import json
+import os
+import tempfile
+from time import time
+try:
+    import cPickle as pickle
+except ImportError:  # pragma: no cover
+    import pickle
+
+import pandas as pd
+from six import u
+from werkzeug.contrib.cache import FileSystemCache
+
+
+class DataFrameCache(FileSystemCache):
+    """
+    A cache that stores Pandas DataFrames on the file system.
+
+    DataFrames are stored in Feather Format - a fast on-disk representation
+    of the Apache Arrow in-memory format to eliminate serialization
+    overhead.
+
+    This cache depends on being the only user of the `cache_dir`. Make
+    absolutely sure that nobody but this cache stores files there or
+    otherwise the cache will randomly delete files therein.
+
+    :param cache_dir: the directory where cache files are stored.
+    :param threshold: the maximum number of items the cache stores before
+                      it starts deleting some.
+    :param default_timeout: the default timeout that is used if no timeout is
+                            specified on :meth:`~BaseCache.set`. A timeout of
+                            0 indicates that the cache never expires.
+    :param mode: the file mode wanted for the cache files, default 0600
+    """
+
+    _fs_cache_suffix = '.cached'
+    _fs_metadata_suffix = '.metadata'
+
+    def _list_dir(self):
+        """return a list of (fully qualified) cache filenames
+        """
+        return [os.path.join(self._path, fn) for fn in os.listdir(self._path)
+                if fn.endswith(self._fs_cache_suffix)]
+
+    def _prune(self):
+        entries = self._list_dir()
+        if len(entries) > self._threshold:
+            now = time()
+            for idx, cname in enumerate(entries):
+                mname = os.path.splitext(cname)[0] + self._fs_metadata_suffix
+                try:
+                    with open(mname, 'r', encoding='utf-8') as f:
+                        metadata = json.load(f)
+                except (IOError, OSError):
+                    metadata = {'expires': -1}
+                try:
+                    remove = ((metadata['expires'] != 0 and metadata['expires'] <= now)
+                              or idx % 3 == 0)
+                    if remove:
+                        os.remove(cname)
+                        os.remove(mname)
+                except (IOError, OSError):
+                    pass
+
+    def clear(self):
+        for cname in self._list_dir():
+            try:
+                mname = os.path.splitext(cname)[0] + self._fs_metadata_suffix
+                os.remove(cname)
+                os.remove(mname)
+            except (IOError, OSError):
+                return False
+        return True
+
+    def get(self, key):
+        filename = self._get_filename(key)
+        cname = filename + self._fs_cache_suffix
+        mname = filename + self._fs_metadata_suffix
+        try:
+            with open(mname, 'r', encoding='utf-8') as f:
+                metadata = json.load(f)
+        except (IOError, OSError):
+            metadata = {'expires': -1}
+        try:
+            with open(cname, 'rb') as f:
+                if metadata['expires'] == 0 or metadata['expires'] > time():
+                    read_method = getattr(pd, 'read_{}'.format(metadata['format']))
+                    read_args = metadata.get('read_args', {})
+                    if metadata['format'] == 'hdf':
+                        return read_method(f.name, **read_args)
+                    else:
+                        return read_method(f, **read_args)
+                else:
+                    os.remove(cname)
+                    os.remove(mname)
+                    return None
+        except (IOError, OSError):
+            return None
+
+    def add(self, key, value, timeout=None):
+        filename = self._get_filename(key) + self._fs_cache_suffix
+        if not os.path.exists(filename):
+            return self.set(key, value, timeout)
+        return False
+
+    def set(self, key, value, timeout=None):
+        metadata = {'expires': self._normalize_timeout(timeout)}
+        filename = self._get_filename(key)
+        cname = filename + self._fs_cache_suffix
+        mname = filename + self._fs_metadata_suffix
+        suffix = self._fs_transaction_suffix
+        self._prune()
+
+        def to_feather(filename, dataframe, metadata):
+            with tempfile.NamedTemporaryFile(dir=self._path, suffix=suffix) as f:
+                dataframe.to_feather(f)
+                metadata['format'] = 'feather'
+                os.link(f.name, cname)
+
+        def to_hdf(filename, dataframe, metadata):
+            with tempfile.NamedTemporaryFile(dir=self._path, suffix=suffix) as f:
+                dataframe.to_hdf(f.name, 'df')
+                metadata['format'] = 'hdf'
+                metadata['read_args'] = {'key': 'df'}
+                os.link(f.name, cname)
+
+        def to_pickle(filename, dataframe, metadata):
+            with tempfile.NamedTemporaryFile(dir=self._path, suffix=suffix) as f:
+                pickle.dump(dataframe, f, pickle.HIGHEST_PROTOCOL)
+                metadata['format'] = 'pickle'
+                os.link(f.name, cname)
+
+        for serializer in [to_feather, to_hdf, to_pickle]:
+            try:
+                serializer(cname, value, metadata)
+                with open(mname, 'w', encoding='utf-8') as f:
+                    f.write(u(json.dumps(metadata)))
+                return True
+            except Exception:
+                # Try the next serializer
+                pass
+
+        # We didn't successfully save the data
+        return False
+
+    def delete(self, key):
+        filename = self._get_filename(key)
+        cname = filename + self._fs_cache_suffix
+        mname = filename + self._fs_metadata_suffix
+        try:
+            os.remove(cname)
+            os.remove(mname)
+        except (IOError, OSError):
+            return False
+        else:
+            return True
+
+    def has(self, key):
+        filename = self._get_filename(key)
+        cname = filename + self._fs_cache_suffix
+        mname = filename + self._fs_metadata_suffix
+        try:
+            with open(mname, 'r', encoding='utf-8') as f:
+                metadata = json.load(f)
+        except (IOError, OSError):
+            metadata = {'expires': -1}
+        try:
+            with open(cname, 'rb') as f:
+                if metadata['expires'] == 0 or metadata['expires'] > time():
+                    return True
+                else:
+                    os.remove(cname)
+                    os.remove(mname)
+                    return False
+        except (IOError, OSError):
+            return False
+
+    def inc(self, key, delta=1):
+        raise NotImplementedError()
+
+    def dec(self, key, delta=1):
+        raise NotImplementedError()
+
+
+def dataframe(app, config, args, kwargs):
+    """Return a DataFrameCache for use by Flask-Cache."""
+    args.insert(0, config['CACHE_DIR'])
+    kwargs.update(dict(threshold=config['CACHE_THRESHOLD']))
+    return DataFrameCache(*args, **kwargs)
diff --git a/contrib/connectors/__init__.py b/contrib/connectors/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/connectors/pandas/__init__.py b/contrib/connectors/pandas/__init__.py
new file mode 100644
index 0000000000..b2df79851f
--- /dev/null
+++ b/contrib/connectors/pandas/__init__.py
@@ -0,0 +1,2 @@
+from . import models  # noqa
+from . import views  # noqa
diff --git a/contrib/connectors/pandas/models.py b/contrib/connectors/pandas/models.py
new file mode 100644
index 0000000000..8db84a2c3e
--- /dev/null
+++ b/contrib/connectors/pandas/models.py
@@ -0,0 +1,931 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+from collections import OrderedDict
+from datetime import datetime
+import hashlib
+from io import BytesIO
+import logging
+try:
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
+
+from flask import escape, Markup
+from flask_appbuilder import Model
+from flask_babel import lazy_gettext as _
+import pandas as pd
+from pandas.api.types import (
+    is_datetime64_any_dtype, is_numeric_dtype, is_string_dtype)
+from past.builtins import basestring
+import requests
+import sqlalchemy as sa
+from sqlalchemy import (
+    and_, Column, ForeignKey, Integer, or_, String, Text,
+)
+from sqlalchemy.orm import backref, relationship
+from sqlalchemy_utils import ChoiceType, JSONType
+
+from superset import dataframe_cache, db, sm, utils
+from superset.connectors.base.models import (
+    BaseColumn, BaseDatasource, BaseMetric)
+from superset.models.helpers import QueryResult, set_perm
+from superset.utils import QueryStatus
+
+FORMATS = [
+    ('csv', 'csv'),
+    ('html', 'html'),
+    ('json', 'json'),
+    ('excel', 'Microsoft Excel'),
+    ('stata', 'Stata'),
+]
+
+try:
+    import tables  # NOQA
+    FORMATS.append(('hdf', 'HDF5'))
+except ImportError:
+    pass
+
+try:
+    import feather  # NOQA
+    FORMATS.append(('feather', 'Feather'))
+except ImportError:
+    pass
+
+
+class PandasDatabase(object):
+    """Non-ORM object for a Pandas Source"""
+
+    def __init__(self, database_name, cache_timeout=None):
+        self.database_name = database_name
+        self.cache_timeout = cache_timeout
+
+    def __str__(self):
+        return self.database_name
+
+
+class PandasColumn(Model, BaseColumn):
+    """
+    ORM object for Pandas columns.
+
+    Each Pandas Datasource can have multiple columns"""
+
+    __tablename__ = 'pandas_columns'
+
+    id = Column(Integer, primary_key=True)
+    pandas_datasource_id = Column(Integer, ForeignKey('pandas_datasources.id'))
+    datasource = relationship(
+        'PandasDatasource',
+        backref=backref('columns', cascade='all, delete-orphan'),
+        foreign_keys=[pandas_datasource_id])
+    expression = Column(Text)
+
+    @property
+    def is_num(self):
+        return self.type and is_numeric_dtype(self.type)
+
+    @property
+    def is_time(self):
+        return self.type and is_datetime64_any_dtype(self.type)
+
+    @property
+    def is_dttm(self):
+        return self.is_time
+
+    @property
+    def is_string(self):
+        return self.type and is_string_dtype(self.type)
+
+    @property
+    def data(self):
+        attrs = (
+            'column_name', 'verbose_name', 'description', 'expression',
+            'filterable', 'groupby', 'is_dttm')
+        return {s: getattr(self, s) for s in attrs}
+
+    def get_perm(self):
+        if self.datasource:
+            return ('{parent_name}.[{obj.expression}]'
+                    '(id:{obj.id})').format(
+                obj=self,
+                parent_name=self.datasource.full_name)
+        return None
+
+
+class PandasMetric(Model, BaseMetric):
+    """
+    ORM object for Pandas metrics.
+
+    Each Pandas Datasource can have multiple metrics
+    """
+
+    __tablename__ = 'pandas_metrics'
+
+    id = Column(Integer, primary_key=True)
+    pandas_datasource_id = Column(Integer, ForeignKey('pandas_datasources.id'))
+    datasource = relationship(
+        'PandasDatasource',
+        backref=backref('metrics', cascade='all, delete-orphan'),
+        foreign_keys=[pandas_datasource_id])
+    source = Column(Text)
+    expression = Column(Text)
+
+    def get_perm(self):
+        if self.datasource:
+            return ('{parent_name}.[{obj.metric_name}]'
+                    '(id:{obj.id})').format(
+                obj=self,
+                parent_name=self.datasource.full_name)
+        return None
+
+
+class PandasDatasource(Model, BaseDatasource):
+    """A datasource based on a Pandas DataFrame"""
+
+    # See http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases # NOQA
+    GRAINS = OrderedDict([
+        ('5 seconds', '5S'),
+        ('30 seconds', '30S'),
+        ('1 minute', 'T'),
+        ('5 minutes', '5T'),
+        ('1 hour', 'H'),
+        ('6 hour', '6H'),
+        ('day', 'D'),
+        ('one day', 'D'),
+        ('1 day', 'D'),
+        ('7 days', '7D'),
+        ('week', 'W-MON'),
+        ('week_starting_sunday', 'W-SUN'),
+        ('week_ending_saturday', 'W-SUN'),
+        ('month', 'M'),
+        ('quarter', 'Q'),
+        ('year', 'A'),
+    ])
+
+    __tablename__ = 'pandas_datasources'
+    type = 'pandas'
+    baselink = 'pandasdatasourcemodelview'  # url portion pointing to ModelView
+    column_class = PandasColumn
+    metric_class = PandasMetric
+
+    name = Column(String(100), nullable=False)
+    source_url = Column(String(1000), nullable=False)
+    source_auth = Column(JSONType)
+    source_parameters = Column(JSONType)
+    format = Column(ChoiceType(FORMATS), nullable=False)
+    additional_parameters = Column(JSONType)
+
+    user_id = Column(Integer, ForeignKey('ab_user.id'))
+    owner = relationship(
+        sm.user_model,
+        backref='pandas_datasources',
+        foreign_keys=[user_id])
+
+    fetch_values_predicate = Column(String(1000))
+    main_dttm_col = Column(String(250))
+
+    # Used to do code highlighting when displaying the query in the UI
+    query_language = None
+
+    # A Pandas Dataframe containing the data retrieved from the source url
+    df = None
+
+    def __repr__(self):
+        return self.name
+
+    @property
+    def datasource_name(self):
+        return self.name
+
+    @property
+    def full_name(self):
+        return self.name
+
+    @property
+    def database(self):
+        try:
+            database_name = urlparse(self.source_url).netloc
+        except AttributeError:
+            database_name = 'memory'
+        return PandasDatabase(database_name=database_name,
+                              cache_timeout=None)
+
+    @property
+    def connection(self):
+        return self.source_url
+
+    @property
+    def schema(self):
+        uri = urlparse(self.source_url)
+        return uri.path
+
+    @property
+    def schema_perm(self):
+        """Returns endpoint permission if present, host one otherwise."""
+        return utils.get_schema_perm(self.database, self.schema)
+
+    @property
+    def description_markeddown(self):
+        return utils.markdown(self.description)
+
+    @property
+    def link(self):
+        name = escape(self.name)
+        return Markup(
+            '<a href="{self.explore_url}">{name}</a>'.format(**locals()))
+
+    def get_perm(self):
+        return (
+            'pandas.{obj.name}'
+            '(id:{obj.id})').format(obj=self)
+
+    @property
+    def dttm_cols(self):
+        l = [c.column_name for c in self.columns if c.is_dttm]  # noqa: E741
+        if self.main_dttm_col and self.main_dttm_col not in l:
+            l.append(self.main_dttm_col)
+        return l
+
+    @property
+    def num_cols(self):
+        return [c.column_name for c in self.columns if c.is_num]
+
+    @property
+    def any_dttm_col(self):
+        cols = self.dttm_cols
+        if cols:
+            return cols[0]
+
+    @property
+    def html(self):
+        t = ((c.column_name, c.type) for c in self.columns)
+        df = pd.DataFrame(t)
+        df.columns = ['field', 'type']
+        return df.to_html(
+            index=False,
+            classes=(
+                'dataframe table table-striped table-bordered '
+                'table-condensed'))
+
+    @property
+    def data(self):
+        d = super(PandasDatasource, self).data
+        # Note that the front end uses `granularity_sqla` and
+        # `time_grain_sqla` as the parameters for selecting the
+        # column and time grain separately.
+        d['granularity_sqla'] = utils.choicify(self.dttm_cols)
+        d['time_grain_sqla'] = [(g, g) for g in self.GRAINS.keys()]
+        logging.info(d)
+        return d
+
+    @property
+    def pandas_read_method(self):
+        try:
+            # The format is a Choice object
+            format = self.format.code
+        except AttributeError:
+            format = self.format
+        return getattr(pd, 'read_{format}'.format(format=format))
+
+    @property
+    def pandas_read_parameters(self):
+        return self.additional_parameters or {}
+
+    def get_empty_dataframe(self):
+        """Create an empty dataframe with the correct columns and dtypes"""
+        columns = []
+        for col in self.columns:
+            type = ('datetime64[ns]'
+                    if is_datetime64_any_dtype(col.type)
+                    else col.type)
+            columns.append((col.column_name, type))
+        return pd.DataFrame({k: pd.Series(dtype=t) for k, t in columns})
+
+    @property
+    def cache_key(self):
+        source = {'source_url': self.source_url,
+                  'source_auth': self.source_auth}
+        source.update(self.source_parameters or {})
+        source.update(self.pandas_read_parameters)
+        s = str([(k, source[k]) for k in sorted(source.keys())])
+        return hashlib.md5(s.encode('utf-8')).hexdigest()
+
+    def get_dataframe(self):
+        """
+        Read the source_url and return a Pandas DataFrame.
+
+        Use the PandasColumns to coerce columns into the correct dtype,
+        and add any calculated columns to the DataFrame.
+        """
+        if self.df is None:
+            if dataframe_cache:
+                cache_key = self.cache_key
+                self.df = dataframe_cache.get(cache_key)
+            if not isinstance(self.df, pd.DataFrame):
+                if (isinstance(self.source_url, basestring) and
+                        self.source_url[:4] == 'http'):
+                    # Use requests to retrieve remote data so we can handle authentication
+                    auth = self.source_auth
+                    url = self.source_url
+                    if isinstance(auth, (tuple, list)):
+                        response = requests.get(url, params=self.source_parameters,
+                                                auth=tuple(auth))
+                    elif auth:
+                        response = requests.get(url, params=self.source_parameters,
+                                                headers={'Authorization': auth})
+                    else:
+                        response = requests.get(url, params=self.source_parameters)
+                    response.raise_for_status()
+                    data = BytesIO(response.content)
+                else:
+                    # Local file, so just use Pandas directly
+                    data = self.source_url
+                # Read the dataframe from the response
+                self.df = self.pandas_read_method(data, **self.pandas_read_parameters)
+
+                # read_html returns a list of DataFrames
+                if (isinstance(self.df, list) and
+                        isinstance(self.df[0], pd.DataFrame)):
+                    self.df = self.df[0]
+
+                # Our column names are always strings
+                self.df.columns = [str(col) for col in self.df.columns]
+
+                if dataframe_cache:
+                    timeout = self.cache_timeout or self.database.cache_timeout
+                    dataframe_cache.set(cache_key, self.df, timeout)
+
+        calculated_columns = []
+        for col in self.columns:
+            name = col.column_name
+            type = col.type
+            # Prepare calculated columns
+            if col.expression:
+                calculated_columns.append('{name} = {expr}'.format(
+                    name=name, expr=col.expression))
+            elif type != self.df[name].dtype.name:
+                # Convert column to correct dtype
+                try:
+                    self.df[name] = self.df[name].values.astype(type)
+                except ValueError as e:
+                    message = ('Failed to convert column {name} '
+                               'from {old_type} to {new_type}').format(
+                        name=name,
+                        old_type=self.df[name].dtype.name,
+                        new_type=type)
+                    e.args = (message,) + e.args
+                    raise
+        # Add the calcuated columns, using a multi-line string to add them all at once
+        # See https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html#enhancingperf-eval  # NOQA: E501
+        if calculated_columns:
+            self.df.eval('\n'.join(calculated_columns),
+                         truediv=True,
+                         inplace=True)
+        return self.df
+
+    def get_filter_query(self, filter):
+        """
+        Build a query string to filter a dataframe.
+
+        Filter is a list of dicts of op, col and value.
+
+        Returns a string that can be passed to DataFrame.query() to
+        restrict the DataFrame to only the matching rows.
+        """
+        cols = {col.column_name: col for col in self.columns}
+        query = ''
+        for flt in filter:
+            if not all([flt.get(s) for s in ['col', 'op', 'val']]):
+                continue
+            col = flt['col']
+            col_obj = cols.get(col)
+            op = flt['op']
+            eq = flt['val']
+            if query:
+                query += ' and '
+            if op == 'LIKE':
+                query += "{col}.str.match('{eq}')".format(col=col, eq=eq)
+            else:
+                # Rely on Pandas partial string indexing for datetime fields,
+                # see https://pandas.pydata.org/pandas-docs/stable/timeseries.html#partial-string-indexing  # NOQA
+                try:
+                    if ((col_obj.is_string or col_obj.is_dttm) and
+                            not isinstance(eq, list)):
+                        eq = "'{}'".format(eq)
+                except AttributeError:
+                    # col_obj is None, probably because the col is a metric,
+                    # in which case it is numeric anyway
+                    pass
+                query += '({col} {op} {eq})'.format(col=col, op=op, eq=eq)
+        return query
+
+    def get_agg_function(self, expr):
+        """
+        Return a function that can be passed to DataFrame.apply().
+
+        Complex expressions that work on multiple columns must be a function
+        that accepts a Group as the parameter.
+
+        The function can be defined on the Connector, or on the DataFrame,
+        in the local scope
+        """
+        if expr in ['sum', 'mean', 'std', 'sem', 'count', 'min', 'max']:
+            return expr
+        if hasattr(self, expr):
+            return getattr(self, expr)
+        if hasattr(self.get_dataframe(), expr):
+            return getattr(self.get_dataframe(), expr)
+        return locals()[expr]
+
+    def process_dataframe(
+            self,
+            df,
+            groupby, metrics,
+            granularity,
+            from_dttm, to_dttm,
+            filter=None,  # noqa
+            is_timeseries=True,
+            timeseries_limit=15,
+            timeseries_limit_metric=None,
+            row_limit=None,
+            inner_from_dttm=None,
+            inner_to_dttm=None,
+            orderby=None,
+            extras=None,
+            columns=None,
+            form_data=None,
+            order_desc=True):
+        """Querying any dataframe table from this common interface"""
+        if orderby:
+            orderby, ascending = map(list, zip(*orderby))
+        else:
+            orderby = []
+            ascending = []
+        metric_order_asc = not order_desc
+        filter = filter or []
+        query_str = 'df'
+
+        # Build a dict of the metrics to include, including those that
+        # are required for post-aggregation filtering
+        # Note that the front end uses `having_druid` as the parameter
+        # for post-aggregation filters, and we are reusing that
+        # interface component.
+        filtered_metrics = [flt['col']
+                            for flt in extras.get('having_druid', [])
+                            if flt['col'] not in metrics]
+        metrics_dict = {m.metric_name: m for m in self.metrics}
+        metrics_exprs = OrderedDict()
+        for m in metrics + filtered_metrics:
+            try:
+                metric = metrics_dict[m]
+            except KeyError:
+                raise Exception(_("Metric '{}' is not valid".format(m)))
+            metrics_exprs[m] = metric
+
+        # Standard tests (copied from SqlaTable)
+        if not granularity and is_timeseries:
+            raise Exception(_(
+                'Datetime column not provided as part table configuration '
+                'and is required by this type of chart'))
+
+        # Filter the DataFrame by the time column, and resample if necessary
+        timestamp_cols = []
+        timestamp_exprs = []
+        if granularity and granularity != 'all':
+
+            if from_dttm:
+                filter.append({'col': granularity,
+                               'op': '>=',
+                               'val': from_dttm})
+            if to_dttm:
+                filter.append({'col': granularity,
+                               'op': '<=',
+                               'val': to_dttm})
+
+            if is_timeseries:
+                # Note that the front end uses `time_grain_sqla` as
+                # the parameter for setting the time grain when the
+                # granularity is being used to select the timetamp column
+                time_grain = self.GRAINS[extras.get('time_grain_sqla')]
+                timestamp_cols = ['__timestamp']
+                timestamp_exprs = [pd.Grouper(key=granularity,
+                                              freq=time_grain)]
+
+                if timeseries_limit_metric and timeseries_limit:
+                    metric = metrics_dict[timeseries_limit_metric]
+                    assert isinstance(metric.source, basestring)
+                    aggregates = {metric.source: metric.expression}
+                    df = (df[df.set_index(groupby)
+                               .index.isin(
+                                   df.groupby(groupby, sort=False)
+                                   .aggregate(aggregates)
+                                   .sort_values(metric.source,
+                                                ascending=metric_order_asc)
+                                   .iloc[:timeseries_limit].index)])
+
+                    query_str += ('[df.set_index({groupby}).index.isin('
+                                  'df.groupby({groupby}, sort=False)'
+                                  '.aggregate({aggregates})'
+                                  ".sort_values('{metric.source}', "
+                                  'ascending={metric_order_asc})'
+                                  '.iloc[:{timeseries_limit}].index)]').format(
+                        groupby=groupby,
+                        timeseries_limit_metric=timeseries_limit_metric,
+                        timeseries_limit=timeseries_limit,
+                        aggregates=aggregates,
+                        metric=metric,
+                        metric_order_asc=metric_order_asc)
+
+        # Additional filtering of rows prior to aggregation
+        if filter:
+            filter_str = self.get_filter_query(filter)
+            df = df.query(filter_str)
+            query_str += '.query("{filter_str}")'.format(filter_str=filter_str)
+
+        # We have one of:
+        # - columns only: return a simple table of results with no aggregation
+        # - metrics only: return a single row with one column per metric
+        #                 aggregated for the whole dataframe
+        # - groupby and metrics: return a table of distinct groupby columns
+        #                 and aggregations
+        # - groupby only: return a table of distinct rows
+        if columns:
+            # A simple table of results with no aggregation or grouping
+            if orderby:
+                df = df.sort_values(orderby, ascending=ascending)
+                query_str += ('.sort_values({orderby}, '
+                              'ascending={ascending})').format(
+                    orderby=orderby,
+                    ascending=ascending)
+            df = df[columns]
+            query_str += '[{columns}]'.format(columns=columns)
+
+        elif metrics_exprs:
+            # Aggregate the dataframe
+
+            # Single-column aggregates can be calculated using aggregate,
+            # multi-column ones need to use apply.
+            # aggregates is a dict keyed by a column name, where the value is
+            # a list of expressions that can be used by DataFrame.aggregate()
+            # apply_functions is a dict keyed by the metric name, where the
+            # value is a function that can be passed to DataFrame.apply()
+            aggregates = OrderedDict()
+            agg_names = []
+            apply_functions = []
+            apply_names = []
+            for metric_name, metric in metrics_exprs.items():
+                sources = []
+                if metric.source:
+                    sources = [s.strip() for s in metric.source.split(',') if s]
+                if len(sources) == 1:
+                    # Single column source, so use aggregate
+                    func = self.get_agg_function(metric.expression)
+                    if metric.source in aggregates:
+                        aggregates[metric.source].append(func)
+                    else:
+                        aggregates[metric.source] = [func]
+                    agg_names.append(metric_name)
+                else:
+                    # Multiple columns so the expression must be a function
+                    # that accepts a Group as the parameter
+                    apply_functions.append((sources,
+                                            metric.expression,
+                                            self.get_agg_function(metric.expression)))
+                    apply_names.append(metric_name)
+
+            # Build a list of like-indexed DataFrames containing the results
+            # of DataFrame.aggregate() and individual DataFrame.apply() calls
+            dfs = []
+            query_strs = []
+
+            if groupby or timestamp_exprs:
+                df = df.groupby(groupby + timestamp_exprs, sort=False)
+                query_str += '.groupby({}, sort=False)'.format(groupby + timestamp_exprs)
+
+            for sources, expr, func in apply_functions:
+                apply_df = df
+                apply_str = query_str
+
+                if sources:
+                    apply_df = df[sources]
+                    apply_str = query_str + '[{}]'.format(sources)
+
+                if groupby or timestamp_exprs:
+                    apply_df = df.apply(func)
+                    apply_str += '.apply({})'.format(expr)
+                else:
+                    # If we call DataFrame.apply() without a groupby then the func is
+                    # called on each column individually. Therefore, if we have a
+                    # summary with multi-column aggregates we need to pass the whole
+                    # DataFrame to the function.
+                    apply_df = pd.Series(func(apply_df)).to_frame()
+                    apply_str = 'pd.Series({expr}({df})).to_frame()'.format(
+                        expr=expr,
+                        df=apply_str)
+
+                    # Superset expects a DataFrame with single Row and
+                    # the metrics as columns, rather than with the metrics as 
the
+                    # index, so if we have a summary then we need to pivot it.
+                    apply_df = apply_df.unstack().to_frame().T
+                    apply_str += '.unstack().to_frame().T'
+
+                dfs.append(apply_df)
+                query_strs.append(apply_str)
+
+            if aggregates:
+                if groupby or timestamp_exprs:
+                    dfs.append(df.aggregate(aggregates))
+                    query_strs.append(query_str +
+                                      '.aggregate({})'.format(aggregates))
+                else:
+                    # For a summary table we need to preserve the metric order
+                    # so we can set the correct column names, so process 
aggregates
+                    # for each dataframe column in turn
+                    for col, col_agg in aggregates.items():
+                        agg_df = df.aggregate({col: col_agg})
+                        agg_str = query_str + '.aggregate({}: {})'.format(col, 
col_agg)
+
+                        # Superset expects a DataFrame with single Row and
+                        # the metrics as columns, rather than with the metrics 
as the
+                        # index, so if we have a summary then we need to pivot 
it.
+                        agg_df = agg_df.unstack().to_frame().T
+                        agg_str += '.unstack().to_frame().T'
+
+                        dfs.append(agg_df)
+                        query_strs.append(agg_str)
+
+            # If there is more than one DataFrame in the list then
+            # concatenate them along the index
+            if len(dfs) > 1:
+                df = pd.concat(dfs, axis=1)
+                query_str = 'pd.concat([{}], axis=1)'.format(', 
'.join(query_strs))
+            else:
+                df = dfs[0]
+                query_str = query_strs[0]
+
+            if groupby or timestamp_exprs:
+                df = df.reset_index()
+                query_str += '.reset_index()'
+
+            # Set the correct columns names and then reorder the columns
+            # to match the requested order
+            df.columns = groupby + timestamp_cols + apply_names + agg_names
+            df = df[groupby + timestamp_cols + metrics + filtered_metrics]
+
+            # Filtering of rows post-aggregation based on metrics
+            # Note that the front end uses `having_druid` as the parameter
+            # for post-aggregation filters, and we are reusing that
+            # interface component.
+            if extras.get('having_druid'):
+                filter_str = self.get_filter_query(extras.get('having_druid'))
+                df = df.query(filter_str)
+                query_str += '.query("{filter_str}")'.format(
+                    filter_str=filter_str)
+
+            # Order by the first metric descending by default,
+            # or within the existing orderby
+            orderby.append((metrics + filtered_metrics)[0])
+            ascending.append(metric_order_asc)
+
+            # Use the groupby and __timestamp by as a tie breaker
+            orderby = orderby + groupby + timestamp_cols
+            ascending = ascending + ([True] * len(groupby + timestamp_cols))
+
+            # Sort the values
+            if orderby:
+                df = df.sort_values(orderby, ascending=ascending)
+                query_str += ('.sort_values({orderby}, '
+                              'ascending={ascending})').format(
+                    orderby=orderby,
+                    ascending=ascending)
+
+            # Remove metrics only added for post-aggregation filtering
+            if filtered_metrics:
+                df = df.drop(filtered_metrics, axis=1)
+                query_str += '.drop({filtered_metrics}, axis=1)'.format(
+                    filtered_metrics=filtered_metrics)
+
+        elif groupby:
+            # Group by without any metrics is equivalent to SELECT DISTINCT,
+            # order by the size descending by default, or within the
+            # existing orderby
+            orderby.append(0)
+            ascending.append(not order_desc)
+            # Use the group by as a tie breaker
+            orderby = orderby + groupby
+            ascending = ascending + ([True] * len(groupby))
+            df = (df.groupby(groupby, sort=False)
+                    .size()
+                    .reset_index()
+                    .sort_values(orderby, ascending=ascending)
+                    .drop(0, axis=1))
+            query_str += ('.groupby({groupby}, sort=False).size()'
+                          '.reset_index()'
+                          '.sort_values({orderby}, ascending={ascending})'
+                          '.drop(0, axis=1)').format(
+                groupby=groupby,
+                orderby=orderby,
+                ascending=ascending)
+
+        if row_limit:
+            df = df.iloc[:row_limit]
+            query_str += '.iloc[:{row_limit}]'.format(row_limit=row_limit)
+
+        # Coerce datetimes to str so that Pandas can set the correct precision
+        for col in df.columns:
+            if is_datetime64_any_dtype(df[col].dtype):
+                df[col] = df[col].astype(str)
+
+        return df, query_str
+
+    def get_query_str(self, query_obj):
+        """Describe the Pandas operations that a query would perform.
+
+        The query is run through ``process_dataframe`` against the frame
+        returned by ``get_empty_dataframe`` and only the generated
+        expression string is kept, so that it can be displayed to the user.
+
+        :param query_obj: keyword arguments for ``process_dataframe``
+        :return: the query expression as a string
+        """
+        logging.debug('query_obj: %s', query_obj)
+        _, expression = self.process_dataframe(
+            self.get_empty_dataframe(), **query_obj)
+        logging.debug('query_str: %s', expression)
+        return expression
+
+    def query(self, query_obj):
+        """Run a query against the DataFrame and wrap up the outcome.
+
+        Failures are not propagated: any exception raised while loading or
+        processing the data is logged and reported through the status and
+        error message of the result instead.
+
+        :param query_obj: dictionary representing Superset's query
+            interface, expanded into keyword arguments for
+            ``process_dataframe``
+        :return: a ``QueryResult`` holding the status, the DataFrame as far
+            as it got (``None`` if it could not be loaded), the elapsed
+            time, the query string and the error message
+        """
+        logging.debug('query_obj: %s', query_obj)
+        started_at = datetime.now()
+        # Start from the "nothing loaded yet" state so that a failure part
+        # way through still yields a well-formed result.
+        frame, query_str = None, ''
+        status, error_message = QueryStatus.SUCCESS, None
+        try:
+            frame = self.get_dataframe()
+            frame, query_str = self.process_dataframe(frame, **query_obj)
+            logging.debug('query_str: %s', query_str)
+        except Exception as e:
+            logging.exception(e)
+            status, error_message = QueryStatus.FAILED, str(e)
+
+        elapsed = datetime.now() - started_at
+        return QueryResult(
+            status=status,
+            df=frame,
+            duration=elapsed,
+            query=query_str,
+            error_message=error_message)
+
+    def values_for_column(self, column_name, limit=10000):
+        """List the distinct values found in a column.
+
+        Used to populate the dropdown of values offered by the filters
+        in the explore view.
+
+        :param column_name: name of the DataFrame column to inspect
+        :param limit: maximum number of values to return; a falsy value
+            returns all of them
+        :return: list of the distinct values, in the order ``unique()``
+            produced them
+        """
+        distinct = self.get_dataframe()[column_name].unique()
+        return (distinct[:limit] if limit else distinct).tolist()
+
+    def get_metadata(self):
+        """Build the metadata for the table and merge it in
+
+        Loads the DataFrame and, for every column in it, reuses the stored
+        ``PandasColumn`` or creates a new one whose flags are derived from
+        the column's dtype. Also adds the datasource-level ``count`` metric
+        if it is not stored yet, sets ``main_dttm_col`` to the first time
+        column found when none is set, and commits the session.
+        """
+        df = self.get_dataframe()
+
+        any_date_col = None
+        dbcols = (
+            db.session.query(PandasColumn)
+            .filter(PandasColumn.datasource == self)
+            .filter(or_(PandasColumn.column_name == col
+                        for col in df.columns)))
+        dbcols = {dbcol.column_name: dbcol for dbcol in dbcols}
+        for col in df.columns:
+            dbcol = dbcols.get(col, None)
+
+            if not dbcol:
+                # Inspect the frame loaded above rather than `self.df`, so
+                # the dtype and the null checks always describe the same
+                # data and do not depend on `get_dataframe()` caching its
+                # result on the instance.
+                has_data = df[col].notnull().any()
+                dtype = df.dtypes[col].name
+                # Pandas defaults columns where all values are None to a
+                # dtype of float with all values as NaN, but if we can't
+                # correctly infer the dtype we are better to assume object
+                # so we don't create large numbers of unwanted Metrics
+                if not has_data:
+                    dtype = 'object'
+                dbcol = PandasColumn(column_name=str(col), type=dtype)
+                # Only treat `object` as string if we have some data
+                if has_data:
+                    dbcol.groupby = dbcol.is_string
+                    dbcol.filterable = dbcol.is_string
+                dbcol.sum = dbcol.is_num
+                dbcol.avg = dbcol.is_num
+                dbcol.min = dbcol.is_num or dbcol.is_dttm
+                dbcol.max = dbcol.is_num or dbcol.is_dttm
+            self.columns.append(dbcol)
+
+            # Remember the first time column as the default datetime column
+            if not any_date_col and dbcol.is_time:
+                any_date_col = col
+
+        # Datasource-level metrics
+        # Note that column-specific metrics are created by
+        # reconcile_column_metrics when the column is created.
+        metrics = [
+            PandasMetric(
+                metric_name='count',
+                verbose_name='count',
+                metric_type='count',
+                source=None,
+                expression='count')]
+        dbmetrics = (
+            db.session.query(PandasMetric)
+            .filter(PandasMetric.datasource == self)
+            .filter(or_(PandasMetric.metric_name == metric.metric_name
+                        for metric in metrics)))
+        dbmetrics = {metric.metric_name: metric for metric in dbmetrics}
+        for metric in metrics:
+            # Only add the metrics that are not stored yet
+            if not dbmetrics.get(metric.metric_name, None):
+                metric.pandas_datasource_id = self.id
+                db.session.add(metric)
+
+        if not self.main_dttm_col:
+            self.main_dttm_col = any_date_col
+
+        db.session.merge(self)
+        db.session.commit()
+
+
+def reconcile_column_metrics(mapper, connection, target):
+    """
+    Create or delete PandasMetrics to match the metric attributes
+    specified on a PandasColumn
+
+    Used as a mapper event listener for PandasColumn inserts and updates,
+    so all statements are issued on the supplied connection.
+
+    :param mapper: the mapper that triggered the event (unused)
+    :param connection: connection used to run the statements
+    :param target: the PandasColumn that was inserted or updated
+    """
+    mtable = PandasMetric.__table__
+    # Column flags whose Pandas aggregate goes by a different name; every
+    # other flag uses its own name as the expression.
+    expressions = {'avg': 'mean', 'count_distinct': 'nunique'}
+    for metric_type in ('sum', 'avg', 'max', 'min', 'count_distinct'):
+        # Set up the metric attributes
+        metric_name = metric_type + '__' + target.column_name
+        verbose_name = metric_name
+        source = target.column_name
+        expression = expressions.get(metric_type, metric_type)
+        same_metric = and_(
+            mtable.c.pandas_datasource_id == target.pandas_datasource_id,
+            mtable.c.metric_name == metric_name)
+
+        if getattr(target, metric_type):
+            # Create the metric if it doesn't already exist. Fetch a row
+            # instead of reading `rowcount`, which is not dependable for
+            # SELECT statements across database drivers.
+            existing = connection.execute(
+                mtable.select().where(same_metric)).first()
+            if existing is None:
+                connection.execute(
+                    mtable.insert(),
+                    pandas_datasource_id=target.pandas_datasource_id,
+                    metric_name=metric_name,
+                    verbose_name=verbose_name,
+                    source=source,
+                    expression=expression)
+        else:
+            # Delete the metric if it exists and hasn't been customized
+            connection.execute(
+                mtable
+                .delete()
+                .where(
+                    and_(
+                        same_metric,
+                        mtable.c.verbose_name == verbose_name,
+                        mtable.c.source == source,
+                        mtable.c.expression == expression)))
+
+
+def reconcile_metric_column(mapper, connection, target):
+    """
+    Reset the metric flag on a PandasColumn when the PandasMetric
+    named after it is deleted
+
+    Metric names are expected to look like ``<metric_type>__<column_name>``;
+    names without ``__`` and metric types that are not columns of the
+    PandasColumn table are ignored.
+    """
+    ctable = PandasColumn.__table__
+    try:
+        metric_type, column_name = target.metric_name.split('__', 1)
+        if metric_type not in ctable.c:
+            return
+        owning_column = and_(
+            ctable.c.pandas_datasource_id == target.pandas_datasource_id,
+            ctable.c.column_name == column_name)
+        clear_flag = ctable.update().values(**{metric_type: False})
+        connection.execute(clear_flag.where(owning_column))
+    except ValueError:
+        # Metric name doesn't contain __
+        pass
+
+
+# Keep the per-column metrics in step with the metric flags of a column
+# whenever a column is inserted or updated.
+sa.event.listen(PandasColumn, 'after_insert', reconcile_column_metrics)
+sa.event.listen(PandasColumn, 'after_update', reconcile_column_metrics)
+# Clear the matching column flag when a metric is about to be deleted.
+sa.event.listen(PandasMetric, 'before_delete', reconcile_metric_column)
+# `set_perm` is defined outside this section; presumably it refreshes the
+# permission string of the datasource -- confirm against its definition.
+sa.event.listen(PandasDatasource, 'after_insert', set_perm)
+sa.event.listen(PandasDatasource, 'after_update', set_perm)
diff --git a/contrib/connectors/pandas/views.py 
b/contrib/connectors/pandas/views.py
new file mode 100644
index 0000000000..197fc05ee7
--- /dev/null
+++ b/contrib/connectors/pandas/views.py
@@ -0,0 +1,336 @@
+"""Views used by the Pandas connector"""
+import json
+import logging
+
+from flask import flash, Markup, redirect
+from flask_appbuilder import CompactCRUDMixin, expose
+from flask_appbuilder.fieldwidgets import BS3TextAreaFieldWidget, 
BS3TextFieldWidget
+from flask_appbuilder.models.sqla.interface import SQLAInterface
+from flask_babel import gettext as __
+from flask_babel import lazy_gettext as _
+from past.builtins import basestring
+import sqlalchemy as sa
+from wtforms import SelectField, StringField, validators
+
+from superset import appbuilder, db, security, sm, utils
+from superset.connectors.base.views import DatasourceModelView
+from superset.utils import has_access
+from superset.views.base import (
+    DatasourceFilter, DeleteMixin, get_datasource_exist_error_mgs,
+    ListWidgetWithCheckboxes, SupersetModelView,
+)
+from .models import FORMATS, PandasColumn, PandasDatasource, PandasMetric
+
+
+class ChoiceTypeSelectField(SelectField):
+    """SelectField for model fields whose values expose a ``code``."""
+
+    def process_data(self, value):
+        """Store ``value.code`` as the field data when it is available.
+
+        Values without a ``code`` attribute are handed to the standard
+        ``SelectField`` processing instead.
+        """
+        try:
+            code = value.code
+        except AttributeError:
+            super(ChoiceTypeSelectField, self).process_data(value)
+        else:
+            self.data = code
+
+
+class JSONField(StringField):
+    """
+    JSON field for WTForms that converts between the form string data
+    and a dictionary representation, with validation
+
+    See https://gist.github.com/dukebody/dcc371bf286534d546e9
+    """
+    def _value(self):
+        """Return the text shown in the form: the data as JSON, or ''."""
+        return json.dumps(self.data) if self.data else ''
+
+    def process_formdata(self, valuelist):
+        """Parse the submitted string into Python data.
+
+        :param valuelist: list of submitted values; only the first is used
+        :raises ValueError: if the submitted string is not valid JSON
+        """
+        if valuelist:
+            try:
+                self.data = json.loads(valuelist[0])
+            except ValueError:
+                raise ValueError('This field contains invalid JSON')
+        else:
+            self.data = None
+
+    def pre_validate(self, form):
+        """Check that the current data can be serialized back to JSON.
+
+        :raises ValueError: if the data is not JSON serializable
+        """
+        # Explicit arguments keep this working on Python 2, like the other
+        # super() calls in this module.
+        super(JSONField, self).pre_validate(form)
+        if self.data:
+            try:
+                json.dumps(self.data)
+            except TypeError:
+                raise ValueError('This field contains invalid JSON')
+
+
+class PandasColumnInlineView(CompactCRUDMixin, SupersetModelView):  # noqa
+    """Inline CRUD view for the columns of a Pandas datasource."""
+    datamodel = SQLAInterface(PandasColumn)
+    list_widget = ListWidgetWithCheckboxes
+    page_size = 500
+
+    # Page titles
+    add_title = _('Add Column')
+    edit_title = _('Edit Column')
+    list_title = _('List Columns')
+    show_title = _('Show Column')
+
+    # Fields on the list page
+    list_columns = [
+        'column_name',
+        'verbose_name',
+        'type',
+        'groupby',
+        'filterable',
+        'count_distinct',
+        'sum',
+        'avg',
+        'min',
+        'max',
+    ]
+    # Fields on the edit form; the add form shares the very same list
+    edit_columns = [
+        'column_name',
+        'verbose_name',
+        'description',
+        'type',
+        'groupby',
+        'filterable',
+        'datasource',
+        'count_distinct',
+        'sum',
+        'avg',
+        'min',
+        'max',
+    ]
+    add_columns = edit_columns
+
+    # Field labels
+    label_columns = {
+        'column_name': _('Column'),
+        'verbose_name': _('Verbose Name'),
+        'description': _('Description'),
+        'groupby': _('Groupable'),
+        'filterable': _('Filterable'),
+        'datasource': _('Datasource'),
+        'count_distinct': _('Count Distinct'),
+        'sum': _('Sum'),
+        'avg': _('Average'),
+        'min': _('Min'),
+        'max': _('Max'),
+        'type': _('Type'),
+    }
+    # Help texts shown next to the fields
+    description_columns = {
+        'is_dttm': _(
+            'Whether to make this column available as a '
+            '[Time Granularity] option, column has to be DATETIME or '
+            'DATETIME-like'
+        ),
+        'filterable': _(
+            'Whether this column is exposed in the `Filters` section '
+            'of the explore view.'
+        ),
+        'type': _(
+            'The data type that was inferred by Pandas. '
+            'It may be necessary to input a type manually for '
+            'expression-defined columns in some cases. In most case '
+            'users should not need to alter this.'
+        ),
+    }
+
+
+# Registered without a menu entry
+appbuilder.add_view_no_menu(PandasColumnInlineView)
+
+
+class PandasMetricInlineView(CompactCRUDMixin, SupersetModelView, DeleteMixin):  # noqa
+    """Inline CRUD view for the metrics of a Pandas datasource."""
+    datamodel = SQLAInterface(PandasMetric)
+    page_size = 500
+
+    # Page titles
+    add_title = _('Add Metric')
+    edit_title = _('Edit Metric')
+    list_title = _('List Metrics')
+    show_title = _('Show Metric')
+
+    # Fields on the list page
+    list_columns = ['metric_name', 'verbose_name', 'metric_type']
+    # Fields on the edit form; the add form shares the very same list
+    edit_columns = [
+        'metric_name',
+        'description',
+        'verbose_name',
+        'metric_type',
+        'source',
+        'expression',
+        'datasource',
+        'd3format',
+        'is_restricted',
+        'warning_text',
+    ]
+    add_columns = edit_columns
+
+    # Field labels
+    label_columns = {
+        'metric_name': _('Metric'),
+        'description': _('Description'),
+        'verbose_name': _('Verbose Name'),
+        'metric_type': _('Type'),
+        'source': _('Pandas Source Columns'),
+        'expression': _('Pandas Expression'),
+        'datasource': _('Datasource'),
+        'd3format': _('D3 Format'),
+        'is_restricted': _('Is Restricted'),
+        'warning_text': _('Warning Message'),
+    }
+    # Help texts shown next to the fields
+    description_columns = {
+        'source': utils.markdown(
+            'a comma-separated list of column(s) used to calculate '
+            ' the metric. Example: `claim_amount`',
+            True,
+        ),
+        'expression': utils.markdown(
+            'a valid Pandas expression as supported by the underlying '
+            'backend. Example: `count()`',
+            True,
+        ),
+        'is_restricted': _(
+            'Whether the access to this metric is restricted '
+            'to certain roles. Only roles with the permission '
+            "'metric access on XXX (the name of this metric)' "
+            'are allowed to access this metric'
+        ),
+        'd3format': utils.markdown(
+            'd3 formatting string as defined [here]'
+            '(https://github.com/d3/d3-format/blob/master/README.md#format). '
+            'For instance, this default formatting applies in the Table '
+            'visualization and allow for different metric to use different '
+            'formats',
+            True,
+        ),
+    }
+
+    def post_add(self, metric):
+        """Register the access permission of a newly added restricted metric."""
+        if not metric.is_restricted:
+            return
+        security.merge_perm(sm, 'metric_access', metric.get_perm())
+
+    def post_update(self, metric):
+        """Register the access permission of an updated restricted metric."""
+        if not metric.is_restricted:
+            return
+        security.merge_perm(sm, 'metric_access', metric.get_perm())
+
+
+# Registered without a menu entry
+appbuilder.add_view_no_menu(PandasMetricInlineView)
+
+
+class PandasDatasourceModelView(DatasourceModelView, DeleteMixin):  # noqa
+    """CRUD view for file-based (Pandas) datasources.
+
+    Adding a datasource is a two phase process: the add form only asks for
+    what is needed to read the file, and the remaining settings are edited
+    afterwards.
+    """
+    datamodel = SQLAInterface(PandasDatasource)
+
+    list_title = _('List File Datasources')
+    show_title = _('Show File Datasource')
+    add_title = _('Add File Datasource')
+    edit_title = _('Edit File Datasource')
+
+    list_columns = [
+        'link', 'changed_by_', 'modified']
+    order_columns = [
+        'link', 'changed_on_']
+    add_columns = ['name', 'source_url', 'source_auth', 'source_parameters',
+                   'format', 'additional_parameters']
+    # Custom form fields; shared by the add and the edit form
+    add_form_extra_fields = {
+        'source_auth': JSONField(
+            _('Source Credentials'),
+            [validators.optional(), validators.length(max=100)],
+            widget=BS3TextFieldWidget(),
+            description=(
+                'Credentials required to access the raw data, if required. '
+                'Can be either a username and password in the form '
+                "'[\"username\", \"password\"]' which will be authenticated "
+                'using HTTP Basic Auth, or a string which will be used as '
+                'an Authorization header')),
+        'source_parameters': JSONField(
+            _('Additional Query Parameters'),
+            [validators.optional(), validators.length(max=500)],
+            widget=BS3TextAreaFieldWidget(),
+            description=(
+                'A JSON-formatted dictionary of additional parameters '
+                'used to request the remote file')),
+        'format': ChoiceTypeSelectField(_('Format'), choices=FORMATS),
+        'additional_parameters': JSONField(
+            _('Additional Read Parameters'),
+            [validators.optional(), validators.length(max=500)],
+            widget=BS3TextAreaFieldWidget(),
+            description=(
+                'A JSON-formatted dictionary of additional parameters '
+                'passed to the Pandas read function')),
+    }
+    edit_columns = [
+        'name', 'source_url', 'source_auth', 'source_parameters',
+        'format', 'additional_parameters',
+        'filter_select_enabled', 'slices',
+        'fetch_values_predicate',
+        'description', 'owner',
+        'main_dttm_col', 'default_endpoint', 'offset', 'cache_timeout']
+    edit_form_extra_fields = add_form_extra_fields
+    show_columns = edit_columns + ['perm']
+    related_views = [PandasColumnInlineView, PandasMetricInlineView]
+    base_order = ('changed_on', 'desc')
+    search_columns = (
+        'owner', 'name', 'source_url',
+    )
+    description_columns = {
+        'slices': _(
+            'The list of slices associated with this datasource. By '
+            'altering this datasource, you may change how these associated '
+            'slices behave. '
+            'Also note that slices need to point to a datasource, so '
+            'this form will fail at saving if removing slices from a '
+            'datasource. If you want to change the datasource for a slice, '
+            "overwrite the slice from the 'explore view'"),
+        'offset': _('Timezone offset (in hours) for this datasource'),
+        'name': _(
+            'The name of this datasource'),
+        'source_url': _(
+            'The URL used to access the raw data'),
+        'format': _(
+            'The format of the raw data, e.g. csv'),
+        'description': Markup(
+            "Supports <a href='https://daringfireball.net/projects/markdown/'>"
+            'markdown</a>'),
+        'fetch_values_predicate': _(
+            'Predicate applied when fetching distinct value to '
+            'populate the filter control component. Supports '
+            'jinja template syntax. Applies only when '
+            '`Enable Filter Select` is on.'),
+        'default_endpoint': _(
+            'Redirects to this endpoint when clicking on the datasource '
+            'from the datasource list'),
+        'filter_select_enabled': _(
+            "Whether to populate the filter's dropdown in the explore "
+            "view's filter section with a list of distinct values fetched "
+            'from the backend on the fly'),
+    }
+    base_filters = [['id', DatasourceFilter, lambda: []]]
+    label_columns = {
+        'slices': _('Associated Slices'),
+        'link': _('Datasource'),
+        'changed_by_': _('Changed By'),
+        'changed_on_': _('Last Changed'),
+        'filter_select_enabled': _('Enable Filter Select'),
+        'default_endpoint': _('Default Endpoint'),
+        'offset': _('Offset'),
+        'cache_timeout': _('Cache Timeout'),
+        'name': _('Name'),
+        'source_url': _('Source URL'),
+        'format': _('Format'),
+        'fetch_values_predicate': _('Fetch Values Predicate'),
+        'owner': _('Owner'),
+        'main_dttm_col': _('Main Datetime Column'),
+        'description': _('Description'),
+    }
+
+    def pre_add(self, datasource):
+        """Reject duplicate or unreadable datasources before they are saved.
+
+        :param datasource: the PandasDatasource about to be added
+        :raises Exception: if another datasource already uses the same
+            source URL, or if the data cannot be read
+        """
+        number_of_existing_datasources = (
+            db.session
+              .query(sa.func.count('*'))
+              .filter(PandasDatasource.source_url == datasource.source_url)
+              .scalar())
+        # datasource object is already added to the session
+        if number_of_existing_datasources > 1:
+            raise Exception(
+                get_datasource_exist_error_mgs(datasource.full_name))
+
+        # Fail before adding if the datasource can't be found
+        try:
+            datasource.get_dataframe()
+        except Exception as e:
+            logging.exception(e)
+            # Name the fields as they are labelled on the form, so the user
+            # knows which ones to check.
+            raise Exception(_(
+                'File [{}] could not be read, '
+                'please double check the '
+                'Source URL and Format').format(datasource.name))
+
+    def post_add(self, datasource, flash_message=True):
+        """Build the metadata and permission of a saved datasource.
+
+        :param datasource: the PandasDatasource that was saved
+        :param flash_message: whether to tell the user that the datasource
+            still needs to be configured
+        """
+        datasource.get_metadata()
+        security.merge_perm(sm, 'datasource_access', datasource.get_perm())
+
+        if flash_message:
+            flash(_(
+                'The datasource was created. '
+                'As part of this two phase configuration '
+                'process, you should now click the edit button by '
+                'the new datasource to configure it.'), 'info')
+
+    def post_update(self, datasource):
+        """Refresh metadata and permission after an edit, without the flash."""
+        self.post_add(datasource, flash_message=False)
+
+    def _delete(self, pk):
+        """Delete through ``DeleteMixin._delete``.
+
+        NOTE(review): presumably needed because the view base class would
+        otherwise supply ``_delete`` ahead of the mixin -- confirm.
+        """
+        DeleteMixin._delete(self, pk)
+
+    @expose('/edit/<pk>', methods=['GET', 'POST'])
+    @has_access
+    def edit(self, pk):
+        """Simple hack to redirect to explore view after saving"""
+        resp = super(PandasDatasourceModelView, self).edit(pk)
+        # A string response is returned unchanged; anything else is replaced
+        # by a redirect to the explore view of this datasource.
+        if isinstance(resp, basestring):
+            return resp
+        return redirect('/superset/explore/pandas/{}/'.format(pk))
+
+
+# Add the view to the `Sources` menu, followed by a separator
+appbuilder.add_view(
+    PandasDatasourceModelView,
+    'File Datasources',
+    label=__('File Datasources'),
+    category='Sources',
+    category_label=__('Sources'),
+    icon='fa-file',)
+
+appbuilder.add_separator('Sources')
diff --git a/contrib/migrations/__init__.py b/contrib/migrations/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/migrations/versions/__init__.py 
b/contrib/migrations/versions/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/tests/__init__.py b/contrib/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/tests/cache_tests.py b/contrib/tests/cache_tests.py
new file mode 100644
index 0000000000..5039339e50
--- /dev/null
+++ b/contrib/tests/cache_tests.py
@@ -0,0 +1,142 @@
+"""Unit tests for DataFrameCache"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import datetime
+import os
+import shutil
+import tempfile
+import time
+
+import pandas as pd
+from pandas.testing import assert_frame_equal
+from tests.base_tests import SupersetTestCase
+
+from ..cache.dataframe import DataFrameCache
+
+
+class DataFrameCacheTestCase(SupersetTestCase):
+    """Exercise DataFrameCache against a throwaway cache directory."""
+
+    def setUp(self):
+        # Every test gets its own empty directory, removed in tearDown
+        self.cache = DataFrameCache(cache_dir=tempfile.mkdtemp())
+
+    def tearDown(self):
+        shutil.rmtree(self.cache._path)
+
+    def get_df(self, key):
+        """Return a small two-column frame whose second column is `key`."""
+        return pd.DataFrame({'one': pd.Series([1, 2, 3]),
+                             key: pd.Series([1, 2, 3, 4])})
+
+    def test_get_dict(self):
+        a = self.get_df('a')
+        b = self.get_df('b')
+        assert self.cache.set('a', a)
+        assert self.cache.set('b', b)
+        d = self.cache.get_dict('a', 'b')
+        assert 'a' in d
+        assert_frame_equal(a, d['a'])
+        assert_frame_equal(b, d['b'])
+
+    def test_set_get(self):
+        for i in range(3):
+            assert self.cache.set(str(i), self.get_df(str(i)))
+
+        for i in range(3):
+            assert_frame_equal(self.cache.get(str(i)), self.get_df(str(i)))
+
+    def test_get_set(self):
+        assert self.cache.set('foo', self.get_df('bar'))
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+
+    def test_get_many(self):
+        assert self.cache.set('foo', self.get_df('bar'))
+        assert self.cache.set('spam', self.get_df('eggs'))
+        result = list(self.cache.get_many('foo', 'spam'))
+        assert_frame_equal(result[0], self.get_df('bar'))
+        assert_frame_equal(result[1], self.get_df('eggs'))
+
+    def test_set_many(self):
+        assert self.cache.set_many({'foo': self.get_df('bar'),
+                                    'spam': self.get_df('eggs')})
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        assert_frame_equal(self.cache.get('spam'), self.get_df('eggs'))
+
+    def test_add(self):
+        # sanity check that add() works like set()
+        assert self.cache.add('foo', self.get_df('bar'))
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        assert not self.cache.add('foo', self.get_df('qux'))
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+
+    def test_delete(self):
+        assert self.cache.add('foo', self.get_df('bar'))
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        assert self.cache.delete('foo')
+        assert self.cache.get('foo') is None
+
+    def test_delete_many(self):
+        assert self.cache.add('foo', self.get_df('bar'))
+        assert self.cache.add('spam', self.get_df('eggs'))
+        assert self.cache.delete_many('foo', 'spam')
+        assert self.cache.get('foo') is None
+        assert self.cache.get('spam') is None
+
+    def test_timeout(self):
+        self.cache.set('foo', self.get_df('bar'), 0)
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        self.cache.set('baz', self.get_df('qux'), 1)
+        assert_frame_equal(self.cache.get('baz'), self.get_df('qux'))
+        time.sleep(3)
+        # timeout of zero means no timeout
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        assert self.cache.get('baz') is None
+
+    def test_has(self):
+        assert self.cache.has('foo') in (False, 0)
+        assert self.cache.has('spam') in (False, 0)
+        assert self.cache.set('foo', self.get_df('bar'))
+        assert self.cache.has('foo') in (True, 1)
+        assert self.cache.has('spam') in (False, 0)
+        self.cache.delete('foo')
+        assert self.cache.has('foo') in (False, 0)
+        assert self.cache.has('spam') in (False, 0)
+
+    def test_prune(self):
+        THRESHOLD = 13
+        c = DataFrameCache(cache_dir=tempfile.mkdtemp(),
+                           threshold=THRESHOLD)
+        # Remove the extra directory even if an assertion below fails
+        self.addCleanup(shutil.rmtree, c._path)
+
+        for i in range(2 * THRESHOLD):
+            assert c.set(str(i), self.get_df(str(i)))
+
+        cache_files = os.listdir(c._path)
+
+        # There will be a small .expires file for every cached file
+        assert len(cache_files) <= THRESHOLD * 2
+
+    def test_clear(self):
+        assert self.cache.set('foo', self.get_df('bar'))
+        cache_files = os.listdir(self.cache._path)
+        # There will be a small .expires file for every cached file
+        assert len(cache_files) == 2
+        assert self.cache.clear()
+        cache_files = os.listdir(self.cache._path)
+        assert len(cache_files) == 0
+
+    def test_non_feather_format(self):
+        # The Feather on-disk format isn't indexed and doesn't handle
+        # Object-type columns with non-homogeneous data
+        # See:
+        # - https://github.com/wesm/feather/tree/master/python#limitations
+        # - https://github.com/wesm/feather/issues/200
+        now = datetime.datetime.now
+        df = pd.DataFrame({'one': pd.Series([1, 2, 3], index=['a', 'b', 'c']),
+                           'two': pd.Series([1, 'string', now(), 4],
+                                            index=['a', 'b', 'c', 'd'])})
+
+        assert self.cache.set('foo', df)
+        assert_frame_equal(self.cache.get('foo'), df)
diff --git a/contrib/tests/connector_tests.py b/contrib/tests/connector_tests.py
new file mode 100644
index 0000000000..682e1ef005
--- /dev/null
+++ b/contrib/tests/connector_tests.py
@@ -0,0 +1,1047 @@
+"""Unit tests for Superset"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+from collections import OrderedDict
+import datetime
+import io
+import re
+import unittest
+
+import markdown
+import pandas as pd
+from pandas.testing import assert_frame_equal
+from sqlalchemy import Date
+from tests.base_tests import SupersetTestCase
+
+from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
+from superset.models.core import Database
+from superset.models.helpers import QueryResult
+from superset.utils import QueryStatus
+from ..connectors.pandas.models import (
+    PandasColumn, PandasDatasource, PandasMetric)
+
+
+class BaseConnectorTestCase(SupersetTestCase):
+    """
+    Standard tests for Connectors, especially for .query()
+
+    Connector-specific subclasses should override `setUp()`
+    to create a Datasource with the appropriate Columns and
+    Metrics, and store a copy of the raw
+    data in the `self.df` attribute
+    """
+
+    def __init__(self, methodName='runTest'):
+        """
+        Setup assertEqual for DataFrames
+
+        Registers `assertFrameEqual` as the type-specific equality
+        function for `pd.DataFrame`, so that `assertEqual()` uses it
+        when comparing two DataFrames.
+        """
+        super(BaseConnectorTestCase, self).__init__(methodName)
+        self.addTypeEqualityFunc(pd.DataFrame, 'assertFrameEqual')
+
+    @classmethod
+    def setUpClass(cls):
+        """Skip the tests when they are collected on the base class itself"""
+        # Only subclasses are expected to run these tests; the identity
+        # check leaves every subclass unaffected
+        if cls is BaseConnectorTestCase:
+            raise unittest.SkipTest('Skip tests from BaseConnectorTestCase')
+        super(BaseConnectorTestCase, cls).setUpClass()
+
+    # Raw test data as a Markdown table: one header row, one separator row
+    # and fourteen data rows. Every row must stay on a single line, or the
+    # Markdown table extension no longer recognises the table.
+    data = """
+        | region   | district   | project   | received            | value  | value2 | category  |
+        |----------|------------|-----------|---------------------|--------|--------|-----------|
+        | Region 1 | District A | Project A | 2001-01-31 10:00:00 | 33     |  12.30 | CategoryA |
+        | Region 1 | District A | Project A | 2001-01-31 12:00:00 | 32     |  13.60 | CategoryB |
+        | Region 1 | District B | Project B | 2001-01-31 13:00:00 | 35     |  15.50 | CategoryA |
+        | Region 2 | District C | Project C | 2001-01-31 09:00:00 | 12     |  17.80 | CategoryB |
+        | Region 1 | District A | Project A | 2001-02-28 09:00:00 | 66     |  11.30 | CategoryB |
+        | Region 1 | District B | Project B | 2001-02-28 08:00:00 | 15     |  19.90 | CategoryB |
+        | Region 1 | District B | Project B | 2001-02-28 10:00:00 | 25     |  15.30 | CategoryA |
+        | Region 2 | District C | Project C | 2001-02-28 08:00:00 | 18     |  13.80 | CategoryA |
+        | Region 1 | District A | Project A | 2001-03-31 11:00:00 | 85     |  45.10 | CategoryB |
+        | Region 1 | District B | Project B | 2001-03-31 12:00:00 |  5     |  28.10 | CategoryA |
+        | Region 2 | District C | Project C | 2001-03-31 14:00:00 | 35     |  22.60 | CategoryB |
+        | Region 1 | District A | Project A | 2001-04-30 10:00:00 | 15     |  11.00 | CategoryA |
+        | Region 1 | District A | Project A | 2001-04-30 12:00:00 | 15     |  16.10 | CategoryB |
+        | Region 2 | District C | Project C | 2001-04-30 13:00:00 | 15     |  18.50 | CategoryA |
+        """  # NOQA: E501
+
+    def assertFrameEqual(self, frame1, frame2, msg=None):
+        """
+        Assert that two DataFrames hold the same data, ignoring the index.
+
+        :param frame1: the first DataFrame
+        :param frame2: the second DataFrame
+        :param msg: optional message added to the failure report
+        :raises self.failureException: if the frames are not equal
+        """
+        # We don't care about the index, because it is
+        # not part of the data returned to the client
+        try:
+            # `msg` used to be passed positionally, where it landed in
+            # `check_dtype`: with the default msg=None that silently turned
+            # dtype checking off, and pandas versions that make these
+            # arguments keyword-only reject the call. Keep the comparison
+            # the tests have been getting, but ask for it explicitly.
+            assert_frame_equal(frame1.reset_index(drop=True),
+                               frame2.reset_index(drop=True),
+                               check_dtype=False)
+        except AssertionError as exc:
+            raise self.failureException(self._formatMessage(msg, str(exc)))
+
+    def setUp(self):
+        """Build `self.df`, the raw data behind the expected results"""
+        # Create a copy of the raw data as a dataframe to be
+        # used for calculated expected results
+        self.df = self.md_to_df(self.data)
+        # Convert `received` to day-resolution datetimes
+        received = self.df['received'].values.astype('datetime64[D]')
+        self.df['received'] = received
+
+    def md_to_html(self, md_raw):
+        """
+        Render a possibly-indented Markdown-table as HTML and return it
+        in a file-like object that pd.read_html() can parse.
+
+        Lines that start with whitespace followed by a pound sign are
+        dropped, and a trailing `# ...` part that contains no `|` is cut
+        off before the remaining lines are stripped and rendered.
+
+        See https://github.com/XLSForm/pyxform/blob/master/pyxform/tests_v1/pyxform_test_case.py  # NOQA
+        for the inspiration
+        """
+        cleaned = []
+        for raw_line in md_raw.split('\n'):
+            if re.match(r'^\s+\#', raw_line):
+                # ignore lines which start with pound sign
+                continue
+            trailing = re.match(r'^(.*)(\#[^\|]+)$', raw_line)
+            if trailing:
+                # keep everything before the # outside of the last
+                # occurrence of |
+                kept = trailing.groups()[0]
+            else:
+                kept = raw_line
+            cleaned.append(kept.strip())
+        html = markdown.markdown('\n'.join(cleaned),
+                                 extensions=['markdown.extensions.tables'])
+        return io.StringIO(html)
+
+    def md_to_df(self, md_raw):
+        """
+        Parse a possibly-indented Markdown-table into a Pandas DataFrame
+        """
+        html_buffer = self.md_to_html(md_raw)
+        tables = pd.read_html(html_buffer)
+        # Use the first table found in the rendered markup
+        return tables[0]
+
+    def test_values_for_column(self):
+        """The values of a column are its distinct raw values"""
+        actual = self.datasource.values_for_column('project')
+        expected = self.df['project'].unique().tolist()
+        self.assertEqual(actual, expected)
+
+    def test_values_for_column_with_limit(self):
+        """A limit of 1 returns only the first distinct value"""
+        actual = self.datasource.values_for_column('project', 1)
+        distinct = self.df['project'].unique()
+        self.assertEqual(actual, distinct[:1].tolist())
+
+    def test_get_query_str(self):
+        """The query string mentions the column that is grouped by"""
+        query_obj = {
+            'groupby': ['project'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_str = self.datasource.get_query_str(query_obj)
+        self.assertIn('project', query_str)
+
+    def test_summary_single_metric(self):
+        """One metric and no groupby: a single row with the overall sum"""
+        query_obj = {
+            'groupby': [],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        total = self.df['value'].sum()
+        expected_df = pd.DataFrame({'sum__value': [total]})
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_summary_multiple_metrics(self):
+        """Several metrics and no groupby: one row, one column per metric"""
+        query_obj = {
+            'groupby': [],
+            'metrics': [
+                'sum__value', 'avg__value', 'value_percentage', 'ratio',
+            ],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        # Row-level ratio, used below for the expected `ratio` metric
+        self.df['ratio'] = self.df['value'] / self.df['value2']
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        value_sum = self.df['value'].sum()
+        value_mean = self.df['value'].mean()
+        percentage = (sum(self.df['value']) /
+                      sum(self.df['value'] + self.df['value2']))
+        ratio_mean = self.df['ratio'].mean()
+        expected_df = pd.DataFrame(OrderedDict([
+            ('sum__value', [value_sum]),
+            ('avg__value', [value_mean]),
+            ('value_percentage', [percentage]),
+            ('ratio', [ratio_mean]),
+        ]))
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_from_to_dttm(self):
+        """Only rows received after `from_dttm` are summed"""
+        query_obj = {
+            'groupby': [],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 3, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        in_range = self.df['received'] > datetime.datetime(2001, 3, 1)
+        recent = self.df.loc[in_range]
+        expected_df = pd.DataFrame({'sum__value': [recent['value'].sum()]})
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_filter_eq_string(self):
+        """Filter a string column with the `==` operator"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'district', 'val': 'District A', 'op': '=='},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        matching = self.df.loc[self.df['district'] == 'District A']
+        grouped = matching.groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_filter_eq_num(self):
+        """Filter a numeric column with `==`; the value is sent as a string"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'value', 'val': '85', 'op': '=='},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        matching = self.df.loc[self.df['value'] == 85]
+        grouped = matching.groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_filter_eq_date(self):
+        """Filter the date column with `==`; the value is sent as a string"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'received', 'val': '2001-02-28', 'op': '=='},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        on_day = self.df['received'] == datetime.datetime(2001, 2, 28)
+        grouped = self.df.loc[on_day].groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_filter_gte(self):
+        """Filter a numeric column with the `>=` operator"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'value', 'val': '70', 'op': '>='},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        matching = self.df.loc[self.df['value'] >= 70]
+        grouped = matching.groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_filter_in_num(self):
+        """Filter a numeric column with `in`; values are sent as strings"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'value', 'val': ['32', '35'], 'op': 'in'},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        # Rows whose value is neither 32 nor 35; the filter keeps the rest
+        excluded = (self.df['value'] != 32) & (self.df['value'] != 35)
+        grouped = self.df.loc[~excluded].groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        expected_df = expected_df.reset_index(drop=True)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_filter_in_str(self):
+        """Filter a string column with the `in` operator"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'project',
+                 'val': ['Project A', 'Project C'],
+                 'op': 'in'},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        # Rows of neither project; the filter keeps the rest
+        excluded = ((self.df['project'] != 'Project A') &
+                    (self.df['project'] != 'Project C'))
+        grouped = self.df.loc[~excluded].groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        expected_df = expected_df.reset_index(drop=True)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_filter_not_in_num(self):
+        """Filter a numeric column with `not in`; values sent as strings"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'value', 'val': ['32', '35'], 'op': 'not in'},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        kept = (self.df['value'] != 32) & (self.df['value'] != 35)
+        grouped = self.df.loc[kept].groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        expected_df = expected_df.reset_index(drop=True)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_filter_not_in_str(self):
+        """Filter a string column with the `not in` operator"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'project',
+                 'val': ['Project A', 'Project C'],
+                 'op': 'not in'},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        kept = ((self.df['project'] != 'Project A') &
+                (self.df['project'] != 'Project C'))
+        grouped = self.df.loc[kept].groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_columns_only(self):
+        """Plain columns without groupby or metrics return the raw rows"""
+        query_obj = {
+            'groupby': [],
+            'metrics': [],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+            'columns': ['project', 'region', 'received', 'value'],
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        selected = self.df[query_obj['columns']]
+        expected_df = selected.copy()
+        # The expected frame holds the dates as strings
+        expected_df['received'] = expected_df['received'].astype(str)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_orderby_with_columns(self):
+        """Plain columns honour `orderby` (project desc, region asc)"""
+        query_obj = {
+            'groupby': [],
+            'metrics': [],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+            'columns': ['project', 'region', 'received', 'value'],
+            'orderby': [
+                ['project', False],
+                ['region', True],
+            ],
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        ordered = self.df.sort_values(['project', 'region'],
+                                      ascending=[False, True])
+        ordered = ordered.reset_index(drop=True)
+        expected_df = ordered[query_obj['columns']]
+        expected_df['received'] = expected_df['received'].astype(str)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_groupby_only(self):
+        """Groupby without metrics: groups ordered by descending size"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': [],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        # The group sizes end up in a column named 0, which is only
+        # needed for ordering and is dropped again afterwards
+        sizes = self.df.groupby(query_obj['groupby']).size().reset_index()
+        ordered = sizes.sort_values([0], ascending=False)
+        expected_df = ordered.drop(0, axis=1)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_groupby_single_metric(self):
+        """Groupby with one metric, ordered by that metric descending"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        grouped_values = self.df.groupby(query_obj['groupby'])['value']
+        sums = grouped_values.sum().reset_index()
+        expected_df = sums.sort_values(['value'], ascending=False)
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_groupby_multiple_metrics(self):
+        """Groupby with several metrics, ordered by the first descending"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': [
+                'sum__value', 'avg__value', 'value_percentage', 'ratio',
+            ],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        # Row-level ratio, used below for the expected `ratio` metric
+        self.df['ratio'] = self.df['value'] / self.df['value2']
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+
+        def value_share(group):
+            return sum(group['value']) / sum(group['value'] + group['value2'])
+
+        aggregations = OrderedDict([('value', ['sum', 'mean']),
+                                    ('ratio', ['mean'])])
+        grouped = self.df.groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate(aggregations)
+        shares = self.df.groupby(query_obj['groupby']).apply(value_share)
+        expected_df['value_percentage'] = shares
+        expected_df = expected_df.reset_index()
+        # Column order as produced by the aggregation above
+        aggregated_names = ['sum__value', 'avg__value',
+                            'ratio', 'value_percentage']
+        expected_df.columns = query_obj['groupby'] + aggregated_names
+        # Reorder the columns to the order the metrics were requested in
+        requested = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df[requested]
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_groupby_ratio_metric(self):
+        """Groupby with the `ratio` metric (mean of value / value2)"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['ratio'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        self.df['ratio'] = self.df['value'] / self.df['value2']
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        grouped_ratios = self.df.groupby(query_obj['groupby'])['ratio']
+        means = grouped_ratios.mean().reset_index()
+        expected_df = means.sort_values(['ratio'], ascending=False)
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_groupby_value_percentage_metric(self):
+        """Groupby with the `value_percentage` metric"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['value_percentage'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+
+        def value_share(group):
+            return sum(group['value']) / sum(group['value'] + group['value2'])
+
+        shares = self.df.groupby(query_obj['groupby']).apply(value_share)
+        # The applied result ends up in a column named 0
+        expected_df = shares.reset_index().sort_values([0], ascending=False)
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_groupby_category_percentage_metric(self):
+        """Groupby with the `category_percentage` metric"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['category_percentage'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        # Share of each category within every group ...
+        categories = self.df.groupby(query_obj['groupby'])['category']
+        proportions = categories.value_counts(normalize=True)
+        # ... of which only the 'CategoryA' rows are expected
+        by_category = proportions.reset_index(query_obj['groupby'])
+        category_a = by_category.loc['CategoryA'].reset_index(drop=True)
+        expected_df = category_a.sort_values(['category'], ascending=False)
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_groupby_ascending_order(self):
+        """`order_desc: False` orders the groups by ascending first metric"""
+        query_obj = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value', 'avg__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {'time_grain_sqla': None},
+            'order_desc': False,
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        grouped = self.df.groupby(query_obj['groupby'])
+        expected_df = grouped.aggregate({'value': ['sum', 'mean']})
+        expected_df = expected_df.reset_index()
+        expected_df.columns = query_obj['groupby'] + query_obj['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=True)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_timeseries_single_metric(self):
+        """Daily timeseries of one metric, ordered by the metric descending"""
+        query_obj = {
+            'groupby': [],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 50,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                # Note that week and month don't work on SQLite
+                # See https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+            'order_desc': True,
+        }
+        query_result = self.datasource.query(query_obj)
+        self.assertIsInstance(query_result, QueryResult)
+        self.assertEqual(query_result.error_message, None)
+        self.assertEqual(query_result.status, QueryStatus.SUCCESS)
+        grain_name = query_obj['extras']['time_grain_sqla']
+        time_grain = PandasDatasource.GRAINS[grain_name]
+        time_grouper = pd.Grouper(key=query_obj['granularity'],
+                                  freq=time_grain)
+        grouped = self.df.groupby(query_obj['groupby'] + [time_grouper])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = (query_obj['groupby'] +
+                               ['__timestamp'] +
+                               query_obj['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        ascending = not query_obj['order_desc']
+        expected_df = expected_df.sort_values(query_obj['metrics'][0],
+                                              ascending=ascending)
+        expected_df = expected_df.reset_index(drop=True)
+        self.assertEqual(query_result.df, expected_df)
+
+    def test_timeseries_multiple_metrics(self):
+        p = {
+            'groupby': [],
+            'metrics': ['sum__value', 'avg__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 50,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                # Note that week and month don't work on SQLite
+                # See https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+            'order_desc': True,
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        expected_df = (self.df.groupby(p['groupby'] +
+                                       [pd.Grouper(key=p['granularity'],
+                                                   freq=time_grain)])
+                           .aggregate({'value': ['sum', 'mean']})
+                           .reset_index())
+        expected_df.columns = (p['groupby'] +
+                               ['__timestamp'] +
+                               p['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        expected_df = (expected_df.sort_values(p['metrics'][0],
+                                               ascending=(not 
p['order_desc'])))
+        self.assertEqual(result.df, expected_df)
+
+    def test_timeseries_groupby(self):
+        p = {
+            'groupby': ['project'],
+            'metrics': ['sum__value', 'avg__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 50,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                # Note that week and month don't work on SQLite
+                # See https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        expected_df = (self.df.groupby(p['groupby'] +
+                                       [pd.Grouper(key=p['granularity'],
+                                                   freq=time_grain)])
+                           .aggregate({'value': ['sum', 'mean']})
+                           .reset_index())
+        expected_df.columns = (p['groupby'] +
+                               ['__timestamp'] +
+                               p['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        expected_df = (expected_df.sort_values(['sum__value'], ascending=False)
+                                  .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+
+    def test_timeseries_limit(self):
+        p = {
+            'groupby': ['project', 'district'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 2,
+            'timeseries_limit_metric': 'avg__value',
+            'row_limit': 5000,
+            'extras': {
+                # Note that week and month don't work on SQLite
+                # See https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+            'order_desc': True,
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        limit_df = (self.df.groupby(p['groupby'])
+                           .aggregate({'value': 'mean'})
+                           .sort_values('value', ascending=(not 
p['order_desc']))
+                           .iloc[:p['timeseries_limit']])
+        source_df = self.df.set_index(p['groupby'])
+        expected_df = (source_df[source_df.index.isin(limit_df.index)]
+                       .groupby(p['groupby'] + 
[pd.Grouper(key=p['granularity'],
+                                freq=time_grain)])
+                       .aggregate({'value': ['sum']})
+                       .reset_index())
+        expected_df.columns = (p['groupby'] +
+                               ['__timestamp'] +
+                               p['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        expected_df = (expected_df.sort_values(['sum__value'],
+                                               ascending=(not p['order_desc']))
+                                  .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+
+    def test_timeseries_limit_ascending_order(self):
+        p = {
+            'groupby': ['project', 'district'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 2,
+            'timeseries_limit_metric': 'avg__value',
+            'row_limit': 5000,
+            'extras': {
+                # Note that week and month don't work on SQLite
+                # See https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+            'order_desc': False,
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        limit_df = (self.df.groupby(p['groupby'])
+                           .aggregate({'value': 'mean'})
+                           .sort_values('value', ascending=(not 
p['order_desc']))
+                           .iloc[:p['timeseries_limit']])
+        source_df = self.df.set_index(p['groupby'])
+        expected_df = (source_df[source_df.index.isin(limit_df.index)]
+                       .groupby(p['groupby'] + 
[pd.Grouper(key=p['granularity'],
+                                freq=time_grain)])
+                       .aggregate({'value': ['sum']})
+                       .reset_index())
+        expected_df.columns = (p['groupby'] +
+                               ['__timestamp'] +
+                               p['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        expected_df = (expected_df.sort_values(['sum__value'],
+                                               ascending=(not p['order_desc']))
+                                  .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+
+
+class SqlaConnectorTestCase(BaseConnectorTestCase):
+
+    columns = [
+        TableColumn(column_name='region', type='VARCHAR(20)'),
+        TableColumn(column_name='district', type='VARCHAR(20)'),
+        TableColumn(column_name='project', type='VARCHAR(20)'),
+        TableColumn(column_name='received', type='DATE', is_dttm=True),
+        TableColumn(column_name='value', type='BIGINT'),
+    ]
+
+    metrics = [
+        SqlMetric(metric_name='sum__value', metric_type='sum',
+                  expression='SUM(value)'),
+        SqlMetric(metric_name='avg__value', metric_type='avg',
+                  expression='AVG(value)'),
+        SqlMetric(metric_name='ratio', metric_type='avg',
+                  expression='AVG(value/value2)'),
+        SqlMetric(metric_name='value_percentage', metric_type='custom',
+                  expression='SUM(value)/SUM(value + value2)'),
+        SqlMetric(metric_name='category_percentage', metric_type='custom',
+                  expression="SUM(CASE WHEN category='CategoryA' THEN 1 ELSE 0 
END)/"
+                             'CAST(COUNT(*) AS REAL)'),
+    ]
+
+    def setUp(self):
+        super(SqlaConnectorTestCase, self).setUp()
+        sqlalchemy_uri = 'sqlite:////tmp/test.db'
+        database = Database(
+            database_name='test_database',
+            sqlalchemy_uri=sqlalchemy_uri)
+        self.connection = database.get_sqla_engine().connect()
+        self.datasource = SqlaTable(table_name='test_datasource',
+                                    database=database,
+                                    columns=self.columns,
+                                    metrics=self.metrics)
+        with database.get_sqla_engine().begin() as connection:
+            self.df.to_sql(self.datasource.table_name,
+                           connection,
+                           if_exists='replace',
+                           index=False,
+                           dtype={'received': Date})
+
+
+class PandasConnectorTestCase(BaseConnectorTestCase):
+
+    columns = [
+        PandasColumn(column_name='region', type='object'),
+        PandasColumn(column_name='district', type='object'),
+        PandasColumn(column_name='project', type='object'),
+        PandasColumn(column_name='received', type='datetime64[D]'),
+        PandasColumn(column_name='value', type='int64'),
+        PandasColumn(column_name='ratio', type='float64',
+                     expression='value / value2'),
+        PandasColumn(column_name='inverse_ratio', type='float64',
+                     expression='value2 / value'),
+    ]
+
+    metrics = [
+        PandasMetric(metric_name='sum__value', metric_type='sum',
+                     source='value', expression='sum'),
+        PandasMetric(metric_name='avg__value', metric_type='avg',
+                     source='value', expression='mean'),
+        PandasMetric(metric_name='ratio', metric_type='avg',
+                     source='ratio', expression='mean'),
+        PandasMetric(metric_name='value_percentage', metric_type='custom',
+                     source=None, expression='calc_value_percentage'),
+        PandasMetric(metric_name='category_percentage', metric_type='custom',
+                     source='category', expression='calc_category_percentage'),
+    ]
+
+    def setUp(self):
+        super(PandasConnectorTestCase, self).setUp()
+        self.datasource = PandasDatasource(name='test datasource',
+                                           
source_url=self.md_to_html(self.data),
+                                           format='html',
+                                           columns=self.columns,
+                                           metrics=self.metrics)
+
+        def calc_value_percentage(group):
+            return sum(group['value']) / sum(group['value'] + group['value2'])
+
+        self.datasource.calc_value_percentage = calc_value_percentage
+
+        def calc_category_percentage(group):
+            return group.value_counts(normalize=True).loc['CategoryA']
+
+        self.datasource.calc_category_percentage = calc_category_percentage
+
+    def test_post_aggregation_filter(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+                'having_druid': [
+                    {'col': 'sum__value', 'val': '150', 'op': '>='},
+                ],
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = (expected_df.loc[expected_df['sum__value'] >= 150]
+                                  .sort_values(['sum__value'], 
ascending=False))
+        self.assertEqual(result.df, expected_df)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/docs/files.rst b/docs/files.rst
new file mode 100644
index 0000000000..7b0040e4cd
--- /dev/null
+++ b/docs/files.rst
@@ -0,0 +1,107 @@
+Files
+=====
+
+Superset can visualize data contained in flat files or outputted by REST APIs
+using Pandas.  This page clarifies the features and limitations of using File
+Datasources with Superset.
+
+File Datasources do not use files uploaded directly to the Superset server. The
+files are stored elsewhere and downloaded (and cached) by Superset when required.
+This approach offers two important advantages over uploaded files:
+
+1. If the remote file is updated, then Superset will automatically download
+and use the new data when the datasource timeout expires. If the file had
+been uploaded manually then each change in the source data would require a
+new manual upload.
+2. Data can be provided by a REST API rather than an actual file. This provides
+a method to use Superset to visualize current data stored in other systems
+without needing to manually extract it first.
+
+.. note ::
+    A File Datasource downloads the full content of the source url
+    and then performs all filtering, grouping and aggregation locally
+    on the Superset server.  File Datasources that access large
+    volumes of data may impact server performance and affect other users.
+    Server administrators should ensure that adequate memory and CPU
+    resources are available before enabling the File Datasource.
+
+Installation
+''''''''''''
+
+File Datasources are not enabled by default. To enable them the system
+administrator should update the `superset_config.py` or other configuration
+file to include::
+
+    # Include additional data sources
+    ADDITIONAL_MODULE_DS_MAP = {
+        'contrib.connectors.pandas.models': ['PandasDatasource'],
+    }
+
+Supported Formats
+'''''''''''''''''
+
+File Datasources use the `Pandas library <http://pandas.pydata.org/>`_
+directly. Using a default installation in Superset, Pandas can read the
+following formats:
+
+* csv
+* html
+* json
+* Microsoft Excel
+* Stata
+
+If the appropriate dependencies have also been installed then the following
+additional formats are supported:
+
+* HDF5 (if PyTables is installed: `pip install tables`)
+* Feather (if Feather is installed: `pip install feather-format`)
+
+See the `Pandas Dependencies <http://pandas.pydata.org/pandas-docs/stable/install.html#dependencies>`_
+documentation for more information.
+
+Adding a File Datasource
+''''''''''''''''''''''''
+
+When you add a new File Datasource you need to provide the following information:
+
+* Source URL: the URL that the file to be visualized can be downloaded from.
+This can be a file hosted on another server or on a file sharing platform
+such as Dropbox, Google Drive or Amazon S3. It can also be the URL of a REST API
+end point.
+* Source Credentials: if the Source URL requires authentication then specify
+the credentials to be used. Credentials entered as ``["username", "password"]`` -
+i.e. as a valid username and password enclosed in quotation marks, separated
+by a comma and surrounded by square brackets - will be treated as a separate username
+and password and the File Datasource will use HTTP Basic Auth to authenticate to
+the remote server. Text in any other format will be passed to the remote server
+as an `Authentication` header. Typically this is used with API tokens issued by
+the remote server. Remote servers that require authentication should also use
+an HTTPS Source URL.
+* Source Parameters: a JSON-formatted dictionary of additional query parameters
+that are passed to the remote server. This field will not be required for file
+downloads, but is useful for specifying requests against REST APIs.
+* Format: The format of the data returned by the remote server
+* Read Parameters: a JSON-formatted dictionary of additional parameters that are
+passed to Pandas when the file retrieved from the remote Server is read into a
+DataFrame.
+
+Aggregations
+''''''''''''
+
+Common aggregations can be defined and used in Superset.
+The first and simpler use case is to use the checkbox matrix exposed in your
+datasource's edit view (``Sources -> File Datasources ->
+[your datasource] -> Edit -> [tab] List Datasource Column``).
+Clicking the ``GroupBy`` and ``Filterable`` checkboxes will make the column
+appear in the related dropdowns while in explore view. Checking
+``Count Distinct``, ``Min``, ``Max``, ``Average`` or ``Sum`` will result in creating
+new metrics that will appear in the ``List Metrics`` tab. 
+You can create your own aggregations manually from the ``List Metrics`` tab for
+more complex cases.
+
+Post-Aggregations
+'''''''''''''''''
+
+File Datasources allow post aggregation in Superset. Any Metric that has been
+defined for the Datasource can be used as part of a Result Filter to limit
+the returned data.
diff --git a/docs/index.rst b/docs/index.rst
index eba2e94516..5a3b831630 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -25,10 +25,10 @@ intelligence web application
     endorsed by the ASF.
 
 Overview
-=======================================
+========
 
 Features
----------
+--------
 
 - A rich set of data visualizations
 - An easy-to-use interface for exploring and visualizing data
@@ -61,7 +61,7 @@ Features
 
 
 Contents
----------
+--------
 
 .. toctree::
     :maxdepth: 2
@@ -74,6 +74,7 @@ Contents
     videos
     gallery
     druid
+    files
     faq
 
 
diff --git a/setup.py b/setup.py
index 8d0dba3a0f..f1997274ce 100644
--- a/setup.py
+++ b/setup.py
@@ -79,6 +79,17 @@ def get_git_sha():
         'thrift>=0.9.3',
         'thrift-sasl>=0.2.1',
         'unidecode>=0.04.21',
+        # PandasConnector deps
+        # @TODO sort into main deps
+        # prior to merge. Currently,
+        # separate to minimize merge conflicts
+        'beautifulsoup4==4.6.0',
+        'bottleneck==1.2.1',
+        'feather-format==0.4.0',
+        'html5lib==0.999999999',
+        'lxml==3.8.0',
+        'numexpr==2.6.4',
+        'xlrd==1.1.0',
     ],
     extras_require={
         'cors': ['Flask-Cors>=2.0.0'],
diff --git a/superset/__init__.py b/superset/__init__.py
index 9ef665ecca..2308eddb1e 100644
--- a/superset/__init__.py
+++ b/superset/__init__.py
@@ -90,6 +90,13 @@ def get_js_manifest():
 
 cache = utils.setup_cache(app, conf.get('CACHE_CONFIG'))
 tables_cache = utils.setup_cache(app, conf.get('TABLE_NAMES_CACHE_CONFIG'))
+# For example:
+# DATAFRAME_CACHE_CONFIG = {
+#    'CACHE_TYPE': 'contrib.connectors.pandas.cache.dataframe',
+#    'CACHE_DEFAULT_TIMEOUT': 60 * 60 * 24,
+#    'CACHE_DIR': '/tmp/pandasdatasource_cache',
+#    'CACHE_THRESHOLD': 200}
+dataframe_cache = utils.setup_cache(app, conf.get('DATAFRAME_CACHE_CONFIG'))
 
 migrate = Migrate(app, db, directory=APP_DIR + '/migrations')
 
diff --git a/superset/assets/javascripts/explore/stores/visTypes.js 
b/superset/assets/javascripts/explore/stores/visTypes.js
index ef9dc4112c..b8af343f70 100644
--- a/superset/assets/javascripts/explore/stores/visTypes.js
+++ b/superset/assets/javascripts/explore/stores/visTypes.js
@@ -1374,7 +1374,7 @@ export function sectionsToRender(vizType, datasourceType) 
{
   const viz = visTypes[vizType];
   return [].concat(
     sections.datasourceAndVizType,
-    datasourceType === 'table' ? sections.sqlaTimeSeries : 
sections.druidTimeSeries,
+    (datasourceType === 'table' || datasourceType === 'pandas') ? 
sections.sqlaTimeSeries : sections.druidTimeSeries,
     viz.controlPanelSections,
     datasourceType === 'table' ? sections.sqlClause : [],
     datasourceType === 'table' ? sections.filters[0] : sections.filters,
diff --git a/superset/migrations/versions/b2cd059e8803_add_pandasdatasource.py 
b/superset/migrations/versions/b2cd059e8803_add_pandasdatasource.py
new file mode 100644
index 0000000000..b8545d457b
--- /dev/null
+++ b/superset/migrations/versions/b2cd059e8803_add_pandasdatasource.py
@@ -0,0 +1,108 @@
+"""Add PandasDatasource
+
+Revision ID: b2cd059e8803
+Revises: ca69c70ec99b
+Create Date: 2017-08-30 09:33:40.431377
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'b2cd059e8803'
+down_revision = 'ca69c70ec99b'
+
+from alembic import op
+import sqlalchemy as sa
+import sqlalchemy_utils
+
+
+FORMATS = [
+    ('csv', 'CSV'),
+    ('html', 'HTML')
+]
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('pandas_datasources',
+    sa.Column('created_on', sa.DateTime(), nullable=True),
+    sa.Column('changed_on', sa.DateTime(), nullable=True),
+    sa.Column('id', sa.Integer(), nullable=False),
+    sa.Column('description', sa.Text(), nullable=True),
+    sa.Column('default_endpoint', sa.Text(), nullable=True),
+    sa.Column('is_featured', sa.Boolean(), nullable=True),
+    sa.Column('filter_select_enabled', sa.Boolean(), nullable=True),
+    sa.Column('offset', sa.Integer(), nullable=True),
+    sa.Column('cache_timeout', sa.Integer(), nullable=True),
+    sa.Column('params', sa.String(length=1000), nullable=True),
+    sa.Column('perm', sa.String(length=1000), nullable=True),
+    sa.Column('name', sa.String(length=100), nullable=False),
+    sa.Column('source_url', sa.String(length=1000), nullable=False),
+    sa.Column('source_auth', sqlalchemy_utils.types.json.JSONType(), 
nullable=True),
+    sa.Column('source_parameters', sqlalchemy_utils.types.json.JSONType(), 
nullable=True),
+    sa.Column('format', sqlalchemy_utils.types.choice.ChoiceType(FORMATS), 
nullable=False),
+    sa.Column('additional_parameters', sqlalchemy_utils.types.json.JSONType(), 
nullable=True),
+    sa.Column('user_id', sa.Integer(), nullable=True),
+    sa.Column('fetch_values_predicate', sa.String(length=1000), nullable=True),
+    sa.Column('main_dttm_col', sa.String(length=250), nullable=True),
+    sa.Column('created_by_fk', sa.Integer(), nullable=True),
+    sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+    sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ),
+    sa.PrimaryKeyConstraint('id')
+    )
+    op.create_table('pandas_columns',
+    sa.Column('created_on', sa.DateTime(), nullable=True),
+    sa.Column('changed_on', sa.DateTime(), nullable=True),
+    sa.Column('column_name', sa.String(length=255), nullable=True),
+    sa.Column('verbose_name', sa.String(length=1024), nullable=True),
+    sa.Column('is_active', sa.Boolean(), nullable=True),
+    sa.Column('type', sa.String(length=32), nullable=True),
+    sa.Column('groupby', sa.Boolean(), nullable=True),
+    sa.Column('count_distinct', sa.Boolean(), nullable=True),
+    sa.Column('sum', sa.Boolean(), nullable=True),
+    sa.Column('avg', sa.Boolean(), nullable=True),
+    sa.Column('max', sa.Boolean(), nullable=True),
+    sa.Column('min', sa.Boolean(), nullable=True),
+    sa.Column('filterable', sa.Boolean(), nullable=True),
+    sa.Column('description', sa.Text(), nullable=True),
+    sa.Column('expression', sa.Text(), nullable=True),
+    sa.Column('id', sa.Integer(), nullable=False),
+    sa.Column('pandas_datasource_id', sa.Integer(), nullable=True),
+    sa.Column('created_by_fk', sa.Integer(), nullable=True),
+    sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+    sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['pandas_datasource_id'], 
['pandas_datasources.id'], ),
+    sa.PrimaryKeyConstraint('id')
+    )
+    op.create_table('pandas_metrics',
+    sa.Column('created_on', sa.DateTime(), nullable=True),
+    sa.Column('changed_on', sa.DateTime(), nullable=True),
+    sa.Column('metric_name', sa.String(length=512), nullable=True),
+    sa.Column('verbose_name', sa.String(length=1024), nullable=True),
+    sa.Column('metric_type', sa.String(length=32), nullable=True),
+    sa.Column('description', sa.Text(), nullable=True),
+    sa.Column('warning_text', sa.Text(), nullable=True),
+    sa.Column('is_restricted', sa.Boolean(), nullable=True),
+    sa.Column('d3format', sa.String(length=128), nullable=True),
+    sa.Column('id', sa.Integer(), nullable=False),
+    sa.Column('pandas_datasource_id', sa.Integer(), nullable=True),
+    sa.Column('source', sa.Text(), nullable=True),
+    sa.Column('expression', sa.Text(), nullable=True),
+    sa.Column('created_by_fk', sa.Integer(), nullable=True),
+    sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+    sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['pandas_datasource_id'], 
['pandas_datasources.id'], ),
+    sa.PrimaryKeyConstraint('id')
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_table('pandas_metrics')
+    op.drop_table('pandas_columns')
+    op.drop_table('pandas_datasources')
+    # ### end Alembic commands ###
diff --git a/superset/migrations/versions/ff40cbbdde10_.py 
b/superset/migrations/versions/ff40cbbdde10_.py
new file mode 100644
index 0000000000..917d31317b
--- /dev/null
+++ b/superset/migrations/versions/ff40cbbdde10_.py
@@ -0,0 +1,22 @@
+"""merge PandasConnector
+
+Revision ID: ff40cbbdde10
+Revises: ('4736ec66ce19', 'b2cd059e8803')
+Create Date: 2017-09-28 09:07:11.852491
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'ff40cbbdde10'
+down_revision = ('4736ec66ce19', 'b2cd059e8803')
+
+from alembic import op
+import sqlalchemy as sa
+
+
+def upgrade():
+    pass
+
+
+def downgrade():
+    pass
diff --git a/superset/views/core.py b/superset/views/core.py
index 4d55fae635..106fdb442b 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -1343,6 +1343,9 @@ def checkbox(self, model_view, id_, attr, value):
             'TableColumnInlineView':
                 ConnectorRegistry.sources['table'].column_class,
         }
+        if 'pandas' in ConnectorRegistry.sources:
+            model = ConnectorRegistry.sources['pandas'].column_class
+            modelview_to_model[model_view] = model
         model = modelview_to_model[model_view]
         obj = db.session.query(model).filter_by(id=id_).first()
         if obj:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to