[jira] [Assigned] (AIRFLOW-6778) Add a DAGs PVC Mount Point Option for Workers under Kubernetes Executor

2020-09-04 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen reassigned AIRFLOW-6778:


Assignee: Daniel Imberman  (was: Bjorn Olsen)

> Add a DAGs PVC Mount Point Option for Workers under Kubernetes Executor
> ---
>
> Key: AIRFLOW-6778
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6778
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: executor-kubernetes, worker
>Affects Versions: 1.10.6, 1.10.7, 1.10.8, 1.10.9
>Reporter: Brandon Willard
>Assignee: Daniel Imberman
>Priority: Blocker
>  Labels: kubernetes, options
>
> The worker pods generated by the Kubernetes Executor force the DAGs PVC to be 
> mounted at the Airflow DAGs folder.  This, combined with a general inability 
> to specify arbitrary PVCs on workers (see AIRFLOW-3126 and the 
> linked/duplicated issues), severely constrains the usability of worker pods 
> and the Kubernetes Executor as a whole.
>  
> For example, if a DAGs-containing PVC is rooted at a Python package (e.g. 
> {{package/}}) that needs to be installed on each worker (e.g. DAGs in 
> {{package/dags/}}, package install point at {{package/setup.py}}, and Airflow 
> DAGs location {{/airflow/dags}}), then the current static mount point logic 
> will only allow a worker to directly mount the entire package into the 
> Airflow DAGs location  —  while the actual DAGs are in a subdirectory — or 
> exclusively mount the package's sub-path {{package/dags}} (using the existing 
> {{kubernetes.dags_volume_subpath}} config option).  While the latter is at 
> least correct, it completely foregoes the required parent directory and it 
> makes the requisite package unavailable for installation (e.g. the files 
> under {{package/}} are not available).
>  
> -In general, the only approach that seems to work for the Kubernetes Executor 
> is to specify a worker image with all DAG dependencies pre-loaded, which 
> largely voids the usefulness of a single DAGs PVC that can be dynamically 
> updated.  At best, one can include a {{requirements.txt}} in the PVC and use 
> it in tandem with an entry-point script built into the image, but that still 
> doesn't help with source installations of custom packages stored and updated 
> in a PVC.-
> Edit: This isn't even possible, because worker pods are created using [the 
> {{command}} field instead of 
> {{args}}|https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes]!
>  
> A quick fix for this situation is to allow one to specify the DAGs PVC mount 
> point.  With this option, one can mount the PVC anywhere and specify an 
> Airflow DAGs location that works in conjunction with the mount point (e.g. 
> mount the PVC at {{/airflow/package}} and independently set the Airflow DAGs 
> location to {{/airflow/package/dags}}).  This option would — in many cases — 
> obviate the need for the marginally useful {{kubernetes.dags_volume_subpath}} 
> option, as well.
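A rough sketch of what such a configuration could look like under the proposal above ({{dags_folder}} and {{kubernetes.dags_volume_claim}} are existing settings; {{dags_volume_mount_point}} is the proposed option and only illustrative here):

{code}
[core]
# Point Airflow at the dags/ subdirectory inside the mounted package
dags_folder = /airflow/package/dags

[kubernetes]
dags_volume_claim = airflow-dags
# Proposed: mount the whole DAGs PVC here, independently of dags_folder,
# so that package/setup.py remains available for installation on the worker
dags_volume_mount_point = /airflow/package
{code}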



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (AIRFLOW-6778) Add a DAGs PVC Mount Point Option for Workers under Kubernetes Executor

2020-09-04 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen reassigned AIRFLOW-6778:


Assignee: Bjorn Olsen

> Add a DAGs PVC Mount Point Option for Workers under Kubernetes Executor
> ---
>
> Key: AIRFLOW-6778
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6778
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: executor-kubernetes, worker
>Affects Versions: 1.10.6, 1.10.7, 1.10.8, 1.10.9
>Reporter: Brandon Willard
>Assignee: Bjorn Olsen
>Priority: Blocker
>  Labels: kubernetes, options
>
> The worker pods generated by the Kubernetes Executor force the DAGs PVC to be 
> mounted at the Airflow DAGs folder.  This, combined with a general inability 
> to specify arbitrary PVCs on workers (see AIRFLOW-3126 and the 
> linked/duplicated issues), severely constrains the usability of worker pods 
> and the Kubernetes Executor as a whole.
>  
> For example, if a DAGs-containing PVC is rooted at a Python package (e.g. 
> {{package/}}) that needs to be installed on each worker (e.g. DAGs in 
> {{package/dags/}}, package install point at {{package/setup.py}}, and Airflow 
> DAGs location {{/airflow/dags}}), then the current static mount point logic 
> will only allow a worker to directly mount the entire package into the 
> Airflow DAGs location  —  while the actual DAGs are in a subdirectory — or 
> exclusively mount the package's sub-path {{package/dags}} (using the existing 
> {{kubernetes.dags_volume_subpath}} config option).  While the latter is at 
> least correct, it completely foregoes the required parent directory and it 
> makes the requisite package unavailable for installation (e.g. the files 
> under {{package/}} are not available).
>  
> -In general, the only approach that seems to work for the Kubernetes Executor 
> is to specify a worker image with all DAG dependencies pre-loaded, which 
> largely voids the usefulness of a single DAGs PVC that can be dynamically 
> updated.  At best, one can include a {{requirements.txt}} in the PVC and use 
> it in tandem with an entry-point script built into the image, but that still 
> doesn't help with source installations of custom packages stored and updated 
> in a PVC.-
> Edit: This isn't even possible, because worker pods are created using [the 
> {{command}} field instead of 
> {{args}}|https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes]!
>  
> A quick fix for this situation is to allow one to specify the DAGs PVC mount 
> point.  With this option, one can mount the PVC anywhere and specify an 
> Airflow DAGs location that works in conjunction with the mount point (e.g. 
> mount the PVC at {{/airflow/package}} and independently set the Airflow DAGs 
> location to {{/airflow/package/dags}}).  This option would — in many cases — 
> obviate the need for the marginally useful {{kubernetes.dags_volume_subpath}} 
> option, as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6975) Base AWSHook AssumeRoleWithSAML

2020-03-02 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6975:


 Summary: Base AWSHook AssumeRoleWithSAML
 Key: AIRFLOW-6975
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6975
 Project: Apache Airflow
  Issue Type: Improvement
  Components: aws
Affects Versions: 1.10.9
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


The Base AWS Hook currently supports AssumeRole, but we additionally require it 
to support AssumeRoleWithSAML.

+Current+

[https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html#api_assumerole]

The AssumeRole API operation is useful for allowing existing IAM users to 
access AWS resources that they don't already have access to.

(This requires an AWS IAM user)

+Proposed addition+

[https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html#api_assumerolewithsaml]

The AssumeRoleWithSAML API operation returns a set of temporary security 
credentials for federated users who are authenticated by your organization's 
existing identity system.

(This allows federated login using another IDP rather than requiring an AWS IAM 
user).

 

+Use case+

We need to be able to authenticate an AD user against our IDP (Windows Active 
Directory).

We can obtain a SAML assertion from our IDP, and then provide it to AWS STS to 
exchange it for AWS temporary credentials, thus authorising us to use AWS 
services. 

The AWS AssumeRoleWithSAML API is intended for this use case, and the Base AWS 
Hook should be updated to allow for this method of authentication.
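A minimal sketch of the proposed exchange at the boto3 level, assuming the SAML assertion has already been obtained from the IdP (the role/principal ARNs and the helper name are placeholders, not part of the existing hook):

{code:python}
import boto3

def get_credentials_via_saml(role_arn, principal_arn, saml_assertion):
    """Exchange a base64-encoded SAML assertion for temporary AWS credentials."""
    sts = boto3.client("sts")
    response = sts.assume_role_with_saml(
        RoleArn=role_arn,            # IAM role mapped to the federated user
        PrincipalArn=principal_arn,  # ARN of the SAML identity provider in IAM
        SAMLAssertion=saml_assertion,
    )
    # Contains AccessKeyId, SecretAccessKey, SessionToken and Expiration
    return response["Credentials"]
{code}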



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6944) Allow AWS DataSync to "catch up" when Task is already running

2020-02-27 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6944:
-
Description: 
The current AWS DataSyncOperator attempts to start a DataSync Task without 
checking whether the Task is already running. That attempt will fail (correctly).

It would be useful to optionally let the operator "catch up" instead of starting 
the Task: if the Task is in a particular status, e.g. 'QUEUED', we might want to 
wait for the currently queued execution to complete rather than failing or 
submitting yet another one (and snowballing).
For example, this can happen when DataSync is busy and the Task was previously 
submitted but the Airflow operator timed out waiting for it.

Or we might want to wait for the queued Task to complete and then submit another 
one anyway.

Giving the user some options for how the Task is started, depending on its 
status, supports these different use cases.

However, the current behaviour of "fail if the Task can't be started" should 
remain the default, to prevent unintended problems that could arise from always 
waiting when a task is already queued. For example, if the previous task 
execution has different Include filters than the new one, they logically aren't 
the same.

 

  was:
Current AWS DataSyncOperator attempts to start running a DataSync Task, with no 
regard / check for whether the task is already running or not. This attempt 
will fail (correctly).

It is useful to have capability to optionally allow the operator to "catch up" 
instead of starting the Task - if the Task is of a particular status eg 
'QUEUED' then we might want to wait for the currently Queued one to complete, 
instead of failing or instead of submitting another one (and snowballing). 
For example, this scenario can happen if the task was previously submitted but 
the Airflow Operator timed out waiting for it, when DataSync is busy.

Or, maybe we want to wait for the Queued task to complete and then submit 
another Task anyway...

Allowing the user some options in terms of status management allows for various 
use cases.

However, the current functionality of "Fail if the Task can't be started" 
should remain default, to prevent unintentional problems which can arise if we 
instead decided to always wait if there is already a task queued. For example 
if the previous task has different Include filters than the new task, then 
logically they aren't the same.

 


> Allow AWS DataSync to "catch up" when Task is already running
> -
>
> Key: AIRFLOW-6944
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6944
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws
>Affects Versions: 1.10.9
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> The current AWS DataSyncOperator attempts to start a DataSync Task without 
> checking whether the Task is already running. That attempt will fail 
> (correctly).
> It would be useful to optionally let the operator "catch up" instead of 
> starting the Task: if the Task is in a particular status, e.g. 'QUEUED', we 
> might want to wait for the currently queued execution to complete rather than 
> failing or submitting yet another one (and snowballing).
> For example, this can happen when DataSync is busy and the Task was 
> previously submitted but the Airflow operator timed out waiting for it.
> Or we might want to wait for the queued Task to complete and then submit 
> another one anyway.
> Giving the user some options for how the Task is started, depending on its 
> status, supports these different use cases.
> However, the current behaviour of "fail if the Task can't be started" should 
> remain the default, to prevent unintended problems that could arise from 
> always waiting when a task is already queued. For example, if the previous 
> task execution has different Include filters than the new one, they logically 
> aren't the same.
>  
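A minimal sketch of the proposed "catch up" behaviour at the boto3 level (the wait loop, polling interval and helper name are illustrative; the real operator/hook plumbing would be more involved):

{code:python}
import time

import boto3

def wait_for_current_execution(task_arn, poll_interval=30):
    """If the DataSync Task already has a queued/running execution, wait for it."""
    client = boto3.client("datasync")
    task = client.describe_task(TaskArn=task_arn)
    current = task.get("CurrentTaskExecutionArn")
    if not current:
        return None  # nothing queued or running; caller can start a new execution
    while True:
        status = client.describe_task_execution(TaskExecutionArn=current)["Status"]
        if status in ("SUCCESS", "ERROR"):
            return status
        time.sleep(poll_interval)
{code}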



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6944) Allow AWS DataSync to "catch up" when Task is already running

2020-02-27 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6944:
-
Description: 
Current AWS DataSyncOperator attempts to start running a DataSync Task, with no 
regard / check for whether the task is already running or not. This attempt 
will fail (correctly).

It is useful to have capability to optionally allow the operator to "catch up" 
instead of starting the Task - if the Task is of a particular status eg 
'QUEUED' then we might want to wait for the currently Queued one to complete, 
instead of failing or instead of submitting another one (and snowballing). 
For example, this scenario can happen if the task was previously submitted but 
the Airflow Operator timed out waiting for it, when DataSync is busy.

Or, maybe we want to wait for the Queued task to complete and then submit 
another Task anyway...

Allowing the user some options in terms of status management allows for various 
use cases.

However, the current functionality of "Fail if the Task can't be started" 
should remain default, to prevent unintentional problems which can arise if we 
instead decided to always wait if there is already a task queued. For example 
if the previous task has different Include filters than the new task, then 
logically they aren't the same.

 

  was:
Current AWS DataSyncOperator attempts to start running a DataSync Task, with no 
regard / check for whether the task is already running or not.

It is useful to have capability to optionally allow the operator to "catch up" 
instead of starting the Task - if the Task is of a particular status eg 
'QUEUED' then we might want to just use the currently Queued one instead of 
submitting another one. This scenario can happen if the task was previously 
submitted but the operator timed out waiting for it, for example.

Or, maybe we want to wait for the Queued task to complete and then submit 
another Task ...

Allowing the user some options in terms of status management allows for various 
robustness when submitting new task executions.

However, the current functionality of "Fail if the Task can't be started" 
should remain default, to prevent unintentional problems which can arise if we 
instead decided to always wait if there is already a task queued. For example 
if the previous task has different Include filters than the new task, then 
logically they aren't the same.

 


> Allow AWS DataSync to "catch up" when Task is already running
> -
>
> Key: AIRFLOW-6944
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6944
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws
>Affects Versions: 1.10.9
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> Current AWS DataSyncOperator attempts to start running a DataSync Task, with 
> no regard / check for whether the task is already running or not. This 
> attempt will fail (correctly).
> It is useful to have capability to optionally allow the operator to "catch 
> up" instead of starting the Task - if the Task is of a particular status eg 
> 'QUEUED' then we might want to wait for the currently Queued one to complete, 
> instead of failing or instead of submitting another one (and snowballing). 
> For example, this scenario can happen if the task was previously submitted 
> but the Airflow Operator timed out waiting for it, when DataSync is busy.
> Or, maybe we want to wait for the Queued task to complete and then submit 
> another Task anyway...
> Allowing the user some options in terms of status management allows for 
> various use cases.
> However, the current functionality of "Fail if the Task can't be started" 
> should remain default, to prevent unintentional problems which can arise if 
> we instead decided to always wait if there is already a task queued. For 
> example if the previous task has different Include filters than the new task, 
> then logically they aren't the same.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6944) Allow AWS DataSync to "catch up" when Task is already running

2020-02-27 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6944:


 Summary: Allow AWS DataSync to "catch up" when Task is already 
running
 Key: AIRFLOW-6944
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6944
 Project: Apache Airflow
  Issue Type: Improvement
  Components: aws
Affects Versions: 1.10.9
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


Current AWS DataSyncOperator attempts to start running a DataSync Task, with no 
regard / check for whether the task is already running or not.

It is useful to have capability to optionally allow the operator to "catch up" 
instead of starting the Task - if the Task is of a particular status eg 
'QUEUED' then we might want to just use the currently Queued one instead of 
submitting another one. This scenario can happen if the task was previously 
submitted but the operator timed out waiting for it, for example.

Or, maybe we want to wait for the Queued task to complete and then submit 
another Task ...

Allowing the user some options in terms of status management allows for various 
robustness when submitting new task executions.

However, the current functionality of "Fail if the Task can't be started" 
should remain default, to prevent unintentional problems which can arise if we 
instead decided to always wait if there is already a task queued. For example 
if the previous task has different Include filters than the new task, then 
logically they aren't the same.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6824) EMRAddStepsOperator does not work well with multi-step XCom

2020-02-17 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6824:


 Summary: EMRAddStepsOperator does not work well with multi-step 
XCom
 Key: AIRFLOW-6824
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6824
 Project: Apache Airflow
  Issue Type: Bug
  Components: aws
Affects Versions: 1.10.9
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


EmrAddStepsOperator allows you to add several steps to EMR for processing - the 
steps must be supplied as a list.
This works well when passing an actual Python list as the 'steps' value, but we 
want to be able to generate the list of steps from a previous task - using an 
XCom.

We must use the operator as follows, for the templating to work correctly and 
for it to resolve the XCom:

 
{code:java}
add_steps_task = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id=job_flow_id,
    aws_conn_id='aws_default',
    provide_context=True,
    steps="{{task_instance.xcom_pull(task_ids='generate_steps')}}"
)
{code}
 

The value in XCom from the 'generate_steps' task looks like (simplified):
{code:java}
[{'Name':'Step1'}, {'Name':'Step2'}]
{code}

However, the rendered value is passed to the operator as a string, which cannot 
be handed to the underlying boto3 library, since boto3 expects a list object.

The following won't work either:
{code:java}
add_steps_task = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id=job_flow_id,
    aws_conn_id='aws_default',
    provide_context=True,
    steps={{task_instance.xcom_pull(task_ids='generate_steps')}}
)
{code}
since it is not valid Python.

We have to pass the steps to the operator as a string and then convert it into 
a list after render_template_fields has run (immediately before execute). 
Therefore the only option is to do the string-to-list conversion in the 
operator's execute method.
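A minimal sketch of that conversion step (ensure_steps_list is a hypothetical helper; ast.literal_eval is one safe way to parse the rendered value, not necessarily what the final patch uses):

{code:python}
import ast

def ensure_steps_list(steps):
    """Convert a template-rendered 'steps' value back into a list of step dicts."""
    if isinstance(steps, str):
        # After Jinja rendering, the XCom list arrives as its string representation,
        # e.g. "[{'Name': 'Step1'}, {'Name': 'Step2'}]".
        steps = ast.literal_eval(steps)
    return steps

# inside the operator's execute(), before calling boto3:
# self.steps = ensure_steps_list(self.steps)
{code}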

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6822) AWS hooks don't always cache the boto3 client

2020-02-17 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6822:


 Summary: AWS hooks don't always cache the boto3 client
 Key: AIRFLOW-6822
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6822
 Project: Apache Airflow
  Issue Type: Bug
  Components: aws
Affects Versions: 1.10.9
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


The implementations of the Amazon AWS hooks (e.g. the S3 hook, Glue hook, etc.) 
vary in how they call the underlying aws_hook.get_client_type(X) method.

Most of the time the client that gets returned is cached by the hook, but not 
always. The client should always be cached for performance reasons: creating a 
client is a time-consuming process.

Example of how to do it (athena.py):


 
{code:java}
def get_conn(self):
    """
    check if aws conn exists already or create one and return it
    :return: boto3 session
    """
    if not self.conn:
        self.conn = self.get_client_type('athena')
    return self.conn{code}
 
 
Example of how not to do it (s3.py):
 
{code:java}
def get_conn(self):
    return self.get_client_type('s3'){code}
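A minimal sketch of a cached variant for the S3 hook, mirroring the athena.py example above (the attribute handling is illustrative only):

{code:python}
def get_conn(self):
    """Return a cached boto3 S3 client, creating it only on first use."""
    if getattr(self, "conn", None) is None:
        self.conn = self.get_client_type('s3')
    return self.conn
{code}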
 
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6672) AWS DataSync - better logging of error message

2020-01-28 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6672:


 Summary: AWS DataSync - better logging of error message
 Key: AIRFLOW-6672
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6672
 Project: Apache Airflow
  Issue Type: Improvement
  Components: aws
Affects Versions: 1.10.7
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


When the AWS DataSync operator fails, it dumps a TaskDescription to the log. 
The TaskDescription is in JSON format and contains several elements, which 
makes it hard to see exactly what went wrong.

Example 1:

[2020-01-28 17:44:39,495] \{datasync.py:354} INFO - 
task_execution_description=\{"TaskExecutionArn": 
"arn:aws:datasync:***:***:task/task-***/execution/exec-***", "Status": "ERROR", 
"Options": {"VerifyMode": "ONLY_FILES_TRANSFERRED", "OverwriteMode": "ALWAYS", 
"Atime": "BEST_EFFORT", "Mtime": "PRESERVE", "Uid": "INT_VALUE", "Gid": 
"INT_VALUE", "PreserveDeletedFiles": "PRESERVE", "PreserveDevices": "NONE", 
"PosixPermissions": "PRESERVE", "BytesPerSecond": -1, "TaskQueueing": 
"ENABLED"}, "Excludes": [], "Includes": [\{"FilterType": "SIMPLE_PATTERN", 
"Value": "***"}], "StartTime": datetime.datetime(2020, 1, 28, 17, 36, 2, 
816000, tzinfo=tzlocal()), "EstimatedFilesToTransfer": 7, 
"EstimatedBytesToTransfer": 4534925, "FilesTransferred": 7, "BytesWritten": 
4534925, "BytesTransferred": 4534925, "Result": \{"PrepareDuration": 9795, 
"PrepareStatus": "SUCCESS", "TotalDuration": 351660, "TransferDuration": 
338568, "TransferStatus": "SUCCESS", "VerifyDuration": 7006, "VerifyStatus": 
"ERROR", "ErrorCode": "OpNotSupp", "ErrorDetail": "Operation not supported"}, 
"ResponseMetadata": \{"RequestId": "***", "HTTPStatusCode": 200, "HTTPHeaders": 
{"date": "Tue, 28 Jan 2020 15:44:39 GMT", "content-type": 
"application/x-amz-json-1.1", "content-length": "994", "connection": 
"keep-alive", "x-amzn-requestid": "***"}, "RetryAttempts": 0}}

Example 2:
[2020-01-28 18:23:23,322] \{datasync.py:354} INFO - 
task_execution_description=\{"TaskExecutionArn": 
"arn:aws:datasync:***:***:task/task-***/execution/exec-***", "Status": "ERROR", 
"Options": {"VerifyMode": "ONLY_FILES_TRANSFERRED", "OverwriteMode": "ALWAYS", 
"Atime": "BEST_EFFORT", "Mtime": "PRESERVE", "Uid": "INT_VALUE", "Gid": 
"INT_VALUE", "PreserveDeletedFiles": "PRESERVE", "PreserveDevices": "NONE", 
"PosixPermissions": "PRESERVE", "BytesPerSecond": -1, "TaskQueueing": 
"ENABLED"}, "Excludes": [], "Includes": [\{"FilterType": "SIMPLE_PATTERN", 
"Value": "***"}], "StartTime": datetime.datetime(2020, 1, 28, 17, 45, 57, 
212000, tzinfo=tzlocal()), "EstimatedFilesToTransfer": 0, 
"EstimatedBytesToTransfer": 0, "FilesTransferred": 0, "BytesWritten": 0, 
"BytesTransferred": 0, "Result": \{"PrepareDuration": 16687, "PrepareStatus": 
"SUCCESS", "TotalDuration": 2083467, "TransferDuration": 2065744, 
"TransferStatus": "ERROR", "VerifyDuration": 5251, "VerifyStatus": "SUCCESS", 
"ErrorCode": "SockTlsHandshakeFailure", "ErrorDetail": "DataSync agent ran into 
an error connecting to AWS.Please review the DataSync network requirements and 
ensure required endpoints are accessible from the agent. Please contact AWS 
support if the error persists."}, "ResponseMetadata": \{"RequestId": "***", 
"HTTPStatusCode": 200, "HTTPHeaders": {"date": "Tue, 28 Jan 2020 16:23:23 GMT", 
"content-type": "application/x-amz-json-1.1", "content-length": "1179", 
"connection": "keep-alive", "x-amzn-requestid": "***"}, "RetryAttempts": 0}}

 

Note that the 'Result' element contains the statuses and errors of interest; 
however, these are currently hard to pick out of the log.

Example of a successful one:
'Result': \{'PrepareDuration': 9663, 'PrepareStatus': 'SUCCESS', 
'TotalDuration': 352095, 'TransferDuration': 338358, 'TransferStatus': 
'SUCCESS', 'VerifyDuration': 7171, 'VerifyStatus': 'SUCCESS'},


The suggested output is to keep the previous line(s) but also add:

[2020-01-28 17:44:39,495] \{datasync.py:354} INFO/ERROR - Status=SUCCESS/ERROR

[2020-01-28 17:44:39,495] \{datasync.py:354} INFO/ERROR - 
PrepareStatus=SUCCESS/ERROR PrepareDuration=9795

[2020-01-28 17:44:39,495] \{datasync.py:354} INFO/ERROR - 
TransferStatus=SUCCESS/ERROR TransferDuration=9795

[2020-01-28 17:44:39,495] \{datasync.py:354} INFO/ERROR - 
VerifyStatus=SUCCESS/ERROR VerifyDuration=9795
[2020-01-28 17:44:39,495] \{datasync.py:354} ERROR - ErrorCode=OpNotSupp, 
ErrorDetail=Operation not supported

 

This should make it much clearer what the job status and errors are.
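A minimal sketch of how the operator could surface those fields (the key names come from the TaskDescription shown above; the function and logger calls are illustrative):

{code:python}
def log_task_execution_result(log, task_execution_description):
    """Log the overall status, each phase's status/duration, and any error."""
    log.info("Status=%s", task_execution_description.get("Status"))
    result = task_execution_description.get("Result", {})
    for phase in ("Prepare", "Transfer", "Verify"):
        log.info(
            "%sStatus=%s %sDuration=%s",
            phase, result.get(phase + "Status"),
            phase, result.get(phase + "Duration"),
        )
    if result.get("ErrorCode"):
        log.error(
            "ErrorCode=%s, ErrorDetail=%s",
            result.get("ErrorCode"), result.get("ErrorDetail"),
        )
{code}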



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6654) AWS DataSync - bugfix when creating locations

2020-01-27 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6654:


 Summary: AWS DataSync - bugfix when creating locations
 Key: AIRFLOW-6654
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6654
 Project: Apache Airflow
  Issue Type: Bug
  Components: aws
Affects Versions: 1.10.7
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


When creating an AWS DataSync task, a Location that does not exist will be 
created automatically. However, this should only happen if the appropriate 
create_kwargs were provided; otherwise creation should not be attempted and an 
error should be raised instead.

This is currently not happening: the log below indicates a source location was 
missing and the DataSync operator tried to create it, even though no kwargs 
were provided, so it should have failed immediately instead.

The log message is also a bit hard to understand and can be improved.
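A minimal sketch of the intended guard (choose_or_create_location is a hypothetical helper, and get_location_arns/create_location are assumed hook methods used here only for illustration):

{code:python}
from airflow.exceptions import AirflowException

def choose_or_create_location(hook, location_uri, create_location_kwargs=None):
    """Return an existing location ARN; only create one if kwargs were supplied."""
    arns = hook.get_location_arns(location_uri)
    if arns:
        return arns[0]
    if not create_location_kwargs:
        raise AirflowException(
            "Location %s does not exist and no create kwargs were provided" % location_uri
        )
    return hook.create_location(location_uri, **create_location_kwargs)
{code}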



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6400) pytest not working on Windows

2019-12-30 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6400:
-
Description: 
While Windows isn't supported I am using it to do development and testing.

This line in conftest.py is causing problems because "HOME" is not an 
environment variable on Windows.
 home = os.environ.get("HOME")
 It can easily be changed to be cross-platform.

Edit: The same issue is causing test discovery to fail on Windows 10.

  was:
While Windows isn't supported I am using it to do development and testing.

This line in conftest.py is causing problems because "HOME" is not an 
environment variable on Windows.
home = os.environ.get("HOME")
It can easily be changed to be cross-platform.


> pytest not working on Windows
> -
>
> Key: AIRFLOW-6400
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6400
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: tests
>Affects Versions: 1.10.7
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> While Windows isn't supported I am using it to do development and testing.
> This line in conftest.py is causing problems because "HOME" is not an 
> environment variable on Windows.
>  home = os.environ.get("HOME")
>  It can easily be changed to be cross-platform.
> Edit: The same issue is causing test discovery to fail on Windows 10.
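A minimal cross-platform sketch of the change (using a pathlib fallback is an assumption here, not necessarily the committed fix):

{code:python}
import os
from pathlib import Path

# "HOME" is not set on Windows; Path.home() resolves the user directory on all platforms.
home = os.environ.get("HOME", str(Path.home()))
{code}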



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6400) pytest not working on Windows

2019-12-30 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6400:


 Summary: pytest not working on Windows
 Key: AIRFLOW-6400
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6400
 Project: Apache Airflow
  Issue Type: Bug
  Components: tests
Affects Versions: 1.10.7
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


While Windows isn't supported I am using it to do development and testing.

This line in conftest.py is causing problems because "HOME" is not an 
environment variable on Windows.
home = os.environ.get("HOME")
It can easily be changed to be cross-platform.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6366) Fix migrations for MS SQL Server

2019-12-26 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6366:
-
Description: 
Although MS SQL Server is not officially supported as a backend database, I am 
using it via PyODBC and found a minor issue with migration from 1.10.6 to 
1.10.7.

 

Below is a log excerpt showing the issue:

INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, 
add serialized_dag table

sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] 
[Microsoft][ODBC Driver 17 for SQL Server][SQL Server]'JSON_VALID' is not a 
recognized built-in function name. (195) (SQLExecDirectW)")

  was:
Although MS SQL Server is not officially supported, I am using it via PyODBC 
and found a minor issue with migration from 1.10.6 to 1.10.7.

 

Below is a log excerpt showing the issue:

INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, 
add serialized_dag table

sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] 
[Microsoft][ODBC Driver 17 for SQL Server][SQL Server]'JSON_VALID' is not a 
recognized built-in function name. (195) (SQLExecDirectW)")


> Fix migrations for MS SQL Server
> 
>
> Key: AIRFLOW-6366
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6366
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: database
>Affects Versions: 1.10.7
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> Although MS SQL Server is not officially supported as a backend database, I 
> am using it via PyODBC and found a minor issue with migration from 1.10.6 to 
> 1.10.7.
>  
> Below is a log excerpt showing the issue:
> INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 
> d38e04c12aa2, add serialized_dag table
> sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] 
> [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]'JSON_VALID' is not a 
> recognized built-in function name. (195) (SQLExecDirectW)")
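One way the migration could guard this, sketched below (the dialect check and constraint name are illustrative; the actual fix may differ):

{code:python}
from alembic import op
import sqlalchemy as sa

def upgrade():
    conn = op.get_bind()
    if conn.dialect.name == "mysql":
        # JSON_VALID() exists on MySQL/MariaDB; other backends such as MSSQL
        # don't have it, so the check constraint is only added on MySQL.
        op.create_check_constraint(
            "ck_serialized_dag_data_is_json",   # illustrative name
            "serialized_dag",
            sa.text("JSON_VALID(data)"),
        )
{code}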



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6366) Fix migrations for MS SQL Server

2019-12-26 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6366:


 Summary: Fix migrations for MS SQL Server
 Key: AIRFLOW-6366
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6366
 Project: Apache Airflow
  Issue Type: Bug
  Components: database
Affects Versions: 1.10.7
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


Although MS SQL Server is not officially supported, I am using it via PyODBC 
and found a minor issue with migration from 1.10.6 to 1.10.7.

 

Below is a log excerpt showing the issue:

INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, 
add serialized_dag table

sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] 
[Microsoft][ODBC Driver 17 for SQL Server][SQL Server]'JSON_VALID' is not a 
recognized built-in function name. (195) (SQLExecDirectW)")



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6327) http_hook: Accept json= parameter for payload

2019-12-23 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6327:
-
Summary: http_hook: Accept json= parameter for payload  (was: http_hook: 
Add json requests payload)

> http_hook: Accept json= parameter for payload
> -
>
> Key: AIRFLOW-6327
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6327
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> The Python requests library allows a user to send JSON-encoded Python objects 
> by using the "json=" parameter of a request.
> This handles JSON-encoding the payload and setting the correct Content-Type 
> header.
> Example:
> {code:python}
> import requests
> 
> response = requests.post('https://httpbin.org/post', json={'key':'value'})
> json_response = response.json()
> json_response['data']                      # '{"key": "value"}'
> json_response['headers']['Content-Type']   # 'application/json'
> {code}
> http_hook.run() does not yet accept a json= parameter, which is inconvenient: 
> we have to pass the "data=" parameter using json.dumps and set the correct 
> headers ourselves.
> It would be better if we could just do something like the below and let the 
> Requests library ensure the request is valid:
> {code:python}
> obj = {'a':1, 'b': 'abc', 'c': [1, 2, {"d":10}]}
> response = hook.run(
>  endpoint,
>  json=obj)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6327) http_hook: Add json requests payload

2019-12-23 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6327:


 Summary: http_hook: Add json requests payload
 Key: AIRFLOW-6327
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6327
 Project: Apache Airflow
  Issue Type: Improvement
  Components: hooks
Affects Versions: 1.10.6
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


The Python requests library allows a user to send JSON-encoded Python objects by 
using the "json=" parameter of a request.

This handles JSON-encoding the payload and setting the correct Content-Type 
header.

Example:
{code:python}
import requests

response = requests.post('https://httpbin.org/post', json={'key':'value'})
json_response = response.json()
json_response['data']                      # '{"key": "value"}'
json_response['headers']['Content-Type']   # 'application/json'
{code}
http_hook.run() does not yet accept a json= parameter, which is inconvenient: we 
have to pass the "data=" parameter using json.dumps and set the correct headers 
ourselves.

It would be better if we could just do something like the below and let the 
Requests library ensure the request is valid:
{code:python}
obj = {'a':1, 'b': 'abc', 'c': [1, 2, {"d":10}]}
response = hook.run(
 endpoint,
 json=obj)
{code}
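A minimal sketch of how run() could forward a json= payload to requests (URL handling is simplified and this is not the committed implementation; get_conn() and run_and_check() are the hook's existing helpers):

{code:python}
import requests

def run(self, endpoint, data=None, json=None, headers=None, extra_options=None):
    """Sketch: pass json= straight to requests, which serializes the payload
    and sets the Content-Type: application/json header itself."""
    session = self.get_conn(headers)
    url = self.base_url.rstrip('/') + '/' + endpoint.lstrip('/')  # simplified
    req = requests.Request(self.method, url, data=data, json=json, headers=headers)
    prepped_request = session.prepare_request(req)
    return self.run_and_check(session, prepped_request, extra_options or {})
{code}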



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-6195) queued_dttm is "None" on UI, and not updated when tasks requeued

2019-12-08 Thread Bjorn Olsen (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990952#comment-16990952
 ] 

Bjorn Olsen commented on AIRFLOW-6195:
--

It turns out this actually affects several fields, because they are missing 
from refresh_from_db() in the TaskInstance model file. Fix incoming.
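The gist of the fix, as a rough sketch (copy_ti_columns is a hypothetical helper; the real change extends refresh_from_db() in the TaskInstance model to copy the missing columns):

{code:python}
def copy_ti_columns(target, source):
    """Copy the columns of interest from a freshly queried TaskInstance row."""
    for field in ("state", "start_date", "end_date", "duration",
                  "queued_dttm", "pid", "hostname"):
        # queued_dttm is one of the fields that was previously not refreshed
        setattr(target, field, getattr(source, field))
{code}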

> queued_dttm is "None" on UI, and not updated when tasks requeued
> 
>
> Key: AIRFLOW-6195
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6195
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: ui
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
> Attachments: image-2019-12-08-14-44-56-762.png, 
> image-2019-12-08-14-45-34-266.png, image-2019-12-08-14-46-09-051.png
>
>
> When inspecting a task instance on the UI, the value for queued_dttm displays 
> as 'None' despite having a value in the DB. Also, the value for queued_dttm 
> is from when the task was first queued and not updated if it is requeued - it 
> is not clear if this is intentional behaviour or not.
> On UI:
> !image-2019-12-08-14-44-56-762.png!
> In DB:
> !image-2019-12-08-14-45-34-266.png!
> In reality, task was queued on 8 December and run shortly after.
> queued_dttm in the DB is from the very first attempt, and is not updated from 
> recent attempts.
> !image-2019-12-08-14-46-09-051.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6195) queued_dttm is "None" on UI, and not updated when tasks requeued

2019-12-08 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6195:


 Summary: queued_dttm is "None" on UI, and not updated when tasks 
requeued
 Key: AIRFLOW-6195
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6195
 Project: Apache Airflow
  Issue Type: Bug
  Components: ui
Affects Versions: 1.10.6
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen
 Attachments: image-2019-12-08-14-44-56-762.png, 
image-2019-12-08-14-45-34-266.png, image-2019-12-08-14-46-09-051.png

When inspecting a task instance on the UI, the value for queued_dttm displays 
as 'None' despite having a value in the DB. Also, the value for queued_dttm is 
from when the task was first queued and not updated if it is requeued - it is 
not clear if this is intentional behaviour or not.

On UI:
!image-2019-12-08-14-44-56-762.png!

In DB:

!image-2019-12-08-14-45-34-266.png!

In reality, task was queued on 8 December and run shortly after.

queued_dttm in the DB is from the very first attempt, and is not updated from 
recent attempts.

!image-2019-12-08-14-46-09-051.png!

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-08 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen closed AIRFLOW-6190.

Resolution: Duplicate

Closing as Duplicate

> Task instances queued and dequeued before worker is ready, causing 
> intermittently failed tasks
> --
>
> Key: AIRFLOW-6190
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
> Attachments: image-2019-12-06-13-55-33-974.png
>
>
> Below dag creates 20 identical simple tasks which depend on each other in 
> series.
> Installing the DAG and executing all the DAG runs works perfectly the first 
> time around.
> Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads 
> to intermittent task failures.
> Edit: This isn't specifically tied to the first and second round; it seems to 
> randomly affect an entire set of dag runs or not affect the set at all. This 
> makes me suspect a timing issue between the executor and scheduler (sometimes 
> they align and sometimes they don't).
> {code:java}
> from builtins import range
> from datetime import timedelta
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.latest_only_operator import LatestOnlyOperator
> from airflow.operators.python_operator import (BranchPythonOperator,
>                                                PythonOperator)
> import sys, os
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(5),
> }
> dag = DAG(
>     dag_id='bug_testing_dag',
>     default_args=args,
>     schedule_interval='@daily',
>     max_active_runs=1
> )
> def func():
>     pass
> prev_task = None
> for i in range(0, 20):
>     task = PythonOperator(
>         task_id='task_{0}'.format(i),
>         python_callable=func,
>         dag=dag,
>     )
>     if prev_task:
>         prev_task >> task
> 
>     prev_task = task
> if __name__ == "__main__":
>     dag.cli(){code}
> I am using the LocalExecutor.
>  job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> Example:
> !image-2019-12-06-13-55-33-974.png|width=398,height=276!
>  
> The second attempt tasks have 2 Logs shown on the UI if they were successful, 
> and 2 physical log files on disk. However the tasks that Failed only have 1 
> log shown on the UI, despite there being 2 physical log files on disk. 
> (Presumably the UI uses the Airflow DB which for some reason isn't aware of 
> the second log for the failed tasks).
>  
> Anyway I am more interested in the intermittent failures than what logs are 
> shown on the UI. 
> Here is an example of the second log file for the Failed task attempts: 
> {code:java}
> [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met 
> for  [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 
> 'scheduled' state which is not a valid state for execution. The task must be 
> cleared in order to be run.
> [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
> [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met 
> for  [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' 
> state which is not a valid state for execution. The task must be cleared in 
> order to be run.
> [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
> {code}
>  
> At first I thought this was because the workers were still busy with the 
> previous TaskInstance (because there is a delay between when a TaskInstance 
> state is set to SUCCESS, and when the worker is actually done with it, 
> because of the worker heartbeat). The scheduler thinks the next task can be 
> SCHEDULED -> QUEUED, but does not start as the worker is still busy, and 
> therefore it goes back to QUEUED -> SCHEDULED. The task is still in the 
> worker queue, causing the failure above when the worker eventually wants to 
> start it.
> However what is a mystery to me is why it works the first time the dag_run 
> runs, and not the second time. Perhaps it is something specific to my 
> environment. 
> I'm going to try and debug this myself but if anyone else can replicate this 
> issue in their environment it could help me understand if it is just 
> affecting me (or not). 
>  Just install the DAG, let it run 100% once, then clear it and let it run 
> again (and you should start seeing random failures)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-08 Thread Bjorn Olsen (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990843#comment-16990843
 ] 

Bjorn Olsen commented on AIRFLOW-6190:
--

[~kaxilnaik] , [~ash] 
I've done some testing and your PR does resolve this issue as well :) Managed 
to run my DAG a couple times and it works perfectly after your fixes. I'll 
discard mine since I think yours are more comprehensive. Along the way I 
noticed a possible bug with queued_dttm so I'll create a new Jira for that and 
move on to it.
Thanks for the help :)

> Task instances queued and dequeued before worker is ready, causing 
> intermittently failed tasks
> --
>
> Key: AIRFLOW-6190
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
> Attachments: image-2019-12-06-13-55-33-974.png
>
>
> Below dag creates 20 identical simple tasks which depend on each other in 
> series.
> Installing the DAG and executing all the DAG runs works perfectly the first 
> time around.
> Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads 
> to intermittent task failures.
> Edit: This isn't specifically tied to the first and second round; it seems to 
> randomly affect an entire set of dag runs or not affect the set at all. This 
> makes me suspect a timing issue between the executor and scheduler (sometimes 
> they align and sometimes they don't).
> {code:java}
> from builtins import range
> from datetime import timedelta
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.latest_only_operator import LatestOnlyOperator
> from airflow.operators.python_operator import (BranchPythonOperator,
>                                                PythonOperator)
> import sys, os
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(5),
> }
> dag = DAG(
>     dag_id='bug_testing_dag',
>     default_args=args,
>     schedule_interval='@daily',
>     max_active_runs=1
> )
> def func():
>     pass
> prev_task = None
> for i in range(0, 20):
>     task = PythonOperator(
>         task_id='task_{0}'.format(i),
>         python_callable=func,
>         dag=dag,
>     )
>     if prev_task:
>         prev_task >> task
> 
>     prev_task = task
> if __name__ == "__main__":
>     dag.cli(){code}
> I am using the LocalExecutor.
>  job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> Example:
> !image-2019-12-06-13-55-33-974.png|width=398,height=276!
>  
> The second attempt tasks have 2 Logs shown on the UI if they were successful, 
> and 2 physical log files on disk. However the tasks that Failed only have 1 
> log shown on the UI, despite there being 2 physical log files on disk. 
> (Presumably the UI uses the Airflow DB which for some reason isn't aware of 
> the second log for the failed tasks).
>  
> Anyway I am more interested in the intermittent failures than what logs are 
> shown on the UI. 
> Here is an example of the second log file for the Failed task attempts: 
> {code:java}
> [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met 
> for  [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 
> 'scheduled' state which is not a valid state for execution. The task must be 
> cleared in order to be run.
> [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
> [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met 
> for  [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' 
> state which is not a valid state for execution. The task must be cleared in 
> order to be run.
> [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
> {code}
>  
> At first I thought this was because the workers were still busy with the 
> previous TaskInstance (because there is a delay between when a TaskInstance 
> state is set to SUCCESS, and when the worker is actually done with it, 
> because of the worker heartbeat). The scheduler thinks the next task can be 
> SCHEDULED -> QUEUED, but does not start as the worker is still busy, and 
> therefore it goes back to QUEUED -> SCHEDULED. The task is still in the 
> worker queue, causing the failure above when the worker eventually wants to 
> start it.
> However what is a mystery to me is why it works the first time the dag_run 
> runs, and not the second time. Perhaps it is something specific to my 
> environment. 
> I'm going to 

[jira] [Commented] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990119#comment-16990119
 ] 

Bjorn Olsen commented on AIRFLOW-6190:
--

Thanks for the feedback guys. It does seem similar.

Could you take a look at the fix I found and see what you think? 
I'm no scheduler expert so I thought it would be great if you could take a 
look. 
[https://github.com/baolsen/airflow/pull/10/files]

Basically I changed the scheduler to prevent doing SCHEDULED -> QUEUED -> 
SCHEDULED immediately if the task was just queued now and the executor has not 
yet had a chance to look at it. This happens when the executor's task was 
completed but it is still waiting in its heartbeat loop, and meanwhile the 
scheduler thinks it can queue up new tasks.

Do you think your changes would avoid this scenario or could it still happen?

> Task instances queued and dequeued before worker is ready, causing 
> intermittently failed tasks
> --
>
> Key: AIRFLOW-6190
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
> Attachments: image-2019-12-06-13-55-33-974.png
>
>
> Below dag creates 20 identical simple tasks which depend on each other in 
> series.
> Installing the DAG and executing all the DAG runs works perfectly the first 
> time around.
> Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads 
> to intermittent task failures.
> Edit: This isn't specifically tied to the first and second round; it seems to 
> randomly affect an entire set of dag runs or not affect the set at all. This 
> makes me suspect a timing issue between the executor and scheduler (sometimes 
> they align and sometimes they don't).
> {code:java}
> from builtins import range
> from datetime import timedelta
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.latest_only_operator import LatestOnlyOperator
> from airflow.operators.python_operator import (BranchPythonOperator,
>                                                PythonOperator)
> import sys, os
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(5),
> }
> dag = DAG(
>     dag_id='bug_testing_dag',
>     default_args=args,
>     schedule_interval='@daily',
>     max_active_runs=1
> )
> def func():
>     pass
> prev_task = None
> for i in range(0, 20):
>     task = PythonOperator(
>         task_id='task_{0}'.format(i),
>         python_callable=func,
>         dag=dag,
>     )
>     if prev_task:
>         prev_task >> task
> 
>     prev_task = task
> if __name__ == "__main__":
>     dag.cli(){code}
> I am using the LocalExecutor.
>  job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> Example:
> !image-2019-12-06-13-55-33-974.png|width=398,height=276!
>  
> The second attempt tasks have 2 Logs shown on the UI if they were successful, 
> and 2 physical log files on disk. However the tasks that Failed only have 1 
> log shown on the UI, despite there being 2 physical log files on disk. 
> (Presumably the UI uses the Airflow DB which for some reason isn't aware of 
> the second log for the failed tasks).
>  
> Anyway I am more interested in the intermittent failures than what logs are 
> shown on the UI. 
> Here is an example of the second log file for the Failed task attempts: 
> {code:java}
> [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met 
> for  [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 
> 'scheduled' state which is not a valid state for execution. The task must be 
> cleared in order to be run.
> [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
> [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met 
> for  [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' 
> state which is not a valid state for execution. The task must be cleared in 
> order to be run.
> [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
> {code}
>  
> At first I thought this was because the workers were still busy with the 
> previous TaskInstance (because there is a delay between when a TaskInstance 
> state is set to SUCCESS, and when the worker is actually done with it, 
> because of the worker heartbeat). The scheduler thinks the next task can be 
> SCHEDULED -> QUEUED, but does not start as the worker is still busy, and 
> therefore it goes back to QUEUED 

[jira] [Comment Edited] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16989735#comment-16989735
 ] 

Bjorn Olsen edited comment on AIRFLOW-6190 at 12/6/19 1:02 PM:
---

I changed the func to include a random duration sleep and this triggers the 
error more consistently.

This again suggests a timing issue between the scheduler and worker.

 
{code:java}
import random, time
def func():
   time.sleep( random.randint(0,10) )

{code}
 

The scheduler log also starts displaying entries like this, when tasks start to 
fail.

Note the duplicate entry but coming from 2 different Python files.
{code:java}
Dec 06 14:58:55  airflow[116894]: [2019-12-06 14:58:55,171] 
{scheduler_job.py:1321} ERROR - Executor reports task instance  finished (success) 
although the task says its queued. Was the task killed externally?
{code}
{code:java}
Dec 06 14:58:55  airflow[116894]: [2019-12-06 14:58:55,176] 
{taskinstance.py:1072} ERROR - Executor reports task instance  finished (success) 
although the task says its queued. Was the task killed externally?
{code}


was (Author: bjorn.ols...@gmail.com):
I changed the func to include a random duration sleep and this triggers the 
error more consistently.

This again suggests a timing issue between the scheduler and worker.

 
{code:java}
import random, time
def func():
   time.sleep( random.randint(0,10) )

{code}
 

The scheduler log also starts displaying entries like this, when tasks start to 
fail. Note the duplicate entry but coming from 2 different files.
{code:java}
Dec 06 14:58:55  airflow[116894]: [2019-12-06 14:58:55,171] 
{scheduler_job.py:1321} ERROR - Executor reports task instance  finished (success) 
although the task says its queued. Was the task killed externally?
{code}
{code:java}
Dec 06 14:58:55  airflow[116894]: [2019-12-06 14:58:55,176] 
{taskinstance.py:1072} ERROR - Executor reports task instance  finished (success) 
although the task says its queued. Was the task killed externally?
{code}

> Task instances queued and dequeued before worker is ready, causing 
> intermittently failed tasks
> --
>
> Key: AIRFLOW-6190
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
> Attachments: image-2019-12-06-13-55-33-974.png
>
>
> Below dag creates 20 identical simple tasks which depend on each other in 
> series.
> Installing the DAG and executing all the DAG runs works perfectly the first 
> time around.
> Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads 
> to intermittent task failures.
> Edit: This isn't specifically tied to the first and second round; it seems to 
> randomly affect an entire set of dag runs or not affect the set at all. This 
> makes me suspect a timing issue between the executor and scheduler (sometimes 
> they align and sometimes they don't).
> {code:java}
> from builtins import range
> from datetime import timedelta
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.latest_only_operator import LatestOnlyOperator
> from airflow.operators.python_operator import (BranchPythonOperator,
>                                                PythonOperator)
> import sys, os
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(5),
> }
> dag = DAG(
>     dag_id='bug_testing_dag',
>     default_args=args,
>     schedule_interval='@daily',
>     max_active_runs=1
> )
> def func():
>     pass
> prev_task = None
> for i in range(0, 20):
>     task = PythonOperator(
>         task_id='task_{0}'.format(i),
>         python_callable=func,
>         dag=dag,
>     )
>     if prev_task:
>         prev_task >> task
> 
>     prev_task = task
> if __name__ == "__main__":
>     dag.cli(){code}
> I am using the LocalExecutor.
>  job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> Example:
> !image-2019-12-06-13-55-33-974.png|width=398,height=276!
>  
> The second attempt tasks have 2 Logs shown on the UI if they were successful, 
> and 2 physical log files on disk. However the tasks that Failed only have 1 
> log shown on the UI, despite there being 2 physical log files on disk. 
> (Presumably the UI uses the Airflow DB which for some reason isn't aware of 
> the second log for the failed tasks).
>  
> Anyway I am more interested in the intermittent failures than what logs are 
> shown on the UI. 
> Here is an example of the second log file for the Failed task attempts: 
> {code:java}
> [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met 
> 

[jira] [Comment Edited] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16989735#comment-16989735
 ] 

Bjorn Olsen edited comment on AIRFLOW-6190 at 12/6/19 12:59 PM:


I changed the func to include a random duration sleep and this triggers the 
error more consistently.

This again suggests a timing issue between the scheduler and worker.

 
{code:java}
import random, time

def func():
    time.sleep(random.randint(0, 10))
{code}
 

The scheduler log also starts displaying entries like this when tasks start to 
fail:
{code:java}
Dec 06 14:57:23  airflow[116894]: [2019-12-06 14:57:23,110] 
{scheduler_job.py:1321} ERROR - Executor reports task instance  finished (success) 
although the task says its queued. Was the task killed externally?
{code}


was (Author: bjorn.ols...@gmail.com):
I changed the func to include a random duration sleep and this triggers the 
error more consistently.

This again suggests a timing issue between the scheduler and worker.

 
{code:java}
import random, time

def func():
    time.sleep(random.randint(0, 10))
{code}
 

> Task instances queued and dequeued before worker is ready, causing 
> intermittently failed tasks
> --
>
> Key: AIRFLOW-6190
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
> Attachments: image-2019-12-06-13-55-33-974.png
>
>
> Below dag creates 20 identical simple tasks which depend on each other in 
> series.
> Installing the DAG and executing all the DAG runs works perfectly the first 
> time around.
> Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads 
> to intermittent task failures.
> Edit: This isn't specifically tied to the first and second round; it seems to 
> randomly affect an entire set of dag runs or not affect the set at all. This 
> makes me suspect a timing issue between the executor and scheduler (sometimes 
> they align and sometimes they don't).
> {code:java}
> from builtins import range
> from datetime import timedelta
> 
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.latest_only_operator import LatestOnlyOperator
> from airflow.operators.python_operator import (BranchPythonOperator,
>                                                PythonOperator)
> import sys, os
> 
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(5),
> }
> 
> dag = DAG(
>     dag_id='bug_testing_dag',
>     default_args=args,
>     schedule_interval='@daily',
>     max_active_runs=1
> )
> 
> def func():
>     pass
> 
> prev_task = None
> for i in range(0, 20):
>     task = PythonOperator(
>         task_id='task_{0}'.format(i),
>         python_callable=func,
>         dag=dag,
>     )
>     if prev_task:
>         prev_task >> task
> 
>     prev_task = task
> 
> if __name__ == "__main__":
>     dag.cli()
> {code}
> I am using the LocalExecutor.
>  job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> Example:
> !image-2019-12-06-13-55-33-974.png|width=398,height=276!
>  
> The second attempt tasks have 2 Logs shown on the UI if they were successful, 
> and 2 physical log files on disk. However the tasks that Failed only have 1 
> log shown on the UI, despite there being 2 physical log files on disk. 
> (Presumably the UI uses the Airflow DB which for some reason isn't aware of 
> the second log for the failed tasks).
>  
> Anyway I am more interested in the intermittent failures than what logs are 
> shown on the UI. 
> Here is an example of the second log file for the Failed task attempts: 
> {code:java}
> [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met 
> for  [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 
> 'scheduled' state which is not a valid state for execution. The task must be 
> cleared in order to be run.
> [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
> [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met 
> for  [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' 
> state which is not a valid state for execution. The task must be cleared in 
> order to be run.
> [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
> {code}
>  
> At first I thought this was because the workers were still busy with the 
> previous TaskInstance (because there is a delay between when a TaskInstance 
> 

[jira] [Commented] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16989735#comment-16989735
 ] 

Bjorn Olsen commented on AIRFLOW-6190:
--

I changed the func to include a random duration sleep and this triggers the 
error more consistently.

This again suggests a timing issue between the scheduler and worker.

 
{code:java}
import random, time

def func():
    time.sleep(random.randint(0, 10))
{code}
 

> Task instances queued and dequeued before worker is ready, causing 
> intermittently failed tasks
> --
>
> Key: AIRFLOW-6190
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
> Attachments: image-2019-12-06-13-55-33-974.png
>
>
> Below dag creates 20 identical simple tasks which depend on each other in 
> series.
> Installing the DAG and executing all the DAG runs works perfectly the first 
> time around.
> Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads 
> to intermittent task failures.
> Edit: This isn't specifically tied to the first and second round; it seems to 
> randomly affect an entire set of dag runs or not affect the set at all. This 
> makes me suspect a timing issue between the executor and scheduler (sometimes 
> they align and sometimes they don't).
> {code:java}
> from builtins import range
> from datetime import timedelta
> 
> import airflow
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.latest_only_operator import LatestOnlyOperator
> from airflow.operators.python_operator import (BranchPythonOperator,
>                                                PythonOperator)
> import sys, os
> 
> args = {
>     'owner': 'airflow',
>     'start_date': airflow.utils.dates.days_ago(5),
> }
> 
> dag = DAG(
>     dag_id='bug_testing_dag',
>     default_args=args,
>     schedule_interval='@daily',
>     max_active_runs=1
> )
> 
> def func():
>     pass
> 
> prev_task = None
> for i in range(0, 20):
>     task = PythonOperator(
>         task_id='task_{0}'.format(i),
>         python_callable=func,
>         dag=dag,
>     )
>     if prev_task:
>         prev_task >> task
> 
>     prev_task = task
> 
> if __name__ == "__main__":
>     dag.cli()
> {code}
> I am using the LocalExecutor.
>  job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 5
> Example:
> !image-2019-12-06-13-55-33-974.png|width=398,height=276!
>  
> The second attempt tasks have 2 Logs shown on the UI if they were successful, 
> and 2 physical log files on disk. However the tasks that Failed only have 1 
> log shown on the UI, despite there being 2 physical log files on disk. 
> (Presumably the UI uses the Airflow DB which for some reason isn't aware of 
> the second log for the failed tasks).
>  
> Anyway I am more interested in the intermittent failures than what logs are 
> shown on the UI. 
> Here is an example of the second log file for the Failed task attempts: 
> {code:java}
> [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met 
> for  [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the 
> 'scheduled' state which is not a valid state for execution. The task must be 
> cleared in order to be run.
> [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
> [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met 
> for  [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' 
> state which is not a valid state for execution. The task must be cleared in 
> order to be run.
> [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
> 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
> {code}
>  
> At first I thought this was because the workers were still busy with the 
> previous TaskInstance (because there is a delay between when a TaskInstance 
> state is set to SUCCESS, and when the worker is actually done with it, 
> because of the worker heartbeat). The scheduler thinks the next task can be 
> SCHEDULED -> QUEUED, but does not start as the worker is still busy, and 
> therefore it goes back to QUEUED -> SCHEDULED. The task is still in the 
> worker queue, causing the failure above when the worker eventually wants to 
> start it.
> However what is a mystery to me is why it works the first time the dag_run 
> runs, and not the second time. Perhaps it is something specific to my 
> environment. 
> I'm going to try and debug this myself but if anyone else can replicate this 
> issue in their environment it could 

[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6190:
-
Description: 
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.

Edit: This isn't specifically tied to the first and second round; it seems to 
randomly affect an entire set of dag runs or not affect the set at all. This 
makes me suspect a timing issue between the executor and scheduler (sometimes 
they align and sometimes they don't).
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
I am using the LocalExecutor.

 job_heartbeat_sec = 5

scheduler_heartbeat_sec = 5
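
(For reference, both of these live in the [scheduler] section of airflow.cfg. A quick sketch, assuming the standard Airflow configuration API, to confirm the effective values:)
{code:java}
# Hypothetical check, not part of the test DAG: print the heartbeat intervals
# the scheduler and the task-running jobs are actually using.
from airflow.configuration import conf

print("job_heartbeat_sec       =", conf.getint('scheduler', 'job_heartbeat_sec'))
print("scheduler_heartbeat_sec =", conf.getint('scheduler', 'scheduler_heartbeat_sec'))
{code}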

Example:

!image-2019-12-06-13-55-33-974.png|width=398,height=276!

 

The second attempt tasks have 2 Logs shown on the UI if they were successful, 
and 2 physical log files on disk. However the tasks that Failed only have 1 log 
shown on the UI, despite there being 2 physical log files on disk. (Presumably 
the UI uses the Airflow DB which for some reason isn't aware of the second log 
for the failed tasks).

 

Anyway I am more interested in the intermittent failures than what logs are 
shown on the UI. 

Here is an example of the second log file for the Failed task attempts: 
{code:java}
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which 
is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is 
not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
{code}
 

At first I thought this was because the workers were still busy with the 
previous TaskInstance (because there is a delay between when a TaskInstance 
state is set to SUCCESS, and when the worker is actually done with it, because 
of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED 
-> QUEUED, but does not start as the worker is still busy, and therefore it 
goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, 
causing the failure above when the worker eventually wants to start it.

However what is a mystery to me is why it works the first time the dag_run 
runs, and not the second time. Perhaps it is something specific to my 
environment. 

I'm going to try and debug this myself but if anyone else can replicate this 
issue in their environment it could help me understand if it is just affecting 
me (or not). 
 Just install the DAG, let it run 100% once, then clear it and let it run again 
(and you should start seeing random failures)
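
While the second round runs, the QUEUED/SCHEDULED flapping described above can be watched directly against the metadata DB. A minimal watcher sketch (assuming the Airflow 1.10 ORM models and access to the same metadata DB; the dag_id/task_id values are the ones from the test DAG, and task_5 is just an arbitrary pick):
{code:java}
import time

from airflow import settings
from airflow.models import TaskInstance


def watch(dag_id='bug_testing_dag', task_id='task_5', poll_interval=0.5):
    """Print every state transition of the newest instance of one task."""
    session = settings.Session()
    last_state = None
    while True:
        session.expire_all()  # re-read from the DB instead of reusing cached attributes
        ti = (session.query(TaskInstance)
              .filter(TaskInstance.dag_id == dag_id,
                      TaskInstance.task_id == task_id)
              .order_by(TaskInstance.execution_date.desc())
              .first())
        state = ti.state if ti else None
        if state != last_state:
            print(time.strftime('%H:%M:%S'), dag_id, task_id, last_state, '->', state)
            last_state = state
        time.sleep(poll_interval)


if __name__ == '__main__':
    watch()
{code}
On a failing run this should show the scheduled -> queued -> scheduled bounce shortly before the task instance lands in the failed state.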

 

  was:
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.

Edit: This isn't specifically tied to the first and second round; it seems to 
randomly affect an entire set of dag runs or not affect the set at all. This 
makes me suspect a timing issue between the executor and scheduler (sometimes 
they align and sometimes they don't).


{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

from airflow.operators.python_operator import 

[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6190:
-
Description: 
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.

Edit: This isn't specifically tied to the first and second round; it seems to 
randomly affect an entire set of dag runs or not affect the set at all.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
I am using the LocalExecutor.

 

Example:

!image-2019-12-06-13-55-33-974.png|width=398,height=276!

 

The second attempt tasks have 2 Logs shown on the UI if they were successful, 
and 2 physical log files on disk. However the tasks that Failed only have 1 log 
shown on the UI, despite there being 2 physical log files on disk. (Presumably 
the UI uses the Airflow DB which for some reason isn't aware of the second log 
for the failed tasks).

 

Anyway I am more interested in the intermittent failures than what logs are 
shown on the UI. 

Here is an example of the second log file for the Failed task attempts: 
{code:java}
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which 
is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is 
not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
{code}
 

At first I thought this was because the workers were still busy with the 
previous TaskInstance (because there is a delay between when a TaskInstance 
state is set to SUCCESS, and when the worker is actually done with it, because 
of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED 
-> QUEUED, but does not start as the worker is still busy, and therefore it 
goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, 
causing the failure above when the worker eventually wants to start it.

However what is a mystery to me is why it works the first time the dag_run 
runs, and not the second time. Perhaps it is something specific to my 
environment. 

I'm going to try and debug this myself but if anyone else can replicate this 
issue in their environment it could help me understand if it is just affecting 
me (or not). 
 Just install the DAG, let it run 100% once, then clear it and let it run again 
(and you should start seeing random failures)

 

  was:
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

from airflow.operators.python_operator import (BranchPythonOperator,
   PythonOperator)
import sys, os

args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
dag_id='bug_testing_dag',
default_args=args,
schedule_interval='@daily',
max_active_runs=1
)

def func():
   pass

prev_task = None
for i in range(0,20):
task = PythonOperator(
task_id='task_{0}'.format(i),

[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6190:
-
Description: 
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.

Edit: This isn't specifically tied to the first and second round; it seems to 
randomly affect an entire set of dag runs or not affect the set at all. This 
makes me suspect a timing issue between the executor and scheduler (sometimes 
they align and sometimes they don't).


{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
I am using the LocalExecutor.

 

Example:

!image-2019-12-06-13-55-33-974.png|width=398,height=276!

 

The second attempt tasks have 2 Logs shown on the UI if they were successful, 
and 2 physical log files on disk. However the tasks that Failed only have 1 log 
shown on the UI, despite there being 2 physical log files on disk. (Presumably 
the UI uses the Airflow DB which for some reason isn't aware of the second log 
for the failed tasks).

 

Anyway I am more interested in the intermittent failures than what logs are 
shown on the UI. 

Here is an example of the second log file for the Failed task attempts: 
{code:java}
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which 
is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is 
not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
{code}
 

At first I thought this was because the workers were still busy with the 
previous TaskInstance (because there is a delay between when a TaskInstance 
state is set to SUCCESS, and when the worker is actually done with it, because 
of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED 
-> QUEUED, but does not start as the worker is still busy, and therefore it 
goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, 
causing the failure above when the worker eventually wants to start it.

However what is a mystery to me is why it works the first time the dag_run 
runs, and not the second time. Perhaps it is something specific to my 
environment. 

I'm going to try and debug this myself but if anyone else can replicate this 
issue in their environment it could help me understand if it is just affecting 
me (or not). 
 Just install the DAG, let it run 100% once, then clear it and let it run again 
(and you should start seeing random failures)

 

  was:
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.

Edit: This isn't specifically tied to the first and second round; it seems to 
randomly affect an entire set of dag runs or not affect the set at all.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator

from airflow.operators.python_operator import (BranchPythonOperator,
   PythonOperator)
import sys, os

args = {
'owner': 'airflow',
'start_date': 

[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6190:
-
Description: 
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
I am using the LocalExecutor.

 

Example:

!image-2019-12-06-13-55-33-974.png|width=398,height=276!

 

The second attempt tasks have 2 Logs shown on the UI if they were successful, 
and 2 physical log files on disk. However the tasks that Failed only have 1 log 
shown on the UI, despite there being 2 physical log files on disk. (Presumably 
the UI uses the Airflow DB which for some reason isn't aware of the second log 
for the failed tasks).

 

Anyway I am more interested in the intermittent failures than what logs are 
shown on the UI. 

Here is an example of the second log file for the Failed task attempts: 
{code:java}
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which 
is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is 
not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
{code}
 

At first I thought this was because the workers were still busy with the 
previous TaskInstance (because there is a delay between when a TaskInstance 
state is set to SUCCESS, and when the worker is actually done with it, because 
of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED 
-> QUEUED, but does not start as the worker is still busy, and therefore it 
goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, 
causing the failure above when the worker eventually wants to start it.

However what is a mystery to me is why it works the first time the dag_run 
runs, and not the second time. Perhaps it is something specific to my 
environment. 

I'm going to try and debug this myself but if anyone else can replicate this 
issue in their environment it could help me understand if it is just affecting 
me (or not). 
 Just install the DAG, let it run 100% once, then clear it and let it run again 
(and you should start seeing random failures)

 

  was:
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
I am using 

[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6190:
-
Description: 
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
I am using the SequentialExecutor.

 

Example:

!image-2019-12-06-13-55-33-974.png|width=398,height=276!

 

The second attempt tasks have 2 Logs shown on the UI if they were successful, 
and 2 physical log files on disk. However the tasks that Failed only have 1 log 
shown on the UI, despite there being 2 physical log files on disk. (Presumably 
the UI uses the Airflow DB which for some reason isn't aware of the second log 
for the failed tasks).

 

Anyway I am more interested in the intermittent failures than what logs are 
shown on the UI. 

Here is an example of the second log file for the Failed task attempts: 
{code:java}
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which 
is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is 
not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
{code}
 

At first I thought this was because the workers were still busy with the 
previous TaskInstance (because there is a delay between when a TaskInstance 
state is set to SUCCESS, and when the worker is actually done with it, because 
of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED 
-> QUEUED, but does not start as the worker is still busy, and therefore it 
goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, 
causing the failure above when the worker eventually wants to start it.

However what is a mystery to me is why it works the first time the dag_run 
runs, and not the second time. Perhaps it is something specific to my 
environment. 

I'm going to try and debug this myself but if anyone else can replicate this 
issue in their environment it could help me understand if it is just affecting 
me (or not). 
 Just install the DAG, let it run 100% once, then clear it and let it run again 
(and you should start seeing random failures)

 

  was:
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}

[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6190:
-
Description: 
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
Example:

!image-2019-12-06-13-55-33-974.png|width=398,height=276!

 

The second attempt tasks have 2 Logs shown on the UI if they were successful, 
and 2 physical log files on disk. However the tasks that Failed only have 1 log 
shown on the UI, despite there being 2 physical log files on disk. (Presumably 
the UI uses the Airflow DB which for some reason isn't aware of the second log 
for the failed tasks).

 

Anyway I am more interested in the intermittent failures than what logs are 
shown on the UI. 

Here is an example of the second log file for the Failed task attempts: 
{code:java}
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which 
is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is 
not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
{code}
 

At first I thought this was because the workers were still busy with the 
previous TaskInstance (because there is a delay between when a TaskInstance 
state is set to SUCCESS, and when the worker is actually done with it, because 
of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED 
-> QUEUED, but does not start as the worker is still busy, and therefore it 
goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, 
causing the failure above when the worker eventually wants to start it.

However what is a mystery to me is why it works the first time the dag_run 
runs, and not the second time. Perhaps it is something specific to my 
environment. 

I'm going to try and debug this myself but if anyone else can replicate this 
issue in their environment it could help me understand if it is just affecting 
me (or not). 
 Just install the DAG, let it run 100% once, then clear it and let it run again 
(and you should start seeing random failures)

 

  was:
Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
Example:


[jira] [Created] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks

2019-12-06 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6190:


 Summary: Task instances queued and dequeued before worker is 
ready, causing intermittently failed tasks
 Key: AIRFLOW-6190
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6190
 Project: Apache Airflow
  Issue Type: Bug
  Components: core
Affects Versions: 1.10.6
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen
 Attachments: image-2019-12-06-13-55-33-974.png

Below dag creates 20 identical simple tasks which depend on each other in 
series.

Installing the DAG and executing all the DAG runs works perfectly the first 
time around.

Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to 
intermittent task failures.
{code:java}
from builtins import range
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.operators.python_operator import (BranchPythonOperator,
                                               PythonOperator)
import sys, os

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(5),
}

dag = DAG(
    dag_id='bug_testing_dag',
    default_args=args,
    schedule_interval='@daily',
    max_active_runs=1
)

def func():
    pass

prev_task = None
for i in range(0, 20):
    task = PythonOperator(
        task_id='task_{0}'.format(i),
        python_callable=func,
        dag=dag,
    )
    if prev_task:
        prev_task >> task

    prev_task = task

if __name__ == "__main__":
    dag.cli()
{code}
Example:

!image-2019-12-06-13-55-33-974.png|width=398,height=276!

 

The second attempt tasks have 2 Logs if they were successful.

However the tasks only have 1 log on the UI if they failed. On the OS, you can 
see a log file for the Failed task attempts:

 

 
{code:java}
[2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which 
is not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 
13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run
[2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for 
, 
dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is 
not a valid state for execution. The task must be cleared in order to be run.
[2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 
13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run
{code}
 

At first I thought this was because the workers were still busy with the 
previous TaskInstance (there is a delay between when a TaskInstance state is 
set to SUCCESS, and when the worker is actually done with it, because of the 
worker heartbeat). The scheduler thinks the next task can be SCHEDULED -> 
QUEUED, but does not start as the worker is still busy, and therefore it goes 
back to QUEUED -> SCHEDULED. The task is still in the worker queue, causing the 
failure above when the worker eventually wants to start it.

However what is a mystery to me is why it works the first time the dag_run 
runs, and not the second time. Perhaps it is something specific to my 
environment. 

I'm going to try and debug this myself but if anyone else can replicate this 
issue in their environment it could help me understand if it is just affecting 
me (or not). 
Just install the DAG, let it run 100% once, then clear it and let it run again 
(and you should start seeing random failures)

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (AIRFLOW-6083) AwsLambdaHook is not accepting non-default configuration

2019-12-04 Thread Bjorn Olsen (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987709#comment-16987709
 ] 

Bjorn Olsen edited comment on AIRFLOW-6083 at 12/4/19 10:15 AM:


This should be resolved with AIRFLOW-6072

After this PR gets merged you could use the "Extra" part of the Airflow 
Connection to pass additional config to the boto3 client / resource / session 
as you please.

 

Edit: You should also be able to change the timeout this way, if desired.
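
In the meantime, a rough workaround sketch (this is not the AIRFLOW-6072 change; the class and method names are taken from Airflow 1.10 and from the issue description quoted below, so treat the exact signatures as assumptions): subclass the hook and hand a non-default botocore Config to get_client_type, which already accepts a config argument.
{code:java}
from botocore.config import Config
from airflow.contrib.hooks.aws_lambda_hook import AwsLambdaHook

# Let a 5-6 minute Lambda finish, and stop boto3 from re-invoking it on timeout.
LONG_RUNNING_CONFIG = Config(
    connect_timeout=60,
    read_timeout=600,
    retries={'max_attempts': 0},
)


class PatientAwsLambdaHook(AwsLambdaHook):
    """AwsLambdaHook variant that passes a non-default botocore Config to the client."""

    def get_conn(self):
        # AwsHook.get_client_type(..., config=None) is the parameter the issue
        # below says is never forwarded by the Lambda hook.
        return self.get_client_type('lambda', region_name=self.region_name,
                                    config=LONG_RUNNING_CONFIG)
{code}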


was (Author: bjorn.ols...@gmail.com):
This should be resolved with AIRFLOW-6072

After this PR gets merged you could use the "Extra" part of the Airflow 
Connection to pass additional config to the boto3 client / resource / session 
as you please.

> AwsLambdaHook is not accepting non-default configuration
> 
>
> Key: AIRFLOW-6083
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6083
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: aws
>Affects Versions: 1.10.6
>Reporter: Daumantas Pagojus
>Assignee: Daumantas Pagojus
>Priority: Blocker
>
> Hello.
> While using Airflow we have come across a problem with AwsLambdaHook.
> We are using this hook to launch Lambda function which takes a while to 
> complete (around 5-6 minutes). However, since lambda is invoked through boto3 
> client, which has default timeout set to [60 
> seconds|https://aws.amazon.com/premiumsupport/knowledge-center/lambda-function-retry-timeout-sdk/],
>  our Airflow interface shows this lambda failing, even though Lambda finishes 
> successfully checking at AWS console. This also causes another side effect: 
> Since boto3 thinks that Lambda has timed out, it automatically spawns another 
> instance, which also times out and this chain lasts 5 times, spawning 5 
> Lambdas and all these Lambdas show as failed in Airflow interface, while they 
> actually succeed.
>  
> This can be solved by passing in custom configuration when creating a boto3 
> client, however, it is not possible to do that when creating AwsLambdaHook as 
> it does not take in this parameter.
> However, we see that AwsLambdaHook inherits and uses AwsHook's function 
> (get_client_type) to get the boto3 client and this function accepts 
> configuration parameter (which defaults to None), but it is never passed to 
> it from the Lambda's hook, which could be easily achieved and would fix the 
> bug we are facing at the moment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-6083) AwsLambdaHook is not accepting non-default configuration

2019-12-04 Thread Bjorn Olsen (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987709#comment-16987709
 ] 

Bjorn Olsen commented on AIRFLOW-6083:
--

This should be resolved with AIRFLOW-6072

After this PR gets merged you could use the "Extra" part of the Airflow 
Connection to pass additional config to the boto3 client / resource / session 
as you please.

> AwsLambdaHook is not accepting non-default configuration
> 
>
> Key: AIRFLOW-6083
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6083
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: aws
>Affects Versions: 1.10.6
>Reporter: Daumantas Pagojus
>Assignee: Daumantas Pagojus
>Priority: Blocker
>
> Hello.
> While using Airflow we have come across a problem with AwsLambdaHook.
> We are using this hook to launch Lambda function which takes a while to 
> complete (around 5-6 minutes). However, since lambda is invoked through boto3 
> client, which has default timeout set to [60 
> seconds|https://aws.amazon.com/premiumsupport/knowledge-center/lambda-function-retry-timeout-sdk/],
>  our Airflow interface shows this lambda failing, even though Lambda finishes 
> successfully checking at AWS console. This also causes another side effect: 
> Since boto3 thinks that Lambda has timed out, it automatically spawns another 
> instance, which also times out and this chain lasts 5 times, spawning 5 
> Lambdas and all these Lambdas show as failed in Airflow interface, while they 
> actually succeed.
>  
> This can be solved by passing in custom configuration when creating a boto3 
> client, however, it is not possible to do that when creating AwsLambdaHook as 
> it does not take in this parameter.
> However, we see that AwsLambdaHook inherits and uses AwsHook's function 
> (get_client_type) to get the boto3 client and this function accepts 
> configuration parameter (which defaults to None), but it is never passed to 
> it from the Lambda's hook, which could be easily achieved and would fix the 
> bug we are facing at the moment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (AIRFLOW-4355) Externally triggered DAG is marked as 'success' even if a task has been 'removed'!

2019-12-04 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen reassigned AIRFLOW-4355:


Assignee: Bjorn Olsen

> Externally triggered DAG is marked as 'success' even if a task has been 
> 'removed'!
> --
>
> Key: AIRFLOW-4355
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4355
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG, DagRun, scheduler
>Affects Versions: 1.10.3
>Reporter: t oo
>Assignee: Bjorn Olsen
>Priority: Blocker
>  Labels: dynamic
> Fix For: 2.0.0
>
> Attachments: dag_success_even_if_task_removed.png, treeview.png
>
>
> note: all my dags are purely externally triggered
> *Issue:* Dag has 5 parallel tasks that ran successfully and 1 final task that 
> somehow got 'removed' state (prior dag runs had 'failed' state) and never ran 
> successfully but still the DAG is showing success!
>  
> *Command ran* (note that previous commands like airflow trigger_dag -e 
> 20190412 qsr_coremytbl were run before and failed for valid reason (ie python 
> task failing) ):
> airflow trigger_dag -e 20190412T08:00 qsr_coremytbl --conf '\{"hourstr":"08"}'
>  
> *some logs on prior instance of airflow (ec2 was autohealed):*
> [2019-04-18 08:29:40,678] \{logging_mixin.py:95} INFO - [2019-04-18 
> 08:29:40,678] {__init__.py:4897} WARNING - Failed to get task ' qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [None]>' 
> for dag ''. Marking it as removed.
>  [2019-04-18 08:29:43,582] \{logging_mixin.py:95} INFO - [2019-04-18 
> 08:29:43,582] {__init__.py:4906} INFO - Restoring task ' qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>' 
> which was previously removed from DAG ''
>  [2019-04-18 08:29:43,618] \{jobs.py:1787} INFO - Creating / updating 
>  08:00:00+00:00 [scheduled]> in ORM
>  [2019-04-18 08:29:43,676] \{logging_mixin.py:95} INFO - [2019-04-18 
> 08:29:43,676] {__init__.py:4897} WARNING - Failed to get task ' qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 
> [scheduled]>' for dag ''. Marking it as removed.
>  
> *some logs on newer ec2:*
> [myuser@host logs]$ grep -i hive -R * | sed 's#[0-9]#x#g' | sort | uniq -c | 
> grep -v 'airflow-webserver-access.log'
>  2 audit/airflow-audit.log:-xx-xx xx:xx:xx.xx  qsr_coremytbl 
> REPAIR_HIVE_schemeh.mytbl log -xx-xx xx:xx:xx.xx rsawyerx 
> [('execution_date', u'-xx-xxTxx:xx:xx+xx:xx'), ('task_id', 
> u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 audit/airflow-audit.log:-xx-xx xx:xx:xx.xx  qsr_coremytbl 
> REPAIR_HIVE_schemeh.mytbl log -xx-xx xx:xx:xx.xx rsawyerx 
> [('execution_date', u'-xx-xxTxx:xx:xx+xx:xx'), ('task_id', 
> u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl'), ('format', 
> u'json')]
>  1 audit/airflow-audit.log:-xx-xx xx:xx:xx.xx  qsr_coremytbl 
> REPAIR_HIVE_schemeh.mytbl rendered -xx-xx xx:xx:xx.xx rsawyerx 
> [('execution_date', u'-xx-xxTxx:xx:xx+xx:xx'), ('task_id', 
> u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 audit/airflow-audit.log:-xx-xx xx:xx:xx.xx  qsr_coremytbl 
> REPAIR_HIVE_schemeh.mytbl task -xx-xx xx:xx:xx.xx rsawyerx 
> [('execution_date', u'-xx-xxTxx:xx:xx+xx:xx'), ('task_id', 
> u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')]
>  1 scheduler/latest/qsr_dag_generation.py.log:[-xx-xx xx:xx:xx,xxx] 
> \{jobs.py:} INFO - Creating / updating  qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl -xx-xx xx:xx:xx+xx:xx 
> [scheduled]> in ORM
>  71 scheduler/latest/qsr_dag_generation.py.log:[-xx-xx xx:xx:xx,xxx] 
> \{logging_mixin.py:xx} INFO - [-xx-xx xx:xx:xx,xxx] {__init__.py:} 
> INFO - Restoring task ' -xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG 
> ''
>  1 scheduler/-xx-xx/qsr_dag_generation.py.log:[-xx-xx xx:xx:xx,xxx] 
> \{jobs.py:} INFO - Creating / updating  qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl -xx-xx xx:xx:xx+xx:xx 
> [scheduled]> in ORM
>  71 scheduler/-xx-xx/qsr_dag_generation.py.log:[-xx-xx xx:xx:xx,xxx] 
> \{logging_mixin.py:xx} INFO - [-xx-xx xx:xx:xx,xxx] {__init__.py:} 
> INFO - Restoring task ' -xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG 
> ''
>  
> mysql> *select * from task_instance where task_id like '%REP%';#*
>  
> 

[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value

2019-12-03 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6170:
-
Description: 
BranchPythonOperator subclasses PythonOperator, and after it has selected a 
branch we get a message like this in the task log:

[2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value 
was: 

 This returned value is from the execute() method.

A user would expect the returned value from this to be pushed to an XCOM; but 
this does not happen due to no return value from the BranchPythonOperator's 
execute method.
  
{code:java}
def execute(self, context: Dict):
    branch = super().execute(context)
    self.skip_all_except(context['ti'], branch)
{code}
 

If we do an XCOM push of the result then we can use the decision made by 
BranchPythonOperator in downstream tasks.

Eg consider this dependency chain:
get >> branch >> [create, update] >> join >> execute

The 'execute' task might need to know whether the 'branch' decided to create a 
new thing to run, or whether to use the existing one from the 'get'. Without an 
XCOM push from the branch return value, it is difficult to pick up the correct 
value later on.
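
A minimal sketch of the proposed change (same method as shown above, inside BranchPythonOperator; returning the branch lets the normal return-value XCom push happen):
{code:java}
# inside BranchPythonOperator (airflow/operators/python_operator.py)
def execute(self, context: Dict):
    branch = super().execute(context)
    self.skip_all_except(context['ti'], branch)
    return branch  # pushed to XCom as 'return_value', so downstream tasks can read the decision
{code}
Downstream, the decision could then be read with something like ti.xcom_pull(task_ids='branch'), where 'branch' is the task_id of the branching task.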

  was:
BranchPythonOperator subclasses PythonOperator and this means that after it has 
selected a branch we get a message like this:

[2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value 
was: chosen_task

 

This returned value is from the execute() method. A user would expect the 
returned value from this to be pushed to an XCOM but this does not happen due 
to no return value from the BranchPythonOperator's execute method.

 
 
{code:java}
def execute(self, context: Dict):
    branch = super().execute(context)
    self.skip_all_except(context['ti'], branch)
{code}
 

If we do an XCOM push of the result then we can use the decision made by 
BranchPythonOperator in downstream tasks.

Eg consider this dependency chain:
get >> branch >> [create, update] >> join >> execute

The 'execute' task might need to know whether the 'branch' decided to create a 
new thing to run, or whether to use the existing one from the 'get'. Without an 
XCOM push from the branch return value, it is difficult to pick up the correct 
value later on.


> BranchPythonOperator does not do XCom push of returned value
> 
>
> Key: AIRFLOW-6170
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6170
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> BranchPythonOperator subclasses PythonOperator, and after it has selected a 
> branch we get a message like this in the task log:
> [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned 
> value was: 
>  This returned value is from the execute() method.
> A user would expect the returned value from this to be pushed to an XCOM; but 
> this does not happen due to no return value from the BranchPythonOperator's 
> execute method.
>   
> {code:java}
> def execute(self, context: Dict):
>     branch = super().execute(context)
>     self.skip_all_except(context['ti'], branch)
> {code}
>  
> If we do an XCOM push of the result then we can use the decision made by 
> BranchPythonOperator in downstream tasks.
> Eg consider this dependency chain:
> get >> branch >> [create, update] >> join >> execute
> The 'execute' task might need to know whether the 'branch' decided to create 
> a new thing to run, or whether to use the existing one from the 'get'. 
> Without an XCOM push from the branch return value, it is difficult to pick up 
> the correct value later on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value

2019-12-03 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6170:
-
Priority: Minor  (was: Major)

> BranchPythonOperator does not do XCom push of returned value
> 
>
> Key: AIRFLOW-6170
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6170
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> BranchPythonOperator subclasses PythonOperator and this means that after it 
> has selected a branch we get a message like this:
> [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned 
> value was: chosen_task
>  
> This returned value is from the execute() method. A user would expect this 
> returned value to be pushed to an XCOM, but this does not happen because 
> BranchPythonOperator's execute() method does not return a value.
>  
>  
> {code:java}
> def execute(self, context: Dict):
>     branch = super().execute(context)
>     self.skip_all_except(context['ti'], branch)
>  {code}
>  
> If we do an XCOM push of the result then we can use the decision made by 
> BranchPythonOperator in downstream tasks.
> Eg consider this dependency chain:
>  get >> branch >>   [ create, update ]>> join >> execute
> The 'execute' task might need to know whether the 'branch' decided to create 
> a new thing to run, or whether to use the existing one from the 'get'. 
> Without an XCOM push from the branch return value, it is difficult to pick up 
> the correct value later on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value

2019-12-03 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6170:
-
Description: 
BranchPythonOperator subclasses PythonOperator and this means that after it has 
selected a branch we get a message like this:

[2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value 
was: chosen_task

 

This returned value is from the execute() method. A user would expect this 
returned value to be pushed to an XCOM, but this does not happen because 
BranchPythonOperator's execute() method does not return a value.

 
 
{code:java}
def execute(self, context: Dict):
    branch = super().execute(context)
    self.skip_all_except(context['ti'], branch)
 {code}
 

If we do an XCOM push of the result then we can use the decision made by 
BranchPythonOperator in downstream tasks.

Eg consider this dependency chain:
 get >> branch >>   [ create, update ]>> join >> execute

The 'execute' task might need to know whether the 'branch' decided to create a 
new thing to run, or whether to use the existing one from the 'get'. Without an 
XCOM push from the branch return value, it is difficult to pick up the correct 
value later on.

  was:
BranchPythonOperator subclasses PythonOperator and this means that after it has 
selected a branch we get a message like this:

[2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value 
was: chosen_task

 

This returned value is from the execute() method. A user would expect this 
returned value to be pushed to an XCOM, but this does not happen because 
BranchPythonOperator's execute() method does not return a value.

 

 

 

If we do an XCOM push of the result then we can use the decision made by 
BranchPythonOperator in downstream tasks.

Eg consider this dependency chain:
 get >> branch >>   [ create, update ]>> join >> execute

The 'execute' task might need to know whether the 'branch' decided to create a 
new thing to run, or whether to use the existing one from the 'get'. Without an 
XCOM push from the branch return value, it is difficult to pick up the correct 
value later on.


> BranchPythonOperator does not do XCom push of returned value
> 
>
> Key: AIRFLOW-6170
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6170
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Major
>
> BranchPythonOperator subclasses PythonOperator and this means that after it 
> has selected a branch we get a message like this:
> [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned 
> value was: chosen_task
>  
> This returned value is from the execute() method. A user would expect this 
> returned value to be pushed to an XCOM, but this does not happen because 
> BranchPythonOperator's execute() method does not return a value.
>  
>  
> {code:java}
> def execute(self, context: Dict):
>     branch = super().execute(context)
>     self.skip_all_except(context['ti'], branch)
>  {code}
>  
> If we do an XCOM push of the result then we can use the decision made by 
> BranchPythonOperator in downstream tasks.
> Eg consider this dependency chain:
>  get >> branch >>   [ create, update ]>> join >> execute
> The 'execute' task might need to know whether the 'branch' decided to create 
> a new thing to run, or whether to use the existing one from the 'get'. 
> Without an XCOM push from the branch return value, it is difficult to pick up 
> the correct value later on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value

2019-12-03 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6170:
-
Issue Type: Bug  (was: Improvement)

> BranchPythonOperator does not do XCom push of returned value
> 
>
> Key: AIRFLOW-6170
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6170
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> BranchPythonOperator subclasses PythonOperator and this means that after it 
> has selected a branch we get a message like this:
> [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned 
> value was: chosen_task
>  
> This returned value is from the execute() method. A user would expect this 
> returned value to be pushed to an XCOM, but this does not happen because 
> BranchPythonOperator's execute() method does not return a value.
>  
>  
> {code:java}
> def execute(self, context: Dict):
>     branch = super().execute(context)
>     self.skip_all_except(context['ti'], branch)
>  {code}
>  
> If we do an XCOM push of the result then we can use the decision made by 
> BranchPythonOperator in downstream tasks.
> Eg consider this dependency chain:
>  get >> branch >>   [ create, update ]>> join >> execute
> The 'execute' task might need to know whether the 'branch' decided to create 
> a new thing to run, or whether to use the existing one from the 'get'. 
> Without an XCOM push from the branch return value, it is difficult to pick up 
> the correct value later on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value

2019-12-03 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6170:
-
Description: 
BranchPythonOperator subclasses PythonOperator and this means that after it has 
selected a branch we get a message like this:

[2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value 
was: chosen_task

 

This returned value is from the execute() method. A user would expect this 
returned value to be pushed to an XCOM, but this does not happen because 
BranchPythonOperator's execute() method does not return a value.

 

 

 

If we do an XCOM push of the result then we can use the decision made by 
BranchPythonOperator in downstream tasks.

Eg consider this dependency chain:
 get >> branch >>   [ create, update ]>> join >> execute

The 'execute' task might need to know whether the 'branch' decided to create a 
new thing to run, or whether to use the existing one from the 'get'. Without an 
XCOM push from the branch return value, it is difficult to pick up the correct 
value later on.

  was:
BranchPythonOperator subclasses PythonOperator and this means that after it has 
selected a branch we get a message like this:

[2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value 
was: chosen_task

 

This returned value is from the execute() method. A user would expect this 
returned value to be pushed to an XCOM, but this does not happen because 
BranchPythonOperator does not return a value.

 

If we do an XCOM push of the result then we can use the decision made by 
BranchPythonOperator in downstream tasks.

Eg consider this dependency chain:
get >> branch >>   [ create, update ]>> join >> execute

The 'execute' task might need to know whether the 'branch' decided to create a 
new thing to run, or whether to use the existing one from the 'get'. Without an 
XCOM push from the branch return value, it is difficult to pick up the correct 
value later on.


> BranchPythonOperator does not do XCom push of returned value
> 
>
> Key: AIRFLOW-6170
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6170
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Major
>
> BranchPythonOperator subclasses PythonOperator and this means that after it 
> has selected a branch we get a message like this:
> [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned 
> value was: chosen_task
>  
> This returned value is from the execute() method. A user would expect this 
> returned value to be pushed to an XCOM, but this does not happen because 
> BranchPythonOperator's execute() method does not return a value.
>  
>  
>  
> If we do an XCOM push of the result then we can use the decision made by 
> BranchPythonOperator in downstream tasks.
> Eg consider this dependency chain:
>  get >> branch >>   [ create, update ]>> join >> execute
> The 'execute' task might need to know whether the 'branch' decided to create 
> a new thing to run, or whether to use the existing one from the 'get'. 
> Without an XCOM push from the branch return value, it is difficult to pick up 
> the correct value later on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value

2019-12-03 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6170:


 Summary: BranchPythonOperator does not do XCom push of returned 
value
 Key: AIRFLOW-6170
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6170
 Project: Apache Airflow
  Issue Type: Improvement
  Components: operators
Affects Versions: 1.10.6
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


BranchPythonOperator subclasses PythonOperator and this means that after it has 
selected a branch we get a message like this:

[2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value 
was: chosen_task

 

This returned value is from the execute() method. A user would expect this 
returned value to be pushed to an XCOM, but this does not happen because 
BranchPythonOperator does not return a value.

 

If we do an XCOM push of the result then we can use the decision made by 
BranchPythonOperator in downstream tasks.

Eg consider this dependency chain:
get >> branch >>   [ create, update ]>> join >> execute

The 'execute' task might need to know whether the 'branch' decided to create a 
new thing to run, or whether to use the existing one from the 'get'. Without an 
XCOM push from the branch return value, it is difficult to pick up the correct 
value later on.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6075) AWS Hook: Improve documentation

2019-11-26 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6075:


 Summary: AWS Hook: Improve documentation
 Key: AIRFLOW-6075
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6075
 Project: Apache Airflow
  Issue Type: Improvement
  Components: aws
Affects Versions: 1.10.6
Reporter: Bjorn Olsen


The AWS Hook logic is confusing and could do with some extra logging. This 
would help users to correctly set up their AWS connections for the first time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6072) aws_hook: Ability to set outbound proxy

2019-11-26 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6072:
-
Description: 
The boto3 connection used by aws_hook does not respect outbound http_proxy 
settings (even if these are set system-wide).

 

The way to configure a proxy is to pass a botocore.config.Config object to 
boto3 when creating a client (according to this SO post).

[https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3]

While the aws_hook get_client_type() method is used extensively by AWS 
Operators, the "config" argument is not used by any operator. 

Adding a check to aws_hook for "config" in the "extra_config" of the Airflow 
Connection would allow us to pass kwargs there from which the hook builds the 
Config object automatically.

Otherwise we have to update every AWS Operator to also take a "config" 
parameter.

 

To set an outbound proxy is then as simple as adding this to your extra_config:
{code:java}
{ .. ,
  "config":{ "proxies": {
    "http": "http://myproxy:8080",
    "https": "http://myproxy:8080" }},
 .. }
{code}
 

This needs to work both for the main boto3 clients that do the task work and 
during the assume_role process, which also uses a boto3 client.
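
A rough sketch of what the hook-side handling could look like (illustration 
only, not the actual aws_hook code; the helper name is made up, and only the 
"config" handling is the part being proposed):

{code:python}
import boto3
from botocore.config import Config


def client_with_connection_config(client_type, extra_config, region_name=None):
    """Build a boto3 client, honouring an optional "config" key in the parsed
    'Extra' JSON of the Airflow connection, e.g.
    {"config": {"proxies": {"http": "http://myproxy:8080",
                            "https": "http://myproxy:8080"}}}
    """
    config = None
    if "config" in extra_config:
        # Hand the kwargs straight to botocore.config.Config; "proxies" is
        # just one of the supported options.
        config = Config(**extra_config["config"])
    return boto3.client(client_type, region_name=region_name, config=config)


# The same Config would also have to be given to the STS client used for
# assume_role, otherwise that call would bypass the proxy.
{code}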

  was:
The boto3 connection used by aws_hook does not respect outbound http_proxy 
settings (even if these are set system-wide).

 

The way to configure a proxy is to pass a botocore.config.Config object to 
boto3 when creating a client (according to this SO post).

[https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3]

While the aws_hook get_client_type() method is used extensively by AWS 
Operators, the "config" argument is not used by any operator. 

Adding a check to aws_hook for "config" in the "extra_config" of the Airflow 
Connection would allow us to pass kwargs there from which the hook builds the 
Config object automatically.

Otherwise we have to update every AWS Operator to also take a "config" 
parameter.

 

To set an outbound proxy is then as simple as adding this to your extra_config:
#{ .. ,
#"config":{"proxies": {
#"http": "http://myproxy:8080;,
#"https": "http://myproxy:8080"}},
# ..}


> aws_hook: Ability to set outbound proxy
> ---
>
> Key: AIRFLOW-6072
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6072
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> The boto3 connection used by aws_hook does not respect outbound http_proxy 
> settings (even if these are set system-wide).
>  
> The way to configure a proxy is to pass a botocore.config.Config object to 
> boto3 when creating a client (according to this SO post).
> [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3]
> While the aws_hook get_client_type() method is used extensively by AWS 
> Operators, the "config" argument is not used by any operator. 
> Adding a check to aws_hook for "config" in the "extra_config" of the Airflow 
> Connection would allow us to pass kwargs there from which the hook builds the 
> Config object automatically.
> Otherwise we have to update every AWS Operator to also take a "config" 
> parameter.
>  
> To set an outbound proxy is then as simple as adding this to your 
> extra_config:
> {code:java}
> { .. ,
>   "config":{ "proxies": {
>     "http": "http://myproxy:8080",
>     "https": "http://myproxy:8080" }},
>  .. }
> {code}
>  
> This needs to work both for the main boto3 clients that do the task work and 
> during the assume_role process, which also uses a boto3 client.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6072) aws_hook: Ability to set outbound proxy

2019-11-26 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6072:
-
Description: 
The boto3 connection used by aws_hook does not respect outbound http_proxy 
settings (even if these are set system-wide).

 

The way to configure a proxy is to pass a botocore.config.Config object to 
boto3 when creating a client (according to this SO post).

[https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3]

While the aws_hook get_client_type() method is used extensively by AWS 
Operators, the "config" argument is not used by any operator. 

Adding a check to aws_hook for "config" in the "extra_config" of the Airflow 
Connection would allow us to pass kwargs there from which the hook builds the 
Config object automatically.

Otherwise we have to update every AWS Operator to also take a "config" 
parameter.

 

To set an outbound proxy is then as simple as adding this to your extra_config:
#{ .. ,
#"config":{"proxies": {
#"http": "http://myproxy:8080;,
#"https": "http://myproxy:8080"}},
# ..}

  was:
The boto3 connection used by aws_hook does not respect outbound http_proxy 
settings (even if these are set system-wide).

 

The way to configure a proxy is to pass a botocore.config.Config object to 
boto3 when creating a client (according to this SO post).

[https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3]

While the aws_hook get_client_type() method is used extensively by AWS 
Operators, the "config" argument is not used by any operator. 

Adding a check to aws_hook for "config" in the "extra_config" of the Airflow 
Connection would allow us to pass kwargs there from which the Config object is 
built automatically when the hook is created.

Otherwise we have to update every AWS Operator to also take a "config" 
parameter.


> aws_hook: Ability to set outbound proxy
> ---
>
> Key: AIRFLOW-6072
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6072
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> The boto3 connection used by aws_hook does not respect outbound http_proxy 
> settings (even if these are set system-wide).
>  
> The way to configure a proxy is to pass a botocore.config.Config object to 
> boto3 when creating a client (according to this SO post).
> [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3]
> While the aws_hook get_client_type() method is used extensively by AWS 
> Operators, the "config" argument is not used by any operator. 
> Adding a check to aws_hook for "config" in the "extra_config" of the Airflow 
> Connection would allow us to pass kwargs there from which the hook builds the 
> Config object automatically.
> Otherwise we have to update every AWS Operator to also take a "config" 
> parameter.
>  
> To set an outbound proxy is then as simple as adding this to your 
> extra_config:
> #{ .. ,
> #"config":{"proxies": {
> #"http": "http://myproxy:8080;,
> #"https": "http://myproxy:8080"}},
> # ..}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6072) aws_hook: Ability to set outbound proxy

2019-11-26 Thread Bjorn Olsen (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bjorn Olsen updated AIRFLOW-6072:
-
Priority: Minor  (was: Major)

> aws_hook: Ability to set outbound proxy
> ---
>
> Key: AIRFLOW-6072
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6072
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws
>Affects Versions: 1.10.6
>Reporter: Bjorn Olsen
>Assignee: Bjorn Olsen
>Priority: Minor
>
> The boto3 connection used by aws_hook does not respect outbound http_proxy 
> settings (even if these are set system-wide).
>  
> The way to configure a proxy is to pass a botocore.config.Config object to 
> boto3 when creating a client (according to this SO post).
> [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3]
> While the aws_hook get_client_type() method is used extensively by AWS 
> Operators, the "config" argument is not used by any operator. 
> Adding a check to aws_hook for "config" in the "extra_config" of the Airflow 
> Connection would allow us to pass kwargs there from which the Config object 
> is built automatically when the hook is created.
> Otherwise we have to update every AWS Operator to also take a "config" 
> parameter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6072) aws_hook: Ability to set outbound proxy

2019-11-26 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6072:


 Summary: aws_hook: Ability to set outbound proxy
 Key: AIRFLOW-6072
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6072
 Project: Apache Airflow
  Issue Type: Improvement
  Components: aws
Affects Versions: 1.10.6
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


The boto3 connection used by aws_hook does not respect outbound http_proxy 
settings (even if these are set system-wide).

 

The way to configure a proxy is to pass a botocore.config.Config object to 
boto3 when creating a client (according to this SO post).

[https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3]

While the aws_hook get_client_type() method is used extensively by AWS 
Operators, the "config" argument is not used by any operator. 

Adding a check to aws_hook for "config" in the "extra_config" of the Airflow 
Connection would allow us to pass kwargs there from which the Config object is 
built automatically when the hook is created.

Otherwise we have to update every AWS Operator to also take a "config" 
parameter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-6038) AWS DataSync example dags

2019-11-21 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-6038:


 Summary: AWS DataSync example dags
 Key: AIRFLOW-6038
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6038
 Project: Apache Airflow
  Issue Type: Improvement
  Components: aws, examples
Affects Versions: 1.10.6
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


Add example_dags for AWS DataSync operators



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-5824) Add a new hook and operator for AWS DataSync

2019-10-31 Thread Bjorn Olsen (Jira)
Bjorn Olsen created AIRFLOW-5824:


 Summary: Add a new hook and operator for AWS DataSync
 Key: AIRFLOW-5824
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5824
 Project: Apache Airflow
  Issue Type: New Feature
  Components: aws
Affects Versions: 1.10.5
Reporter: Bjorn Olsen
Assignee: Bjorn Olsen


I'm working on some code to add a hook and operator for AWS DataSync task 
executions.

Just creating a Jira ticket for visibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)