HyukjinKwon commented on code in PR #39544:
URL: https://github.com/apache/spark/pull/39544#discussion_r1068966627
##########
python/pyspark/sql/connect/session.py:
##########
@@ -215,47 +219,37 @@ def createDataFrame(
         _inferred_schema: Optional[StructType] = None
         if isinstance(data, pd.DataFrame):
-            from pandas.api.types import (  # type: ignore[attr-defined]
-                is_datetime64_dtype,
-                is_datetime64tz_dtype,
-            )
-            from pyspark.sql.pandas.types import (
-                _check_series_convert_timestamps_internal,
-                _get_local_timezone,
+            # Logic was borrowed from `_create_from_pandas_with_arrow` in
+            # `pyspark.sql.pandas.conversion.py`. Should ideally deduplicate the logics.
+
+            # If no schema supplied by user then get the names of columns only
+            if schema is None:
+                _cols = [str(x) if not isinstance(x, str) else x for x in data.columns]
+
+            # Determine arrow types to coerce data when creating batches
+            if isinstance(schema, StructType):
+                arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
+                _cols = [str(x) if not isinstance(x, str) else x for x in schema.fieldNames()]
+            elif isinstance(schema, DataType):
+                raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
+            else:
+                # Any timestamps must be coerced to be compatible with Spark
+                arrow_types = [
+                    to_arrow_type(TimestampType())
+                    if is_datetime64_dtype(t) or is_datetime64tz_dtype(t)
+                    else None
+                    for t in data.dtypes
+                ]
+
+            ser = ArrowStreamPandasSerializer(
+                _get_local_timezone(),  # 'spark.session.timezone' should be respected
+                False,  # 'spark.sql.execution.pandas.convertToArrowArraySafely' should be respected
+                True,
             )
-            # First, check if we need to create a copy of the input data to adjust
-            # the timestamps.
-            input_data = data
-            has_timestamp_data = any(
-                [is_datetime64_dtype(data[c]) or is_datetime64tz_dtype(data[c]) for c in data]
+            _table = pa.Table.from_batches(
+                [ser._create_batch([(c, t) for (_, c), t in zip(data.items(), arrow_types)])]
             )
-            if has_timestamp_data:
-                input_data = data.copy()
-                # We need double conversions for the truncation, first truncate to microseconds.
-                for col in input_data:
-                    if is_datetime64tz_dtype(input_data[col].dtype):
-                        input_data[col] = _check_series_convert_timestamps_internal(
-                            input_data[col], _get_local_timezone()
-                        ).astype("datetime64[us, UTC]")
-                    elif is_datetime64_dtype(input_data[col].dtype):
-                        input_data[col] = input_data[col].astype("datetime64[us]")
-
-                # Create a new schema and change the types to the truncated microseconds.
-                pd_schema = pa.Schema.from_pandas(input_data)
-                new_schema = pa.schema([])
-                for x in range(len(pd_schema.types)):
-                    f = pd_schema.field(x)
-                    # TODO(SPARK-42027) Add support for struct types.
-                    if isinstance(f.type, pa.TimestampType) and f.type.unit == "ns":
-                        tmp = f.with_type(pa.timestamp("us"))
Review Comment:
Here, `pa.timestamp("us")` is mapped to `TimestampNTZType`:
https://github.com/apache/spark/blob/70502d7a0436c6c41312f7d2451899f317cd4c20/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala#L76-L77
Since timestamps in Spark SQL do not carry a timezone, we should set the Arrow timezone to UTC (for internal purposes and for legacy behaviour) so that the values are interpreted as the local (Spark session) time, just like the legacy `java.sql.Timestamp`, rather than leaving it as `None`.
Leaving it as `None` results in the TimestampNTZ type, which users can opt into explicitly by setting `spark.sql.timestampType` to `TIMESTAMP_NTZ`.
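
To make the distinction concrete, here is a minimal pyarrow sketch (not part of the PR) contrasting the two Arrow types discussed above. The Spark-side interpretation noted in the comments follows the ArrowUtils.scala lines linked above, and the `spark` session referenced at the end is hypothetical:

```python
import pyarrow as pa

# What the removed code above produces: microsecond timestamps with no timezone.
# Per the linked ArrowUtils.scala rule, a timestamp field whose timezone is unset
# is read back by Spark as TIMESTAMP_NTZ.
ntz_type = pa.timestamp("us")
print(ntz_type.tz)  # None

# What the comment suggests instead: keep the timezone as UTC so Spark reads the
# field as the session-local TIMESTAMP (LTZ) type, matching the legacy
# `java.sql.Timestamp` behaviour.
ltz_type = pa.timestamp("us", tz="UTC")
print(ltz_type.tz)  # 'UTC'

# On the Spark side, the default timestamp type used for timezone-naive data can
# also be switched with the config mentioned above (hypothetical `spark` session):
# spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
```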