[GitHub] [airflow] milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3

2019-10-12 Thread GitBox
milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3
URL: https://github.com/apache/airflow/pull/6309#discussion_r334244845
 
 

 ##
 File path: airflow/operators/redshift_to_s3_operator.py
 ##
 @@ -85,52 +93,16 @@ def __init__(
         self.autocommit = autocommit
         self.include_header = include_header
 
-        if self.include_header and 'PARALLEL OFF' not in [uo.upper().strip() for uo in self.unload_options]:
-            self.unload_options = list(self.unload_options) + ['PARALLEL OFF', ]
+        if self.include_header and 'HEADER' not in [uo.upper().strip() for uo in self.unload_options]:
+            self.unload_options = list(self.unload_options) + ['HEADER', ]
 
     def execute(self, context):
-        self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
-        self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
-        credentials = self.s3.get_credentials()
-        unload_options = '\n\t\t\t'.join(self.unload_options)
-
-        if self.include_header:
-            self.log.info("Retrieving headers from %s.%s...",
-                          self.schema, self.table)
-
-            columns_query = """SELECT column_name
-                            FROM information_schema.columns
-                            WHERE table_schema = '{schema}'
-                            AND   table_name = '{table}'
-                            ORDER BY ordinal_position
-                            """.format(schema=self.schema,
-                                       table=self.table)
-
-            cursor = self.hook.get_conn().cursor()
-            cursor.execute(columns_query)
-            rows = cursor.fetchall()
-            columns = [row[0] for row in rows]
-            column_names = ', '.join("{0}".format(c) for c in columns)
-            column_headers = ', '.join("\\'{0}\\'".format(c) for c in columns)
-            column_castings = ', '.join("CAST({0} AS text) AS {0}".format(c)
-                                        for c in columns)
-
-            select_query = """SELECT {column_names} FROM
-                                (SELECT 2 sort_order, {column_castings}
-                                 FROM {schema}.{table}
-                                UNION ALL
-                                SELECT 1 sort_order, {column_headers})
-                             ORDER BY sort_order"""\
-                .format(column_names=column_names,
-                        column_castings=column_castings,
-                        column_headers=column_headers,
-                        schema=self.schema,
-                        table=self.table)
-        else:
-            select_query = "SELECT * FROM {schema}.{table}"\
-                .format(schema=self.schema,
-                        table=self.table)
+        postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
 
+        credentials = s3_hook.get_credentials()
+        unload_options = '\n\t\t\t'.join(self.unload_options)
+        select_query = "SELECT * FROM {schema}.{table}".format(schema=self.schema, table=self.table)
 
 Review comment:
  Yup, I think allowing users to specify `select_query` should work. We either have to deprecate the `schema` and `table` fields, or we need some validation to check that those fields and `select_query` cannot be used at the same time.
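  A minimal sketch of that validation, assuming a hypothetical `select_query`
  constructor argument alongside the existing `schema` and `table` fields
  (the argument name and error type are assumptions, not part of this PR):

      # Hypothetical __init__ fragment: select_query and schema/table are two
      # mutually exclusive ways of building the query, so reject using both.
      def __init__(self, schema=None, table=None, select_query=None, **kwargs):
          if select_query and (schema or table):
              raise ValueError("Pass either select_query or schema/table, not both.")
          if not select_query and not (schema and table):
              raise ValueError("Either select_query or both schema and table are required.")
          self.schema = schema
          self.table = table
          self.select_query = select_query or \
              "SELECT * FROM {schema}.{table}".format(schema=schema, table=table)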




[GitHub] [airflow] milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3

2019-10-11 Thread GitBox
milton0825 commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3
URL: https://github.com/apache/airflow/pull/6309#discussion_r334209505
 
 

 ##
 File path: airflow/operators/redshift_to_s3_operator.py
 ##
 
 Review comment:
  Should we allow the user to choose which columns to select?
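  A minimal sketch of what that could look like, assuming a hypothetical
  optional `columns` argument (the name and default are assumptions, not part
  of this PR):

      # Hypothetical helper: restrict the SELECT used by UNLOAD to the given
      # columns, defaulting to all columns when none are passed.
      def build_select_query(schema, table, columns=None):
          column_list = ', '.join(columns) if columns else '*'
          return "SELECT {columns} FROM {schema}.{table}".format(
              columns=column_list, schema=schema, table=table)

      # build_select_query('my_schema', 'my_table')
      #     -> 'SELECT * FROM my_schema.my_table'
      # build_select_query('my_schema', 'my_table', ['id', 'name'])
      #     -> 'SELECT id, name FROM my_schema.my_table'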

