This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f36f9f4dc1 Implement slicing in lazy sequence (#39483)
f36f9f4dc1 is described below

commit f36f9f4dc100986a32827b2ac3740ff56ed0a47c
Author: Tzu-ping Chung <uranu...@gmail.com>
AuthorDate: Thu May 9 10:44:39 2024 +0800

    Implement slicing in lazy sequence (#39483)
---
 airflow/utils/db.py                                | 77 ++++++++++++++++++----
 .../authoring-and-scheduling/datasets.rst          | 25 ++++++-
 tests/models/test_taskinstance.py                  | 53 +++++++++++++++
 3 files changed, 141 insertions(+), 14 deletions(-)

diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index fb6416966b..6c68879074 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -2043,15 +2043,70 @@ class LazySelectSequence(Sequence[T]):
     def __getitem__(self, key: int) -> T: ...
 
     @overload
-    def __getitem__(self, key: slice) -> Self: ...
+    def __getitem__(self, key: slice) -> Sequence[T]: ...
 
-    def __getitem__(self, key: int | slice) -> T | Self:
-        if not isinstance(key, int):
-            raise ValueError("non-index access is not supported")
-        if key >= 0:
-            stmt = self._select_asc.offset(key)
-        else:
-            stmt = self._select_desc.offset(-1 - key)
-        if (row := self._session.execute(stmt.limit(1)).one_or_none()) is None:
-            raise IndexError(key)
-        return self._process_row(row)
+    def __getitem__(self, key: int | slice) -> T | Sequence[T]:
+        if isinstance(key, int):
+            if key >= 0:
+                stmt = self._select_asc.offset(key)
+            else:
+                stmt = self._select_desc.offset(-1 - key)
+            if (row := self._session.execute(stmt.limit(1)).one_or_none()) is None:
+                raise IndexError(key)
+            return self._process_row(row)
+        elif isinstance(key, slice):
+            # This implements the slicing syntax. We want to optimize negative
+            # slicing (e.g. seq[-10:]) by not doing an additional COUNT query
+            # if possible. We can do this unless the start and stop have
+            # different signs (i.e. one is positive and another negative).
+            start, stop, reverse = _coerce_slice(key)
+            if start >= 0:
+                if stop is None:
+                    stmt = self._select_asc.offset(start)
+                elif stop >= 0:
+                    stmt = self._select_asc.slice(start, stop)
+                else:
+                    stmt = self._select_asc.slice(start, len(self) + stop)
+                rows = [self._process_row(row) for row in self._session.execute(stmt)]
+                if reverse:
+                    rows.reverse()
+            else:
+                if stop is None:
+                    stmt = self._select_desc.limit(-start)
+                elif stop < 0:
+                    stmt = self._select_desc.slice(-stop, -start)
+                else:
+                    stmt = self._select_desc.slice(len(self) - stop, -start)
+                rows = [self._process_row(row) for row in self._session.execute(stmt)]
+                if not reverse:
+                    rows.reverse()
+            return rows
+        raise TypeError(f"Sequence indices must be integers or slices, not {type(key).__name__}")
+
+
+def _coerce_index(value: Any) -> int | None:
+    """Check slice attribute's type and convert it to int.
+
+    See CPython documentation on this:
+    https://docs.python.org/3/reference/datamodel.html#object.__index__
+    """
+    if value is None or isinstance(value, int):
+        return value
+    if (index := getattr(value, "__index__", None)) is not None:
+        return index()
+    raise TypeError("slice indices must be integers or None or have an __index__ method")
+
+
+def _coerce_slice(key: slice) -> tuple[int, int | None, bool]:
+    """Check slice content and convert it for SQL.
+
+    See CPython documentation on this:
+    https://docs.python.org/3/reference/datamodel.html#slice-objects
+    """
+    if key.step is None or key.step == 1:
+        reverse = False
+    elif key.step == -1:
+        reverse = True
+    else:
+        raise ValueError("non-trivial slice step not supported")
+    return _coerce_index(key.start) or 0, _coerce_index(key.stop), reverse
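
For readers who want to sanity-check the branch logic above without a database: below is a minimal, self-contained sketch (not part of this patch) that replays the same offset arithmetic against a plain Python list. The name lazy_slice is made up, ordinary list slicing stands in for the ascending/descending SELECTs with OFFSET/LIMIT, and the assertions reuse the slice cases from the test file further down.

def lazy_slice(data, key):
    """Replay LazySelectSequence's slice arithmetic on an in-memory list."""
    if key.step not in (None, 1, -1):
        raise ValueError("non-trivial slice step not supported")
    reverse = key.step == -1
    start, stop = key.start or 0, key.stop
    # Stand-ins for the ascending and descending SELECT statements.
    asc, desc = list(data), list(reversed(data))
    if start >= 0:
        if stop is None:
            rows = asc[start:]                      # OFFSET start
        elif stop >= 0:
            rows = asc[start:stop]                  # OFFSET + LIMIT
        else:
            rows = asc[start : len(data) + stop]    # mixed signs: needs len(), i.e. a COUNT
        if reverse:
            rows.reverse()
    else:
        if stop is None:
            rows = desc[:-start]                    # LIMIT -start, no COUNT needed
        elif stop < 0:
            rows = desc[-stop:-start]               # both negative: no COUNT needed
        else:
            rows = desc[len(data) - stop : -start]  # mixed signs: needs a COUNT
        if not reverse:
            rows.reverse()
    return rows


data = list(range(10))
for key in (slice(-2, None), slice(-5, -3), slice(None, -8), slice(1, -7), slice(-8, 4), slice(-5, 5)):
    assert lazy_slice(data, key) == data[key]
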
diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
index c08b5ea0d8..5854e79edf 100644
--- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -257,16 +257,34 @@ Another way to achieve the same is by accessing ``outlet_events`` in a task's ex
 
     @task(outlets=[example_s3_dataset])
     def write_to_s3(*, outlet_events):
-        outlet_events[example_s3_dataset].extras = {"row_count": len(df)}
+        outlet_events[example_s3_dataset].extra = {"row_count": len(df)}
 
 There's minimal magic here---Airflow simply writes the yielded values to the exact same accessor. This also works in classic operators, including ``execute``, ``pre_execute``, and ``post_execute``.
 
 
+Fetching information from previously emitted dataset events
+-----------------------------------------------------------
+
+.. versionadded:: 2.10.0
+
+Events of a dataset defined in a task's ``outlets``, as described in the previous section, can be read by a task that declares the same dataset in its ``inlets``. A dataset event entry contains ``extra`` (see previous section for details), ``timestamp`` indicating when the event was emitted from a task, and ``source_task_instance`` linking the event back to its source.
+
+Inlet dataset events can be read with the ``inlet_events`` accessor in the execution context. Continuing from the ``write_to_s3`` task in the previous section:
+
+.. code-block:: python
+
+    @task(inlets=[example_s3_dataset])
+    def post_process_s3_file(*, inlet_events):
+        events = inlet_events[example_s3_dataset]
+        last_row_count = events[-1].extra["row_count"]
+
+Each value in the ``inlet_events`` mapping is a sequence-like object that orders past events of a given dataset by ``timestamp``, earliest to latest. It supports most of Python's list interface, so you can use ``[-1]`` to access the last event, ``[-2:]`` for the last two, etc. The accessor is lazy and only hits the database when you access items inside it.
+
+
 Fetching information from a triggering dataset event
 ----------------------------------------------------
 
-A triggered DAG can fetch information from the dataset that triggered it using the ``triggering_dataset_events`` template or parameter.
-See more at :ref:`templates-ref`.
+A triggered DAG can fetch information from the dataset that triggered it using the ``triggering_dataset_events`` template or parameter. See more at :ref:`templates-ref`.
 
 Example:
 
@@ -301,6 +319,7 @@ Example:
 
 Note that this example is using `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to fetch the first of one dataset given to the DAG, and the first of one DatasetEvent for that dataset. An implementation can be quite complex if you have multiple datasets, potentially with multiple DatasetEvents.
 
+
 Advanced dataset scheduling with conditional expressions
 --------------------------------------------------------
 
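
To make the new accessor behaviour documented above concrete, here is a short sketch continuing the docs example; the dataset URI, the task name summarize_recent, and the row_count aggregation are invented for illustration and are not part of this patch.

from airflow.datasets import Dataset
from airflow.decorators import task

# Illustrative dataset; the docs define example_s3_dataset earlier on the page.
example_s3_dataset = Dataset("s3://example-bucket/example.csv")


@task(inlets=[example_s3_dataset])
def summarize_recent(*, inlet_events):
    # With this change, negative slicing stays lazy: only the last two
    # event rows are fetched from the database.
    recent = inlet_events[example_s3_dataset][-2:]
    return sum(event.extra.get("row_count", 0) for event in recent)
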
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 11d833a21c..3ab3ccc31f 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2480,6 +2480,59 @@ class TestTaskInstance:
         assert not dr.task_instance_scheduling_decisions(session=session).schedulable_tis
         assert read_task_evaluated
 
+    @pytest.mark.parametrize(
+        "slicer, expected",
+        [
+            (lambda x: x[-2:], [{"from": 8}, {"from": 9}]),
+            (lambda x: x[-5:-3], [{"from": 5}, {"from": 6}]),
+            (lambda x: x[:-8], [{"from": 0}, {"from": 1}]),
+            (lambda x: x[1:-7], [{"from": 1}, {"from": 2}]),
+            (lambda x: x[-8:4], [{"from": 2}, {"from": 3}]),
+            (lambda x: x[-5:5], []),
+        ],
+    )
+    def test_inlet_dataset_extra_slice(self, dag_maker, session, slicer, expected):
+        from airflow.datasets import Dataset
+
+        ds_uri = "test_inlet_dataset_extra_slice"
+
+        with dag_maker(dag_id="write", schedule="@daily", params={"i": -1}, session=session):
+
+            @task(outlets=Dataset(ds_uri))
+            def write(*, params, outlet_events):
+                outlet_events[ds_uri].extra = {"from": params["i"]}
+
+            write()
+
+        # Run the write DAG 10 times.
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, conf={"i": 0})
+        for ti in dr.get_task_instances(session=session):
+            ti.run(session=session)
+        for i in range(1, 10):
+            dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, conf={"i": i})
+            for ti in dr.get_task_instances(session=session):
+                ti.run(session=session)
+
+        result = "the task does not run"
+
+        with dag_maker(dag_id="read", schedule=None, session=session):
+
+            @task(inlets=Dataset(ds_uri))
+            def read(*, inlet_events):
+                nonlocal result
+                result = [e.extra for e in slicer(inlet_events[ds_uri])]
+
+            read()
+
+        # Run the read DAG.
+        dr = dag_maker.create_dagrun()
+        for ti in dr.get_task_instances(session=session):
+            ti.run(session=session)
+
+        # Should be done.
+        assert not dr.task_instance_scheduling_decisions(session=session).schedulable_tis
+        assert result == expected
+
     def test_changing_of_dataset_when_ddrq_is_already_populated(self, dag_maker):
         """
         Test that when a task that produces dataset has ran, that changing the consumer
