[jira] [Assigned] (AIRFLOW-6778) Add a DAGs PVC Mount Point Option for Workers under Kubernetes Executor
[ https://issues.apache.org/jira/browse/AIRFLOW-6778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen reassigned AIRFLOW-6778: Assignee: Daniel Imberman (was: Bjorn Olsen) > Add a DAGs PVC Mount Point Option for Workers under Kubernetes Executor > --- > > Key: AIRFLOW-6778 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6778 > Project: Apache Airflow > Issue Type: Improvement > Components: executor-kubernetes, worker >Affects Versions: 1.10.6, 1.10.7, 1.10.8, 1.10.9 >Reporter: Brandon Willard >Assignee: Daniel Imberman >Priority: Blocker > Labels: kubernetes, options > > The worker pods generated by the Kubernetes Executor force the DAGs PVC to be > mounted at the Airflow DAGs folder. This, combined with a general inability > to specify arbitrary PVCs on workers (see AIRFLOW-3126 and the > linked/duplicated issues), severely constrains the usability of worker pods > and the Kubernetes Executor as a whole. > > For example, if a DAGs-containing PVC is rooted at a Python package (e.g. > {{package/}}) that needs to be installed on each worker (e.g. DAGs in > {{package/dags/}}, package install point at {{package/setup.py}}, and Airflow > DAGs location {{/airflow/dags}}), then the current static mount point logic > will only allow a worker to directly mount the entire package into the > Airflow DAGs location — while the actual DAGs are in a subdirectory — or > exclusively mount the package's sub-path {{package/dags}} (using the existing > {{kubernetes.dags_volume_subpath}} config option). While the latter is at > least correct, it completely foregoes the required parent directory and it > makes the requisite package unavailable for installation (e.g. the files > under {{package/}} are not available). > > -In general, the only approach that seems to work for the Kubernetes Executor > is to specify a worker image with all DAG dependencies pre-loaded, which > largely voids the usefulness of a single DAGs PVC that can be dynamically > updated. At best, one can include a {{requirements.txt}} in the PVC and use > it in tandem with an entry-point script built into the image, but that still > doesn't help with source installations of custom packages stored and updated > in a PVC.- > Edit: This isn't even possible, because worker pods are created using [the > {{command}} field instead of > {{args}}|https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes]! > > A quick fix for this situation is to allow one to specify the DAGs PVC mount > point. With this option, one can mount the PVC anywhere and specify an > Airflow DAGs location that works in conjunction with the mount point (e.g. > mount the PVC at {{/airflow/package}} and independently set the Airflow DAGs > location to {{/airflow/package/dags}}). This option would — in many cases — > obviate the need for the marginally useful {{kubernetes.dags_volume_subpath}} > options, as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
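For illustration, a minimal sketch of how such an option might look in airflow.cfg for the package layout described above. The dags_folder, dags_volume_claim, and dags_volume_subpath settings already exist; the dags_volume_mount_point name is hypothetical here, purely to show the proposed separation of PVC mount point and DAGs location.
{code}
[core]
# Point Airflow at the DAGs subdirectory inside the mounted package
dags_folder = /airflow/package/dags

[kubernetes]
# Existing options
dags_volume_claim = airflow-dags
# dags_volume_subpath = package/dags   # the current, limited workaround

# Hypothetical new option (name illustrative only): mount the whole PVC at the
# package root so that package/setup.py remains visible to the worker
dags_volume_mount_point = /airflow/package
{code}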
[jira] [Assigned] (AIRFLOW-6778) Add a DAGs PVC Mount Point Option for Workers under Kubernetes Executor
[ https://issues.apache.org/jira/browse/AIRFLOW-6778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen reassigned AIRFLOW-6778: Assignee: Bjorn Olsen > Add a DAGs PVC Mount Point Option for Workers under Kubernetes Executor > --- > > Key: AIRFLOW-6778 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6778 > Project: Apache Airflow > Issue Type: Improvement > Components: executor-kubernetes, worker >Affects Versions: 1.10.6, 1.10.7, 1.10.8, 1.10.9 >Reporter: Brandon Willard >Assignee: Bjorn Olsen >Priority: Blocker > Labels: kubernetes, options > > The worker pods generated by the Kubernetes Executor force the DAGs PVC to be > mounted at the Airflow DAGs folder. This, combined with a general inability > to specify arbitrary PVCs on workers (see AIRFLOW-3126 and the > linked/duplicated issues), severely constrains the usability of worker pods > and the Kubernetes Executor as a whole. > > For example, if a DAGs-containing PVC is rooted at a Python package (e.g. > {{package/}}) that needs to be installed on each worker (e.g. DAGs in > {{package/dags/}}, package install point at {{package/setup.py}}, and Airflow > DAGs location {{/airflow/dags}}), then the current static mount point logic > will only allow a worker to directly mount the entire package into the > Airflow DAGs location — while the actual DAGs are in a subdirectory — or > exclusively mount the package's sub-path {{package/dags}} (using the existing > {{kubernetes.dags_volume_subpath}} config option). While the latter is at > least correct, it completely foregoes the required parent directory and it > makes the requisite package unavailable for installation (e.g. the files > under {{package/}} are not available). > > -In general, the only approach that seems to work for the Kubernetes Executor > is to specify a worker image with all DAG dependencies pre-loaded, which > largely voids the usefulness of a single DAGs PVC that can be dynamically > updated. At best, one can include a {{requirements.txt}} in the PVC and use > it in tandem with an entry-point script built into the image, but that still > doesn't help with source installations of custom packages stored and updated > in a PVC.- > Edit: This isn't even possible, because worker pods are created using [the > {{command}} field instead of > {{args}}|https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes]! > > A quick fix for this situation is to allow one to specify the DAGs PVC mount > point. With this option, one can mount the PVC anywhere and specify an > Airflow DAGs location that works in conjunction with the mount point (e.g. > mount the PVC at {{/airflow/package}} and independently set the Airflow DAGs > location to {{/airflow/package/dags}}). This option would — in many cases — > obviate the need for the marginally useful {{kubernetes.dags_volume_subpath}} > options, as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-6975) Base AWSHook AssumeRoleWithSAML
Bjorn Olsen created AIRFLOW-6975: Summary: Base AWSHook AssumeRoleWithSAML Key: AIRFLOW-6975 URL: https://issues.apache.org/jira/browse/AIRFLOW-6975 Project: Apache Airflow Issue Type: Improvement Components: aws Affects Versions: 1.10.9 Reporter: Bjorn Olsen Assignee: Bjorn Olsen Base AWS Hook currently does AssumeRole but we require it to additionally be able to do AssumeRoleWithSAML. +Current+ [https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html#api_assumerole] The AssumeRole API operation is useful for allowing existing IAM users to access AWS resources that they don't already have access to. (This requires an AWS IAM user) +Proposed addition+ [https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html#api_assumerolewithsaml] The AssumeRoleWithSAML API operation returns a set of temporary security credentials for federated users who are authenticated by your organization's existing identity system. (This allows federated login using another IDP rather than requiring an AWS IAM user). +Use case+ We need to be able to authenticate an AD user against our IDP (Windows Active Directory). We can obtain a SAML assertion from our IDP, and then provide it to AWS STS to exchange it for AWS temporary credentials, thus authorising us to use AWS services. The AWS AssumeRoleWithSAML API is intended for this use case, and the Base AWS Hook should be updated to allow for this method of authentication. -- This message was sent by Atlassian Jira (v8.3.4#803005)
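For illustration, a minimal sketch of the STS call the hook would need to make. The ARNs are placeholders and the IDP helper is hypothetical, since obtaining the SAML assertion is specific to each identity provider.
{code:python}
import boto3

def get_saml_assertion_from_idp():
    """Return a base64-encoded SAML assertion from the organisation's IDP (placeholder)."""
    raise NotImplementedError

# AssumeRoleWithSAML is an unsigned STS call, so no existing AWS credentials are needed
sts = boto3.client("sts")
response = sts.assume_role_with_saml(
    RoleArn="arn:aws:iam::123456789012:role/airflow-role",           # placeholder ARN
    PrincipalArn="arn:aws:iam::123456789012:saml-provider/example",  # placeholder ARN
    SAMLAssertion=get_saml_assertion_from_idp(),
)

# The temporary credentials can then back a boto3 session, as the hook does for AssumeRole
creds = response["Credentials"]
session = boto3.session.Session(
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
{code}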
[jira] [Updated] (AIRFLOW-6944) Allow AWS DataSync to "catch up" when Task is already running
[ https://issues.apache.org/jira/browse/AIRFLOW-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6944: - Description: Current AWS DataSyncOperator attempts to start running a DataSync Task, with no regard / check for whether the task is already running or not. This attempt will fail (correctly). It is useful to have capability to optionally allow the operator to "catch up" instead of starting the Task - if the Task is of a particular status eg 'QUEUED' then we might want to wait for the currently Queued one to complete, instead of failing or instead of submitting another one (and snowballing). For example, this scenario can happen if the task was previously submitted but the Airflow Operator timed out waiting for it, when DataSync is busy. Or, maybe we want to wait for the Queued task to complete and then submit another Task anyway... Allowing the user some options for starting the Task depending on Status, allows for various use cases. However, the current functionality of "Fail if the Task can't be started" should remain default, to prevent unintentional problems which can arise if we instead decided to always wait if there is already a task queued. For example if the previous task has different Include filters than the new task, then logically they aren't the same. was: Current AWS DataSyncOperator attempts to start running a DataSync Task, with no regard / check for whether the task is already running or not. This attempt will fail (correctly). It is useful to have capability to optionally allow the operator to "catch up" instead of starting the Task - if the Task is of a particular status eg 'QUEUED' then we might want to wait for the currently Queued one to complete, instead of failing or instead of submitting another one (and snowballing). For example, this scenario can happen if the task was previously submitted but the Airflow Operator timed out waiting for it, when DataSync is busy. Or, maybe we want to wait for the Queued task to complete and then submit another Task anyway... Allowing the user some options in terms of status management allows for various use cases. However, the current functionality of "Fail if the Task can't be started" should remain default, to prevent unintentional problems which can arise if we instead decided to always wait if there is already a task queued. For example if the previous task has different Include filters than the new task, then logically they aren't the same. > Allow AWS DataSync to "catch up" when Task is already running > - > > Key: AIRFLOW-6944 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6944 > Project: Apache Airflow > Issue Type: Improvement > Components: aws >Affects Versions: 1.10.9 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > Current AWS DataSyncOperator attempts to start running a DataSync Task, with > no regard / check for whether the task is already running or not. This > attempt will fail (correctly). > It is useful to have capability to optionally allow the operator to "catch > up" instead of starting the Task - if the Task is of a particular status eg > 'QUEUED' then we might want to wait for the currently Queued one to complete, > instead of failing or instead of submitting another one (and snowballing). > For example, this scenario can happen if the task was previously submitted > but the Airflow Operator timed out waiting for it, when DataSync is busy. 
> Or, maybe we want to wait for the Queued task to complete and then submit > another Task anyway... > Allowing the user some options for starting the Task depending on Status, > allows for various use cases. > However, the current functionality of "Fail if the Task can't be started" > should remain default, to prevent unintentional problems which can arise if > we instead decided to always wait if there is already a task queued. For > example if the previous task has different Include filters than the new task, > then logically they aren't the same. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
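For illustration, a rough sketch of the "catch up" behaviour described above, using the boto3 DataSync API directly; the helper name and default status list are illustrative and not the operator's actual interface.
{code:python}
import boto3

client = boto3.client("datasync")

def start_or_catch_up(task_arn, wait_for_statuses=("QUEUED",)):
    """Start a new TaskExecution, or return the existing one if its status allows catching up."""
    task = client.describe_task(TaskArn=task_arn)
    existing = task.get("CurrentTaskExecutionArn")
    if existing:
        status = client.describe_task_execution(TaskExecutionArn=existing)["Status"]
        if status in wait_for_statuses:
            # "Catch up": wait on the already-queued execution instead of failing
            return existing
        # Default behaviour preserved: refuse to start when one is already running
        raise RuntimeError("Task already has an execution in status %s" % status)
    return client.start_task_execution(TaskArn=task_arn)["TaskExecutionArn"]
{code}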
[jira] [Updated] (AIRFLOW-6944) Allow AWS DataSync to "catch up" when Task is already running
[ https://issues.apache.org/jira/browse/AIRFLOW-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6944: - Description: Current AWS DataSyncOperator attempts to start running a DataSync Task, with no regard / check for whether the task is already running or not. This attempt will fail (correctly). It is useful to have capability to optionally allow the operator to "catch up" instead of starting the Task - if the Task is of a particular status eg 'QUEUED' then we might want to wait for the currently Queued one to complete, instead of failing or instead of submitting another one (and snowballing). For example, this scenario can happen if the task was previously submitted but the Airflow Operator timed out waiting for it, when DataSync is busy. Or, maybe we want to wait for the Queued task to complete and then submit another Task anyway... Allowing the user some options in terms of status management allows for various use cases. However, the current functionality of "Fail if the Task can't be started" should remain default, to prevent unintentional problems which can arise if we instead decided to always wait if there is already a task queued. For example if the previous task has different Include filters than the new task, then logically they aren't the same. was: Current AWS DataSyncOperator attempts to start running a DataSync Task, with no regard / check for whether the task is already running or not. It is useful to have capability to optionally allow the operator to "catch up" instead of starting the Task - if the Task is of a particular status eg 'QUEUED' then we might want to just use the currently Queued one instead of submitting another one. This scenario can happen if the task was previously submitted but the operator timed out waiting for it, for example. Or, maybe we want to wait for the Queued task to complete and then submit another Task ... Allowing the user some options in terms of status management allows for various robustness when submitting new task executions. However, the current functionality of "Fail if the Task can't be started" should remain default, to prevent unintentional problems which can arise if we instead decided to always wait if there is already a task queued. For example if the previous task has different Include filters than the new task, then logically they aren't the same. > Allow AWS DataSync to "catch up" when Task is already running > - > > Key: AIRFLOW-6944 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6944 > Project: Apache Airflow > Issue Type: Improvement > Components: aws >Affects Versions: 1.10.9 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > Current AWS DataSyncOperator attempts to start running a DataSync Task, with > no regard / check for whether the task is already running or not. This > attempt will fail (correctly). > It is useful to have capability to optionally allow the operator to "catch > up" instead of starting the Task - if the Task is of a particular status eg > 'QUEUED' then we might want to wait for the currently Queued one to complete, > instead of failing or instead of submitting another one (and snowballing). > For example, this scenario can happen if the task was previously submitted > but the Airflow Operator timed out waiting for it, when DataSync is busy. > Or, maybe we want to wait for the Queued task to complete and then submit > another Task anyway... 
> Allowing the user some options in terms of status management allows for > various use cases. > However, the current functionality of "Fail if the Task can't be started" > should remain default, to prevent unintentional problems which can arise if > we instead decided to always wait if there is already a task queued. For > example if the previous task has different Include filters than the new task, > then logically they aren't the same. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-6944) Allow AWS DataSync to "catch up" when Task is already running
Bjorn Olsen created AIRFLOW-6944: Summary: Allow AWS DataSync to "catch up" when Task is already running Key: AIRFLOW-6944 URL: https://issues.apache.org/jira/browse/AIRFLOW-6944 Project: Apache Airflow Issue Type: Improvement Components: aws Affects Versions: 1.10.9 Reporter: Bjorn Olsen Assignee: Bjorn Olsen Current AWS DataSyncOperator attempts to start running a DataSync Task, with no regard / check for whether the task is already running or not. It is useful to have capability to optionally allow the operator to "catch up" instead of starting the Task - if the Task is of a particular status eg 'QUEUED' then we might want to just use the currently Queued one instead of submitting another one. This scenario can happen if the task was previously submitted but the operator timed out waiting for it, for example. Or, maybe we want to wait for the Queued task to complete and then submit another Task ... Allowing the user some options in terms of status management allows for various robustness when submitting new task executions. However, the current functionality of "Fail if the Task can't be started" should remain default, to prevent unintentional problems which can arise if we instead decided to always wait if there is already a task queued. For example if the previous task has different Include filters than the new task, then logically they aren't the same. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-6824) EMRAddStepsOperator does not work well with multi-step XCom
Bjorn Olsen created AIRFLOW-6824: Summary: EMRAddStepsOperator does not work well with multi-step XCom Key: AIRFLOW-6824 URL: https://issues.apache.org/jira/browse/AIRFLOW-6824 Project: Apache Airflow Issue Type: Bug Components: aws Affects Versions: 1.10.9 Reporter: Bjorn Olsen Assignee: Bjorn Olsen EmrAddStepsOperator allows you to add several steps to EMR for processing - the steps must be supplied as a list. This works well when passing an actual Python list as the 'steps' value, but we want to be able to generate the list of steps from a previous task - using an XCom. We must use the operator as follows, for the templating to work correctly and for it to resolve the XCom: {code:java} add_steps_task = EmrAddStepsOperator( task_id='add_steps', job_flow_id=job_flow_id, aws_conn_id='aws_default', provide_context=True, steps="{{task_instance.xcom_pull(task_ids='generate_steps')}}" ){code} The value in XCom from the 'generate_steps' task looks like (simplified): {code:java} [{'Name':'Step1'}, {'Name':'Step2'}] {code} However, the rendered template arrives at the operator as a string, and that string cannot be passed to the underlying boto3 library, which expects a list object. The following won't work either, since it is not valid Python: {code:java} add_steps_task = EmrAddStepsOperator( task_id='add_steps', job_flow_id=job_flow_id, aws_conn_id='aws_default', provide_context=True, steps={{task_instance.xcom_pull(task_ids='generate_steps')}} ){code} We have to pass the steps as a string to the operator and then convert it into a list after render_template_fields has happened (immediately before the execute). Therefore the only option is to do the conversion from string to list in the operator's execute method. -- This message was sent by Atlassian Jira (v8.3.4#803005)
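For illustration, a minimal sketch of the string-to-list conversion described above; the helper name is hypothetical, and the real change would live inside the operator's execute method.
{code:python}
import ast

def ensure_steps_list(steps):
    """Turn a templated XCom string such as "[{'Name': 'Step1'}]" back into a list."""
    if isinstance(steps, str):
        steps = ast.literal_eval(steps)  # the XCom arrives as a rendered string
    if not isinstance(steps, list):
        raise ValueError("steps must resolve to a list of step dicts")
    return steps

# e.g. inside a hypothetical execute() override:
#     self.steps = ensure_steps_list(self.steps)
#     emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=self.steps)
{code}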
[jira] [Created] (AIRFLOW-6822) AWS hooks don't always cache the boto3 client
Bjorn Olsen created AIRFLOW-6822: Summary: AWS hooks don't always cache the boto3 client Key: AIRFLOW-6822 URL: https://issues.apache.org/jira/browse/AIRFLOW-6822 Project: Apache Airflow Issue Type: Bug Components: aws Affects Versions: 1.10.9 Reporter: Bjorn Olsen Assignee: Bjorn Olsen Implementation of the Amazon AWS hooks (e.g. S3 hook, Glue hook, etc.) varies in how they call the underlying aws_hook.get_client_type(X) method. Most of the time the client that gets returned is cached by the superclass, but not always. The client should always be cached for performance reasons - creating a client is a time-consuming process. Example of how to do it (athena.py): {code:java} def get_conn(self): """ check if aws conn exists already or create one and return it :return: boto3 session """ if not self.conn: self.conn = self.get_client_type('athena') return self.conn{code} Example of how not to do it (s3.py): {code:java} def get_conn(self): return self.get_client_type('s3'){code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
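For illustration, a sketch of a cached variant for s3.py that mirrors the athena.py pattern above; the attribute handling is illustrative rather than the exact Airflow change.
{code:python}
def get_conn(self):
    """Return a cached boto3 S3 client, creating it only on first use."""
    if not getattr(self, "conn", None):
        self.conn = self.get_client_type("s3")
    return self.conn
{code}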
[jira] [Created] (AIRFLOW-6672) AWS DataSync - better logging of error message
Bjorn Olsen created AIRFLOW-6672: Summary: AWS DataSync - better logging of error message Key: AIRFLOW-6672 URL: https://issues.apache.org/jira/browse/AIRFLOW-6672 Project: Apache Airflow Issue Type: Improvement Components: aws Affects Versions: 1.10.7 Reporter: Bjorn Olsen Assignee: Bjorn Olsen When the AWS DataSync operator fails, it dumps a TaskDescription to the log. The TaskDescription is in JSON format and contains several elements. This is hard to read to try and see what exactly went wrong. Example 1: [2020-01-28 17:44:39,495] \{datasync.py:354} INFO - task_execution_description=\{"TaskExecutionArn": "arn:aws:datasync:***:***:task/task-***/execution/exec-***", "Status": "ERROR", "Options": {"VerifyMode": "ONLY_FILES_TRANSFERRED", "OverwriteMode": "ALWAYS", "Atime": "BEST_EFFORT", "Mtime": "PRESERVE", "Uid": "INT_VALUE", "Gid": "INT_VALUE", "PreserveDeletedFiles": "PRESERVE", "PreserveDevices": "NONE", "PosixPermissions": "PRESERVE", "BytesPerSecond": -1, "TaskQueueing": "ENABLED"}, "Excludes": [], "Includes": [\{"FilterType": "SIMPLE_PATTERN", "Value": "***"}], "StartTime": datetime.datetime(2020, 1, 28, 17, 36, 2, 816000, tzinfo=tzlocal()), "EstimatedFilesToTransfer": 7, "EstimatedBytesToTransfer": 4534925, "FilesTransferred": 7, "BytesWritten": 4534925, "BytesTransferred": 4534925, "Result": \{"PrepareDuration": 9795, "PrepareStatus": "SUCCESS", "TotalDuration": 351660, "TransferDuration": 338568, "TransferStatus": "SUCCESS", "VerifyDuration": 7006, "VerifyStatus": "ERROR", "ErrorCode": "OpNotSupp", "ErrorDetail": "Operation not supported"}, "ResponseMetadata": \{"RequestId": "***", "HTTPStatusCode": 200, "HTTPHeaders": {"date": "Tue, 28 Jan 2020 15:44:39 GMT", "content-type": "application/x-amz-json-1.1", "content-length": "994", "connection": "keep-alive", "x-amzn-requestid": "***"}, "RetryAttempts": 0}} Example 2: [2020-01-28 18:23:23,322] \{datasync.py:354} INFO - task_execution_description=\{"TaskExecutionArn": "arn:aws:datasync:***:***:task/task-***/execution/exec-***", "Status": "ERROR", "Options": {"VerifyMode": "ONLY_FILES_TRANSFERRED", "OverwriteMode": "ALWAYS", "Atime": "BEST_EFFORT", "Mtime": "PRESERVE", "Uid": "INT_VALUE", "Gid": "INT_VALUE", "PreserveDeletedFiles": "PRESERVE", "PreserveDevices": "NONE", "PosixPermissions": "PRESERVE", "BytesPerSecond": -1, "TaskQueueing": "ENABLED"}, "Excludes": [], "Includes": [\{"FilterType": "SIMPLE_PATTERN", "Value": "***"}], "StartTime": datetime.datetime(2020, 1, 28, 17, 45, 57, 212000, tzinfo=tzlocal()), "EstimatedFilesToTransfer": 0, "EstimatedBytesToTransfer": 0, "FilesTransferred": 0, "BytesWritten": 0, "BytesTransferred": 0, "Result": \{"PrepareDuration": 16687, "PrepareStatus": "SUCCESS", "TotalDuration": 2083467, "TransferDuration": 2065744, "TransferStatus": "ERROR", "VerifyDuration": 5251, "VerifyStatus": "SUCCESS", "ErrorCode": "SockTlsHandshakeFailure", "ErrorDetail": "DataSync agent ran into an error connecting to AWS.Please review the DataSync network requirements and ensure required endpoints are accessible from the agent. Please contact AWS support if the error persists."}, "ResponseMetadata": \{"RequestId": "***", "HTTPStatusCode": 200, "HTTPHeaders": {"date": "Tue, 28 Jan 2020 16:23:23 GMT", "content-type": "application/x-amz-json-1.1", "content-length": "1179", "connection": "keep-alive", "x-amzn-requestid": "***"}, "RetryAttempts": 0}} Note that the 'Result' element contains the statuses and errors that are of interest, however these are hard to see in the log at the moment. 
Example of a successful one: 'Result': \{'PrepareDuration': 9663, 'PrepareStatus': 'SUCCESS', 'TotalDuration': 352095, 'TransferDuration': 338358, 'TransferStatus': 'SUCCESS', 'VerifyDuration': 7171, 'VerifyStatus': 'SUCCESS'}, Suggested output is to keep the previous line(s) but also add: [2020-01-28 17:44:39,495] \{datasync.py:354} INFO/ERROR - Status=SUCCESS/ERROR [2020-01-28 17:44:39,495] \{datasync.py:354} INFO/ERROR - PrepareStatus=SUCCESS/ERROR PrepareDuration=9795 [2020-01-28 17:44:39,495] \{datasync.py:354} INFO/ERROR - TransferStatus=SUCCESS/ERROR TransferDuration=338568 [2020-01-28 17:44:39,495] \{datasync.py:354} INFO/ERROR - VerifyStatus=SUCCESS/ERROR VerifyDuration=7006 [2020-01-28 17:44:39,495] \{datasync.py:354} ERROR - ErrorCode=OpNotSupp, ErrorDetail=Operation not supported This should make it much clearer what the job status and errors are. -- This message was sent by Atlassian Jira (v8.3.4#803005)
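For illustration, a rough sketch of extracting the 'Result' element and logging each status on its own line, along the lines of the suggested output; the function name is illustrative and the field names follow the AWS responses shown above.
{code:python}
def log_datasync_result(log, task_execution_description):
    """Log the per-phase statuses and any error from a DataSync TaskExecution description."""
    status = task_execution_description.get("Status")
    result = task_execution_description.get("Result", {})
    emit = log.info if status == "SUCCESS" else log.error
    emit("Status=%s", status)
    for phase in ("Prepare", "Transfer", "Verify"):
        emit("%sStatus=%s %sDuration=%s",
             phase, result.get(phase + "Status"),
             phase, result.get(phase + "Duration"))
    if result.get("ErrorCode"):
        log.error("ErrorCode=%s, ErrorDetail=%s",
                  result.get("ErrorCode"), result.get("ErrorDetail"))
{code}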
[jira] [Created] (AIRFLOW-6654) AWS DataSync - bugfix when creating locations
Bjorn Olsen created AIRFLOW-6654: Summary: AWS DataSync - bugfix when creating locations Key: AIRFLOW-6654 URL: https://issues.apache.org/jira/browse/AIRFLOW-6654 Project: Apache Airflow Issue Type: Bug Components: aws Affects Versions: 1.10.7 Reporter: Bjorn Olsen Assignee: Bjorn Olsen If a Location does not exist, the operator will attempt to create it when creating an AWS DataSync Task. However, this should only happen if the appropriate create_kwargs were provided - otherwise creation should not be attempted and an error should be raised instead. This is currently not happening: the log below indicates a source location was missing and the DataSync operator tried to create it, even though no kwargs were provided, so it should instead have failed immediately. The log message is also a bit hard to understand and can be improved. -- This message was sent by Atlassian Jira (v8.3.4#803005)
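For illustration, a minimal sketch of the guard being proposed; the function and argument names approximate the operator's create kwargs and are not its actual interface.
{code:python}
from airflow.exceptions import AirflowException

def choose_or_create_location(location_arns, create_location_kwargs, create_fn):
    """Return an existing Location ARN, or create one only when create kwargs were supplied."""
    if location_arns:
        return location_arns[0]
    if not create_location_kwargs:
        raise AirflowException(
            "Location does not exist and no create kwargs were provided; "
            "refusing to create one."
        )
    return create_fn(**create_location_kwargs)
{code}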
[jira] [Updated] (AIRFLOW-6400) pytest not working on Windows
[ https://issues.apache.org/jira/browse/AIRFLOW-6400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6400: - Description: While Windows isn't supported I am using it to do development and testing. This line in conftest.py is causing problems because "HOME" is not an environment variable on Windows. home = os.environ.get("HOME") It can easily be changed to be cross-platform. Edit: The same issue is causing test discovery to fail on Windows 10. was: While Windows isn't supported I am using it to do development and testing. This line in conftest.py is causing problems because "HOME" is not an environment variable on Windows. home = os.environ.get("HOME") It can easily be changed to be cross-platform. > pytest not working on Windows > - > > Key: AIRFLOW-6400 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6400 > Project: Apache Airflow > Issue Type: Bug > Components: tests >Affects Versions: 1.10.7 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > While Windows isn't supported I am using it to do development and testing. > This line in conftest.py is causing problems because "HOME" is not an > environment variable on Windows. > home = os.environ.get("HOME") > It can easily be changed to be cross-platform. > Edit: The same issue is causing test discovery to fail on Windows 10. -- This message was sent by Atlassian Jira (v8.3.4#803005)
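For illustration, two cross-platform alternatives to reading HOME directly; both resolve to the user's home directory on Windows as well.
{code:python}
import os

# Portable: expanduser works on Windows (via USERPROFILE) and on POSIX (via HOME)
home = os.path.expanduser("~")

# Or, keep honouring HOME when it is set and fall back otherwise
home = os.environ.get("HOME", os.path.expanduser("~"))
{code}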
[jira] [Created] (AIRFLOW-6400) pytest not working on Windows
Bjorn Olsen created AIRFLOW-6400: Summary: pytest not working on Windows Key: AIRFLOW-6400 URL: https://issues.apache.org/jira/browse/AIRFLOW-6400 Project: Apache Airflow Issue Type: Bug Components: tests Affects Versions: 1.10.7 Reporter: Bjorn Olsen Assignee: Bjorn Olsen While Windows isn't supported I am using it to do development and testing. This line in conftest.py is causing problems because "HOME" is not an environment variable on Windows. home = os.environ.get("HOME") It can easily be changed to be cross-platform. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-6366) Fix migrations for MS SQL Server
[ https://issues.apache.org/jira/browse/AIRFLOW-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6366: - Description: Although MS SQL Server is not officially supported as a backend database, I am using it via PyODBC and found a minor issue with migration from 1.10.6 to 1.10.7. Below is a log excerpt showing the issue: INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]'JSON_VALID' is not a recognized built-in function name. (195) (SQLExecDirectW)") was: Although MS SQL Server is not officially supported, I am using it via PyODBC and found a minor issue with migration from 1.10.6 to 1.10.7. Below is a log excerpt showing the issue: INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]'JSON_VALID' is not a recognized built-in function name. (195) (SQLExecDirectW)") > Fix migrations for MS SQL Server > > > Key: AIRFLOW-6366 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6366 > Project: Apache Airflow > Issue Type: Bug > Components: database >Affects Versions: 1.10.7 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > Although MS SQL Server is not officially supported as a backend database, I > am using it via PyODBC and found a minor issue with migration from 1.10.6 to > 1.10.7. > > Below is a log excerpt showing the issue: > INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> > d38e04c12aa2, add serialized_dag table > sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] > [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]'JSON_VALID' is not a > recognized built-in function name. (195) (SQLExecDirectW)") -- This message was sent by Atlassian Jira (v8.3.4#803005)
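For illustration, a sketch of the dialect guard that would avoid MySQL-only DDL such as JSON_VALID(); the table and column here are illustrative and not the exact contents of migration d38e04c12aa2.
{code:python}
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql

def upgrade():
    conn = op.get_bind()
    # Use a JSON-capable type only where the dialect supports it; other backends,
    # including MS SQL Server, fall back to plain TEXT and never see JSON_VALID().
    data_type = mysql.JSON if conn.dialect.name == "mysql" else sa.Text()
    op.create_table(
        "serialized_dag_example",  # illustrative name only
        sa.Column("dag_id", sa.String(250), primary_key=True),
        sa.Column("data", data_type, nullable=False),
    )
{code}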
[jira] [Created] (AIRFLOW-6366) Fix migrations for MS SQL Server
Bjorn Olsen created AIRFLOW-6366: Summary: Fix migrations for MS SQL Server Key: AIRFLOW-6366 URL: https://issues.apache.org/jira/browse/AIRFLOW-6366 Project: Apache Airflow Issue Type: Bug Components: database Affects Versions: 1.10.7 Reporter: Bjorn Olsen Assignee: Bjorn Olsen Although MS SQL Server is not officially supported, I am using it via PyODBC and found a minor issue with migration from 1.10.6 to 1.10.7. Below is a log excerpt showing the issue: INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table sqlalchemy.exc.ProgrammingError: (pyodbc.ProgrammingError) ('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]'JSON_VALID' is not a recognized built-in function name. (195) (SQLExecDirectW)") -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-6327) http_hook: Accept json= parameter for payload
[ https://issues.apache.org/jira/browse/AIRFLOW-6327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6327: - Summary: http_hook: Accept json= parameter for payload (was: http_hook: Add json requests payload) > http_hook: Accept json= parameter for payload > - > > Key: AIRFLOW-6327 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6327 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > Python requests library allows a user to send json-encoded Python objects by > making use of the "json=" parameter to a request. > This handles JSON encoding of the payload and setting the correct content > type header. > Example: > {code:python} > response = requests.post('https://httpbin.org/post', json={'key':'value'}) > json_response = response.json() > json_response['data'] '{"key": "value"}' > json_response['headers']['Content-Type'] 'application/json' > {code} > http_hook.run() does not yet have the json= parameter which is inconvenient > as then we have to pass the "data=" parameter using json.dumps and specify > the correct headers. > It would be better if we can just do something like the below and let the > Requests library ensure the request is valid: > {code:python} > obj = {'a':1, 'b': 'abc', 'c': [1, 2, {"d":10}]} > response = hook.run( > endpoint, > json=obj) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-6327) http_hook: Add json requests payload
Bjorn Olsen created AIRFLOW-6327: Summary: http_hook: Add json requests payload Key: AIRFLOW-6327 URL: https://issues.apache.org/jira/browse/AIRFLOW-6327 Project: Apache Airflow Issue Type: Improvement Components: hooks Affects Versions: 1.10.6 Reporter: Bjorn Olsen Assignee: Bjorn Olsen Python requests library allows a user to send json-encoded Python objects by making use of the "json=" parameter to a request. This handles JSON encoding of the payload and setting the correct content type header. Example: {code:python} response = requests.post('https://httpbin.org/post', json={'key':'value'}) json_response = response.json() json_response['data'] '{"key": "value"}' json_response['headers']['Content-Type'] 'application/json' {code} http_hook.run() does not yet have the json= parameter which is inconvenient as then we have to pass the "data=" parameter using json.dumps and specify the correct headers. It would be better if we can just do something like the below and let the Requests library ensure the request is valid: {code:python} obj = {'a':1, 'b': 'abc', 'c': [1, 2, {"d":10}]} response = hook.run( endpoint, json=obj) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
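For illustration, a rough sketch of threading a json= parameter through to the requests library; the real HttpHook.run() differs in detail (URL joining, GET parameter handling), so this only shows the shape of the change.
{code:python}
import requests

def run(self, endpoint, data=None, json=None, headers=None, extra_options=None):
    """Sketch of HttpHook.run() accepting json= and passing it straight to requests."""
    extra_options = extra_options or {}
    session = self.get_conn(headers)
    url = self.base_url + endpoint
    req = requests.Request(self.method, url, data=data, json=json, headers=headers)
    prepped_request = session.prepare_request(req)
    return self.run_and_check(session, prepped_request, extra_options)
{code}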
[jira] [Commented] (AIRFLOW-6195) queued_dttm is "None" on UI, and not updated when tasks requeued
[ https://issues.apache.org/jira/browse/AIRFLOW-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990952#comment-16990952 ] Bjorn Olsen commented on AIRFLOW-6195: -- Turns out this actually affects several fields, due to them being missing from refresh_from_db() in the TaskInstance model file. Fix incoming > queued_dttm is "None" on UI, and not updated when tasks requeued > > > Key: AIRFLOW-6195 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6195 > Project: Apache Airflow > Issue Type: Bug > Components: ui >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > Attachments: image-2019-12-08-14-44-56-762.png, > image-2019-12-08-14-45-34-266.png, image-2019-12-08-14-46-09-051.png > > > When inspecting a task instance on the UI, the value for queued_dttm displays > as 'None' despite having a value in the DB. Also, the value for queued_dttm > is from when the task was first queued and not updated if it is requeued - it > is not clear if this is intentional behaviour or not. > On UI: > !image-2019-12-08-14-44-56-762.png! > In DB: > !image-2019-12-08-14-45-34-266.png! > In reality, task was queued on 8 December and run shortly after. > queued_dttm in the DB is from the very first attempt, and is not updated from > recent attempts. > !image-2019-12-08-14-46-09-051.png! > -- This message was sent by Atlassian Jira (v8.3.4#803005)
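For illustration, a sketch of the idea behind the fix: refresh_from_db() should copy every persisted column back onto the in-memory TaskInstance, including queued_dttm; the helper below is illustrative, not the actual patch.
{code:python}
from airflow.models import TaskInstance

def refresh_task_instance(ti, session):
    """Copy persisted columns from the DB row back onto the in-memory TaskInstance."""
    db_ti = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == ti.dag_id,
            TaskInstance.task_id == ti.task_id,
            TaskInstance.execution_date == ti.execution_date,
        )
        .first()
    )
    if db_ti:
        ti.state = db_ti.state
        ti.start_date = db_ti.start_date
        ti.end_date = db_ti.end_date
        ti.queued_dttm = db_ti.queued_dttm  # one of the fields previously left out
{code}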
[jira] [Created] (AIRFLOW-6195) queued_dttm is "None" on UI, and not updated when tasks requeued
Bjorn Olsen created AIRFLOW-6195: Summary: queued_dttm is "None" on UI, and not updated when tasks requeued Key: AIRFLOW-6195 URL: https://issues.apache.org/jira/browse/AIRFLOW-6195 Project: Apache Airflow Issue Type: Bug Components: ui Affects Versions: 1.10.6 Reporter: Bjorn Olsen Assignee: Bjorn Olsen Attachments: image-2019-12-08-14-44-56-762.png, image-2019-12-08-14-45-34-266.png, image-2019-12-08-14-46-09-051.png When inspecting a task instance on the UI, the value for queued_dttm displays as 'None' despite having a value in the DB. Also, the value for queued_dttm is from when the task was first queued and not updated if it is requeued - it is not clear if this is intentional behaviour or not. On UI: !image-2019-12-08-14-44-56-762.png! In DB: !image-2019-12-08-14-45-34-266.png! In reality, task was queued on 8 December and run shortly after. queued_dttm in the DB is from the very first attempt, and is not updated from recent attempts. !image-2019-12-08-14-46-09-051.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen closed AIRFLOW-6190. Resolution: Duplicate Closing as Duplicate > Task instances queued and dequeued before worker is ready, causing > intermittently failed tasks > -- > > Key: AIRFLOW-6190 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6190 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > Attachments: image-2019-12-06-13-55-33-974.png > > > Below dag creates 20 identical simple tasks which depend on each other in > series. > Installing the DAG and executing all the DAG runs works perfectly the first > time around. > Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads > to intermittent task failures. > Edit: This isn't specifically tied to the first and second round; it seems to > randomly affect an entire set of dag runs or not affect the set at all. This > makes me suspect a timing issue between the executor and scheduler (sometimes > they align and sometimes they dont). > {code:java} > from builtins import range > from datetime import timedelta > import airflow > from airflow.models import DAG > from airflow.operators.bash_operator import BashOperator > from airflow.operators.latest_only_operator import LatestOnlyOperator > from airflow.operators.python_operator import (BranchPythonOperator, >PythonOperator) > import sys, os > args = { > 'owner': 'airflow', > 'start_date': airflow.utils.dates.days_ago(5), > } > dag = DAG( > dag_id='bug_testing_dag', > default_args=args, > schedule_interval='@daily', > max_active_runs=1 > ) > def func(): >pass > prev_task = None > for i in range(0,20): > task = PythonOperator( > task_id='task_{0}'.format(i), > python_callable=func, > dag=dag,) > if prev_task: > prev_task >> task > > prev_task = task > if __name__ == "__main__": > dag.cli(){code} > I am using the LocalExecutor. > job_heartbeat_sec = 5 > scheduler_heartbeat_sec = 5 > Example: > !image-2019-12-06-13-55-33-974.png|width=398,height=276! > > The second attempt tasks have 2 Logs shown on the UI if they were successful, > and 2 physical log files on disk. However the tasks that Failed only have 1 > log shown on the UI, despite there being 2 physical log files on disk. > (Presumably the UI uses the Airflow DB which for some reason isn't aware of > the second log for the failed tasks). > > Anyway I am more interested in the intermittent failures than what logs are > shown on the UI. > Here is an example of the second log file for the Failed task attempts: > {code:java} > [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met > for [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the > 'scheduled' state which is not a valid state for execution. The task must be > cleared in order to be run. > [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 > 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run > [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met > for [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' > state which is not a valid state for execution. The task must be cleared in > order to be run. 
> [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 > 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run > {code} > > At first I thought this was because the workers were still busy with the > previous TaskInstance (because there is a delay between when a TaskInstance > state is set to SUCCESS, and when the worker is actually done with it, > because of the worker heartbeat). The scheduler thinks the next task can be > SCHEDULED -> QUEUED, but does not start as the worker is still busy, and > therefore it goes back to QUEUED -> SCHEDULED. The task is still in the > worker queue, causing the failure above when the worker eventually wants to > start it. > However what is a mystery to me is why it works the first time the dag_run > runs, and not the second time. Perhaps it is something specific to my > environment. > I'm going to try and debug this myself but if anyone else can replicate this > issue in their environment it could help me understand if it is just > affecting me (or not). > Just install the DAG, let it run 100% once, then clear it and let it run > again (and you should start seeing random failures) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990843#comment-16990843 ] Bjorn Olsen commented on AIRFLOW-6190: -- [~kaxilnaik] , [~ash] I've done some testing and your PR does resolve this issue as well :) Managed to run my DAG a couple times and it works perfectly after your fixes. I'll discard mine since I think yours are more comprehensive. Along the way I noticed a possible bug with queued_dttm so I'll create a new Jira for that and move on to it. Thanks for the help :) > Task instances queued and dequeued before worker is ready, causing > intermittently failed tasks > -- > > Key: AIRFLOW-6190 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6190 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > Attachments: image-2019-12-06-13-55-33-974.png > > > Below dag creates 20 identical simple tasks which depend on each other in > series. > Installing the DAG and executing all the DAG runs works perfectly the first > time around. > Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads > to intermittent task failures. > Edit: This isn't specifically tied to the first and second round; it seems to > randomly affect an entire set of dag runs or not affect the set at all. This > makes me suspect a timing issue between the executor and scheduler (sometimes > they align and sometimes they dont). > {code:java} > from builtins import range > from datetime import timedelta > import airflow > from airflow.models import DAG > from airflow.operators.bash_operator import BashOperator > from airflow.operators.latest_only_operator import LatestOnlyOperator > from airflow.operators.python_operator import (BranchPythonOperator, >PythonOperator) > import sys, os > args = { > 'owner': 'airflow', > 'start_date': airflow.utils.dates.days_ago(5), > } > dag = DAG( > dag_id='bug_testing_dag', > default_args=args, > schedule_interval='@daily', > max_active_runs=1 > ) > def func(): >pass > prev_task = None > for i in range(0,20): > task = PythonOperator( > task_id='task_{0}'.format(i), > python_callable=func, > dag=dag,) > if prev_task: > prev_task >> task > > prev_task = task > if __name__ == "__main__": > dag.cli(){code} > I am using the LocalExecutor. > job_heartbeat_sec = 5 > scheduler_heartbeat_sec = 5 > Example: > !image-2019-12-06-13-55-33-974.png|width=398,height=276! > > The second attempt tasks have 2 Logs shown on the UI if they were successful, > and 2 physical log files on disk. However the tasks that Failed only have 1 > log shown on the UI, despite there being 2 physical log files on disk. > (Presumably the UI uses the Airflow DB which for some reason isn't aware of > the second log for the failed tasks). > > Anyway I am more interested in the intermittent failures than what logs are > shown on the UI. > Here is an example of the second log file for the Failed task attempts: > {code:java} > [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met > for [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the > 'scheduled' state which is not a valid state for execution. The task must be > cleared in order to be run. 
> [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 > 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run > [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met > for [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' > state which is not a valid state for execution. The task must be cleared in > order to be run. > [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 > 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run > {code} > > At first I thought this was because the workers were still busy with the > previous TaskInstance (because there is a delay between when a TaskInstance > state is set to SUCCESS, and when the worker is actually done with it, > because of the worker heartbeat). The scheduler thinks the next task can be > SCHEDULED -> QUEUED, but does not start as the worker is still busy, and > therefore it goes back to QUEUED -> SCHEDULED. The task is still in the > worker queue, causing the failure above when the worker eventually wants to > start it. > However what is a mystery to me is why it works the first time the dag_run > runs, and not the second time. Perhaps it is something specific to my > environment. > I'm going to
[jira] [Commented] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990119#comment-16990119 ] Bjorn Olsen commented on AIRFLOW-6190: -- Thanks for the feedback guys. It does seem similar. Could you take a look at the fix I found and see what you think? I'm no scheduler expert so I thought it would be great if you could take a look. [https://github.com/baolsen/airflow/pull/10/files] Basically I changed the scheduler to prevent doing SCHEDULED -> QUEUED -> SCHEDULED immediately if the task was just queued now and the executor has not yet had a chance to look at it. This happens when the executor's task was completed but it is still waiting in its heartbeat loop, and meanwhile the scheduler thinks it can queue up new tasks. Do you think your changes would avoid this scenario or could it still happen? > Task instances queued and dequeued before worker is ready, causing > intermittently failed tasks > -- > > Key: AIRFLOW-6190 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6190 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > Attachments: image-2019-12-06-13-55-33-974.png > > > Below dag creates 20 identical simple tasks which depend on each other in > series. > Installing the DAG and executing all the DAG runs works perfectly the first > time around. > Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads > to intermittent task failures. > Edit: This isn't specifically tied to the first and second round; it seems to > randomly affect an entire set of dag runs or not affect the set at all. This > makes me suspect a timing issue between the executor and scheduler (sometimes > they align and sometimes they dont). > {code:java} > from builtins import range > from datetime import timedelta > import airflow > from airflow.models import DAG > from airflow.operators.bash_operator import BashOperator > from airflow.operators.latest_only_operator import LatestOnlyOperator > from airflow.operators.python_operator import (BranchPythonOperator, >PythonOperator) > import sys, os > args = { > 'owner': 'airflow', > 'start_date': airflow.utils.dates.days_ago(5), > } > dag = DAG( > dag_id='bug_testing_dag', > default_args=args, > schedule_interval='@daily', > max_active_runs=1 > ) > def func(): >pass > prev_task = None > for i in range(0,20): > task = PythonOperator( > task_id='task_{0}'.format(i), > python_callable=func, > dag=dag,) > if prev_task: > prev_task >> task > > prev_task = task > if __name__ == "__main__": > dag.cli(){code} > I am using the LocalExecutor. > job_heartbeat_sec = 5 > scheduler_heartbeat_sec = 5 > Example: > !image-2019-12-06-13-55-33-974.png|width=398,height=276! > > The second attempt tasks have 2 Logs shown on the UI if they were successful, > and 2 physical log files on disk. However the tasks that Failed only have 1 > log shown on the UI, despite there being 2 physical log files on disk. > (Presumably the UI uses the Airflow DB which for some reason isn't aware of > the second log for the failed tasks). > > Anyway I am more interested in the intermittent failures than what logs are > shown on the UI. 
> Here is an example of the second log file for the Failed task attempts: > {code:java} > [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met > for [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the > 'scheduled' state which is not a valid state for execution. The task must be > cleared in order to be run. > [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 > 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run > [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met > for [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' > state which is not a valid state for execution. The task must be cleared in > order to be run. > [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 > 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run > {code} > > At first I thought this was because the workers were still busy with the > previous TaskInstance (because there is a delay between when a TaskInstance > state is set to SUCCESS, and when the worker is actually done with it, > because of the worker heartbeat). The scheduler thinks the next task can be > SCHEDULED -> QUEUED, but does not start as the worker is still busy, and > therefore it goes back to QUEUED
[jira] [Comment Edited] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16989735#comment-16989735 ] Bjorn Olsen edited comment on AIRFLOW-6190 at 12/6/19 1:02 PM: --- I changed the func to include a random duration sleep and this triggers the error more consistently. This again suggests a timing issue between the scheduler and worker. {code:java} import random, time def func(): time.sleep( random.randint(0,10) ) {code} The scheduler log also starts displaying entries like this, when tasks start to fail. Note the duplicate entry but coming from 2 different Python files. {code:java} Dec 06 14:58:55 airflow[116894]: [2019-12-06 14:58:55,171] {scheduler_job.py:1321} ERROR - Executor reports task instance finished (success) although the task says its queued. Was the task killed externally? {code} {code:java} Dec 06 14:58:55 airflow[116894]: [2019-12-06 14:58:55,176] {taskinstance.py:1072} ERROR - Executor reports task instance finished (success) although the task says its queued. Was the task killed externally? {code} was (Author: bjorn.ols...@gmail.com): I changed the func to include a random duration sleep and this triggers the error more consistently. This again suggests a timing issue between the scheduler and worker. {code:java} import random, time def func(): time.sleep( random.randint(0,10) ) {code} The scheduler log also starts displaying entries like this, when tasks start to fail. Note the duplicate entry but coming from 2 different files. {code:java} Dec 06 14:58:55 airflow[116894]: [2019-12-06 14:58:55,171] {scheduler_job.py:1321} ERROR - Executor reports task instance finished (success) although the task says its queued. Was the task killed externally? {code} {code:java} Dec 06 14:58:55 airflow[116894]: [2019-12-06 14:58:55,176] {taskinstance.py:1072} ERROR - Executor reports task instance finished (success) although the task says its queued. Was the task killed externally? {code} > Task instances queued and dequeued before worker is ready, causing > intermittently failed tasks > -- > > Key: AIRFLOW-6190 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6190 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > Attachments: image-2019-12-06-13-55-33-974.png > > > Below dag creates 20 identical simple tasks which depend on each other in > series. > Installing the DAG and executing all the DAG runs works perfectly the first > time around. > Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads > to intermittent task failures. > Edit: This isn't specifically tied to the first and second round; it seems to > randomly affect an entire set of dag runs or not affect the set at all. This > makes me suspect a timing issue between the executor and scheduler (sometimes > they align and sometimes they dont). 
> {code:java} > from builtins import range > from datetime import timedelta > import airflow > from airflow.models import DAG > from airflow.operators.bash_operator import BashOperator > from airflow.operators.latest_only_operator import LatestOnlyOperator > from airflow.operators.python_operator import (BranchPythonOperator, >PythonOperator) > import sys, os > args = { > 'owner': 'airflow', > 'start_date': airflow.utils.dates.days_ago(5), > } > dag = DAG( > dag_id='bug_testing_dag', > default_args=args, > schedule_interval='@daily', > max_active_runs=1 > ) > def func(): >pass > prev_task = None > for i in range(0,20): > task = PythonOperator( > task_id='task_{0}'.format(i), > python_callable=func, > dag=dag,) > if prev_task: > prev_task >> task > > prev_task = task > if __name__ == "__main__": > dag.cli(){code} > I am using the LocalExecutor. > job_heartbeat_sec = 5 > scheduler_heartbeat_sec = 5 > Example: > !image-2019-12-06-13-55-33-974.png|width=398,height=276! > > The second attempt tasks have 2 Logs shown on the UI if they were successful, > and 2 physical log files on disk. However the tasks that Failed only have 1 > log shown on the UI, despite there being 2 physical log files on disk. > (Presumably the UI uses the Airflow DB which for some reason isn't aware of > the second log for the failed tasks). > > Anyway I am more interested in the intermittent failures than what logs are > shown on the UI. > Here is an example of the second log file for the Failed task attempts: > {code:java} > [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met >
[jira] [Comment Edited] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16989735#comment-16989735 ] Bjorn Olsen edited comment on AIRFLOW-6190 at 12/6/19 12:59 PM: I changed the func to include a random duration sleep and this triggers the error more consistently. This again suggests a timing issue between the scheduler and worker. {code:java} import random, time def func(): time.sleep( random.randint(0,10) ) {code} The scheduler log also starts displaying entries like this, when tasks start to fail: {code:java} Dec 06 14:57:23 airflow[116894]: [2019-12-06 14:57:23,110] {scheduler_job.py:1321} ERROR - Executor reports task instance finished (success) although the task says its queued. Was the task killed externally? {code} was (Author: bjorn.ols...@gmail.com): I changed the func to include a random duration sleep and this triggers the error more consistently. This again suggests a timing issue between the scheduler and worker. {code:java} import random, time def func(): time.sleep( random.randint(0,10) ) {code} > Task instances queued and dequeued before worker is ready, causing > intermittently failed tasks > -- > > Key: AIRFLOW-6190 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6190 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > Attachments: image-2019-12-06-13-55-33-974.png > > > Below dag creates 20 identical simple tasks which depend on each other in > series. > Installing the DAG and executing all the DAG runs works perfectly the first > time around. > Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads > to intermittent task failures. > Edit: This isn't specifically tied to the first and second round; it seems to > randomly affect an entire set of dag runs or not affect the set at all. This > makes me suspect a timing issue between the executor and scheduler (sometimes > they align and sometimes they dont). > {code:java} > from builtins import range > from datetime import timedelta > import airflow > from airflow.models import DAG > from airflow.operators.bash_operator import BashOperator > from airflow.operators.latest_only_operator import LatestOnlyOperator > from airflow.operators.python_operator import (BranchPythonOperator, >PythonOperator) > import sys, os > args = { > 'owner': 'airflow', > 'start_date': airflow.utils.dates.days_ago(5), > } > dag = DAG( > dag_id='bug_testing_dag', > default_args=args, > schedule_interval='@daily', > max_active_runs=1 > ) > def func(): >pass > prev_task = None > for i in range(0,20): > task = PythonOperator( > task_id='task_{0}'.format(i), > python_callable=func, > dag=dag,) > if prev_task: > prev_task >> task > > prev_task = task > if __name__ == "__main__": > dag.cli(){code} > I am using the LocalExecutor. > job_heartbeat_sec = 5 > scheduler_heartbeat_sec = 5 > Example: > !image-2019-12-06-13-55-33-974.png|width=398,height=276! > > The second attempt tasks have 2 Logs shown on the UI if they were successful, > and 2 physical log files on disk. However the tasks that Failed only have 1 > log shown on the UI, despite there being 2 physical log files on disk. > (Presumably the UI uses the Airflow DB which for some reason isn't aware of > the second log for the failed tasks). > > Anyway I am more interested in the intermittent failures than what logs are > shown on the UI. 
> Here is an example of the second log file for the Failed task attempts: > {code:java} > [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met > for [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the > 'scheduled' state which is not a valid state for execution. The task must be > cleared in order to be run. > [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 > 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run > [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met > for [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' > state which is not a valid state for execution. The task must be cleared in > order to be run. > [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 > 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run > {code} > > At first I thought this was because the workers were still busy with the > previous TaskInstance (because there is a delay between when a TaskInstance >
[jira] [Commented] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16989735#comment-16989735 ] Bjorn Olsen commented on AIRFLOW-6190: -- I changed the func to include a random duration sleep and this triggers the error more consistently. This again suggests a timing issue between the scheduler and worker. {code:java} import random, time def func(): time.sleep( random.randint(0,10) ) {code} > Task instances queued and dequeued before worker is ready, causing > intermittently failed tasks > -- > > Key: AIRFLOW-6190 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6190 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > Attachments: image-2019-12-06-13-55-33-974.png > > > Below dag creates 20 identical simple tasks which depend on each other in > series. > Installing the DAG and executing all the DAG runs works perfectly the first > time around. > Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads > to intermittent task failures. > Edit: This isn't specifically tied to the first and second round; it seems to > randomly affect an entire set of dag runs or not affect the set at all. This > makes me suspect a timing issue between the executor and scheduler (sometimes > they align and sometimes they dont). > {code:java} > from builtins import range > from datetime import timedelta > import airflow > from airflow.models import DAG > from airflow.operators.bash_operator import BashOperator > from airflow.operators.latest_only_operator import LatestOnlyOperator > from airflow.operators.python_operator import (BranchPythonOperator, >PythonOperator) > import sys, os > args = { > 'owner': 'airflow', > 'start_date': airflow.utils.dates.days_ago(5), > } > dag = DAG( > dag_id='bug_testing_dag', > default_args=args, > schedule_interval='@daily', > max_active_runs=1 > ) > def func(): >pass > prev_task = None > for i in range(0,20): > task = PythonOperator( > task_id='task_{0}'.format(i), > python_callable=func, > dag=dag,) > if prev_task: > prev_task >> task > > prev_task = task > if __name__ == "__main__": > dag.cli(){code} > I am using the LocalExecutor. > job_heartbeat_sec = 5 > scheduler_heartbeat_sec = 5 > Example: > !image-2019-12-06-13-55-33-974.png|width=398,height=276! > > The second attempt tasks have 2 Logs shown on the UI if they were successful, > and 2 physical log files on disk. However the tasks that Failed only have 1 > log shown on the UI, despite there being 2 physical log files on disk. > (Presumably the UI uses the Airflow DB which for some reason isn't aware of > the second log for the failed tasks). > > Anyway I am more interested in the intermittent failures than what logs are > shown on the UI. > Here is an example of the second log file for the Failed task attempts: > {code:java} > [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met > for [scheduled]>, dependency 'Task Instance State' FAILED: Task is in the > 'scheduled' state which is not a valid state for execution. The task must be > cleared in order to be run. > [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 > 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run > [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met > for [failed]>, dependency 'Task Instance State' FAILED: Task is in the 'failed' > state which is not a valid state for execution. 
The task must be cleared in > order to be run. > [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 > 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run > {code} > > At first I thought this was because the workers were still busy with the > previous TaskInstance (because there is a delay between when a TaskInstance > state is set to SUCCESS, and when the worker is actually done with it, > because of the worker heartbeat). The scheduler thinks the next task can be > SCHEDULED -> QUEUED, but does not start as the worker is still busy, and > therefore it goes back to QUEUED -> SCHEDULED. The task is still in the > worker queue, causing the failure above when the worker eventually wants to > start it. > However what is a mystery to me is why it works the first time the dag_run > runs, and not the second time. Perhaps it is something specific to my > environment. > I'm going to try and debug this myself but if anyone else can replicate this > issue in their environment it could
[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6190: - Description: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. Edit: This isn't specifically tied to the first and second round; it seems to randomly affect an entire set of dag runs or not affect the set at all. This makes me suspect a timing issue between the executor and scheduler (sometimes they align and sometimes they dont). {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code} I am using the LocalExecutor. job_heartbeat_sec = 5 scheduler_heartbeat_sec = 5 Example: !image-2019-12-06-13-55-33-974.png|width=398,height=276! The second attempt tasks have 2 Logs shown on the UI if they were successful, and 2 physical log files on disk. However the tasks that Failed only have 1 log shown on the UI, despite there being 2 physical log files on disk. (Presumably the UI uses the Airflow DB which for some reason isn't aware of the second log for the failed tasks). Anyway I am more interested in the intermittent failures than what logs are shown on the UI. Here is an example of the second log file for the Failed task attempts: {code:java} [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run {code} At first I thought this was because the workers were still busy with the previous TaskInstance (because there is a delay between when a TaskInstance state is set to SUCCESS, and when the worker is actually done with it, because of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED -> QUEUED, but does not start as the worker is still busy, and therefore it goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, causing the failure above when the worker eventually wants to start it. However what is a mystery to me is why it works the first time the dag_run runs, and not the second time. 
Perhaps it is something specific to my environment. I'm going to try and debug this myself but if anyone else can replicate this issue in their environment it could help me understand if it is just affecting me (or not). Just install the DAG, let it run 100% once, then clear it and let it run again (and you should start seeing random failures) was: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. Edit: This isn't specifically tied to the first and second round; it seems to randomly affect an entire set of dag runs or not affect the set at all. This makes me suspect a timing issue between the executor and scheduler (sometimes they align and sometimes they dont). {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import
[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6190: - Description: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. Edit: This isn't specifically tied to the first and second round; it seems to randomly affect an entire set of dag runs or not affect the set at all. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code} I am using the LocalExecutor. Example: !image-2019-12-06-13-55-33-974.png|width=398,height=276! The second attempt tasks have 2 Logs shown on the UI if they were successful, and 2 physical log files on disk. However the tasks that Failed only have 1 log shown on the UI, despite there being 2 physical log files on disk. (Presumably the UI uses the Airflow DB which for some reason isn't aware of the second log for the failed tasks). Anyway I am more interested in the intermittent failures than what logs are shown on the UI. Here is an example of the second log file for the Failed task attempts: {code:java} [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run {code} At first I thought this was because the workers were still busy with the previous TaskInstance (because there is a delay between when a TaskInstance state is set to SUCCESS, and when the worker is actually done with it, because of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED -> QUEUED, but does not start as the worker is still busy, and therefore it goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, causing the failure above when the worker eventually wants to start it. However what is a mystery to me is why it works the first time the dag_run runs, and not the second time. Perhaps it is something specific to my environment. 
I'm going to try and debug this myself but if anyone else can replicate this issue in their environment it could help me understand if it is just affecting me (or not). Just install the DAG, let it run 100% once, then clear it and let it run again (and you should start seeing random failures) was: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i),
[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6190: - Description: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. Edit: This isn't specifically tied to the first and second round; it seems to randomly affect an entire set of dag runs or not affect the set at all. This makes me suspect a timing issue between the executor and scheduler (sometimes they align and sometimes they dont). {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code} I am using the LocalExecutor. Example: !image-2019-12-06-13-55-33-974.png|width=398,height=276! The second attempt tasks have 2 Logs shown on the UI if they were successful, and 2 physical log files on disk. However the tasks that Failed only have 1 log shown on the UI, despite there being 2 physical log files on disk. (Presumably the UI uses the Airflow DB which for some reason isn't aware of the second log for the failed tasks). Anyway I am more interested in the intermittent failures than what logs are shown on the UI. Here is an example of the second log file for the Failed task attempts: {code:java} [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run {code} At first I thought this was because the workers were still busy with the previous TaskInstance (because there is a delay between when a TaskInstance state is set to SUCCESS, and when the worker is actually done with it, because of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED -> QUEUED, but does not start as the worker is still busy, and therefore it goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, causing the failure above when the worker eventually wants to start it. However what is a mystery to me is why it works the first time the dag_run runs, and not the second time. Perhaps it is something specific to my environment. 
I'm going to try and debug this myself but if anyone else can replicate this issue in their environment it could help me understand if it is just affecting me (or not). Just install the DAG, let it run 100% once, then clear it and let it run again (and you should start seeing random failures) was: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. Edit: This isn't specifically tied to the first and second round; it seems to randomly affect an entire set of dag runs or not affect the set at all. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date':
[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6190: - Description: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code} I am using the LocalExecutor. Example: !image-2019-12-06-13-55-33-974.png|width=398,height=276! The second attempt tasks have 2 Logs shown on the UI if they were successful, and 2 physical log files on disk. However the tasks that Failed only have 1 log shown on the UI, despite there being 2 physical log files on disk. (Presumably the UI uses the Airflow DB which for some reason isn't aware of the second log for the failed tasks). Anyway I am more interested in the intermittent failures than what logs are shown on the UI. Here is an example of the second log file for the Failed task attempts: {code:java} [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run {code} At first I thought this was because the workers were still busy with the previous TaskInstance (because there is a delay between when a TaskInstance state is set to SUCCESS, and when the worker is actually done with it, because of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED -> QUEUED, but does not start as the worker is still busy, and therefore it goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, causing the failure above when the worker eventually wants to start it. However what is a mystery to me is why it works the first time the dag_run runs, and not the second time. Perhaps it is something specific to my environment. I'm going to try and debug this myself but if anyone else can replicate this issue in their environment it could help me understand if it is just affecting me (or not). 
Just install the DAG, let it run 100% once, then clear it and let it run again (and you should start seeing random failures) was: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code} I am using
[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6190: - Description: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code} I am using the SequentialExecutor. Example: !image-2019-12-06-13-55-33-974.png|width=398,height=276! The second attempt tasks have 2 Logs shown on the UI if they were successful, and 2 physical log files on disk. However the tasks that Failed only have 1 log shown on the UI, despite there being 2 physical log files on disk. (Presumably the UI uses the Airflow DB which for some reason isn't aware of the second log for the failed tasks). Anyway I am more interested in the intermittent failures than what logs are shown on the UI. Here is an example of the second log file for the Failed task attempts: {code:java} [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run {code} At first I thought this was because the workers were still busy with the previous TaskInstance (because there is a delay between when a TaskInstance state is set to SUCCESS, and when the worker is actually done with it, because of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED -> QUEUED, but does not start as the worker is still busy, and therefore it goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, causing the failure above when the worker eventually wants to start it. However what is a mystery to me is why it works the first time the dag_run runs, and not the second time. Perhaps it is something specific to my environment. I'm going to try and debug this myself but if anyone else can replicate this issue in their environment it could help me understand if it is just affecting me (or not). 
Just install the DAG, let it run 100% once, then clear it and let it run again (and you should start seeing random failures) was: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code}
[jira] [Updated] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-6190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6190: - Description: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code} Example: !image-2019-12-06-13-55-33-974.png|width=398,height=276! The second attempt tasks have 2 Logs shown on the UI if they were successful, and 2 physical log files on disk. However the tasks that Failed only have 1 log shown on the UI, despite there being 2 physical log files on disk. (Presumably the UI uses the Airflow DB which for some reason isn't aware of the second log for the failed tasks). Anyway I am more interested in the intermittent failures than what logs are shown on the UI. Here is an example of the second log file for the Failed task attempts: {code:java} [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run {code} At first I thought this was because the workers were still busy with the previous TaskInstance (because there is a delay between when a TaskInstance state is set to SUCCESS, and when the worker is actually done with it, because of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED -> QUEUED, but does not start as the worker is still busy, and therefore it goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, causing the failure above when the worker eventually wants to start it. However what is a mystery to me is why it works the first time the dag_run runs, and not the second time. Perhaps it is something specific to my environment. I'm going to try and debug this myself but if anyone else can replicate this issue in their environment it could help me understand if it is just affecting me (or not). 
Just install the DAG, let it run 100% once, then clear it and let it run again (and you should start seeing random failures) was: Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code} Example:
[jira] [Created] (AIRFLOW-6190) Task instances queued and dequeued before worker is ready, causing intermittently failed tasks
Bjorn Olsen created AIRFLOW-6190: Summary: Task instances queued and dequeued before worker is ready, causing intermittently failed tasks Key: AIRFLOW-6190 URL: https://issues.apache.org/jira/browse/AIRFLOW-6190 Project: Apache Airflow Issue Type: Bug Components: core Affects Versions: 1.10.6 Reporter: Bjorn Olsen Assignee: Bjorn Olsen Attachments: image-2019-12-06-13-55-33-974.png Below dag creates 20 identical simple tasks which depend on each other in series. Installing the DAG and executing all the DAG runs works perfectly the first time around. Then Pausing the DAG, clearing all the dag runs, and unpausing the DAG leads to intermittent task failures. {code:java} from builtins import range from datetime import timedelta import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.operators.python_operator import (BranchPythonOperator, PythonOperator) import sys, os args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(5), } dag = DAG( dag_id='bug_testing_dag', default_args=args, schedule_interval='@daily', max_active_runs=1 ) def func(): pass prev_task = None for i in range(0,20): task = PythonOperator( task_id='task_{0}'.format(i), python_callable=func, dag=dag,) if prev_task: prev_task >> task prev_task = task if __name__ == "__main__": dag.cli(){code} Example: !image-2019-12-06-13-55-33-974.png|width=398,height=276! The second attempt tasks have 2 Logs if they were successful. However the tasks only have 1 log on the UI if they failed. On the OS, you can see a log file for the Failed task attempts: {code:java} [2019-12-06 13:40:57,064] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'scheduled' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:40:57,065] {logging_mixin.py:112} INFO - [2019-12-06 13:40:57,065] {local_task_job.py:91} INFO - Task is not able to be run [2019-12-06 13:41:09,004] {taskinstance.py:624} INFO - Dependencies not met for , dependency 'Task Instance State' FAILED: Task is in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run. [2019-12-06 13:41:09,005] {logging_mixin.py:112} INFO - [2019-12-06 13:41:09,005] {local_task_job.py:91} INFO - Task is not able to be run {code} At first I thought this was because the workers were still busy with the previous TaskInstance (there is a delay between when a TaskInstance state is set to SUCCESS, and when the worker is actually done with it, because of the worker heartbeat). The scheduler thinks the next task can be SCHEDULED -> QUEUED, but does not start as the worker is still busy, and therefore it goes back to QUEUED -> SCHEDULED. The task is still in the worker queue, causing the failure above when the worker eventually wants to start it. However what is a mystery to me is why it works the first time the dag_run runs, and not the second time. Perhaps it is something specific to my environment. I'm going to try and debug this myself but if anyone else can replicate this issue in their environment it could help me understand if it is just affecting me (or not). Just install the DAG, let it run 100% once, then clear it and let it run again (and you should start seeing random failures) -- This message was sent by Atlassian Jira (v8.3.4#803005)
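The comments above on this issue swap the no-op callable for one that sleeps a random duration, which makes the suspected scheduler/worker race easier to reproduce. A cleaned-up sketch of that variation (only the callable changes; the rest of the reproduction DAG above stays the same):
{code:python}
import random
import time


def func():
    # Sleep 0-10 seconds so the scheduler/worker timing windows vary per task.
    time.sleep(random.randint(0, 10))
{code}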
[jira] [Comment Edited] (AIRFLOW-6083) AwsLambdaHook is not accepting non-default configuration
[ https://issues.apache.org/jira/browse/AIRFLOW-6083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987709#comment-16987709 ] Bjorn Olsen edited comment on AIRFLOW-6083 at 12/4/19 10:15 AM: This should be resolved with AIRFLOW-6072 After this PR gets merged you could use the "Extra" part of the Airflow Connection to pass additional config to the boto3 client / resource / session as you please. Edit: You should also be able to change the timeout this way, if desired. was (Author: bjorn.ols...@gmail.com): This should be resolved with AIRFLOW-6072 After this PR gets merged you could use the "Extra" part of the Airflow Connection to pass additional config to the boto3 client / resource / session as you please. > AwsLambdaHook is not accepting non-default configuration > > > Key: AIRFLOW-6083 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6083 > Project: Apache Airflow > Issue Type: Bug > Components: aws >Affects Versions: 1.10.6 >Reporter: Daumantas Pagojus >Assignee: Daumantas Pagojus >Priority: Blocker > > Hello. > While using Airflow we have come across a problem with AwsLambdaHook. > We are using this hook to launch Lambda function which takes a while to > complete (around 5-6 minutes). However, since lambda is invoked through boto3 > client, which has default timeout set to [60 > seconds|https://aws.amazon.com/premiumsupport/knowledge-center/lambda-function-retry-timeout-sdk/], > our Airflow interface shows this lambda failing, even though Lambda finishes > successfully checking at AWS console. This also causes another side effect: > Since boto3 thinks that Lambda has timed out, it automatically spawns another > instance, which also times out and this chain lasts 5 times, spawning 5 > Lambdas and all these Lambdas show as failed in Airflow interface, while they > actually succeed. > > This can be solved by passing in custom configuration when creating a boto3 > client, however, it is not possible to do that when creating AwsLambdaHook as > it does not take in this parameter. > However, we see that AwsLambdaHook inherits and uses AwsHook's function > (get_client_type) to get the boto3 client and this function accepts > configuration parameter (which defaults to None), but it is never passed to > it from the Lambda's hook, which could be easily achieved and would fix the > bug we are facing at the moment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-6083) AwsLambdaHook is not accepting non-default configuration
[ https://issues.apache.org/jira/browse/AIRFLOW-6083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16987709#comment-16987709 ] Bjorn Olsen commented on AIRFLOW-6083: -- This should be resolved with AIRFLOW-6072 After this PR gets merged you could use the "Extra" part of the Airflow Connection to pass additional config to the boto3 client / resource / session as you please. > AwsLambdaHook is not accepting non-default configuration > > > Key: AIRFLOW-6083 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6083 > Project: Apache Airflow > Issue Type: Bug > Components: aws >Affects Versions: 1.10.6 >Reporter: Daumantas Pagojus >Assignee: Daumantas Pagojus >Priority: Blocker > > Hello. > While using Airflow we have come across a problem with AwsLambdaHook. > We are using this hook to launch Lambda function which takes a while to > complete (around 5-6 minutes). However, since lambda is invoked through boto3 > client, which has default timeout set to [60 > seconds|https://aws.amazon.com/premiumsupport/knowledge-center/lambda-function-retry-timeout-sdk/], > our Airflow interface shows this lambda failing, even though Lambda finishes > successfully checking at AWS console. This also causes another side effect: > Since boto3 thinks that Lambda has timed out, it automatically spawns another > instance, which also times out and this chain lasts 5 times, spawning 5 > Lambdas and all these Lambdas show as failed in Airflow interface, while they > actually succeed. > > This can be solved by passing in custom configuration when creating a boto3 > client, however, it is not possible to do that when creating AwsLambdaHook as > it does not take in this parameter. > However, we see that AwsLambdaHook inherits and uses AwsHook's function > (get_client_type) to get the boto3 client and this function accepts > configuration parameter (which defaults to None), but it is never passed to > it from the Lambda's hook, which could be easily achieved and would fix the > bug we are facing at the moment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
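For context on the issue above: the 60-second default the reporter hits comes from botocore's client configuration. A minimal sketch (plain boto3, not the AwsLambdaHook API) of the kind of non-default configuration the hook would need to accept — the function name and timeout values here are illustrative assumptions:
{code:python}
import boto3
from botocore.config import Config

# Raise the read timeout well above the 60 s default and disable automatic
# retries so a long-running Lambda is not re-invoked when a call times out.
lambda_config = Config(
    connect_timeout=60,
    read_timeout=900,
    retries={"max_attempts": 0},
)

client = boto3.client("lambda", config=lambda_config)
response = client.invoke(FunctionName="my-function", Payload=b"{}")
{code}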
[jira] [Assigned] (AIRFLOW-4355) Externally triggered DAG is marked as 'success' even if a task has been 'removed'!
[ https://issues.apache.org/jira/browse/AIRFLOW-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen reassigned AIRFLOW-4355: Assignee: Bjorn Olsen > Externally triggered DAG is marked as 'success' even if a task has been > 'removed'! > -- > > Key: AIRFLOW-4355 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4355 > Project: Apache Airflow > Issue Type: Bug > Components: DAG, DagRun, scheduler >Affects Versions: 1.10.3 >Reporter: t oo >Assignee: Bjorn Olsen >Priority: Blocker > Labels: dynamic > Fix For: 2.0.0 > > Attachments: dag_success_even_if_task_removed.png, treeview.png > > > note: all my dags are purely externally triggered > *Issue:* Dag has 5 parallel tasks that ran successfully and 1 final task that > somehow got 'removed' state (prior dag runs had 'failed' state) and never ran > successfully but still the DAG is showing success! > > *Command ran* (note that previous commands like airflow trigger_dag -e > 20190412 qsr_coremytbl were run before and failed for valid reason (ie python > task failing) ): > airflow trigger_dag -e 20190412T08:00 qsr_coremytbl --conf '\{"hourstr":"08"}' > > *some logs on prior instance of airflow (ec2 was autohealed):* > [2019-04-18 08:29:40,678] \{logging_mixin.py:95} INFO - [2019-04-18 > 08:29:40,678] {__init__.py:4897} WARNING - Failed to get task ' qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [None]>' > for dag ''. Marking it as removed. > [2019-04-18 08:29:43,582] \{logging_mixin.py:95} INFO - [2019-04-18 > 08:29:43,582] {__init__.py:4906} INFO - Restoring task ' qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 [removed]>' > which was previously removed from DAG '' > [2019-04-18 08:29:43,618] \{jobs.py:1787} INFO - Creating / updating > 08:00:00+00:00 [scheduled]> in ORM > [2019-04-18 08:29:43,676] \{logging_mixin.py:95} INFO - [2019-04-18 > 08:29:43,676] {__init__.py:4897} WARNING - Failed to get task ' qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl 2019-04-12 08:00:00+00:00 > [scheduled]>' for dag ''. Marking it as removed. 
> > *some logs on newer ec2:* > [myuser@host logs]$ grep -i hive -R * | sed 's#[0-9]#x#g' | sort | uniq -c | > grep -v 'airflow-webserver-access.log' > 2 audit/airflow-audit.log:-xx-xx xx:xx:xx.xx qsr_coremytbl > REPAIR_HIVE_schemeh.mytbl log -xx-xx xx:xx:xx.xx rsawyerx > [('execution_date', u'-xx-xxTxx:xx:xx+xx:xx'), ('task_id', > u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')] > 1 audit/airflow-audit.log:-xx-xx xx:xx:xx.xx qsr_coremytbl > REPAIR_HIVE_schemeh.mytbl log -xx-xx xx:xx:xx.xx rsawyerx > [('execution_date', u'-xx-xxTxx:xx:xx+xx:xx'), ('task_id', > u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl'), ('format', > u'json')] > 1 audit/airflow-audit.log:-xx-xx xx:xx:xx.xx qsr_coremytbl > REPAIR_HIVE_schemeh.mytbl rendered -xx-xx xx:xx:xx.xx rsawyerx > [('execution_date', u'-xx-xxTxx:xx:xx+xx:xx'), ('task_id', > u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')] > 1 audit/airflow-audit.log:-xx-xx xx:xx:xx.xx qsr_coremytbl > REPAIR_HIVE_schemeh.mytbl task -xx-xx xx:xx:xx.xx rsawyerx > [('execution_date', u'-xx-xxTxx:xx:xx+xx:xx'), ('task_id', > u'REPAIR_HIVE_schemeh.mytbl'), ('dag_id', u'qsr_coremytbl')] > 1 scheduler/latest/qsr_dag_generation.py.log:[-xx-xx xx:xx:xx,xxx] > \{jobs.py:} INFO - Creating / updating qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl -xx-xx xx:xx:xx+xx:xx > [scheduled]> in ORM > 71 scheduler/latest/qsr_dag_generation.py.log:[-xx-xx xx:xx:xx,xxx] > \{logging_mixin.py:xx} INFO - [-xx-xx xx:xx:xx,xxx] {__init__.py:} > INFO - Restoring task ' -xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG > '' > 1 scheduler/-xx-xx/qsr_dag_generation.py.log:[-xx-xx xx:xx:xx,xxx] > \{jobs.py:} INFO - Creating / updating qsr_coremytbl.REPAIR_HIVE_schemeh.mytbl -xx-xx xx:xx:xx+xx:xx > [scheduled]> in ORM > 71 scheduler/-xx-xx/qsr_dag_generation.py.log:[-xx-xx xx:xx:xx,xxx] > \{logging_mixin.py:xx} INFO - [-xx-xx xx:xx:xx,xxx] {__init__.py:} > INFO - Restoring task ' -xx-xx xx:xx:xx+xx:xx [removed]>' which was previously removed from DAG > '' > > mysql> *select * from task_instance where task_id like '%REP%';#* > >
[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value
[ https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6170: - Description: BranchPythonOperator subclasses PythonOperator, and after it has selected a branch we get a message like this in the task log: [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value was: This returned value is from the execute() method. A user would expect the returned value from this to be pushed to an XCOM; but this does not happen due to no return value from the BranchPythonOperator's execute method. {code:java} def execute(self, context: Dict): branch = super().execute(context) self.skip_all_except(context['ti'], branch) {code} If we do an XCOM push of the result then we can use the decision made by BranchPythonOperator in downstream tasks. Eg consider this dependency chain: get >> branch >> [ create, update ]>> join >> execute The 'execute' task might need to know whether the 'branch' decided to create a new thing to run, or whether to use the existing one from the 'get'. Without an XCOM push from the branch return value, it is difficult to pick up the correct value later on. was: BranchPythonOperator subclasses PythonOperator and this means that after it has selected a branch we get a message like this: [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value was: chosen_task This returned value is from the execute() method. A user would expect the returned value from this to be pushed to an XCOM but this does not happen due to no return value from the BranchPythonOperator's execute method. {code:java} def execute(self, context: Dict): branch = super().execute(context) self.skip_all_except(context['ti'], branch) {code} If we do an XCOM push of the result then we can use the decision made by BranchPythonOperator in downstream tasks. Eg consider this dependency chain: get >> branch >> [ create, update ]>> join >> execute The 'execute' task might need to know whether the 'branch' decided to create a new thing to run, or whether to use the existing one from the 'get'. Without an XCOM push from the branch return value, it is difficult to pick up the correct value later on. > BranchPythonOperator does not do XCom push of returned value > > > Key: AIRFLOW-6170 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6170 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > BranchPythonOperator subclasses PythonOperator, and after it has selected a > branch we get a message like this in the task log: > [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned > value was: > This returned value is from the execute() method. > A user would expect the returned value from this to be pushed to an XCOM; but > this does not happen due to no return value from the BranchPythonOperator's > execute method. > > {code:java} > def execute(self, context: Dict): > branch = super().execute(context) > self.skip_all_except(context['ti'], branch) > {code} > > If we do an XCOM push of the result then we can use the decision made by > BranchPythonOperator in downstream tasks. > Eg consider this dependency chain: > get >> branch >> [ create, update ]>> join >> execute > The 'execute' task might need to know whether the 'branch' decided to create > a new thing to run, or whether to use the existing one from the 'get'. 
> Without an XCOM push from the branch return value, it is difficult to pick up > the correct value later on. -- This message was sent by Atlassian Jira (v8.3.4#803005)
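A minimal sketch of the change this issue proposes (not necessarily the final patch): have execute() return the chosen branch so the standard PythonOperator return-value XCom push applies to it.
{code:python}
from typing import Dict

from airflow.models import SkipMixin
from airflow.operators.python_operator import PythonOperator


class BranchPythonOperator(PythonOperator, SkipMixin):
    def execute(self, context: Dict):
        branch = super().execute(context)
        self.skip_all_except(context['ti'], branch)
        # Returning the branch lets it be pushed to XCom like any other
        # PythonOperator return value.
        return branch
{code}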
[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value
[ https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6170: - Priority: Minor (was: Major) > BranchPythonOperator does not do XCom push of returned value > > > Key: AIRFLOW-6170 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6170 > Project: Apache Airflow > Issue Type: Improvement > Components: operators >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > BranchPythonOperator subclasses PythonOperator and this means that after it > has selected a branch we get a message like this: > [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned > value was: chosen_task > > This returned value is from the execute() method. A user would expect the > returned value from this to be pushed to an XCOM but this does not happen due > to no return value from the BranchPythonOperator's execute method. > > > {code:java} > def execute(self, context: Dict): > branch = super().execute(context) > self.skip_all_except(context['ti'], branch) > {code} > > If we do an XCOM push of the result then we can use the decision made by > BranchPythonOperator in downstream tasks. > Eg consider this dependency chain: > get >> branch >> [ create, update ]>> join >> execute > The 'execute' task might need to know whether the 'branch' decided to create > a new thing to run, or whether to use the existing one from the 'get'. > Without an XCOM push from the branch return value, it is difficult to pick up > the correct value later on. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value
[ https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6170: - Description: BranchPythonOperator subclasses PythonOperator and this means that after it has selected a branch we get a message like this: [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value was: chosen_task This returned value is from the execute() method. A user would expect the returned value from this to be pushed to an XCOM but this does not happen due to no return value from the BranchPythonOperator's execute method. {code:java} def execute(self, context: Dict): branch = super().execute(context) self.skip_all_except(context['ti'], branch) {code} If we do an XCOM push of the result then we can use the decision made by BranchPythonOperator in downstream tasks. Eg consider this dependency chain: get >> branch >> [ create, update ]>> join >> execute The 'execute' task might need to know whether the 'branch' decided to create a new thing to run, or whether to use the existing one from the 'get'. Without an XCOM push from the branch return value, it is difficult to pick up the correct value later on. was: BranchPythonOperator subclasses PythonOperator and this means that after it has selected a branch we get a message like this: [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value was: chosen_task This returned value is from the execute() method. A user would expect the returned value from this to be pushed to an XCOM but this does not happen due to no return value from the BranchPythonOperator's execute method. If we do an XCOM push of the result then we can use the decision made by BranchPythonOperator in downstream tasks. Eg consider this dependency chain: get >> branch >> [ create, update ]>> join >> execute The 'execute' task might need to know whether the 'branch' decided to create a new thing to run, or whether to use the existing one from the 'get'. Without an XCOM push from the branch return value, it is difficult to pick up the correct value later on. > BranchPythonOperator does not do XCom push of returned value > > > Key: AIRFLOW-6170 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6170 > Project: Apache Airflow > Issue Type: Improvement > Components: operators >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Major > > BranchPythonOperator subclasses PythonOperator and this means that after it > has selected a branch we get a message like this: > [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned > value was: chosen_task > > This returned value is from the execute() method. A user would expect the > returned value from this to be pushed to an XCOM but this does not happen due > to no return value from the BranchPythonOperator's execute method. > > > {code:java} > def execute(self, context: Dict): > branch = super().execute(context) > self.skip_all_except(context['ti'], branch) > {code} > > If we do an XCOM push of the result then we can use the decision made by > BranchPythonOperator in downstream tasks. > Eg consider this dependency chain: > get >> branch >> [ create, update ]>> join >> execute > The 'execute' task might need to know whether the 'branch' decided to create > a new thing to run, or whether to use the existing one from the 'get'. > Without an XCOM push from the branch return value, it is difficult to pick up > the correct value later on. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value
[ https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6170: - Issue Type: Bug (was: Improvement) > BranchPythonOperator does not do XCom push of returned value > > > Key: AIRFLOW-6170 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6170 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > BranchPythonOperator subclasses PythonOperator and this means that after it > has selected a branch we get a message like this: > [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned > value was: chosen_task > > This returned value is from the execute() method. A user would expect the > returned value from this to be pushed to an XCOM but this does not happen due > to no return value from the BranchPythonOperator's execute method. > > > {code:java} > def execute(self, context: Dict): > branch = super().execute(context) > self.skip_all_except(context['ti'], branch) > {code} > > If we do an XCOM push of the result then we can use the decision made by > BranchPythonOperator in downstream tasks. > Eg consider this dependency chain: > get >> branch >> [ create, update ]>> join >> execute > The 'execute' task might need to know whether the 'branch' decided to create > a new thing to run, or whether to use the existing one from the 'get'. > Without an XCOM push from the branch return value, it is difficult to pick up > the correct value later on. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value
[ https://issues.apache.org/jira/browse/AIRFLOW-6170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6170: - Description: BranchPythonOperator subclasses PythonOperator and this means that after it has selected a branch we get a message like this: [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value was: chosen_task This returned value is from the execute() method. A user would expect the returned value from this to be pushed to an XCOM but this does not happen due to no return value from the BranchPythonOperator's execute method. If we do an XCOM push of the result then we can use the decision made by BranchPythonOperator in downstream tasks. Eg consider this dependency chain: get >> branch >> [ create, update ]>> join >> execute The 'execute' task might need to know whether the 'branch' decided to create a new thing to run, or whether to use the existing one from the 'get'. Without an XCOM push from the branch return value, it is difficult to pick up the correct value later on. was: BranchPythonOperator subclasses PythonOperator and this means that after it has selected a branch we get a message like this: [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value was: chosen_task This returned value is from the execute() method. A user would expect the returned value from this to be pushed to an XCOM but this does not happen due to no return value from the BranchPythonOperator. If we do an XCOM push of the result then we can use the decision made by BranchPythonOperator in downstream tasks. Eg consider this dependency chain: get >> branch >> [ create, update ]>> join >> execute The 'execute' task might need to know whether the 'branch' decided to create a new thing to run, or whether to use the existing one from the 'get'. Without an XCOM push from the branch return value, it is difficult to pick up the correct value later on. > BranchPythonOperator does not do XCom push of returned value > > > Key: AIRFLOW-6170 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6170 > Project: Apache Airflow > Issue Type: Improvement > Components: operators >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Major > > BranchPythonOperator subclasses PythonOperator and this means that after it > has selected a branch we get a message like this: > [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned > value was: chosen_task > > This returned value is from the execute() method. A user would expect the > returned value from this to be pushed to an XCOM but this does not happen due > to no return value from the BranchPythonOperator's execute method. > > > > If we do an XCOM push of the result then we can use the decision made by > BranchPythonOperator in downstream tasks. > Eg consider this dependency chain: > get >> branch >> [ create, update ]>> join >> execute > The 'execute' task might need to know whether the 'branch' decided to create > a new thing to run, or whether to use the existing one from the 'get'. > Without an XCOM push from the branch return value, it is difficult to pick up > the correct value later on. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-6170) BranchPythonOperator does not do XCom push of returned value
Bjorn Olsen created AIRFLOW-6170: Summary: BranchPythonOperator does not do XCom push of returned value Key: AIRFLOW-6170 URL: https://issues.apache.org/jira/browse/AIRFLOW-6170 Project: Apache Airflow Issue Type: Improvement Components: operators Affects Versions: 1.10.6 Reporter: Bjorn Olsen Assignee: Bjorn Olsen BranchPythonOperator subclasses PythonOperator and this means that after it has selected a branch we get a message like this: [2019-12-04 08:39:59,960] \{python_operator.py:114} INFO - Done. Returned value was: chosen_task This returned value is from the execute() method. A user would expect this returned value to be pushed to an XCom, but this does not happen because BranchPythonOperator's execute() method does not return a value. If we do an XCom push of the result then we can use the decision made by BranchPythonOperator in downstream tasks. E.g. consider this dependency chain: get >> branch >> [ create, update ] >> join >> execute The 'execute' task might need to know whether the 'branch' decided to create a new thing to run, or whether to use the existing one from the 'get'. Without an XCom push of the branch return value, it is difficult to pick up the correct value later on. -- This message was sent by Atlassian Jira (v8.3.4#803005)
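For illustration, once the branch decision is available in XCom, the downstream 'execute' task from the dependency chain above could read it back with xcom_pull. The task ids and branch names below are hypothetical:

{code:python}
# Hypothetical downstream callable - task_ids and branch names are placeholders.
def execute_fn(**context):
    chosen = context['ti'].xcom_pull(task_ids='branch')  # e.g. 'create' or 'update'
    if chosen == 'create':
        pass  # work with the newly created thing
    else:
        pass  # reuse the existing thing returned by 'get'
{code}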
[jira] [Created] (AIRFLOW-6075) AWS Hook: Improve documentation
Bjorn Olsen created AIRFLOW-6075: Summary: AWS Hook: Improve documentation Key: AIRFLOW-6075 URL: https://issues.apache.org/jira/browse/AIRFLOW-6075 Project: Apache Airflow Issue Type: Improvement Components: aws Affects Versions: 1.10.6 Reporter: Bjorn Olsen The AWS Hook logic is confusing and could do with some extra logging. This would help users to correctly set up their AWS connections for the first time. -- This message was sent by Atlassian Jira (v8.3.4#803005)
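One possible shape of that extra logging (a hedged sketch; the function, credential sources, and messages below are illustrative assumptions, not the actual AwsHook internals): log which credential source the hook ends up using when a client is built.

{code:python}
# Hedged sketch only - not the real AwsHook code; it just shows the kind of
# log lines that would make first-time connection setup easier to debug.
import logging

log = logging.getLogger(__name__)


def log_credential_source(connection):
    extra = connection.extra_dejson  # the connection's parsed 'Extra' JSON
    if connection.login:
        log.info("aws_hook: using access key from the Airflow connection")
    elif extra.get('role_arn'):
        log.info("aws_hook: assuming role %s via STS", extra['role_arn'])
    else:
        log.info("aws_hook: falling back to the boto3 default credential chain")
{code}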
[jira] [Updated] (AIRFLOW-6072) aws_hook: Ability to set outbound proxy
[ https://issues.apache.org/jira/browse/AIRFLOW-6072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6072: - Description: The boto3 connection used by aws_hook does not respect outbound http_proxy settings (even if these are set system-wide). The way to configure a proxy is to pass a botocore.config.Config object to boto3 when creating a client (according to this SO post). [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3] While the aws_hook get_client_type() method is used extensively by AWS Operators, the "config" argument is not used by any operator. Adding a check to aws_hook for "config" in the "extra_config" of the Airflow Connection could allow us to pass kwargs there that build the Config object automatically by the hook. Otherwise we have to update every AWS Operator to also take a "config" parameter. To set an outbound proxy is then as simple as adding this to your extra_config: {code:java} { .. , "config":{ "proxies": { "http": "http://myproxy:8080", "https": "http://myproxy:8080" }}, .. } {code} This needs to work both for the main boto3 clients that do task work, but also during the assume_role process which also uses a boto3 client. was: The boto3 connection used by aws_hook does not respect outbound http_proxy settings (even if these are set system-wide). The way to configure a proxy is to pass a botocore.config.Config object to boto3 when creating a client (according to this SO post). [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3] While the aws_hook get_client_type() method is used extensively by AWS Operators, the "config" argument is not used by any operator. Adding a check to aws_hook for "config" in the "extra_config" of the Airflow Connection could allow us to pass kwargs there that build the Config object automatically by the hook. Otherwise we have to update every AWS Operator to also take a "config" parameter. To set an outbound proxy is then as simple as adding this to your extra_config: #{ .. , #"config":{"proxies": { #"http": "http://myproxy:8080", #"https": "http://myproxy:8080"}}, # ..} > aws_hook: Ability to set outbound proxy > --- > > Key: AIRFLOW-6072 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6072 > Project: Apache Airflow > Issue Type: Improvement > Components: aws >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > The boto3 connection used by aws_hook does not respect outbound http_proxy > settings (even if these are set system-wide). > > The way to configure a proxy is to pass a botocore.config.Config object to > boto3 when creating a client (according to this SO post). > [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3] > While the aws_hook get_client_type() method is used extensively by AWS > Operators, the "config" argument is not used by any operator. > Adding a check to aws_hook for "config" in the "extra_config" of the Airflow > Connection could allow us to pass kwargs there that build the Config object > automatically by the hook. > Otherwise we have to update every AWS Operator to also take a "config" > parameter. > > To set an outbound proxy is then as simple as adding this to your > extra_config: > {code:java} > { .. , > "config":{ "proxies": { > "http": "http://myproxy:8080", > "https": "http://myproxy:8080" }}, > .. 
} > {code} > > This needs to work both for the main boto3 clients that do task work, but > also during the assume_role process which also uses a boto3 client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
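A sketch of how the hook-side check could look (function and variable names here are assumptions for illustration, not the merged implementation): read an optional "config" dict from the connection's extra JSON and turn it into a botocore.config.Config that is handed to the boto3 client.

{code:python}
# Hedged sketch - illustrative names only.
import boto3
from botocore.config import Config


def get_client_type(client_type, extra_config, region_name=None):
    # extra_config is the parsed extra JSON of the Airflow connection, e.g.
    # {"config": {"proxies": {"http": "http://myproxy:8080",
    #                         "https": "http://myproxy:8080"}}}
    config_kwargs = extra_config.get('config')
    botocore_config = Config(**config_kwargs) if config_kwargs else None
    return boto3.client(client_type, region_name=region_name, config=botocore_config)
{code}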
[jira] [Updated] (AIRFLOW-6072) aws_hook: Ability to set outbound proxy
[ https://issues.apache.org/jira/browse/AIRFLOW-6072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6072: - Description: The boto3 connection used by aws_hook does not respect outbound http_proxy settings (even if these are set system-wide). The way to configure a proxy is to pass a botocore.config.Config object to boto3 when creating a client (according to this SO post). [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3] While the aws_hook get_client_type() method is used extensively by AWS Operators, the "config" argument is not used by any operator. Adding a check to aws_hook for "config" in the "extra_config" of the Airflow Connection could allow us to pass kwargs there that build the Config object automatically by the hook. Otherwise we have to update every AWS Operator to also take a "config" parameter. To set an outbound proxy is then as simple as adding this to your extra_config: #{ .. , #"config":{"proxies": { #"http": "http://myproxy:8080", #"https": "http://myproxy:8080"}}, # ..} was: The boto3 connection used by aws_hook does not respect outbound http_proxy settings (even if these are set system-wide). The way to configure a proxy is to pass a botocore.config.Config object to boto3 when creating a client (according to this SO post). [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3] While the aws_hook get_client_type() method is used extensively by AWS Operators, the "config" argument is not used by any operator. Adding a check to aws_hook for "config" in the "extra_config" of the Airflow Connection could allow us to pass kwargs there that build the Config object automatically when the hook's client is created. Otherwise we have to update every AWS Operator to also take a "config" parameter. > aws_hook: Ability to set outbound proxy > --- > > Key: AIRFLOW-6072 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6072 > Project: Apache Airflow > Issue Type: Improvement > Components: aws >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > The boto3 connection used by aws_hook does not respect outbound http_proxy > settings (even if these are set system-wide). > > The way to configure a proxy is to pass a botocore.config.Config object to > boto3 when creating a client (according to this SO post). > [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3] > While the aws_hook get_client_type() method is used extensively by AWS > Operators, the "config" argument is not used by any operator. > Adding a check to aws_hook for "config" in the "extra_config" of the Airflow > Connection could allow us to pass kwargs there that build the Config object > automatically by the hook. > Otherwise we have to update every AWS Operator to also take a "config" > parameter. > > To set an outbound proxy is then as simple as adding this to your > extra_config: > #{ .. , > #"config":{"proxies": { > #"http": "http://myproxy:8080", > #"https": "http://myproxy:8080"}}, > # ..} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-6072) aws_hook: Ability to set outbound proxy
[ https://issues.apache.org/jira/browse/AIRFLOW-6072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bjorn Olsen updated AIRFLOW-6072: - Priority: Minor (was: Major) > aws_hook: Ability to set outbound proxy > --- > > Key: AIRFLOW-6072 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6072 > Project: Apache Airflow > Issue Type: Improvement > Components: aws >Affects Versions: 1.10.6 >Reporter: Bjorn Olsen >Assignee: Bjorn Olsen >Priority: Minor > > The boto3 connection used by aws_hook does not respect outbound http_proxy > settings (even if these are set in system wide). > > The way to configure a proxy is to pass a botocore.config.Config object to > boto3 when creating a client (according to this SO post). > [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3] > While the aws_hook get_client_type() method is used extensively by AWS > Operators, the "config" argument is not used by any operator. > Adding a check to aws_hook for "config" in the "extra_config" of the Airflow > Connection, could allow us to pass kwargs there that build the Config object > automatically by the hook is created. > Otherwise we have to update every AWS Operator to also take a "config" > parameter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (AIRFLOW-6072) aws_hook: Ability to set outbound proxy
Bjorn Olsen created AIRFLOW-6072: Summary: aws_hook: Ability to set outbound proxy Key: AIRFLOW-6072 URL: https://issues.apache.org/jira/browse/AIRFLOW-6072 Project: Apache Airflow Issue Type: Improvement Components: aws Affects Versions: 1.10.6 Reporter: Bjorn Olsen Assignee: Bjorn Olsen The boto3 connection used by aws_hook does not respect outbound http_proxy settings (even if these are set system-wide). The way to configure a proxy is to pass a botocore.config.Config object to boto3 when creating a client (according to this SO post). [https://stackoverflow.com/questions/33480108/how-do-you-use-an-http-https-proxy-with-boto3] While the aws_hook get_client_type() method is used extensively by AWS Operators, the "config" argument is not used by any operator. Adding a check to aws_hook for "config" in the "extra_config" of the Airflow Connection could allow us to pass kwargs there that build the Config object automatically when the hook's client is created. Otherwise we have to update every AWS Operator to also take a "config" parameter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
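For reference, the underlying boto3 mechanism the ticket builds on (from the linked Stack Overflow answer) looks roughly like this; the proxy host is a placeholder:

{code:python}
import boto3
from botocore.config import Config

proxy_config = Config(proxies={'http': 'http://myproxy:8080',
                               'https': 'http://myproxy:8080'})

s3 = boto3.client('s3', config=proxy_config)
# The STS client used during assume_role needs the same config to reach AWS
# through the proxy as well.
sts = boto3.client('sts', config=proxy_config)
{code}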
[jira] [Created] (AIRFLOW-6038) AWS DataSync example dags
Bjorn Olsen created AIRFLOW-6038: Summary: AWS DataSync example dags Key: AIRFLOW-6038 URL: https://issues.apache.org/jira/browse/AIRFLOW-6038 Project: Apache Airflow Issue Type: Improvement Components: aws, examples Affects Versions: 1.10.6 Reporter: Bjorn Olsen Assignee: Bjorn Olsen Add example_dags for AWS DataSync operators -- This message was sent by Atlassian Jira (v8.3.4#803005)
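A rough sketch of what such an example DAG could look like. The operator name, import path, and location URIs are placeholders pending the AIRFLOW-5824 implementation, not the committed example:

{code:python}
# Illustrative only - the real module path and operator arguments depend on
# how AIRFLOW-5824 is merged.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.aws_datasync_operator import AWSDataSyncOperator  # assumed path

with DAG(dag_id='example_datasync',
         start_date=datetime(2019, 1, 1),
         schedule_interval=None) as dag:

    datasync_task = AWSDataSyncOperator(
        task_id='datasync_task',
        aws_conn_id='aws_default',
        source_location_uri='smb://fileserver/share',      # placeholder
        destination_location_uri='s3://my-bucket/prefix',  # placeholder
    )
{code}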
[jira] [Created] (AIRFLOW-5824) Add a new hook and operator for AWS DataSync
Bjorn Olsen created AIRFLOW-5824: Summary: Add a new hook and operator for AWS DataSync Key: AIRFLOW-5824 URL: https://issues.apache.org/jira/browse/AIRFLOW-5824 Project: Apache Airflow Issue Type: New Feature Components: aws Affects Versions: 1.10.5 Reporter: Bjorn Olsen Assignee: Bjorn Olsen I'm working on some code to add a hook and operator for AWS DataSync task executions. Just creating a Jira ticket for visibility -- This message was sent by Atlassian Jira (v8.3.4#803005)
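A minimal sketch of the kind of hook being proposed (class and method names here are assumptions, not the final code): a thin wrapper over the boto3 'datasync' client obtained through the existing AwsHook.

{code:python}
# Hedged sketch - not the merged implementation.
from airflow.contrib.hooks.aws_hook import AwsHook


class AWSDataSyncHook(AwsHook):
    """Interact with AWS DataSync task executions."""

    def get_conn(self):
        # Reuse the existing AwsHook plumbing to build a boto3 DataSync client.
        return self.get_client_type('datasync')

    def start_task_execution(self, task_arn):
        # Kick off an execution of an existing DataSync task and return its ARN.
        response = self.get_conn().start_task_execution(TaskArn=task_arn)
        return response['TaskExecutionArn']
{code}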