Repository: incubator-airflow
Updated Branches:
  refs/heads/master beb285205 -> 935ede22a


[AIRFLOW-643] Improve date handling for sf_hook

Closes #1898 from Jalepeno112/feature/salesforce-hook-coerce


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/935ede22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/935ede22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/935ede22

Branch: refs/heads/master
Commit: 935ede22a2d447a9ef5cc03149d5306e13bd951f
Parents: beb2852
Author: Giovanni Briggs <gbriggs2...@gmail.com>
Authored: Mon Nov 21 13:32:42 2016 -0800
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Mon Nov 21 13:32:42 2016 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/salesforce_hook.py | 28 +++++++++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/935ede22/airflow/contrib/hooks/salesforce_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/salesforce_hook.py b/airflow/contrib/hooks/salesforce_hook.py
index efb75d0..67d1605 100644
--- a/airflow/contrib/hooks/salesforce_hook.py
+++ b/airflow/contrib/hooks/salesforce_hook.py
@@ -171,6 +171,9 @@ class SalesforceHook(BaseHook):
         try:
             col = pd.to_datetime(col)
         except ValueError:
+            logging.warning(
+                "Could not convert field to timestamps: {0}".format(col.name)
+            )
             return col
 
         # now convert the newly created datetimes into timestamps
@@ -232,6 +235,8 @@ class SalesforceHook(BaseHook):
                                     converted into Unix timestamps.
                                     False if you want them to be left in the
                                     same format as they were in Salesforce.
+                                    Leaving the value as False will result
+                                    in datetimes being strings.
                                     *Defaults to False*
         :param record_time_added:   *(optional)* True if you want to add a
                                     Unix timestamp field to the resulting data
@@ -253,8 +258,27 @@ class SalesforceHook(BaseHook):
 
         # convert columns with datetime strings to datetimes
         # not all strings will be datetimes, so we ignore any errors that occur
-        if coerce_to_timestamp:
-            possible_timestamp_cols = df.columns[df.dtypes == "object"]
+        # we get the object's definition at this point and only consider
+        # features that are DATE or DATETIME
+        if coerce_to_timestamp and df.shape[0] > 0:
+            # get the object name out of the query results
+            # it's stored in the "attributes" dictionary
+            # for each returned record
+            object_name = query_results[0]['attributes']['type']
+
+            logging.info("Coercing timestamps for: {0}".format(object_name))
+
+            schema = self.describe_object(object_name)
+
+            # possible columns that can be converted to timestamps
+            # are the ones that are either date or datetime types
+            # strings are too general and we risk unintentional conversion
+            possible_timestamp_cols = [
+                i['name'].lower()
+                for i in schema['fields']
+                if i['type'] in ["date", "datetime"] and
+                i['name'].lower() in df.columns
+            ]
             df[possible_timestamp_cols] = df[possible_timestamp_cols].apply(
                 lambda x: self._to_timestamp(x)
             )

Reply via email to