[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-17 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r336287664
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   > It seems to me we definitely want to wipe XComs at the end of a DagRun so as not to affect a re-run of this dag for this execution date.
   
   If this is your concern, XCom can be cleared at the start if the dag is re-run (indeed, that's how it works now, which is a problem for this PR).
   
   It probably makes sense to find some way to clear XCom when a task instance is re-run but _not_ merely after a reschedule/poke. Then we can leave the XCom value in place at the end of the run, _and_ it is safe to re-run, and everybody's happy?
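   
   A minimal sketch of one way that could look, assuming Airflow 1.10-era internals: a hypothetical helper (not part of this PR; the name, the `TaskReschedule` filter, and the idea of calling it in place of the unconditional clear are all assumptions) that wipes a task instance's XCom only when the current try has no `TaskReschedule` rows yet, i.e. on a genuine run or re-run rather than on a resumption after `mode='reschedule'`.
   
   ```
   # Hypothetical helper, not part of this PR: clear XCom for a task instance
   # only when this try has not been rescheduled yet.
   from airflow.models import TaskReschedule
   from airflow.utils.db import provide_session
   
   
   @provide_session
   def clear_xcom_unless_resuming(ti, session=None):
       resuming = session.query(TaskReschedule).filter(
           TaskReschedule.dag_id == ti.dag_id,
           TaskReschedule.task_id == ti.task_id,
           TaskReschedule.execution_date == ti.execution_date,
           TaskReschedule.try_number == ti.try_number,
       ).count() > 0
       if not resuming:
           # A fresh run or re-run: safe to wipe stale XCom values.
           ti.clear_xcom_data(session=session)
   ```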


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335593800
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   OK, I think I may have found an issue.
   
   In live testing it appeared the XCom was not recorded. In actuality, the problem is that XCom is cleared when the task restarts after a reschedule. So after the first rescheduled poke, the XCom is obliterated and the resource id is lost.
   
   XCom data appears to be cleared at the start of each task run. [See here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   
   So when the task restarts after a reschedule, we lose the resource id.
   
   Probably a similar explanation for the invisible-logs issue I commented on earlier.
   
   Here's my sample operator:
   ```
   import time
   from typing import Dict
   
   # BaseAsyncOperator is the class added by this PR
   # (airflow/models/base_async_operator.py).
   from airflow.models.base_async_operator import BaseAsyncOperator
   
   
   class Op(BaseAsyncOperator):
   
       def submit_request(self, context: Dict) -> str:
           # pretend we kicked off an external job and got back its resource id
           return '129uh8981h9u80eh'
   
       def poke(self, context):
           ti = context['ti']
           print(f"try_number: {ti.try_number}")
   
           for k, v in context.items():
               print(f"{k}: {v}")
   
           print("\n sleeping")
           time.sleep(60)
   
           return False
   ```
   
   Not sure what the best way to resolve this is.
   
   It's also curious that the state goes initially to `up_for_reschedule` before later becoming `up_for_retry`. I am not sure why that is, but I have not used sensors / rescheduling before...
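   
   One way to see the wipe described above (a sketch; it assumes, based on the `XCOM_EXTERNAL_RESOURCE_ID_KEY` import in the hunk, that the PR's `execute()` pushes the `submit_request` return value under that key, which the truncated hunk does not actually show):
   
   ```
   from airflow.models.base_async_operator import BaseAsyncOperator
   from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
   
   
   class XComCheckOp(BaseAsyncOperator):
       """Hypothetical variant of Op above, only to observe the XCom wipe."""
   
       def submit_request(self, context) -> str:
           return 'some-external-resource-id'
   
       def poke(self, context):
           ti = context['ti']
           # Expected on the first poke, and None on the try that resumes after
           # the first reschedule, because the task instance clears its own
           # XCom rows at the start of each run.
           resource_id = ti.xcom_pull(task_ids=ti.task_id,
                                      key=XCOM_EXTERNAL_RESOURCE_ID_KEY)
           self.log.info("resource id from XCom: %s", resource_id)
           return False
   ```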


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335587922
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
 
 Review comment:
   Is there a need to reference SkipMixin, given that BaseSensorOperator already extends this class?
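   
   A quick way to check (a sketch, assuming the Airflow 1.10.x layout where `BaseSensorOperator` already mixes in `SkipMixin`):
   
   ```
   from airflow.models import SkipMixin
   from airflow.sensors.base_sensor_operator import BaseSensorOperator
   
   # If this prints True, listing SkipMixin again on BaseAsyncOperator is redundant.
   print(issubclass(BaseSensorOperator, SkipMixin))
   ```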


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335589712
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
+"""
+AsyncOperators are derived from this class and inherit these attributes.
+AsyncOperators should be used for long running operations where the task
+can tolerate a longer poke interval. They use the task rescheduling
+mechanism similar to sensors to avoid occupying a worker slot between
+pokes.
+
+Developing concrete operators that provide parameterized flexibility
+for synchronous or asynchronous poking depending on the invocation is
+possible by programing against this `BaseAsyncOperator` interface,
+and overriding the execute method as demonstrated below.
+
+```python3
+class DummyFlexiblePokingOperator(BaseAsyncOperator):
+  def __init__(self, async=False, *args, **kwargs):
+self.async = async
+super().__init(*args, **kwargs)
+
+  def execute(self, context: Dict) -> None:
+if self.async:
+  # use the BaseAsyncOperator's execute
+  super().execute(context)
+else:
+  self.submit_request(context)
+  while not self.poke():
+time.sleep(self.poke_interval)
+  self.process_results(context)
+
+  def sumbit_request(self, context: Dict) -> Optional[str]:
+return None
+
+  def poke(self, context: Dict) -> bool:
+return bool(random.getrandbits(1))
+```
+
+AsyncOperators must override the following methods:
+:py:meth:`submit_request`: fire a request for a long running operation
+:py:meth:`poke`: a method to check if the long running operation is
+complete it should return True when a success criteria is met.
+
+Optionally, AsyncOperators can override:
+:py:meth: `process_result` to perform any operations after the success
+criteria is met in :py:meth: `poke`
+
+:py:meth: `poke` is executed at a time interval and succeed when a
+criteria is met and fail if and when they time out.
+
+:param soft_fail: Set to true to mark the task as SKIPPED on failure
+:type soft_fail: bool
+:param poke_interval: Time in seconds that the job should wait in
+between each tries
+:type poke_interval: int
+:param timeout: Time, in seconds before the task times out and fails.
+:type timeout: int
+
+"""
+ui_color = '#9933ff'  # type: str
+
+@apply_defaults
+def __init__(self,
+ *args,
+ **kwargs) -> None:
+super().__init__(mode='reschedule', *args, **kwargs)
+
+@abstractmethod
 
 Review comment:
   Should we extend ABC in this class? Maybe there is a reason we don't do this?
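   
   For reference, a small standalone sketch of what extending `ABC` changes: `@abstractmethod` on its own does not prevent instantiation unless the class uses `ABCMeta` (directly or via `ABC`).
   
   ```
   from abc import ABC, abstractmethod
   
   
   class WithoutAbc:
       @abstractmethod
       def submit_request(self, context):
           raise NotImplementedError
   
   
   class WithAbc(ABC):
       @abstractmethod
       def submit_request(self, context):
           raise NotImplementedError
   
   
   WithoutAbc()   # instantiates fine; the missing override only fails at call time
   try:
       WithAbc()  # TypeError: Can't instantiate abstract class WithAbc ...
   except TypeError as exc:
       print(exc)
   ```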


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335581468
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
+"""
+AsyncOperators are derived from this class and inherit these attributes.
+AsyncOperators should be used for long running operations where the task
+can tolerate a longer poke interval. They use the task rescheduling
+mechanism similar to sensors to avoid occupying a worker slot between
+pokes.
+
+Developing concrete operators that provide parameterized flexibility
+for synchronous or asynchronous poking depending on the invocation is
+possible by programing against this `BaseAsyncOperator` interface,
+and overriding the execute method as demonstrated below.
+
+```python3
+class DummyFlexiblePokingOperator(BaseAsyncOperator):
+  def __init__(self, async=False, *args, **kwargs):
+self.async = async
+super().__init(*args, **kwargs)
+
+  def execute(self, context: Dict) -> None:
+if self.async:
+  # use the BaseAsyncOperator's execute
+  super().execute(context)
+else:
+  self.submit_request(context)
+  while not self.poke():
+time.sleep(self.poke_interval)
+  self.process_results(context)
+
+  def sumbit_request(self, context: Dict) -> Optional[str]:
+return None
+
+  def poke(self, context: Dict) -> bool:
+return bool(random.getrandbits(1))
+```
+
+AsyncOperators must override the following methods:
+:py:meth:`submit_request`: fire a request for a long running operation
+:py:meth:`poke`: a method to check if the long running operation is
+complete it should return True when a success criteria is met.
+
+Optionally, AsyncOperators can override:
+:py:meth: `process_result` to perform any operations after the success
+criteria is met in :py:meth: `poke`
+
+:py:meth: `poke` is executed at a time interval and succeed when a
+criteria is met and fail if and when they time out.
+
+:param soft_fail: Set to true to mark the task as SKIPPED on failure
+:type soft_fail: bool
+:param poke_interval: Time in seconds that the job should wait in
+between each tries
+:type poke_interval: int
+:param timeout: Time, in seconds before the task times out and fails.
+:type timeout: int
+
+"""
+ui_color = '#9933ff'  # type: str
+
+@apply_defaults
+def __init__(self,
+ *args,
+ **kwargs) -> None:
+super().__init__(mode='reschedule', *args, **kwargs)
+
+@abstractmethod
+def submit_request(self, context: Dict) -> Optional[Union[str, List, 
Dict]]:
+"""
+This method should kick off a long running operation.
+This method should return the ID for the long running operation if
+applicable.
+Context is the same dictionary used as when rendering jinja templates.
+
+Refer to get_template_context for more context.
+
+:returns: a resource_id for the long running operation.
+:rtype: Optional[Union[String, List, Dict]]
+"""
+raise NotImplementedError
+
+def process_result(self, context: Dict):
+"""
+This method can optionally be overriden to process the result of a 
long running operation.
+Context is the same dictionary used as when rendering jinja templates.
+
+Refer to get_template_context for more context.
+"""
+self.log.info('Using 
