ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716998942



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)

Review comment:
       I don't think they are very useful to appear in explains because they 
are for only internal usage.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -249,6 +249,20 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
               s"join condition '${condition.sql}' " +
                 s"of type ${condition.dataType.catalogString} is not a 
boolean.")
 
+          case j @ AsOfJoin(_, _, _, Some(condition), _, _, _)
+              if condition.dataType != BooleanType =>
+            failAnalysis(
+              s"join condition '${condition.sql}' " +
+                s"of type ${condition.dataType.catalogString} is not a 
boolean.")
+
+          case j @ AsOfJoin(_, _, _, _, _, _, Some(toleranceAssertion)) =>
+            if (!toleranceAssertion.foldable) {
+              failAnalysis("Input argument tolerance must be a constant.")
+            }
+            if (!toleranceAssertion.eval().asInstanceOf[Boolean]) {

Review comment:
       That makes sense, but the result of `eval()` is type dependent, e.g. if 
the `tolerance` is `CatelendarInterval`, the cast will fail, IIUC, whereas if 
we make the condition with expressions, it will only return `Boolean` value 
after the analysis phase. WDYT?

##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 
7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, 
on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 
6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, 
right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world times-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...                "GOOG",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "GOOG",
+    ...                "AAPL",
+    ...                "GOOG",
+    ...                "MSFT"
+    ...            ],
+    ...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 
52.01],
+    ...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 
52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...        {
+    ...            "time": [
+    ...                pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048")
+    ...            ],
+    ...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...            "quantity": [75, 155, 100, 100, 100]
+    ...        }
+    ...    )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # 
pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled in `pyspark side, and it's still WIP 
[SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054). We can remove 
this workaround when it's done.

##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 
7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, 
on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 
6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, 
right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world times-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...                "GOOG",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "GOOG",
+    ...                "AAPL",
+    ...                "GOOG",
+    ...                "MSFT"
+    ...            ],
+    ...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 
52.01],
+    ...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 
52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...        {
+    ...            "time": [
+    ...                pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048")
+    ...            ],
+    ...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...            "quantity": [75, 155, 100, 100, 100]
+    ...        }
+    ...    )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # 
pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled in pyspark side, and it's still WIP 
[SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054). We can remove 
this workaround when it's done.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  def duplicateResolved: Boolean = 
left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      asOfCondition.dataType == BooleanType &&
+      condition.forall(_.dataType == BooleanType) &&
+      toleranceAssertion.forall { assertion =>
+        assertion.foldable && assertion.eval().asInstanceOf[Boolean]
+      }
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(AS_OF_JOIN)
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): AsOfJoin = {
+    copy(left = newLeft, right = newRight)
+  }
+}
+
+object AsOfJoin {
+
+  def apply(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      condition: Option[Expression],
+      joinType: JoinType,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): AsOfJoin = {
+    val asOfCond = makeAsOfCond(leftAsOf, rightAsOf, tolerance, 
allowExactMatches, direction)
+    val orderingExpr = makeOrderingExpr(leftAsOf, rightAsOf, direction)
+    AsOfJoin(left, right, asOfCond, condition, joinType,
+      orderingExpr, tolerance.map(t => GreaterThanOrEqual(t, 
Literal.default(t.dataType))))
+  }
+
+  private def makeAsOfCond(
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): Expression = {
+    val base = (allowExactMatches, direction) match {
+      case (true, Backward) => GreaterThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Backward) => GreaterThan(leftAsOf, rightAsOf)
+      case (true, Forward) => LessThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Forward) => LessThan(leftAsOf, rightAsOf)
+      case (true, Nearest) => Literal.TrueLiteral
+      case (false, Nearest) => Not(EqualTo(leftAsOf, rightAsOf))
+    }
+    tolerance match {
+      case Some(tolerance) =>
+        (allowExactMatches, direction) match {
+          case (true, Backward) =>
+            And(base, GreaterThanOrEqual(rightAsOf, Subtract(leftAsOf, 
tolerance)))
+          case (false, Backward) =>
+            And(base, GreaterThan(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (true, Forward) =>
+            And(base, LessThanOrEqual(rightAsOf, Add(leftAsOf, tolerance)))

Review comment:
       I think we can leave it as-is because `Analyzer` should replace it with 
a proper expression; otherwise we should manage by hand here as well.

##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 
7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, 
on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 
6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, 
right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world times-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...                "GOOG",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "GOOG",
+    ...                "AAPL",
+    ...                "GOOG",
+    ...                "MSFT"
+    ...            ],
+    ...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 
52.01],
+    ...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 
52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...        {
+    ...            "time": [
+    ...                pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048")
+    ...            ],
+    ...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...            "quantity": [75, 155, 100, 100, 100]
+    ...        }
+    ...    )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # 
pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled in pyspark side, but it's still WIP 
[SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054). We can remove 
this workaround when it's done.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  def duplicateResolved: Boolean = 
left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      asOfCondition.dataType == BooleanType &&
+      condition.forall(_.dataType == BooleanType) &&
+      toleranceAssertion.forall { assertion =>
+        assertion.foldable && assertion.eval().asInstanceOf[Boolean]
+      }
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(AS_OF_JOIN)
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): AsOfJoin = {
+    copy(left = newLeft, right = newRight)
+  }
+}
+
+object AsOfJoin {
+
+  def apply(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      condition: Option[Expression],
+      joinType: JoinType,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): AsOfJoin = {
+    val asOfCond = makeAsOfCond(leftAsOf, rightAsOf, tolerance, 
allowExactMatches, direction)
+    val orderingExpr = makeOrderingExpr(leftAsOf, rightAsOf, direction)
+    AsOfJoin(left, right, asOfCond, condition, joinType,
+      orderingExpr, tolerance.map(t => GreaterThanOrEqual(t, 
Literal.default(t.dataType))))
+  }
+
+  private def makeAsOfCond(
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): Expression = {
+    val base = (allowExactMatches, direction) match {
+      case (true, Backward) => GreaterThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Backward) => GreaterThan(leftAsOf, rightAsOf)
+      case (true, Forward) => LessThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Forward) => LessThan(leftAsOf, rightAsOf)
+      case (true, Nearest) => Literal.TrueLiteral
+      case (false, Nearest) => Not(EqualTo(leftAsOf, rightAsOf))
+    }
+    tolerance match {
+      case Some(tolerance) =>
+        (allowExactMatches, direction) match {
+          case (true, Backward) =>
+            And(base, GreaterThanOrEqual(rightAsOf, Subtract(leftAsOf, 
tolerance)))
+          case (false, Backward) =>
+            And(base, GreaterThan(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (true, Forward) =>
+            And(base, LessThanOrEqual(rightAsOf, Add(leftAsOf, tolerance)))

Review comment:
       I think we can leave it as-is because `Analyzer` should replace it with 
a proper expression; otherwise we have to manage by hand here as well.

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  def duplicateResolved: Boolean = 
left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      asOfCondition.dataType == BooleanType &&
+      condition.forall(_.dataType == BooleanType) &&
+      toleranceAssertion.forall { assertion =>
+        assertion.foldable && assertion.eval().asInstanceOf[Boolean]
+      }
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(AS_OF_JOIN)
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): AsOfJoin = {
+    copy(left = newLeft, right = newRight)
+  }
+}
+
+object AsOfJoin {
+
+  def apply(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      condition: Option[Expression],
+      joinType: JoinType,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): AsOfJoin = {
+    val asOfCond = makeAsOfCond(leftAsOf, rightAsOf, tolerance, 
allowExactMatches, direction)
+    val orderingExpr = makeOrderingExpr(leftAsOf, rightAsOf, direction)
+    AsOfJoin(left, right, asOfCond, condition, joinType,
+      orderingExpr, tolerance.map(t => GreaterThanOrEqual(t, 
Literal.default(t.dataType))))
+  }
+
+  private def makeAsOfCond(
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): Expression = {
+    val base = (allowExactMatches, direction) match {
+      case (true, Backward) => GreaterThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Backward) => GreaterThan(leftAsOf, rightAsOf)
+      case (true, Forward) => LessThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Forward) => LessThan(leftAsOf, rightAsOf)
+      case (true, Nearest) => Literal.TrueLiteral
+      case (false, Nearest) => Not(EqualTo(leftAsOf, rightAsOf))
+    }
+    tolerance match {
+      case Some(tolerance) =>
+        (allowExactMatches, direction) match {
+          case (true, Backward) =>
+            And(base, GreaterThanOrEqual(rightAsOf, Subtract(leftAsOf, 
tolerance)))
+          case (false, Backward) =>
+            And(base, GreaterThan(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (true, Forward) =>
+            And(base, LessThanOrEqual(rightAsOf, Add(leftAsOf, tolerance)))

Review comment:
       I think we can leave it as-is because `Analyzer` should replace it with 
a proper expression; otherwise we have to manage it here as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to