I am running Airflow 2.0.1 with the KubernetesExecutor on OpenShift. Last
night I finally got a simple PythonOperator test DAG to run, but now the DAG
tasks mysteriously fail with no indication of why. Here is what happens when I
run the task with DEBUG level logging:

----------
$ airflow tasks run test_python do-it 2021-02-09T04:14:25.310295+00:00 --local 
--pool default_pool --subdir 
/home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
[2021-02-09 04:28:12,070] {settings.py:210} DEBUG - Setting up DB connection 
pool (PID 17)
[2021-02-09 04:28:12,071] {settings.py:281} DEBUG - 
settings.prepare_engine_args(): Using pool settings. pool_size=5, 
max_overflow=10, pool_recycle=1800, pid=17
[2021-02-09 04:28:12,334] {cli_action_loggers.py:40} DEBUG - Adding <function 
default_action_log at 0x7f053b740c20> to pre execution callback
[2021-02-09 04:28:15,235] {cli_action_loggers.py:66} DEBUG - Calling callbacks: 
[<function default_action_log at 0x7f053b740c20>]
[2021-02-09 04:28:15,321] {settings.py:210} DEBUG - Setting up DB connection 
pool (PID 17)
[2021-02-09 04:28:15,322] {settings.py:243} DEBUG - 
settings.prepare_engine_args(): Using NullPool
[2021-02-09 04:28:15,323] {dagbag.py:448} INFO - Filling up the DagBag from 
/home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
[2021-02-09 04:28:15,426] {dagbag.py:287} DEBUG - Importing 
/home/airflow/repo/hs-datalabs/Source/Python/datalabs/airflow/dag/test_python.py
[2021-02-09 04:28:15,466] {dagbag.py:413} DEBUG - Loaded DAG <DAG: test_python>
[2021-02-09 04:28:15,467] {dagbag.py:287} DEBUG - Importing 
/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py
[2021-02-09 04:28:15,472] {dagbag.py:413} DEBUG - Loaded DAG <DAG: 
example_bash_operator>
[2021-02-09 04:28:15,472] {dagbag.py:287} DEBUG - Importing 
/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_branch_operator.py
[2021-02-09 04:28:15,477] {dagbag.py:413} DEBUG - Loaded DAG <DAG: 
example_branch_operator>
. . .
[2021-02-09 04:28:15,764] {dagbag.py:287} DEBUG - Importing 
/usr/local/lib/python3.7/site-packages/airflow/example_dags/tutorial_taskflow_api_etl.py
[2021-02-09 04:28:15,766] {dagbag.py:413} DEBUG - Loaded DAG <DAG: 
tutorial_taskflow_api_etl>
[2021-02-09 04:28:15,766] {dagbag.py:287} DEBUG - Importing 
/usr/local/lib/python3.7/site-packages/airflow/example_dags/subdags/subdag.py
[2021-02-09 04:28:15,801] {plugins_manager.py:270} DEBUG - Loading plugins
[2021-02-09 04:28:15,801] {plugins_manager.py:207} DEBUG - Loading plugins from 
directory: /home/airflow/airflow/plugins
[2021-02-09 04:28:15,801] {plugins_manager.py:184} DEBUG - Loading plugins from 
entrypoints
[2021-02-09 04:28:16,128] {plugins_manager.py:414} DEBUG - Integrate DAG plugins
Running <TaskInstance: test_python.do-it 2021-02-09T04:14:25.310295+00:00 
[failed]> on host testpythondoit.3c93d77d6b8141348718e2c6467e55a9-debug
[2021-02-09 04:28:16,442] {cli_action_loggers.py:84} DEBUG - Calling callbacks: 
[]
[2021-02-09 04:28:16,442] {settings.py:292} DEBUG - Disposing DB connection 
pool (PID 17)
----------
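
The only hint in the output above is that the TaskInstance is already reported
as [failed] when it is picked up, with no traceback and no task log. In case it
helps, this is the kind of check I can run against the metadata DB to see what
state actually got recorded (just a rough sketch, run from a python shell in a
pod that has the same sql_alchemy_conn as the scheduler):

----------
# Sketch: inspect what the metadata DB recorded for the failing task instance.
# Assumes it runs somewhere the normal Airflow config/DB connection is available.
from airflow.models import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    task_instances = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "test_python",
            TaskInstance.task_id == "do-it",
        )
        .all()
    )
    for ti in task_instances:
        print(ti.execution_date, ti.state, ti.try_number, ti.hostname)
----------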

Here is my DAG. Note that neither of the callbacks is firing either:

----------
from pprint import pprint

from airflow import DAG
from kubernetes.client import models as k8s
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def print_that(datestamp, **kwargs):
    pprint(kwargs)
    print(datestamp)

    return "Foobiddy Doobiddy"


def on_failure_callback(context):
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(f'Failure: {task_instances}')


def on_success_callback(context):
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(f'Success: {task_instances}')


with DAG(
    dag_id='test_python',
    default_args={'owner': 'airflow'},
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['testing'],
    on_failure_callback=on_failure_callback,
    on_success_callback=on_success_callback,
) as dag:
    do_it = PythonOperator(
        task_id="do-it",
        python_callable=print_that,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            image="docker-registry.default.svc:5000/hsg-data-labs-dev/airflow-worker:1.0.1"
                        )
                    ]
                )
            ),
        },
    )


    do_it_again = PythonOperator(
        task_id="do-it-again",
        python_callable=print_that,
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            image="docker-registry.default.svc:5000/hsg-data-labs-dev/airflow-worker:1.0.1"
                        )
                    ]
                )
            ),
        },
    )


do_it >> do_it_again
----------
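
To rule out a problem inside the callbacks themselves, they can be called
directly with a stub context; a minimal sketch (the stub dag_run below is made
up, it just mimics what Airflow passes as context['dag_run'], and it assumes
the callback functions from the DAG file above are in scope):

----------
# Sanity-check sketch (not part of the DAG): invoke the callbacks directly with
# a stub context whose "dag_run" mimics the object Airflow would normally pass.
class StubDagRun:
    def get_task_instances(self):
        return ["<stub TaskInstance: test_python.do-it>"]


on_failure_callback({'dag_run': StubDagRun()})  # prints: Failure: [...]
on_success_callback({'dag_run': StubDagRun()})  # prints: Success: [...]
----------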

I would really appreciate some ideas on how to debug this or what might be
wrong.
Thanks,
Peter
