This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new e3e6776576  Fix system tests (#39193)
e3e6776576 is described below

commit e3e67765765b8c0404aa926629e826d9600d437e
Author: max <42827971+moiseen...@users.noreply.github.com>
AuthorDate: Tue Apr 23 09:59:18 2024 +0000

    Fix system tests (#39193)
---
 .../google/cloud/gcs/example_mysql_to_gcs.py     | 194 ++++++++-------
 .../cloud/sql_to_sheets/example_sql_to_sheets.py | 277 ++++++++++-----------
 .../cloud/transfers/example_postgres_to_gcs.py   | 200 +++++++--------
 3 files changed, 327 insertions(+), 344 deletions(-)

diff --git a/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py
index 5b96b49c1d..884149d984 100644
--- a/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py
@@ -62,21 +62,38 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
 REGION = "europe-west2"
 ZONE = REGION + "-a"
 NETWORK = "default"
+CONNECTION_ID = f"mysql_{DAG_ID}_{ENV_ID}".replace("-", "_")
+CONNECTION_TYPE = "mysql"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "result.json"
 
 DB_NAME = "testdb"
 DB_PORT = 3306
 DB_USER_NAME = "root"
 DB_USER_PASSWORD = "demo_password"
+SETUP_MYSQL_COMMAND = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+    -e MYSQL_ROOT_PASSWORD={DB_USER_PASSWORD} \
+    -e MYSQL_DATABASE={DB_NAME} \
+    mysql:8.1.0
+"""
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {DB_NAME}.{SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {DB_NAME}.{SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {DB_NAME}.{SQL_TABLE}"
 
-SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
-DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_MACHINE_TYPE = "n1-standard-1"
+GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
 GCE_INSTANCE_BODY = {
-    "name": DB_INSTANCE_NAME,
-    "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+    "name": GCE_INSTANCE_NAME,
+    "machine_type": f"zones/{ZONE}/machineTypes/{GCE_MACHINE_TYPE}",
     "disks": [
         {
             "boot": True,
-            "device_name": DB_INSTANCE_NAME,
+            "device_name": GCE_INSTANCE_NAME,
             "initialize_params": {
                 "disk_size_gb": "10",
                 "disk_type": f"zones/{ZONE}/diskTypes/pd-balanced",
@@ -92,56 +109,41 @@ GCE_INSTANCE_BODY = {
         }
     ],
 }
-DELETE_PERSISTENT_DISK = f"""
+FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
+CREATE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
  gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
-gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
+if [ -z gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
+      --project={PROJECT_ID} \
+      --direction=INGRESS \
+      --priority=100 \
+      --network={NETWORK} \
+      --action=ALLOW \
+      --rules=tcp:{DB_PORT} \
+      --source-ranges=0.0.0.0/0
+else
+    echo "Firewall rule {FIREWALL_RULE_NAME} already exists."
+fi
 """
-
-SETUP_MYSQL = f"""
-sudo apt update &&
-sudo apt install -y docker.io &&
-sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
-    -e MYSQL_ROOT_PASSWORD={DB_USER_PASSWORD} \
-    -e MYSQL_DATABASE={DB_NAME} \
-    mysql:8.1.0
-"""
-
-FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
-CREATE_FIREWALL_RULE = f"""
+DELETE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
  gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi; \
+if [ gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet; \
 fi;
-
-gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
-    --project={PROJECT_ID} \
-    --direction=INGRESS \
-    --priority=100 \
-    --network={NETWORK} \
-    --action=ALLOW \
-    --rules=tcp:{DB_PORT} \
-    --source-ranges=0.0.0.0/0
 """
-DELETE_FIREWALL_RULE = f"""
+DELETE_PERSISTENT_DISK_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
  gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
-gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet
+gcloud compute disks delete {GCE_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
 """
 
-CONNECTION_ID = f"mysql_{DAG_ID}_{ENV_ID}".replace("-", "_")
-
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-FILE_NAME = "result.json"
-
-SQL_TABLE = "test_table"
-SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
-SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
-SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
-
 log = logging.getLogger(__name__)
 
@@ -153,8 +155,8 @@ with DAG(
     catchup=False,
     tags=["example", "mysql", "gcs"],
 ) as dag:
-    create_instance = ComputeEngineInsertInstanceOperator(
-        task_id="create_instance",
+    create_gce_instance = ComputeEngineInsertInstanceOperator(
+        task_id="create_gce_instance",
         project_id=PROJECT_ID,
         zone=ZONE,
         body=GCE_INSTANCE_BODY,
@@ -162,127 +164,129 @@ with DAG(
 
     create_firewall_rule = BashOperator(
         task_id="create_firewall_rule",
-        bash_command=CREATE_FIREWALL_RULE,
+        bash_command=CREATE_FIREWALL_RULE_COMMAND,
     )
 
     setup_mysql = SSHOperator(
         task_id="setup_mysql",
         ssh_hook=ComputeEngineSSHHook(
             user="username",
-            instance_name=DB_INSTANCE_NAME,
+            instance_name=GCE_INSTANCE_NAME,
             zone=ZONE,
             project_id=PROJECT_ID,
             use_oslogin=False,
             use_iap_tunnel=False,
             cmd_timeout=180,
         ),
-        command=SETUP_MYSQL,
+        command=SETUP_MYSQL_COMMAND,
         retries=2,
     )
 
     @task
     def get_public_ip() -> str:
         hook = ComputeEngineHook()
-        address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+        address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
         return address
 
     get_public_ip_task = get_public_ip()
 
     @task
-    def setup_mysql_connection(**kwargs) -> None:
-        public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+    def setup_connection(ip_address: str) -> None:
         connection = Connection(
             conn_id=CONNECTION_ID,
-            description="Example MySQL connection",
-            conn_type="mysql",
-            host=public_ip,
+            description="Example connection",
+            conn_type=CONNECTION_TYPE,
+            host=ip_address,
             login=DB_USER_NAME,
             password=DB_USER_PASSWORD,
-            schema=DB_NAME,
+            port=DB_PORT,
         )
         session = Session()
-        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
-            log.warning("Connection %s already exists", CONNECTION_ID)
-            return None
log.info("Removing connection %s if it exists", CONNECTION_ID) + query = session.query(Connection).filter(Connection.conn_id == CONNECTION_ID) + query.delete() session.add(connection) session.commit() + log.info("Connection %s created", CONNECTION_ID) - setup_mysql_connection_task = setup_mysql_connection() - - create_bucket = GCSCreateBucketOperator( - task_id="create_bucket", - bucket_name=BUCKET_NAME, - ) + setup_connection_task = setup_connection(get_public_ip_task) create_sql_table = SQLExecuteQueryOperator( task_id="create_sql_table", conn_id=CONNECTION_ID, sql=SQL_CREATE, + retries=4, ) - insert_data = SQLExecuteQueryOperator( - task_id="insert_data", + insert_sql_data = SQLExecuteQueryOperator( + task_id="insert_sql_data", conn_id=CONNECTION_ID, sql=SQL_INSERT, ) + create_gcs_bucket = GCSCreateBucketOperator( + task_id="create_gcs_bucket", + bucket_name=BUCKET_NAME, + ) + # [START howto_operator_mysql_to_gcs] - upload_mysql_to_gcs = MySQLToGCSOperator( - task_id="mysql_to_gcs", sql=SQL_SELECT, bucket=BUCKET_NAME, filename=FILE_NAME, export_format="csv" + mysql_to_gcs = MySQLToGCSOperator( + task_id="mysql_to_gcs", + mysql_conn_id=CONNECTION_ID, + sql=SQL_SELECT, + bucket=BUCKET_NAME, + filename=FILE_NAME, + export_format="csv", ) # [END howto_operator_mysql_to_gcs] - delete_mysql_connection = BashOperator( - task_id="delete_mysql_connection", - bash_command=f"airflow connections delete {CONNECTION_ID}", + delete_gcs_bucket = GCSDeleteBucketOperator( + task_id="delete_gcs_bucket", + bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE, ) - delete_bucket = GCSDeleteBucketOperator( - task_id="delete_bucket", - bucket_name=BUCKET_NAME, + delete_firewall_rule = BashOperator( + task_id="delete_firewall_rule", + bash_command=DELETE_FIREWALL_RULE_COMMAND, trigger_rule=TriggerRule.ALL_DONE, ) - delete_instance = ComputeEngineDeleteInstanceOperator( - task_id="delete_instance", - resource_id=DB_INSTANCE_NAME, + delete_gce_instance = ComputeEngineDeleteInstanceOperator( + task_id="delete_gce_instance", + resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID, trigger_rule=TriggerRule.ALL_DONE, ) - delete_firewall_rule = BashOperator( - task_id="delete_firewall_rule", - bash_command=DELETE_FIREWALL_RULE, - trigger_rule=TriggerRule.ALL_DONE, - ) - delete_persistent_disk = BashOperator( task_id="delete_persistent_disk", - bash_command=DELETE_PERSISTENT_DISK, + bash_command=DELETE_PERSISTENT_DISK_COMMAND, trigger_rule=TriggerRule.ALL_DONE, ) - ( - # TEST SETUP - create_instance - >> setup_mysql - >> get_public_ip_task - >> setup_mysql_connection_task - >> create_firewall_rule - >> create_sql_table - >> insert_data + delete_connection = BashOperator( + task_id="delete_connection", + bash_command=f"airflow connections delete {CONNECTION_ID}", + trigger_rule=TriggerRule.ALL_DONE, ) + + # TEST SETUP + create_gce_instance >> setup_mysql + create_gce_instance >> get_public_ip_task >> setup_connection_task + [setup_mysql, setup_connection_task, create_firewall_rule] >> create_sql_table >> insert_sql_data + ( - [insert_data, create_bucket] + [create_gcs_bucket, insert_sql_data] # TEST BODY - >> upload_mysql_to_gcs - # TEST TEARDOWN - >> [delete_instance, delete_bucket, delete_mysql_connection, delete_firewall_rule] + >> mysql_to_gcs ) - delete_instance >> delete_persistent_disk + + # TEST TEARDOWN + mysql_to_gcs >> [delete_gcs_bucket, delete_firewall_rule, delete_gce_instance, delete_connection] + delete_gce_instance >> delete_persistent_disk from tests.system.utils.watcher import watcher 
diff --git a/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py b/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
index 314f388c52..811526a82a 100644
--- a/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
+++ b/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
 # distributed with this work for additional information
@@ -17,51 +16,17 @@
 # under the License.
 
 """
-First, you need a db instance that is accessible from the Airflow environment.
-You can, for example, create a Cloud SQL instance and connect to it from
-within breeze with Cloud SQL proxy:
-https://cloud.google.com/sql/docs/postgres/connect-instance-auth-proxy
-
-# DB setup
-Create db:
-```
-CREATE DATABASE test_db;
-```
-
-Switch to db:
-```
-\c test_db
-```
-
-Create table and insert some rows
-```
-CREATE TABLE test_table (col1 INT, col2 INT);
-INSERT INTO test_table VALUES (1,2), (3,4), (5,6), (7,8);
-```
-
-# Setup connections
-db connection:
-In airflow UI, set one db connection, for example "postgres_default"
-and make sure the "Test" at the bottom succeeds
-
-google cloud connection:
-We need additional scopes for this test
-scopes: https://www.googleapis.com/auth/spreadsheets, https://www.googleapis.com/auth/cloud-platform
-
-# Sheet
-Finally, you need a Google Sheet you have access to, for testing you can
-create a public sheet and get its ID.
-
-# Tear Down
-You can delete the db with
-```
-DROP DATABASE test_db;
-```
+Example DAG using SQLToGoogleSheetsOperator.
+
+This DAG relies on the following OS environment variables
+
+* AIRFLOW__API__GOOGLE_KEY_PATH - Path to service account key file. Note, you can skip this variable if you
+  run this DAG in a Composer environment.
+
 """
 
 from __future__ import annotations
 
-import json
 import logging
 import os
 from datetime import datetime
@@ -80,38 +45,50 @@ from airflow.providers.google.cloud.operators.compute import (
 from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
 from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
 from airflow.providers.ssh.operators.ssh import SSHOperator
-from airflow.settings import Session
+from airflow.settings import Session, json
 from airflow.utils.trigger_rule import TriggerRule
 
 DAG_ID = "example_sql_to_sheets"
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
 
 REGION = "europe-west2"
 ZONE = REGION + "-a"
 NETWORK = "default"
+CONNECTION_ID = f"pg_{DAG_ID}_{ENV_ID}".replace("-", "_")
+CONNECTION_TYPE = "postgres"
 
-SHEETS_CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
-SPREADSHEET = {
-    "properties": {"title": "Test1"},
-    "sheets": [{"properties": {"title": "Sheet1"}}],
-}
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "result.json"
 
-DB_NAME = f"{DAG_ID}-{ENV_ID}-db".replace("-", "_")
+DB_NAME = "testdb"
 DB_PORT = 5432
-DB_USER_NAME = "demo_user"
+DB_USER_NAME = "root"
 DB_USER_PASSWORD = "demo_password"
-DB_CONNECTION_ID = f"postgres_{DAG_ID}_{ENV_ID}".replace("-", "_")
+SETUP_POSTGRES_COMMAND = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+    -e PGUSER={DB_USER_NAME} \
+    -e POSTGRES_USER={DB_USER_NAME} \
+    -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
+    -e POSTGRES_DB={DB_NAME} \
+    postgres
+"""
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
 
-SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
-DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_MACHINE_TYPE = "n1-standard-1"
+GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
 GCE_INSTANCE_BODY = {
-    "name": DB_INSTANCE_NAME,
-    "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+    "name": GCE_INSTANCE_NAME,
+    "machine_type": f"zones/{ZONE}/machineTypes/{GCE_MACHINE_TYPE}",
     "disks": [
         {
             "boot": True,
-            "device_name": DB_INSTANCE_NAME,
+            "device_name": GCE_INSTANCE_NAME,
             "initialize_params": {
                 "disk_size_gb": "10",
                 "disk_type": f"zones/{ZONE}/diskTypes/pd-balanced",
@@ -127,61 +104,60 @@ GCE_INSTANCE_BODY = {
         }
     ],
 }
-DELETE_PERSISTENT_DISK = f"""
+FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
+CREATE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
  gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
-gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
-"""
-SETUP_POSTGRES = f"""
-sudo apt update &&
-sudo apt install -y docker.io &&
-sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
-    -e POSTGRES_USER={DB_USER_NAME} \
-    -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
-    -e POSTGRES_DB={DB_NAME} \
-    postgres
+if [ -z gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
+      --project={PROJECT_ID} \
+      --direction=INGRESS \
+      --priority=100 \
+      --network={NETWORK} \
+      --action=ALLOW \
+      --rules=tcp:{DB_PORT} \
+      --source-ranges=0.0.0.0/0
+else
+    echo "Firewall rule {FIREWALL_RULE_NAME} already exists."
+fi
 """
-
-FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
-CREATE_FIREWALL_RULE = f"""
+DELETE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
  gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi; \
+if [ gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet; \
 fi;
-gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
-    --project={PROJECT_ID} \
-    --direction=INGRESS \
-    --priority=100 \
-    --network={NETWORK} \
-    --action=ALLOW \
-    --rules=tcp:{DB_PORT} \
-    --source-ranges=0.0.0.0/0
 """
-DELETE_FIREWALL_RULE = f"""
+DELETE_PERSISTENT_DISK_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
-gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet
+
+gcloud compute disks delete {GCE_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
 """
-SQL_TABLE = "test_table"
-SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
-SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
-SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
+SHEETS_CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+SPREADSHEET = {
+    "properties": {"title": "Test1"},
+    "sheets": [{"properties": {"title": "Sheet1"}}],
+}
+
 
 log = logging.getLogger(__name__)
 
 with DAG(
-    DAG_ID,
+    dag_id=DAG_ID,
+    schedule="@once",
     start_date=datetime(2021, 1, 1),
-    schedule="@once",  # Override to match your needs
     catchup=False,
-    tags=["example", "sql"],
+    tags=["example", "postgres", "gcs"],
 ) as dag:
-    create_instance = ComputeEngineInsertInstanceOperator(
-        task_id="create_instance",
+    create_gce_instance = ComputeEngineInsertInstanceOperator(
+        task_id="create_gce_instance",
         project_id=PROJECT_ID,
         zone=ZONE,
         body=GCE_INSTANCE_BODY,
@@ -189,55 +165,67 @@ with DAG(
 
     create_firewall_rule = BashOperator(
         task_id="create_firewall_rule",
-        bash_command=CREATE_FIREWALL_RULE,
+        bash_command=CREATE_FIREWALL_RULE_COMMAND,
     )
 
     setup_postgres = SSHOperator(
         task_id="setup_postgres",
         ssh_hook=ComputeEngineSSHHook(
             user="username",
-            instance_name=DB_INSTANCE_NAME,
+            instance_name=GCE_INSTANCE_NAME,
             zone=ZONE,
             project_id=PROJECT_ID,
             use_oslogin=False,
             use_iap_tunnel=False,
             cmd_timeout=180,
         ),
-        command=SETUP_POSTGRES,
+        command=SETUP_POSTGRES_COMMAND,
         retries=2,
-        retry_delay=30,
     )
 
    @task
     def get_public_ip() -> str:
         hook = ComputeEngineHook()
-        address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+        address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
         return address
 
     get_public_ip_task = get_public_ip()
 
     @task
-    def setup_postgres_connection(**kwargs) -> None:
-        public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+    def setup_connection(ip_address: str) -> None:
         connection = Connection(
-            conn_id=DB_CONNECTION_ID,
-            description="Example PostgreSQL connection",
-            conn_type="postgres",
-            host=public_ip,
+            conn_id=CONNECTION_ID,
+            description="Example connection",
+            conn_type=CONNECTION_TYPE,
+            schema=DB_NAME,
+            host=ip_address,
             login=DB_USER_NAME,
             password=DB_USER_PASSWORD,
-            schema=DB_NAME,
             port=DB_PORT,
         )
         session = Session()
-        if session.query(Connection).filter(Connection.conn_id == DB_CONNECTION_ID).first():
-            log.warning("Connection %s already exists", DB_CONNECTION_ID)
-            return None
+        log.info("Removing connection %s if it exists", CONNECTION_ID)
+        query = session.query(Connection).filter(Connection.conn_id == CONNECTION_ID)
+        query.delete()
         session.add(connection)
         session.commit()
+        log.info("Connection %s created", CONNECTION_ID)
 
-    setup_postgres_connection_task = setup_postgres_connection()
+    setup_connection_task = setup_connection(get_public_ip_task)
+
+    create_sql_table = SQLExecuteQueryOperator(
+        task_id="create_sql_table",
+        conn_id=CONNECTION_ID,
+        sql=SQL_CREATE,
+        retries=4,
+    )
+
+    insert_sql_data = SQLExecuteQueryOperator(
+        task_id="insert_sql_data",
+        conn_id=CONNECTION_ID,
+        sql=SQL_INSERT,
+    )
 
     @task
     def setup_sheets_connection():
@@ -259,18 +247,6 @@ with DAG(
 
     setup_sheets_connection_task = setup_sheets_connection()
 
-    create_sql_table = SQLExecuteQueryOperator(
-        task_id="create_sql_table",
-        conn_id=DB_CONNECTION_ID,
-        sql=SQL_CREATE,
-    )
-
-    insert_data = SQLExecuteQueryOperator(
-        task_id="insert_data",
-        conn_id=DB_CONNECTION_ID,
-        sql=SQL_INSERT,
-    )
-
     create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
         task_id="create_spreadsheet", spreadsheet=SPREADSHEET, gcp_conn_id=SHEETS_CONNECTION_ID
     )
@@ -279,7 +255,7 @@ with DAG(
     upload_sql_to_sheet = SQLToGoogleSheetsOperator(
         task_id="upload_sql_to_sheet",
         sql=SQL_SELECT,
-        sql_conn_id=DB_CONNECTION_ID,
+        sql_conn_id=CONNECTION_ID,
         database=DB_NAME,
         spreadsheet_id="{{ task_instance.xcom_pull(task_ids='create_spreadsheet', "
         "key='spreadsheet_id') }}",
@@ -287,54 +263,57 @@ with DAG(
     )
     # [END upload_sql_to_sheets]
 
-    delete_postgres_connection = BashOperator(
-        task_id="delete_postgres_connection",
-        bash_command=f"airflow connections delete {DB_CONNECTION_ID}",
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
-    delete_sheets_connection = BashOperator(
-        task_id="delete_temp_sheets_connection",
-        bash_command=f"airflow connections delete {SHEETS_CONNECTION_ID}",
+    delete_firewall_rule = BashOperator(
+        task_id="delete_firewall_rule",
+        bash_command=DELETE_FIREWALL_RULE_COMMAND,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_instance = ComputeEngineDeleteInstanceOperator(
-        task_id="delete_instance",
-        resource_id=DB_INSTANCE_NAME,
+    delete_gce_instance = ComputeEngineDeleteInstanceOperator(
+        task_id="delete_gce_instance",
+        resource_id=GCE_INSTANCE_NAME,
         zone=ZONE,
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_firewall_rule = BashOperator(
-        task_id="delete_firewall_rule",
-        bash_command=DELETE_FIREWALL_RULE,
+    delete_persistent_disk = BashOperator(
+        task_id="delete_persistent_disk",
+        bash_command=DELETE_PERSISTENT_DISK_COMMAND,
        trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_persistent_disk = BashOperator(
-        task_id="delete_persistent_disk",
-        bash_command=DELETE_PERSISTENT_DISK,
+    delete_connection = BashOperator(
+        task_id="delete_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    # TEST SETUP
-    create_instance >> setup_postgres
-    (create_instance >> get_public_ip_task >> setup_postgres_connection_task)
-    (
-        [setup_postgres, setup_postgres_connection_task, create_firewall_rule]
-        >> create_sql_table
-        >> insert_data
+    delete_sheets_connection = BashOperator(
+        task_id="delete_sheets_connection",
+        bash_command=f"airflow connections delete {SHEETS_CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
    )
 
+    # TEST SETUP
+    create_gce_instance >> setup_postgres
+    create_gce_instance >> get_public_ip_task >> setup_connection_task
+    [setup_postgres, setup_connection_task, create_firewall_rule] >> create_sql_table >> insert_sql_data
+    setup_sheets_connection_task >> create_spreadsheet
+
     (
-        [insert_data, setup_sheets_connection_task >> create_spreadsheet]
+        [create_spreadsheet, insert_sql_data]
         # TEST BODY
         >> upload_sql_to_sheet
-        # TEST TEARDOWN
-        >> [delete_instance, delete_postgres_connection, delete_sheets_connection, delete_firewall_rule]
     )
-    delete_instance >> delete_persistent_disk
+
+    # TEST TEARDOWN
+    upload_sql_to_sheet >> [
+        delete_firewall_rule,
+        delete_gce_instance,
+        delete_connection,
+        delete_sheets_connection,
+    ]
+    delete_gce_instance >> delete_persistent_disk
 
     from tests.system.utils.watcher import watcher

diff --git a/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
index c3994f1462..605a67dfa4 100644
--- a/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
+++ b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
 # distributed with this work for additional information
@@ -15,8 +14,9 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 """
-Example DAG using PostgresToGCSOperator.
+Example DAG using PostgresSQLToGCSOperator.
 
 This DAG relies on the following OS environment variables
 
@@ -31,7 +31,7 @@ import os
 from datetime import datetime
 
 from airflow.decorators import task
-from airflow.models.connection import Connection
+from airflow.models import Connection
 from airflow.models.dag import DAG
 from airflow.operators.bash import BashOperator
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
@@ -57,21 +57,40 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
 REGION = "europe-west2"
 ZONE = REGION + "-a"
 NETWORK = "default"
+CONNECTION_ID = f"pg_{DAG_ID}_{ENV_ID}".replace("-", "_")
+CONNECTION_TYPE = "postgres"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "result.json"
 
 DB_NAME = "testdb"
 DB_PORT = 5432
-DB_USER_NAME = "demo_user"
+DB_USER_NAME = "root"
 DB_USER_PASSWORD = "demo_password"
+SETUP_POSTGRES_COMMAND = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+    -e PGUSER={DB_USER_NAME} \
+    -e POSTGRES_USER={DB_USER_NAME} \
+    -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
+    -e POSTGRES_DB={DB_NAME} \
+    postgres
+"""
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
 
-SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
-DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_MACHINE_TYPE = "n1-standard-1"
+GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
 GCE_INSTANCE_BODY = {
-    "name": DB_INSTANCE_NAME,
-    "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+    "name": GCE_INSTANCE_NAME,
+    "machine_type": f"zones/{ZONE}/machineTypes/{GCE_MACHINE_TYPE}",
     "disks": [
         {
             "boot": True,
-            "device_name": DB_INSTANCE_NAME,
+            "device_name": GCE_INSTANCE_NAME,
"initialize_params": { "disk_size_gb": "10", "disk_type": f"zones/{ZONE}/diskTypes/pd-balanced", @@ -87,57 +106,41 @@ GCE_INSTANCE_BODY = { } ], } -DELETE_PERSISTENT_DISK = f""" +FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}" +CREATE_FIREWALL_RULE_COMMAND = f""" if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \ gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \ fi; -gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet -""" - -SETUP_POSTGRES = f""" -sudo apt update && -sudo apt install -y docker.io && -sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \ - -e POSTGRES_USER={DB_USER_NAME} \ - -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \ - -e POSTGRES_DB={DB_NAME} \ - postgres +if [ -z gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \ + gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \ + --project={PROJECT_ID} \ + --direction=INGRESS \ + --priority=100 \ + --network={NETWORK} \ + --action=ALLOW \ + --rules=tcp:{DB_PORT} \ + --source-ranges=0.0.0.0/0 +else + echo "Firewall rule {FIREWALL_RULE_NAME} already exists." +fi """ - -FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}" -CREATE_FIREWALL_RULE = f""" +DELETE_FIREWALL_RULE_COMMAND = f""" if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \ gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \ +fi; \ +if [ gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \ + gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet; \ fi; - -gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \ - --project={PROJECT_ID} \ - --direction=INGRESS \ - --priority=100 \ - --network={NETWORK} \ - --action=ALLOW \ - --rules=tcp:{DB_PORT} \ - --source-ranges=0.0.0.0/0 """ -DELETE_FIREWALL_RULE = f""" +DELETE_PERSISTENT_DISK_COMMAND = f""" if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \ gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \ fi; -gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet +gcloud compute disks delete {GCE_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet """ -CONNECTION_ID = f"postgres_{DAG_ID}_{ENV_ID}".replace("-", "_") - -BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" -FILE_NAME = "result.json" - -SQL_TABLE = "test_table" -SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))" -SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')" -SQL_SELECT = f"SELECT * FROM {SQL_TABLE}" - log = logging.getLogger(__name__) @@ -149,8 +152,8 @@ with DAG( catchup=False, tags=["example", "postgres", "gcs"], ) as dag: - create_instance = ComputeEngineInsertInstanceOperator( - task_id="create_instance", + create_gce_instance = ComputeEngineInsertInstanceOperator( + task_id="create_gce_instance", project_id=PROJECT_ID, zone=ZONE, body=GCE_INSTANCE_BODY, @@ -158,132 +161,130 @@ with DAG( create_firewall_rule = BashOperator( task_id="create_firewall_rule", - bash_command=CREATE_FIREWALL_RULE, + bash_command=CREATE_FIREWALL_RULE_COMMAND, ) setup_postgres = SSHOperator( task_id="setup_postgres", ssh_hook=ComputeEngineSSHHook( user="username", - instance_name=DB_INSTANCE_NAME, + instance_name=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID, use_oslogin=False, use_iap_tunnel=False, cmd_timeout=180, ), - command=SETUP_POSTGRES, + command=SETUP_POSTGRES_COMMAND, retries=2, - retry_delay=30, ) @task def 
     def get_public_ip() -> str:
         hook = ComputeEngineHook()
-        address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+        address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
         return address
 
     get_public_ip_task = get_public_ip()
 
     @task
-    def setup_postgres_connection(**kwargs) -> None:
-        public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+    def setup_connection(ip_address: str) -> None:
         connection = Connection(
             conn_id=CONNECTION_ID,
-            description="Example PostgreSQL connection",
-            conn_type="postgres",
-            host=public_ip,
+            description="Example connection",
+            conn_type=CONNECTION_TYPE,
+            schema=DB_NAME,
+            host=ip_address,
             login=DB_USER_NAME,
             password=DB_USER_PASSWORD,
-            schema=DB_NAME,
             port=DB_PORT,
         )
         session = Session()
-        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
-            log.warning("Connection %s already exists", CONNECTION_ID)
-            return None
+        log.info("Removing connection %s if it exists", CONNECTION_ID)
+        query = session.query(Connection).filter(Connection.conn_id == CONNECTION_ID)
+        query.delete()
         session.add(connection)
         session.commit()
+        log.info("Connection %s created", CONNECTION_ID)
 
-    setup_postgres_connection_task = setup_postgres_connection()
-
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=BUCKET_NAME,
-    )
+    setup_connection_task = setup_connection(get_public_ip_task)
 
     create_sql_table = SQLExecuteQueryOperator(
         task_id="create_sql_table",
         conn_id=CONNECTION_ID,
         sql=SQL_CREATE,
+        retries=4,
     )
 
-    insert_data = SQLExecuteQueryOperator(
-        task_id="insert_data",
+    insert_sql_data = SQLExecuteQueryOperator(
+        task_id="insert_sql_data",
        conn_id=CONNECTION_ID,
         sql=SQL_INSERT,
     )
 
+    create_gcs_bucket = GCSCreateBucketOperator(
+        task_id="create_gcs_bucket",
+        bucket_name=BUCKET_NAME,
+    )
+
     # [START howto_operator_postgres_to_gcs]
-    get_data = PostgresToGCSOperator(
-        task_id="get_data",
+    postgres_to_gcs = PostgresToGCSOperator(
+        task_id="postgres_to_gcs",
         postgres_conn_id=CONNECTION_ID,
         sql=SQL_SELECT,
         bucket=BUCKET_NAME,
         filename=FILE_NAME,
-        gzip=False,
+        export_format="csv",
     )
     # [END howto_operator_postgres_to_gcs]
 
-    delete_postgres_connection = BashOperator(
-        task_id="delete_postgres_connection",
-        bash_command=f"airflow connections delete {CONNECTION_ID}",
+    delete_gcs_bucket = GCSDeleteBucketOperator(
+        task_id="delete_gcs_bucket",
+        bucket_name=BUCKET_NAME,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket",
-        bucket_name=BUCKET_NAME,
+    delete_firewall_rule = BashOperator(
+        task_id="delete_firewall_rule",
+        bash_command=DELETE_FIREWALL_RULE_COMMAND,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_instance = ComputeEngineDeleteInstanceOperator(
-        task_id="delete_instance",
-        resource_id=DB_INSTANCE_NAME,
+    delete_gce_instance = ComputeEngineDeleteInstanceOperator(
+        task_id="delete_gce_instance",
+        resource_id=GCE_INSTANCE_NAME,
         zone=ZONE,
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_firewall_rule = BashOperator(
-        task_id="delete_firewall_rule",
-        bash_command=DELETE_FIREWALL_RULE,
+    delete_persistent_disk = BashOperator(
+        task_id="delete_persistent_disk",
+        bash_command=DELETE_PERSISTENT_DISK_COMMAND,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_persistent_disk = BashOperator(
-        task_id="delete_persistent_disk",
-        bash_command=DELETE_PERSISTENT_DISK,
+    delete_connection = BashOperator(
+        task_id="delete_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
     # TEST SETUP
-    create_instance >> setup_postgres
-    (create_instance >> get_public_ip_task >> setup_postgres_connection_task)
-    (
-        [setup_postgres, setup_postgres_connection_task, create_firewall_rule]
-        >> create_sql_table
-        >> insert_data
-    )
+    create_gce_instance >> setup_postgres
+    create_gce_instance >> get_public_ip_task >> setup_connection_task
+    [setup_postgres, setup_connection_task, create_firewall_rule] >> create_sql_table >> insert_sql_data
+
     (
-        [insert_data, create_bucket]
+        [create_gcs_bucket, insert_sql_data]
         # TEST BODY
-        >> get_data
-        # TEST TEARDOWN
-        >> [delete_instance, delete_bucket, delete_postgres_connection, delete_firewall_rule]
+        >> postgres_to_gcs
     )
-    delete_instance >> delete_persistent_disk
+
+    # TEST TEARDOWN
+    postgres_to_gcs >> [delete_gcs_bucket, delete_firewall_rule, delete_gce_instance, delete_connection]
+    delete_gce_instance >> delete_persistent_disk
 
     from tests.system.utils.watcher import watcher
 
@@ -291,7 +292,6 @@ with DAG(
     # when "tearDown" task with trigger rule is part of the DAG
     list(dag.tasks) >> watcher()
 
-
 from tests.system.utils import get_test_run  # noqa: E402
 
 # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
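
P.S. (editorial, not part of the commit record): as the trailing comment in each file notes, these example DAGs are run with pytest per tests/system/README.md#run_via_pytest. A minimal sketch of such a run, assuming SYSTEM_TESTS_ENV_ID and SYSTEM_TESTS_GCP_PROJECT are exported and GCP credentials are configured:

    # hypothetical invocation from the airflow repository root
    pytest tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py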