HyukjinKwon commented on code in PR #39544:
URL: https://github.com/apache/spark/pull/39544#discussion_r1068966627


##########
python/pyspark/sql/connect/session.py:
##########
@@ -215,47 +219,37 @@ def createDataFrame(
         _inferred_schema: Optional[StructType] = None
 
         if isinstance(data, pd.DataFrame):
-            from pandas.api.types import (  # type: ignore[attr-defined]
-                is_datetime64_dtype,
-                is_datetime64tz_dtype,
-            )
-            from pyspark.sql.pandas.types import (
-                _check_series_convert_timestamps_internal,
-                _get_local_timezone,
+            # Logic was borrowed from `_create_from_pandas_with_arrow` in
+            # `pyspark.sql.pandas.conversion.py`. Should ideally deduplicate the logics.
+
+            # If no schema supplied by user then get the names of columns only
+            if schema is None:
+                _cols = [str(x) if not isinstance(x, str) else x for x in data.columns]
+
+            # Determine arrow types to coerce data when creating batches
+            if isinstance(schema, StructType):
+                arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
+                _cols = [str(x) if not isinstance(x, str) else x for x in schema.fieldNames()]
+            elif isinstance(schema, DataType):
+                raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
+            else:
+                # Any timestamps must be coerced to be compatible with Spark
+                arrow_types = [
+                    to_arrow_type(TimestampType())
+                    if is_datetime64_dtype(t) or is_datetime64tz_dtype(t)
+                    else None
+                    for t in data.dtypes
+                ]
+
+            ser = ArrowStreamPandasSerializer(
+                _get_local_timezone(),  # 'spark.session.timezone' should be respected
+                False,  # 'spark.sql.execution.pandas.convertToArrowArraySafely' should be respected
+                True,
             )

-            # First, check if we need to create a copy of the input data to adjust
-            # the timestamps.
-            input_data = data
-            has_timestamp_data = any(
-                [is_datetime64_dtype(data[c]) or is_datetime64tz_dtype(data[c]) for c in data]
+            _table = pa.Table.from_batches(
+                [ser._create_batch([(c, t) for (_, c), t in zip(data.items(), arrow_types)])]
             )
-            if has_timestamp_data:
-                input_data = data.copy()
-                # We need double conversions for the truncation, first truncate to microseconds.
-                for col in input_data:
-                    if is_datetime64tz_dtype(input_data[col].dtype):
-                        input_data[col] = _check_series_convert_timestamps_internal(
-                            input_data[col], _get_local_timezone()
-                        ).astype("datetime64[us, UTC]")
-                    elif is_datetime64_dtype(input_data[col].dtype):
-                        input_data[col] = input_data[col].astype("datetime64[us]")
-
-                # Create a new schema and change the types to the truncated microseconds.
-                pd_schema = pa.Schema.from_pandas(input_data)
-                new_schema = pa.schema([])
-                for x in range(len(pd_schema.types)):
-                    f = pd_schema.field(x)
-                    # TODO(SPARK-42027) Add support for struct types.
-                    if isinstance(f.type, pa.TimestampType) and f.type.unit == "ns":
-                        tmp = f.with_type(pa.timestamp("us"))

Review Comment:
   Here, `pa.timestamp("us")` is mapped to `TimestampNTZType`:
   
   https://github.com/apache/spark/blob/70502d7a0436c6c41312f7d2451899f317cd4c20/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala#L76-L77
   
   Since timestamps in Spark SQL do not carry a timezone, we should set the timezone to UTC (for internal purposes and for legacy behaviour) so that the values are interpreted as the local (Spark session) time, just like the legacy `java.sql.Timestamp`, instead of setting it to `None`.
   
   Setting it to `None` results in the TimestampNTZ type; that behaviour can be selected explicitly by setting `spark.sql.timestampType` to `TIMESTAMP_NTZ`.
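   
   As a minimal sketch of the distinction (assuming only `pyarrow`, imported as `pa`; the Spark-side mapping noted in the comments follows the `ArrowUtils.fromArrowType` lines linked above):
   
   ```python
   import pyarrow as pa
   
   # No timezone on the Arrow type: this is the case that maps to TimestampNTZType.
   ntz_field = pa.field("ts", pa.timestamp("us"))
   
   # Explicit UTC timezone: this maps to TimestampType, which Spark then
   # interprets in the session-local timezone (the legacy behaviour).
   ltz_field = pa.field("ts", pa.timestamp("us", tz="UTC"))
   
   print(ntz_field.type.tz)  # None  -> TimestampNTZType on the Spark side
   print(ltz_field.type.tz)  # 'UTC' -> TimestampType on the Spark side
   ```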
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

