itholic commented on a change in pull request #35191:
URL: https://github.com/apache/spark/pull/35191#discussion_r800269304



##########
File path: python/pyspark/pandas/series.py
##########
@@ -5228,22 +5228,128 @@ def asof(self, where: Union[Any, List]) -> Union[Scalar, "Series"]:
             where = [where]
         index_scol = self._internal.index_spark_columns[0]
         index_type = self._internal.spark_type_for(index_scol)
+
+        # e.g where = [10, 20]
+        # In the comments below , will explain how the dataframe will look after transformations.
+        # e.g pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")
+
+        column_prefix_constant = "col_"
         cond = [
-            F.max(F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column))
-            for index in where
+            F.when(
+                index_scol <= SF.lit(index).cast(index_type),
+                F.struct(
+                    F.lit(column_prefix_constant + str(index) + "_" + str(idx)).alias("identifier"),
+                    self.spark.column.alias("col_value"),
+                ),
+            ).alias(column_prefix_constant + str(index) + "_" + str(idx))

Review comment:
       I think we might want to return a pandas-on-Spark Series rather than a pandas Series, and leverage the pandas DataFrame directly only when `where` has duplicate items.
   
   So, how about this ??
   
   ```python
           cond = [
               F.last(
                   F.when(index_scol <= SF.lit(index).cast(index_type), self.spark.column),
                   ignorenulls=True,
               )
               for index in where
           ]
   ```
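   
   As a quick sanity check, this is roughly what the aggregation above computes on the toy Series from the diff (a standalone sketch; the `spark` session, `df`, and column names here are illustrative and not part of the PR):
   
   ```python
   from pyspark.sql import SparkSession, functions as F

   spark = SparkSession.builder.getOrCreate()

   # Toy data mirroring pd.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40]);
   # the missing value is stored as null, like pandas-on-Spark does internally.
   df = spark.createDataFrame(
       [(10, 2.0), (20, 1.0), (30, None), (40, 4.0)],
       schema="idx INT, value DOUBLE",
   ).coalesce(1)  # single partition so F.last follows the input order in this demo

   where = [20, 30]
   cond = [
       F.last(
           F.when(F.col("idx") <= F.lit(w), F.col("value")), ignorenulls=True
       ).alias(str(w))
       for w in where
   ]
   df.select(cond).show()
   # Expected: a single row [1.0, 1.0] -- for each `where` item, the last
   # non-null value whose index is <= that item (the null at index 30 is skipped).
   ```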
   
   Then, to build the result from `sdf`:
   
   ```python
           # The data is expected to be small, so it's fine to transpose / use the default index.
           with ps.option_context("compute.default_index_type", "distributed", "compute.max_rows", 1):
               if len(where) == len(set(where)):
                   psdf: DataFrame = DataFrame(sdf)
                   psdf.columns = pd.Index(where)
                   return first_series(psdf.transpose()).rename(self.name)
               else:
                   # If `where` has duplicate items, leverage pandas directly,
                   # since pandas API on Spark doesn't support duplicate column names.
                   pdf: pd.DataFrame = sdf.limit(1).toPandas()
                   pdf.columns = pd.Index(where)
                   return first_series(DataFrame(pdf.transpose())).rename(self.name)
   ```
   
   ??
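   
   For reference, assuming the suggestion above is applied, the end-to-end behavior I'd expect looks roughly like this (the outputs follow pandas' `asof` semantics and are illustrative):
   
   ```python
   import numpy as np
   import pyspark.pandas as ps

   psser = ps.Series([2, 1, np.nan, 4], index=[10, 20, 30, 40], name="Koalas")

   # Unique items in `where`: a pandas-on-Spark Series comes back.
   psser.asof([5, 20])
   # 5     NaN   <- no index value <= 5
   # 20    1.0
   # Name: Koalas, dtype: float64

   # Duplicate items in `where`: computed through pandas (duplicate column
   # names aren't supported by pandas API on Spark), but still returned as a
   # pandas-on-Spark Series.
   psser.asof([20, 20])
   # 20    1.0
   # 20    1.0
   # Name: Koalas, dtype: float64
   ```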



