itholic commented on a change in pull request #35191:
URL: https://github.com/apache/spark/pull/35191#discussion_r800269304
##########
File path: python/pyspark/pandas/series.py
##########
@@ -5228,22 +5228,128 @@ def asof(self, where: Union[Any, List]) -> Union[Scalar, "Series"]:
             where = [where]
         index_scol = self._internal.index_spark_columns[0]
         index_type = self._internal.spark_type_for(index_scol)
+
+        # e.g where = [10, 20]
+        # In the comments below , will explain how the dataframe will look after transformations.
+        # e.g pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
+
+        column_prefix_constant = "col_"
         cond = [
-            F.max(F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column))
-            for index in where
+            F.when(
+                index_scol <= SF.lit(index).cast(index_type),
+                F.struct(
+                    F.lit(column_prefix_constant + str(index) + "_" + str(idx)).alias("identifier"),
+                    self.spark.column.alias("col_value"),
+                ),
+            ).alias(column_prefix_constant + str(index) + "_" + str(idx))
Review comment:
I think we should return a pandas-on-Spark Series rather than a pandas
Series, and fall back to a pandas DataFrame only when `where` contains
duplicate items.
How about this?
```python
cond = [
    F.last(
        F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column),
        ignorenulls=True,
    )
    for index in where
]
```
Then
```python
# The data is expected to be small, so it's fine to transpose and use the default index.
with ps.option_context("compute.default_index_type", "distributed", "compute.max_rows", 1):
    if len(where) == len(set(where)):
        psdf: DataFrame = DataFrame(sdf)
        psdf.columns = pd.Index(where)
        return first_series(psdf.transpose()).rename(self.name)
    else:
        # If `where` has duplicate items, use pandas directly, since
        # pandas API on Spark doesn't support duplicate column names.
        pdf: pd.DataFrame = sdf.limit(1).toPandas()
        pdf.columns = pd.Index(where)
        return first_series(DataFrame(pdf.transpose())).rename(self.name)
```
What do you think?
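To make the intended result concrete, here is a minimal pandas-only sketch of the same idea, using the example Series from the diff comments. It is not the Spark code path: `asof_sketch` is a hypothetical helper, and the loop only stands in for the `F.last(F.when(...), ignorenulls=True)` aggregates, assuming rows are visited in index order (which `asof` requires anyway). It shows that a one-row frame with duplicate column labels transposes into a Series with a duplicate index.

```python
import numpy as np
import pandas as pd


def asof_sketch(ser: pd.Series, where: list) -> pd.Series:
    """Hypothetical pandas-only stand-in for the Spark aggregation above."""
    aggregates = []
    for w in where:
        # Last non-null value whose index is <= w (NaN if there is none).
        candidates = ser[ser.index <= w].dropna()
        aggregates.append(candidates.iloc[-1] if len(candidates) else np.nan)

    # One row, one column per `where` entry, like the aggregated Spark frame.
    pdf = pd.DataFrame([aggregates])
    # pandas accepts duplicate column labels, so duplicates in `where` are fine.
    pdf.columns = pd.Index(where)
    # Transpose so the `where` values become the index, then take the only column.
    return pdf.transpose().iloc[:, 0].rename(ser.name)


pser = pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
result = asof_sketch(pser, [10, 20, 20, 35])
```

Here `result` has the index `[10, 20, 20, 35]` and keeps the name `"Koalas"`; the entry for `35` skips the null at index `30` and falls back to the value at index `20`.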
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]