BryanCutler commented on a change in pull request #32332:
URL: https://github.com/apache/spark/pull/32332#discussion_r679351798



##########
File path: python/pyspark/sql/session.py
##########
@@ -483,13 +483,22 @@ def _inferSchema(self, rdd, samplingRatio=None, names=None):
                 row, names, infer_dict_as_struct=infer_dict_as_struct)).reduce(_merge_type)
         return schema
 
-    def _createFromRDD(self, rdd, schema, samplingRatio):
+    def _create_verified_converter(self, schema, verifySchema):
+        converter = _create_converter(schema)
+        verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True
+
+        def verify_and_convert(obj):
+            verify_func(obj)
+            return converter(obj)
+        return verify_and_convert

Review comment:
       I'm a little confused by this: if `verifySchema` is `False`, could you just return `converter`?
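       Roughly something like this sketch (assuming `_create_converter` and `_make_type_verifier` keep the signatures used in this PR):

```python
def _create_verified_converter(self, schema, verifySchema):
    converter = _create_converter(schema)
    if not verifySchema:
        # No verification requested, so the plain converter is enough.
        return converter

    verify_func = _make_type_verifier(schema)

    def verify_and_convert(obj):
        # Verify the row against the schema first, then convert it.
        verify_func(obj)
        return converter(obj)
    return verify_and_convert
```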

##########
File path: python/pyspark/sql/tests/test_dataframe.py
##########
@@ -759,6 +759,30 @@ def test_create_dataframe_from_pandas_with_dst(self):
                 os.environ['TZ'] = orig_env_tz
             time.tzset()
 
+    # SPARK-35211: inferred schema verification
+    @unittest.skipIf(not have_pandas, pandas_requirement_message)  # type: ignore
+    def test_create_dataframe_from_pandas_with_mismatched_udt(self):
+        from pyspark.testing.sqlutils import ExamplePoint
+        import pandas as pd
+        pdf = pd.DataFrame({'point': pd.Series([ExamplePoint(1, 1), ExamplePoint(2, 2)])})
+        # The underlying sqlType of ExamplePoint is ArrayType(DoubleType(), False)
+        # There is a data type mismatch between 1 and DoubleType, so a TypeError is expected
+        with self.assertRaises(TypeError):
+            with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": "false"}):
+                self.spark.createDataFrame(pdf)
+        # With verifySchema disabled, there will be unexpected behaviours
+        self.assertEquals(self.spark.createDataFrame(pdf, verifySchema=False).collect(),
+                          [Row(point=ExamplePoint(0.0, 0.0)), Row(point=ExamplePoint(0.0, 0.0))])

Review comment:
       Can you add a positive test case where `verifySchema` is `True` and 
results are correct?
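       For example, something along these lines (a sketch only; the method name and the expected values are illustrative and assume `ExamplePoint` round-trips cleanly through its `ArrayType(DoubleType(), False)` sqlType, so please double-check):

```python
# Sketch of a possible positive case (name and values are illustrative only)
def test_create_dataframe_from_pandas_with_matched_udt(self):
    from pyspark.testing.sqlutils import ExamplePoint
    import pandas as pd
    # Points already use float values, so they match the underlying DoubleType
    pdf = pd.DataFrame({'point': pd.Series([ExamplePoint(1.0, 1.0), ExamplePoint(2.0, 2.0)])})
    with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": "false"}):
        df = self.spark.createDataFrame(pdf, verifySchema=True)
        self.assertEqual(df.collect(),
                         [Row(point=ExamplePoint(1.0, 1.0)), Row(point=ExamplePoint(2.0, 2.0))])
```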

##########
File path: python/pyspark/sql/session.py
##########
@@ -697,12 +712,17 @@ def prepare(obj):
                 verify_func(obj)
                 return obj,
         else:
+            no_need_to_prepare = True
             prepare = lambda obj: obj
 
         if isinstance(data, RDD):
-            rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
+            rdd, schema = self._createFromRDD(
+                data if no_need_to_prepare else data.map(prepare),

Review comment:
       I think you could just say `data if not verifySchema else ...` and 
remove `no_need_to_prepare`
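       i.e. roughly this sketch (worth double-checking that the non-StructType `schema` branches still get the `prepare` mapping they need):

```python
        if isinstance(data, RDD):
            # Skip the per-row prepare/verify step entirely when verification is off
            rdd, schema = self._createFromRDD(
                data if not verifySchema else data.map(prepare), schema, samplingRatio)
```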




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


