Repository: incubator-airflow Updated Branches: refs/heads/master beb285205 -> 935ede22a
[AIRFLOW-643] Improve date handling for sf_hook Closes #1898 from Jalepeno112/feature/salesforce-hook-coerce Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/935ede22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/935ede22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/935ede22 Branch: refs/heads/master Commit: 935ede22a2d447a9ef5cc03149d5306e13bd951f Parents: beb2852 Author: Giovanni Briggs <gbriggs2...@gmail.com> Authored: Mon Nov 21 13:32:42 2016 -0800 Committer: Chris Riccomini <chr...@wepay.com> Committed: Mon Nov 21 13:32:42 2016 -0800 ---------------------------------------------------------------------- airflow/contrib/hooks/salesforce_hook.py | 28 +++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/935ede22/airflow/contrib/hooks/salesforce_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py index efb75d0..67d1605 100644 --- a/airflow/contrib/hooks/salesforce_hook.py +++ b/airflow/contrib/hooks/salesforce_hook.py @@ -171,6 +171,9 @@ class SalesforceHook(BaseHook): try: col = pd.to_datetime(col) except ValueError: + logging.warning( + "Could not convert field to timestamps: {0}".format(col.name) + ) return col # now convert the newly created datetimes into timestamps @@ -232,6 +235,8 @@ class SalesforceHook(BaseHook): converted into Unix timestamps. False if you want them to be left in the same format as they were in Salesforce. + Leaving the value as False will result + in datetimes being strings. 
*Defaults to False* :param record_time_added: *(optional)* True if you want to add a Unix timestamp field to the resulting data @@ -253,8 +258,27 @@ class SalesforceHook(BaseHook): # convert columns with datetime strings to datetimes # not all strings will be datetimes, so we ignore any errors that occur - if coerce_to_timestamp: - possible_timestamp_cols = df.columns[df.dtypes == "object"] + # we get the object's definition at this point and only consider + # features that are DATE or DATETIME + if coerce_to_timestamp and df.shape[0] > 0: + # get the object name out of the query results + # it's stored in the "attributes" dictionary + # for each returned record + object_name = query_results[0]['attributes']['type'] + + logging.info("Coercing timestamps for: {0}".format(object_name)) + + schema = self.describe_object(object_name) + + # possible columns that can be convereted to timestamps + # are the ones that are either date or datetime types + # strings are too general and we risk unintentional conversion + possible_timestamp_cols = [ + i['name'].lower() + for i in schema['fields'] + if i['type'] in ["date", "datetime"] and + i['name'].lower() in df.columns + ] df[possible_timestamp_cols] = df[possible_timestamp_cols].apply( lambda x: self._to_timestamp(x) )