[GitHub] [airflow] josh-fell commented on a diff in pull request #29143: Demonstrate usage of the PythonSensor

2023-01-25 Thread via GitHub


josh-fell commented on code in PR #29143:
URL: https://github.com/apache/airflow/pull/29143#discussion_r1086725027


##
docs/apache-airflow/howto/operator/python.rst:
##
@@ -225,11 +225,29 @@ Jinja templating can be used in same way as described for 
the PythonOperator.
 PythonSensor
 
 
-Use the :class:`~airflow.sensors.python.PythonSensor` to use arbitrary 
callable for sensing. The callable
-should return True when it succeeds, False otherwise.
+A PythonSensor waits for a certain condition to be ``True``, for example to 
wait for a file to exist. The
+PythonSensor is available via ``@task.sensor`` and 
``airflow.sensors.python.PythonSensor``. The callable
+should return a boolean ``True`` or ``False``, indicating whether a condition 
is met. For example:
 
-.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
-:language: python
-:dedent: 4
-:start-after: [START example_python_sensors]
-:end-before: [END example_python_sensors]
+.. code-block:: python
+
+import datetime
+
+from airflow.decorators import dag, task
+from airflow.sensors.python import PythonSensor
+
+
+@dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
+def example():
+@task.sensor
+def wait_for_success():
+return datetime.datetime.now().minute % 2 == 0
+
+wait_for_success()
+PythonSensor(task_id="wait_for_even_minute", 
python_callable=wait_for_success)

Review Comment:
   Yeah the `PythonSensor` task succeeds but the callable is not actually 
called.
   
   ```python
   import datetime
   from airflow.decorators import dag, task
   from airflow.sensors.python import PythonSensor
   
   
   @dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
   def example():
   @task.sensor
   def wait_for_success():
   print("Poking.")
   return datetime.datetime.now().minute % 2 == 0
   
   wait_for_success()
   PythonSensor(task_id="wait_for_even_minute", 
python_callable=wait_for_success)
   
   
   example()
   ```
   Looking at the task logs for both tasks, only the `@task.sensor`-decorated 
task executes the `wait_for_success()` function. The DAG run was triggered on 
an odd minute as well:
   **wait_for_success**
   https://user-images.githubusercontent.com/48934154/214590213-53a9382c-82bf-42da-96ad-84e076361566.png";>
   **wait_for_even_minute**
   https://user-images.githubusercontent.com/48934154/214590426-d18eeb88-4245-4daf-8fa2-24b31c375a2a.png";>
   
   You can't reference a TaskFlow function as the `python_callable` for 
`PythonOperator`. The `PythonOperator` task fails with "ERROR - Object of type 
PlainXComArg is not JSON serializable. If you are using pickle instead of JSON 
for XCom, then you need to enable pickle support for XCom in your airflow 
config or make sure to decorate your object with attr.". Either case it should 
probably throw a more pertinent exception when a TaskFlow function is used as a 
`python_callable` in either scenario (and for all Python operators I suppose).
   
   +1 for separate functions.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] josh-fell commented on a diff in pull request #29143: Demonstrate usage of the PythonSensor

2023-01-25 Thread via GitHub


josh-fell commented on code in PR #29143:
URL: https://github.com/apache/airflow/pull/29143#discussion_r1086725027


##
docs/apache-airflow/howto/operator/python.rst:
##
@@ -225,11 +225,29 @@ Jinja templating can be used in same way as described for 
the PythonOperator.
 PythonSensor
 
 
-Use the :class:`~airflow.sensors.python.PythonSensor` to use arbitrary 
callable for sensing. The callable
-should return True when it succeeds, False otherwise.
+A PythonSensor waits for a certain condition to be ``True``, for example to 
wait for a file to exist. The
+PythonSensor is available via ``@task.sensor`` and 
``airflow.sensors.python.PythonSensor``. The callable
+should return a boolean ``True`` or ``False``, indicating whether a condition 
is met. For example:
 
-.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
-:language: python
-:dedent: 4
-:start-after: [START example_python_sensors]
-:end-before: [END example_python_sensors]
+.. code-block:: python
+
+import datetime
+
+from airflow.decorators import dag, task
+from airflow.sensors.python import PythonSensor
+
+
+@dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
+def example():
+@task.sensor
+def wait_for_success():
+return datetime.datetime.now().minute % 2 == 0
+
+wait_for_success()
+PythonSensor(task_id="wait_for_even_minute", 
python_callable=wait_for_success)

Review Comment:
   Yeah the `PythonSensor` task succeeds but the callable is not actually 
called.
   
   ```python
   import datetime
   from airflow.decorators import dag, task
   from airflow.sensors.python import PythonSensor
   
   
   @dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
   def example():
   @task.sensor
   def wait_for_success():
   print("Poking.")
   return datetime.datetime.now().minute % 2 == 0
   
   wait_for_success()
   PythonSensor(task_id="wait_for_even_minute", 
python_callable=wait_for_success)
   
   
   example()
   ```
   Looking at the task logs for both tasks, only the `@task.sensor`-decorated 
task executes the `wait_for_success()` function. The DAG run was triggered on 
an odd minute as well:
   **wait_for_success**
   https://user-images.githubusercontent.com/48934154/214590213-53a9382c-82bf-42da-96ad-84e076361566.png";>
   **wait_for_even_minute**
   https://user-images.githubusercontent.com/48934154/214590426-d18eeb88-4245-4daf-8fa2-24b31c375a2a.png";>
   
   You can't reference a TaskFlow function as the `python_callable` for 
`PythonOperator`. The `PythonOperator` task fails with "ERROR - Object of type 
PlainXComArg is not JSON serializable. If you are using pickle instead of JSON 
for XCom, then you need to enable pickle support for XCom in your airflow 
config or make sure to decorate your object with attr.". Either case it should 
probably throw a more pertinent exception when a TaskFlow function is used as a 
`python_callable` in either scenario (and for all decorators I suppose).
   
   +1 for separate functions.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] josh-fell commented on a diff in pull request #29143: Demonstrate usage of the PythonSensor

2023-01-24 Thread via GitHub


josh-fell commented on code in PR #29143:
URL: https://github.com/apache/airflow/pull/29143#discussion_r1086144868


##
docs/apache-airflow/howto/operator/python.rst:
##
@@ -225,11 +225,29 @@ Jinja templating can be used in same way as described for 
the PythonOperator.
 PythonSensor
 
 
-Use the :class:`~airflow.sensors.python.PythonSensor` to use arbitrary 
callable for sensing. The callable
-should return True when it succeeds, False otherwise.
+A PythonSensor waits for a certain condition to be ``True``, for example to 
wait for a file to exist. The
+PythonSensor is available via ``@task.sensor`` and 
``airflow.sensors.python.PythonSensor``. The callable
+should return a boolean ``True`` or ``False``, indicating whether a condition 
is met. For example:

Review Comment:
   _Technically_ ~the `PythonSensor`~ they both can also return ~the truthy or 
falsy~ a non-boolean value via `PokeReturnValue` ~and, interestingly enough, 
[this PR](https://github.com/apache/airflow/pull/29146) was opened to mimic the 
same behavior in `@task.sensor`~.
   
   Not sure if you want to dive into this nuance here.
   
   edit: Oh boy Josh, you really didn't do your homework on that comment 🤦 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] josh-fell commented on a diff in pull request #29143: Demonstrate usage of the PythonSensor

2023-01-24 Thread via GitHub


josh-fell commented on code in PR #29143:
URL: https://github.com/apache/airflow/pull/29143#discussion_r1086143815


##
docs/apache-airflow/howto/operator/python.rst:
##
@@ -225,11 +225,29 @@ Jinja templating can be used in same way as described for 
the PythonOperator.
 PythonSensor
 
 
-Use the :class:`~airflow.sensors.python.PythonSensor` to use arbitrary 
callable for sensing. The callable
-should return True when it succeeds, False otherwise.
+A PythonSensor waits for a certain condition to be ``True``, for example to 
wait for a file to exist. The
+PythonSensor is available via ``@task.sensor`` and 
``airflow.sensors.python.PythonSensor``. The callable
+should return a boolean ``True`` or ``False``, indicating whether a condition 
is met. For example:
 
-.. exampleinclude:: /../../airflow/example_dags/example_sensors.py
-:language: python
-:dedent: 4
-:start-after: [START example_python_sensors]
-:end-before: [END example_python_sensors]
+.. code-block:: python
+
+import datetime
+
+from airflow.decorators import dag, task
+from airflow.sensors.python import PythonSensor
+
+
+@dag(start_date=datetime.datetime(2023, 1, 1), schedule=None)
+def example():
+@task.sensor
+def wait_for_success():
+return datetime.datetime.now().minute % 2 == 0
+
+wait_for_success()
+PythonSensor(task_id="wait_for_even_minute", 
python_callable=wait_for_success)

Review Comment:
   This won't function properly since the callable is now a TaskFlow function.



##
docs/apache-airflow/howto/operator/python.rst:
##
@@ -225,11 +225,29 @@ Jinja templating can be used in same way as described for 
the PythonOperator.
 PythonSensor
 
 
-Use the :class:`~airflow.sensors.python.PythonSensor` to use arbitrary 
callable for sensing. The callable
-should return True when it succeeds, False otherwise.
+A PythonSensor waits for a certain condition to be ``True``, for example to 
wait for a file to exist. The
+PythonSensor is available via ``@task.sensor`` and 
``airflow.sensors.python.PythonSensor``. The callable
+should return a boolean ``True`` or ``False``, indicating whether a condition 
is met. For example:

Review Comment:
   _Technically_ the `PythonSensor` can also return the truthy or falsy value 
via `PokeReturnValue` and, interestingly enough, [this 
PR](https://github.com/apache/airflow/pull/29146) was opened to mimic the same 
behavior in `@task.sensor`.
   
   Not sure if you want to dive into this nuance here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org