diff --git a/contrib/__init__.py b/contrib/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/cache/__init__.py b/contrib/cache/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/cache/dataframe.py b/contrib/cache/dataframe.py
new file mode 100644
index 0000000000..4d4014b824
--- /dev/null
+++ b/contrib/cache/dataframe.py
@@ -0,0 +1,195 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+from io import open
+import json
+import os
+import tempfile
+from time import time
+    import cPickle as pickle
+except ImportError:  # pragma: no cover
+    import pickle
+import pandas as pd
+from six import u
+from werkzeug.contrib.cache import FileSystemCache
+class DataFrameCache(FileSystemCache):
+    """
+    A cache that stores Pandas DataFrames on the file system.
+    DataFrames are stored in Feather Format - a fast on-disk representation
+    of the Apache Arrow in-memory format to eliminate serialization
+    overhead.
+    This cache depends on being the only user of the `cache_dir`. Make
+    absolutely sure that nobody but this cache stores files there or
+    otherwise the cache will randomly delete files therein.
+    :param cache_dir: the directory where cache files are stored.
+    :param threshold: the maximum number of items the cache stores before
+                      it starts deleting some.
+    :param default_timeout: the default timeout that is used if no timeout is
+                            specified on :meth:`~BaseCache.set`. A timeout of
+                            0 indicates that the cache never expires.
+    :param mode: the file mode wanted for the cache files, default 0600
+    """
+    _fs_cache_suffix = '.cached'
+    _fs_metadata_suffix = '.metadata'
+    def _list_dir(self):
+        """Return the full paths of the cached data files in the cache dir.
+
+        Only directory entries whose name ends with the cache suffix are
+        included; every other file is ignored.
+        """
+        root = self._path
+        suffix = self._fs_cache_suffix
+        paths = []
+        for entry in os.listdir(root):
+            if entry.endswith(suffix):
+                paths.append(os.path.join(root, entry))
+        return paths
+    def _prune(self):
+        """Evict entries once the cache holds more than ``_threshold`` files.
+
+        When over the threshold, every expired entry is removed, and so is
+        every third entry regardless of its expiry. An entry whose metadata
+        file is missing or unreadable is treated as expired. Removal is
+        best-effort: files that cannot be deleted are skipped.
+        """
+        entries = self._list_dir()
+        if len(entries) <= self._threshold:
+            return
+        now = time()
+        for idx, cname in enumerate(entries):
+            mname = os.path.splitext(cname)[0] + self._fs_metadata_suffix
+            try:
+                with open(mname, 'r', encoding='utf-8') as f:
+                    expires = json.load(f)['expires']
+            except (IOError, OSError, ValueError, KeyError, TypeError):
+                # Missing, corrupt or malformed metadata: treat as expired
+                expires = -1
+            expired = expires != 0 and expires <= now
+            if expired or idx % 3 == 0:
+                # Remove each file independently so a failure on the data
+                # file does not leave its metadata file behind.
+                for path in (cname, mname):
+                    try:
+                        os.remove(path)
+                    except (IOError, OSError):
+                        pass
+    def clear(self):
+        """Remove every cached data file together with its metadata file.
+
+        All entries are attempted even if some removals fail, so one
+        undeletable or orphaned file does not leave the rest of the cache
+        in place.
+
+        :returns: True if every file was removed, False if any removal
+                  failed (including a data file with no metadata file).
+        """
+        cleared = True
+        for cname in self._list_dir():
+            mname = os.path.splitext(cname)[0] + self._fs_metadata_suffix
+            for path in (cname, mname):
+                try:
+                    os.remove(path)
+                except (IOError, OSError):
+                    cleared = False
+        return cleared
+    def get(self, key):
+        """Return the cached DataFrame for ``key``, or None.
+
+        The JSON metadata file supplies the expiry time, the storage format
+        and optional keyword arguments for the pandas reader. A missing or
+        corrupt metadata file makes the entry count as expired. Expired
+        entries are deleted from disk.
+
+        :param key: the cache key.
+        :returns: the object produced by ``pd.read_<format>``, or None if
+                  the entry is missing, expired or cannot be opened.
+        """
+        filename = self._get_filename(key)
+        cname = filename + self._fs_cache_suffix
+        mname = filename + self._fs_metadata_suffix
+        try:
+            with open(mname, 'r', encoding='utf-8') as f:
+                metadata = json.load(f)
+        except (IOError, OSError, ValueError):
+            # Missing or corrupt metadata: treat the entry as expired
+            metadata = {'expires': -1}
+        try:
+            if metadata['expires'] == 0 or metadata['expires'] > time():
+                # The reader is looked up from the stored format name,
+                # e.g. 'feather' -> pd.read_feather
+                read_method = getattr(
+                    pd, 'read_{}'.format(metadata['format']))
+                read_args = metadata.get('read_args', {})
+                with open(cname, 'rb') as f:
+                    if metadata['format'] == 'hdf':
+                        # The HDF reader is given the path, not the handle
+                        return read_method(f.name, **read_args)
+                    return read_method(f, **read_args)
+            # Expired: delete while no handle on the file is open
+            os.remove(cname)
+            os.remove(mname)
+            return None
+        except (IOError, OSError, KeyError, AttributeError):
+            # Unreadable file, incomplete metadata or unknown format
+            return None
+    def add(self, key, value, timeout=None):
+        """Store ``value`` under ``key`` unless its data file already exists.
+
+        :returns: False when a data file for ``key`` is already on disk,
+                  otherwise whatever :meth:`set` returns.
+        """
+        target = self._get_filename(key) + self._fs_cache_suffix
+        if os.path.exists(target):
+            return False
+        return self.set(key, value, timeout)
+    def set(self, key, value, timeout=None):
+        """Serialize the DataFrame ``value`` to disk under ``key``.
+
+        Serializers are tried in order (Feather, HDF, pickle). The first one
+        that succeeds wins, and its format is written to a JSON metadata
+        file next to the data file together with the expiry value.
+
+        :param key: the cache key.
+        :param value: the object to store; written via its ``to_feather``
+                      or ``to_hdf`` method, or pickled as a last resort.
+        :param timeout: passed to ``_normalize_timeout`` to compute the
+                        stored ``expires`` value.
+        :returns: True if the entry was written, False if every serializer
+                  failed.
+        """
+        base_metadata = {'expires': self._normalize_timeout(timeout)}
+        filename = self._get_filename(key)
+        cname = filename + self._fs_cache_suffix
+        mname = filename + self._fs_metadata_suffix
+        suffix = self._fs_transaction_suffix
+        self._prune()
+
+        def publish(tmp_name):
+            # os.link() refuses to overwrite an existing name, so drop any
+            # previous data file for this key first; otherwise updating an
+            # existing key would fail in every serializer.
+            try:
+                os.remove(cname)
+            except (IOError, OSError):
+                pass
+            os.link(tmp_name, cname)
+
+        def to_feather(f, dataframe, metadata):
+            dataframe.to_feather(f)
+            metadata['format'] = 'feather'
+
+        def to_hdf(f, dataframe, metadata):
+            # HDF is written by path rather than through the file handle
+            dataframe.to_hdf(f.name, 'df')
+            metadata['format'] = 'hdf'
+            metadata['read_args'] = {'key': 'df'}
+
+        def to_pickle(f, dataframe, metadata):
+            pickle.dump(dataframe, f, pickle.HIGHEST_PROTOCOL)
+            metadata['format'] = 'pickle'
+
+        for serializer in [to_feather, to_hdf, to_pickle]:
+            # Each attempt gets its own metadata so a failed serializer
+            # cannot leak e.g. ``read_args`` into a later format.
+            metadata = dict(base_metadata)
+            try:
+                with tempfile.NamedTemporaryFile(dir=self._path,
+                                                 suffix=suffix) as f:
+                    serializer(f, value, metadata)
+                    publish(f.name)
+                with open(mname, 'w', encoding='utf-8') as f:
+                    f.write(u(json.dumps(metadata)))
+                return True
+            except Exception:
+                # Try the next serializer
+                pass
+        # We didn't successfully save the data
+        return False
+    def delete(self, key):
+        """Remove the data file and then the metadata file for ``key``.
+
+        :returns: True when both files were removed, False as soon as one
+                  removal fails (the remaining file is then left alone).
+        """
+        base = self._get_filename(key)
+        targets = (base + self._fs_cache_suffix,
+                   base + self._fs_metadata_suffix)
+        try:
+            for path in targets:
+                os.remove(path)
+        except (IOError, OSError):
+            return False
+        return True
+    def has(self, key):
+        """Report whether a live (non-expired) entry exists for ``key``.
+
+        A missing or corrupt metadata file makes the entry count as
+        expired. Expired entries are deleted from disk.
+
+        :returns: True if the data file can be opened and has not expired,
+                  False otherwise.
+        """
+        filename = self._get_filename(key)
+        cname = filename + self._fs_cache_suffix
+        mname = filename + self._fs_metadata_suffix
+        try:
+            with open(mname, 'r', encoding='utf-8') as f:
+                metadata = json.load(f)
+        except (IOError, OSError, ValueError):
+            # Missing or corrupt metadata: treat the entry as expired
+            metadata = {'expires': -1}
+        try:
+            # Opening the data file proves it exists and is readable; it is
+            # closed again before any removal is attempted.
+            with open(cname, 'rb'):
+                pass
+            expires = metadata['expires']
+            if expires == 0 or expires > time():
+                return True
+            os.remove(cname)
+            os.remove(mname)
+            return False
+        except (IOError, OSError, KeyError):
+            return False
+    def inc(self, key, delta=1):
+        """Not supported by this cache; always raises NotImplementedError."""
+        raise NotImplementedError()
+    def dec(self, key, delta=1):
+        """Not supported by this cache; always raises NotImplementedError."""
+        raise NotImplementedError()
+def dataframe(app, config, args, kwargs):
+    """Build a DataFrameCache for use by Flask-Cache.
+
+    ``config['CACHE_DIR']`` is inserted as the first positional argument
+    and ``config['CACHE_THRESHOLD']`` is passed as ``threshold``. Both
+    ``args`` and ``kwargs`` are modified in place.
+    """
+    cache_dir = config['CACHE_DIR']
+    args.insert(0, cache_dir)
+    kwargs['threshold'] = config['CACHE_THRESHOLD']
+    return DataFrameCache(*args, **kwargs)
diff --git a/contrib/connectors/__init__.py b/contrib/connectors/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/connectors/pandas/__init__.py b/contrib/connectors/pandas/__init__.py
new file mode 100644
index 0000000000..b2df79851f
--- /dev/null
+++ b/contrib/connectors/pandas/__init__.py
@@ -0,0 +1,2 @@
+from . import models  # noqa
+from . import views  # noqa
diff --git a/contrib/connectors/pandas/models.py b/contrib/connectors/pandas/models.py
new file mode 100644
index 0000000000..8db84a2c3e
--- /dev/null
+++ b/contrib/connectors/pandas/models.py
@@ -0,0 +1,931 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+from collections import OrderedDict
+from datetime import datetime
+import hashlib
+from io import BytesIO
+import logging
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
+from flask import escape, Markup
+from flask_appbuilder import Model
+from flask_babel import lazy_gettext as _
+import pandas as pd
+from pandas.api.types import (
+    is_datetime64_any_dtype, is_numeric_dtype, is_string_dtype)
+from past.builtins import basestring
+import requests
+import sqlalchemy as sa
+from sqlalchemy import (
+    and_, Column, ForeignKey, Integer, or_, String, Text,
+from sqlalchemy.orm import backref, relationship
+from sqlalchemy_utils import ChoiceType, JSONType
+from superset import dataframe_cache, db, sm, utils
+from superset.connectors.base.models import (
+    BaseColumn, BaseDatasource, BaseMetric)
+from superset.models.helpers import QueryResult, set_perm
+from superset.utils import QueryStatus
+# (code, label) choices for the `format` column of a Pandas datasource.
+# The code is used to look up the matching `pd.read_<code>` function.
+FORMATS = [
+    ('csv', 'csv'),
+    ('html', 'html'),
+    ('json', 'json'),
+    ('excel', 'Microsoft Excel'),
+    ('stata', 'Stata'),
+]
+# HDF5 and Feather are only offered when their optional packages import
+try:
+    import tables  # NOQA
+    FORMATS.append(('hdf', 'HDF5'))
+except ImportError:
+    pass
+try:
+    import feather  # NOQA
+    FORMATS.append(('feather', 'Feather'))
+except ImportError:
+    pass
+class PandasDatabase(object):
+    """Plain (non-ORM) holder for the name and cache timeout of a source."""
+    def __init__(self, database_name, cache_timeout=None):
+        self.cache_timeout = cache_timeout
+        self.database_name = database_name
+    def __str__(self):
+        # The string form is just the database name
+        name = self.database_name
+        return name
+class PandasColumn(Model, BaseColumn):
+    """
+    ORM object for Pandas columns.
+    Each Pandas Datasource can have multiple columns.
+    The dtype predicates below test the column's ``type`` value
+    (presumably a dtype name inherited from BaseColumn; verify there).
+    """
+    __tablename__ = 'pandas_columns'
+    id = Column(Integer, primary_key=True)
+    pandas_datasource_id = Column(Integer, ForeignKey('pandas_datasources.id'))
+    datasource = relationship(
+        'PandasDatasource',
+        backref=backref('columns', cascade='all, delete-orphan'),
+        foreign_keys=[pandas_datasource_id])
+    expression = Column(Text)
+    @property
+    def is_num(self):
+        """True if ``type`` is a numeric dtype; False if it is unset."""
+        # bool() so an unset type yields False rather than None or ''
+        return bool(self.type and is_numeric_dtype(self.type))
+    @property
+    def is_time(self):
+        """True if ``type`` is a datetime64 dtype; False if it is unset."""
+        return bool(self.type and is_datetime64_any_dtype(self.type))
+    @property
+    def is_dttm(self):
+        """Alias of :attr:`is_time`."""
+        return self.is_time
+    @property
+    def is_string(self):
+        """True if ``type`` is a string dtype; False if it is unset."""
+        return bool(self.type and is_string_dtype(self.type))
+    @property
+    def data(self):
+        """Return a dict of the column attributes listed in ``attrs``."""
+        attrs = (
+            'column_name', 'verbose_name', 'description', 'expression',
+            'filterable', 'groupby', 'is_dttm')
+        return {s: getattr(self, s) for s in attrs}
+    def get_perm(self):
+        """Return the permission name for this column, or None.
+
+        None is returned when the column has no datasource.
+        """
+        # NOTE(review): the name is built from `expression`, whereas
+        # PandasMetric.get_perm uses `metric_name`; confirm that
+        # `column_name` was not intended here.
+        if self.datasource:
+            return ('{parent_name}.[{obj.expression}]'
+                    '(id:{obj.id})').format(
+                obj=self,
+                parent_name=self.datasource.full_name)
+        return None
+class PandasMetric(Model, BaseMetric):
+    """
+    ORM object for a metric defined on a Pandas datasource.
+    A datasource may own any number of metrics.
+    """
+    __tablename__ = 'pandas_metrics'
+    id = Column(Integer, primary_key=True)
+    pandas_datasource_id = Column(Integer, ForeignKey('pandas_datasources.id'))
+    datasource = relationship(
+        'PandasDatasource',
+        backref=backref('metrics', cascade='all, delete-orphan'),
+        foreign_keys=[pandas_datasource_id])
+    source = Column(Text)
+    expression = Column(Text)
+    def get_perm(self):
+        """Return the permission name for this metric, or None.
+
+        None is returned when the metric has no datasource.
+        """
+        if not self.datasource:
+            return None
+        template = '{parent_name}.[{obj.metric_name}](id:{obj.id})'
+        return template.format(
+            obj=self,
+            parent_name=self.datasource.full_name)
+class PandasDatasource(Model, BaseDatasource):
+    """A datasource based on a Pandas DataFrame"""
+    # See http://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases  # NOQA: E501
+    GRAINS = OrderedDict([
+        ('5 seconds', '5S'),
+        ('30 seconds', '30S'),
+        ('1 minute', 'T'),
+        ('5 minutes', '5T'),
+        ('1 hour', 'H'),
+        ('6 hour', '6H'),
+        ('day', 'D'),
+        ('one day', 'D'),
+        ('1 day', 'D'),
+        ('7 days', '7D'),
+        ('week', 'W-MON'),
+        ('week_starting_sunday', 'W-SUN'),
+        ('week_ending_saturday', 'W-SUN'),
+        ('month', 'M'),
+        ('quarter', 'Q'),
+        ('year', 'A'),
+    ])
+    __tablename__ = 'pandas_datasources'
+    type = 'pandas'
+    baselink = 'pandasdatasourcemodelview'  # url portion pointing to ModelView
+    column_class = PandasColumn
+    metric_class = PandasMetric
+    name = Column(String(100), nullable=False)
+    source_url = Column(String(1000), nullable=False)
+    source_auth = Column(JSONType)
+    source_parameters = Column(JSONType)
+    format = Column(ChoiceType(FORMATS), nullable=False)
+    additional_parameters = Column(JSONType)
+    user_id = Column(Integer, ForeignKey('ab_user.id'))
+    owner = relationship(
+        sm.user_model,
+        backref='pandas_datasources',
+        foreign_keys=[user_id])
+    fetch_values_predicate = Column(String(1000))
+    main_dttm_col = Column(String(250))
+    # Used to do code highlighting when displaying the query in the UI
+    query_language = None
+    # A Pandas Dataframe containing the data retrieved from the source url
+    df = None
+    def __repr__(self):
+        # The datasource is represented by the value of its `name` column
+        return self.name
+    @property
+    def datasource_name(self):
+        """Return the datasource's ``name``."""
+        return self.name
+    @property
+    def full_name(self):
+        """Return the datasource's ``name`` (no extra qualification)."""
+        return self.name
+    @property
+    def database(self):
+        """Return a PandasDatabase named after the source URL's host part.
+
+        If parsing ``source_url`` raises AttributeError, the name 'memory'
+        is used instead. The cache timeout is always None.
+        """
+        try:
+            netloc = urlparse(self.source_url).netloc
+        except AttributeError:
+            netloc = 'memory'
+        return PandasDatabase(database_name=netloc, cache_timeout=None)
+    @property
+    def connection(self):
+        """Return ``source_url`` unchanged."""
+        return self.source_url
+    @property
+    def schema(self):
+        """Return the path component of ``source_url``."""
+        return urlparse(self.source_url).path
+    @property
+    def schema_perm(self):
+        """Returns endpoint permission if present, host one otherwise."""
+        # Delegates to utils.get_schema_perm with this datasource's
+        # `database` and `schema` properties
+        return utils.get_schema_perm(self.database, self.schema)
+    @property
+    def description_markeddown(self):
+        """Return ``description`` passed through ``utils.markdown``."""
+        return utils.markdown(self.description)
+    @property
+    def link(self):
+        """Return an HTML anchor to ``explore_url`` labelled with the name.
+
+        The name is escaped; the result is wrapped in ``Markup``.
+        """
+        template = '<a href="{self.explore_url}">{name}</a>'
+        anchor = template.format(self=self, name=escape(self.name))
+        return Markup(anchor)
+    def get_perm(self):
+        """Return the permission name built from the name and id."""
+        template = 'pandas.{obj.name}(id:{obj.id})'
+        return template.format(obj=self)
+    @property
+    def dttm_cols(self):
+        """Return the names of all datetime columns.
+
+        ``main_dttm_col`` is appended when set and not already listed.
+        """
+        names = [column.column_name
+                 for column in self.columns
+                 if column.is_dttm]
+        main = self.main_dttm_col
+        if main and main not in names:
+            names.append(main)
+        return names
+    @property
+    def num_cols(self):
+        """Return the names of all columns whose ``is_num`` is truthy."""
+        names = []
+        for column in self.columns:
+            if column.is_num:
+                names.append(column.column_name)
+        return names
+    @property
+    def any_dttm_col(self):
+        """Return the first entry of ``dttm_cols``, or None if it is empty."""
+        candidates = self.dttm_cols
+        return candidates[0] if candidates else None
+    @property
+    def html(self):
+        """Return an HTML table listing each column's name and type.
+
+        The table has the headers ``field`` and ``type`` and no index.
+        """
+        rows = [(c.column_name, c.type) for c in self.columns]
+        # Naming the columns in the constructor also works when there are
+        # no rows; assigning `df.columns` afterwards raises ValueError on
+        # an empty frame because it has zero columns to rename.
+        df = pd.DataFrame(rows, columns=['field', 'type'])
+        return df.to_html(
+            index=False,
+            classes=(
+                'dataframe table table-striped table-bordered '
+                'table-condensed'))
+    @property
+    def data(self):
+        """Return the parent's ``data`` dict plus the time-column choices.
+
+        The front end selects the time column and the time grain through
+        the `granularity_sqla` and `time_grain_sqla` parameters, so those
+        keys are filled in here. The resulting dict is logged at INFO.
+        """
+        payload = super(PandasDatasource, self).data
+        payload['granularity_sqla'] = utils.choicify(self.dttm_cols)
+        payload['time_grain_sqla'] = [
+            (grain, grain) for grain in self.GRAINS]
+        logging.info(payload)
+        return payload
+    @property
+    def pandas_read_method(self):
+        """Return the ``pd.read_<format>`` function for this datasource.
+
+        ``format`` may be a Choice object (its ``code`` is used) or a
+        plain value (used as is).
+        """
+        fmt = getattr(self.format, 'code', self.format)
+        reader_name = 'read_{format}'.format(format=fmt)
+        return getattr(pd, reader_name)
+    @property
+    def pandas_read_parameters(self):
+        """Return ``additional_parameters``, or an empty dict if unset."""
+        return self.additional_parameters or {}
+    def get_empty_dataframe(self):
+        """Create an empty dataframe with the correct columns and dtypes.
+
+        Any datetime64 column type is created as 'datetime64[ns]'; every
+        other column uses its ``type`` as the dtype. Columns appear in the
+        order of ``self.columns``.
+        """
+        # An OrderedDict keeps the column order, which a plain dict does
+        # not guarantee on older Python versions.
+        series = OrderedDict()
+        for col in self.columns:
+            dtype = ('datetime64[ns]'
+                     if is_datetime64_any_dtype(col.type)
+                     else col.type)
+            series[col.column_name] = pd.Series(dtype=dtype)
+        return pd.DataFrame(series)
+    @property
+    def cache_key(self):
+        """Return an MD5 hex digest identifying the source and read options.
+
+        The digest covers ``source_url``, ``source_auth``, the source
+        parameters and the pandas read parameters, ordered by key.
+        """
+        source = {'source_url': self.source_url,
+                  'source_auth': self.source_auth}
+        for extra in (self.source_parameters or {},
+                      self.pandas_read_parameters):
+            source.update(extra)
+        ordered = [(name, source[name]) for name in sorted(source)]
+        digest = hashlib.md5(str(ordered).encode('utf-8'))
+        return digest.hexdigest()
+    def get_dataframe(self):
+        """
+        Read the source_url and return a Pandas DataFrame.
+        Use the PandasColumns to coerce columns into the correct dtype,
+        and add any calculated columns to the DataFrame.
+
+        The frame is loaded once and kept on ``self.df``. If a
+        ``dataframe_cache`` is configured it is consulted first and
+        updated after a fresh read.
+
+        Raises ValueError (with a message naming the column prepended)
+        when a column cannot be converted to its configured type.
+        """
+        # NOTE(review): several statements below (the requests.get calls
+        # and the pandas_read_method call) are cut off in this patch and
+        # need restoring from the original change before it can be applied.
+        if self.df is None:
+            if dataframe_cache:
+                cache_key = self.cache_key
+                self.df = dataframe_cache.get(cache_key)
+            # Cache miss (or no cache): load from the source
+            if not isinstance(self.df, pd.DataFrame):
+                if (isinstance(self.source_url, basestring) and
+                        self.source_url[:4] == 'http'):
+                    # Use requests to retrieve remote data so we can handle
+                    # authentication
+                    auth = self.source_auth
+                    url = self.source_url
+                    # A tuple/list auth value is passed as requests `auth`
+                    if isinstance(auth, (tuple, list)):
+                        response = requests.get(url, 
+                                                auth=tuple(auth))
+                    elif auth:
+                        response = requests.get(url, 
+                                                headers={'Authorization': 
+                    else:
+                        response = requests.get(url, 
+                    response.raise_for_status()
+                    data = BytesIO(response.content)
+                else:
+                    # Local file, so just use Pandas directly
+                    data = self.source_url
+                # Read the dataframe from the response
+                self.df = self.pandas_read_method(data, 
+                # read_html returns a list of DataFrames
+                if (isinstance(self.df, list) and
+                        isinstance(self.df[0], pd.DataFrame)):
+                    self.df = self.df[0]
+                # Our column names are always strings
+                self.df.columns = [str(col) for col in self.df.columns]
+                if dataframe_cache:
+                    timeout = self.cache_timeout or self.database.cache_timeout
+                    dataframe_cache.set(cache_key, self.df, timeout)
+        calculated_columns = []
+        for col in self.columns:
+            name = col.column_name
+            type = col.type
+            # Prepare calculated columns
+            if col.expression:
+                calculated_columns.append('{name} = {expr}'.format(
+                    name=name, expr=col.expression))
+            elif type != self.df[name].dtype.name:
+                # Convert column to correct dtype
+                try:
+                    self.df[name] = self.df[name].values.astype(type)
+                except ValueError as e:
+                    message = ('Failed to convert column {name} '
+                               'from {old_type} to {new_type}').format(
+                        name=name,
+                        old_type=self.df[name].dtype.name,
+                        new_type=type)
+                    # Prepend the context to the original error arguments
+                    e.args = (message,) + e.args
+                    raise
+        # Add the calculated columns, using a multi-line string to add them
+        # all at once. See the pandas documentation for DataFrame.eval.
+        if calculated_columns:
+            self.df.eval('\n'.join(calculated_columns),
+                         truediv=True,
+                         inplace=True)
+        return self.df
+    def get_filter_query(self, filter):
+        """
+        Build a query string to filter a dataframe.
+        Filter is a list of dicts of op, col and value.
+        Returns a string that can be passed to DataFrame.query() to
+        restrict the DataFrame to only the matching rows.
+
+        Filters without a column or operator, or whose value is None or
+        empty ('' / []), are skipped. Values such as 0 or False are kept.
+        The clauses are joined with ' and '; an empty string is returned
+        when no filter applies.
+        """
+        # NOTE(review): column names, operators and values are interpolated
+        # into an expression that DataFrame.query() evaluates, and values
+        # are quoted without escaping. Confirm the inputs are trusted or
+        # validated upstream.
+        cols = {col.column_name: col for col in self.columns}
+        clauses = []
+        for flt in filter:
+            col = flt.get('col')
+            op = flt.get('op')
+            eq = flt.get('val')
+            # Only None and empty containers/strings count as "no value";
+            # a plain truthiness test would also drop 0 and False.
+            no_value = eq is None or (
+                hasattr(eq, '__len__') and len(eq) == 0)
+            if not col or not op or no_value:
+                continue
+            if op == 'LIKE':
+                clauses.append(
+                    "{col}.str.match('{eq}')".format(col=col, eq=eq))
+                continue
+            # Rely on Pandas partial string indexing for datetime fields
+            # (see the pandas time series documentation), so string and
+            # datetime values are quoted. A column that is not in `cols`
+            # is probably a metric, which is numeric anyway.
+            col_obj = cols.get(col)
+            needs_quotes = (
+                col_obj is not None and
+                (col_obj.is_string or col_obj.is_dttm) and
+                not isinstance(eq, list))
+            if needs_quotes:
+                eq = "'{}'".format(eq)
+            clauses.append(
+                '({col} {op} {eq})'.format(col=col, op=op, eq=eq))
+        return ' and '.join(clauses)
+    def get_agg_function(self, expr):
+        """
+        Return a function that can be passed to DataFrame.apply().
+        Complex expressions that work on multiple columns must be a function
+        that accepts a Group as the parameter.
+        The function can be defined on the Connector or on the DataFrame.
+
+        The built-in aggregate names ('sum', 'mean', 'std', 'sem', 'count',
+        'min', 'max') are returned unchanged as strings.
+
+        Raises KeyError if ``expr`` is neither a built-in aggregate nor an
+        attribute of the connector or the DataFrame.
+        """
+        if expr in ['sum', 'mean', 'std', 'sem', 'count', 'min', 'max']:
+            return expr
+        if hasattr(self, expr):
+            return getattr(self, expr)
+        # Fetch the frame once instead of once per lookup
+        df = self.get_dataframe()
+        if hasattr(df, expr):
+            return getattr(df, expr)
+        # The previous `locals()[expr]` fallback could only ever see `self`
+        # and `expr`, so it was a KeyError in disguise; raise it directly.
+        raise KeyError(expr)
+    def process_dataframe(
+            self,
+            df,
+            groupby, metrics,
+            granularity,
+            from_dttm, to_dttm,
+            filter=None,  # noqa
+            is_timeseries=True,
+            timeseries_limit=15,
+            timeseries_limit_metric=None,
+            row_limit=None,
+            inner_from_dttm=None,
+            inner_to_dttm=None,
+            orderby=None,
+            extras=None,
+            columns=None,
+            form_data=None,
+            order_desc=True):
+        """Querying any dataframe table from this common interface"""
+        if orderby:
+            orderby, ascending = map(list, zip(*orderby))
+        else:
+            orderby = []
+            ascending = []
+        metric_order_asc = not order_desc
+        filter = filter or []
+        query_str = 'df'
+        # Build a dict of the metrics to include, including those that
+        # are required for post-aggregation filtering
+        # Note that the front end uses `having_druid` as the parameter
+        # for post-aggregation filters, and we are reusing that
+        # interface component.
+        filtered_metrics = [flt['col']
+                            for flt in extras.get('having_druid', [])
+                            if flt['col'] not in metrics]
+        metrics_dict = {m.metric_name: m for m in self.metrics}
+        metrics_exprs = OrderedDict()
+        for m in metrics + filtered_metrics:
+            try:
+                metric = metrics_dict[m]
+            except KeyError:
+                raise Exception(_("Metric '{}' is not valid".format(m)))
+            metrics_exprs[m] = metric
+        # Standard tests (copied from SqlaTable)
+        if not granularity and is_timeseries:
+            raise Exception(_(
+                'Datetime column not provided as part table configuration '
+                'and is required by this type of chart'))
+        # Filter the DataFrame by the time column, and resample if necessary
+        timestamp_cols = []
+        timestamp_exprs = []
+        if granularity and granularity != 'all':
+            if from_dttm:
+                filter.append({'col': granularity,
+                               'op': '>=',
+                               'val': from_dttm})
+            if to_dttm:
+                filter.append({'col': granularity,
+                               'op': '<=',
+                               'val': to_dttm})
+            if is_timeseries:
+                # Note that the front end uses `time_grain_sqla` as
+                # the parameter for setting the time grain when the
+                # granularity is being used to select the timetamp column
+                time_grain = self.GRAINS[extras.get('time_grain_sqla')]
+                timestamp_cols = ['__timestamp']
+                timestamp_exprs = [pd.Grouper(key=granularity,
+                                              freq=time_grain)]
+                if timeseries_limit_metric and timeseries_limit:
+                    metric = metrics_dict[timeseries_limit_metric]
+                    assert isinstance(metric.source, basestring)
+                    aggregates = {metric.source: metric.expression}
+                    df = (df[df.set_index(groupby)
+                               .index.isin(
+                                   df.groupby(groupby, sort=False)
+                                   .aggregate(aggregates)
+                                   .sort_values(metric.source,
+                                                ascending=metric_order_asc)
+                                   .iloc[:timeseries_limit].index)])
+                    query_str += ('[df.set_index({groupby}).index.isin('
+                                  'df.groupby({groupby}, sort=False)'
+                                  '.aggregate({aggregates})'
+                                  ".sort_values('{metric.source}', "
+                                  'ascending={metric_order_asc})'
+                                  '.iloc[:{timeseries_limit}].index)]').format(
+                        groupby=groupby,
+                        timeseries_limit_metric=timeseries_limit_metric,
+                        timeseries_limit=timeseries_limit,
+                        aggregates=aggregates,
+                        metric=metric,
+                        metric_order_asc=metric_order_asc)
+        # Additional filtering of rows prior to aggregation
+        if filter:
+            filter_str = self.get_filter_query(filter)
+            df = df.query(filter_str)
+            query_str += '.query("{filter_str}")'.format(filter_str=filter_str)
+        # We have one of:
+        # - columns only: return a simple table of results with no aggregation
+        # - metrics only: return a single row with one column per metric
+        #                 aggregated for the whole dataframe
+        # - groupby and metrics: return a table of distinct groupby columns
+        #                 and aggregations
+        # - groupby only: return a table of distinct rows
+        if columns:
+            # A simple table of results with no aggregation or grouping
+            if orderby:
+                df = df.sort_values(orderby, ascending=ascending)
+                query_str += ('.sort_values({orderby}, '
+                              'ascending={ascending})').format(
+                    orderby=orderby,
+                    ascending=ascending)
+            df = df[columns]
+            query_str += '[{columns}]'.format(columns=columns)
+        elif metrics_exprs:
+            # Aggregate the dataframe
+            # Single-column aggregates can be calculated using aggregate,
+            # multi-column ones need to use apply.
+            # aggregates is a dict keyed by a column name, where the value is
+            # a list of expressions that can be used by DataFrame.aggregate()
+            # apply_functions is a dict keyed by the metric name, where the
+            # value is a function that can be passed to DataFrame.apply()
+            aggregates = OrderedDict()
+            agg_names = []
+            apply_functions = []
+            apply_names = []
+            for metric_name, metric in metrics_exprs.items():
+                sources = []
+                if metric.source:
+                    sources = [s.strip() for s in metric.source.split(',') if 
+                if len(sources) == 1:
+                    # Single column source, so use aggregate
+                    func = self.get_agg_function(metric.expression)
+                    if metric.source in aggregates:
+                        aggregates[metric.source].append(func)
+                    else:
+                        aggregates[metric.source] = [func]
+                    agg_names.append(metric_name)
+                else:
+                    # Multiple columns so the expression must be a function
+                    # that accepts a Group as the parameter
+                    apply_functions.append((sources,
+                                            metric.expression,
+                    apply_names.append(metric_name)
+            # Build a list of like-indexed DataFrames containing the results
+            # of DataFrame.aggregate() and individual DataFrame.apply() calls
+            dfs = []
+            query_strs = []
+            if groupby or timestamp_exprs:
+                df = df.groupby(groupby + timestamp_exprs, sort=False)
+                query_str += '.groupby({}, sort=False)'.format(groupby + 
+            for sources, expr, func in apply_functions:
+                apply_df = df
+                apply_str = query_str
+                if sources:
+                    apply_df = df[sources]
+                    apply_str = query_str + '[{}]'.format(sources)
+                if groupby or timestamp_exprs:
+                    apply_df = df.apply(func)
+                    apply_str += '.apply({})'.format(expr)
+                else:
+                    # If we call DataFrame.apply() without a groupby then the 
func is
+                    # called on each column individually. Therefore, if we 
have a
+                    # summary with multi-column aggregates we need to pass the 
+                    # DataFrame to the function.
+                    apply_df = pd.Series(func(apply_df)).to_frame()
+                    apply_str = 'pd.Series({expr}({df})).to_frame()'.format(
+                        expr=expr,
+                        df=apply_str)
+                    # Superset expects a DataFrame with single Row and
+                    # the metrics as columns, rather than with the metrics as 
+                    # index, so if we have a summary then we need to pivot it.
+                    apply_df = apply_df.unstack().to_frame().T
+                    apply_str += '.unstack().to_frame().T'
+                dfs.append(apply_df)
+                query_strs.append(apply_str)
+            if aggregates:
+                if groupby or timestamp_exprs:
+                    dfs.append(df.aggregate(aggregates))
+                    query_strs.append(query_str +
+                                      '.aggregate({})'.format(aggregates))
+                else:
+                    # For a summary table we need to preserve the metric order
+                    # so we can set the correct column names, so process 
+                    # for each dataframe column in turn
+                    for col, col_agg in aggregates.items():
+                        agg_df = df.aggregate({col: col_agg})
+                        agg_str = query_str + '.aggregate({}: {})'.format(col, 
+                        # Superset expects a DataFrame with single Row and
+                        # the metrics as columns, rather than with the metrics 
as the
+                        # index, so if we have a summary then we need to pivot 
+                        agg_df = agg_df.unstack().to_frame().T
+                        agg_str += '.unstack().to_frame().T'
+                        dfs.append(agg_df)
+                        query_strs.append(agg_str)
+            # If there is more than one DataFrame in the list then
+            # concatenate them along the index
+            if len(dfs) > 1:
+                df = pd.concat(dfs, axis=1)
+                query_str = 'pd.concat([{}], axis=1)'.format(', 
+            else:
+                df = dfs[0]
+                query_str = query_strs[0]
+            if groupby or timestamp_exprs:
+                df = df.reset_index()
+                query_str += '.reset_index()'
+            # Set the correct columns names and then reorder the columns
+            # to match the requested order
+            df.columns = groupby + timestamp_cols + apply_names + agg_names
+            df = df[groupby + timestamp_cols + metrics + filtered_metrics]
+            # Filtering of rows post-aggregation based on metrics
+            # Note that the front end uses `having_druid` as the parameter
+            # for post-aggregation filters, and we are reusing that
+            # interface component.
+            if extras.get('having_druid'):
+                filter_str = self.get_filter_query(extras.get('having_druid'))
+                df = df.query(filter_str)
+                query_str += '.query("{filter_str}")'.format(
+                    filter_str=filter_str)
+            # Order by the first metric descending by default,
+            # or within the existing orderby
+            orderby.append((metrics + filtered_metrics)[0])
+            ascending.append(metric_order_asc)
+            # Use the groupby and __timestamp by as a tie breaker
+            orderby = orderby + groupby + timestamp_cols
+            ascending = ascending + ([True] * len(groupby + timestamp_cols))
+            # Sort the values
+            if orderby:
+                df = df.sort_values(orderby, ascending=ascending)
+                query_str += ('.sort_values({orderby}, '
+                              'ascending={ascending})').format(
+                    orderby=orderby,
+                    ascending=ascending)
+            # Remove metrics only added for post-aggregation filtering
+            if filtered_metrics:
+                df = df.drop(filtered_metrics, axis=1)
+                query_str += '.drop({filtered_metrics}, axis=1)'.format(
+                    filtered_metrics=filtered_metrics)
+        elif groupby:
+            # Group by without any metrics is equivalent to SELECT DISTINCT,
+            # order by the size descending by default, or within the
+            # existing orderby
+            orderby.append(0)
+            ascending.append(not order_desc)
+            # Use the group by as a tie breaker
+            orderby = orderby + groupby
+            ascending = ascending + ([True] * len(groupby))
+            df = (df.groupby(groupby, sort=False)
+                    .size()
+                    .reset_index()
+                    .sort_values(orderby, ascending=ascending)
+                    .drop(0, axis=1))
+            query_str += ('.groupby({groupby}, sort=False).size()'
+                          '.reset_index()'
+                          '.sort_values({orderby}, ascending={ascending})'
+                          '.drop(0, axis=1)').format(
+                groupby=groupby,
+                orderby=orderby,
+                ascending=ascending)
+        if row_limit:
+            df = df.iloc[:row_limit]
+            query_str += '.iloc[:{row_limit}]'.format(row_limit=row_limit)
+        # Coerce datetimes to str so that Pandas can set the correct precision
+        for col in df.columns:
+            if is_datetime64_any_dtype(df[col].dtype):
+                df[col] = df[col].astype(str)
+        return df, query_str
+    def get_query_str(self, query_obj):
+        """Returns a query as a string
+        This is used to be displayed to the user so that she/he can
+        understand what is taking place behind the scene"""
+        logging.debug('query_obj: %s', query_obj)
+        df = self.get_empty_dataframe()
+        df, query_str = self.process_dataframe(df, **query_obj)
+        logging.debug('query_str: %s', query_str)
+        return query_str
+    def query(self, query_obj):
+        """Executes the query and returns a dataframe
+        query_obj is a dictionary representing Superset's query interface.
+        Should return a ``superset.models.helpers.QueryResult``
+        """
+        logging.debug('query_obj: %s', query_obj)
+        qry_start_dttm = datetime.now()
+        status = QueryStatus.SUCCESS
+        error_message = None
+        df = None
+        query_str = ''
+        try:
+            df = self.get_dataframe()
+            df, query_str = self.process_dataframe(df, **query_obj)
+            logging.debug('query_str: %s', query_str)
+        except Exception as e:
+            status = QueryStatus.FAILED
+            logging.exception(e)
+            error_message = str(e)
+        return QueryResult(
+            status=status,
+            df=df,
+            duration=datetime.now() - qry_start_dttm,
+            query=query_str,
+            error_message=error_message)
+    def values_for_column(self, column_name, limit=10000):
+        """Given a column, returns an iterable of distinct values
+        This is used to populate the dropdown showing a list of
+        values in filters in the explore view"""
+        values = self.get_dataframe()[column_name].unique()
+        if limit:
+            values = values[:limit]
+        return values.tolist()
+    def get_metadata(self):
+        """Build the metadata for the table and merge it in
+        Loads the DataFrame, appends a PandasColumn to ``self.columns``
+        for every DataFrame column (creating one with inferred settings
+        where none is stored yet), adds the datasource-level ``count``
+        metric if it is missing, defaults ``main_dttm_col`` to the first
+        column whose ``is_time`` is truthy, then merges this datasource
+        into the session and commits.
+        """
+        df = self.get_dataframe()
+        any_date_col = None
+        # Fetch the columns already stored for this datasource, keyed by name
+        dbcols = (
+            db.session.query(PandasColumn)
+            .filter(PandasColumn.datasource == self)
+            .filter(or_(PandasColumn.column_name == col
+                        for col in df.columns)))
+        dbcols = {dbcol.column_name: dbcol for dbcol in dbcols}
+        for col in df.columns:
+            dbcol = dbcols.get(col, None)
+            if not dbcol:
+                dtype = df.dtypes[col].name
+                # Pandas defaults columns where all values are None to a dtype
+                # of float with all values as NaN, but if we can't correctly
+                # infer the dtype we are better to assume object so we don't
+                # create large numbers of unwanted Metrics
+                # NOTE(review): this reads self.df rather than the local df
+                # loaded above - confirm both refer to the same data
+                if self.df[col].isnull().all():
+                    dtype = 'object'
+                dbcol = PandasColumn(column_name=str(col), type=dtype)
+                # Only treat `object` as string if we have some data
+                if self.df[col].notnull().any():
+                    dbcol.groupby = dbcol.is_string
+                    dbcol.filterable = dbcol.is_string
+                # Default the metric flags from the inferred column type
+                dbcol.sum = dbcol.is_num
+                dbcol.avg = dbcol.is_num
+                dbcol.min = dbcol.is_num or dbcol.is_dttm
+                dbcol.max = dbcol.is_num or dbcol.is_dttm
+            # Existing and newly created columns are both appended here
+            self.columns.append(dbcol)
+            # Remember the first time column as a candidate main_dttm_col
+            if not any_date_col and dbcol.is_time:
+                any_date_col = col
+        # Datasource-level metrics
+        # Note that column-specific metrics are created by
+        # reconcile_column_metrics when the column is created.
+        metrics = [
+            PandasMetric(
+                metric_name='count',
+                verbose_name='count',
+                metric_type='count',
+                source=None,
+                expression='count')]
+        # Look up which of those metrics are already stored, keyed by name
+        dbmetrics = (
+            db.session.query(PandasMetric)
+            .filter(PandasMetric.datasource == self)
+            .filter(or_(PandasMetric.metric_name == metric.metric_name
+                        for metric in metrics)))
+        dbmetrics = {metric.metric_name: metric for metric in dbmetrics}
+        for metric in metrics:
+            # Only add the metrics that are not stored yet
+            if not dbmetrics.get(metric.metric_name, None):
+                metric.pandas_datasource_id = self.id
+                db.session.add(metric)
+        # Keep an explicitly configured main_dttm_col
+        if not self.main_dttm_col:
+            self.main_dttm_col = any_date_col
+        db.session.merge(self)
+        db.session.commit()
+def reconcile_column_metrics(mapper, connection, target):
+    """
+    Create or delete PandasMetrics to match the metric attributes
+    specified on a PandasColumn
+    """
+    mtable = PandasMetric.__table__
+    for metric_type in ('sum', 'avg', 'max', 'min', 'count_distinct'):
+        # Set up the metric attributes
+        metric_name = metric_type + '__' + target.column_name
+        verbose_name = metric_name
+        source = target.column_name
+        if metric_type == 'avg':
+            expression = 'mean',
+        elif metric_type == 'count_distinct':
+            expression = 'nunique'
+        else:
+            expression = metric_type
+        if getattr(target, metric_type):
+            # Create the metric if it doesn't already exist
+            result = connection.execute(
+                mtable
+                .select()
+                .where(
+                    and_(
+                        mtable.c.pandas_datasource_id == 
+                        mtable.c.metric_name == metric_name)))
+            if not result.rowcount:
+                connection.execute(
+                    mtable.insert(),
+                    pandas_datasource_id=target.pandas_datasource_id,
+                    metric_name=metric_name,
+                    verbose_name=verbose_name,
+                    source=source,
+                    expression=expression)
+        else:
+            # Delete the metric if it exists and hasn't been customized
+            connection.execute(
+                mtable
+                .delete()
+                .where(
+                    and_(
+                        mtable.c.pandas_datasource_id == 
+                        mtable.c.metric_name == metric_name,
+                        mtable.c.verbose_name == verbose_name,
+                        mtable.c.source == source,
+                        mtable.c.expression == expression)))
+def reconcile_metric_column(mapper, connection, target):
+    """
+    Clear the metric attribute on a PandasColumn if the
+    corresponding PandasMetric is deleted
+    """
+    ctable = PandasColumn.__table__
+    try:
+        metric_type, column_name = target.metric_name.split('__', 1)
+        if metric_type in ctable.c:
+            connection.execute(
+                ctable
+                .update()
+                .values(**{metric_type: False})
+                .where(
+                    and_(
+                        ctable.c.pandas_datasource_id == 
+                        ctable.c.column_name == column_name)))
+    except ValueError:
+        # Metric name doesn't contain __
+        pass
+sa.event.listen(PandasColumn, 'after_insert', reconcile_column_metrics)
+sa.event.listen(PandasColumn, 'after_update', reconcile_column_metrics)
+sa.event.listen(PandasMetric, 'before_delete', reconcile_metric_column)
+sa.event.listen(PandasDatasource, 'after_insert', set_perm)
+sa.event.listen(PandasDatasource, 'after_update', set_perm)
diff --git a/contrib/connectors/pandas/views.py b/contrib/connectors/pandas/views.py
new file mode 100644
index 0000000000..197fc05ee7
--- /dev/null
+++ b/contrib/connectors/pandas/views.py
@@ -0,0 +1,336 @@
+"""Views used by the Pandas connector"""
+import json
+import logging
+from flask import flash, Markup, redirect
+from flask_appbuilder import CompactCRUDMixin, expose
+from flask_appbuilder.fieldwidgets import BS3TextAreaFieldWidget, 
+from flask_appbuilder.models.sqla.interface import SQLAInterface
+from flask_babel import gettext as __
+from flask_babel import lazy_gettext as _
+from past.builtins import basestring
+import sqlalchemy as sa
+from wtforms import SelectField, StringField, validators
+from superset import appbuilder, db, security, sm, utils
+from superset.connectors.base.views import DatasourceModelView
+from superset.utils import has_access
+from superset.views.base import (
+    DatasourceFilter, DeleteMixin, get_datasource_exist_error_mgs,
+    ListWidgetWithCheckboxes, SupersetModelView,
+from .models import FORMATS, PandasColumn, PandasDatasource, PandasMetric
+class ChoiceTypeSelectField(SelectField):
+    """A SelectField based on a ChoiceType model field."""
+    def process_data(self, value):
+        """Use the code rather than the str() representation as data
+        Values that have no `code` attribute are handed to the standard
+        SelectField processing instead."""
+        try:
+            self.data = value.code
+        except AttributeError:
+            # No `code` attribute, so fall back to the parent class
+            super(ChoiceTypeSelectField, self).process_data(value)
+class JSONField(StringField):
+    """
+    JSON field for WTForms that converts between the form string data
+    and a dictionary representation, with validation
+    See https://gist.github.com/dukebody/dcc371bf286534d546e9
+    """
+    def _value(self):
+        return json.dumps(self.data) if self.data else ''
+    def process_formdata(self, valuelist):
+        if valuelist:
+            try:
+                self.data = json.loads(valuelist[0])
+            except ValueError:
+                raise ValueError('This field contains invalid JSON')
+        else:
+            self.data = None
+    def pre_validate(self, form):
+        super().pre_validate(form)
+        if self.data:
+            try:
+                json.dumps(self.data)
+            except TypeError:
+                raise ValueError('This field contains invalid JSON')
+# Inline CRUD view over PandasColumn; it is listed in the `related_views`
+# of PandasDatasourceModelView further down this module
+class PandasColumnInlineView(CompactCRUDMixin, SupersetModelView):  # noqa
+    datamodel = SQLAInterface(PandasColumn)
+    list_title = _('List Columns')
+    show_title = _('Show Column')
+    add_title = _('Add Column')
+    edit_title = _('Edit Column')
+    list_widget = ListWidgetWithCheckboxes
+    # Fields offered on the edit form; the add form reuses the same list
+    edit_columns = [
+        'column_name', 'verbose_name', 'description',
+        'type', 'groupby', 'filterable',
+        'datasource', 'count_distinct', 'sum', 'avg', 'min', 'max']
+    add_columns = edit_columns
+    list_columns = [
+        'column_name', 'verbose_name', 'type', 'groupby', 'filterable',
+        'count_distinct', 'sum', 'avg', 'min', 'max']
+    page_size = 500
+    # Help text keyed by field name
+    # NOTE(review): 'is_dttm' is described here but is not in edit_columns
+    # or list_columns - confirm whether the field should be exposed
+    description_columns = {
+        'is_dttm': _(
+            'Whether to make this column available as a '
+            '[Time Granularity] option, column has to be DATETIME or '
+            'DATETIME-like'),
+        'filterable': _(
+            'Whether this column is exposed in the `Filters` section '
+            'of the explore view.'),
+        'type': _(
+            'The data type that was inferred by Pandas. '
+            'It may be necessary to input a type manually for '
+            'expression-defined columns in some cases. In most case '
+            'users should not need to alter this.'),
+    }
+    # Display labels keyed by field name
+    label_columns = {
+        'column_name': _('Column'),
+        'verbose_name': _('Verbose Name'),
+        'description': _('Description'),
+        'groupby': _('Groupable'),
+        'filterable': _('Filterable'),
+        'datasource': _('Datasource'),
+        'count_distinct': _('Count Distinct'),
+        'sum': _('Sum'),
+        'avg': _('Average'),
+        'min': _('Min'),
+        'max': _('Max'),
+        'type': _('Type'),
+    }
+class PandasMetricInlineView(CompactCRUDMixin, SupersetModelView, 
DeleteMixin):  # noqa
+    datamodel = SQLAInterface(PandasMetric)
+    list_title = _('List Metrics')
+    show_title = _('Show Metric')
+    add_title = _('Add Metric')
+    edit_title = _('Edit Metric')
+    list_columns = ['metric_name', 'verbose_name', 'metric_type']
+    edit_columns = [
+        'metric_name', 'description', 'verbose_name', 'metric_type',
+        'source', 'expression', 'datasource', 'd3format', 'is_restricted',
+        'warning_text']
+    description_columns = {
+        'source': utils.markdown(
+            'a comma-separated list of column(s) used to calculate '
+            ' the metric. Example: `claim_amount`', True),
+        'expression': utils.markdown(
+            'a valid Pandas expression as supported by the underlying '
+            'backend. Example: `count()`', True),
+        'is_restricted': _('Whether the access to this metric is restricted '
+                           'to certain roles. Only roles with the permission '
+                           "'metric access on XXX (the name of this metric)' "
+                           'are allowed to access this metric'),
+        'd3format': utils.markdown(
+            'd3 formatting string as defined [here]'
+            '(https://github.com/d3/d3-format/blob/master/README.md#format). '
+            'For instance, this default formatting applies in the Table '
+            'visualization and allow for different metric to use different '
+            'formats', True,
+        ),
+    }
+    add_columns = edit_columns
+    page_size = 500
+    label_columns = {
+        'metric_name': _('Metric'),
+        'description': _('Description'),
+        'verbose_name': _('Verbose Name'),
+        'metric_type': _('Type'),
+        'source': _('Pandas Source Columns'),
+        'expression': _('Pandas Expression'),
+        'datasource': _('Datasource'),
+        'd3format': _('D3 Format'),
+        'is_restricted': _('Is Restricted'),
+        'warning_text': _('Warning Message'),
+    }
+    def post_add(self, metric):
+        if metric.is_restricted:
+            security.merge_perm(sm, 'metric_access', metric.get_perm())
+    def post_update(self, metric):
+        if metric.is_restricted:
+            security.merge_perm(sm, 'metric_access', metric.get_perm())
+# CRUD view for PandasDatasource ("File Datasources" in the UI), with the
+# column and metric views above embedded as related views
+class PandasDatasourceModelView(DatasourceModelView, DeleteMixin):  # noqa
+    datamodel = SQLAInterface(PandasDatasource)
+    list_title = _('List File Datasources')
+    show_title = _('Show File Datasource')
+    add_title = _('Add File Datasource')
+    edit_title = _('Edit File Datasource')
+    list_columns = [
+        'link', 'changed_by_', 'modified']
+    order_columns = [
+        'link', 'changed_on_']
+    # The add form only asks for what is needed to read the file
+    add_columns = ['name', 'source_url', 'source_auth', 'source_parameters',
+                   'format', 'additional_parameters']
+    # Custom form fields: the JSON-valued settings use JSONField and the
+    # format uses ChoiceTypeSelectField with the FORMATS choices
+    add_form_extra_fields = {
+        'source_auth': JSONField(
+            _('Source Credentials'),
+            [validators.optional(), validators.length(max=100)],
+            widget=BS3TextFieldWidget(),
+            description=(
+                'Credentials required to access the raw data, if required. '
+                'Can be either a username and password in the form '
+                "'[\"username\", \"password\"]' which will be authenticated "
+                'using HTTP Basic Auth, or a string which will be used as '
+                'an Authorization header')),
+        'source_parameters': JSONField(
+            _('Additional Query Parameters'),
+            [validators.optional(), validators.length(max=500)],
+            widget=BS3TextAreaFieldWidget(),
+            description=(
+                'A JSON-formatted dictionary of additional parameters '
+                'used to request the remote file')),
+        'format': ChoiceTypeSelectField(_('Format'), choices=FORMATS),
+        'additional_parameters': JSONField(
+            _('Additional Read Parameters'),
+            [validators.optional(), validators.length(max=500)],
+            widget=BS3TextAreaFieldWidget(),
+            description=(
+                'A JSON-formatted dictionary of additional parameters '
+                'passed to the Pandas read function')),
+    }
+    edit_columns = [
+        'name', 'source_url', 'source_auth', 'source_parameters',
+        'format', 'additional_parameters',
+        'filter_select_enabled', 'slices',
+        'fetch_values_predicate',
+        'description', 'owner',
+        'main_dttm_col', 'default_endpoint', 'offset', 'cache_timeout']
+    # The edit form shares the same field objects as the add form
+    edit_form_extra_fields = add_form_extra_fields
+    show_columns = edit_columns + ['perm']
+    related_views = [PandasColumnInlineView, PandasMetricInlineView]
+    base_order = ('changed_on', 'desc')
+    search_columns = (
+        'owner', 'name', 'source_url',
+    )
+    # Help text keyed by field name
+    description_columns = {
+        'slices': _(
+            'The list of slices associated with this datasource. By '
+            'altering this datasource, you may change how these associated '
+            'slices behave. '
+            'Also note that slices need to point to a datasource, so '
+            'this form will fail at saving if removing slices from a '
+            'datasource. If you want to change the datasource for a slice, '
+            "overwrite the slice from the 'explore view'"),
+        'offset': _('Timezone offset (in hours) for this datasource'),
+        'name': _(
+            'The name of this datasource'),
+        'source_url': _(
+            'The URL used to access the raw data'),
+        'format': _(
+            'The format of the raw data, e.g. csv'),
+        'description': Markup(
+            "Supports <a href='https://daringfireball.net/projects/markdown/'>"
+            'markdown</a>'),
+        'fetch_values_predicate': _(
+            'Predicate applied when fetching distinct value to '
+            'populate the filter control component. Supports '
+            'jinja template syntax. Applies only when '
+            '`Enable Filter Select` is on.'),
+        'default_endpoint': _(
+            'Redirects to this endpoint when clicking on the datasource '
+            'from the datasource list'),
+        'filter_select_enabled': _(
+            "Whether to populate the filter's dropdown in the explore "
+            "view's filter section with a list of distinct values fetched "
+            'from the backend on the fly'),
+    }
+    base_filters = [['id', DatasourceFilter, lambda: []]]
+    # Display labels keyed by field name
+    label_columns = {
+        'slices': _('Associated Slices'),
+        'link': _('Datasource'),
+        'changed_by_': _('Changed By'),
+        'changed_on_': _('Last Changed'),
+        'filter_select_enabled': _('Enable Filter Select'),
+        'default_endpoint': _('Default Endpoint'),
+        'offset': _('Offset'),
+        'cache_timeout': _('Cache Timeout'),
+        'name': _('Name'),
+        'source_url': _('Source URL'),
+        'format': _('Format'),
+        'fetch_values_predicate': _('Fetch Values Predicate'),
+        'owner': _('Owner'),
+        'main_dttm_col': _('Main Datetime Column'),
+        'description': _('Description'),
+    }
+    def pre_add(self, datasource):
+        """Reject a datasource whose source URL is already registered or
+        whose data cannot be loaded; raises Exception in either case"""
+        # Count the datasources that share this source URL
+        number_of_existing_datasources = (
+            db.session
+              .query(sa.func.count('*'))
+              .filter(PandasDatasource.source_url == datasource.source_url)
+              .scalar())
+        # datasource object is already added to the session
+        if number_of_existing_datasources > 1:
+            raise Exception(
+                get_datasource_exist_error_mgs(datasource.full_name))
+        # Fail before adding if the datasource can't be found
+        try:
+            datasource.get_dataframe()
+        except Exception as e:
+            logging.exception(e)
+            # NOTE(review): the message says 'Read Method' while the form
+            # labels the field 'Format' - confirm the intended wording
+            raise Exception(_(
+                'File [{}] could not be read, '
+                'please double check the '
+                'Source URL and Read Method').format(datasource.name))
+    def post_add(self, datasource, flash_message=True):
+        """Build the datasource metadata, merge its access permission and
+        optionally flash a hint about the next configuration step"""
+        datasource.get_metadata()
+        security.merge_perm(sm, 'datasource_access', datasource.get_perm())
+        if flash_message:
+            flash(_(
+                'The datasource was created. '
+                'As part of this two phase configuration '
+                'process, you should now click the edit button by '
+                'the new datasource to configure it.'), 'info')
+    def post_update(self, datasource):
+        # Same follow-up as after an add, without the "created" message
+        self.post_add(datasource, flash_message=False)
+    def _delete(self, pk):
+        # Call the DeleteMixin implementation explicitly
+        DeleteMixin._delete(self, pk)
+    @expose('/edit/<pk>', methods=['GET', 'POST'])
+    @has_access
+    def edit(self, pk):
+        """Simple hack to redirect to explore view after saving"""
+        resp = super(PandasDatasourceModelView, self).edit(pk)
+        # A string response is returned unchanged; anything else is
+        # replaced by a redirect to the explore view
+        if isinstance(resp, basestring):
+            return resp
+        return redirect('/superset/explore/pandas/{}/'.format(pk))
+    PandasDatasourceModelView,
+    'File Datasources',
+    label=__('File Datasources'),
+    category='Sources',
+    category_label=__('Sources'),
+    icon='fa-file',)
diff --git a/contrib/migrations/__init__.py b/contrib/migrations/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/migrations/versions/__init__.py b/contrib/migrations/versions/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/tests/__init__.py b/contrib/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/contrib/tests/cache_tests.py b/contrib/tests/cache_tests.py
new file mode 100644
index 0000000000..5039339e50
--- /dev/null
+++ b/contrib/tests/cache_tests.py
@@ -0,0 +1,142 @@
+"""Unit tests for DataFrameCache"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+import datetime
+import os
+import shutil
+import tempfile
+import time
+import pandas as pd
+from pandas.testing import assert_frame_equal
+from tests.base_tests import SupersetTestCase
+from ..cache.dataframe import DataFrameCache
+# Exercises DataFrameCache against a fresh temporary directory per test
+class DataFrameCacheTestCase(SupersetTestCase):
+    def setUp(self):
+        # Every test gets its own empty cache directory
+        self.cache = DataFrameCache(cache_dir=tempfile.mkdtemp())
+    def tearDown(self):
+        shutil.rmtree(self.cache._path)
+    def get_df(self, key):
+        # Two columns of different length, so the shorter one is padded
+        # with NaN; the second column is named after `key`
+        return pd.DataFrame({'one': pd.Series([1, 2, 3]),
+                             key: pd.Series([1, 2, 3, 4])})
+    def test_get_dict(self):
+        a = self.get_df('a')
+        b = self.get_df('b')
+        assert self.cache.set('a', a)
+        assert self.cache.set('b', b)
+        d = self.cache.get_dict('a', 'b')
+        assert 'a' in d
+        assert_frame_equal(a, d['a'])
+        assert_frame_equal(b, d['b'])
+    def test_set_get(self):
+        for i in range(3):
+            assert self.cache.set(str(i), self.get_df(str(i)))
+        for i in range(3):
+            assert_frame_equal(self.cache.get(str(i)), self.get_df(str(i)))
+    def test_get_set(self):
+        assert self.cache.set('foo', self.get_df('bar'))
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+    def test_get_many(self):
+        assert self.cache.set('foo', self.get_df('bar'))
+        assert self.cache.set('spam', self.get_df('eggs'))
+        result = list(self.cache.get_many('foo', 'spam'))
+        assert_frame_equal(result[0], self.get_df('bar'))
+        assert_frame_equal(result[1], self.get_df('eggs'))
+    def test_set_many(self):
+        assert self.cache.set_many({'foo': self.get_df('bar'),
+                                    'spam': self.get_df('eggs')})
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        assert_frame_equal(self.cache.get('spam'), self.get_df('eggs'))
+    def test_add(self):
+        # sanity check that add() works like set()
+        assert self.cache.add('foo', self.get_df('bar'))
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        # add() must not overwrite an existing key
+        assert not self.cache.add('foo', self.get_df('qux'))
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+    def test_delete(self):
+        assert self.cache.add('foo', self.get_df('bar'))
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        assert self.cache.delete('foo')
+        assert self.cache.get('foo') is None
+    def test_delete_many(self):
+        assert self.cache.add('foo', self.get_df('bar'))
+        assert self.cache.add('spam', self.get_df('eggs'))
+        assert self.cache.delete_many('foo', 'spam')
+        assert self.cache.get('foo') is None
+        assert self.cache.get('spam') is None
+    def test_timeout(self):
+        self.cache.set('foo', self.get_df('bar'), 0)
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        self.cache.set('baz', self.get_df('qux'), 1)
+        assert_frame_equal(self.cache.get('baz'), self.get_df('qux'))
+        # Wait until the one-second timeout on 'baz' has passed
+        time.sleep(3)
+        # timeout of zero means no timeout
+        assert_frame_equal(self.cache.get('foo'), self.get_df('bar'))
+        assert self.cache.get('baz') is None
+    def test_has(self):
+        assert self.cache.has('foo') in (False, 0)
+        assert self.cache.has('spam') in (False, 0)
+        assert self.cache.set('foo', self.get_df('bar'))
+        assert self.cache.has('foo') in (True, 1)
+        assert self.cache.has('spam') in (False, 0)
+        self.cache.delete('foo')
+        assert self.cache.has('foo') in (False, 0)
+        assert self.cache.has('spam') in (False, 0)
+    def test_prune(self):
+        THRESHOLD = 13
+        c = DataFrameCache(cache_dir=tempfile.mkdtemp(),
+                           threshold=THRESHOLD)
+        # Store twice as many items as the threshold allows
+        for i in range(2 * THRESHOLD):
+            assert c.set(str(i), self.get_df(str(i)))
+        cache_files = os.listdir(c._path)
+        # Clean up before asserting so a failure leaves no directory behind
+        shutil.rmtree(c._path)
+        # There will be a small .metadata file for every cached file
+        assert len(cache_files) <= THRESHOLD * 2
+    def test_clear(self):
+        cache_files = os.listdir(self.cache._path)
+        assert self.cache.set('foo', self.get_df('bar'))
+        cache_files = os.listdir(self.cache._path)
+        # There will be a small .metadata file for every cached file
+        assert len(cache_files) == 2
+        assert self.cache.clear()
+        cache_files = os.listdir(self.cache._path)
+        assert len(cache_files) == 0
+    def test_non_feather_format(self):
+        # The Feather on-disk format isn't indexed and doesn't handle
+        # Object-type columns with non-homogeneous data
+        # See:
+        # - https://github.com/wesm/feather/tree/master/python#limitations
+        # - https://github.com/wesm/feather/issues/200
+        now = datetime.datetime.now
+        # A string index plus a column mixing int, str and datetime values
+        df = pd.DataFrame({'one': pd.Series([1, 2, 3], index=['a', 'b', 'c']),
+                           'two': pd.Series([1, 'string', now(), 4],
+                                            index=['a', 'b', 'c', 'd'])})
+        assert self.cache.set('foo', df)
+        assert_frame_equal(self.cache.get('foo'), df)
diff --git a/contrib/tests/connector_tests.py b/contrib/tests/connector_tests.py
new file mode 100644
index 0000000000..682e1ef005
--- /dev/null
+++ b/contrib/tests/connector_tests.py
@@ -0,0 +1,1047 @@
+"""Unit tests for Superset"""
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+from collections import OrderedDict
+import datetime
+import io
+import re
+import unittest
+import markdown
+import pandas as pd
+from pandas.testing import assert_frame_equal
+from sqlalchemy import Date
+from tests.base_tests import SupersetTestCase
+from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn
+from superset.models.core import Database
+from superset.models.helpers import QueryResult
+from superset.utils import QueryStatus
+from ..connectors.pandas.models import (
+    PandasColumn, PandasDatasource, PandasMetric)
+class BaseConnectorTestCase(SupersetTestCase):
+    """
+    Standard tests for Connectors, especially for .query()
+    Connector-specific subclasses should override `setUp()`
+    to create a Datasource with the appropriate Columns and
+    Metrics, and store a copy of the raw
+    data in the `self.df` attribute
+    """
+    def __init__(self, methodName='runTest'):
+        """Set up assertEqual for DataFrames"""
+        super(BaseConnectorTestCase, self).__init__(methodName)
+        # Register assertFrameEqual (by name) as the comparison that
+        # assertEqual() uses for pd.DataFrame objects
+        self.addTypeEqualityFunc(pd.DataFrame, 'assertFrameEqual')
+    @classmethod
+    def setUpClass(cls):
+        if cls is BaseConnectorTestCase:
+            raise unittest.SkipTest('Skip tests from BaseConnectorTestCase')
+        super(BaseConnectorTestCase, cls).setUpClass()
+    data = """
+        | region   | district   | project   | received            | value  | 
value2 | category  |
+        | Region 1 | District A | Project A | 2001-01-31 10:00:00 | 33     |  
12.30 | CategoryA |
+        | Region 1 | District A | Project A | 2001-01-31 12:00:00 | 32     |  
13.60 | CategoryB |
+        | Region 1 | District B | Project B | 2001-01-31 13:00:00 | 35     |  
15.50 | CategoryA |
+        | Region 2 | District C | Project C | 2001-01-31 09:00:00 | 12     |  
17.80 | CategoryB |
+        | Region 1 | District A | Project A | 2001-02-28 09:00:00 | 66     |  
11.30 | CategoryB |
+        | Region 1 | District B | Project B | 2001-02-28 08:00:00 | 15     |  
19.90 | CategoryB |
+        | Region 1 | District B | Project B | 2001-02-28 10:00:00 | 25     |  
15.30 | CategoryA |
+        | Region 2 | District C | Project C | 2001-02-28 08:00:00 | 18     |  
13.80 | CategoryA |
+        | Region 1 | District A | Project A | 2001-03-31 11:00:00 | 85     |  
45.10 | CategoryB |
+        | Region 1 | District B | Project B | 2001-03-31 12:00:00 |  5     |  
28.10 | CategoryA |
+        | Region 2 | District C | Project C | 2001-03-31 14:00:00 | 35     |  
22.60 | CategoryB |
+        | Region 1 | District A | Project A | 2001-04-30 10:00:00 | 15     |  
11.00 | CategoryA |
+        | Region 1 | District A | Project A | 2001-04-30 12:00:00 | 15     |  
16.10 | CategoryB |
+        | Region 2 | District C | Project C | 2001-04-30 13:00:00 | 15     |  
18.50 | CategoryA |
+        """  # NOQA: E501
+    def assertFrameEqual(self, frame1, frame2, msg=None):
+        # We don't care about the index, because it is
+        # not part of the data returned to the client
+        return assert_frame_equal(frame1.reset_index(drop=True),
+                                  frame2.reset_index(drop=True),
+                                  msg)
+    def setUp(self):
+        # Create a copy of the raw data as a dataframe to be
+        # used for calculated expected results
+        self.df = self.md_to_df(self.data)
+        self.df['received'] = 
+    def md_to_html(self, md_raw):
+        """
+        Convert a possibly-indented Markdown-table to a file-like object
+        that can be parsed by pd.read_html().
+        See 
+        for the inspiration
+        """
+        _md = []
+        for line in md_raw.split('\n'):
+            if re.match(r'^\s+\#', line):
+                # ignore lines which start with pound sign
+                continue
+            elif re.match(r'^(.*)(\#[^\|]+)$', line):
+                # keep everything before the # outside of the last occurrence
+                # of |
+                _md.append(
+                    re.match(r'^(.*)(\#[^\|]+)$', line).groups()[0].strip())
+            else:
+                _md.append(line.strip())
+        md = '\n'.join(_md)
+        return io.StringIO(
+            markdown.markdown(md, extensions=['markdown.extensions.tables']))
+    def md_to_df(self, md_raw):
+        """
+        Convert a possibly-indented Markdown-table to a Pandas DataFrame
+        """
+        return pd.read_html(self.md_to_html(md_raw))[0]
+    def test_values_for_column(self):
+        result = self.datasource.values_for_column('project')
+        self.assertEqual(result, self.df['project'].unique().tolist())
+    def test_values_for_column_with_limit(self):
+        result = self.datasource.values_for_column('project', 1)
+        self.assertEqual(result, self.df['project'].unique()[:1].tolist())
+    def test_get_query_str(self):
+        p = {
+            'groupby': ['project'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.get_query_str(p)
+        self.assertIn('project', result)
+    def test_summary_single_metric(self):
+        p = {
+            'groupby': [],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = pd.DataFrame({'sum__value': [self.df['value'].sum()]})
+        self.assertEqual(result.df, expected_df)
+    def test_summary_multiple_metrics(self):
+        p = {
+            'groupby': [],
+            'metrics': ['sum__value', 'avg__value', 'value_percentage', 
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        self.df['ratio'] = self.df['value'] / self.df['value2']
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = pd.DataFrame(OrderedDict([
+            ('sum__value', [self.df['value'].sum()]),
+            ('avg__value', [self.df['value'].mean()]),
+            ('value_percentage', [sum(self.df['value']) /
+                                  sum(self.df['value'] + self.df['value2'])]),
+            ('ratio', [self.df['ratio'].mean()]),
+        ]))
+        self.assertEqual(result.df, expected_df)
+    def test_from_to_dttm(self):
+        p = {
+            'groupby': [],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 3, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        df = self.df.loc[self.df['received'] > datetime.datetime(2001, 3, 1)]
+        expected_df = pd.DataFrame({'sum__value': [df['value'].sum()]})
+        self.assertEqual(result.df, expected_df)
+    def test_filter_eq_string(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'district', 'val': 'District A', 'op': '=='},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .loc[self.df['district'] == 'District A']
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(result.df, expected_df)
+    def test_filter_eq_num(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'value', 'val': '85', 'op': '=='},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .loc[self.df['value'] == 85]
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(result.df, expected_df)
+    def test_filter_eq_date(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'received', 'val': '2001-02-28', 'op': '=='},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .loc[self.df['received'] == datetime.datetime(2001, 
2, 28)]
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(result.df, expected_df)
+    def test_filter_gte(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'value', 'val': '70', 'op': '>='},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .loc[self.df['value'] >= 70]
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(result.df, expected_df)
+    def test_filter_in_num(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'value', 'val': ['32', '35'], 'op': 'in'},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .loc[~((self.df['value'] != 32) &
+                                  (self.df['value'] != 35))]
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = (expected_df.sort_values(['sum__value'], ascending=False)
+                                  .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+    def test_filter_in_str(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'project', 'val': ['Project A', 'Project C'], 'op': 
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .loc[~((self.df['project'] != 'Project A') &
+                                  (self.df['project'] != 'Project C'))]
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = (expected_df.sort_values(['sum__value'], ascending=False)
+                                  .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+    def test_filter_not_in_num(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'value', 'val': ['32', '35'], 'op': 'not in'},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .loc[((self.df['value'] != 32) &
+                                 (self.df['value'] != 35))]
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = (expected_df.sort_values(['sum__value'], ascending=False)
+                                  .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+    def test_filter_not_in_str(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [
+                {'col': 'project', 'val': ['Project A', 'Project C'], 'op': 
'not in'},
+            ],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .loc[((self.df['project'] != 'Project A') &
+                                 (self.df['project'] != 'Project C'))]
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        self.assertEqual(result.df, expected_df)
+    def test_columns_only(self):
+        p = {
+            'groupby': [],
+            'metrics': [],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+            'columns': ['project', 'region', 'received', 'value'],
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = self.df[p['columns']].copy()
+        expected_df['received'] = expected_df['received'].astype(str)
+        self.assertEqual(result.df, expected_df)
+    def test_orderby_with_columns(self):
+        p = {
+            'groupby': [],
+            'metrics': [],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+            'columns': ['project', 'region', 'received', 'value'],
+            'orderby': [
+                ['project', False],
+                ['region', True],
+            ],
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .sort_values(['project', 'region'],
+                                        ascending=[False, True])
+                           .reset_index(drop=True)
+                       [p['columns']])
+        expected_df['received'] = expected_df['received'].astype(str)
+        self.assertEqual(result.df, expected_df)
+    def test_groupby_only(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': [],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(p['groupby'])
+                              .size()
+                              .reset_index()
+                              .sort_values([0], ascending=False)
+                              .drop(0, axis=1))
+        self.assertEqual(result.df, expected_df)
+    def test_groupby_single_metric(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(p['groupby'])['value']
+                              .sum()
+                              .reset_index()
+                              .sort_values(['value'], ascending=False))
+        expected_df.columns = p['groupby'] + p['metrics']
+        self.assertEqual(result.df, expected_df)
+    def test_groupby_multiple_metrics(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value', 'avg__value', 'value_percentage', 
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        self.df['ratio'] = self.df['value'] / self.df['value2']
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(p['groupby'])
+                           .aggregate(OrderedDict([('value', ['sum', 'mean']),
+                                                   ('ratio', ['mean'])])))
+        expected_df['value_percentage'] = (self.df.groupby(p['groupby'])
+                                                  .apply(lambda x: 
sum(x['value']) /
+                                                         sum(x['value'] + 
+        expected_df = expected_df.reset_index()
+        expected_df.columns = (p['groupby'] +
+                               ['sum__value', 'avg__value', 'ratio', 
+        expected_df = (expected_df[p['groupby'] + p['metrics']]
+                       .sort_values(['sum__value'], ascending=False))
+        self.assertEqual(result.df, expected_df)
+    def test_groupby_ratio_metric(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['ratio'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        self.df['ratio'] = self.df['value'] / self.df['value2']
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(p['groupby'])['ratio']
+                              .mean()
+                              .reset_index()
+                              .sort_values(['ratio'], ascending=False))
+        expected_df.columns = p['groupby'] + p['metrics']
+        self.assertEqual(result.df, expected_df)
+    def test_groupby_value_percentage_metric(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['value_percentage'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(p['groupby'])
+                              .apply(lambda x: sum(x['value']) /
+                                     sum(x['value'] + x['value2']))
+                              .reset_index()
+                              .sort_values([0], ascending=False))
+        expected_df.columns = p['groupby'] + p['metrics']
+        self.assertEqual(result.df, expected_df)
+    def test_groupby_category_percentage_metric(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['category_percentage'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(p['groupby'])['category']
+                              .value_counts(normalize=True)
+                              .reset_index(p['groupby'])
+                              .loc['CategoryA']
+                              .reset_index(drop=True)
+                              .sort_values(['category'], ascending=False))
+        expected_df.columns = p['groupby'] + p['metrics']
+        self.assertEqual(result.df, expected_df)
+    def test_groupby_ascending_order(self):
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value', 'avg__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+            },
+            'order_desc': False,
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df.groupby(p['groupby'])
+                           .aggregate({'value': ['sum', 'mean']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        expected_df = expected_df.sort_values(['sum__value'], ascending=True)
+        self.assertEqual(result.df, expected_df)
+    def test_timeseries_single_metric(self):
+        """Daily time series of one metric, no groupby, sorted descending."""
+        query_obj = {
+            'groupby': [],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 50,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                # Week and month grains are unavailable on SQLite, see
+                # https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+            'order_desc': True,
+        }
+        result = self.datasource.query(query_obj)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        # Rebuild the expected frame directly with pandas.
+        grain = query_obj['extras']['time_grain_sqla']
+        bucket = pd.Grouper(key=query_obj['granularity'],
+                            freq=PandasDatasource.GRAINS[grain])
+        grouped = self.df.groupby(query_obj['groupby'] + [bucket])
+        expected_df = grouped.aggregate({'value': ['sum']}).reset_index()
+        expected_df.columns = (
+            query_obj['groupby'] + ['__timestamp'] + query_obj['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        ascending = not query_obj['order_desc']
+        expected_df = expected_df.sort_values(query_obj['metrics'][0],
+                                              ascending=ascending)
+        expected_df = expected_df.reset_index(drop=True)
+        self.assertEqual(result.df, expected_df)
+    def test_timeseries_multiple_metrics(self):
+        """Daily time series of two metrics, no groupby, sorted descending.
+
+        The expected frame is sorted by the first requested metric, using
+        the direction given by ``order_desc``.
+        """
+        p = {
+            'groupby': [],
+            'metrics': ['sum__value', 'avg__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 50,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                # Note that week and month don't work on SQLite
+                # See https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+            'order_desc': True,
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        expected_df = (self.df.groupby(p['groupby'] +
+                                       [pd.Grouper(key=p['granularity'],
+                                                   freq=time_grain)])
+                           .aggregate({'value': ['sum', 'mean']})
+                           .reset_index())
+        expected_df.columns = (p['groupby'] +
+                               ['__timestamp'] +
+                               p['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        # Same post-processing as the single-metric test: sort by the first
+        # metric, then renumber the rows.
+        expected_df = (expected_df.sort_values(p['metrics'][0],
+                                               ascending=(not p['order_desc']))
+                                  .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+    def test_timeseries_groupby(self):
+        """Daily time series of two metrics grouped by ``project``."""
+        query_obj = {
+            'groupby': ['project'],
+            'metrics': ['sum__value', 'avg__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 50,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                # Week and month grains are unavailable on SQLite, see
+                # https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+        }
+        result = self.datasource.query(query_obj)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        # Rebuild the expected frame directly with pandas.
+        grain = query_obj['extras']['time_grain_sqla']
+        bucket = pd.Grouper(key=query_obj['granularity'],
+                            freq=PandasDatasource.GRAINS[grain])
+        grouped = self.df.groupby(query_obj['groupby'] + [bucket])
+        expected_df = grouped.aggregate({'value': ['sum', 'mean']})
+        expected_df = expected_df.reset_index()
+        expected_df.columns = (
+            query_obj['groupby'] + ['__timestamp'] + query_obj['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        # The query carries no 'order_desc' key; descending order is expected.
+        expected_df = expected_df.sort_values(['sum__value'], ascending=False)
+        expected_df = expected_df.reset_index(drop=True)
+        self.assertEqual(result.df, expected_df)
+    def test_timeseries_limit(self):
+        """Time series restricted to the top groups by a separate metric.
+
+        The groups are ranked by the mean of ``value`` (the
+        ``timeseries_limit_metric``), only the first ``timeseries_limit``
+        groups are kept, and the daily sums are computed for those groups.
+        """
+        p = {
+            'groupby': ['project', 'district'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 2,
+            'timeseries_limit_metric': 'avg__value',
+            'row_limit': 5000,
+            'extras': {
+                # Note that week and month don't work on SQLite
+                # See https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+            'order_desc': True,
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        # Rank the groups by the limit metric and keep the first N of them.
+        limit_df = (self.df.groupby(p['groupby'])
+                           .aggregate({'value': 'mean'})
+                           .sort_values('value',
+                                        ascending=(not p['order_desc']))
+                           .iloc[:p['timeseries_limit']])
+        # Index by the groupby columns so membership in limit_df can be
+        # tested with Index.isin.
+        source_df = self.df.set_index(p['groupby'])
+        expected_df = (source_df[source_df.index.isin(limit_df.index)]
+                       .groupby(p['groupby'] +
+                                [pd.Grouper(key=p['granularity'],
+                                            freq=time_grain)])
+                       .aggregate({'value': ['sum']})
+                       .reset_index())
+        expected_df.columns = (p['groupby'] +
+                               ['__timestamp'] +
+                               p['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        expected_df = (expected_df.sort_values(['sum__value'],
+                                               ascending=(not p['order_desc']))
+                                  .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+    def test_timeseries_limit_ascending_order(self):
+        """Time series limit with ``order_desc`` False.
+
+        Same scenario as ``test_timeseries_limit`` but with ascending order,
+        so the groups with the smallest mean ``value`` are kept and the
+        result is sorted by ascending ``sum__value``.
+        """
+        p = {
+            'groupby': ['project', 'district'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'filter': [],
+            'is_timeseries': True,
+            'timeseries_limit': 2,
+            'timeseries_limit_metric': 'avg__value',
+            'row_limit': 5000,
+            'extras': {
+                # Note that week and month don't work on SQLite
+                # See https://github.com/apache/incubator-superset/issues/617
+                'time_grain_sqla': 'day',
+            },
+            'order_desc': False,
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        time_grain = PandasDatasource.GRAINS[p['extras']['time_grain_sqla']]
+        # Rank the groups by the limit metric and keep the first N of them.
+        limit_df = (self.df.groupby(p['groupby'])
+                           .aggregate({'value': 'mean'})
+                           .sort_values('value',
+                                        ascending=(not p['order_desc']))
+                           .iloc[:p['timeseries_limit']])
+        # Index by the groupby columns so membership in limit_df can be
+        # tested with Index.isin.
+        source_df = self.df.set_index(p['groupby'])
+        expected_df = (source_df[source_df.index.isin(limit_df.index)]
+                       .groupby(p['groupby'] +
+                                [pd.Grouper(key=p['granularity'],
+                                            freq=time_grain)])
+                       .aggregate({'value': ['sum']})
+                       .reset_index())
+        expected_df.columns = (p['groupby'] +
+                               ['__timestamp'] +
+                               p['metrics'])
+        expected_df['__timestamp'] = expected_df['__timestamp'].astype(str)
+        expected_df = (expected_df.sort_values(['sum__value'],
+                                               ascending=(not p['order_desc']))
+                                  .reset_index(drop=True))
+        self.assertEqual(result.df, expected_df)
+class SqlaConnectorTestCase(BaseConnectorTestCase):
+    """Connector tests run against a ``SqlaTable`` stored in SQLite.
+
+    ``columns`` and ``metrics`` describe the table to the datasource; the
+    table itself is (re)created from ``self.df`` in :meth:`setUp`.
+    """
+    columns = [
+        TableColumn(column_name='region', type='VARCHAR(20)'),
+        TableColumn(column_name='district', type='VARCHAR(20)'),
+        TableColumn(column_name='project', type='VARCHAR(20)'),
+        TableColumn(column_name='received', type='DATE', is_dttm=True),
+        TableColumn(column_name='value', type='BIGINT'),
+    ]
+    metrics = [
+        SqlMetric(metric_name='sum__value', metric_type='sum',
+                  expression='SUM(value)'),
+        SqlMetric(metric_name='avg__value', metric_type='avg',
+                  expression='AVG(value)'),
+        SqlMetric(metric_name='ratio', metric_type='avg',
+                  expression='AVG(value/value2)'),
+        SqlMetric(metric_name='value_percentage', metric_type='custom',
+                  expression='SUM(value)/SUM(value + value2)'),
+        # Share of rows in 'CategoryA'; the CAST forces real (non-integer)
+        # division.
+        SqlMetric(metric_name='category_percentage', metric_type='custom',
+                  expression="SUM(CASE WHEN category='CategoryA' "
+                             'THEN 1 ELSE 0 END)/'
+                             'CAST(COUNT(*) AS REAL)'),
+    ]
+    def setUp(self):
+        """Create the SQLite-backed datasource and load ``self.df`` into it."""
+        super(SqlaConnectorTestCase, self).setUp()
+        sqlalchemy_uri = 'sqlite:////tmp/test.db'
+        database = Database(
+            database_name='test_database',
+            sqlalchemy_uri=sqlalchemy_uri)
+        self.connection = database.get_sqla_engine().connect()
+        # Release the connection after each test so handles do not pile up.
+        self.addCleanup(self.connection.close)
+        self.datasource = SqlaTable(table_name='test_datasource',
+                                    database=database,
+                                    columns=self.columns,
+                                    metrics=self.metrics)
+        # Replace any table left over from a previous run inside one
+        # transaction.
+        with database.get_sqla_engine().begin() as connection:
+            self.df.to_sql(self.datasource.table_name,
+                           connection,
+                           if_exists='replace',
+                           index=False,
+                           dtype={'received': Date})
+class PandasConnectorTestCase(BaseConnectorTestCase):
+    """Connector tests run against a ``PandasDatasource``.
+
+    ``columns`` includes two calculated columns (``ratio`` and
+    ``inverse_ratio``); the two custom metrics name helper callables that
+    :meth:`setUp` attaches to the datasource.
+    """
+    columns = [
+        PandasColumn(column_name='region', type='object'),
+        PandasColumn(column_name='district', type='object'),
+        PandasColumn(column_name='project', type='object'),
+        PandasColumn(column_name='received', type='datetime64[D]'),
+        PandasColumn(column_name='value', type='int64'),
+        PandasColumn(column_name='ratio', type='float64',
+                     expression='value / value2'),
+        PandasColumn(column_name='inverse_ratio', type='float64',
+                     expression='value2 / value'),
+    ]
+    metrics = [
+        PandasMetric(metric_name='sum__value', metric_type='sum',
+                     source='value', expression='sum'),
+        PandasMetric(metric_name='avg__value', metric_type='avg',
+                     source='value', expression='mean'),
+        PandasMetric(metric_name='ratio', metric_type='avg',
+                     source='ratio', expression='mean'),
+        PandasMetric(metric_name='value_percentage', metric_type='custom',
+                     source=None, expression='calc_value_percentage'),
+        PandasMetric(metric_name='category_percentage', metric_type='custom',
+                     source='category', expression='calc_category_percentage'),
+    ]
+    def setUp(self):
+        """Create the datasource and attach the custom metric callables."""
+        super(PandasConnectorTestCase, self).setUp()
+        self.datasource = PandasDatasource(name='test datasource',
+                                           format='html',
+                                           columns=self.columns,
+                                           metrics=self.metrics)
+        def calc_value_percentage(group):
+            """Return sum(value) / sum(value + value2) for ``group``."""
+            return sum(group['value']) / sum(group['value'] + group['value2'])
+        self.datasource.calc_value_percentage = calc_value_percentage
+        def calc_category_percentage(group):
+            """Return the fraction of entries equal to 'CategoryA'."""
+            return group.value_counts(normalize=True).loc['CategoryA']
+        self.datasource.calc_category_percentage = calc_category_percentage
+    def test_post_aggregation_filter(self):
+        """A ``having_druid`` filter is applied to the aggregated metric."""
+        p = {
+            'groupby': ['project', 'region'],
+            'metrics': ['sum__value'],
+            'granularity': 'received',
+            'from_dttm': datetime.datetime(2001, 1, 1),
+            'to_dttm': datetime.datetime(2001, 12, 31),
+            'is_timeseries': False,
+            'timeseries_limit': 0,
+            'timeseries_limit_metric': None,
+            'row_limit': 5000,
+            'extras': {
+                'time_grain_sqla': None,
+                'having_druid': [
+                    {'col': 'sum__value', 'val': '150', 'op': '>='},
+                ],
+            },
+        }
+        result = self.datasource.query(p)
+        self.assertIsInstance(result, QueryResult)
+        self.assertEqual(result.error_message, None)
+        self.assertEqual(result.status, QueryStatus.SUCCESS)
+        expected_df = (self.df
+                           .groupby(p['groupby'])
+                           .aggregate({'value': ['sum']})
+                           .reset_index())
+        expected_df.columns = p['groupby'] + p['metrics']
+        # Keep only groups that pass the filter, largest sum first.
+        expected_df = (expected_df.loc[expected_df['sum__value'] >= 150]
+                                  .sort_values(['sum__value'],
+                                               ascending=False))
+        self.assertEqual(result.df, expected_df)
+# Allow running this test module directly as a script.
+if __name__ == '__main__':
+    unittest.main()
diff --git a/docs/files.rst b/docs/files.rst
new file mode 100644
index 0000000000..7b0040e4cd
--- /dev/null
+++ b/docs/files.rst
@@ -0,0 +1,107 @@
+Superset can visualize data contained in flat files or outputted by REST APIs
+using Pandas.  This page clarifies the features and limitations of using File
+Datasources with Superset.
+File Datasources do not use files uploaded directly to the Superset server. The
+files are stored elsewhere and downloaded (and cached) by Superset when
+required.
+This approach offers two important advantages over uploaded files:
+1. If the remote file is updated, then Superset will automatically download
+and use the new data when the datasource timeout expires. If the file had
+been uploaded manually then each change in the source data would require a
+new manual upload.
+2. Data can be provided by a REST API rather than an actual file. This provides
+a method to use Superset to visualize current data stored in other systems
+without needing to manually extract it first.
+.. note ::
+    A File Datasource downloads the full content of the source url
+    and then performs all filtering, grouping and aggregation locally
+    on the Superset server.  File Datasources that access large
+    volumes of data may impact server performance and affect other users.
+    Server administrators should ensure that adequate memory and CPU
+    resources are available before enabling the File Datasource.
+File Datasources are not enabled by default. To enable them the system
+administrator should update the `superset_config.py` or other configuration
+file to include::
+
+    # Include additional data sources
+    ADDITIONAL_MODULE_DS_MAP = {
+        'contrib.connectors.pandas.models': ['PandasDatasource'],
+    }
+Supported Formats
+File Datasources use the `Pandas library <http://pandas.pydata.org/>`_
+directly. Using a default installation in Superset, Pandas can read the
+following formats:
+* csv
+* html
+* json
+* Microsoft Excel
+* Stata
+If the appropriate dependencies have also been installed then the following
+additional formats are supported:
+* HDF5 (if PyTables is installed: `pip install tables`)
+* Feather (if Feather is installed: `pip install feather-format`)
+See the `Pandas Dependencies
+<http://pandas.pydata.org/pandas-docs/stable/install.html#dependencies>`_
+documentation for more information.
+Adding a File Datasource
+When you add a new File Datasource you need to provide the following
+information:
+* Source URL: the URL that the file to be visualized can be downloaded from.
+This can be a file hosted on another server or on a file sharing platform
+such as Dropbox, Google Drive or Amazon S3. It can also be the URL of a REST
+API end point.
+* Source Credentials: if the Source URL requires authentication then specify
+the credentials to be used. Credentials entered as
+``["username", "password"]`` - i.e. as a valid username and password enclosed
+in quotation marks, separated by a comma and surrounded by square brackets -
+will be treated as a separate username and password and the File Datasource
+will use HTTP Basic Auth to authenticate to the remote server. Text in any
+other format will be passed to the remote server as an `Authentication`
+header. Typically this is used with API tokens issued by the remote server.
+Remote servers that require authentication should also use an HTTPS Source
+URL.
+* Source Parameters: a JSON-formatted dictionary of additional query parameters
+that are passed to the remote server. This field will not be required for file
+downloads, but is useful for specifying requests against REST APIs.
+* Format: The format of the data returned by the remote server
+* Read Parameters: a JSON-formatted dictionary of additional parameters that
+are passed to Pandas when the file retrieved from the remote server is read
+into a DataFrame.
+Common aggregations can be defined and used in Superset.
+The first and simpler use case is to use the checkbox matrix exposed in your
+datasource's edit view (``Sources -> File Datasources ->
+[your datasource] -> Edit -> [tab] List Datasource Column``).
+Clicking the ``GroupBy`` and ``Filterable`` checkboxes will make the column
+appear in the related dropdowns while in explore view. Checking
+``Count Distinct``, ``Min``, ``Max``, ``Average`` or ``Sum`` will result in
+new metrics that will appear in the ``List Metrics`` tab.
+You can create your own aggregations manually from the ``List Metrics`` tab for
+more complex cases.
+File Datasources allow post aggregation in Superset. Any Metric that has been
+defined for the Datasource can be used as part of a Result Filter to limit
+the returned data.
diff --git a/docs/index.rst b/docs/index.rst
index eba2e94516..5a3b831630 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -25,10 +25,10 @@ intelligence web application
     endorsed by the ASF.
 - A rich set of data visualizations
 - An easy-to-use interface for exploring and visualizing data
@@ -61,7 +61,7 @@ Features
 .. toctree::
     :maxdepth: 2
@@ -74,6 +74,7 @@ Contents
+    files
diff --git a/setup.py b/setup.py
index 8d0dba3a0f..f1997274ce 100644
--- a/setup.py
+++ b/setup.py
@@ -79,6 +79,17 @@ def get_git_sha():
+        # PandasConnector deps
+        # @TODO sort into main deps
+        # prior to merge. Currently,
+        # separate to minimize merge conflicts
+        'beautifulsoup4==4.6.0',
+        'bottleneck==1.2.1',
+        'feather-format==0.4.0',
+        'html5lib==0.999999999',
+        'lxml==3.8.0',
+        'numexpr==2.6.4',
+        'xlrd==1.1.0',
         'cors': ['Flask-Cors>=2.0.0'],
diff --git a/superset/__init__.py b/superset/__init__.py
index 9ef665ecca..2308eddb1e 100644
--- a/superset/__init__.py
+++ b/superset/__init__.py
@@ -90,6 +90,13 @@ def get_js_manifest():
 cache = utils.setup_cache(app, conf.get('CACHE_CONFIG'))
 tables_cache = utils.setup_cache(app, conf.get('TABLE_NAMES_CACHE_CONFIG'))
+# For example:
+# DATAFRAME_CACHE_CONFIG = {
+#    'CACHE_TYPE': 'contrib.connectors.pandas.cache.dataframe',
+#    'CACHE_DEFAULT_TIMEOUT': 60 * 60 * 24,
+#    'CACHE_DIR': '/tmp/pandasdatasource_cache',
+#    'CACHE_THRESHOLD': 200}
+dataframe_cache = utils.setup_cache(app, conf.get('DATAFRAME_CACHE_CONFIG'))
 migrate = Migrate(app, db, directory=APP_DIR + '/migrations')
diff --git a/superset/assets/javascripts/explore/stores/visTypes.js 
index ef9dc4112c..b8af343f70 100644
--- a/superset/assets/javascripts/explore/stores/visTypes.js
+++ b/superset/assets/javascripts/explore/stores/visTypes.js
@@ -1374,7 +1374,7 @@ export function sectionsToRender(vizType, datasourceType) 
   const viz = visTypes[vizType];
   return [].concat(
-    datasourceType === 'table' ? sections.sqlaTimeSeries : 
+    (datasourceType === 'table' || datasourceType === 'pandas') ? 
sections.sqlaTimeSeries : sections.druidTimeSeries,
     datasourceType === 'table' ? sections.sqlClause : [],
     datasourceType === 'table' ? sections.filters[0] : sections.filters,
diff --git a/superset/migrations/versions/b2cd059e8803_add_pandasdatasource.py 
new file mode 100644
index 0000000000..b8545d457b
--- /dev/null
+++ b/superset/migrations/versions/b2cd059e8803_add_pandasdatasource.py
@@ -0,0 +1,108 @@
+"""Add PandasDatasource
+Revision ID: b2cd059e8803
+Revises: ca69c70ec99b
+Create Date: 2017-08-30 09:33:40.431377
+# revision identifiers, used by Alembic.
+revision = 'b2cd059e8803'
+down_revision = 'ca69c70ec99b'
+from alembic import op
+import sqlalchemy as sa
+import sqlalchemy_utils
+# Choices accepted by the ``format`` column of ``pandas_datasources``.
+FORMATS = [
+    ('csv', 'CSV'),
+    ('html', 'HTML')
+]
+def upgrade():
+    """Create the three tables backing the Pandas connector.
+
+    ``pandas_datasources`` is created first because ``pandas_columns`` and
+    ``pandas_metrics`` both hold a foreign key to it.
+    """
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table('pandas_datasources',
+    sa.Column('created_on', sa.DateTime(), nullable=True),
+    sa.Column('changed_on', sa.DateTime(), nullable=True),
+    sa.Column('id', sa.Integer(), nullable=False),
+    sa.Column('description', sa.Text(), nullable=True),
+    sa.Column('default_endpoint', sa.Text(), nullable=True),
+    sa.Column('is_featured', sa.Boolean(), nullable=True),
+    sa.Column('filter_select_enabled', sa.Boolean(), nullable=True),
+    sa.Column('offset', sa.Integer(), nullable=True),
+    sa.Column('cache_timeout', sa.Integer(), nullable=True),
+    sa.Column('params', sa.String(length=1000), nullable=True),
+    sa.Column('perm', sa.String(length=1000), nullable=True),
+    sa.Column('name', sa.String(length=100), nullable=False),
+    sa.Column('source_url', sa.String(length=1000), nullable=False),
+    sa.Column('source_auth', sqlalchemy_utils.types.json.JSONType(),
+              nullable=True),
+    sa.Column('source_parameters', sqlalchemy_utils.types.json.JSONType(),
+              nullable=True),
+    sa.Column('format', sqlalchemy_utils.types.choice.ChoiceType(FORMATS),
+              nullable=False),
+    sa.Column('additional_parameters', sqlalchemy_utils.types.json.JSONType(),
+              nullable=True),
+    sa.Column('user_id', sa.Integer(), nullable=True),
+    sa.Column('fetch_values_predicate', sa.String(length=1000), nullable=True),
+    sa.Column('main_dttm_col', sa.String(length=250), nullable=True),
+    sa.Column('created_by_fk', sa.Integer(), nullable=True),
+    sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+    sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['user_id'], ['ab_user.id'], ),
+    sa.PrimaryKeyConstraint('id')
+    )
+    op.create_table('pandas_columns',
+    sa.Column('created_on', sa.DateTime(), nullable=True),
+    sa.Column('changed_on', sa.DateTime(), nullable=True),
+    sa.Column('column_name', sa.String(length=255), nullable=True),
+    sa.Column('verbose_name', sa.String(length=1024), nullable=True),
+    sa.Column('is_active', sa.Boolean(), nullable=True),
+    sa.Column('type', sa.String(length=32), nullable=True),
+    sa.Column('groupby', sa.Boolean(), nullable=True),
+    sa.Column('count_distinct', sa.Boolean(), nullable=True),
+    sa.Column('sum', sa.Boolean(), nullable=True),
+    sa.Column('avg', sa.Boolean(), nullable=True),
+    sa.Column('max', sa.Boolean(), nullable=True),
+    sa.Column('min', sa.Boolean(), nullable=True),
+    sa.Column('filterable', sa.Boolean(), nullable=True),
+    sa.Column('description', sa.Text(), nullable=True),
+    sa.Column('expression', sa.Text(), nullable=True),
+    sa.Column('id', sa.Integer(), nullable=False),
+    sa.Column('pandas_datasource_id', sa.Integer(), nullable=True),
+    sa.Column('created_by_fk', sa.Integer(), nullable=True),
+    sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+    sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['pandas_datasource_id'],
+                            ['pandas_datasources.id'], ),
+    sa.PrimaryKeyConstraint('id')
+    )
+    op.create_table('pandas_metrics',
+    sa.Column('created_on', sa.DateTime(), nullable=True),
+    sa.Column('changed_on', sa.DateTime(), nullable=True),
+    sa.Column('metric_name', sa.String(length=512), nullable=True),
+    sa.Column('verbose_name', sa.String(length=1024), nullable=True),
+    sa.Column('metric_type', sa.String(length=32), nullable=True),
+    sa.Column('description', sa.Text(), nullable=True),
+    sa.Column('warning_text', sa.Text(), nullable=True),
+    sa.Column('is_restricted', sa.Boolean(), nullable=True),
+    sa.Column('d3format', sa.String(length=128), nullable=True),
+    sa.Column('id', sa.Integer(), nullable=False),
+    sa.Column('pandas_datasource_id', sa.Integer(), nullable=True),
+    sa.Column('source', sa.Text(), nullable=True),
+    sa.Column('expression', sa.Text(), nullable=True),
+    sa.Column('created_by_fk', sa.Integer(), nullable=True),
+    sa.Column('changed_by_fk', sa.Integer(), nullable=True),
+    sa.ForeignKeyConstraint(['changed_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['created_by_fk'], ['ab_user.id'], ),
+    sa.ForeignKeyConstraint(['pandas_datasource_id'],
+                            ['pandas_datasources.id'], ),
+    sa.PrimaryKeyConstraint('id')
+    )
+    # ### end Alembic commands ###
+def downgrade():
+    """Drop the Pandas connector tables, dependent tables first."""
+    # pandas_metrics and pandas_columns reference pandas_datasources, so
+    # they are removed before it.
+    for table_name in ('pandas_metrics',
+                       'pandas_columns',
+                       'pandas_datasources'):
+        op.drop_table(table_name)
diff --git a/superset/migrations/versions/ff40cbbdde10_.py 
new file mode 100644
index 0000000000..917d31317b
--- /dev/null
+++ b/superset/migrations/versions/ff40cbbdde10_.py
@@ -0,0 +1,22 @@
+"""merge PandasConnector
+Revision ID: ff40cbbdde10
+Revises: ('4736ec66ce19', 'b2cd059e8803')
+Create Date: 2017-09-28 09:07:11.852491
+# revision identifiers, used by Alembic.
+revision = 'ff40cbbdde10'
+down_revision = ('4736ec66ce19', 'b2cd059e8803')
+from alembic import op
+import sqlalchemy as sa
+def upgrade():
+    """No-op: this revision only merges two migration heads."""
+    pass
+def downgrade():
+    """No-op: this merge revision made no schema changes to undo."""
+    pass
diff --git a/superset/views/core.py b/superset/views/core.py
index 4d55fae635..106fdb442b 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -1343,6 +1343,9 @@ def checkbox(self, model_view, id_, attr, value):
+        if 'pandas' in ConnectorRegistry.sources:
+            model = ConnectorRegistry.sources['pandas'].column_class
+            modelview_to_model[model_view] = model
         model = modelview_to_model[model_view]
         obj = db.session.query(model).filter_by(id=id_).first()
         if obj:


