Which task specifically is it complaining about? What version did you upgrade from?
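
If I remember the code right, that warning is logged at parse time when the same dependency edge gets registered more than once for a DAG. Purely as a minimal, hypothetical sketch (made-up dag_id and tasks, not your file):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator  # Airflow 2.4+

# Hypothetical DAG, only to show how a duplicated edge produces the warning.
with DAG(dag_id="dup_edge_demo", start_date=datetime(2022, 1, 1), schedule=None) as dag:
    start = EmptyOperator(task_id="start")
    finish = EmptyOperator(task_id="finish")

    start >> finish  # first declaration of the edge
    start >> finish  # same edge again -> "already registered for DAG" warning at parse time

If the warning names two of your real task_ids, that's usually the pair whose relationship is being declared more than once (loops are a common place for that to happen), hence my question about which task it is.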
A better-late-than-never Ash

On Oct 24 2022, at 2:00 pm, Anthony Joyce <[email protected]> wrote:

> Dear All-
>
> After upgrading to Airflow 2.4.2 and running `airflow db upgrade`, I started
> receiving “[…] already registered for DAG” warnings. I posted my DAG file
> below for reference.
>
> Anyone know what I can do to resolve these warnings? It seems like the DAG is
> written appropriately, but another set of eyes is always helpful. Thanks all!
>
> Best,
>
> Anthony
>
>
> default_args = {
>     "owner": "airflow",
>     "start_date": datetime(2019, 6, 21),
>     "email": [__email__],
>     "email_on_failure": True,
>     "on_failure_callback": send_slack_message_failed,
>     "email_on_retry": False,
>     "retries": 1,
>     "retry_delay": timedelta(minutes=1),
> }
>
> with DAG(
>     dag_id="PAPro_ETL",
>     catchup=False,
>     default_args=default_args,
>     schedule_interval="@daily",
>     max_active_runs=1,
> ) as dag:
>     start = DummyOperator(task_id="start")
>
>     send_slack_message_success = SlackAPIPostOperator(
>         task_id="send_slack_message_success",
>         token=Variable.get("slack_token"),
>         channel=Variable.get("slack_status_channel"),
>         username="BI-ETL-airflow",
>         text=":white_check_mark: {{ dag }} for {{ ds }} was completed @ "
>         + f"{datetime.now().strftime('%F %T')} UTC.",
>     )
>
>     for truncate_table in truncate_tables:
>         if truncate_table == "qu_pap_clicks" or truncate_table == "qu_pap_impressions":
>             truncate_staging = PythonOperator(
>                 task_id=f"truncate_staging_{truncate_table}_table",
>                 python_callable=truncate_staging_table,
>                 op_args=[truncate_table],
>             )
>
>             load_destination_table = PostgresOperator(
>                 task_id=f"load_destination_{truncate_table}_table",
>                 postgres_conn_id=destination_connection,
>                 sql=f"papro_etl/insert_{truncate_table}.sql"
>             )
>
>             with TaskGroup(group_id=f'{truncate_table}_extract') as LoadStagingTable:
>                 for schema_name, schema in schemas.items():
>                     extract_to_staging = PostgresOperator(
>                         task_id=f"{schema_name}_{truncate_table}_extract",
>                         postgres_conn_id=staging_connection,
>                         sql=f"papro_etl/extract_{truncate_table}.sql",
>                         params={"schema_name": schema_name, "schema": schema},
>                     )
>
>             (start >> truncate_staging >> LoadStagingTable
>                 >> load_destination_table >> send_slack_message_success)
>
>         else:
>             truncate_staging = PythonOperator(
>                 task_id=f"truncate_staging_{truncate_table}_table",
>                 python_callable=truncate_staging_table,
>                 op_args=[truncate_table],
>             )
>
>             compare_rows = PythonOperator(
>                 task_id=f"compare_{truncate_table}_rows",
>                 python_callable=compare_staging_to_destination,
>                 op_args=[truncate_table],
>                 provide_context=True,
>             )
>
>             load_destination_table = PythonOperator(
>                 task_id=f"load_destination_{truncate_table}_table",
>                 python_callable=truncate_and_load_destination_table,
>                 op_args=[truncate_table]
>             )
>
>             with TaskGroup(group_id=f'{truncate_table}_extract') as LoadStagingTable:
>                 for schema_name, schema in schemas.items():
>                     extract_to_staging = PostgresOperator(
>                         task_id=f"{schema_name}_{truncate_table}_extract",
>                         postgres_conn_id=staging_connection,
>                         sql=f"papro_etl/extract_{truncate_table}.sql",
>                         params={"schema_name": schema_name, "schema": schema},
>                     )
>
>             (start >> truncate_staging >> LoadStagingTable
>                 >> compare_rows >> load_destination_table
>                 >> send_slack_message_success)
