atrsa commented on code in PR #39509:
URL: https://github.com/apache/superset/pull/39509#discussion_r3347469520


##########
tests/unit_tests/models/test_helpers_offset.py:
##########
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import ast
+from pathlib import Path
+
+HELPERS_PATH = (
+    Path(__file__).resolve().parents[3] / "superset" / "models" / "helpers.py"
+)
+
+
+def _uses_supports_offset(node: ast.AST) -> bool:
+    """True if any attribute access on `node` references 'supports_offset'."""
+    return any(
+        isinstance(child, ast.Attribute) and child.attr == "supports_offset"
+        for child in ast.walk(node)
+    )
+
+
+def _is_qry_offset_assignment(stmt: ast.AST) -> bool:
+    """True if stmt is `qry = qry.offset(...)` (any LHS, call to `.offset`)."""
+    if not isinstance(stmt, ast.Assign):
+        return False
+    call = stmt.value
+    if not isinstance(call, ast.Call):
+        return False
+    func = call.func
+    return isinstance(func, ast.Attribute) and func.attr == "offset"
+
+
+def test_helpers_guards_offset_with_supports_offset_flag() -> None:
+    """
+    Regression guard: the `.offset()` call in get_sqla_query must be wrapped
+    in an `if` that checks `supports_offset`. Without this guard,
+    engines that do not support OFFSET (Elasticsearch SQL) crash drill-
+    to-detail on page 2+.
+
+    We parse the AST rather than grep the source so the test survives
+    Black-style reformatting and trivial refactors.
+    """
+    source = HELPERS_PATH.read_text()
+    assert "supports_offset" in source, (

Review Comment:
   This is a unit-test module, not production code. `assert` is the standard 
pytest idiom and pytest never runs under `-O`, so the statement can't be 
optimized away here. Rewriting to `pytest.fail()` would be non-idiomatic. False 
positive — not changing.



##########
tests/unit_tests/db_engine_specs/test_elasticsearch.py:
##########
@@ -92,3 +93,325 @@ def test_opendistro_sqla_column_label(original: str, expected: str) -> None:
     from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec
 
     assert OpenDistroEngineSpec.make_label_compatible(original) == expected
+
+
+def test_elasticsearch_spec_opts_out_of_offset_fetch() -> None:
+    """
+    Elasticsearch SQL does not support OFFSET. The spec must opt out so the
+    query builder does not emit OFFSET clauses that crash the parser.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    assert ElasticSearchEngineSpec.supports_offset is False
+
+
+def test_opendistro_spec_opts_out_of_offset_fetch() -> None:
+    """
+    OpenDistro/OpenSearch SQL also does not support OFFSET.
+    """
+    from superset.db_engine_specs.elasticsearch import OpenDistroEngineSpec
+
+    assert OpenDistroEngineSpec.supports_offset is False
+
+
+def _build_fake_database(transport_responses: list[dict[str, Any]]) -> MagicMock:
+    """
+    Build a mocked Database whose get_raw_connection() yields a connection
+    whose es.transport.perform_request returns transport_responses sequentially.
+    """
+    database = MagicMock(name="Database")
+
+    responses_iter = iter(transport_responses)
+
+    def perform_request(method, path, body=None, **_kwargs):
+        return next(responses_iter)
+
+    transport = MagicMock()
+    transport.perform_request.side_effect = perform_request
+    conn = MagicMock()
+    conn.es.transport = transport
+
+    ctx = MagicMock()
+    ctx.__enter__ = MagicMock(return_value=conn)
+    ctx.__exit__ = MagicMock(return_value=False)
+    database.get_raw_connection.return_value = ctx
+    database._transport = transport  # expose for assertions
+    return database
+
+
+def test_fetch_data_with_cursor_returns_first_page_when_page_index_zero() -> None:
+    """
+    Page index 0 = return the rows from the initial query, no cursor
+    iteration needed. The cursor must still be closed if present.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {
+                "columns": [{"name": "a"}, {"name": "b"}],
+                "rows": [[1, "x"], [2, "y"]],
+                "cursor": "CUR-1",
+            },
+            {},  # close
+        ]
+    )
+
+    rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a, b FROM idx",
+        page_index=0,
+        page_size=2,
+    )
+
+    assert rows == [[1, "x"], [2, "y"]]
+    assert cols == ["a", "b"]
+
+    calls = database._transport.perform_request.call_args_list
+    assert len(calls) == 2
+    assert calls[0][0][0] == "POST"
+    assert calls[0][0][1] == "/_sql"
+    assert calls[0].kwargs["body"] == {"query": "SELECT a, b FROM idx", "fetch_size": 2}
+    assert calls[1][0][1] == "/_sql/close"
+    assert calls[1].kwargs["body"] == {"cursor": "CUR-1"}
+
+
+def test_fetch_data_with_cursor_iterates_to_target_page() -> None:
+    """
+    For page_index=2, the code executes the initial query, then sends the
+    cursor twice. The rows returned belong to the third page.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"},
+            {"rows": [[1]], "cursor": "C2"},
+            {"rows": [[2]], "cursor": "C3"},
+            {},  # close
+        ]
+    )
+
+    rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a FROM idx",
+        page_index=2,
+        page_size=1,
+    )
+
+    assert rows == [[2]]
+    assert cols == ["a"]
+
+    calls = database._transport.perform_request.call_args_list
+    assert len(calls) == 4
+    assert calls[1].kwargs["body"] == {"cursor": "C1"}
+    assert calls[2].kwargs["body"] == {"cursor": "C2"}
+    assert calls[3][0][1] == "/_sql/close"
+    assert calls[3].kwargs["body"] == {"cursor": "C3"}
+
+
+def test_fetch_data_with_cursor_returns_empty_when_dataset_exhausted() -> None:
+    """
+    If the dataset has fewer pages than the requested page_index, the
+    cursor becomes falsy mid-iteration. Return empty rows, do not call
+    close, do not raise.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"},
+            {"rows": [[1]]},  # no cursor → dataset ends here
+        ]
+    )
+
+    rows, cols = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a FROM idx",
+        page_index=5,
+        page_size=1,
+    )
+
+    assert rows == []
+    assert cols == ["a"]
+    assert len(database._transport.perform_request.call_args_list) == 2
+
+
+def test_fetch_data_with_cursor_does_not_close_when_no_cursor_present() -> None:
+    """
+    Some responses (tiny result sets) come back without a cursor token.
+    The code must not send a close request with a missing cursor.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    database = _build_fake_database(
+        [
+            {"columns": [{"name": "a"}], "rows": [[0], [1]]},
+        ]
+    )
+
+    rows, _ = ElasticSearchEngineSpec.fetch_data_with_cursor(
+        database=database,
+        sql="SELECT a FROM idx",
+        page_index=0,
+        page_size=50,
+    )
+
+    assert rows == [[0], [1]]
+    assert len(database._transport.perform_request.call_args_list) == 1
+
+
+def test_fetch_data_with_cursor_closes_cursor_even_if_iteration_raises() -> None:
+    """
+    If an intermediate cursor request raises, the cursor from the most
+    recent successful response must still be closed. Prevents server-side
+    cursor leaks on transport errors.
+    """
+    from superset.db_engine_specs.elasticsearch import ElasticSearchEngineSpec
+
+    class BoomError(RuntimeError):
+        pass
+
+    responses = [
+        {"columns": [{"name": "a"}], "rows": [[0]], "cursor": "C1"},
+    ]
+
+    call_count = {"n": 0}
+    recorded_close = {}
+
+    def perform_request(method, path, body=None, **_kwargs):
+        call_count["n"] += 1
+        # calls: 1=initial query, 2=cursor follow-up (raises), 3=close
+        if call_count["n"] == 2:
+            raise BoomError("transport blew up")
+        if path.endswith("/close"):
+            recorded_close["body"] = body
+            return {}
+        return responses[call_count["n"] - 1]
+
+    transport = MagicMock()
+    transport.perform_request.side_effect = perform_request
+    conn = MagicMock()
+    conn.es.transport = transport
+    ctx = MagicMock()
+    ctx.__enter__ = MagicMock(return_value=conn)
+    ctx.__exit__ = MagicMock(return_value=False)
+    database = MagicMock()
+    database.get_raw_connection.return_value = ctx
+
+    with pytest.raises(BoomError):
+        ElasticSearchEngineSpec.fetch_data_with_cursor(
+            database=database,
+            sql="SELECT a FROM idx",
+            page_index=3,
+            page_size=1,
+        )
+
+    assert recorded_close.get("body") == {"cursor": "C1"}, (
+        "The cursor from the last successful response must be closed "
+        "even when a later iteration raises."
+    )
+
+
+@pytest.mark.parametrize(
+    "sql_in,expected_query",

Review Comment:
   `@pytest.mark.parametrize("sql_in,expected_query", ...)` with a 
comma-separated string is fully valid and the most common pytest idiom — pytest 
splits the names internally. A tuple of names is equivalent, not required. 
False positive — not changing.



##########
superset/db_engine_specs/elasticsearch.py:
##########
@@ -29,16 +32,108 @@
     SupersetDBAPIProgrammingError,
 )
 
+if TYPE_CHECKING:
+    from superset.models.core import Database
+
 logger = logging.getLogger()
 
 
+def _fetch_page_via_cursor(
+    database: Database,
+    sql: str,
+    page_index: int,
+    page_size: int,
+    sql_path: str,
+    close_path: str,
+) -> tuple[list[list[Any]], list[str]]:
+    """
+    Iterate Elasticsearch/OpenSearch SQL cursor pagination to return a single
+    page of results.
+
+    Executes ``sql`` with ``fetch_size = page_size``, then sends cursor
+    follow-up requests ``page_index`` times to skip earlier pages. Closes the
+    cursor when done to release server-side state. Returns
+    ``(rows, columns)``.
+
+    If the dataset is exhausted before reaching ``page_index``, returns an
+    empty rows list with the column names from the initial request.
+
+    Note: the Elasticsearch SQL cursor is forward-only, so cost is linear in
+    ``page_index`` — reaching page N issues N round trips to the cluster.
+    Deep pagination (hundreds of pages) will therefore be noticeably slower
+    than on ``OFFSET``-capable engines. This is a protocol limitation, not
+    an implementation choice.
+    """
+    # The Elasticsearch SQL API rejects trailing semicolons, and any LIMIT
+    # in the submitted statement caps the result set before the cursor can
+    # page through it. ``fetch_size`` drives pagination instead.
+    # Assumption: Superset only appends a trailing ``LIMIT N`` for engines
+    # with ``supports_offset=False``. If that ever changes (e.g.
+    # ``FETCH FIRST N ROWS`` or ``TOP N``), extend this sanitizer to match.
+    sanitized_sql = sql.strip().rstrip(";").strip()
+    sanitized_sql = re.sub(
+        r"\s+LIMIT\s+\d+\s*$", "", sanitized_sql, flags=re.IGNORECASE
+    )
+
+    # The raw transport does not auto-set Content-Type the way the Python
+    # DB-API driver does; ES rejects POSTs without a JSON content type.
+    json_headers = {"Content-Type": "application/json"}
+    with database.get_raw_connection() as conn:
+        transport = conn.es.transport
+        response = transport.perform_request(
+            "POST",
+            sql_path,
+            headers=json_headers,
+            body={"query": sanitized_sql, "fetch_size": page_size},
+        )
+        columns = [col["name"] for col in response.get("columns", [])]
+        rows = response.get("rows", [])
+        cursor = response.get("cursor")
+
+        try:
+            for _ in range(page_index):
+                if not cursor:
+                    # Dataset exhausted before reaching the target page —
+                    # no cursor to close (ES returns no cursor on the final
+                    # page). Return immediately with empty rows.
+                    return [], columns
+                response = transport.perform_request(
+                    "POST",
+                    sql_path,
+                    headers=json_headers,
+                    body={"cursor": cursor},
+                )
+                rows = response.get("rows", [])
+                cursor = response.get("cursor")
+
+            return rows, columns
+        finally:
+            if cursor:
+                # Best-effort cleanup. If close itself fails we don't want
+                # to mask the original error (if any) — swallow and log.
+                try:
+                    transport.perform_request(
+                        "POST",
+                        close_path,
+                        headers=json_headers,
+                        body={"cursor": cursor},
+                    )
+                except Exception:  # pylint: disable=broad-except

Review Comment:
   This is a best-effort cursor close in a `finally` block; a failure here must 
not mask the original error, so the broad catch is intentional and already 
annotated with `# pylint: disable=broad-except` plus an explanatory comment. 
The suggested `except (ConnectionError, TimeoutError, Exception)` is redundant 
since `Exception` already subsumes the other two. False positive — keeping 
as-is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to