[jira] [Assigned] (AIRFLOW-2146) Initialize default Google BigQuery Connection with valid conn_type & Fix broken DBApiHook

2018-02-25 Thread Kaxil Naik (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik reassigned AIRFLOW-2146:
---

Assignee: Kaxil Naik

> Initialize default Google BigQuery Connection with valid conn_type & Fix 
> broken DBApiHook
> -
>
> Key: AIRFLOW-2146
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2146
> Project: Apache Airflow
>  Issue Type: Task
>  Components: contrib, gcp
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Major
> Fix For: 1.10.0
>
>
> `airflow initdb` creates a connection with conn_id='bigquery_default' and 
> conn_type='bigquery'. However, bigquery is not a valid conn_type, according 
> to models.Connection._types, and BigQuery connections should use the 
> google_cloud_platform conn_type.
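> A minimal sketch of the first half of the fix, assuming the default connections are seeded through `merge_conn` in `airflow/utils/db.py` as in 1.9.x (the exact call site is an assumption):
> {code:python}
> from airflow import models
> from airflow.utils.db import merge_conn
> 
> # Sketch: seed bigquery_default with a conn_type that
> # models.Connection._types actually recognises.
> merge_conn(
>     models.Connection(
>         conn_id='bigquery_default',
>         conn_type='google_cloud_platform'))
> {code}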
> Also, as [renanleme|https://github.com/renanleme] mentioned 
> [here|https://github.com/apache/incubator-airflow/pull/3031#issuecomment-368132910],
>  the DAGs he created break when calling `get_records()` on a BigQueryHook, 
> which extends DbApiHook.
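> The failure reproduces without a full DAG; the following is a hypothetical minimal script mirroring the call path in the traceback below (the conn_id is the reporter's and only illustrative):
> {code:python}
> from airflow.hooks.base_hook import BaseHook
> 
> # BaseHook.get_hook() resolves a hook from the connection's conn_type.
> # Per the traceback below, the resolved hook does not supply a working
> # get_records(), so the call falls through to BaseHook.get_records(),
> # which raises NotImplementedError.
> hook = BaseHook.get_hook(conn_id='gcp_airflow')
> hook.get_records('SELECT 1')
> {code}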
> *Error Log*:
> {code}
> Traceback (most recent call last):
>   File "/src/apache-airflow/airflow/models.py", line 1519, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/airflow/dags/lib/operators/test_operator.py", line 21, in execute
>     records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>   File "/src/apache-airflow/airflow/hooks/base_hook.py", line 92, in get_records
>     raise NotImplementedError()
> {code}
> *Dag*:
> {code:python}
> from datetime import datetime
> 
> from airflow import DAG
> from lib.operators.test_operator import TestOperator
> 
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2018, 2, 21),
> }
> 
> dag = DAG(
>     'test_dag',
>     default_args=default_args,
>     schedule_interval='0 6 * * *'
> )
> 
> sql = '''
> SELECT id from YOUR_BIGQUERY_TABLE limit 10
> '''
> 
> compare_grouped_event = TestOperator(
>     task_id='test_operator',
>     source_conn_id='gcp_airflow',
>     sql=sql,
>     dag=dag
> )
> {code}
> *Operator*:
> {code:python}
> from airflow.hooks.base_hook import BaseHook
> from airflow.models import BaseOperator
> from airflow.utils.decorators import apply_defaults
> 
> 
> class TestOperator(BaseOperator):
>     @apply_defaults
>     def __init__(
>             self,
>             sql,
>             source_conn_id=None,
>             *args, **kwargs):
>         super(TestOperator, self).__init__(*args, **kwargs)
>         self.sql = sql
>         self.source_conn_id = source_conn_id
> 
>     def execute(self, context=None):
>         records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>         self.log.info('Fetched records from source')
> 
>     @staticmethod
>     def _get_db_hook(conn_id):
>         return BaseHook.get_hook(conn_id=conn_id)
> {code}
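> Until the dispatch is fixed, a possible workaround (a hypothetical sketch, not part of the proposed patch) is to bypass `BaseHook.get_hook` and construct the hook directly, so that `get_records()` resolves through DbApiHook:
> {code:python}
> from airflow.contrib.hooks.bigquery_hook import BigQueryHook
> 
> def _get_db_hook(conn_id):
>     # Workaround sketch: build the BigQuery hook directly instead of
>     # relying on conn_type-based dispatch; get_records() then comes
>     # from DbApiHook rather than BaseHook's NotImplementedError stub.
>     return BigQueryHook(bigquery_conn_id=conn_id)
> {code}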



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (AIRFLOW-2146) Initialize default Google BigQuery Connection with valid conn_type & Fix broken DBApiHook

2018-02-25 Thread Anonymous (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anonymous reassigned AIRFLOW-2146:
--

Assignee: (was: Kaxil Naik)



