ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716999030



##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 
7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, 
on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 
6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, 
right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world times-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...                "GOOG",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "GOOG",
+    ...                "AAPL",
+    ...                "GOOG",
+    ...                "MSFT"
+    ...            ],
+    ...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 
52.01],
+    ...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 
52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...        {
+    ...            "time": [
+    ...                pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048")
+    ...            ],
+    ...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...            "quantity": [75, 155, 100, 100, 100]
+    ...        }
+    ...    )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # 
pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled on the PySpark side, but that work is still 
in progress: [SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054). 
We can remove this workaround once it's done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to