[GitHub] [airflow] jaketf commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-25 Thread GitBox
jaketf commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r339146836
 
 

 ##
 File path: airflow/models/base_reschedule_poke_operator.py
 ##
 @@ -0,0 +1,218 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import ABC, abstractmethod
+from typing import Dict, List, Iterable, Optional, Union
+from time import sleep
+from datetime import timedelta
+
+from airflow.exceptions import AirflowException, AirflowSensorTimeout, \
+    AirflowSkipException, AirflowRescheduleException
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
+
+
+class BaseReschedulePokeOperator(BaseOperator, SkipMixin, ABC):
 
 Review comment:
   That is a really good point. I'm not sure, but I imagine it wouldn't run until the task was rescheduled.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] jaketf commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-25 Thread GitBox
jaketf commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r339141292
 
 

 ##
 File path: airflow/gcp/hooks/dataproc.py
 ##
 @@ -485,7 +490,8 @@ def submit(
 project_id: str,
 job: Dict,
 region: str = 'global',
-job_error_states: Optional[Iterable[str]] = None
+job_error_states: Optional[Iterable[str]] = None,
+async: bool = False
 
 Review comment:
   I'm removing the Dataproc changes from this PR.




[GitHub] [airflow] jaketf commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-21 Thread GitBox
jaketf commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r337223464
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   I have added a commit to illustrate the proposed class structure.
   I'll put work into actually making all the tests pass, etc., if you can confirm this is in line with what you're looking for.




[GitHub] [airflow] jaketf commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-20 Thread GitBox
jaketf commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r336785732
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   @Fokko thanks for jumping in.
   > Since the xcom is a key-value structure, I think we should reserve some key for storing this state. Using xcom for this will also enable us to look at the state from the Airflow UI, this comes free
   
   @mik-laj had the same feedback, so I added [`XCOM_EXTERNAL_RESOURCE_ID`](https://github.com/apache/airflow/pull/6210/files#diff-26749963c29f104c58fac199e5aa30c3R39).
   
   > I'm also thinking about the class structure. Right now we have the BaseOperator -> BaseSensor -> BaseAsyncOperator, which feels a bit awkward. Ideally we would like to push the retry logic up in the tree.
   
   @Fokko @JonnyIncognito
   I'm assuming you mean pushing the reschedule logic up the tree. What would you think of `BaseOperator` > `BaseReschedulePokeOperator` > `BaseSensor`?
   We'd move the logic from `BaseSensorOperator`'s `execute` method into `BaseReschedulePokeOperator` (formerly known as `BaseAsyncOperator`), and `BaseSensor` (formerly `BaseSensorOperator`) would then become very simple, with a `submit_request` that is just `pass`. It's a lot of code shuffling, but I can make that change if we think it's a more sensible class structure. Do we see an argument for pushing the reschedule logic into `BaseOperator`?
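
A rough sketch of the hierarchy proposed above, using plain Python stand-ins rather than Airflow's real classes. The class names follow the comment; `RescheduleSignal`, the `context` dict, and its `submitted` / `resource_id` keys are hypothetical and only illustrate how the submit / poke / reschedule logic could live in the middle class while `BaseSensor` reduces to a no-op `submit_request`.

```python
from abc import ABC, abstractmethod


class RescheduleSignal(Exception):
    """Hypothetical stand-in for the exception that asks for a later retry."""


class BaseOperator(ABC):
    """Toy stand-in for Airflow's BaseOperator; not the real class."""

    @abstractmethod
    def execute(self, context):
        ...


class BaseReschedulePokeOperator(BaseOperator):
    """Holds the submit / poke / reschedule logic in one place."""

    @abstractmethod
    def submit_request(self, context):
        """Start the long-running work and return an external resource id."""

    @abstractmethod
    def poke(self, context):
        """Return True once the external work has finished."""

    def process_result(self, context):
        """Optional hook that runs after a successful poke."""
        return None

    def execute(self, context):
        # Submit only on the first attempt; later attempts reuse the stored id.
        if not context.get("submitted"):
            context["resource_id"] = self.submit_request(context)
            context["submitted"] = True
        if not self.poke(context):
            raise RescheduleSignal("not done yet; retry later")
        return self.process_result(context)


class BaseSensor(BaseReschedulePokeOperator):
    """A sensor has nothing to submit, so submit_request is a no-op."""

    def submit_request(self, context):
        return None


class CountdownSensor(BaseSensor):
    """Example sensor that succeeds on the third poke."""

    def __init__(self):
        self.calls = 0

    def poke(self, context):
        self.calls += 1
        return self.calls >= 3


def run_until_done(op, context):
    """Simulate the task being re-run after each reschedule; return the attempt count."""
    attempts = 0
    while True:
        attempts += 1
        try:
            op.execute(context)
            return attempts
        except RescheduleSignal:
            continue
```

With this layout a sensor only implements `poke`, while an operator that starts external work also implements `submit_request` and, if needed, `process_result`. Whether the same logic belongs one level higher, in `BaseOperator`, is the open question from the comment and is not settled by this sketch.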




[GitHub] [airflow] jaketf commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-17 Thread GitBox
jaketf commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r336286276
 
 

 ##
 File path: tests/models/test_base_async_operator.py
 ##
 @@ -0,0 +1,288 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+""" Tests for BaseAsyncOperator"""
+
+import random
+import unittest
+import uuid
+from datetime import timedelta
+from unittest.mock import Mock  # pylint: disable=ungrouped-imports
+
+from freezegun import freeze_time
+from parameterized import parameterized
+
+from airflow import DAG, settings
+from airflow.exceptions import AirflowSensorTimeout
+from airflow.models import DagRun, TaskInstance, TaskReschedule
+from airflow.models.base_async_operator import BaseAsyncOperator
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.state import State
+from airflow.utils.timezone import datetime
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+TEST_DAG_ID = 'unit_test_dag'
+DUMMY_OP = 'dummy_op'
+ASYNC_OP = 'async_op'
+
+
+def _job_id():
+    """yield a random job id."""
+    return 'job_id-{}'.format(uuid.uuid4())
+
+
+ALL_ID_TYPES = [
+    (_job_id(),),
+    (random.randint(0, 10**10),),
+    ([_job_id(), _job_id()],),
+    ({'job1': _job_id()},),
+    (None,)
+]
+
+
+class DummyAsyncOperator(BaseAsyncOperator):
+    """
+    Test subclass of BaseAsyncOperator
+    """
+    def __init__(self, return_value=False,
+                 **kwargs):
+        super().__init__(**kwargs)
+        self.return_value = return_value
+
+    def poke(self, context):
+        """successful on first poke"""
+        return self.return_value
+
+    def submit_request(self, context):
+        """pretend to submit a job w/ random id"""
+        return _job_id()
+
+    def process_result(self, context):
+        """attempt to get the external resource_id"""
+        return self.get_external_resource_id(context)
+
+
+class TestBaseAsyncOperator(unittest.TestCase):
+    """Test cases for BaseAsyncOperator."""
+    def setUp(self):
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        self.dag = DAG(TEST_DAG_ID, default_args=args)
+
+        session = settings.Session()
+        session.query(TaskReschedule).delete()
+        session.query(DagRun).delete()
+        session.query(TaskInstance).delete()
+        session.commit()
+
+    def _make_dag_run(self):
+        return self.dag.create_dagrun(
+            run_id='manual__',
+            start_date=timezone.utcnow(),
+            execution_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+
+    def _make_async_op(self, return_value, resource_id=None, **kwargs):
+        poke_interval = 'poke_interval'
+        timeout = 'timeout'
+        if poke_interval not in kwargs:
+            kwargs[poke_interval] = 0
+        if timeout not in kwargs:
+            kwargs[timeout] = 0
+
+        async_op = DummyAsyncOperator(
+            task_id=ASYNC_OP,
+            return_value=return_value,
+            resource_id=resource_id,
+            dag=self.dag,
+            **kwargs
+        )
+
+        dummy_op = DummyOperator(
+            task_id=DUMMY_OP,
+            dag=self.dag
+        )
+        dummy_op.set_upstream(async_op)
+        return async_op
+
+    @classmethod
+    def _run(cls, task):
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_ok(self):
+        """ Test normal behavior"""
+        async_op = self._make_async_op(True)
+        dr = self._make_dag_run()
+
+        self._run(async_op)
+        tis = dr.get_task_instances()
+        self.assertEqual(len(tis), 2)
+        for ti in tis:
+            if ti.task_id == ASYNC_OP:
+                self.assertEqual(ti.state, State.SUCCESS)
+            if ti.task_id == DUMMY_OP:
+                self.assertEqual(ti.state, State.NONE)
+
+    def test_poke_fail(self):
+        """ Test failure in poke"""
+        async_op = self._make_async_op(False)
+        dr = self._make_dag_run()
+
+        with 

[GitHub] [airflow] jaketf commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-17 Thread GitBox
jaketf commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r336285919
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   What kind of consensus do we need to build for that kind of change to XCom behavior in reschedule scenarios?




[GitHub] [airflow] jaketf commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-17 Thread GitBox
jaketf commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r336285752
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   It seems to me we definitely want to wipe XComs at the end of a DagRun so as not to affect a re-run of this DAG for the same execution date.
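
To make the concern concrete, here is a small sketch under stated assumptions: `InMemoryXCom`, `RESOURCE_ID_KEY`, and `run_task` are hypothetical stand-ins, not Airflow's XCom API. It shows how a resource id left behind by an earlier run would be picked up by a re-run for the same execution date, and how clearing the run's entries avoids that.

```python
class InMemoryXCom:
    """Toy key-value store keyed by (dag_id, execution_date, task_id, key)."""

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, execution_date, task_id, key, value):
        self._rows[(dag_id, execution_date, task_id, key)] = value

    def pull(self, dag_id, execution_date, task_id, key):
        return self._rows.get((dag_id, execution_date, task_id, key))

    def clear_dag_run(self, dag_id, execution_date):
        """Drop every entry written for one DAG and execution date."""
        stale = [k for k in self._rows if k[0] == dag_id and k[1] == execution_date]
        for k in stale:
            del self._rows[k]


RESOURCE_ID_KEY = "external_resource_id"  # hypothetical reserved key


def run_task(xcom, dag_id, execution_date, task_id, submit):
    """Submit only if no resource id is stored for this run; return the id in use."""
    resource_id = xcom.pull(dag_id, execution_date, task_id, RESOURCE_ID_KEY)
    if resource_id is None:
        resource_id = submit()
        xcom.push(dag_id, execution_date, task_id, RESOURCE_ID_KEY, resource_id)
    return resource_id


# Each call to submit_job() hands out the next fake job id.
job_ids = iter(["job-1", "job-2", "job-3"])


def submit_job():
    return next(job_ids)


xcom = InMemoryXCom()
first = run_task(xcom, "dag", "2015-01-01", "async_op", submit_job)

# Without a wipe, a re-run silently reuses the id from the finished run.
stale_rerun = run_task(xcom, "dag", "2015-01-01", "async_op", submit_job)

# After wiping the run's XComs, the re-run submits a fresh job.
xcom.clear_dag_run("dag", "2015-01-01")
clean_rerun = run_task(xcom, "dag", "2015-01-01", "async_op", submit_job)
```

Here `stale_rerun` equals `first`, so the re-run would poll a job that already finished, while `clean_rerun` gets a new id once the entries are cleared.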

