atrsa commented on code in PR #39509:
URL: https://github.com/apache/superset/pull/39509#discussion_r3347999153


##########
tests/unit_tests/db_engine_specs/test_elasticsearch.py:
##########
@@ -92,3 +93,325 @@ def test_opendistro_sqla_column_label(original: str, expected: str) -> None:
     from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec
 
     assert OpenDistroEngineSpec.make_label_compatible(original) == expected
+
+
+def test_elasticsearch_spec_opts_out_of_offset_fetch() -> None:
+    """
+    Elasticsearch SQL does not support OFFSET. The spec must opt out so the
+    query builder does not emit OFFSET clauses that crash the parser.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    assert ElasticSearchEngineSpec.supports_offset is False
+
+
+def test_opendistro_spec_opts_out_of_offset_fetch() -> None:
+    """
+    OpenDistro/OpenSearch SQL also does not support OFFSET.
+    """
+    from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec
+
+    assert OpenDistroEngineSpec.supports_offset is False
+
+
+def _build_fake_database(transport_responses: list[dict[str, Any]]) -> MagicMock:
+    """
+    Build a mocked Database whose get_raw_connection() yields a connection
+    whose es.transport.perform_request returns transport_responses sequentially.
+    """
+    database = MagicMock(name="Database")
+
+    responses_iter = iter(transport_responses)
+
+    def perform_request(method, path, body=None, **_kwargs):

Review Comment:
   Added explicit parameter and return type annotations to this 
`perform_request` mock callback.



##########
tests/unit_tests/db_engine_specs/test_elasticsearch.py:
##########
@@ -92,3 +93,325 @@ def test_opendistro_sqla_column_label(original: str, expected: str) -> None:
     from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec
 
     assert OpenDistroEngineSpec.make_label_compatible(original) == expected
+
+
+def test_elasticsearch_spec_opts_out_of_offset_fetch() -> None:
+    """
+    Elasticsearch SQL does not support OFFSET. The spec must opt out so the
+    query builder does not emit OFFSET clauses that crash the parser.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    assert ElasticSearchEngineSpec.supports_offset is False
+
+
+def test_opendistro_spec_opts_out_of_offset_fetch() -> None:
+    """
+    OpenDistro/OpenSearch SQL also does not support OFFSET.
+    """
+    from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec
+
+    assert OpenDistroEngineSpec.supports_offset is False
+
+
+def _build_fake_database(transport_responses: list[dict[str, Any]]) -> MagicMock:
+    """
+    Build a mocked Database whose get_raw_connection() yields a connection
+    whose es.transport.perform_request returns transport_responses sequentially.
+    """
+    database = MagicMock(name="Database")
+
+    responses_iter = iter(transport_responses)
+
+    def perform_request(method, path, body=None, **_kwargs):
+        return next(responses_iter)
+
+    transport = MagicMock()
+    transport.perform_request.side_effect = perform_request
+    conn = MagicMock()
+    conn.es.transport = transport
+
+    ctx = MagicMock()
+    ctx.__enter__ = MagicMock(return_value=conn)
+    ctx.__exit__ = MagicMock(return_value=False)
+    database.get_raw_connection.return_value = ctx
+    database._transport = transport  # expose for assertions
+    return database
+
+
+def test_fetch_data_with_cursor_returns_first_page_when_page_index_zero() -> None:
+    """
+    Page index 0 = return the rows from the initial query, no cursor
+    iteration needed. The cursor must still be closed if present.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {
+                "columns": [{"name": "a"}, {"name": "b"}],
+                "rows": [[1, "x"], [2, "y"]],
+                "cursor": "CUR-1",
+            },
+            {},  # close
+        ]
+    )
+
+    rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a, b FROM idx",
+        page_index=0,
+        page_size=2,
+    )
+
+    assert rows == [[1, "x"], [2, "y"]]
+    assert cols == ["a", "b"]
+
+    calls = database._transport.perform_request.call_args_list
+    assert len(calls) == 2
+    assert calls[0][0][0] == "POST"
+    assert calls[0][0][1] == "/_sql"
+    assert calls[0].kwargs["body"] == {"query": "SELECT a, b FROM idx", "fetch_size": 2}
+    assert calls[1][0][1] == "/_sql/close"
+    assert calls[1].kwargs["body"] == {"cursor": "CUR-1"}
+
+
+def test_fetch_data_with_cursor_iterates_to_target_page() -> None:
+    """
+    For page_index=2, the code executes the initial query, then sends the
+    cursor twice. The rows returned belong to the third page.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"},
+            {"rows": [[1]], "cursor": "C2"},
+            {"rows": [[2]], "cursor": "C3"},
+            {},  # close
+        ]
+    )
+
+    rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a FROM idx",
+        page_index=2,
+        page_size=1,
+    )
+
+    assert rows == [[2]]
+    assert cols == ["a"]
+
+    calls = database._transport.perform_request.call_args_list
+    assert len(calls) == 4
+    assert calls[1].kwargs["body"] == {"cursor": "C1"}
+    assert calls[2].kwargs["body"] == {"cursor": "C2"}
+    assert calls[3][0][1] == "/_sql/close"
+    assert calls[3].kwargs["body"] == {"cursor": "C3"}
+
+
+def test_fetch_data_with_cursor_returns_empty_when_dataset_exhausted() -> None:
+    """
+    If the dataset has fewer pages than the requested page_index, the
+    cursor becomes falsy mid-iteration. Return empty rows, do not call
+    close, do not raise.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"},
+            {"rows": [[1]]},  # no cursor → dataset ends here
+        ]
+    )
+
+    rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a FROM idx",
+        page_index=5,
+        page_size=1,
+    )
+
+    assert rows == []
+    assert cols == ["a"]
+    assert len(database._transport.perform_request.call_args_list) == 2
+
+
+def test_fetch_data_with_cursor_does_not_close_when_no_cursor_present() -> None:
+    """
+    Some responses (tiny result sets) come back without a cursor token.
+    The code must not send a close request with a missing cursor.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {"columns": [{"name": "a"}], "rows": [[0], [1]]},
+        ]
+    )
+
+    rows, _ = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a FROM idx",
+        page_index=0,
+        page_size=50,
+    )
+
+    assert rows == [[0], [1]]
+    assert len(database._transport.perform_request.call_args_list) == 1
+
+
+def test_fetch_data_with_cursor_closes_cursor_even_if_iteration_raises() -> None:
+    """
+    If an intermediate cursor request raises, the cursor from the most
+    recent successful response must still be closed. Prevents server-side
+    cursor leaks on transport errors.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    class BoomError(RuntimeError):
+        pass

Review Comment:
   Added a docstring to the `BoomError` test helper.



##########
tests/unit_tests/db_engine_specs/test_elasticsearch.py:
##########
@@ -92,3 +93,325 @@ def test_opendistro_sqla_column_label(original: str, expected: str) -> None:
     from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec
 
     assert OpenDistroEngineSpec.make_label_compatible(original) == expected
+
+
+def test_elasticsearch_spec_opts_out_of_offset_fetch() -> None:
+    """
+    Elasticsearch SQL does not support OFFSET. The spec must opt out so the
+    query builder does not emit OFFSET clauses that crash the parser.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    assert ElasticSearchEngineSpec.supports_offset is False
+
+
+def test_opendistro_spec_opts_out_of_offset_fetch() -> None:
+    """
+    OpenDistro/OpenSearch SQL also does not support OFFSET.
+    """
+    from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec
+
+    assert OpenDistroEngineSpec.supports_offset is False
+
+
+def _build_fake_database(transport_responses: list[dict[str, Any]]) -> MagicMock:
+    """
+    Build a mocked Database whose get_raw_connection() yields a connection
+    whose es.transport.perform_request returns transport_responses sequentially.
+    """
+    database = MagicMock(name="Database")
+
+    responses_iter = iter(transport_responses)
+
+    def perform_request(method, path, body=None, **_kwargs):
+        return next(responses_iter)
+
+    transport = MagicMock()
+    transport.perform_request.side_effect = perform_request
+    conn = MagicMock()
+    conn.es.transport = transport
+
+    ctx = MagicMock()
+    ctx.__enter__ = MagicMock(return_value=conn)
+    ctx.__exit__ = MagicMock(return_value=False)
+    database.get_raw_connection.return_value = ctx
+    database._transport = transport  # expose for assertions
+    return database
+
+
+def test_fetch_data_with_cursor_returns_first_page_when_page_index_zero() -> None:
+    """
+    Page index 0 = return the rows from the initial query, no cursor
+    iteration needed. The cursor must still be closed if present.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {
+                "columns": [{"name": "a"}, {"name": "b"}],
+                "rows": [[1, "x"], [2, "y"]],
+                "cursor": "CUR-1",
+            },
+            {},  # close
+        ]
+    )
+
+    rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a, b FROM idx",
+        page_index=0,
+        page_size=2,
+    )
+
+    assert rows == [[1, "x"], [2, "y"]]
+    assert cols == ["a", "b"]
+
+    calls = database._transport.perform_request.call_args_list
+    assert len(calls) == 2
+    assert calls[0][0][0] == "POST"
+    assert calls[0][0][1] == "/_sql"
+    assert calls[0].kwargs["body"] == {"query": "SELECT a, b FROM idx", "fetch_size": 2}
+    assert calls[1][0][1] == "/_sql/close"
+    assert calls[1].kwargs["body"] == {"cursor": "CUR-1"}
+
+
+def test_fetch_data_with_cursor_iterates_to_target_page() -> None:
+    """
+    For page_index=2, the code executes the initial query, then sends the
+    cursor twice. The rows returned belong to the third page.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"},
+            {"rows": [[1]], "cursor": "C2"},
+            {"rows": [[2]], "cursor": "C3"},
+            {},  # close
+        ]
+    )
+
+    rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a FROM idx",
+        page_index=2,
+        page_size=1,
+    )
+
+    assert rows == [[2]]
+    assert cols == ["a"]
+
+    calls = database._transport.perform_request.call_args_list
+    assert len(calls) == 4
+    assert calls[1].kwargs["body"] == {"cursor": "C1"}
+    assert calls[2].kwargs["body"] == {"cursor": "C2"}
+    assert calls[3][0][1] == "/_sql/close"
+    assert calls[3].kwargs["body"] == {"cursor": "C3"}
+
+
+def test_fetch_data_with_cursor_returns_empty_when_dataset_exhausted() -> None:
+    """
+    If the dataset has fewer pages than the requested page_index, the
+    cursor becomes falsy mid-iteration. Return empty rows, do not call
+    close, do not raise.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"},
+            {"rows": [[1]]},  # no cursor → dataset ends here
+        ]
+    )
+
+    rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a FROM idx",
+        page_index=5,
+        page_size=1,
+    )
+
+    assert rows == []
+    assert cols == ["a"]
+    assert len(database._transport.perform_request.call_args_list) == 2
+
+
+def test_fetch_data_with_cursor_does_not_close_when_no_cursor_present() -> None:
+    """
+    Some responses (tiny result sets) come back without a cursor token.
+    The code must not send a close request with a missing cursor.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {"columns": [{"name": "a"}], "rows": [[0], [1]]},
+        ]
+    )
+
+    rows, _ = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a FROM idx",
+        page_index=0,
+        page_size=50,
+    )
+
+    assert rows == [[0], [1]]
+    assert len(database._transport.perform_request.call_args_list) == 1
+
+
+def test_fetch_data_with_cursor_closes_cursor_even_if_iteration_raises() -> None:
+    """
+    If an intermediate cursor request raises, the cursor from the most
+    recent successful response must still be closed. Prevents server-side
+    cursor leaks on transport errors.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    class BoomError(RuntimeError):
+        pass
+
+    responses = [
+        {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"},
+    ]
+
+    call_count = {"n": 0}
+    recorded_close = {}
+
+    def perform_request(method, path, body=None, **_kwargs):

Review Comment:
   Added explicit parameter and return type annotations to this 
`perform_request` mock callback.



##########
tests/unit_tests/models/test_helpers_offset.py:
##########
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import ast
+from pathlib import Path
+
+HELPERS_PATH = (
+    Path(__file__).resolve().parents[3] / "superset" / "models" / "helpers.py"
+)
+
+
+def _uses_supports_offset(node: ast.AST) -> bool:
+    """True if any attribute access on `node` references 'supports_offset'."""
+    return any(
+        isinstance(child, ast.Attribute) and child.attr == "supports_offset"
+        for child in ast.walk(node)
+    )
+
+
+def _is_qry_offset_assignment(stmt: ast.AST) -> bool:
+    """True if stmt is `qry = qry.offset(...)` (any LHS, call to `.offset`)."""
+    if not isinstance(stmt, ast.Assign):
+        return False
+    call = stmt.value
+    if not isinstance(call, ast.Call):
+        return False
+    func = call.func
+    return isinstance(func, ast.Attribute) and func.attr == "offset"
+
+
+def test_helpers_guards_offset_with_supports_offset_flag() -> None:
+    """
+    Regression guard: the `.offset()` call in get_sqla_query must be wrapped
+    in an `if` that checks `supports_offset`. Without this guard,
+    engines that do not support OFFSET (Elasticsearch SQL) crash drill-
+    to-detail on page 2+.
+
+    We parse the AST rather than grep the source so the test survives
+    Black-style reformatting and trivial refactors.
+    """
+    source = HELPERS_PATH.read_text()
+    assert "supports_offset" in source, (
+        "helpers.py no longer references supports_offset; the OFFSET "
+        "guard is gone — Elasticsearch drill-to-detail will crash on page 2+."
+    )
+
+    tree = ast.parse(source)
+    unguarded: list[int] = []
+
+    class Visitor(ast.NodeVisitor):
+        def __init__(self) -> None:

Review Comment:
   Added a docstring to the `Visitor` class.



##########
tests/unit_tests/models/test_helpers_offset.py:
##########
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import ast
+from pathlib import Path
+
+HELPERS_PATH = (
+    Path(__file__).resolve().parents[3] / "superset" / "models" / "helpers.py"
+)
+
+
+def _uses_supports_offset(node: ast.AST) -> bool:
+    """True if any attribute access on `node` references 'supports_offset'."""
+    return any(
+        isinstance(child, ast.Attribute) and child.attr == "supports_offset"
+        for child in ast.walk(node)
+    )
+
+
+def _is_qry_offset_assignment(stmt: ast.AST) -> bool:
+    """True if stmt is `qry = qry.offset(...)` (any LHS, call to `.offset`)."""
+    if not isinstance(stmt, ast.Assign):
+        return False
+    call = stmt.value
+    if not isinstance(call, ast.Call):
+        return False
+    func = call.func
+    return isinstance(func, ast.Attribute) and func.attr == "offset"
+
+
+def test_helpers_guards_offset_with_supports_offset_flag() -> None:
+    """
+    Regression guard: the `.offset()` call in get_sqla_query must be wrapped
+    in an `if` that checks `supports_offset`. Without this guard,
+    engines that do not support OFFSET (Elasticsearch SQL) crash drill-
+    to-detail on page 2+.
+
+    We parse the AST rather than grep the source so the test survives
+    Black-style reformatting and trivial refactors.
+    """
+    source = HELPERS_PATH.read_text()
+    assert "supports_offset" in source, (
+        "helpers.py no longer references supports_offset; the OFFSET "
+        "guard is gone — Elasticsearch drill-to-detail will crash on page 2+."
+    )
+
+    tree = ast.parse(source)
+    unguarded: list[int] = []
+
+    class Visitor(ast.NodeVisitor):
+        def __init__(self) -> None:
+            self._in_guarded_if = 0

Review Comment:
   Added a docstring to the `__init__` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to