This is an automated email from the ASF dual-hosted git repository. uranusjr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new f36f9f4dc1 Implement slicing in lazy sequence (#39483) f36f9f4dc1 is described below commit f36f9f4dc100986a32827b2ac3740ff56ed0a47c Author: Tzu-ping Chung <uranu...@gmail.com> AuthorDate: Thu May 9 10:44:39 2024 +0800 Implement slicing in lazy sequence (#39483) --- airflow/utils/db.py | 77 ++++++++++++++++++---- .../authoring-and-scheduling/datasets.rst | 25 ++++++- tests/models/test_taskinstance.py | 53 +++++++++++++++ 3 files changed, 141 insertions(+), 14 deletions(-) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index fb6416966b..6c68879074 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -2043,15 +2043,70 @@ class LazySelectSequence(Sequence[T]): def __getitem__(self, key: int) -> T: ... @overload - def __getitem__(self, key: slice) -> Self: ... + def __getitem__(self, key: slice) -> Sequence[T]: ... - def __getitem__(self, key: int | slice) -> T | Self: - if not isinstance(key, int): - raise ValueError("non-index access is not supported") - if key >= 0: - stmt = self._select_asc.offset(key) - else: - stmt = self._select_desc.offset(-1 - key) - if (row := self._session.execute(stmt.limit(1)).one_or_none()) is None: - raise IndexError(key) - return self._process_row(row) + def __getitem__(self, key: int | slice) -> T | Sequence[T]: + if isinstance(key, int): + if key >= 0: + stmt = self._select_asc.offset(key) + else: + stmt = self._select_desc.offset(-1 - key) + if (row := self._session.execute(stmt.limit(1)).one_or_none()) is None: + raise IndexError(key) + return self._process_row(row) + elif isinstance(key, slice): + # This implements the slicing syntax. We want to optimize negative + # slicing (e.g. seq[-10:]) by not doing an additional COUNT query + # if possible. We can do this unless the start and stop have + # different signs (i.e. one is positive and another negative). + start, stop, reverse = _coerce_slice(key) + if start >= 0: + if stop is None: + stmt = self._select_asc.offset(start) + elif stop >= 0: + stmt = self._select_asc.slice(start, stop) + else: + stmt = self._select_asc.slice(start, len(self) + stop) + rows = [self._process_row(row) for row in self._session.execute(stmt)] + if reverse: + rows.reverse() + else: + if stop is None: + stmt = self._select_desc.limit(-start) + elif stop < 0: + stmt = self._select_desc.slice(-stop, -start) + else: + stmt = self._select_desc.slice(len(self) - stop, -start) + rows = [self._process_row(row) for row in self._session.execute(stmt)] + if not reverse: + rows.reverse() + return rows + raise TypeError(f"Sequence indices must be integers or slices, not {type(key).__name__}") + + +def _coerce_index(value: Any) -> int | None: + """Check slice attribute's type and convert it to int. + + See CPython documentation on this: + https://docs.python.org/3/reference/datamodel.html#object.__index__ + """ + if value is None or isinstance(value, int): + return value + if (index := getattr(value, "__index__", None)) is not None: + return index() + raise TypeError("slice indices must be integers or None or have an __index__ method") + + +def _coerce_slice(key: slice) -> tuple[int, int | None, bool]: + """Check slice content and convert it for SQL. + + See CPython documentation on this: + https://docs.python.org/3/reference/datamodel.html#slice-objects + """ + if key.step is None or key.step == 1: + reverse = False + elif key.step == -1: + reverse = True + else: + raise ValueError("non-trivial slice step not supported") + return _coerce_index(key.start) or 0, _coerce_index(key.stop), reverse diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst index c08b5ea0d8..5854e79edf 100644 --- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst +++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst @@ -257,16 +257,34 @@ Another way to achieve the same is by accessing ``outlet_events`` in a task's ex @task(outlets=[example_s3_dataset]) def write_to_s3(*, outlet_events): - outlet_events[example_s3_dataset].extras = {"row_count": len(df)} + outlet_events[example_s3_dataset].extra = {"row_count": len(df)} There's minimal magic here---Airflow simply writes the yielded values to the exact same accessor. This also works in classic operators, including ``execute``, ``pre_execute``, and ``post_execute``. +Fetching information from previously emitted dataset events +----------------------------------------------------------- + +.. versionadded:: 2.10.0 + +Events of a dataset defined in a task's ``outlets``, as described in the previous section, can be read by a task that declares the same dataset in its ``inlets``. A dataset event entry contains ``extra`` (see previous section for details), ``timestamp`` indicating when the event was emitted from a task, and ``source_task_instance`` linking the event back to its source. + +Inlet dataset events can be read with the ``inlet_events`` accessor in the execution context. Continuing from the ``write_to_s3`` task in the previous section: + +.. code-block:: python + + @task(inlets=[example_s3_dataset]) + def post_process_s3_file(*, inlet_events): + events = inlet_events[example_s3_dataset] + last_row_count = events[-1].extra["row_count"] + +Each value in the ``inlet_events`` mapping is a sequence-like object that orders past events of a given dataset by ``timestamp``, earliest to latest. It supports most of Python's list interface, so you can use ``[-1]`` to access the last event, ``[-2:]`` for the last two, etc. The accessor is lazy and only hits the database when you access items inside it. + + Fetching information from a triggering dataset event ---------------------------------------------------- -A triggered DAG can fetch information from the dataset that triggered it using the ``triggering_dataset_events`` template or parameter. -See more at :ref:`templates-ref`. +A triggered DAG can fetch information from the dataset that triggered it using the ``triggering_dataset_events`` template or parameter. See more at :ref:`templates-ref`. Example: @@ -301,6 +319,7 @@ Example: Note that this example is using `(.values() | first | first) <https://jinja.palletsprojects.com/en/3.1.x/templates/#jinja-filters.first>`_ to fetch the first of one dataset given to the DAG, and the first of one DatasetEvent for that dataset. An implementation can be quite complex if you have multiple datasets, potentially with multiple DatasetEvents. + Advanced dataset scheduling with conditional expressions -------------------------------------------------------- diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 11d833a21c..3ab3ccc31f 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -2480,6 +2480,59 @@ class TestTaskInstance: assert not dr.task_instance_scheduling_decisions(session=session).schedulable_tis assert read_task_evaluated + @pytest.mark.parametrize( + "slicer, expected", + [ + (lambda x: x[-2:], [{"from": 8}, {"from": 9}]), + (lambda x: x[-5:-3], [{"from": 5}, {"from": 6}]), + (lambda x: x[:-8], [{"from": 0}, {"from": 1}]), + (lambda x: x[1:-7], [{"from": 1}, {"from": 2}]), + (lambda x: x[-8:4], [{"from": 2}, {"from": 3}]), + (lambda x: x[-5:5], []), + ], + ) + def test_inlet_dataset_extra_slice(self, dag_maker, session, slicer, expected): + from airflow.datasets import Dataset + + ds_uri = "test_inlet_dataset_extra_slice" + + with dag_maker(dag_id="write", schedule="@daily", params={"i": -1}, session=session): + + @task(outlets=Dataset(ds_uri)) + def write(*, params, outlet_events): + outlet_events[ds_uri].extra = {"from": params["i"]} + + write() + + # Run the write DAG 10 times. + dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, conf={"i": 0}) + for ti in dr.get_task_instances(session=session): + ti.run(session=session) + for i in range(1, 10): + dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, conf={"i": i}) + for ti in dr.get_task_instances(session=session): + ti.run(session=session) + + result = "the task does not run" + + with dag_maker(dag_id="read", schedule=None, session=session): + + @task(inlets=Dataset(ds_uri)) + def read(*, inlet_events): + nonlocal result + result = [e.extra for e in slicer(inlet_events[ds_uri])] + + read() + + # Run the read DAG. + dr = dag_maker.create_dagrun() + for ti in dr.get_task_instances(session=session): + ti.run(session=session) + + # Should be done. + assert not dr.task_instance_scheduling_decisions(session=session).schedulable_tis + assert result == expected + def test_changing_of_dataset_when_ddrq_is_already_populated(self, dag_maker): """ Test that when a task that produces dataset has ran, that changing the consumer