This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e3e6776576 Fix system tests (#39193)
e3e6776576 is described below

commit e3e67765765b8c0404aa926629e826d9600d437e
Author: max <42827971+moiseen...@users.noreply.github.com>
AuthorDate: Tue Apr 23 09:59:18 2024 +0000

    Fix system tests (#39193)
---
 .../google/cloud/gcs/example_mysql_to_gcs.py       | 194 ++++++++-------
 .../cloud/sql_to_sheets/example_sql_to_sheets.py   | 277 ++++++++++-----------
 .../cloud/transfers/example_postgres_to_gcs.py     | 200 +++++++--------
 3 files changed, 327 insertions(+), 344 deletions(-)
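
All three DAGs now share the same connection-bootstrap step: a TaskFlow task that drops any stale Airflow connection with the same conn_id and recreates it from the freshly created instance's public IP, instead of skipping setup when a row already exists. A minimal, self-contained sketch of that pattern follows; the literal values are placeholders, while the real DAGs derive them from DAG_ID, ENV_ID and their DB_* constants as shown in the hunks below.

```python
from airflow.decorators import task
from airflow.models.connection import Connection
from airflow.settings import Session


@task
def setup_connection(ip_address: str) -> None:
    # Placeholder values for illustration; the system-test DAGs build these
    # from DAG_ID / ENV_ID and their DB_* constants.
    connection = Connection(
        conn_id="example_connection_id",
        conn_type="postgres",
        host=ip_address,
        login="root",
        password="demo_password",
        port=5432,
    )
    session = Session()
    # Remove any stale row with the same conn_id, then recreate it, so that
    # re-running the system test always starts from a known connection.
    session.query(Connection).filter(Connection.conn_id == connection.conn_id).delete()
    session.add(connection)
    session.commit()
```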

diff --git a/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py
index 5b96b49c1d..884149d984 100644
--- a/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_mysql_to_gcs.py
@@ -62,21 +62,38 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
 REGION = "europe-west2"
 ZONE = REGION + "-a"
 NETWORK = "default"
+CONNECTION_ID = f"mysql_{DAG_ID}_{ENV_ID}".replace("-", "_")
+CONNECTION_TYPE = "mysql"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "result.json"
 
 DB_NAME = "testdb"
 DB_PORT = 3306
 DB_USER_NAME = "root"
 DB_USER_PASSWORD = "demo_password"
+SETUP_MYSQL_COMMAND = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+    -e MYSQL_ROOT_PASSWORD={DB_USER_PASSWORD} \
+    -e MYSQL_DATABASE={DB_NAME} \
+    mysql:8.1.0
+"""
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {DB_NAME}.{SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {DB_NAME}.{SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {DB_NAME}.{SQL_TABLE}"
 
-SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
-DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_MACHINE_TYPE = "n1-standard-1"
+GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
 GCE_INSTANCE_BODY = {
-    "name": DB_INSTANCE_NAME,
-    "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+    "name": GCE_INSTANCE_NAME,
+    "machine_type": f"zones/{ZONE}/machineTypes/{GCE_MACHINE_TYPE}",
     "disks": [
         {
             "boot": True,
-            "device_name": DB_INSTANCE_NAME,
+            "device_name": GCE_INSTANCE_NAME,
             "initialize_params": {
                 "disk_size_gb": "10",
                 "disk_type": f"zones/{ZONE}/diskTypes/pd-balanced",
@@ -92,56 +109,41 @@ GCE_INSTANCE_BODY = {
         }
     ],
 }
-DELETE_PERSISTENT_DISK = f"""
+FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
+CREATE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
 
-gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
+if [ -z gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
+      --project={PROJECT_ID} \
+      --direction=INGRESS \
+      --priority=100 \
+      --network={NETWORK} \
+      --action=ALLOW \
+      --rules=tcp:{DB_PORT} \
+      --source-ranges=0.0.0.0/0
+else
+    echo "Firewall rule {FIREWALL_RULE_NAME} already exists."
+fi
 """
-
-SETUP_MYSQL = f"""
-sudo apt update &&
-sudo apt install -y docker.io &&
-sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
-    -e MYSQL_ROOT_PASSWORD={DB_USER_PASSWORD} \
-    -e MYSQL_DATABASE={DB_NAME} \
-    mysql:8.1.0
-"""
-
-FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
-CREATE_FIREWALL_RULE = f"""
+DELETE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi; \
+if [ gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet; \
 fi;
-
-gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
-  --project={PROJECT_ID} \
-  --direction=INGRESS \
-  --priority=100 \
-  --network={NETWORK} \
-  --action=ALLOW \
-  --rules=tcp:{DB_PORT} \
-  --source-ranges=0.0.0.0/0
 """
-DELETE_FIREWALL_RULE = f"""
+DELETE_PERSISTENT_DISK_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
 
-gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet
+gcloud compute disks delete {GCE_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
 """
 
-CONNECTION_ID = f"mysql_{DAG_ID}_{ENV_ID}".replace("-", "_")
-
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-FILE_NAME = "result.json"
-
-SQL_TABLE = "test_table"
-SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
-SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
-SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
-
 
 log = logging.getLogger(__name__)
 
@@ -153,8 +155,8 @@ with DAG(
     catchup=False,
     tags=["example", "mysql", "gcs"],
 ) as dag:
-    create_instance = ComputeEngineInsertInstanceOperator(
-        task_id="create_instance",
+    create_gce_instance = ComputeEngineInsertInstanceOperator(
+        task_id="create_gce_instance",
         project_id=PROJECT_ID,
         zone=ZONE,
         body=GCE_INSTANCE_BODY,
@@ -162,127 +164,129 @@ with DAG(
 
     create_firewall_rule = BashOperator(
         task_id="create_firewall_rule",
-        bash_command=CREATE_FIREWALL_RULE,
+        bash_command=CREATE_FIREWALL_RULE_COMMAND,
     )
 
     setup_mysql = SSHOperator(
         task_id="setup_mysql",
         ssh_hook=ComputeEngineSSHHook(
             user="username",
-            instance_name=DB_INSTANCE_NAME,
+            instance_name=GCE_INSTANCE_NAME,
             zone=ZONE,
             project_id=PROJECT_ID,
             use_oslogin=False,
             use_iap_tunnel=False,
             cmd_timeout=180,
         ),
-        command=SETUP_MYSQL,
+        command=SETUP_MYSQL_COMMAND,
         retries=2,
     )
 
     @task
     def get_public_ip() -> str:
         hook = ComputeEngineHook()
-        address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+        address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
         return address
 
     get_public_ip_task = get_public_ip()
 
     @task
-    def setup_mysql_connection(**kwargs) -> None:
-        public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+    def setup_connection(ip_address: str) -> None:
         connection = Connection(
             conn_id=CONNECTION_ID,
-            description="Example MySQL connection",
-            conn_type="mysql",
-            host=public_ip,
+            description="Example connection",
+            conn_type=CONNECTION_TYPE,
+            host=ip_address,
             login=DB_USER_NAME,
             password=DB_USER_PASSWORD,
-            schema=DB_NAME,
+            port=DB_PORT,
         )
         session = Session()
-        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
-            log.warning("Connection %s already exists", CONNECTION_ID)
-            return None
+        log.info("Removing connection %s if it exists", CONNECTION_ID)
+        query = session.query(Connection).filter(Connection.conn_id == CONNECTION_ID)
+        query.delete()
 
         session.add(connection)
         session.commit()
+        log.info("Connection %s created", CONNECTION_ID)
 
-    setup_mysql_connection_task = setup_mysql_connection()
-
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=BUCKET_NAME,
-    )
+    setup_connection_task = setup_connection(get_public_ip_task)
 
     create_sql_table = SQLExecuteQueryOperator(
         task_id="create_sql_table",
         conn_id=CONNECTION_ID,
         sql=SQL_CREATE,
+        retries=4,
     )
 
-    insert_data = SQLExecuteQueryOperator(
-        task_id="insert_data",
+    insert_sql_data = SQLExecuteQueryOperator(
+        task_id="insert_sql_data",
         conn_id=CONNECTION_ID,
         sql=SQL_INSERT,
     )
 
+    create_gcs_bucket = GCSCreateBucketOperator(
+        task_id="create_gcs_bucket",
+        bucket_name=BUCKET_NAME,
+    )
+
     # [START howto_operator_mysql_to_gcs]
-    upload_mysql_to_gcs = MySQLToGCSOperator(
-        task_id="mysql_to_gcs", sql=SQL_SELECT, bucket=BUCKET_NAME, filename=FILE_NAME, export_format="csv"
+    mysql_to_gcs = MySQLToGCSOperator(
+        task_id="mysql_to_gcs",
+        mysql_conn_id=CONNECTION_ID,
+        sql=SQL_SELECT,
+        bucket=BUCKET_NAME,
+        filename=FILE_NAME,
+        export_format="csv",
     )
     # [END howto_operator_mysql_to_gcs]
 
-    delete_mysql_connection = BashOperator(
-        task_id="delete_mysql_connection",
-        bash_command=f"airflow connections delete {CONNECTION_ID}",
+    delete_gcs_bucket = GCSDeleteBucketOperator(
+        task_id="delete_gcs_bucket",
+        bucket_name=BUCKET_NAME,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket",
-        bucket_name=BUCKET_NAME,
+    delete_firewall_rule = BashOperator(
+        task_id="delete_firewall_rule",
+        bash_command=DELETE_FIREWALL_RULE_COMMAND,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_instance = ComputeEngineDeleteInstanceOperator(
-        task_id="delete_instance",
-        resource_id=DB_INSTANCE_NAME,
+    delete_gce_instance = ComputeEngineDeleteInstanceOperator(
+        task_id="delete_gce_instance",
+        resource_id=GCE_INSTANCE_NAME,
         zone=ZONE,
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_firewall_rule = BashOperator(
-        task_id="delete_firewall_rule",
-        bash_command=DELETE_FIREWALL_RULE,
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
     delete_persistent_disk = BashOperator(
         task_id="delete_persistent_disk",
-        bash_command=DELETE_PERSISTENT_DISK,
+        bash_command=DELETE_PERSISTENT_DISK_COMMAND,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    (
-        # TEST SETUP
-        create_instance
-        >> setup_mysql
-        >> get_public_ip_task
-        >> setup_mysql_connection_task
-        >> create_firewall_rule
-        >> create_sql_table
-        >> insert_data
+    delete_connection = BashOperator(
+        task_id="delete_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
+
+    # TEST SETUP
+    create_gce_instance >> setup_mysql
+    create_gce_instance >> get_public_ip_task >> setup_connection_task
+    [setup_mysql, setup_connection_task, create_firewall_rule] >> create_sql_table >> insert_sql_data
+
     (
-        [insert_data, create_bucket]
+        [create_gcs_bucket, insert_sql_data]
         # TEST BODY
-        >> upload_mysql_to_gcs
-        # TEST TEARDOWN
-        >> [delete_instance, delete_bucket, delete_mysql_connection, delete_firewall_rule]
+        >> mysql_to_gcs
     )
-    delete_instance >> delete_persistent_disk
+
+    # TEST TEARDOWN
+    mysql_to_gcs >> [delete_gcs_bucket, delete_firewall_rule, delete_gce_instance, delete_connection]
+    delete_gce_instance >> delete_persistent_disk
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py b/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
index 314f388c52..811526a82a 100644
--- a/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
+++ b/tests/system/providers/google/cloud/sql_to_sheets/example_sql_to_sheets.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -17,51 +16,17 @@
 # under the License.
 
 """
-First, you need a db instance that is accessible from the Airflow environment.
-You can, for example, create a Cloud SQL instance and connect to it from
-within breeze with Cloud SQL proxy:
-https://cloud.google.com/sql/docs/postgres/connect-instance-auth-proxy
-
-# DB setup
-Create db:
-```
-CREATE DATABASE test_db;
-```
-
-Switch to db:
-```
-\c test_db
-```
-
-Create table and insert some rows
-```
-CREATE TABLE test_table (col1 INT, col2 INT);
-INSERT INTO test_table VALUES (1,2), (3,4), (5,6), (7,8);
-```
-
-# Setup connections
-db connection:
-In airflow UI, set one db connection, for example "postgres_default"
-and make sure the "Test" at the bottom succeeds
-
-google cloud connection:
-We need additional scopes for this test
-scopes: https://www.googleapis.com/auth/spreadsheets, https://www.googleapis.com/auth/cloud-platform
-
-# Sheet
-Finally, you need a Google Sheet you have access to, for testing you can
-create a public sheet and get its ID.
-
-# Tear Down
-You can delete the db with
-```
-DROP DATABASE test_db;
-```
+Example DAG using SQLToGoogleSheetsOperator.
+
+This DAG relies on the following OS environment variables
+
+* AIRFLOW__API__GOOGLE_KEY_PATH - Path to service account key file. Note, you can skip this variable if you
+  run this DAG in a Composer environment.
+
 """
 
 from __future__ import annotations
 
-import json
 import logging
 import os
 from datetime import datetime
@@ -80,38 +45,50 @@ from airflow.providers.google.cloud.operators.compute import (
 from airflow.providers.google.suite.operators.sheets import GoogleSheetsCreateSpreadsheetOperator
 from airflow.providers.google.suite.transfers.sql_to_sheets import SQLToGoogleSheetsOperator
 from airflow.providers.ssh.operators.ssh import SSHOperator
-from airflow.settings import Session
+from airflow.settings import Session, json
 from airflow.utils.trigger_rule import TriggerRule
 
 DAG_ID = "example_sql_to_sheets"
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
 
 REGION = "europe-west2"
 ZONE = REGION + "-a"
 NETWORK = "default"
+CONNECTION_ID = f"pg_{DAG_ID}_{ENV_ID}".replace("-", "_")
+CONNECTION_TYPE = "postgres"
 
-SHEETS_CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
-SPREADSHEET = {
-    "properties": {"title": "Test1"},
-    "sheets": [{"properties": {"title": "Sheet1"}}],
-}
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "result.json"
 
-DB_NAME = f"{DAG_ID}-{ENV_ID}-db".replace("-", "_")
+DB_NAME = "testdb"
 DB_PORT = 5432
-DB_USER_NAME = "demo_user"
+DB_USER_NAME = "root"
 DB_USER_PASSWORD = "demo_password"
-DB_CONNECTION_ID = f"postgres_{DAG_ID}_{ENV_ID}".replace("-", "_")
+SETUP_POSTGRES_COMMAND = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+    -e PGUSER={DB_USER_NAME} \
+    -e POSTGRES_USER={DB_USER_NAME} \
+    -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
+    -e POSTGRES_DB={DB_NAME} \
+    postgres
+"""
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
 
-SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
-DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_MACHINE_TYPE = "n1-standard-1"
+GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
 GCE_INSTANCE_BODY = {
-    "name": DB_INSTANCE_NAME,
-    "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+    "name": GCE_INSTANCE_NAME,
+    "machine_type": f"zones/{ZONE}/machineTypes/{GCE_MACHINE_TYPE}",
     "disks": [
         {
             "boot": True,
-            "device_name": DB_INSTANCE_NAME,
+            "device_name": GCE_INSTANCE_NAME,
             "initialize_params": {
                 "disk_size_gb": "10",
                 "disk_type": f"zones/{ZONE}/diskTypes/pd-balanced",
@@ -127,61 +104,60 @@ GCE_INSTANCE_BODY = {
         }
     ],
 }
-DELETE_PERSISTENT_DISK = f"""
+FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
+CREATE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
-gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
-"""
 
-SETUP_POSTGRES = f"""
-sudo apt update &&
-sudo apt install -y docker.io &&
-sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
-    -e POSTGRES_USER={DB_USER_NAME} \
-    -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
-    -e POSTGRES_DB={DB_NAME} \
-    postgres
+if [ -z gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
+      --project={PROJECT_ID} \
+      --direction=INGRESS \
+      --priority=100 \
+      --network={NETWORK} \
+      --action=ALLOW \
+      --rules=tcp:{DB_PORT} \
+      --source-ranges=0.0.0.0/0
+else
+    echo "Firewall rule {FIREWALL_RULE_NAME} already exists."
+fi
 """
-
-FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
-CREATE_FIREWALL_RULE = f"""
+DELETE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi; \
+if [ gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet; \
 fi;
-gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
-  --project={PROJECT_ID} \
-  --direction=INGRESS \
-  --priority=100 \
-  --network={NETWORK} \
-  --action=ALLOW \
-  --rules=tcp:{DB_PORT} \
-  --source-ranges=0.0.0.0/0
 """
-DELETE_FIREWALL_RULE = f"""
+DELETE_PERSISTENT_DISK_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
-gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet
+
+gcloud compute disks delete {GCE_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
 """
 
-SQL_TABLE = "test_table"
-SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
-SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
-SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
+SHEETS_CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
+SPREADSHEET = {
+    "properties": {"title": "Test1"},
+    "sheets": [{"properties": {"title": "Sheet1"}}],
+}
+
 
 log = logging.getLogger(__name__)
 
 
 with DAG(
-    DAG_ID,
+    dag_id=DAG_ID,
+    schedule="@once",
     start_date=datetime(2021, 1, 1),
-    schedule="@once",  # Override to match your needs
     catchup=False,
-    tags=["example", "sql"],
+    tags=["example", "postgres", "gcs"],
 ) as dag:
-    create_instance = ComputeEngineInsertInstanceOperator(
-        task_id="create_instance",
+    create_gce_instance = ComputeEngineInsertInstanceOperator(
+        task_id="create_gce_instance",
         project_id=PROJECT_ID,
         zone=ZONE,
         body=GCE_INSTANCE_BODY,
@@ -189,55 +165,67 @@ with DAG(
 
     create_firewall_rule = BashOperator(
         task_id="create_firewall_rule",
-        bash_command=CREATE_FIREWALL_RULE,
+        bash_command=CREATE_FIREWALL_RULE_COMMAND,
     )
 
     setup_postgres = SSHOperator(
         task_id="setup_postgres",
         ssh_hook=ComputeEngineSSHHook(
             user="username",
-            instance_name=DB_INSTANCE_NAME,
+            instance_name=GCE_INSTANCE_NAME,
             zone=ZONE,
             project_id=PROJECT_ID,
             use_oslogin=False,
             use_iap_tunnel=False,
             cmd_timeout=180,
         ),
-        command=SETUP_POSTGRES,
+        command=SETUP_POSTGRES_COMMAND,
         retries=2,
-        retry_delay=30,
     )
 
     @task
     def get_public_ip() -> str:
         hook = ComputeEngineHook()
-        address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+        address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
         return address
 
     get_public_ip_task = get_public_ip()
 
     @task
-    def setup_postgres_connection(**kwargs) -> None:
-        public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+    def setup_connection(ip_address: str) -> None:
         connection = Connection(
-            conn_id=DB_CONNECTION_ID,
-            description="Example PostgreSQL connection",
-            conn_type="postgres",
-            host=public_ip,
+            conn_id=CONNECTION_ID,
+            description="Example connection",
+            conn_type=CONNECTION_TYPE,
+            schema=DB_NAME,
+            host=ip_address,
             login=DB_USER_NAME,
             password=DB_USER_PASSWORD,
-            schema=DB_NAME,
             port=DB_PORT,
         )
         session = Session()
-        if session.query(Connection).filter(Connection.conn_id == DB_CONNECTION_ID).first():
-            log.warning("Connection %s already exists", DB_CONNECTION_ID)
-            return None
+        log.info("Removing connection %s if it exists", CONNECTION_ID)
+        query = session.query(Connection).filter(Connection.conn_id == CONNECTION_ID)
+        query.delete()
 
         session.add(connection)
         session.commit()
+        log.info("Connection %s created", CONNECTION_ID)
 
-    setup_postgres_connection_task = setup_postgres_connection()
+    setup_connection_task = setup_connection(get_public_ip_task)
+
+    create_sql_table = SQLExecuteQueryOperator(
+        task_id="create_sql_table",
+        conn_id=CONNECTION_ID,
+        sql=SQL_CREATE,
+        retries=4,
+    )
+
+    insert_sql_data = SQLExecuteQueryOperator(
+        task_id="insert_sql_data",
+        conn_id=CONNECTION_ID,
+        sql=SQL_INSERT,
+    )
 
     @task
     def setup_sheets_connection():
@@ -259,18 +247,6 @@ with DAG(
 
     setup_sheets_connection_task = setup_sheets_connection()
 
-    create_sql_table = SQLExecuteQueryOperator(
-        task_id="create_sql_table",
-        conn_id=DB_CONNECTION_ID,
-        sql=SQL_CREATE,
-    )
-
-    insert_data = SQLExecuteQueryOperator(
-        task_id="insert_data",
-        conn_id=DB_CONNECTION_ID,
-        sql=SQL_INSERT,
-    )
-
     create_spreadsheet = GoogleSheetsCreateSpreadsheetOperator(
         task_id="create_spreadsheet", spreadsheet=SPREADSHEET, gcp_conn_id=SHEETS_CONNECTION_ID
     )
@@ -279,7 +255,7 @@ with DAG(
     upload_sql_to_sheet = SQLToGoogleSheetsOperator(
         task_id="upload_sql_to_sheet",
         sql=SQL_SELECT,
-        sql_conn_id=DB_CONNECTION_ID,
+        sql_conn_id=CONNECTION_ID,
         database=DB_NAME,
         spreadsheet_id="{{ task_instance.xcom_pull(task_ids='create_spreadsheet', "
         "key='spreadsheet_id') }}",
@@ -287,54 +263,57 @@ with DAG(
     )
     # [END upload_sql_to_sheets]
 
-    delete_postgres_connection = BashOperator(
-        task_id="delete_postgres_connection",
-        bash_command=f"airflow connections delete {DB_CONNECTION_ID}",
-        trigger_rule=TriggerRule.ALL_DONE,
-    )
-
-    delete_sheets_connection = BashOperator(
-        task_id="delete_temp_sheets_connection",
-        bash_command=f"airflow connections delete {SHEETS_CONNECTION_ID}",
+    delete_firewall_rule = BashOperator(
+        task_id="delete_firewall_rule",
+        bash_command=DELETE_FIREWALL_RULE_COMMAND,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_instance = ComputeEngineDeleteInstanceOperator(
-        task_id="delete_instance",
-        resource_id=DB_INSTANCE_NAME,
+    delete_gce_instance = ComputeEngineDeleteInstanceOperator(
+        task_id="delete_gce_instance",
+        resource_id=GCE_INSTANCE_NAME,
         zone=ZONE,
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_firewall_rule = BashOperator(
-        task_id="delete_firewall_rule",
-        bash_command=DELETE_FIREWALL_RULE,
+    delete_persistent_disk = BashOperator(
+        task_id="delete_persistent_disk",
+        bash_command=DELETE_PERSISTENT_DISK_COMMAND,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_persistent_disk = BashOperator(
-        task_id="delete_persistent_disk",
-        bash_command=DELETE_PERSISTENT_DISK,
+    delete_connection = BashOperator(
+        task_id="delete_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    # TEST SETUP
-    create_instance >> setup_postgres
-    (create_instance >> get_public_ip_task >> setup_postgres_connection_task)
-    (
-        [setup_postgres, setup_postgres_connection_task, create_firewall_rule]
-        >> create_sql_table
-        >> insert_data
+    delete_sheets_connection = BashOperator(
+        task_id="delete_sheets_connection",
+        bash_command=f"airflow connections delete {SHEETS_CONNECTION_ID}",
+        trigger_rule=TriggerRule.ALL_DONE,
     )
+    # TEST SETUP
+    create_gce_instance >> setup_postgres
+    create_gce_instance >> get_public_ip_task >> setup_connection_task
+    [setup_postgres, setup_connection_task, create_firewall_rule] >> create_sql_table >> insert_sql_data
+    setup_sheets_connection_task >> create_spreadsheet
+
     (
-        [insert_data, setup_sheets_connection_task >> create_spreadsheet]
+        [create_spreadsheet, insert_sql_data]
         # TEST BODY
         >> upload_sql_to_sheet
-        # TEST TEARDOWN
-        >> [delete_instance, delete_postgres_connection, delete_sheets_connection, delete_firewall_rule]
     )
-    delete_instance >> delete_persistent_disk
+
+    # TEST TEARDOWN
+    upload_sql_to_sheet >> [
+        delete_firewall_rule,
+        delete_gce_instance,
+        delete_connection,
+        delete_sheets_connection,
+    ]
+    delete_gce_instance >> delete_persistent_disk
 
     from tests.system.utils.watcher import watcher
 
diff --git a/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
index c3994f1462..605a67dfa4 100644
--- a/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
+++ b/tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,8 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 """
-Example DAG using PostgresToGCSOperator.
+Example DAG using PostgresSQLToGCSOperator.
 
 This DAG relies on the following OS environment variables
 
@@ -31,7 +31,7 @@ import os
 from datetime import datetime
 
 from airflow.decorators import task
-from airflow.models.connection import Connection
+from airflow.models import Connection
 from airflow.models.dag import DAG
 from airflow.operators.bash import BashOperator
 from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
@@ -57,21 +57,40 @@ PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "example-project")
 REGION = "europe-west2"
 ZONE = REGION + "-a"
 NETWORK = "default"
+CONNECTION_ID = f"pg_{DAG_ID}_{ENV_ID}".replace("-", "_")
+CONNECTION_TYPE = "postgres"
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+FILE_NAME = "result.json"
 
 DB_NAME = "testdb"
 DB_PORT = 5432
-DB_USER_NAME = "demo_user"
+DB_USER_NAME = "root"
 DB_USER_PASSWORD = "demo_password"
+SETUP_POSTGRES_COMMAND = f"""
+sudo apt update &&
+sudo apt install -y docker.io &&
+sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
+    -e PGUSER={DB_USER_NAME} \
+    -e POSTGRES_USER={DB_USER_NAME} \
+    -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
+    -e POSTGRES_DB={DB_NAME} \
+    postgres
+"""
+SQL_TABLE = "test_table"
+SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
+SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
+SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
 
-SHORT_MACHINE_TYPE_NAME = "n1-standard-1"
-DB_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
+GCE_MACHINE_TYPE = "n1-standard-1"
+GCE_INSTANCE_NAME = f"instance-{DAG_ID}-{ENV_ID}".replace("_", "-")
 GCE_INSTANCE_BODY = {
-    "name": DB_INSTANCE_NAME,
-    "machine_type": f"zones/{ZONE}/machineTypes/{SHORT_MACHINE_TYPE_NAME}",
+    "name": GCE_INSTANCE_NAME,
+    "machine_type": f"zones/{ZONE}/machineTypes/{GCE_MACHINE_TYPE}",
     "disks": [
         {
             "boot": True,
-            "device_name": DB_INSTANCE_NAME,
+            "device_name": GCE_INSTANCE_NAME,
             "initialize_params": {
                 "disk_size_gb": "10",
                 "disk_type": f"zones/{ZONE}/diskTypes/pd-balanced",
@@ -87,57 +106,41 @@ GCE_INSTANCE_BODY = {
         }
     ],
 }
-DELETE_PERSISTENT_DISK = f"""
+FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
+CREATE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
 
-gcloud compute disks delete {DB_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
-"""
-
-SETUP_POSTGRES = f"""
-sudo apt update &&
-sudo apt install -y docker.io &&
-sudo docker run -d -p {DB_PORT}:{DB_PORT} --name {DB_NAME} \
-    -e POSTGRES_USER={DB_USER_NAME} \
-    -e POSTGRES_PASSWORD={DB_USER_PASSWORD} \
-    -e POSTGRES_DB={DB_NAME} \
-    postgres
+if [ -z gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
+      --project={PROJECT_ID} \
+      --direction=INGRESS \
+      --priority=100 \
+      --network={NETWORK} \
+      --action=ALLOW \
+      --rules=tcp:{DB_PORT} \
+      --source-ranges=0.0.0.0/0
+else
+    echo "Firewall rule {FIREWALL_RULE_NAME} already exists."
+fi
 """
-
-FIREWALL_RULE_NAME = f"allow-http-{DB_PORT}"
-CREATE_FIREWALL_RULE = f"""
+DELETE_FIREWALL_RULE_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
+fi; \
+if [ gcloud compute firewall-rules list --filter=name:{FIREWALL_RULE_NAME} --format="value(name)" ]; then \
+    gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet; \
 fi;
-
-gcloud compute firewall-rules create {FIREWALL_RULE_NAME} \
-  --project={PROJECT_ID} \
-  --direction=INGRESS \
-  --priority=100 \
-  --network={NETWORK} \
-  --action=ALLOW \
-  --rules=tcp:{DB_PORT} \
-  --source-ranges=0.0.0.0/0
 """
-DELETE_FIREWALL_RULE = f"""
+DELETE_PERSISTENT_DISK_COMMAND = f"""
 if [ $AIRFLOW__API__GOOGLE_KEY_PATH ]; then \
 gcloud auth activate-service-account --key-file=$AIRFLOW__API__GOOGLE_KEY_PATH; \
 fi;
 
-gcloud compute firewall-rules delete {FIREWALL_RULE_NAME} --project={PROJECT_ID} --quiet
+gcloud compute disks delete {GCE_INSTANCE_NAME} --project={PROJECT_ID} --zone={ZONE} --quiet
 """
 
-CONNECTION_ID = f"postgres_{DAG_ID}_{ENV_ID}".replace("-", "_")
-
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-FILE_NAME = "result.json"
-
-SQL_TABLE = "test_table"
-SQL_CREATE = f"CREATE TABLE IF NOT EXISTS {SQL_TABLE} (col_1 INT, col_2 VARCHAR(8))"
-SQL_INSERT = f"INSERT INTO {SQL_TABLE} (col_1, col_2) VALUES (1, 'one'), (2, 'two')"
-SQL_SELECT = f"SELECT * FROM {SQL_TABLE}"
-
 
 log = logging.getLogger(__name__)
 
@@ -149,8 +152,8 @@ with DAG(
     catchup=False,
     tags=["example", "postgres", "gcs"],
 ) as dag:
-    create_instance = ComputeEngineInsertInstanceOperator(
-        task_id="create_instance",
+    create_gce_instance = ComputeEngineInsertInstanceOperator(
+        task_id="create_gce_instance",
         project_id=PROJECT_ID,
         zone=ZONE,
         body=GCE_INSTANCE_BODY,
@@ -158,132 +161,130 @@ with DAG(
 
     create_firewall_rule = BashOperator(
         task_id="create_firewall_rule",
-        bash_command=CREATE_FIREWALL_RULE,
+        bash_command=CREATE_FIREWALL_RULE_COMMAND,
     )
 
     setup_postgres = SSHOperator(
         task_id="setup_postgres",
         ssh_hook=ComputeEngineSSHHook(
             user="username",
-            instance_name=DB_INSTANCE_NAME,
+            instance_name=GCE_INSTANCE_NAME,
             zone=ZONE,
             project_id=PROJECT_ID,
             use_oslogin=False,
             use_iap_tunnel=False,
             cmd_timeout=180,
         ),
-        command=SETUP_POSTGRES,
+        command=SETUP_POSTGRES_COMMAND,
         retries=2,
-        retry_delay=30,
     )
 
     @task
     def get_public_ip() -> str:
         hook = ComputeEngineHook()
-        address = hook.get_instance_address(resource_id=DB_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
+        address = hook.get_instance_address(resource_id=GCE_INSTANCE_NAME, zone=ZONE, project_id=PROJECT_ID)
         return address
 
     get_public_ip_task = get_public_ip()
 
     @task
-    def setup_postgres_connection(**kwargs) -> None:
-        public_ip = kwargs["ti"].xcom_pull(task_ids="get_public_ip")
+    def setup_connection(ip_address: str) -> None:
         connection = Connection(
             conn_id=CONNECTION_ID,
-            description="Example PostgreSQL connection",
-            conn_type="postgres",
-            host=public_ip,
+            description="Example connection",
+            conn_type=CONNECTION_TYPE,
+            schema=DB_NAME,
+            host=ip_address,
             login=DB_USER_NAME,
             password=DB_USER_PASSWORD,
-            schema=DB_NAME,
             port=DB_PORT,
         )
         session = Session()
-        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
-            log.warning("Connection %s already exists", CONNECTION_ID)
-            return None
+        log.info("Removing connection %s if it exists", CONNECTION_ID)
+        query = session.query(Connection).filter(Connection.conn_id == CONNECTION_ID)
+        query.delete()
 
         session.add(connection)
         session.commit()
+        log.info("Connection %s created", CONNECTION_ID)
 
-    setup_postgres_connection_task = setup_postgres_connection()
-
-    create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket",
-        bucket_name=BUCKET_NAME,
-    )
+    setup_connection_task = setup_connection(get_public_ip_task)
 
     create_sql_table = SQLExecuteQueryOperator(
         task_id="create_sql_table",
         conn_id=CONNECTION_ID,
         sql=SQL_CREATE,
+        retries=4,
     )
 
-    insert_data = SQLExecuteQueryOperator(
-        task_id="insert_data",
+    insert_sql_data = SQLExecuteQueryOperator(
+        task_id="insert_sql_data",
         conn_id=CONNECTION_ID,
         sql=SQL_INSERT,
     )
 
+    create_gcs_bucket = GCSCreateBucketOperator(
+        task_id="create_gcs_bucket",
+        bucket_name=BUCKET_NAME,
+    )
+
     # [START howto_operator_postgres_to_gcs]
-    get_data = PostgresToGCSOperator(
-        task_id="get_data",
+    postgres_to_gcs = PostgresToGCSOperator(
+        task_id="postgres_to_gcs",
         postgres_conn_id=CONNECTION_ID,
         sql=SQL_SELECT,
         bucket=BUCKET_NAME,
         filename=FILE_NAME,
-        gzip=False,
+        export_format="csv",
     )
     # [END howto_operator_postgres_to_gcs]
 
-    delete_postgres_connection = BashOperator(
-        task_id="delete_postgres_connection",
-        bash_command=f"airflow connections delete {CONNECTION_ID}",
+    delete_gcs_bucket = GCSDeleteBucketOperator(
+        task_id="delete_gcs_bucket",
+        bucket_name=BUCKET_NAME,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket",
-        bucket_name=BUCKET_NAME,
+    delete_firewall_rule = BashOperator(
+        task_id="delete_firewall_rule",
+        bash_command=DELETE_FIREWALL_RULE_COMMAND,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_instance = ComputeEngineDeleteInstanceOperator(
-        task_id="delete_instance",
-        resource_id=DB_INSTANCE_NAME,
+    delete_gce_instance = ComputeEngineDeleteInstanceOperator(
+        task_id="delete_gce_instance",
+        resource_id=GCE_INSTANCE_NAME,
         zone=ZONE,
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_firewall_rule = BashOperator(
-        task_id="delete_firewall_rule",
-        bash_command=DELETE_FIREWALL_RULE,
+    delete_persistent_disk = BashOperator(
+        task_id="delete_persistent_disk",
+        bash_command=DELETE_PERSISTENT_DISK_COMMAND,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    delete_persistent_disk = BashOperator(
-        task_id="delete_persistent_disk",
-        bash_command=DELETE_PERSISTENT_DISK,
+    delete_connection = BashOperator(
+        task_id="delete_connection",
+        bash_command=f"airflow connections delete {CONNECTION_ID}",
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
     # TEST SETUP
-    create_instance >> setup_postgres
-    (create_instance >> get_public_ip_task >> setup_postgres_connection_task)
-    (
-        [setup_postgres, setup_postgres_connection_task, create_firewall_rule]
-        >> create_sql_table
-        >> insert_data
-    )
+    create_gce_instance >> setup_postgres
+    create_gce_instance >> get_public_ip_task >> setup_connection_task
+    [setup_postgres, setup_connection_task, create_firewall_rule] >> create_sql_table >> insert_sql_data
+
     (
-        [insert_data, create_bucket]
+        [create_gcs_bucket, insert_sql_data]
         # TEST BODY
-        >> get_data
-        # TEST TEARDOWN
-        >> [delete_instance, delete_bucket, delete_postgres_connection, delete_firewall_rule]
+        >> postgres_to_gcs
     )
-    delete_instance >> delete_persistent_disk
+
+    # TEST TEARDOWN
+    postgres_to_gcs >> [delete_gcs_bucket, delete_firewall_rule, delete_gce_instance, delete_connection]
+    delete_gce_instance >> delete_persistent_disk
 
     from tests.system.utils.watcher import watcher
 
@@ -291,7 +292,6 @@ with DAG(
     # when "tearDown" task with trigger rule is part of the DAG
     list(dag.tasks) >> watcher()
 
-
 from tests.system.utils import get_test_run  # noqa: E402
 
# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
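
For context on the tail of each module: the get_test_run import referenced above is conventionally paired with a module-level assignment that lets pytest collect the DAG as a system test. A hedged sketch of that convention (not part of this diff; see tests/system/README.md#run_via_pytest for the authoritative instructions):

```python
from tests.system.utils import get_test_run  # noqa: E402

# `dag` is the DAG object defined by the `with DAG(...) as dag:` block above.
# Exposing it through get_test_run lets pytest pick it up and execute it, e.g.
# with something like:
#   pytest tests/system/providers/google/cloud/transfers/example_postgres_to_gcs.py
test_run = get_test_run(dag)
```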

